
Commit eda4dbb

svn merge -c 1170085 from trunk for HDFS-2317.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189405 13f79535-47bb-0310-9956-ffa450edef68
1 parent 23fb417 commit eda4dbb

13 files changed, 367 insertions(+), 138 deletions(-)

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Lines changed: 2 additions & 0 deletions
@@ -311,6 +311,8 @@ Release 0.23.0 - Unreleased
     HDFS-2284. Add a new FileSystem, webhdfs://, for supporting write Http
     access to HDFS. (szetszwo)
 
+    HDFS-2317. Support read access to HDFS in webhdfs. (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
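
The new entry records the feature this commit merges: files exposed through the webhdfs:// FileSystem (HDFS-2284) can now be opened and read. A minimal client-side sketch of that read path, assuming a placeholder namenode HTTP address (namenode.example.com:50070) and file path, neither of which is fixed by this commit:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsReadExample {
  public static void main(String[] args) throws Exception {
    // webhdfs:// URI of the namenode HTTP port (placeholder address).
    final URI uri = URI.create("webhdfs://namenode.example.com:50070/");
    final FileSystem fs = FileSystem.get(uri, new Configuration());

    // Open and read a file exactly as with any other FileSystem;
    // the bytes are served over HTTP by a datanode.
    final FSDataInputStream in = fs.open(new Path("/tmp/sample.txt"));
    try {
      final BufferedReader r = new BufferedReader(new InputStreamReader(in));
      for (String line; (line = r.readLine()) != null; ) {
        System.out.println(line);
      }
    } finally {
      in.close();
    }
  }
}

From the caller's point of view this is the ordinary FileSystem API; the HTTP plumbing added in the files below is what serves the bytes.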

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java

Lines changed: 3 additions & 2 deletions
@@ -33,7 +33,7 @@
  * is made on the successive read(). The normal input stream functions are
  * connected to the currently active input stream.
  */
-class ByteRangeInputStream extends FSInputStream {
+public class ByteRangeInputStream extends FSInputStream {
 
   /**
    * This class wraps a URL to allow easy mocking when testing. The URL class
@@ -71,7 +71,8 @@ enum StreamStatus {
 
   StreamStatus status = StreamStatus.SEEK;
 
-  ByteRangeInputStream(final URL url) {
+  /** Create an input stream with the URL. */
+  public ByteRangeInputStream(final URL url) {
     this(new URLOpener(url), new URLOpener(null));
   }
 
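Making the class and its URL constructor public lets code outside the package, such as the webhdfs client, build the stream directly. A rough usage sketch, assuming a placeholder URL; per the class javadoc, the underlying connection is opened lazily and re-established as reads progress:

import java.net.URL;

import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.io.IOUtils;

public class ByteRangeReadSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; a real caller would point this at a webhdfs OPEN URL.
    final URL url = new URL("http://datanode.example.com:50075/some/resource");

    // The wrapped connection is opened on the first read; seek() changes
    // the starting position used for the next open.
    final ByteRangeInputStream in = new ByteRangeInputStream(url);
    try {
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      in.close();
    }
  }
}
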
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 1 addition & 2 deletions
@@ -229,12 +229,11 @@ public boolean recoverLease(Path f) throws IOException {
     return dfs.recoverLease(getPathName(f));
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     statistics.incrementReadOps(1);
     return new DFSClient.DFSDataInputStream(
-        dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
+        dfs.open(getPathName(f), bufferSize, verifyChecksum));
  }
 
   /** This optional operation is not yet supported. */

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

Lines changed: 60 additions & 1 deletion
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -27,6 +28,7 @@
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
@@ -36,19 +38,24 @@
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.hdfs.web.resources.PermissionParam;
@@ -61,7 +68,7 @@
 /** Web-hdfs DataNode implementation. */
 @Path("")
 public class DatanodeWebHdfsMethods {
-  private static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class);
+  public static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class);
 
   private @Context ServletContext context;
 
@@ -166,4 +173,56 @@ public Response post(
       throw new UnsupportedOperationException(op + " is not supported");
     }
   }
+
+  /** Handle HTTP GET request. */
+  @GET
+  @Path("{" + UriFsPathParam.NAME + ":.*}")
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
+  public Response get(
+      @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
+      @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
+          final GetOpParam op,
+      @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
+          final OffsetParam offset,
+      @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
+          final LengthParam length,
+      @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
+          final BufferSizeParam bufferSize
+      ) throws IOException, URISyntaxException {
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(op + ": " + path
+          + Param.toSortedString(", ", offset, length, bufferSize));
+    }
+
+    final String fullpath = path.getAbsolutePath();
+    final DataNode datanode = (DataNode)context.getAttribute("datanode");
+
+    switch(op.getValue()) {
+    case OPEN:
+    {
+      final Configuration conf = new Configuration(datanode.getConf());
+      final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
+      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
+          dfsclient.open(fullpath, bufferSize.getValue(), true));
+      in.seek(offset.getValue());
+
+      final StreamingOutput streaming = new StreamingOutput() {
+        @Override
+        public void write(final OutputStream out) throws IOException {
+          final Long n = length.getValue();
+          if (n == null) {
+            IOUtils.copyBytes(in, out, bufferSize.getValue());
+          } else {
+            IOUtils.copyBytes(in, out, n, false);
+          }
+        }
+      };
+      return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    default:
+      throw new UnsupportedOperationException(op + " is not supported");
+    }
+  }
 }
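
The handler above serves op=OPEN by opening the file through a DFSClient, seeking to the requested offset, and streaming either the remaining bytes or the requested length as application/octet-stream. A hand-rolled client sketch of such a request; the /webhdfs/v1 prefix, the query-parameter names (op, offset, length) and the datanode address are assumptions for illustration, mirroring WebHdfsFileSystem.PATH_PREFIX and the *Param classes rather than anything spelled out in this hunk:

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.hadoop.io.IOUtils;

public class DatanodeOpenSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder datanode HTTP address; a real client is normally
    // redirected here by the namenode (see NamenodeWebHdfsMethods below).
    final URL url = new URL("http://datanode.example.com:50075"
        + "/webhdfs/v1/tmp/sample.txt?op=OPEN&offset=0&length=1024");

    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");

    final InputStream in = conn.getInputStream();
    try {
      // The response body is the raw file content (application/octet-stream).
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      in.close();
      conn.disconnect();
    }
  }
}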

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

Lines changed: 106 additions & 21 deletions
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.web.resources;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.EnumSet;
@@ -37,11 +40,13 @@
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -58,7 +63,9 @@
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.GroupParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.OwnerParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
@@ -79,15 +86,23 @@ public class NamenodeWebHdfsMethods {
   private @Context ServletContext context;
 
   private static DatanodeInfo chooseDatanode(final NameNode namenode,
-      final String path, final HttpOpParam.Op op) throws IOException {
-    if (op == PostOpParam.Op.APPEND) {
-      final HdfsFileStatus status = namenode.getRpcServer().getFileInfo(path);
+      final String path, final HttpOpParam.Op op, final long openOffset
+      ) throws IOException {
+    if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND) {
+      final NamenodeProtocols np = namenode.getRpcServer();
+      final HdfsFileStatus status = np.getFileInfo(path);
       final long len = status.getLen();
+      if (op == GetOpParam.Op.OPEN && (openOffset < 0L || openOffset >= len)) {
+        throw new IOException("Offset=" + openOffset + " out of the range [0, "
+            + len + "); " + op + ", path=" + path);
+      }
+
       if (len > 0) {
-        final LocatedBlocks locations = namenode.getRpcServer().getBlockLocations(path, len-1, 1);
+        final long offset = op == GetOpParam.Op.OPEN? openOffset: len - 1;
+        final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(count - 1));
+          return JspHelper.bestNode(locations.get(0));
         }
       }
     }
@@ -98,9 +113,9 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode,
   }
 
   private static URI redirectURI(final NameNode namenode,
-      final String path, final HttpOpParam.Op op,
+      final String path, final HttpOpParam.Op op, final long openOffset,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
     final String query = op.toQueryString() + Param.toSortedString("&", parameters);
     final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path;
 
@@ -148,8 +163,9 @@ public Response put(
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path
-          + Param.toSortedString(", ", dstPath, owner, group, permission,
-              overwrite, bufferSize, replication, blockSize));
+          + Param.toSortedString(", ", dstPath, owner, group, permission,
+              overwrite, bufferSize, replication, blockSize,
+              modificationTime, accessTime, renameOptions));
     }
 
     final String fullpath = path.getAbsolutePath();
@@ -159,7 +175,7 @@ public Response put(
     switch(op.getValue()) {
     case CREATE:
     {
-      final URI uri = redirectURI(namenode, fullpath, op.getValue(),
+      final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
           permission, overwrite, bufferSize, replication, blockSize);
       return Response.temporaryRedirect(uri).build();
     }
@@ -234,7 +250,8 @@ public Response post(
     switch(op.getValue()) {
     case APPEND:
     {
-      final URI uri = redirectURI(namenode, fullpath, op.getValue(), bufferSize);
+      final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
+          bufferSize);
       return Response.temporaryRedirect(uri).build();
     }
     default:
@@ -250,9 +267,15 @@ public Response post(
   @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response root(
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
-          final GetOpParam op
-      ) throws IOException {
-    return get(ROOT, op);
+          final GetOpParam op,
+      @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
+          final OffsetParam offset,
+      @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
+          final LengthParam length,
+      @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
+          final BufferSizeParam bufferSize
+      ) throws IOException, URISyntaxException {
+    return get(ROOT, op, offset, length, bufferSize);
   }
 
   /** Handle HTTP GET request. */
@@ -262,27 +285,89 @@ public Response root(
   public Response get(
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
-          final GetOpParam op
-      ) throws IOException {
+          final GetOpParam op,
+      @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
+          final OffsetParam offset,
+      @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
+          final LengthParam length,
+      @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
+          final BufferSizeParam bufferSize
+      ) throws IOException, URISyntaxException {
 
     if (LOG.isTraceEnabled()) {
      LOG.trace(op + ", " + path
-          + Param.toSortedString(", "));
+          + Param.toSortedString(", ", offset, length, bufferSize));
     }
 
+    final NameNode namenode = (NameNode)context.getAttribute("name.node");
+    final String fullpath = path.getAbsolutePath();
+    final NamenodeProtocols np = namenode.getRpcServer();
+
     switch(op.getValue()) {
+    case OPEN:
+    {
+      final URI uri = redirectURI(namenode, fullpath, op.getValue(),
+          offset.getValue(), offset, length, bufferSize);
+      return Response.temporaryRedirect(uri).build();
+    }
     case GETFILESTATUS:
-      final NameNode namenode = (NameNode)context.getAttribute("name.node");
-      final String fullpath = path.getAbsolutePath();
-      final HdfsFileStatus status = namenode.getRpcServer().getFileInfo(fullpath);
+    {
+      final HdfsFileStatus status = np.getFileInfo(fullpath);
       final String js = JsonUtil.toJsonString(status);
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
-
+    }
+    case LISTSTATUS:
+    {
+      final StreamingOutput streaming = getListingStream(np, fullpath);
+      return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
   }
 
+  private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
+      final String p, byte[] startAfter) throws IOException {
+    final DirectoryListing listing = np.getListing(p, startAfter, false);
+    if (listing == null) { // the directory does not exist
+      throw new FileNotFoundException("File " + p + " does not exist.");
+    }
+    return listing;
+  }
+
+  private static StreamingOutput getListingStream(final NamenodeProtocols np,
+      final String p) throws IOException {
+    final DirectoryListing first = getDirectoryListing(np, p,
+        HdfsFileStatus.EMPTY_NAME);
+
+    return new StreamingOutput() {
+      @Override
+      public void write(final OutputStream outstream) throws IOException {
+        final PrintStream out = new PrintStream(outstream);
+        out.print('[');
+
+        final HdfsFileStatus[] partial = first.getPartialListing();
+        if (partial.length > 0) {
+          out.print(JsonUtil.toJsonString(partial[0]));
+        }
+        for(int i = 1; i < partial.length; i++) {
+          out.println(',');
+          out.print(JsonUtil.toJsonString(partial[i]));
+        }
+
+        for(DirectoryListing curr = first; curr.hasMore(); ) {
+          curr = getDirectoryListing(np, p, curr.getLastName());
+          for(HdfsFileStatus s : curr.getPartialListing()) {
+            out.println(',');
+            out.print(JsonUtil.toJsonString(s));
+          }
+        }
+
+        out.println(']');
+      }
+    };
+  }
+
   /** Handle HTTP DELETE request. */
   @DELETE
   @Path("{path:.*}")
