2121
2222import java .io .File ;
2323import java .io .IOException ;
24- import java .nio .charset .Charset ;
25- import java .util .UUID ;
24+ import java .io .InputStream ;
2625
2726import org .junit .Before ;
2827import org .junit .Test ;
2928
30- import org .apache .commons .io .FileUtils ;
3129import org .apache .hadoop .conf .Configuration ;
3230import org .apache .hadoop .fs .FSDataInputStream ;
3331import org .apache .hadoop .fs .FileStatus ;
3432import org .apache .hadoop .fs .FileSystem ;
3533import org .apache .hadoop .fs .Path ;
36- import org .apache .hadoop .fs .PathHandle ;
37- import org .apache .hadoop .fs .s3a .impl .AwsSdkWorkarounds ;
34+ import org .apache .hadoop .fs .s3a . impl . streams . InputStreamType ;
35+ import org .apache .hadoop .fs .s3a .impl .streams . ObjectInputStream ;
3836import org .apache .hadoop .fs .statistics .IOStatistics ;
39- import org .apache .hadoop .test .GenericTestUtils ;
4037
41- import static org .apache .commons .io .FileUtils .ONE_KB ;
4238import static org .apache .hadoop .fs .s3a .Constants .*;
4339import static org .apache .hadoop .fs .s3a .S3ATestUtils .enableAnalyticsAccelerator ;
44- import static org .apache .hadoop .fs .s3a .S3ATestUtils .enablePrefetching ;
4540import static org .apache .hadoop .fs .s3a .S3ATestUtils .removeBaseAndBucketOverrides ;
4641import static org .apache .hadoop .fs .s3a .test .PublicDatasetTestUtils .getExternalData ;
4742import static org .apache .hadoop .fs .s3a .test .PublicDatasetTestUtils .isUsingDefaultExternalDataFile ;
4843import static org .apache .hadoop .fs .statistics .IOStatisticAssertions .verifyStatisticCounterValue ;
4944import static org .apache .hadoop .fs .statistics .StreamStatisticNames .STREAM_READ_ANALYTICS_OPENED ;
50- import static org .apache .hadoop .fs .statistics .StreamStatisticNames .STREAM_READ_OPENED ;
51- import static org .apache .hadoop .fs .statistics .StreamStatisticNames .STREAM_READ_PREFETCH_OPERATIONS ;
52- import static org .apache .hadoop .test .GenericTestUtils .LogCapturer .captureLogs ;
45+
5346
5447import org .assertj .core .api .Assertions ;
5548
56- import org . slf4j . LoggerFactory ;
49+
5750import software .amazon .s3 .analyticsaccelerator .S3SeekableInputStreamConfiguration ;
5851import software .amazon .s3 .analyticsaccelerator .common .ConnectorConfiguration ;
59- import software .amazon .s3 .analyticsaccelerator .io .logical .parquet .ParquetMetadataParsingTask ;
6052import software .amazon .s3 .analyticsaccelerator .util .PrefetchMode ;
6153
6254public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
@@ -90,6 +82,9 @@ public Configuration createConfiguration() {
9082 public void testConnectorFrameWorkIntegration () throws IOException {
9183 describe ("Verify S3 connector framework integration" );
9284
85+ removeBaseAndBucketOverrides (conf , INPUT_FADVISE );
86+ conf .set (INPUT_FADVISE , "whole-file" );
87+
9388 S3AFileSystem fs =
9489 (S3AFileSystem ) FileSystem .get (testFile .toUri (), conf );
9590 byte [] buffer = new byte [500 ];
@@ -99,7 +94,13 @@ public void testConnectorFrameWorkIntegration() throws IOException {
9994 ioStats = inputStream .getIOStatistics ();
10095 inputStream .seek (5 );
10196 inputStream .read (buffer , 0 , 500 );
97+
98+ final InputStream wrappedStream = inputStream .getWrappedStream ();
99+ ObjectInputStream objectInputStream = (ObjectInputStream ) wrappedStream ;
100+ assertEquals (objectInputStream .streamType (), InputStreamType .Analytics );
101+ assertEquals (objectInputStream .getInputPolicy (), S3AInputPolicy .Sequential );
102102 }
103+
103104 verifyStatisticCounterValue (ioStats , STREAM_READ_ANALYTICS_OPENED , 1 );
104105 }
105106
0 commit comments