diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 07f3e8ab1e9d4..20e3639783bd6 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1698,7 +1698,7 @@
 <property>
   <name>fs.s3a.retry.throttle.limit</name>
-  <value>${fs.s3a.attempts.maximum}</value>
+  <value>20</value>
   <description>
     Number of times to retry any throttled request.
   </description>
 </property>
@@ -1706,9 +1706,13 @@
 
 <property>
   <name>fs.s3a.retry.throttle.interval</name>
-  <value>1000ms</value>
+  <value>5000ms</value>
   <description>
-    Interval between retry attempts on throttled requests.
+    Initial interval between retry attempts on throttled requests, +/- 50%,
+    chosen at random; i.e. for an initial value of 3000ms the first delay
+    falls in the range 1500ms to 4500ms. Backoffs are exponential; again,
+    randomness is used to avoid the thundering herd problem. Given that
+    throttling in S3 is applied per second, very short delays will not
+    spread the work out and so will continue to create the problem.
   </description>
 </property>
 
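Note: the delay schedule described above is easy to misread, so a short illustrative sketch follows. This is not the S3A retry policy implementation; the class name and printed schedule are demonstrative, and only the doubling and the +/- 50% jitter are taken from the description.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative sketch of the documented backoff: an exponentially growing
 * base delay with +/- 50% random jitter applied to each attempt.
 */
public final class JitteredBackoffDemo {
  public static void main(String[] args) {
    final long baseMillis = 3000; // matches the example in the description
    for (int attempt = 0; attempt < 5; attempt++) {
      // exponential component: base * 2^attempt
      long exponential = baseMillis << attempt;
      // jitter: scale by a factor drawn uniformly from [0.5, 1.5)
      double factor = 0.5 + ThreadLocalRandom.current().nextDouble();
      long delay = (long) (exponential * factor);
      System.out.printf("attempt %d: sleep %d ms%n", attempt, delay);
    }
  }
}
```

For attempt 0 this prints a delay in the 1500ms-4500ms range quoted above; the jitter is what keeps many throttled clients from waking up in the same second and re-creating the overload.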
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e107d4987f0da..6dfe73e486fc2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -733,8 +733,7 @@ private Constants() {
   /**
    * Default throttled retry limit: {@value}.
    */
-  public static final int RETRY_THROTTLE_LIMIT_DEFAULT =
-      DEFAULT_MAX_ERROR_RETRIES;
+  public static final int RETRY_THROTTLE_LIMIT_DEFAULT = 20;
 
   /**
    * Interval between retry attempts on throttled requests: {@value}.
@@ -745,7 +744,7 @@ private Constants() {
   /**
    * Default throttled retry interval: {@value}.
    */
-  public static final String RETRY_THROTTLE_INTERVAL_DEFAULT = "500ms";
+  public static final String RETRY_THROTTLE_INTERVAL_DEFAULT = "5000ms";
 
   /**
    * Should etags be exposed as checksums?
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index ff8ba1d6d5dac..1b833c5bde47e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -57,6 +57,11 @@ public AmazonS3 createS3Client(URI name,
     Configuration conf = getConf();
     final ClientConfiguration awsConf = S3AUtils
         .createAwsConf(getConf(), bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
+
+    // throttling is explicitly disabled on the S3 client so that
+    // all failures are collected and handled by S3A's own retry policy
+    awsConf.setUseThrottleRetries(false);
+
     if (!StringUtils.isEmpty(userAgentSuffix)) {
       awsConf.setUserAgentSuffix(userAgentSuffix);
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index fd2ef9b29d1ab..2b344e35320b6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1647,10 +1647,11 @@ protected void incrementGauge(Statistic statistic, long count) {
    * @param ex exception.
    */
   public void operationRetried(Exception ex) {
-    Statistic stat = isThrottleException(ex)
-        ? STORE_IO_THROTTLED
-        : IGNORED_ERRORS;
-    incrementStatistic(stat);
+    if (isThrottleException(ex)) {
+      operationThrottled(false);
+    } else {
+      incrementStatistic(IGNORED_ERRORS);
+    }
   }
 
   /**
@@ -1683,11 +1684,28 @@ public void operationRetried(
   public void metastoreOperationRetried(Exception ex,
       int retries,
       boolean idempotent) {
-    operationRetried(ex);
     incrementStatistic(S3GUARD_METADATASTORE_RETRY);
     if (isThrottleException(ex)) {
+      operationThrottled(true);
+    } else {
+      incrementStatistic(IGNORED_ERRORS);
+    }
+  }
+
+  /**
+   * Note that an operation was throttled - this will update
+   * specific counters/metrics.
+   * @param metastore was the throttling observed in the S3Guard metastore?
+   */
+  private void operationThrottled(boolean metastore) {
+    LOG.debug("Request throttled on {}", metastore ? "DynamoDB" : "S3");
+    if (metastore) {
       incrementStatistic(S3GUARD_METADATASTORE_THROTTLED);
-      instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
+      instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
+          1);
+    } else {
+      incrementStatistic(STORE_IO_THROTTLED);
+      instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
     }
   }
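Note: because the SDK no longer retries throttle events itself, every throttle now surfaces in the store_io_throttled counter. A minimal sketch of reading it through the public StorageStatistics API follows; the bucket URI is a placeholder, and the statistic name is the symbol declared in Statistic.java below.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageStatistics;

/** Sketch: read the throttle counter of an S3A filesystem instance. */
public final class ThrottleStatsDemo {
  public static void main(String[] args) throws Exception {
    // s3a://example-bucket/ is a placeholder, not a real bucket
    FileSystem fs = FileSystem.get(
        new URI("s3a://example-bucket/"), new Configuration());
    try {
      StorageStatistics stats = fs.getStorageStatistics();
      // look the counter up by its symbol from Statistic.java
      Long throttled = stats.getLong("store_io_throttled");
      System.out.println("store_io_throttled = " + throttled);
    } finally {
      fs.close();
    }
  }
}
```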
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 4e1de370a6cc8..b9918b5098946 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -123,6 +123,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
   private final MutableCounterLong ignoredErrors;
   private final MutableQuantiles putLatencyQuantile;
   private final MutableQuantiles throttleRateQuantile;
+  private final MutableQuantiles s3GuardThrottleRateQuantile;
   private final MutableCounterLong numberOfFilesCreated;
   private final MutableCounterLong numberOfFilesCopied;
   private final MutableCounterLong bytesOfFilesCopied;
@@ -248,7 +249,9 @@ public S3AInstrumentation(URI name) {
     int interval = 1;
     putLatencyQuantile = quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
         "ops", "latency", interval);
-    throttleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
+    s3GuardThrottleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
+        "events", "frequency (Hz)", interval);
+    throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE,
         "events", "frequency (Hz)", interval);
 
     registerAsMetricsSource(name);
@@ -617,6 +620,7 @@ public void close() {
     // task in a shared thread pool.
     putLatencyQuantile.stop();
     throttleRateQuantile.stop();
+    s3GuardThrottleRateQuantile.stop();
     metricsSystem.unregisterSource(metricsSourceName);
     int activeSources = --metricsSourceActiveCounter;
     if (activeSources == 0) {
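Note: the extra stop() in close() matters because each MutableQuantiles registers a periodic snapshot task in a shared scheduler; a quantile that is created but never stopped leaks that task. A minimal sketch of the lifecycle, using hypothetical metric names:

```java
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;

/** Sketch: create, feed and stop a quantiles metric. */
public final class QuantilesLifecycleDemo {
  public static void main(String[] args) throws InterruptedException {
    MetricsRegistry registry = new MetricsRegistry("demo");
    // all names here are hypothetical; the interval is in seconds
    MutableQuantiles rate = registry.newQuantiles(
        "demo_throttle_rate", "demo throttle rate",
        "events", "frequency (Hz)", 1);
    rate.add(1);          // record a single throttle event
    Thread.sleep(1500);   // let the 1-second snapshot task run once
    rate.stop();          // mandatory: cancels the scheduled snapshot task
  }
}
```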
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 06c60a46f5e79..1d3d4758028c6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -230,6 +230,8 @@ public enum Statistic {
       "S3Guard metadata store authoritative directories updated from S3"),
   STORE_IO_THROTTLED("store_io_throttled",
       "Requests throttled and retried"),
+  STORE_IO_THROTTLE_RATE("store_io_throttle_rate",
+      "Rate of S3 request throttling"),
 
   DELEGATION_TOKENS_ISSUED("delegation_tokens_issued",
       "Number of delegation tokens issued");
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
index 7a273a66c2938..536481ac23b7e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
@@ -439,8 +439,8 @@ private void dumpEntry(CsvFile csv, DDBPathMetadata md) {
 
   private Pair<Long, Long> scanMetastore(CsvFile csv) {
     S3GuardTableAccess tableAccess = new S3GuardTableAccess(getStore());
     ExpressionSpecBuilder builder = new ExpressionSpecBuilder();
-    Iterable<DDBPathMetadata> results = tableAccess.scanMetadata(
-        builder);
+    Iterable<DDBPathMetadata> results =
+        getStore().wrapWithRetries(tableAccess.scanMetadata(builder));
     long live = 0;
     long tombstone = 0;
     for (DDBPathMetadata md : results) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 143e276d1fef3..9e01673b4cc02 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -29,6 +29,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -80,6 +81,7 @@
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.impl.FunctionsRaisingIOE;
+import org.apache.hadoop.fs.impl.WrappedIOException;
 import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
 import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
 import org.apache.hadoop.fs.s3a.Constants;
@@ -324,8 +326,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
   /** Invoker for write operations. */
   private Invoker writeOp;
 
+  /** Invoker for scan operations. */
+  private Invoker scanOp;
+
   private final AtomicLong readThrottleEvents = new AtomicLong(0);
   private final AtomicLong writeThrottleEvents = new AtomicLong(0);
+  private final AtomicLong scanThrottleEvents = new AtomicLong(0);
   private final AtomicLong batchWriteCapacityExceededEvents =
       new AtomicLong(0);
 
   /**
@@ -424,11 +430,6 @@ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp)
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
     initDataAccessRetries(conf);
 
-    // set up a full retry policy
-    invoker = new Invoker(new S3GuardDataAccessRetryPolicy(conf),
-        this::retryEvent
-    );
-
     this.ttlTimeProvider = ttlTp;
 
     tableHandler = new DynamoDBMetadataStoreTableManager(
@@ -543,6 +544,7 @@ private void initDataAccessRetries(Configuration config) {
         = new S3GuardDataAccessRetryPolicy(config);
     readOp = new Invoker(throttledRetryRetryPolicy, this::readRetryEvent);
     writeOp = new Invoker(throttledRetryRetryPolicy, this::writeRetryEvent);
+    scanOp = new Invoker(throttledRetryRetryPolicy, this::scanRetryEvent);
   }
 
   @Override
@@ -809,34 +811,25 @@ private S3AFileStatus makeDirStatus(String dirOwner, Path path) {
   public DirListingMetadata listChildren(final Path path) throws IOException {
     checkPath(path);
     LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
-
+    final QuerySpec spec = new QuerySpec()
+        .withHashKey(pathToParentKeyAttribute(path))
+        .withConsistentRead(true); // strictly consistent read
+    final List<PathMetadata> metas = new ArrayList<>();
     // find the children in the table
-    return readOp.retry(
+    final ItemCollection<QueryOutcome> items = scanOp.retry(
         "listChildren",
         path.toString(),
         true,
-        () -> {
-          final QuerySpec spec = new QuerySpec()
-              .withHashKey(pathToParentKeyAttribute(path))
-              .withConsistentRead(true); // strictly consistent read
-          final ItemCollection<QueryOutcome> items = table.query(spec);
-
-          final List<PathMetadata> metas = new ArrayList<>();
-          for (Item item : items) {
-            DDBPathMetadata meta = itemToPathMetadata(item, username);
-            metas.add(meta);
-          }
-
-          // Minor race condition here - if the path is deleted between
-          // getting the list of items and the directory metadata we might
-          // get a null in DDBPathMetadata.
-          DDBPathMetadata dirPathMeta = get(path);
-
-          final DirListingMetadata dirListing =
-              getDirListingMetadataFromDirMetaAndList(path, metas,
-                  dirPathMeta);
-          return dirListing;
-        });
+        () -> table.query(spec));
+    // now wrap the result with retry logic
+    for (Item item : wrapWithRetries(items)) {
+      metas.add(itemToPathMetadata(item, username));
+    }
+    // Minor race condition here - if the path is deleted between
+    // getting the list of items and the directory metadata we might
+    // get a null in DDBPathMetadata.
+    return getDirListingMetadataFromDirMetaAndList(path, metas,
+        get(path));
   }
 
   DirListingMetadata getDirListingMetadataFromDirMetaAndList(Path path,
@@ -1992,6 +1985,22 @@ void writeRetryEvent(
     retryEvent(text, ex, attempts, idempotent);
   }
 
+  /**
+   * Callback on a scan operation retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param attempts number of attempts
+   * @param idempotent is the method idempotent (this is assumed to be true)
+   */
+  void scanRetryEvent(
+      String text,
+      IOException ex,
+      int attempts,
+      boolean idempotent) {
+    scanThrottleEvents.incrementAndGet();
+    retryEvent(text, ex, attempts, idempotent);
+  }
+
   /**
    * Callback from {@link Invoker} when an operation is retried.
    * @param text text of the operation
@@ -2048,14 +2057,18 @@ public long getWriteThrottleEventCount() {
     return writeThrottleEvents.get();
   }
 
+  /**
+   * Get the count of scan throttle events.
+   * @return the current count of scan throttle events.
+   */
   @VisibleForTesting
-  public long getBatchWriteCapacityExceededCount() {
-    return batchWriteCapacityExceededEvents.get();
+  public long getScanThrottleEventCount() {
+    return scanThrottleEvents.get();
   }
 
   @VisibleForTesting
-  public Invoker getInvoker() {
-    return invoker;
+  public long getBatchWriteCapacityExceededCount() {
+    return batchWriteCapacityExceededEvents.get();
   }
 
   /**
@@ -2475,4 +2488,111 @@ protected DynamoDBMetadataStoreTableManager getTableHandler() {
     Preconditions.checkNotNull(tableHandler, "Not initialized");
     return tableHandler;
   }
+
+  /**
+   * Get the operation invoker for write operations.
+   * @return an invoker for retrying mutating operations on a store.
+   */
+  Invoker getWriteOperationInvoker() {
+    return writeOp;
+  }
+
+  /**
+   * Wrap an iterator returned from any scan with a retrying one.
+   * This includes throttle handling.
+   * @param source source iterator
+   * @return a retrying iterator.
+   */
+  <T> Iterator<T> wrapWithRetries(
+      final Iterator<T> source) {
+    return new RetryingIterator<>(source);
+  }
+
+  /**
+   * Wrap an iterable returned from any scan with a retrying one.
+   * This includes throttle handling.
+   * @param source source iterable
+   * @return a retrying iterable.
+   */
+  <T> Iterable<T> wrapWithRetries(
+      final Iterable<T> source) {
+    return new RetryingCollection<>(source);
+  }
+
+  /**
+   * A collection which wraps the result of a query or scan
+   * with retries; the {@link #scanThrottleEvents} count is
+   * then updated.
+   * Important: iterate through this only once; the outcome
+   * of repeating an iteration is "undefined".
+   * @param <T> type of outcome.
+   */
+  private final class RetryingCollection<T>
+      implements Iterable<T> {
+
+    private final Iterable<T> source;
+
+    private RetryingCollection(
+        final Iterable<T> source) {
+      this.source = source;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return wrapWithRetries(source.iterator());
+    }
+  }
+
+  /**
+   * An iterator which wraps a non-retrying iterator of scan results
+   * (i.e. {@code S3GuardTableAccess.DDBPathMetadataIterator}).
+   */
+  private final class RetryingIterator<T> implements
+      Iterator<T> {
+
+    private final Iterator<T> source;
+
+    private RetryingIterator(
+        final Iterator<T> source) {
+      this.source = source;
+    }
+
+    /**
+     * {@inheritDoc}.
+     * @throws WrappedIOException for IO failure, including throttling.
+     */
+    @Override
+    @Retries.RetryTranslated
+    public boolean hasNext() {
+      try {
+        return scanOp.retry(
+            "Scan Dynamo",
+            null,
+            true,
+            source::hasNext);
+      } catch (IOException e) {
+        throw new WrappedIOException(e);
+      }
+    }
+
+    /**
+     * {@inheritDoc}.
+     * @throws WrappedIOException for IO failure, including throttling.
+     */
+    @Override
+    @Retries.RetryTranslated
+    public T next() {
+      try {
+        return scanOp.retry(
+            "Scan Dynamo",
+            null,
+            true,
+            source::next);
+      } catch (IOException e) {
+        throw new WrappedIOException(e);
+      }
+    }
+  }
+
 }
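Note: since Iterator.hasNext()/next() cannot throw checked exceptions, the wrapper above rethrows IOExceptions as the runtime WrappedIOException. Callers sitting at an IOException-friendly boundary can restore the checked exception. A sketch of that consumption pattern follows; the class is hypothetical and assumed to live in the same package, with the entries iterable coming from wrapWithRetries() as elsewhere in this patch.

```java
package org.apache.hadoop.fs.s3a.s3guard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.WrappedIOException;

/** Sketch: drain a retrying scan, converting the runtime wrapper back. */
final class ScanConsumerSketch {

  static List<Path> drain(Iterable<DDBPathMetadata> entries)
      throws IOException {
    List<Path> paths = new ArrayList<>();
    try {
      // hasNext()/next() retry internally and count throttle events
      for (DDBPathMetadata md : entries) {
        paths.add(md.getFileStatus().getPath());
      }
    } catch (WrappedIOException e) {
      // restore the checked exception at this API boundary
      throw (IOException) e.getCause();
    }
    return paths;
  }
}
```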
+ */ + @Override + @Retries.RetryTranslated + public T next() { + try { + return scanOp.retry( + "Scan Dynamo", + null, + true, + source::next); + } catch (IOException e) { + throw new WrappedIOException(e); + } + } + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java index 244779abb939b..d577a91d57c91 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.s3guard; import javax.annotation.Nullable; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -128,9 +129,10 @@ protected void serviceStart() throws Exception { * delete all entries from that bucket. * @return the exit code. * @throws ServiceLaunchException on failure. + * @throws IOException IO failure. */ @Override - public int execute() throws ServiceLaunchException { + public int execute() throws ServiceLaunchException, IOException { URI uri = getUri(); String host = uri.getHost(); @@ -144,7 +146,8 @@ public int execute() throws ServiceLaunchException { LOG.info("Scanning for entries with prefix {} to delete from {}", prefix, ddbms); - Iterable entries = tableAccess.scanMetadata(builder); + Iterable entries = + ddbms.wrapWithRetries(tableAccess.scanMetadata(builder)); List list = new ArrayList<>(); entries.iterator().forEachRemaining(e -> { if (!(e instanceof S3GuardTableAccess.VersionMarker)) { @@ -169,7 +172,12 @@ public int execute() throws ServiceLaunchException { new DurationInfo(LOG, "deleting %s entries from %s", count, ddbms.toString()); - tableAccess.delete(list); + ddbms.getWriteOperationInvoker() + .retry("delete", + prefix, + true, + () -> tableAccess.delete(list)); + duration.close(); long durationMillis = duration.value(); long timePerEntry = durationMillis / count; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java index 19ef90e455741..0f33f4eb51c4d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java @@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AFileStatus; import static com.google.common.base.Preconditions.checkNotNull; @@ -70,6 +71,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable +@Retries.OnceRaw class S3GuardTableAccess { private static final Logger LOG = @@ -107,6 +109,7 @@ private String getUsername() { * @param spec query spec. * @return the outcome. */ + @Retries.OnceRaw ItemCollection query(QuerySpec spec) { return table.query(spec); } @@ -118,18 +121,22 @@ ItemCollection query(QuerySpec spec) { * @param spec query spec. * @return an iterator over path entries. 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
index 19ef90e455741..0f33f4eb51c4d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -70,6 +71,7 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@Retries.OnceRaw
 class S3GuardTableAccess {
 
   private static final Logger LOG =
@@ -107,6 +109,7 @@ private String getUsername() {
    * @param spec query spec.
    * @return the outcome.
    */
+  @Retries.OnceRaw
   ItemCollection<QueryOutcome> query(QuerySpec spec) {
     return table.query(spec);
   }
@@ -118,18 +121,22 @@ ItemCollection<QueryOutcome> query(QuerySpec spec) {
    * @param spec query spec.
    * @return an iterator over path entries.
    */
+  @Retries.OnceRaw
   Iterable<DDBPathMetadata> queryMetadata(QuerySpec spec) {
     return new DDBPathMetadataCollection<>(query(spec));
   }
 
+  @Retries.OnceRaw
   ItemCollection<ScanOutcome> scan(ExpressionSpecBuilder spec) {
     return table.scan(spec.buildForScan());
   }
 
+  @Retries.OnceRaw
   Iterable<DDBPathMetadata> scanMetadata(ExpressionSpecBuilder spec) {
     return new DDBPathMetadataCollection<>(scan(spec));
   }
 
+  @Retries.OnceRaw
   void delete(Collection<Path> paths) {
     paths.stream()
         .map(PathMetadataDynamoDBTranslation::pathToKey)
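Note: the annotations record the retry contract rather than change behaviour. S3GuardTableAccess stays once-only with raw AWS exceptions, and retrying is layered on by callers, which is why the dump/purge entry points above now wrap its iterators. A minimal sketch of that layering, with hypothetical names:

```java
import java.io.IOException;

import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;

/** Sketch: layering a retrying call over a raw, once-only operation. */
final class RetryLayeringSketch {

  /** Low level: executed once; AWS SDK exceptions escape untranslated. */
  @Retries.OnceRaw
  private int rawScanPage() {
    return 0; // placeholder for a single DynamoDB scan page fetch
  }

  /** High level: the Invoker translates and retries failures. */
  @Retries.RetryTranslated
  int scanPage(Invoker scanOp) throws IOException {
    return scanOp.retry("scan", null, true, this::rawScanPage);
  }
}
```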
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
index f2f37f21ea5a7..d2b6fc00a8f6d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -34,6 +34,8 @@
 import com.amazonaws.services.dynamodbv2.document.Table;
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
 import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
@@ -47,14 +49,15 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.impl.WrappedIOException;
 import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
+import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.DurationInfo;
 
@@ -62,7 +65,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus;
 import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT;
-import static org.junit.Assume.*;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Scale test for DynamoDBMetadataStore.
@@ -91,6 +94,7 @@ public class ITestDynamoDBMetadataStoreScale
   private static final long MAXIMUM_READ_CAPACITY = 10;
   private static final long MAXIMUM_WRITE_CAPACITY = 15;
 
+  private static DynamoDBMetadataStore classMetastore;
   private DynamoDBMetadataStore ddbms;
 
   private DynamoDBMetadataStoreTableManager tableHandler;
@@ -114,7 +118,7 @@ public class ITestDynamoDBMetadataStoreScale
   /**
    * Create the metadata store. The table and region are determined from
    * the attributes of the FS used in the tests.
   * @return a new metadata store instance
    * @throws IOException failure to instantiate
    * @throws AssumptionViolatedException if the FS isn't running S3Guard + DDB/
    */
@@ -123,6 +127,9 @@ public MetadataStore createMetadataStore() throws IOException {
     S3AFileSystem fs = getFileSystem();
     assumeTrue("S3Guard is disabled for " + fs.getUri(),
         fs.hasMetadataStore());
+    if (classMetastore != null) {
+      return classMetastore;
+    }
     MetadataStore store = fs.getMetadataStore();
     assumeTrue("Metadata store for " + fs.getUri() + " is " + store
         + " -not DynamoDBMetadataStore",
@@ -145,15 +152,21 @@ public MetadataStore createMetadataStore() throws IOException {
     conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
     conf.set(S3GUARD_DDB_REGION_KEY, region);
     conf.set(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, "50ms");
-    conf.set(S3GUARD_DDB_MAX_RETRIES, "2");
+    conf.set(S3GUARD_DDB_MAX_RETRIES, "1");
     conf.set(MAX_ERROR_RETRIES, "1");
     conf.set(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, "5ms");
 
     DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
-    ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
+    // init the metastore in a bigger retry loop than the test setup
+    // in case the previous test case overloaded things
+    final Invoker fsInvoker = fs.createStoreContext().getInvoker();
+    fsInvoker.retry("init metastore", null, true,
+        () -> ms.initialize(conf, new S3Guard.TtlTimeProvider(conf)));
     // wire up the owner FS so that we can make assertions about throttle
     // events
     ms.bindToOwnerFilesystem(fs);
+    // and update the class value
+    classMetastore = ms;
     return ms;
   }
@@ -168,38 +181,40 @@ public void setup() throws Exception {
     table = ddb.getTable(tableName);
     originalCapacity = table.describe().getProvisionedThroughput();
 
-    // If you set the same provisioned I/O as already set it throws an
-    // exception, avoid that.
+    // is this table too big for throttling to surface?
     isOverProvisionedForTest = (
         originalCapacity.getReadCapacityUnits() > MAXIMUM_READ_CAPACITY
             || originalCapacity.getWriteCapacityUnits() > MAXIMUM_WRITE_CAPACITY);
   }
 
-  @Override
-  public void teardown() throws Exception {
-    if (ddbms != null) {
-      S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms);
-      ExpressionSpecBuilder builder = new ExpressionSpecBuilder();
-      builder.withCondition(
-          ExpressionSpecBuilder.S(PARENT).beginsWith("/test/"));
-
-      Iterable<DDBPathMetadata> entries = tableAccess.scanMetadata(builder);
-      List<Path> list = new ArrayList<>();
-      entries.iterator().forEachRemaining(e -> {
-        Path p = e.getFileStatus().getPath();
-        LOG.info("Deleting {}", p);
-        list.add(p);
-      });
-      tableAccess.delete(list);
-    }
-    IOUtils.cleanupWithLogger(LOG, ddbms);
-    super.teardown();
+  @AfterClass
+  public static void closeClassMetastore() {
+    IOUtils.cleanupWithLogger(LOG, classMetastore);
   }
 
   private boolean expectThrottling() {
     return !isOverProvisionedForTest && !isOnDemandTable;
   }
 
+  /**
+   * The subclass expects the superclass to be throttled; sometimes it is.
+   */
+  @Test
+  @Override
+  public void test_010_Put() throws Throwable {
+    ThrottleTracker tracker = new ThrottleTracker(ddbms);
+    try {
+      // if this doesn't throttle, all is well.
+      super.test_010_Put();
+    } catch (AWSServiceThrottledException ex) {
+      // if the service was throttled, all is good.
+      // log and continue
+      LOG.warn("DDB connection was throttled", ex);
+    } finally {
+      LOG.info("Statistics {}", tracker);
+    }
+  }
+
   /**
    * The subclass expects the superclass to be throttled; sometimes it is.
    */
@@ -284,8 +299,7 @@ public void test_030_BatchedWrite() throws Exception {
         }
       });
       if (expectThrottling()) {
-        assertNotEquals("No batch retries in " + result,
-            0, result.getBatchThrottles());
+        result.assertThrottlingDetected();
       }
     } finally {
       describe("Cleaning up table %s", tableName);
@@ -337,8 +351,9 @@ public void test_050_getVersionMarkerItem() throws Throwable {
    */
   private void retryingDelete(final Path path) {
     try {
-      ddbms.getInvoker().retry("Delete ", path.toString(), true,
-          () -> ddbms.delete(path, null));
+      ddbms.getWriteOperationInvoker()
+          .retry("Delete ", path.toString(), true,
+              () -> ddbms.delete(path, null));
     } catch (IOException e) {
       LOG.warn("Failed to delete {}: ", path, e);
     }
@@ -396,10 +411,11 @@ public void test_080_fullPathsToPut() throws Throwable {
         BulkOperationState.OperationType.Put, child)) {
       ddbms.put(new PathMetadata(makeDirStatus(base)), bulkUpdate);
       ddbms.put(new PathMetadata(makeDirStatus(child)), bulkUpdate);
-      ddbms.getInvoker().retry("set up directory tree",
-          base.toString(),
-          true,
-          () -> ddbms.put(pms, bulkUpdate));
+      ddbms.getWriteOperationInvoker()
+          .retry("set up directory tree",
+              base.toString(),
+              true,
+              () -> ddbms.put(pms, bulkUpdate));
     }
 
     try (BulkOperationState bulkUpdate
             = ddbms.initiateBulkWrite(
@@ -453,6 +469,36 @@ public void test_100_forgetMetadata() throws Throwable {
     }
   }
 
+  @Test
+  public void test_200_delete_all_entries() throws Throwable {
+    describe("Delete all entries from the table");
+    S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms);
+    ExpressionSpecBuilder builder = new ExpressionSpecBuilder();
+    final String path = "/test/";
+    builder.withCondition(
+        ExpressionSpecBuilder.S(PARENT).beginsWith(path));
+    Iterable<DDBPathMetadata> entries =
+        ddbms.wrapWithRetries(tableAccess.scanMetadata(builder));
+    List<Path> list = new ArrayList<>();
+    try {
+      entries.iterator().forEachRemaining(e -> {
+        Path p = e.getFileStatus().getPath();
+        LOG.info("Deleting {}", p);
+        list.add(p);
+      });
+    } catch (WrappedIOException e) {
+      // the iterator's scan may itself have overloaded the table;
+      // swallow the exception if so.
+      if (!(e.getCause() instanceof AWSServiceThrottledException)) {
+        throw e;
+      }
+    }
+    ddbms.getWriteOperationInvoker()
+        .retry("delete",
+            path,
+            true,
+            () -> tableAccess.delete(list));
+  }
+
   @Test
   public void test_900_instrumentation() throws Throwable {
     describe("verify the owner FS gets updated after throttling events");
@@ -504,25 +550,29 @@ public ThrottleTracker execute(String operation,
             final ContractTestUtils.NanoTimer t
                 = new ContractTestUtils.NanoTimer();
             for (int j = 0; j < operationsPerThread; j++) {
-              if (tracker.isThrottlingDetected()) {
+              if (tracker.isThrottlingDetected()
+                  || throttleExceptions.get() > 0) {
                 outcome.skipped = true;
                 return outcome;
               }
               try {
                 action.call();
                 outcome.completed++;
-              } catch (AWSServiceThrottledException e) {
-                // this is possibly OK
-                LOG.info("Operation [{}] raised a throttled exception " + e, j, e);
-                LOG.debug(e.toString(), e);
-                throttleExceptions.incrementAndGet();
-                // consider it completed
-                outcome.throttleExceptions.add(e);
-                outcome.throttled++;
               } catch (Exception e) {
-                LOG.error("Failed to execute {}", operation, e);
-                outcome.exceptions.add(e);
-                break;
+                if (e instanceof AWSServiceThrottledException
+                    || (e.getCause() instanceof AWSServiceThrottledException)) {
+                  // this is possibly OK
+                  LOG.info("Operation [{}] raised a throttled exception " + e, j, e);
+                  LOG.debug(e.toString(), e);
+                  throttleExceptions.incrementAndGet();
+                  // consider it completed
+                  outcome.throttleExceptions.add(e);
+                  outcome.throttled++;
+                } else {
+                  LOG.error("Failed to execute {}", operation, e);
+                  outcome.exceptions.add(e);
+                  break;
+                }
               }
               tracker.probe();
             }
@@ -539,9 +589,9 @@ public ThrottleTracker execute(String operation,
       LOG.info("Completed {} with {}", operation, tracker);
       LOG.info("time to execute: {} millis", elapsedMs);
 
-      for (Future<?> future : futures) {
-        assertTrue("Future timed out", future.isDone());
-      }
+      Assertions.assertThat(futures)
+          .describedAs("Futures of all tasks")
+          .allMatch(Future::isDone);
       tracker.probe();
 
       if (expectThrottling) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java
index 5e33be8367de4..942d7fecaff30 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java
@@ -42,12 +42,16 @@ class ThrottleTracker {
 
   private long batchWriteThrottleCountOrig = 0;
 
+  private long scanThrottleCountOrig;
+
   private long readThrottles;
 
   private long writeThrottles;
 
   private long batchThrottles;
 
+  private long scanThrottles;
+
   ThrottleTracker(final DynamoDBMetadataStore ddbms) {
     this.ddbms = ddbms;
     reset();
@@ -65,6 +69,9 @@ public synchronized void reset() {
 
     batchWriteThrottleCountOrig
         = ddbms.getBatchWriteCapacityExceededCount();
+
+    scanThrottleCountOrig
+        = ddbms.getScanThrottleEventCount();
   }
 
   /**
@@ -78,6 +85,8 @@ public synchronized boolean probe() {
         - writeThrottleEventOrig);
     setBatchThrottles(ddbms.getBatchWriteCapacityExceededCount()
         - batchWriteThrottleCountOrig);
+    setScanThrottles(ddbms.getScanThrottleEventCount()
+        - scanThrottleCountOrig);
     return isThrottlingDetected();
   }
 
@@ -85,9 +94,11 @@ public synchronized boolean probe() {
   public String toString() {
     return String.format(
         "Tracker with read throttle events = %d;"
-            + " write events = %d;"
-            + " batch throttles = %d",
-        getReadThrottles(), getWriteThrottles(), getBatchThrottles());
+            + " write throttles = %d;"
+            + " batch throttles = %d;"
+            + " scan throttles = %d",
+        getReadThrottles(), getWriteThrottles(), getBatchThrottles(),
+        getScanThrottles());
   }
 
   /**
@@ -101,11 +112,13 @@ public void assertThrottlingDetected() {
 
   /**
    * Has there been any throttling on an operation?
-   * @return true iff read, write or batch operations were throttled.
+   * @return true if any operations were throttled.
    */
   public boolean isThrottlingDetected() {
-    return getReadThrottles() > 0 || getWriteThrottles()
-        > 0 || getBatchThrottles() > 0;
+    return getReadThrottles() > 0
+        || getWriteThrottles() > 0
+        || getBatchThrottles() > 0
+        || getScanThrottles() > 0;
   }
 
   public long getReadThrottles() {
@@ -131,4 +144,12 @@ public long getBatchThrottles() {
   public void setBatchThrottles(long batchThrottles) {
     this.batchThrottles = batchThrottles;
   }
+
+  public long getScanThrottles() {
+    return scanThrottles;
+  }
+
+  public void setScanThrottles(final long scanThrottles) {
+    this.scanThrottles = scanThrottles;
+  }
 }
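Note: with scan throttles folded into probe() and isThrottlingDetected(), the tracker serves as a before/after probe around any workload. A usage sketch consistent with the API above follows; the store and the load generator are assumed, and since ThrottleTracker is package-private the sketch would live in the same test package.

```java
package org.apache.hadoop.fs.s3a.s3guard;

/** Sketch: probe for read/write/batch/scan throttling around a workload. */
final class ThrottleTrackerUsageSketch {

  void expectThrottling(DynamoDBMetadataStore ddbms, Runnable load) {
    // snapshots all four counters at construction time
    ThrottleTracker tracker = new ThrottleTracker(ddbms);
    load.run();                         // hypothetical table workload
    tracker.probe();                    // recompute the per-counter deltas
    tracker.assertThrottlingDetected(); // passes if any counter moved
  }
}
```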