Skip to content

Commit 0cbe9ad

Browse files
committed
HADOOP-16109. Parquet reading S3AFileSystem causes EOF
Nobody gets seek right. No matter how many times they think they have. Reproducible test from: Dave Christianson. Fixed seek() logic: Steve Loughran.
1 parent c072458 commit 0cbe9ad

File tree

3 files changed

+280
-3
lines changed

3 files changed

+280
-3
lines changed

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public void testSeekBigFile() throws Throwable {
272272
describe("Seek round a large file and verify the bytes are what is expected");
273273
Path testSeekFile = path("bigseekfile.txt");
274274
byte[] block = dataset(100 * 1024, 0, 255);
275-
createFile(getFileSystem(), testSeekFile, false, block);
275+
createFile(getFileSystem(), testSeekFile, true, block);
276276
instream = getFileSystem().open(testSeekFile);
277277
assertEquals(0, instream.getPos());
278278
//expect that seek to 0 works
@@ -309,7 +309,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
309309
assumeSupportsPositionedReadable();
310310
Path testSeekFile = path("bigseekfile.txt");
311311
byte[] block = dataset(65536, 0, 255);
312-
createFile(getFileSystem(), testSeekFile, false, block);
312+
createFile(getFileSystem(), testSeekFile, true, block);
313313
instream = getFileSystem().open(testSeekFile);
314314
instream.seek(39999);
315315
assertTrue(-1 != instream.read());

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ private void seekInStream(long targetPos, long length) throws IOException {
261261
long forwardSeekLimit = Math.min(remainingInCurrentRequest,
262262
forwardSeekRange);
263263
boolean skipForward = remainingInCurrentRequest > 0
264-
&& diff <= forwardSeekLimit;
264+
&& diff < forwardSeekLimit;
265265
if (skipForward) {
266266
// the forward seek range is within the limits
267267
LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
@@ -275,6 +275,8 @@ private void seekInStream(long targetPos, long length) throws IOException {
275275

276276
if (pos == targetPos) {
277277
// all is well
278+
LOG.debug("Now at {}: bytes remaining in current request: {}",
279+
pos, remainingInCurrentRequest());
278280
return;
279281
} else {
280282
// log a warning; continue to attempt to re-open

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,306 @@
1818

1919
package org.apache.hadoop.fs.contract.s3a;
2020

21+
import java.io.IOException;
22+
import java.net.URI;
23+
import java.net.URISyntaxException;
24+
import java.util.Arrays;
25+
import java.util.Collection;
26+
27+
import org.junit.Test;
28+
import org.junit.runner.RunWith;
29+
import org.junit.runners.Parameterized;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
2133
import org.apache.hadoop.conf.Configuration;
34+
import org.apache.hadoop.fs.FSDataInputStream;
35+
import org.apache.hadoop.fs.FileSystem;
36+
import org.apache.hadoop.fs.Path;
2237
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
2338
import org.apache.hadoop.fs.contract.AbstractFSContract;
39+
import org.apache.hadoop.fs.contract.ContractTestUtils;
40+
import org.apache.hadoop.fs.s3a.S3AFileSystem;
41+
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
42+
import org.apache.hadoop.fs.s3a.S3ATestUtils;
2443

44+
import static com.google.common.base.Preconditions.checkNotNull;
45+
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
46+
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
47+
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
48+
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
49+
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
50+
import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
2551
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
2652

2753
/**
2854
* S3A contract tests covering file seek.
2955
*/
56+
@RunWith(Parameterized.class)
3057
public class ITestS3AContractSeek extends AbstractContractSeekTest {
3158

59+
private static final Logger LOG =
60+
LoggerFactory.getLogger(ITestS3AContractSeek.class);
61+
62+
protected static final int READAHEAD = 1024;
63+
64+
private final String seekPolicy;
65+
66+
public static final int DATASET_LEN = READAHEAD * 2;
67+
68+
public static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
69+
70+
/**
71+
* This test suite is parameterized for the different seek policies
72+
* which S3A supports.
73+
* @return a list of seek policies to test.
74+
*/
75+
@Parameterized.Parameters
76+
public static Collection<Object[]> params() {
77+
return Arrays.asList(new Object[][]{
78+
{INPUT_FADV_RANDOM},
79+
{INPUT_FADV_NORMAL},
80+
{INPUT_FADV_SEQUENTIAL},
81+
});
82+
}
83+
84+
/**
85+
* Run the test with a chosen seek policy.
86+
* @param seekPolicy fadvise policy to use.
87+
*/
88+
public ITestS3AContractSeek(final String seekPolicy) {
89+
this.seekPolicy = seekPolicy;
90+
}
91+
3292
/**
3393
* Create a configuration, possibly patching in S3Guard options.
94+
* The FS is set to be uncached and the readahead and seek policies
95+
* of the bucket itself are removed, so as to guarantee that the
96+
* parameterized and test settings are
3497
* @return a configuration
3598
*/
3699
@Override
37100
protected Configuration createConfiguration() {
38101
Configuration conf = super.createConfiguration();
39102
// patch in S3Guard options
40103
maybeEnableS3Guard(conf);
104+
// purge any per-bucket overrides.
105+
try {
106+
URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a")));
107+
S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf,
108+
READAHEAD_RANGE,
109+
INPUT_FADVISE);
110+
} catch (URISyntaxException e) {
111+
throw new RuntimeException(e);
112+
}
113+
// the FS is uncached, so will need clearing in test teardowns.
114+
S3ATestUtils.disableFilesystemCaching(conf);
115+
conf.setInt(READAHEAD_RANGE, READAHEAD);
116+
conf.set(INPUT_FADVISE, seekPolicy);
41117
return conf;
42118
}
43119

44120
@Override
45121
protected AbstractFSContract createContract(Configuration conf) {
46122
return new S3AContract(conf);
47123
}
124+
125+
@Override
126+
public void teardown() throws Exception {
127+
S3AFileSystem fs = getFileSystem();
128+
if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
129+
fs.close();
130+
}
131+
super.teardown();
132+
}
133+
134+
/**
135+
* This subclass of the {@code path(path)} operation adds the seek policy
136+
* to the end to guarantee uniqueness across different calls of the same
137+
* method.
138+
*
139+
* {@inheritDoc}
140+
*/
141+
@Override
142+
protected Path path(final String filepath) throws IOException {
143+
return super.path(filepath + "-" + seekPolicy);
144+
}
145+
146+
/**
147+
* Go to end, read then seek back to the previous position to force normal
148+
* seek policy to switch to random IO.
149+
* This will call readByte to trigger the second GET
150+
* @param in input stream
151+
* @return the byte read
152+
* @throws IOException failure.
153+
*/
154+
private byte readAtEndAndReturn(final FSDataInputStream in)
155+
throws IOException {
156+
long pos = in.getPos();
157+
in.seek(DATASET_LEN -1);
158+
in.readByte();
159+
// go back to start and force a new GET
160+
in.seek(pos);
161+
return in.readByte();
162+
}
163+
164+
/**
165+
* Assert that the data read matches the dataset at the given offset.
166+
* This helps verify that the seek process is moving the read pointer
167+
* to the correct location in the file.
168+
* @param readOffset the offset in the file where the read began.
169+
* @param operation operation name for the assertion.
170+
* @param data data read in.
171+
* @param length length of data to check.
172+
*/
173+
private void assertDatasetEquals(
174+
final int readOffset, final String operation,
175+
final byte[] data,
176+
int length) {
177+
for (int i = 0; i < length; i++) {
178+
int o = readOffset + i;
179+
assertEquals(operation + " with seek policy " + seekPolicy
180+
+ " and read offset " + readOffset
181+
+ ": data[" + i + "] != DATASET[" + o + "]",
182+
DATASET[o], data[i]);
183+
}
184+
}
185+
186+
@Override
187+
public S3AFileSystem getFileSystem() {
188+
return (S3AFileSystem) super.getFileSystem();
189+
}
190+
191+
@Test
192+
public void testReadPolicyInFS() throws Throwable {
193+
describe("Verify the read policy is being consistently set");
194+
S3AFileSystem fs = getFileSystem();
195+
assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy());
196+
}
197+
198+
/**
199+
* Test for HADOOP-16109: Parquet reading S3AFileSystem causes EOF.
200+
* This sets up a read which will span the active readahead and,
201+
* in random IO mode, a subsequent GET.
202+
*/
203+
@Test
204+
public void testReadAcrossReadahead() throws Throwable {
205+
describe("Sets up a read which will span the active readahead"
206+
+ " and the rest of the file.");
207+
Path path = path("testReadAcrossReadahead");
208+
writeTestDataset(path);
209+
FileSystem fs = getFileSystem();
210+
// forward seek reading across readahead boundary
211+
try (FSDataInputStream in = fs.open(path)) {
212+
final byte[] temp = new byte[5];
213+
in.readByte();
214+
int offset = READAHEAD - 1;
215+
in.readFully(offset, temp); // <-- works
216+
assertDatasetEquals(offset, "read spanning boundary", temp, temp.length);
217+
}
218+
// Read exactly on the boundary
219+
try (FSDataInputStream in = fs.open(path)) {
220+
final byte[] temp = new byte[5];
221+
readAtEndAndReturn(in);
222+
assertEquals("current position", 1, (int)(in.getPos()));
223+
in.readFully(READAHEAD, temp);
224+
assertDatasetEquals(READAHEAD, "read exactly on boundary",
225+
temp, temp.length);
226+
}
227+
}
228+
229+
/**
230+
* Read across the end of the read buffer using the readByte call,
231+
* which will read a single byte only.
232+
*/
233+
@Test
234+
public void testReadSingleByteAcrossReadahead() throws Throwable {
235+
describe("Read over boundary using read()/readByte() calls.");
236+
Path path = path("testReadSingleByteAcrossReadahead");
237+
writeTestDataset(path);
238+
FileSystem fs = getFileSystem();
239+
try (FSDataInputStream in = fs.open(path)) {
240+
final byte[] b0 = new byte[1];
241+
readAtEndAndReturn(in);
242+
in.seek(READAHEAD - 1);
243+
b0[0] = in.readByte();
244+
assertDatasetEquals(READAHEAD - 1, "read before end of boundary", b0,
245+
b0.length);
246+
b0[0] = in.readByte();
247+
assertDatasetEquals(READAHEAD, "read at end of boundary", b0, b0.length);
248+
b0[0] = in.readByte();
249+
assertDatasetEquals(READAHEAD + 1, "read after end of boundary", b0,
250+
b0.length);
251+
}
252+
}
253+
254+
@Test
255+
public void testSeekToReadaheadAndRead() throws Throwable {
256+
describe("Seek to just before the readahead limit and call"
257+
+ " InputStream.read(byte[])");
258+
Path path = path("testSeekToReadaheadAndRead");
259+
FileSystem fs = getFileSystem();
260+
writeTestDataset(path);
261+
try (FSDataInputStream in = fs.open(path)) {
262+
readAtEndAndReturn(in);
263+
final byte[] temp = new byte[5];
264+
int offset = READAHEAD - 1;
265+
in.seek(offset);
266+
// expect to read at least one byte.
267+
int l = in.read(temp);
268+
assertTrue("Reading in temp data", l > 0);
269+
LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
270+
assertDatasetEquals(offset, "read at end of boundary", temp, l);
271+
}
272+
}
273+
274+
@Test
275+
public void testSeekToReadaheadExactlyAndRead() throws Throwable {
276+
describe("Seek to exactly the readahead limit and call"
277+
+ " InputStream.read(byte[])");
278+
Path path = path("testSeekToReadaheadExactlyAndRead");
279+
FileSystem fs = getFileSystem();
280+
writeTestDataset(path);
281+
try (FSDataInputStream in = fs.open(path)) {
282+
readAtEndAndReturn(in);
283+
final byte[] temp = new byte[5];
284+
int offset = READAHEAD;
285+
in.seek(offset);
286+
// expect to read at least one byte.
287+
int l = in.read(temp);
288+
LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
289+
assertTrue("Reading in temp data", l > 0);
290+
assertDatasetEquals(offset, "read at end of boundary", temp, l);
291+
}
292+
}
293+
294+
@Test
295+
public void testSeekToReadaheadExactlyAndReadByte() throws Throwable {
296+
describe("Seek to exactly the readahead limit and call"
297+
+ " readByte()");
298+
Path path = path("testSeekToReadaheadExactlyAndReadByte");
299+
FileSystem fs = getFileSystem();
300+
writeTestDataset(path);
301+
try (FSDataInputStream in = fs.open(path)) {
302+
readAtEndAndReturn(in);
303+
final byte[] temp = new byte[1];
304+
int offset = READAHEAD;
305+
in.seek(offset);
306+
// expect to read a byte successfully.
307+
temp[0] = in.readByte();
308+
assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1);
309+
LOG.info("Read of byte at offset {} returned expected value", offset);
310+
}
311+
}
312+
313+
/**
314+
* Write the standard {@link #DATASET} dataset to the given path.
315+
* @param path path to write to.
316+
* @throws IOException failure
317+
*/
318+
private void writeTestDataset(final Path path) throws IOException {
319+
ContractTestUtils.writeDataset(getFileSystem(), path,
320+
DATASET, DATASET_LEN, READAHEAD, true);
321+
}
322+
48323
}

0 commit comments

Comments
 (0)