diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 89cc3b74ec63f..079b4022f2225 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1920,7 +1920,9 @@ private FSDataInputStream executeOpen( .withContext(readContext.build()) .withObjectAttributes(createObjectAttributes(path, fileStatus)) .withStreamStatistics(inputStreamStats) - .withEncryptionSecrets(getEncryptionSecrets()); + .withEncryptionSecrets(getEncryptionSecrets()) + .withAuditSpan(auditSpan); + return new FSDataInputStream(getStore().readObject(parameters)); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index e91710a0af3a0..d9a39d5d7dbc2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -21,6 +21,7 @@ import java.util.List; import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; @@ -50,6 +51,8 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE; +import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME; +import 
static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; /** * Extract information from a request. @@ -193,6 +196,18 @@ private RequestInfo writing(final String verb, || request instanceof CreateSessionRequest; } + /** + * If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing + * of requests which are made outside S3A's requestFactory. + * + * @param executionAttributes request execution attributes + * @return true if request is audited outside of current span + */ + public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) { + return executionAttributes.getAttribute(SPAN_ID) != null + && executionAttributes.getAttribute(OPERATION_NAME) != null; + } + /** * Predicate which returns true if the request is part of the * multipart upload API -and which therefore must be rejected diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java index 840ce5ffd3084..19af82350b126 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java @@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan; +import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT; @@ -69,6 +70,8 @@ 
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER; import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName; +import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME; +import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; /** * The LoggingAuditor logs operations at DEBUG (in SDK Request) and @@ -85,7 +88,6 @@ public class LoggingAuditor private static final Logger LOG = LoggerFactory.getLogger(LoggingAuditor.class); - /** * Some basic analysis for the logs. */ @@ -267,7 +269,14 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) { */ private class LoggingAuditSpan extends AbstractAuditSpanImpl { - private final HttpReferrerAuditHeader referrer; + private HttpReferrerAuditHeader referrer; + + /** + * Builder for the referrer header. Requests that execute outside S3A, such as in AAL, will initially have SpanId + * of the outside-span operation. For such requests, the spanId and operation name in this builder are overwritten + * in the modifyHttpRequest execution interceptor. + */ + private final HttpReferrerAuditHeader.Builder headerBuilder; /** * Attach Range of data for GetObject Request. 
@@ -300,7 +309,7 @@ private LoggingAuditSpan( final String path2) { super(spanId, operationName); - this.referrer = HttpReferrerAuditHeader.builder() + this.headerBuilder = HttpReferrerAuditHeader.builder() .withContextId(getAuditorId()) .withSpanId(spanId) .withOperationName(operationName) @@ -312,8 +321,9 @@ private LoggingAuditSpan( currentThreadID()) .withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp())) .withEvaluated(context.getEvaluatedEntries()) - .withFilter(filters) - .build(); + .withFilter(filters); + + this.referrer = this.headerBuilder.build(); this.description = referrer.buildHttpReferrer(); } @@ -384,6 +394,26 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, SdkHttpRequest httpRequest = context.httpRequest(); SdkRequest sdkRequest = context.request(); + // If spanId and operationName are set in execution attributes, then use these values, + // instead of the ones in the current span. This is useful when requests are happening in dependencies such as + // the analytics accelerator library (AAL), where they cannot be attached to the correct span. In this case, AAL + // will attach the current spanId and operationName via execution attributes during its request creation. These + // can then be used to update the values in the logger and referrer header. Without this overwriting, the operation + // name and corresponding span will be whichever is active on the thread the request is getting executed on. + boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes); + + String spanId = isRequestAuditedOutsideCurrentSpan ? + executionAttributes.getAttribute(SPAN_ID) : getSpanId(); + + String operationName = isRequestAuditedOutsideCurrentSpan ? 
+ executionAttributes.getAttribute(OPERATION_NAME) : getOperationName(); + + if (isRequestAuditedOutsideCurrentSpan) { + this.headerBuilder.withSpanId(spanId); + this.headerBuilder.withOperationName(operationName); + this.referrer = this.headerBuilder.build(); + } + // attach range for GetObject requests attachRangeFromRequest(httpRequest, executionAttributes); @@ -400,11 +430,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, .appendHeader(HEADER_REFERRER, header) .build(); } + if (LOG.isDebugEnabled()) { LOG.debug("[{}] {} Executing {} with {}; {}", currentThreadID(), - getSpanId(), - getOperationName(), + spanId, + operationName, analyzer.analyze(context.request()), header); } @@ -533,10 +564,12 @@ public void beforeExecution(Context.BeforeExecution context, + analyzer.analyze(context.request()); final String unaudited = getSpanId() + " " + UNAUDITED_OPERATION + " " + error; + // If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it + // as an audited request. if (isRequestNotAlwaysInSpan(context.request())) { - // can get by auditing during a copy, so don't overreact + // can get by auditing during a copy, so don't overreact. 
LOG.debug(unaudited); - } else { + } else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) { final RuntimeException ex = new AuditFailureException(unaudited); LOG.debug(unaudited, ex); if (isRejectOutOfSpan()) { @@ -547,5 +580,4 @@ public void beforeExecution(Context.BeforeExecution context, super.beforeExecution(context, executionAttributes); } } - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index 2e78467676bdc..240d1958653bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -994,8 +994,7 @@ private class FactoryCallbacks implements StreamFactoryCallbacks { @Override public S3Client getOrCreateSyncClient() throws IOException { - // Needs support of the CRT before the requireCRT can be used - LOG.debug("Stream factory requested async client"); + LOG.debug("Stream factory requested sync client"); return clientManager().getOrCreateS3Client(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 7d28ed22299ec..8920b5b2dfc7c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -36,6 +36,7 @@ import software.amazon.s3.analyticsaccelerator.common.ObjectRange; import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext; import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import 
software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -257,12 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa .etag(parameters.getObjectAttributes().getETag()).build()); } + if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) { EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets()) .ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets( EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build())); } + openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder() + .operationName(parameters.getAuditSpan().getOperationName()) + .spanId(parameters.getAuditSpan().getSpanId()) + .build()); + return openStreamInformationBuilder.build(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java index 17f134652decc..50333c68e0cd2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -97,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() { vectorContext.setMinSeekForVectoredReads(0); return new StreamFactoryRequirements(0, - 0, vectorContext, - StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests); + 0, vectorContext); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java index 321e803b8b33f..459e7cc93f50b 100644 --- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpan; import static java.util.Objects.requireNonNull; @@ -75,6 +76,11 @@ public final class ObjectReadParameters { */ private EncryptionSecrets encryptionSecrets; + /** + * Span for which this stream is being created. + */ + private AuditSpan auditSpan; + /** * Getter. * @return Encryption secrets. @@ -196,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value return this; } + /** + * Getter. + * @return Audit span. + */ + public AuditSpan getAuditSpan() { + return auditSpan; + } + + /** + * Set audit span. + * @param value new value + * @return the builder + */ + public ObjectReadParameters withAuditSpan(final AuditSpan value) { + auditSpan = value; + return this; + } + /** * Validate that all attributes are as expected. * Mock tests can skip this if required. 
@@ -210,6 +234,7 @@ public ObjectReadParameters validate() { requireNonNull(objectAttributes, "objectAttributes"); requireNonNull(streamStatistics, "streamStatistics"); requireNonNull(encryptionSecrets, "encryptionSecrets"); + requireNonNull(auditSpan, "auditSpan"); return this; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index 1aec9ff2b57a8..8f8f90f9b1e65 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION; import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; @@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable { verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); fs.close(); verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1); + + // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE, + // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. 
The file read here + // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETs: + // [0-8388607, 8388608-16777215, 16777216-21511173]. + verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); } @Test @@ -175,6 +182,8 @@ public void testMultiRowGroupParquet() throws Throwable { } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + + verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); } @Test