4242import static org .apache .hadoop .fs .Options .OpenFileOptions .FS_OPTION_OPENFILE_READ_POLICY ;
4343import static org .apache .hadoop .fs .Options .OpenFileOptions .FS_OPTION_OPENFILE_READ_POLICY_PARQUET ;
4444import static org .apache .hadoop .fs .Options .OpenFileOptions .FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE ;
45+ import static org .apache .hadoop .fs .contract .ContractTestUtils .writeDataset ;
46+ import static org .apache .hadoop .fs .contract .ContractTestUtils .dataset ;
4547import static org .apache .hadoop .fs .s3a .Constants .ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX ;
4648import static org .apache .hadoop .fs .s3a .S3ATestUtils .enableAnalyticsAccelerator ;
4749import static org .apache .hadoop .fs .s3a .S3ATestUtils .removeBaseAndBucketOverrides ;
5456import static org .apache .hadoop .fs .statistics .StreamStatisticNames .STREAM_READ_ANALYTICS_HEAD_REQUESTS ;
5557import static org .apache .hadoop .fs .statistics .StreamStatisticNames .STREAM_READ_ANALYTICS_OPENED ;
5658import static org .apache .hadoop .fs .statistics .StreamStatisticNames .ANALYTICS_STREAM_FACTORY_CLOSED ;
59+ import static org .apache .hadoop .io .Sizes .S_1K ;
60+ import static org .apache .hadoop .io .Sizes .S_1M ;
5761import static org .apache .hadoop .test .LambdaTestUtils .intercept ;
5862
5963/**
@@ -154,6 +158,9 @@ public void testMalformedParquetFooter() throws IOException {
154158 }
155159
156160 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_OPENED , 1 );
161+ verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_GET_REQUESTS , 1 );
162+ // S3A passes in the meta data on file open, we expect AAL to make no HEAD requests
163+ verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_HEAD_REQUESTS , 0 );
157164 }
158165
159166 /**
@@ -195,6 +202,7 @@ public void testMultiRowGroupParquet() throws Throwable {
195202 }
196203
197204 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_OPENED , 1 );
205+ // S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests
198206 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_HEAD_REQUESTS , 0 );
199207 }
200208
@@ -215,32 +223,48 @@ public void testInvalidConfigurationThrows() throws Exception {
215223 () -> S3SeekableInputStreamConfiguration .fromConfiguration (connectorConfiguration ));
216224 }
217225
226+ /**
227+ *
228+ * TXT files are classified as SEQUENTIAL format and use SequentialPrefetcher(requests the entire 10MB file)
229+ * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB)
230+ * The 10MB range gets split into: [0-8MB) and [8MB-10MB)
231+ * Each split range becomes a separate Block, resulting in 2 GET requests:
232+ */
218233 @ Test
219234 public void testLargeFileMultipleGets () throws Throwable {
220235 describe ("Large file should trigger multiple GET requests" );
221236
222- Path dest = writeThenReadFile ("large-test-file.txt" , 10 * 1024 * 1024 ); // 10MB
223-
237+ Path dest = path ("large-test-file.txt" );
238+ byte [] data = dataset (10 * S_1M , 256 , 255 );
239+ writeDataset (getFileSystem (), dest , data , 10 * S_1M , 1024 , true );
224240
241+ byte [] buffer = new byte [S_1M * 10 ];
225242 try (FSDataInputStream inputStream = getFileSystem ().open (dest )) {
226243 IOStatistics ioStats = inputStream .getIOStatistics ();
227- inputStream .readFully (new byte [( int ) getFileSystem (). getFileStatus ( dest ). getLen ()] );
244+ inputStream .readFully (buffer );
228245
229246 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_GET_REQUESTS , 2 );
247+ // Because S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests
248+ verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_HEAD_REQUESTS , 0 );
230249 }
231250 }
232251
233252 @ Test
234253 public void testSmallFileSingleGet () throws Throwable {
235254 describe ("Small file should trigger only one GET request" );
236255
237- Path dest = writeThenReadFile ("small-test-file.txt" , 1 * 1024 * 1024 ); // 1KB
256+ Path dest = path ("small-test-file.txt" );
257+ byte [] data = dataset (S_1M , 256 , 255 );
258+ writeDataset (getFileSystem (), dest , data , S_1M , 1024 , true );
238259
260+ byte [] buffer = new byte [S_1M ];
239261 try (FSDataInputStream inputStream = getFileSystem ().open (dest )) {
240262 IOStatistics ioStats = inputStream .getIOStatistics ();
241- inputStream .readFully (new byte [( int ) getFileSystem (). getFileStatus ( dest ). getLen ()] );
263+ inputStream .readFully (buffer );
242264
243265 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_GET_REQUESTS , 1 );
266+ // Because S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests
267+ verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_HEAD_REQUESTS , 0 );
244268 }
245269 }
246270
@@ -249,93 +273,46 @@ public void testSmallFileSingleGet() throws Throwable {
249273 public void testRandomSeekPatternGets () throws Throwable {
250274 describe ("Random seek pattern should optimize GET requests" );
251275
252- Path dest = writeThenReadFile ("seek-test.txt" , 100 * 1024 );
276+ Path dest = path ("seek-test.txt" );
277+ byte [] data = dataset (5 * S_1M , 256 , 255 );
278+ writeDataset (getFileSystem (), dest , data , 5 * S_1M , 1024 , true );
253279
280+ byte [] buffer = new byte [S_1M ];
254281 try (FSDataInputStream inputStream = getFileSystem ().open (dest )) {
255282 IOStatistics ioStats = inputStream .getIOStatistics ();
256283
257- inputStream .seek (1000 );
258- inputStream .read (new byte [100 ]);
259-
260- inputStream .seek (50000 );
261- inputStream .read (new byte [100 ]);
262-
263- inputStream .seek (90000 );
264- inputStream .read (new byte [100 ]);
284+ inputStream .read (buffer );
285+ inputStream .seek (2 * S_1M );
286+ inputStream .read (new byte [512 * S_1K ]);
287+ inputStream .seek (3 * S_1M );
288+ inputStream .read (new byte [512 * S_1K ]);
265289
266290 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_GET_REQUESTS , 1 );
267- }
268- }
269-
270- @ Test
271- public void testAALNeverMakesHeadRequests () throws Throwable {
272- describe ("Prove AAL never makes HEAD requests - S3A provides all metadata" );
273-
274- Path dest = writeThenReadFile ("no-head-test.txt" , 1024 * 1024 ); // 1MB
275-
276- try (FSDataInputStream inputStream = getFileSystem ().open (dest )) {
277- IOStatistics ioStats = inputStream .getIOStatistics ();
278- inputStream .read (new byte [1024 ]);
279-
280291 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_HEAD_REQUESTS , 0 );
281- verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_OPENED , 1 );
282-
283- ObjectInputStream objectInputStream = (ObjectInputStream ) inputStream .getWrappedStream ();
284- Assertions .assertThat (objectInputStream .streamType ()).isEqualTo (InputStreamType .Analytics );
285-
286292 }
287293 }
288294
289295
290296 @ Test
291- public void testParquetReadingNoHeadRequests () throws Throwable {
292- describe ("Parquet-optimized reading should not trigger AAL HEAD requests" );
293-
294- Path dest = path ("parquet-head-test.parquet" );
295- File file = new File ("src/test/resources/multi_row_group.parquet" );
296- Path sourcePath = new Path (file .toURI ().getPath ());
297- getFileSystem ().copyFromLocalFile (false , true , sourcePath , dest );
298-
299- try (FSDataInputStream stream = getFileSystem ().openFile (dest )
300- .must (FS_OPTION_OPENFILE_READ_POLICY , FS_OPTION_OPENFILE_READ_POLICY_PARQUET )
301- .build ().get ()) {
302-
303- FileStatus fileStatus = getFileSystem ().getFileStatus (dest );
304- stream .readFully (new byte [(int ) fileStatus .getLen ()]);
297+ public void testSequentialStreamsNoDuplicateGets () throws Throwable {
298+ describe ("Sequential streams reading same object should not duplicate GETs" );
305299
306- IOStatistics stats = stream .getIOStatistics ();
307-
308- verifyStatisticCounterValue (stats , STREAM_READ_ANALYTICS_HEAD_REQUESTS , 0 );
309- verifyStatisticCounterValue (stats , STREAM_READ_ANALYTICS_OPENED , 1 );
310-
311- verifyStatisticCounterValue (stats , STREAM_READ_ANALYTICS_GET_REQUESTS , 1 );
312- }
313- }
314-
315-
316- @ Test
317- public void testConcurrentStreamsNoDuplicateGets () throws Throwable {
318- describe ("Concurrent streams reading same object should not duplicate GETs" );
319-
320- Path dest = writeThenReadFile ("concurrent-test.txt" , 1 * 1024 * 1024 );
300+ Path dest = path ("sequential-test.txt" );
301+ byte [] data = dataset (S_1M , 256 , 255 );
302+ writeDataset (getFileSystem (), dest , data , S_1M , 1024 , true );
321303
304+ byte [] buffer = new byte [1024 ];
322305 try (FSDataInputStream stream1 = getFileSystem ().open (dest );
323306 FSDataInputStream stream2 = getFileSystem ().open (dest )) {
324307
325- byte [] buffer1 = new byte [1024 ];
326- byte [] buffer2 = new byte [1024 ];
327-
328- stream1 .read (buffer1 );
329- stream2 .read (buffer2 );
308+ stream1 .read (buffer );
309+ stream2 .read (buffer );
330310
331311 IOStatistics stats1 = stream1 .getIOStatistics ();
332312 IOStatistics stats2 = stream2 .getIOStatistics ();
333313
334- long totalGets = stats1 .counters ().getOrDefault (
335- STREAM_READ_ANALYTICS_GET_REQUESTS , 0L ) +
336- stats2 .counters ().getOrDefault (
337- STREAM_READ_ANALYTICS_GET_REQUESTS , 0L );
338- Assertions .assertThat (totalGets ).isEqualTo (1 );
314+ verifyStatisticCounterValue (stats1 , STREAM_READ_ANALYTICS_GET_REQUESTS , 1 );
315+ verifyStatisticCounterValue (stats2 , STREAM_READ_ANALYTICS_GET_REQUESTS , 0 );
339316 }
340317 }
341318}
0 commit comments