
Commit 54f7a6b

HDFS-17293. First packet data + checksum size will be set to 516 bytes when writing to a new block. (#6368). Contributed by farmmamba.
Reviewed-by: He Xiaoqiao <[email protected]>
Signed-off-by: Shuyan Zhang <[email protected]>

Parent: 7dc166d

2 files changed: +48 −2 lines

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 7 additions & 2 deletions
@@ -536,8 +536,13 @@ protected void adjustChunkBoundary() {
     }

     if (!getStreamer().getAppendChunk()) {
-      final int psize = (int) Math
-          .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      int psize = 0;
+      if (blockSize == getStreamer().getBytesCurBlock()) {
+        psize = writePacketSize;
+      } else {
+        psize = (int) Math
+            .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      }
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

Lines changed: 41 additions & 0 deletions
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -508,6 +510,45 @@ public void testExceptionInCloseWithoutRecoverLease() throws Exception {
     }
   }

+  @Test(timeout=60000)
+  public void testFirstPacketSizeInNewBlocks() throws IOException {
+    final long blockSize = (long) 1024 * 1024;
+    MiniDFSCluster dfsCluster = cluster;
+    DistributedFileSystem fs = dfsCluster.getFileSystem();
+    Configuration dfsConf = fs.getConf();
+
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
+    try(FSDataOutputStream fos = fs.create(new Path("/testfile.dat"),
+        FsPermission.getDefault(),
+        flags, 512, (short)3, blockSize, null)) {
+
+      DataChecksum crc32c = DataChecksum.newDataChecksum(
+          DataChecksum.Type.CRC32C, 512);
+
+      long loop = 0;
+      Random r = new Random();
+      byte[] buf = new byte[(int) blockSize];
+      r.nextBytes(buf);
+      fos.write(buf);
+      fos.hflush();
+
+      int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize();
+      int packetContentSize = (dfsConf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT) -
+          PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize;
+
+      while (loop < 20) {
+        r.nextBytes(buf);
+        fos.write(buf);
+        fos.hflush();
+        loop++;
+        Assert.assertEquals(((DFSOutputStream) fos.getWrappedStream()).packetSize,
+            packetContentSize);
+      }
+    }
+    fs.delete(new Path("/testfile.dat"), true);
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {
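How the test exercises the fix: the first write fills one full 1 MiB block, and each of the 20 subsequent block-sized writes therefore starts on a fresh block. packetContentSize is the largest whole number of 516-byte chunks (512 data bytes plus a 4-byte CRC32C checksum) that fits in a write packet after the header; before this patch, the first packet of each new block was sized for a single chunk, which is the regression the packetSize assertion guards against.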
