Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,9 @@ private FSDataInputStream executeOpen(
.withContext(readContext.build())
.withObjectAttributes(createObjectAttributes(path, fileStatus))
.withStreamStatistics(inputStreamStats)
.withEncryptionSecrets(getEncryptionSecrets());
.withEncryptionSecrets(getEncryptionSecrets())
.withAuditSpan(auditSpan);

return new FSDataInputStream(getStore().readObject(parameters));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
Expand Down Expand Up @@ -50,6 +51,8 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

/**
* Extract information from a request.
Expand Down Expand Up @@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
|| request instanceof CreateSessionRequest;
}

/**
* If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing
* of requests which are made outside S3A's requestFactory.
*
* @param executionAttributes request execution attributes
* @return true if request is audited outside of current span
*/
public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
return executionAttributes.getAttribute(SPAN_ID) != null
&& executionAttributes.getAttribute(OPERATION_NAME) != null;
}

/**
* Predicate which returns true if the request is part of the
* multipart upload API -and which therefore must be rejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
Expand All @@ -69,6 +70,8 @@
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

/**
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
Expand All @@ -85,7 +88,6 @@ public class LoggingAuditor
private static final Logger LOG =
LoggerFactory.getLogger(LoggingAuditor.class);


/**
* Some basic analysis for the logs.
*/
Expand Down Expand Up @@ -267,7 +269,14 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
*/
private class LoggingAuditSpan extends AbstractAuditSpanImpl {

private final HttpReferrerAuditHeader referrer;
private HttpReferrerAuditHeader referrer;

/**
* Builder for the referrer header. Requests that execute outside S3A, such as in AAL, will initially have SpanId
* of the outside-span operation. For such requests, the spanId and operation name in this builder is overwritten
* in the modifyHttpRequest execution interceptor.
*/
private final HttpReferrerAuditHeader.Builder headerBuilder;

/**
* Attach Range of data for GetObject Request.
Expand Down Expand Up @@ -300,7 +309,7 @@ private LoggingAuditSpan(
final String path2) {
super(spanId, operationName);

this.referrer = HttpReferrerAuditHeader.builder()
this.headerBuilder = HttpReferrerAuditHeader.builder()
.withContextId(getAuditorId())
.withSpanId(spanId)
.withOperationName(operationName)
Expand All @@ -312,8 +321,9 @@ private LoggingAuditSpan(
currentThreadID())
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
.withEvaluated(context.getEvaluatedEntries())
.withFilter(filters)
.build();
.withFilter(filters);

this.referrer = this.headerBuilder.build();

this.description = referrer.buildHttpReferrer();
}
Expand Down Expand Up @@ -384,6 +394,26 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
SdkHttpRequest httpRequest = context.httpRequest();
SdkRequest sdkRequest = context.request();

// If spanId and operationName are set in execution attributes, then use these values,
// instead of the ones in the current span. This is useful when requests are happening in dependencies such as
// the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL
// will attach the current spanId and operationName via execution attributes during it's request creation. These
// can then used to update the values in the logger and referrer header. Without this overwriting, the operation
// name and corresponding span will be whichever is active on the thread the request is getting executed on.
boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);

String spanId = isRequestAuditedOutsideCurrentSpan ?
executionAttributes.getAttribute(SPAN_ID) : getSpanId();

String operationName = isRequestAuditedOutsideCurrentSpan ?
executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();

if (isRequestAuditedOutsideCurrentSpan) {
this.headerBuilder.withSpanId(spanId);
this.headerBuilder.withOperationName(operationName);
this.referrer = this.headerBuilder.build();
}

// attach range for GetObject requests
attachRangeFromRequest(httpRequest, executionAttributes);

Expand All @@ -400,11 +430,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
.appendHeader(HEADER_REFERRER, header)
.build();
}

if (LOG.isDebugEnabled()) {
LOG.debug("[{}] {} Executing {} with {}; {}",
currentThreadID(),
getSpanId(),
getOperationName(),
spanId,
operationName,
analyzer.analyze(context.request()),
header);
}
Expand Down Expand Up @@ -533,10 +564,12 @@ public void beforeExecution(Context.BeforeExecution context,
+ analyzer.analyze(context.request());
final String unaudited = getSpanId() + " "
+ UNAUDITED_OPERATION + " " + error;
// If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it
// as an audited request.
if (isRequestNotAlwaysInSpan(context.request())) {
// can get by auditing during a copy, so don't overreact
// can get by auditing during a copy, so don't overreact.
LOG.debug(unaudited);
} else {
} else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
final RuntimeException ex = new AuditFailureException(unaudited);
LOG.debug(unaudited, ex);
if (isRejectOutOfSpan()) {
Expand All @@ -547,5 +580,4 @@ public void beforeExecution(Context.BeforeExecution context,
super.beforeExecution(context, executionAttributes);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,7 @@ private class FactoryCallbacks implements StreamFactoryCallbacks {

@Override
public S3Client getOrCreateSyncClient() throws IOException {
// Needs support of the CRT before the requireCRT can be used
LOG.debug("Stream factory requested async client");
LOG.debug("Stream factory requested sync client");
return clientManager().getOrCreateS3Client();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
Expand Down Expand Up @@ -257,12 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
.etag(parameters.getObjectAttributes().getETag()).build());
}


if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
.ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
}

openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
.operationName(parameters.getAuditSpan().getOperationName())
.spanId(parameters.getAuditSpan().getSpanId())
.build());

return openStreamInformationBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() {
vectorContext.setMinSeekForVectoredReads(0);

return new StreamFactoryRequirements(0,
0, vectorContext,
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
0, vectorContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.store.audit.AuditSpan;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -75,6 +76,11 @@ public final class ObjectReadParameters {
*/
private EncryptionSecrets encryptionSecrets;

/**
* Span for which this stream is being created.
*/
private AuditSpan auditSpan;

/**
* Getter.
* @return Encryption secrets.
Expand Down Expand Up @@ -196,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
return this;
}

/**
* Getter.
* @return Audit span.
*/
public AuditSpan getAuditSpan() {
return auditSpan;
}

/**
* Set audit span.
* @param value new value
* @return the builder
*/
public ObjectReadParameters withAuditSpan(final AuditSpan value) {
auditSpan = value;
return this;
}

/**
* Validate that all attributes are as expected.
* Mock tests can skip this if required.
Expand All @@ -210,6 +234,7 @@ public ObjectReadParameters validate() {
requireNonNull(objectAttributes, "objectAttributes");
requireNonNull(streamStatistics, "streamStatistics");
requireNonNull(encryptionSecrets, "encryptionSecrets");
requireNonNull(auditSpan, "auditSpan");
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
Expand Down Expand Up @@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
fs.close();
verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);

// Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
// in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
// s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
// [0-8388607, 8388608-16777215, 16777216-21511173].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slick. These are parallel GETs aren't they?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, parallel GETs

verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
}

@Test
Expand Down Expand Up @@ -175,6 +182,8 @@ public void testMultiRowGroupParquet() throws Throwable {
}

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);

verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
}

@Test
Expand Down