
Commit 6af6439

rebase
1 parent fc5eb36 commit 6af6439

4 files changed: +39 −177 lines

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

Lines changed: 3 additions & 103 deletions
@@ -19,47 +19,27 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.annotations.VisibleForTesting;
-<<<<<<< HEAD
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-=======
+
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
->>>>>>> Partial Chunk reads
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-<<<<<<< HEAD
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.client.BlockID;
-=======
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
->>>>>>> Partial Chunk reads
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -103,18 +83,6 @@ public class BlockInputStream extends InputStream implements Seekable {
   // Index of the chunkStream corresponding to the current position of the
   // BlockInputStream i.e offset of the data to be read next from this block
   private int chunkIndex;
-<<<<<<< HEAD
-  // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
-  // buffers or index of the last chunk in the buffers. It is updated only
-  // when a new chunk is read from container into the buffers.
-  private int chunkIndexOfCurrentBuffer;
-  private long[] chunkOffset;
-  private List<ByteBuffer> buffers;
-  private int bufferIndex;
-  private long bufferPosition;
-  private boolean verifyChecksum;
-=======
->>>>>>> Partial Chunk reads
 
   // Position of the BlockInputStream is maintainted by this variable till
   // the stream is initialized. This position is w.r.t to the block only and
@@ -241,37 +209,13 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
     if (off < 0 || len < 0 || len > b.length - off) {
       throw new IndexOutOfBoundsException();
     }
-<<<<<<< HEAD
-    // ChunkIndex is the last chunk in the stream. Check if this chunk has
-    // been read from container or not. Return true if chunkIndex has not
-    // been read yet and false otherwise.
-    return chunkIndexOfCurrentBuffer != chunkIndex;
-  }
-
-  /**
-   * Attempts to read the chunk at the specified offset in the chunk list. If
-   * successful, then the data of the read chunk is saved so that its bytes can
-   * be returned from subsequent read calls.
-   *
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void readChunkFromContainer() throws IOException {
-    // Read the chunk at chunkIndex
-    final ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    ByteString byteString;
-    byteString = readChunk(chunkInfo);
-    buffers = byteString.asReadOnlyByteBufferList();
-    bufferIndex = 0;
-    chunkIndexOfCurrentBuffer = chunkIndex;
-=======
     if (len == 0) {
       return 0;
    }
 
     if (!initialized) {
       initialize();
     }
->>>>>>> Partial Chunk reads
 
     checkOpen();
     int totalReadLen = 0;
@@ -284,26 +228,6 @@ private synchronized void readChunkFromContainer() throws IOException {
     return totalReadLen == 0 ? EOF : totalReadLen;
   }
 
-<<<<<<< HEAD
-  /**
-   * Send RPC call to get the chunk from the container.
-   */
-  @VisibleForTesting
-  protected ByteString readChunk(final ChunkInfo chunkInfo)
-      throws IOException {
-    ReadChunkResponseProto readChunkResponse;
-    try {
-      List<CheckedBiFunction> validators =
-          ContainerProtocolCalls.getValidatorList();
-      validators.add(validator);
-      readChunkResponse = ContainerProtocolCalls
-          .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators);
-    } catch (IOException e) {
-      if (e instanceof StorageContainerException) {
-        throw e;
-      }
-      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-=======
     // Get the current chunkStream and read data from it
     ChunkInputStream current = chunkStreams.get(chunkIndex);
     int numBytesToRead = Math.min(len, (int)current.getRemaining());
@@ -323,33 +247,10 @@ protected ByteString readChunk(final ChunkInfo chunkInfo)
           ((chunkIndex + 1) < chunkStreams.size())) {
         chunkIndex += 1;
       }
->>>>>>> Partial Chunk reads
     }
     return totalReadLen;
   }
 
-<<<<<<< HEAD
-  private CheckedBiFunction<ContainerCommandRequestProto,
-      ContainerCommandResponseProto, IOException> validator =
-          (request, response) -> {
-            ReadChunkResponseProto readChunkResponse = response.getReadChunk();
-            final ChunkInfo chunkInfo = readChunkResponse.getChunkData();
-            ByteString byteString = readChunkResponse.getData();
-            if (byteString.size() != chunkInfo.getLen()) {
-              // Bytes read from chunk should be equal to chunk size.
-              throw new OzoneChecksumException(String
-                  .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                      chunkInfo.getChunkName(), chunkInfo.getLen(),
-                      byteString.size()));
-            }
-            ChecksumData checksumData =
-                ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
-            if (verifyChecksum) {
-              Checksum.verifyChecksum(byteString, checksumData);
-            }
-          };
-
-=======
   /**
    * Seeks the BlockInputStream to the specified position. If the stream is
   * not initialized, save the seeked position via blockPosition. Otherwise,
@@ -366,7 +267,6 @@ protected ByteString readChunk(final ChunkInfo chunkInfo)
   * 2. chunkStream[2] will be seeked to position 10
   *    (= 90 - chunkOffset[2] (= 80)).
   */
->>>>>>> Partial Chunk reads
  @Override
  public synchronized void seek(long pos) throws IOException {
    if (!initialized) {
@@ -386,7 +286,7 @@ public synchronized void seek(long pos) throws IOException {
      throw new EOFException(
          "EOF encountered at pos: " + pos + " for block: " + blockID);
    }
-    Preconditions.assertTrue(chunkIndex >= 0);
+
    if (chunkIndex >= chunkStreams.size()) {
      chunkIndex = Arrays.binarySearch(chunkOffsets, pos);
    } else if (pos < chunkOffsets[chunkIndex]) {
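
Note (not part of the patch): the seek() javadoc above works an example where block position 90 lands in chunkStream[2] at chunk-local offset 10, and the resolved code finds the chunk with Arrays.binarySearch over chunkOffsets. The standalone sketch below illustrates that arithmetic only; the helper name chunkIndexForPos and the insertion-point adjustment are assumptions for illustration, not the class's actual code.

import java.util.Arrays;

public class SeekSketch {
  // Hypothetical helper: maps a block-relative position to the index of the
  // chunk containing it, given chunkOffsets[i] = offset of the first byte of
  // chunk i within the block.
  static int chunkIndexForPos(long[] chunkOffsets, long pos) {
    int idx = Arrays.binarySearch(chunkOffsets, pos);
    if (idx < 0) {
      // binarySearch returns -(insertionPoint) - 1 when pos is not exactly a
      // chunk boundary; the containing chunk starts one slot earlier.
      idx = -idx - 2;
    }
    return idx;
  }

  public static void main(String[] args) {
    long[] chunkOffsets = {0, 40, 80};   // three chunks of 40 bytes each
    long pos = 90;
    int chunkIndex = chunkIndexForPos(chunkOffsets, pos);
    long posInChunk = pos - chunkOffsets[chunkIndex];
    // Prints "chunk 2, position 10", matching the example in the javadoc.
    System.out.println("chunk " + chunkIndex + ", position " + posInChunk);
  }
}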

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java

Lines changed: 35 additions & 59 deletions
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
@@ -31,15 +32,14 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /**
  * An {@link InputStream} used by the REST service in combination with the
@@ -56,7 +56,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
   private final BlockID blockID;
   private final String traceID;
   private XceiverClientSpi xceiverClient;
-  private final boolean verifyChecksum;
+  private boolean verifyChecksum;
   private boolean allocated = false;
 
   // Buffer to store the chunk data read from the DN container
@@ -275,10 +275,6 @@ private synchronized int prepareRead(int len) throws IOException {
   */
  private synchronized void readChunkFromContainer(int len) throws IOException {
 
-    List<DatanodeDetails> excludeDns = null;
-    ByteString byteString;
-    List<DatanodeDetails> dnList = getDatanodeList();
-
    // index of first byte to be read from the chunk
    long startByteIndex;
    if (chunkPosition >= 0) {
@@ -307,41 +303,7 @@ private synchronized void readChunkFromContainer(int len) throws IOException {
        .setLen(bufferLength)
        .build();
 
-    while (true) {
-      List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
-      byteString = readChunk(adjustedChunkInfo, excludeDns,
-          dnListFromReadChunkCall);
-      try {
-        if (byteString.size() != adjustedChunkInfo.getLen()) {
-          // Bytes read from chunk should be equal to chunk size.
-          throw new IOException(String
-              .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                  adjustedChunkInfo.getChunkName(), adjustedChunkInfo.getLen(),
-                  byteString.size()));
-        }
-
-        if (verifyChecksum) {
-          ChecksumData checksumData = ChecksumData.getFromProtoBuf(
-              chunkInfo.getChecksumData());
-          int checkumStartIndex =
-              (int) (bufferOffset / checksumData.getBytesPerChecksum());
-          Checksum.verifyChecksum(byteString, checksumData, checkumStartIndex);
-        }
-        break;
-      } catch (IOException ioe) {
-        // we will end up in this situation only if the checksum mismatch
-        // happens or the length of the chunk mismatches.
-        // In this case, read should be retried on a different replica.
-        // TODO: Inform SCM of a possible corrupt container replica here
-        if (excludeDns == null) {
-          excludeDns = new ArrayList<>();
-        }
-        excludeDns.addAll(dnListFromReadChunkCall);
-        if (excludeDns.size() == dnList.size()) {
-          throw ioe;
-        }
-      }
-    }
+    ByteString byteString = readChunk(adjustedChunkInfo);
 
    buffers = byteString.asReadOnlyByteBufferList();
    bufferIndex = 0;
@@ -361,37 +323,51 @@ private synchronized void readChunkFromContainer(int len) throws IOException {
   * Send RPC call to get the chunk from the container.
   */
  @VisibleForTesting
-  protected ByteString readChunk(ChunkInfo readChunkInfo,
-      List<DatanodeDetails> excludeDns,
-      List<DatanodeDetails> dnListFromReply) throws IOException {
-    XceiverClientReply reply;
+  protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
    ReadChunkResponseProto readChunkResponse;
 
    try {
-      reply = ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo,
-          blockID, traceID, excludeDns);
+      List<CheckedBiFunction> validators =
+          ContainerProtocolCalls.getValidatorList();
+      validators.add(validator);
 
-      ContainerCommandResponseProto response = reply.getResponse().get();
-      ContainerProtocolCalls.validateContainerResponse(response);
-      readChunkResponse = response.getReadChunk();
+      readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
+          readChunkInfo, blockID, traceID, validators);
 
-      dnListFromReply.addAll(reply.getDatanodes());
    } catch (IOException e) {
      if (e instanceof StorageContainerException) {
        throw e;
      }
      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-    } catch (ExecutionException | InterruptedException e) {
-      throw new IOException("Failed to execute ReadChunk command for chunk "
-          + readChunkInfo.getChunkName(), e);
    }
+
    return readChunkResponse.getData();
  }
 
-  @VisibleForTesting
-  protected List<DatanodeDetails> getDatanodeList() {
-    return xceiverClient.getPipeline().getNodes();
-  }
+  private CheckedBiFunction<ContainerCommandRequestProto,
+      ContainerCommandResponseProto, IOException> validator =
+          (request, response) -> {
+            final ChunkInfo chunkInfo = request.getReadChunk().getChunkData();
+
+            ReadChunkResponseProto readChunkResponse = response.getReadChunk();
+            ByteString byteString = readChunkResponse.getData();
+
+            if (byteString.size() != chunkInfo.getLen()) {
+              // Bytes read from chunk should be equal to chunk size.
+              throw new OzoneChecksumException(String
+                  .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+                      chunkInfo.getChunkName(), chunkInfo.getLen(),
+                      byteString.size()));
+            }
+
+            if (verifyChecksum) {
+              ChecksumData checksumData =
+                  ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
+              int checkumStartIndex =
+                  (int) (bufferOffset / checksumData.getBytesPerChecksum());
+              Checksum.verifyChecksum(byteString, checksumData, checkumStartIndex);
            }
+          };
 
  /**
   * Return the offset and length of bytes that need to be read from the
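
Note (not part of the patch): the change above drops the retry-on-replica loop and instead registers length and checksum checks as a validator that ContainerProtocolCalls.readChunk runs on the reply. Below is a minimal, self-contained sketch of that validator pattern. The FakeRequest/FakeResponse classes and the local CheckedBiFunction interface are stand-ins invented for illustration, not Ozone's actual protos or API.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ValidatorSketch {

  // Minimal analogue of the CheckedBiFunction used in the patch: a two-argument
  // callback that may throw a checked exception.
  interface CheckedBiFunction<A, B, E extends Exception> {
    void apply(A request, B response) throws E;
  }

  // Hypothetical stand-ins for the request/response protos.
  static class FakeRequest {
    int expectedLen;
  }

  static class FakeResponse {
    byte[] data;
  }

  public static void main(String[] args) throws IOException {
    List<CheckedBiFunction<FakeRequest, FakeResponse, IOException>> validators =
        new ArrayList<>();

    // Length check, analogous to the validator added to ChunkInputStream:
    // the reply must carry exactly as many bytes as were requested.
    validators.add((request, response) -> {
      if (response.data.length != request.expectedLen) {
        throw new IOException("Inconsistent read: expected "
            + request.expectedLen + " bytes, got " + response.data.length);
      }
    });

    FakeRequest request = new FakeRequest();
    request.expectedLen = 4;
    FakeResponse response = new FakeResponse();
    response.data = new byte[] {1, 2, 3, 4};

    // The RPC layer runs every registered validator on the reply before the
    // data is handed back to the caller; a failed check surfaces as an
    // IOException instead of silently returning short or corrupt data.
    for (CheckedBiFunction<FakeRequest, FakeResponse, IOException> v : validators) {
      v.apply(request, response);
    }
    System.out.println("response accepted");
  }
}

In the actual change, the validator also recomputes checksums starting from bufferOffset / bytesPerChecksum, so only the checksums covering the requested byte range of the chunk are verified.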

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java

Lines changed: 0 additions & 6 deletions
@@ -120,14 +120,8 @@ private class DummyBlockInputStream extends BlockInputStream {
     }
 
     @Override
-<<<<<<< HEAD
-    protected ByteString readChunk(final ChunkInfo chunkInfo)
-        throws IOException {
-      return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
-=======
     protected List<ChunkInfo> getChunkInfos() {
       return chunks;
->>>>>>> Partial Chunk reads
     }
 
     @Override

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java

Lines changed: 1 addition & 9 deletions
@@ -103,22 +103,14 @@ public DummyChunkInputStream(ChunkInfo chunkInfo,
     }
 
     @Override
-    protected ByteString readChunk(ChunkInfo readChunkInfo,
-        List<DatanodeDetails> excludeDns,
-        List<DatanodeDetails> dnListFromReply) {
+    protected ByteString readChunk(ChunkInfo readChunkInfo) {
       ByteString byteString = ByteString.copyFrom(chunkData,
           (int) readChunkInfo.getOffset(),
          (int) readChunkInfo.getLen());
      readByteBuffers.add(byteString);
      return byteString;
    }
 
-    @Override
-    protected List<DatanodeDetails> getDatanodeList() {
-      // return an empty dummy list of size 5
-      return new ArrayList<>(5);
-    }
-
    @Override
    protected void checkOpen() {
      // No action needed
