
Commit 2260020

Merge branch 'apache:branch-3.3' into branch-3.3

2 parents dc33468 + 3af155c
File tree

3 files changed (+350, -14 lines)


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

Lines changed: 24 additions & 4 deletions
@@ -335,6 +335,7 @@ private static class BZip2CompressionInputStream extends
     private boolean isSubHeaderStripped = false;
     private READ_MODE readMode = READ_MODE.CONTINUOUS;
     private long startingPos = 0L;
+    private boolean didInitialRead;
 
     // Following state machine handles different states of compressed stream
     // position
@@ -480,24 +481,42 @@ public void close() throws IOException {
      */
 
     public int read(byte[] b, int off, int len) throws IOException {
+      if (b == null) {
+        throw new NullPointerException();
+      }
+      if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException();
+      }
+      if (len == 0) {
+        return 0;
+      }
       if (needsReset) {
         internalReset();
       }
-
-      int result = 0;
-      result = this.input.read(b, off, len);
+      // When startingPos > 0, the stream should be initialized at the end of
+      // one block (which would correspond to the start of another block).
+      // Thus, the initial read would technically be reading one byte past a
+      // BZip2 end-of-block marker. To be consistent, we should also be
+      // updating the position to one byte after the end of a block on the
+      // initial read.
+      boolean initializedAtEndOfBlock =
+          !didInitialRead && startingPos > 0 && readMode == READ_MODE.BYBLOCK;
+      int result = initializedAtEndOfBlock
+          ? BZip2Constants.END_OF_BLOCK
+          : this.input.read(b, off, len);
       if (result == BZip2Constants.END_OF_BLOCK) {
         this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
       }
 
       if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
-        result = this.input.read(b, off, off + 1);
+        result = this.input.read(b, off, 1);
         // This is the precise time to update compressed stream position
         // to the client of this code.
         this.updatePos(true);
         this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
       }
 
+      didInitialRead = true;
       return result;
 
     }
@@ -513,6 +532,7 @@ private void internalReset() throws IOException {
        needsReset = false;
        BufferedInputStream bufferedIn = readStreamHeader();
        input = new CBZip2InputStream(bufferedIn, this.readMode);
+       didInitialRead = false;
      }
    }

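The comment introduced in the hunk above pins down the BYBLOCK position contract: getPos() only advances when a read crosses an end-of-block marker, and it then reports one byte past that marker, whether the stream started at the beginning of the file or mid-stream. The following is a minimal sketch, not part of this commit, of how a split consumer can rely on that contract; the file name, split bounds, and buffer size are illustrative assumptions.

// Illustrative sketch (not part of this commit). Assumes a local
// "part-0.bz2" and a split covering the whole file; real split bounds
// would come from the job's input splits.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class BZip2SplitReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path file = new Path("part-0.bz2");    // hypothetical input file
    FileSystem fs = file.getFileSystem(conf);
    long splitStart = 0;                   // hypothetical split bounds
    long splitEnd = fs.getFileStatus(file).getLen();

    BZip2Codec codec = new BZip2Codec();
    codec.setConf(conf);
    FSDataInputStream raw = fs.open(file);
    raw.seek(splitStart);

    try (SplitCompressionInputStream in = codec.createInputStream(
        raw, codec.createDecompressor(), splitStart, splitEnd,
        SplittableCompressionCodec.READ_MODE.BYBLOCK)) {
      // getPos() moves only when a block boundary is crossed, and it then
      // reports one byte past the end-of-block marker. Stopping once the
      // position passes splitEnd hands the next block to the next split
      // without gaps or double reads.
      byte[] buf = new byte[64 * 1024];
      int n;
      while (in.getPos() <= splitEnd
          && (n = in.read(buf, 0, buf.length)) != -1) {
        // process decompressed bytes in buf[0..n) here
      }
    }
  }
}

Before this change, a stream opened mid-file reported its position from the first real read rather than from the block boundary it was initialized at, which is the inconsistency the didInitialRead flag resolves.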
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBZip2Codec.java

Lines changed: 203 additions & 0 deletions
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.com.google.common.primitives.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
+import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
+import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
+
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.CONTINUOUS;
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public final class TestBZip2Codec {
+
+  private static final long HEADER_LEN = 2;
+
+  private Configuration conf;
+  private FileSystem fs;
+  private BZip2Codec codec;
+  private Decompressor decompressor;
+  private Path tempFile;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+
+    Path workDir = new Path(System.getProperty("test.build.data", "target"),
+        "data/" + getClass().getSimpleName());
+
+    Path inputDir = new Path(workDir, "input");
+    tempFile = new Path(inputDir, "test.txt.bz2");
+
+    fs = workDir.getFileSystem(conf);
+
+    codec = new BZip2Codec();
+    codec.setConf(new Configuration(/* loadDefaults */ false));
+    decompressor = CodecPool.getDecompressor(codec);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CodecPool.returnDecompressor(decompressor);
+    fs.delete(tempFile, /* recursive */ false);
+  }
+
+  @Test
+  public void createInputStreamWithStartAndEnd() throws Exception {
+    byte[] data1 = newAlternatingByteArray(BLOCK_SIZE, 'a', 'b');
+    byte[] data2 = newAlternatingByteArray(BLOCK_SIZE, 'c', 'd');
+    byte[] data3 = newAlternatingByteArray(BLOCK_SIZE, 'e', 'f');
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.write(data1);
+      writer.write(data2);
+      writer.write(data3);
+    }
+    long fileSize = fs.getFileStatus(tempFile).getLen();
+
+    List<Long> nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(tempFile, conf);
+    long block2Start = nextBlockOffsets.get(0);
+    long block3Start = nextBlockOffsets.get(1);
+
+    try (SplitCompressionInputStream stream = newCompressionStream(tempFile, 0, fileSize,
+        BYBLOCK)) {
+      assertEquals(0, stream.getPos());
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingAtPositionZero(stream, data1);
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingPastEndOfBlock(stream, block2Start, data2);
+      assertReadingPastEndOfBlock(stream, block3Start, data3);
+      assertEquals(-1, stream.read());
+    }
+
+    try (SplitCompressionInputStream stream = newCompressionStream(tempFile, 1, fileSize - 1,
+        BYBLOCK)) {
+      assertEquals(block2Start, stream.getPos());
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingPastEndOfBlock(stream, block2Start, data2);
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingPastEndOfBlock(stream, block3Start, data3);
+      assertEquals(-1, stream.read());
+    }
+
+    // With continuous mode, only starting at or after the stream header is
+    // supported.
+    byte[] allData = Bytes.concat(data1, data2, data3);
+    assertReadingWithContinuousMode(tempFile, 0, fileSize, allData);
+    assertReadingWithContinuousMode(tempFile, HEADER_LEN, fileSize - HEADER_LEN, allData);
+  }
+
+  private void assertReadingWithContinuousMode(Path file, long start, long length,
+      byte[] expectedData) throws IOException {
+    try (SplitCompressionInputStream stream = newCompressionStream(file, start, length,
+        CONTINUOUS)) {
+      assertEquals(HEADER_LEN, stream.getPos());
+
+      assertRead(stream, expectedData);
+      assertEquals(-1, stream.read());
+
+      // When specifying CONTINUOUS read mode, the position ends up not being
+      // updated at all.
+      assertEquals(HEADER_LEN, stream.getPos());
+    }
+  }
+
+  private SplitCompressionInputStream newCompressionStream(Path file, long start, long length,
+      READ_MODE readMode) throws IOException {
+    FSDataInputStream rawIn = fs.open(file);
+    rawIn.seek(start);
+    long end = start + length;
+    return codec.createInputStream(rawIn, decompressor, start, end, readMode);
+  }
+
+  private static byte[] newAlternatingByteArray(int size, int... choices) {
+    checkArgument(choices.length > 1);
+    byte[] result = new byte[size];
+    for (int i = 0; i < size; i++) {
+      result[i] = (byte) choices[i % choices.length];
+    }
+    return result;
+  }
+
+  private static void assertCasesWhereReadDoesNotAdvanceStream(SplitCompressionInputStream in)
+      throws IOException {
+    long initialPos = in.getPos();
+
+    assertEquals(0, in.read(new byte[0]));
+
+    assertThatNullPointerException().isThrownBy(() -> in.read(null, 0, 1));
+    assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(
+        () -> in.read(new byte[5], -1, 2));
+    assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(
+        () -> in.read(new byte[5], 0, -1));
+    assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(
+        () -> in.read(new byte[5], 1, 5));
+
+    assertEquals(initialPos, in.getPos());
+  }
+
+  private static void assertReadingAtPositionZero(SplitCompressionInputStream in,
+      byte[] expectedData) throws IOException {
+    byte[] buffer = new byte[expectedData.length];
+    assertEquals(1, in.read(buffer, 0, 1));
+    assertEquals(expectedData[0], buffer[0]);
+    assertEquals(0, in.getPos());
+
+    IOUtils.readFully(in, buffer, 1, expectedData.length - 1);
+    assertArrayEquals(expectedData, buffer);
+    assertEquals(0, in.getPos());
+  }
+
+  private static void assertReadingPastEndOfBlock(SplitCompressionInputStream in,
+      long endOfBlockPos, byte[] expectedData) throws IOException {
+    byte[] buffer = new byte[expectedData.length];
+    assertEquals(1, in.read(buffer));
+    assertEquals(expectedData[0], buffer[0]);
+    assertEquals(endOfBlockPos + 1, in.getPos());
+
+    IOUtils.readFully(in, buffer, 1, expectedData.length - 1);
+    assertArrayEquals(expectedData, buffer);
+    assertEquals(endOfBlockPos + 1, in.getPos());
+  }
+
+  private static void assertRead(InputStream in, byte[] expectedData) throws IOException {
+    byte[] buffer = new byte[expectedData.length];
+    IOUtils.readFully(in, buffer);
+    assertArrayEquals(expectedData, buffer);
+  }
+}
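For reference, the block2Start and block3Start expectations in the test above come from BZip2Utils.getNextBlockMarkerOffsets, which reports the byte offsets of block markers after the first block of a .bz2 file. Below is a small standalone sketch of that helper's use, assuming the test-side BZip2Utils class that accompanies this change and a hypothetical local input file.

// Hedged sketch: printing the block-marker offsets the test uses to
// derive expected getPos() values. BZip2Utils is the test utility
// referenced above; the input file name is a placeholder.
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.bzip2.BZip2Utils;

public class PrintBZip2BlockOffsets {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("part-0.bz2");  // hypothetical input file
    // Byte offsets within the compressed file of each block marker after
    // the first block; the test reads these as block2Start, block3Start.
    List<Long> offsets = BZip2Utils.getNextBlockMarkerOffsets(file, conf);
    for (long offset : offsets) {
      System.out.println("next block starts at byte " + offset);
    }
  }
}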
