From 39b81b70390ba0e3d96ee50baf1a8e38b5dc4346 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 8 Jan 2024 17:00:27 +0000
Subject: [PATCH 1/7] HADOOP-19027. S3A: S3AInputStream to recover from HTTP/channel exceptions

Differentiate "EOF out of range" from "channel problems" through two
different subclasses of EOFException; input streams now always retry on
http channel errors, while out of range GET requests are not retried.
Currently an EOFException is always treated as a fail-fast failure in read().

This allows all existing external code catching EOFException to handle
both, while S3AInputStream cleanly differentiates range errors (map to -1)
from channel errors (retry).

- HttpChannelEOFException is a subclass of EOFException, so all code
  which catches EOFException is still happy.
  retry policy: connectivityFailure
- RangeNotSatisfiableEOFException is the subclass of EOFException
  raised on 416 GET range errors.
  retry policy: fail
- Method ErrorTranslation.maybeExtractChannelException() to create this
  from shaded/unshaded NoHttpResponseException, using string match to
  avoid classpath problems.
- And do this for SdkClientExceptions with OpenSSL error code
  WFOPENSSL0035. This isn't strictly the right place for this as it's
  not an IOE we are remapping...
- ErrorTranslation.maybeExtractIOException() to perform this translation
  as appropriate.

S3AInputStream.reopen() code retries on EOF, except on
RangeNotSatisfiableEOFException, which is converted to a -1 response to
the caller, as is done historically.

HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions

S3AInputStream knows to handle these with

 read(): HttpChannelEOFException: stream aborting close then retry

 lazySeek(): map RangeNotSatisfiableEOFException to -1, but do not
 map any other EOFException class raised.

This means that

* out of range reads map to -1
* channel problems in reopen are retried
* channel problems in read() abort the failed http connection so it
  isn't recycled

Tests for this using/abusing mocking.

Change-Id: I0a31c1ae291ea2b38b0294a50dca5e9d0d4d1fdf
---
 .../fs/s3a/HttpChannelEOFException.java       |  39 ++++
 .../org/apache/hadoop/fs/s3a/Invoker.java     |   2 +-
 .../s3a/RangeNotSatisfiableEOFException.java  |  37 +++
 .../apache/hadoop/fs/s3a/S3AInputStream.java  |  25 +-
 .../apache/hadoop/fs/s3a/S3ARetryPolicy.java  |  13 +-
 .../org/apache/hadoop/fs/s3a/S3AUtils.java    |  19 +-
 .../auth/IAMInstanceCredentialsProvider.java  |   3 +-
 .../hadoop/fs/s3a/impl/ErrorTranslation.java  |  95 +++++++-
 .../apache/hadoop/fs/s3a/S3ATestUtils.java    |  59 +++++
 .../fs/s3a/TestS3AExceptionTranslation.java   | 126 ++++++++--
 .../fs/s3a/TestS3AInputStreamRetry.java       | 220 ++++++++++++++----
 .../fs/s3a/impl/TestErrorTranslation.java     |  10 +-
 12 files changed, 551 insertions(+), 97 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java
new file mode 100644
index 0000000000000..738b8485ed6a5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; + +/** + * Http channel exception; subclass of EOFException. + * In particular: + * - NoHttpResponseException + * - OpenSSL errors + * The http client library exceptions may be shaded/unshaded; this is the + * exception used in retry policies. + */ +public class HttpChannelEOFException extends EOFException { + + public HttpChannelEOFException(final String path, + final String error, + final Throwable cause) { + super(error); + initCause(cause); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java index 9b2c95a90c76f..286e4e00a4678 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -478,7 +478,7 @@ public T retryUntranslated( if (caught instanceof IOException) { translated = (IOException) caught; } else { - translated = S3AUtils.translateException(text, "", + translated = S3AUtils.translateException(text, "/", (SdkException) caught); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java new file mode 100644 index 0000000000000..e09b30a9acd3d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; + +import software.amazon.awssdk.awscore.exception.AwsServiceException; + +/** + * Status code 416, range not satisfiable. + * Subclass of {@link EOFException} so that any code which expects that to + * be the outcome of a 416 failure will continue to work. 
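+ * The S3A retry policy for this exception is to fail:
+ * an out of range GET cannot succeed on a retry.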
+ */ +public class RangeNotSatisfiableEOFException extends EOFException { + public RangeNotSatisfiableEOFException( + String operation, + AwsServiceException cause) { + super(operation); + initCause(cause); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 2ed9083efcddd..2a780074f1232 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -406,16 +406,23 @@ public boolean seekToNewSource(long targetPos) throws IOException { /** * Perform lazy seek and adjust stream to correct position for reading. - * + * If an EOF Exception is raised there are two possibilities + *
+   * <ol>
+   *   <li>the stream is at the end of the file</li>
+   *   <li>something went wrong with the network connection</li>
+   * </ol>
+ * This method does not attempt to distinguish; it assumes that an EOF + * exception is always "end of file". * @param targetPos position from where data should be read * @param len length of the content that needs to be read + * @throws RangeNotSatisfiableEOFException GET is out of range + * @throws IOException anything else. */ @Retries.RetryTranslated private void lazySeek(long targetPos, long len) throws IOException { Invoker invoker = context.getReadInvoker(); - invoker.maybeRetry(streamStatistics.getOpenOperations() == 0, - "lazySeek", pathStr, true, + invoker.retry("lazySeek", pathStr, true, () -> { //For lazy seek seekInStream(targetPos, len); @@ -449,7 +456,9 @@ public synchronized int read() throws IOException { try { lazySeek(nextReadPos, 1); - } catch (EOFException e) { + } catch (RangeNotSatisfiableEOFException e) { + // attempt to GET beyond the end of the object + LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos); return -1; } @@ -465,9 +474,7 @@ public synchronized int read() throws IOException { } try { b = wrappedStream.read(); - } catch (EOFException e) { - return -1; - } catch (SocketTimeoutException e) { + } catch (HttpChannelEOFException | SocketTimeoutException e) { onReadFailure(e, true); throw e; } catch (IOException e) { @@ -534,8 +541,8 @@ public synchronized int read(byte[] buf, int off, int len) try { lazySeek(nextReadPos, len); - } catch (EOFException e) { - // the end of the file has moved + } catch (RangeNotSatisfiableEOFException e) { + // attempt to GET beyond the end of the object return -1; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index 9438ac22bdb19..faf105c8e2c86 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -209,9 +209,15 @@ protected Map, RetryPolicy> createExceptionMap() { // in this map. policyMap.put(AWSClientIOException.class, retryAwsClientExceptions); + // Http Channel issues: treat as communication failure + policyMap.put(HttpChannelEOFException.class, connectivityFailure); + // server didn't respond. policyMap.put(AWSNoResponseException.class, retryIdempotentCalls); + // range header is out of scope of object; retrying won't help + policyMap.put(RangeNotSatisfiableEOFException.class, fail); + // should really be handled by resubmitting to new location; // that's beyond the scope of this retry policy policyMap.put(AWSRedirectException.class, fail); @@ -251,10 +257,7 @@ protected Map, RetryPolicy> createExceptionMap() { policyMap.put(ConnectException.class, connectivityFailure); // this can be a sign of an HTTP connection breaking early. - // which can be reacted to by another attempt if the request was idempotent. - // But: could also be a sign of trying to read past the EOF on a GET, - // which isn't going to be recovered from - policyMap.put(EOFException.class, retryIdempotentCalls); + policyMap.put(EOFException.class, connectivityFailure); // object not found. 404 when not unknown bucket; 410 "gone" policyMap.put(FileNotFoundException.class, fail); @@ -300,7 +303,7 @@ public RetryAction shouldRetry(Exception exception, if (exception instanceof SdkException) { // update the sdk exception for the purpose of exception // processing. 
- ex = S3AUtils.translateException("", "", (SdkException) exception); + ex = S3AUtils.translateException("", "/", (SdkException) exception); } LOG.debug("Retry probe for {} with {} retries and {} failovers," + " idempotent={}, due to {}", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 6ef0cd8dc9938..f7608d21554d9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -167,13 +167,20 @@ public static IOException translateException(String operation, */ @SuppressWarnings("ThrowableInstanceNeverThrown") public static IOException translateException(@Nullable String operation, - String path, + @Nullable String path, SdkException exception) { String message = String.format("%s%s: %s", operation, StringUtils.isNotEmpty(path)? (" on " + path) : "", exception); + if (path == null || path.isEmpty()) { + // handle null path by giving it a stub value. + // not ideal/informative, but ensures that the path is never null in + // exceptions constructed. + path = "/"; + } + if (!(exception instanceof AwsServiceException)) { // exceptions raised client-side: connectivity, auth, network problems... Exception innerCause = containsInterruptedException(exception); @@ -196,7 +203,7 @@ public static IOException translateException(@Nullable String operation, return ioe; } // network problems covered by an IOE inside the exception chain. - ioe = maybeExtractIOException(path, exception); + ioe = maybeExtractIOException(path, exception, message); if (ioe != null) { return ioe; } @@ -300,10 +307,10 @@ public static IOException translateException(@Nullable String operation, break; // out of range. This may happen if an object is overwritten with - // a shorter one while it is being read. + // a shorter one while it is being read or openFile() was invoked + // passing a FileStatus or file length less than that of the object. case SC_416_RANGE_NOT_SATISFIABLE: - ioe = new EOFException(message); - ioe.initCause(ase); + ioe = new RangeNotSatisfiableEOFException(message, ase); break; // this has surfaced as a "no response from server" message. @@ -673,7 +680,7 @@ public static InstanceT getInstanceFromReflection(String className, if (targetException instanceof IOException) { throw (IOException) targetException; } else if (targetException instanceof SdkException) { - throw translateException("Instantiate " + className, "", (SdkException) targetException); + throw translateException("Instantiate " + className, "/", (SdkException) targetException); } else { // supported constructor or factory method found, but the call failed throw instantiationException(uri, className, configKey, targetException); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java index 080b79e7f20d5..b9a7c776b1405 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java @@ -101,7 +101,8 @@ public AwsCredentials resolveCredentials() { // if the exception contains an IOE, extract it // so its type is the immediate cause of this new exception. 
Throwable t = e; - final IOException ioe = maybeExtractIOException("IAM endpoint", e); + final IOException ioe = maybeExtractIOException("IAM endpoint", e, + "resolveCredentials()"); if (ioe != null) { t = ioe; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java index f8a1f907bb3b1..df6a6d53add6a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java @@ -23,8 +23,11 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.s3a.HttpChannelEOFException; import org.apache.hadoop.fs.PathIOException; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND; /** @@ -42,6 +45,22 @@ */ public final class ErrorTranslation { + /** + * OpenSSL stream closed error: {@value}. + * See HADOOP-19027. + */ + public static final String OPENSSL_STREAM_CLOSED = "WFOPENSSL0035"; + + /** + * Classname of unshaded Http Client exception: {@value}. + */ + private static final String RAW_NO_HTTP_RESPONSE_EXCEPTION = "org.apache.http.NoHttpResponseException"; + + /** + * Classname of shaded Http Client exception: {@value}. + */ + private static final String SHADED_NO_HTTP_RESPONSE_EXCEPTION = + "software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException"; /** * Private constructor for utility class. */ @@ -71,25 +90,51 @@ public static boolean isObjectNotFound(AwsServiceException e) { return e.statusCode() == SC_404_NOT_FOUND && !isUnknownBucket(e); } + /** + * Tail recursive extraction of the innermost throwable. + * @param thrown next thrown in chain. + * @param outer outermost. + * @return the last non-null throwable in the chain. + */ + private static Throwable getInnermostThrowable(Throwable thrown, Throwable outer) { + if (thrown == null) { + return outer; + } + return getInnermostThrowable(thrown.getCause(), thrown); + } + /** * Translate an exception if it or its inner exception is an * IOException. - * If this condition is not met, null is returned. + * This also contains the logic to extract an AWS HTTP channel exception, + * which may or may not be an IOE, depending on the underlying SSL implementation + * in use. + * If an IOException cannot be extracted, null is returned. * @param path path of operation. * @param thrown exception + * @param message message generated by the caller. * @return a translated exception or null. */ - public static IOException maybeExtractIOException(String path, Throwable thrown) { + public static IOException maybeExtractIOException( + String path, + Throwable thrown, + String message) { if (thrown == null) { return null; } - // look inside - Throwable cause = thrown.getCause(); - while (cause != null && cause.getCause() != null) { - cause = cause.getCause(); + // walk down the chain of exceptions to find the innermost. + Throwable cause = getInnermostThrowable(thrown.getCause(), thrown); + + // see if this is an http channel exception + HttpChannelEOFException channelException = + maybeExtractChannelException(path, message, cause); + if (channelException != null) { + return channelException; } + + // not a channel exception, not an IOE. 
if (!(cause instanceof IOException)) { return null; } @@ -102,8 +147,7 @@ public static IOException maybeExtractIOException(String path, Throwable thrown) // unless no suitable constructor is available. final IOException ioe = (IOException) cause; - return wrapWithInnerIOE(path, thrown, ioe); - + return wrapWithInnerIOE(path, message, thrown, ioe); } /** @@ -116,6 +160,7 @@ public static IOException maybeExtractIOException(String path, Throwable thrown) * See {@code NetUtils}. * @param type of inner exception. * @param path path of the failure. + * @param message message generated by the caller. * @param outer outermost exception. * @param inner inner exception. * @return the new exception. @@ -123,9 +168,12 @@ public static IOException maybeExtractIOException(String path, Throwable thrown) @SuppressWarnings("unchecked") private static IOException wrapWithInnerIOE( String path, + String message, Throwable outer, T inner) { - String msg = outer.toString() + ": " + inner.getMessage(); + String msg = (isNotEmpty(message) ? (message + ":" + + " ") : "") + + outer.toString() + ": " + inner.getMessage(); Class clazz = inner.getClass(); try { Constructor ctor = clazz.getConstructor(String.class); @@ -136,6 +184,35 @@ private static IOException wrapWithInnerIOE( } } + /** + * Extract an AWS HTTP channel exception if the inner exception is considered + * an HttpClient {@code NoHttpResponseException} or an OpenSSL channel exception. + * This is based on string matching, which is inelegant and brittle. + * @param path path of the failure. + * @param message message generated by the caller. + * @param thrown inner exception. + * @return the new exception. + */ + @VisibleForTesting + public static HttpChannelEOFException maybeExtractChannelException( + String path, + String message, + Throwable thrown) { + final String classname = thrown.getClass().getName(); + if (thrown instanceof IOException + && (classname.equals(RAW_NO_HTTP_RESPONSE_EXCEPTION) + || classname.equals(SHADED_NO_HTTP_RESPONSE_EXCEPTION))) { + // shaded or unshaded http client exception class + return new HttpChannelEOFException(path, message, thrown); + } + // there's ambiguity about what exception class this is + // so rather than use its type, we look for an OpenSSL string in the message + if (thrown.getMessage().contains(OPENSSL_STREAM_CLOSED)) { + return new HttpChannelEOFException(path, message, thrown); + } + return null; + } + /** * AWS error codes explicitly recognized and processes specially; * kept in their own class for isolation. 
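A sketch of the intended end-to-end mapping (for illustration, not part
of the patch; the message and path values here are invented):

    // an SdkClientException wrapping the http client's NoHttpResponseException...
    SdkClientException sdk = SdkClientException.builder()
        .message("Unable to execute HTTP request: no response")
        .cause(new org.apache.http.NoHttpResponseException("no response"))
        .build();
    // ...should now translate to HttpChannelEOFException, which the
    // S3ARetryPolicy maps to connectivityFailure and therefore retries.
    IOException ioe = S3AUtils.translateException("read", "/path", sdk);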
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 0d4cf6a2962d8..b679e757f60e7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -72,6 +73,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.exception.SdkClientException; import java.io.Closeable; import java.io.File; @@ -456,6 +458,8 @@ public static E verifyExceptionClass(Class clazz, .describedAs("Exception expected of class %s", clazz) .isNotNull(); if (!(ex.getClass().equals(clazz))) { + LOG.warn("Rethrowing exception: {} as it is not an instance of {}", + ex, clazz, ex); throw ex; } return (E)ex; @@ -1711,4 +1715,59 @@ public static String etag(FileStatus status) { "Not an EtagSource: %s", status); return ((EtagSource) status).getEtag(); } + + /** + * Create an SDK client exception. + * @param message message + * @param cause nullable cause + * @return the exception + */ + public static SdkClientException sdkClientException( + String message, Throwable cause) { + return SdkClientException.builder() + .message(message) + .cause(cause) + .build(); + } + + /** + * Create an SDK client exception using the string value of the cause + * as the message. + * @param cause nullable cause + * @return the exception + */ + public static SdkClientException sdkClientException( + Throwable cause) { + return SdkClientException.builder() + .message(cause.toString()) + .cause(cause) + .build(); + } + + private static final String BYTES_PREFIX = "bytes="; + + /** + * Given a range header, split into start and end. + * Based on AWSRequestAnalyzer. 
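+   * For example, {@code bytes=100-200} parses to the pair (100, 200).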
+   * @param rangeHeader header string
+   * @return parsed range, or (-1, -1) for problems
+   */
+  public static Pair<Long, Long> requestRange(String rangeHeader) {
+    if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) {
+      String[] values = rangeHeader
+          .substring(BYTES_PREFIX.length())
+          .split("-");
+      if (values.length == 2) {
+        try {
+          long start = Long.parseUnsignedLong(values[0]);
+          long end = Long.parseUnsignedLong(values[1]);
+          return Pair.of(start, end);
+        } catch (NumberFormatException e) {
+          LOG.warn("Failed to parse range header {}", rangeHeader, e);
+        }
+      }
+    }
+    // error case
+    return Pair.of(-1L, -1L);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
index b26ca6889bd1b..7690349f722d9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
@@ -20,9 +20,11 @@
 import static org.apache.hadoop.fs.s3a.AWSCredentialProviderList.maybeTranslateCredentialException;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.verifyExceptionClass;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractChannelException;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
 import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
 import static org.junit.Assert.*;
@@ -36,11 +38,11 @@
 import java.util.function.Consumer;

 import org.assertj.core.api.Assertions;
+import org.junit.Before;
 import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
 import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
-import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.http.SdkHttpResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -53,15 +55,32 @@
 import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;
 import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.http.NoHttpResponseException;

 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;

 /**
- * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions,
+ * Unit test suite covering translation of AWS/network exceptions to S3A exceptions,
  * and retry/recovery policies.
  */
 @SuppressWarnings("ThrowableNotThrown")
-public class TestS3AExceptionTranslation {
+public class TestS3AExceptionTranslation extends AbstractHadoopTestBase {
+
+  public static final String WFOPENSSL_0035_STREAM_IS_CLOSED =
+      "Unable to execute HTTP request: "
+      + ErrorTranslation.OPENSSL_STREAM_CLOSED
+      + " Stream is closed";
+
+  /**
+   * Retry policy to use in tests.
+ */ + private S3ARetryPolicy retryPolicy; + + @Before + public void setup() { + retryPolicy = new S3ARetryPolicy(new Configuration(false)); + } @Test public void test301ContainsRegion() throws Exception { @@ -91,10 +110,10 @@ protected void assertContained(String text, String contained) { text != null && text.contains(contained)); } - protected void verifyTranslated( + protected E verifyTranslated( int status, Class expected) throws Exception { - verifyTranslated(expected, createS3Exception(status)); + return verifyTranslated(expected, createS3Exception(status)); } @Test @@ -142,7 +161,12 @@ public void test410isNotFound() throws Exception { @Test public void test416isEOF() throws Exception { - verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE, EOFException.class); + + // 416 maps the the subclass of EOFException + final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE, + RangeNotSatisfiableEOFException.class); + Assertions.assertThat(ex) + .isInstanceOf(EOFException.class); } @Test @@ -254,12 +278,7 @@ public void testExtractInterruptedIO() throws Throwable { .build())); } - private SdkClientException sdkClientException(String message, Throwable cause) { - return SdkClientException.builder() - .message(message) - .cause(cause) - .build(); - } + @Test public void testTranslateCredentialException() throws Throwable { verifyExceptionClass(AccessDeniedException.class, @@ -375,10 +394,89 @@ public void testApiCallAttemptTimeoutExceptionToTimeout() throws Throwable { verifyCause(ApiCallAttemptTimeoutException.class, ex); // and confirm these timeouts are retried. - final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false)); + assertRetried(ex); + } + + @Test + public void testChannelExtraction() throws Throwable { + verifyExceptionClass(HttpChannelEOFException.class, + maybeExtractChannelException("", "/", + new NoHttpResponseException("no response"))); + } + + @Test + public void testShadedChannelExtraction() throws Throwable { + verifyExceptionClass(HttpChannelEOFException.class, + maybeExtractChannelException("", "/", + shadedNoHttpResponse())); + } + + @Test + public void testOpenSSLErrorChannelExtraction() throws Throwable { + verifyExceptionClass(HttpChannelEOFException.class, + maybeExtractChannelException("", "/", + sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null))); + } + + /** + * Test handling of the unshaded HTTP client exception. + */ + @Test + public void testRawNoHttpResponseExceptionRetry() throws Throwable { + assertRetried( + verifyExceptionClass(HttpChannelEOFException.class, + translateException("test", "/", + sdkClientException(new NoHttpResponseException("no response"))))); + } + + /** + * Test handling of the shaded HTTP client exception. + */ + @Test + public void testShadedNoHttpResponseExceptionRetry() throws Throwable { + assertRetried( + verifyExceptionClass(HttpChannelEOFException.class, + translateException("test", "/", + sdkClientException(shadedNoHttpResponse())))); + } + + @Test + public void testOpenSSLErrorRetry() throws Throwable { + assertRetried( + verifyExceptionClass(HttpChannelEOFException.class, + translateException("test", "/", + sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null)))); + } + + /** + * Create a shaded NoHttpResponseException. + * @return an exception. + */ + private static Exception shadedNoHttpResponse() { + return new software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException("shaded"); + } + + /** + * Assert that an exception is retried. 
+   * @param ex exception
+   * @throws Exception failure during retry policy evaluation.
+   */
+  private void assertRetried(final Exception ex) throws Exception {
+    assertRetryOutcome(ex, RetryPolicy.RetryAction.RetryDecision.RETRY);
+  }
+
+  /**
+   * Assert that the retry policy is as expected for a given exception.
+   * @param ex exception
+   * @param decision expected decision
+   * @throws Exception failure during retry policy evaluation.
+   */
+  private void assertRetryOutcome(
+      final Exception ex,
+      final RetryPolicy.RetryAction.RetryDecision decision) throws Exception {
     Assertions.assertThat(retryPolicy.shouldRetry(ex, 0, 0, true).action)
         .describedAs("retry policy for exception %s", ex)
-        .isEqualTo(RetryPolicy.RetryAction.RetryDecision.RETRY);
+        .isEqualTo(decision);
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index da1284343da9f..8d0feed502d24 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -24,7 +24,9 @@
 import java.net.SocketException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;

+import org.assertj.core.api.Assertions;
 import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.ResponseInputStream;
@@ -34,13 +36,18 @@
 import org.junit.Test;

 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.http.NoHttpResponseException;

 import static java.lang.Math.min;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.requestRange;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE;
 import static org.apache.hadoop.util.functional.FutureIO.eval;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -54,21 +61,31 @@
  */
 public class TestS3AInputStreamRetry extends AbstractS3AMockTest {

-  private static final String INPUT = "ab";
+  /**
+   * Test input stream content: charAt(x) == hex value of x.
+   */
+  private static final String INPUT = "0123456789ABCDEF";
+
+  /**
+   * Status code to raise by default.
+ */ + public static final int STATUS = 0; @Test public void testInputStreamReadRetryForException() throws IOException { - S3AInputStream s3AInputStream = getMockedS3AInputStream(); - assertEquals("'a' from the test input stream 'ab' should be the first " + + S3AInputStream s3AInputStream = getMockedS3AInputStream(failingInputStreamCallbacks( + awsServiceException(STATUS))); + assertEquals("'0' from the test input stream should be the first " + "character being read", INPUT.charAt(0), s3AInputStream.read()); - assertEquals("'b' from the test input stream 'ab' should be the second " + + assertEquals("'1' from the test input stream should be the second " + "character being read", INPUT.charAt(1), s3AInputStream.read()); } @Test public void testInputStreamReadLengthRetryForException() throws IOException { byte[] result = new byte[INPUT.length()]; - S3AInputStream s3AInputStream = getMockedS3AInputStream(); + S3AInputStream s3AInputStream = getMockedS3AInputStream( + failingInputStreamCallbacks(awsServiceException(STATUS))); s3AInputStream.read(result, 0, INPUT.length()); assertArrayEquals( @@ -79,7 +96,8 @@ public void testInputStreamReadLengthRetryForException() throws IOException { @Test public void testInputStreamReadFullyRetryForException() throws IOException { byte[] result = new byte[INPUT.length()]; - S3AInputStream s3AInputStream = getMockedS3AInputStream(); + S3AInputStream s3AInputStream = getMockedS3AInputStream(failingInputStreamCallbacks( + awsServiceException(STATUS))); s3AInputStream.readFully(0, result); assertArrayEquals( @@ -87,7 +105,48 @@ public void testInputStreamReadFullyRetryForException() throws IOException { INPUT.getBytes(), result); } - private S3AInputStream getMockedS3AInputStream() { + /** + * seek and read repeatedly with every second GET failing; this is effective + * in simulating reopen() failures. + */ + @Test + public void testReadMultipleSeeksNoHttpResponse() throws Throwable { + final int l = INPUT.length(); + final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response")); + // fail on even reads + S3AInputStream stream = getMockedS3AInputStream( + maybeFailInGetCallback(ex, (index) -> (index % 2 == 0))); + // 10 reads with repeated failures. + for (int i = 0; i < 10; i++) { + stream.seek(0); + final int r = stream.read(); + assertReadValueMatchesOffset(r, 0, "read attempt " + i + " of " + stream); + } + } + + /** + * Assert that the result of read() matches the char at the expected offset. + * @param r read result + * @param pos pos in stream + * @param text text for error string. + */ + private static void assertReadValueMatchesOffset( + final int r, final int pos, final String text) { + Assertions.assertThat(r) + .describedAs("read() at %d of %s", pos, text) + .isGreaterThan(-1); + Assertions.assertThat(Character.toString((char) r)) + .describedAs("read() at %d of %s", pos, text) + .isEqualTo(String.valueOf(INPUT.charAt(pos))); + } + + /** + * Create a mocked input stream for a given callback. + * @param streamCallback callback to use on GET calls + * @return a stream. 
+   */
+  private S3AInputStream getMockedS3AInputStream(
+      S3AInputStream.InputStreamCallbacks streamCallback) {
     Path path = new Path("test-path");
     String eTag = "test-etag";
     String versionId = "test-version-id";
@@ -113,55 +172,108 @@ private S3AInputStream getMockedS3AInputStream() {
     return new S3AInputStream(
         s3AReadOpContext,
         s3ObjectAttributes,
-        getMockedInputStreamCallback(),
+        streamCallback,
         s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
         null);
   }

   /**
-   * Get mocked InputStreamCallbacks where we return mocked S3Object.
-   *
+   * Create mocked InputStreamCallbacks which returns a mocked S3Object and fails on
+   * the third invocation.
+   * This is the original mock stream used in this test suite; the failure logic and stream
+   * selection have been factored out to support different failure modes.
+   * @param ex exception to raise on failure
    * @return mocked object.
    */
-  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
+  private S3AInputStream.InputStreamCallbacks failingInputStreamCallbacks(
+      final RuntimeException ex) {
+
     GetObjectResponse objectResponse = GetObjectResponse.builder()
         .eTag("test-etag")
         .build();

-    ResponseInputStream<GetObjectResponse>[] responseInputStreams =
-        new ResponseInputStream[] {
-            getMockedInputStream(objectResponse, true),
-            getMockedInputStream(objectResponse, true),
-            getMockedInputStream(objectResponse, false)
-        };
+    final SSLException ioe = new SSLException(new SocketException("Connection reset"));
+
+    // open() -> lazySeek() -> reopen()
+    // -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
+    // read() -> objectInputStreamBad1 throws exception
+    // -> onReadFailure -> close wrappedStream
+    // -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
+    // -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
+    // -> wrappedStream.read -> objectInputStreamBad2 throws exception
+    // -> onReadFailure -> close wrappedStream
+    // -> retry(2) -> wrappedStream==null -> reopen
+    // -> getObject (mockedS3ObjectIndex=3) throws exception
+    // -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
+    // -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
+    // -> wrappedStream.read
+
+    return mockInputStreamCallback(ex,
+        attempt -> 3 == attempt,
+        attempt -> mockedInputStream(objectResponse, attempt < 3, ioe));
+  }
+
+  /**
+   * Create mocked InputStreamCallbacks which returns a mocked S3Object and fails
+   * when the predicate indicates that it should.
+   * The stream response itself does not fail.
+   * @param ex exception to raise on failure
+   * @param failurePredicate predicate which, when true, triggers a failure on the given attempt.
+   * @return mocked object.
+   */
+  private S3AInputStream.InputStreamCallbacks maybeFailInGetCallback(
+      final RuntimeException ex,
+      final Function<Integer, Boolean> failurePredicate) {
+    GetObjectResponse objectResponse = GetObjectResponse.builder()
+        .eTag("test-etag")
+        .build();
+
+    return mockInputStreamCallback(ex,
+        failurePredicate,
+        attempt -> mockedInputStream(objectResponse, false, null));
+  }
+
+  /**
+   * Create mocked InputStreamCallbacks which returns a mocked S3Object.
+   * Raises the given runtime exception if the failure predicate returns true;
+   * the stream factory returns the input stream for the given attempt.
+   * @param ex exception to raise on failure
+   * @param failurePredicate predicate which, when true, triggers a failure on the given attempt.
+   * @param streamFactory factory for the stream to return on the given attempt.
+   * @return mocked object.
+   */
+  private S3AInputStream.InputStreamCallbacks mockInputStreamCallback(
+      final RuntimeException ex,
+      final Function<Integer, Boolean> failurePredicate,
+      final Function<Integer, ResponseInputStream<GetObjectResponse>> streamFactory) {
+
     return new S3AInputStream.InputStreamCallbacks() {

-      private Integer mockedS3ObjectIndex = 0;
+      private int attempt = 0;

       @Override
       public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
-        // Set s3 client to return mocked s3object with defined read behavior.
-        mockedS3ObjectIndex++;
-        // open() -> lazySeek() -> reopen()
-        // -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
-        // read() -> objectInputStreamBad1 throws exception
-        // -> onReadFailure -> close wrappedStream
-        // -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
-        // -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
-        // -> wrappedStream.read -> objectInputStreamBad2 throws exception
-        // -> onReadFailure -> close wrappedStream
-        // -> retry(2) -> wrappedStream==null -> reopen
-        // -> getObject (mockedS3ObjectIndex=3) throws exception
-        // -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
-        // -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
-        // -> wrappedStream.read
-        if (mockedS3ObjectIndex == 3) {
-          throw AwsServiceException.builder()
-              .message("Failed to get S3Object")
-              .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
-              .build();
+        attempt++;
+
+        if (failurePredicate.apply(attempt)) {
+          throw ex;
         }
-        return responseInputStreams[min(mockedS3ObjectIndex, responseInputStreams.length) - 1];
+        final Pair<Long, Long> r = requestRange(request.range());
+        final int start = r.getLeft().intValue();
+        final int end = r.getRight().intValue();
+        if (start < 0 || end < 0 || start > end) {
+          // not satisfiable
+          throw awsServiceException(SC_416_RANGE_NOT_SATISFIABLE);
+        }
+
+        final ResponseInputStream<GetObjectResponse> stream = streamFactory.apply(attempt);
+
+        // skip the given number of bytes from the start of the array; no-op if 0.
+        try {
+          stream.skip(start);
+        } catch (IOException e) {
+          throw sdkClientException(e);
+        }
+        return stream;
       }

       @Override
@@ -180,27 +292,41 @@ public void close() {
       }
     };
   }

+  /**
+   * Create an AwsServiceException with the given status code.
+   *
+   * @param status HTTP status code
+   * @return an exception.
+   */
+  private static AwsServiceException awsServiceException(int status) {
+    return AwsServiceException.builder()
+        .message("Failed to get S3Object")
+        .statusCode(status)
+        .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
+        .build();
+  }
+
   /**
    * Get mocked ResponseInputStream where we can trigger IOException to
    * simulate the read failure.
    *
-   * @param triggerFailure true when a failure injection is enabled.
+   * @param triggerFailure true when a failure injection is enabled in read()
+   * @param ioe exception to raise
    * @return mocked object.
*/ - private ResponseInputStream getMockedInputStream( - GetObjectResponse objectResponse, boolean triggerFailure) { + private ResponseInputStream mockedInputStream( + GetObjectResponse objectResponse, + boolean triggerFailure, + final IOException ioe) { FilterInputStream inputStream = new FilterInputStream(IOUtils.toInputStream(INPUT, StandardCharsets.UTF_8)) { - private final IOException exception = - new SSLException(new SocketException("Connection reset")); - @Override public int read() throws IOException { int result = super.read(); if (triggerFailure) { - throw exception; + throw ioe; } return result; } @@ -209,7 +335,7 @@ public int read() throws IOException { public int read(byte[] b, int off, int len) throws IOException { int result = super.read(b, off, len); if (triggerFailure) { - throw exception; + throw ioe; } return result; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java index 0f0b2c0c34bb5..3a4994897a6b9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java @@ -67,7 +67,7 @@ public void testUnknownHostExceptionExtraction() throws Throwable { new UnknownHostException("bottom"))); final IOException ioe = intercept(UnknownHostException.class, "top", () -> { - throw maybeExtractIOException("", thrown); + throw maybeExtractIOException("", thrown, ""); }); // the wrapped exception is the top level one: no stack traces have @@ -85,7 +85,7 @@ public void testNoRouteToHostExceptionExtraction() throws Throwable { throw maybeExtractIOException("p2", sdkException("top", sdkException("middle", - new NoRouteToHostException("bottom")))); + new NoRouteToHostException("bottom"))), null); }); } @@ -96,7 +96,7 @@ public void testConnectExceptionExtraction() throws Throwable { throw maybeExtractIOException("p1", sdkException("top", sdkException("middle", - new ConnectException("bottom")))); + new ConnectException("bottom"))), null); }); } @@ -113,7 +113,7 @@ public void testUncheckedIOExceptionExtraction() throws Throwable { new UncheckedIOException( new SocketTimeoutException("bottom")))); throw maybeExtractIOException("p1", - new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown)); + new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown), null); }); } @@ -124,7 +124,7 @@ public void testNoConstructorExtraction() throws Throwable { throw maybeExtractIOException("p1", sdkException("top", sdkException("middle", - new NoConstructorIOE()))); + new NoConstructorIOE())), null); }); } From 8c90a160314399f8766e5b3327a42e9639f94c9d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 9 Jan 2024 21:19:20 +0000 Subject: [PATCH 2/7] HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions Testing through actually raising 416 exceptions and verifying that readFully(), char read() and vector reads are all good. 
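The pattern used to provoke the 416 (a sketch; the path, length and
delta values are illustrative):

    // declare a length beyond the real EOF via the openFile() builder
    FSDataInputStream in = fs.openFile(path)
        .mustLong(FS_OPTION_OPENFILE_LENGTH, actualLen + 1024)
        .build().get();
    // a GET past the real EOF returns 416, which is translated to
    // RangeNotSatisfiableEOFException and downgraded to -1 by read()
    in.seek(actualLen + 1);
    assertEquals(-1, in.read());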
Not yet validated, PositionedReadable.read() where we need to * return -1 past EOF * retry on the http channel issues Change-Id: Ia37f348ccc7d730ae439422e99dfc828612d0bfd --- .../fs/s3a/HttpChannelEOFException.java | 3 + .../s3a/RangeNotSatisfiableEOFException.java | 6 +- .../apache/hadoop/fs/s3a/S3AInputStream.java | 10 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 3 + .../hadoop/fs/s3a/impl/ErrorTranslation.java | 1 + .../s3a/ITestS3AContractVectoredRead.java | 39 ++++- .../fs/s3a/TestS3AInputStreamRetry.java | 6 +- .../fs/s3a/performance/ITestS3AOpenCost.java | 139 ++++++++++++++---- 8 files changed, 165 insertions(+), 42 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java index 738b8485ed6a5..665d485d7ee54 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java @@ -20,6 +20,8 @@ import java.io.EOFException; +import org.apache.hadoop.classification.InterfaceAudience; + /** * Http channel exception; subclass of EOFException. * In particular: @@ -28,6 +30,7 @@ * The http client library exceptions may be shaded/unshaded; this is the * exception used in retry policies. */ +@InterfaceAudience.Private public class HttpChannelEOFException extends EOFException { public HttpChannelEOFException(final String path, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java index e09b30a9acd3d..4c6b9decb0b4d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java @@ -20,17 +20,19 @@ import java.io.EOFException; -import software.amazon.awssdk.awscore.exception.AwsServiceException; +import org.apache.hadoop.classification.InterfaceAudience; /** * Status code 416, range not satisfiable. * Subclass of {@link EOFException} so that any code which expects that to * be the outcome of a 416 failure will continue to work. 
*/ +@InterfaceAudience.Private public class RangeNotSatisfiableEOFException extends EOFException { + public RangeNotSatisfiableEOFException( String operation, - AwsServiceException cause) { + Exception cause) { super(operation); initCause(cause); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 2a780074f1232..ea3040549fbaf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -422,7 +422,7 @@ public boolean seekToNewSource(long targetPos) throws IOException { private void lazySeek(long targetPos, long len) throws IOException { Invoker invoker = context.getReadInvoker(); - invoker.retry("lazySeek", pathStr, true, + invoker.retry("lazySeek to " + targetPos, pathStr, true, () -> { //For lazy seek seekInStream(targetPos, len); @@ -560,12 +560,12 @@ public synchronized int read(byte[] buf, int off, int len) } try { bytes = wrappedStream.read(buf, off, len); + } catch (HttpChannelEOFException | SocketTimeoutException e) { + onReadFailure(e, true); + throw e; } catch (EOFException e) { // the base implementation swallows EOFs. return -1; - } catch (SocketTimeoutException e) { - onReadFailure(e, true); - throw e; } catch (IOException e) { onReadFailure(e, false); throw e; @@ -994,7 +994,7 @@ private void validateRangeRequest(FileRange range) throws EOFException { final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s", range.getOffset(), range.getLength(), pathStr); LOG.warn(errMsg); - throw new EOFException(errMsg); + throw new RangeNotSatisfiableEOFException(errMsg, null); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index f7608d21554d9..6a719739e720e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -309,6 +309,9 @@ public static IOException translateException(@Nullable String operation, // out of range. This may happen if an object is overwritten with // a shorter one while it is being read or openFile() was invoked // passing a FileStatus or file length less than that of the object. + // although the HTTP specification says that the response should + // include a range header specifying the actual range available, + // this isn't picked up here. case SC_416_RANGE_NOT_SATISFIABLE: ioe = new RangeNotSatisfiableEOFException(message, ase); break; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java index df6a6d53add6a..74d2e28ffc307 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java @@ -61,6 +61,7 @@ public final class ErrorTranslation { */ private static final String SHADED_NO_HTTP_RESPONSE_EXCEPTION = "software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException"; + /** * Private constructor for utility class. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 4c357e288c84f..5c91b7340491a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -21,9 +21,11 @@ import java.io.EOFException; import java.io.IOException; import java.io.InterruptedIOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.slf4j.Logger; @@ -36,7 +38,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -44,10 +48,12 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { @@ -72,9 +78,40 @@ public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); - verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); + verifyExceptionalVectoredRead(fs, fileRanges, RangeNotSatisfiableEOFException.class); } + /** + * Verify response to a vector read request which is beyond the + * real length of the file. + * Unlike the {@link #testEOFRanges()} test, the input stream in + * this test thinks the file is longer than it is, so the call + * fails in the GET request. 
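+   * The 416 response is translated into RangeNotSatisfiableEOFException,
+   * which is expected to surface through the future returned for the range.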
+ */ + @Test + public void testEOFRanges416Handling() throws Exception { + FileSystem fs = getFileSystem(); + + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .mustLong(FS_OPTION_OPENFILE_LENGTH, DATASET_LEN + 1024) + .build(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, getAllocate()); + FileRange res = fileRanges.get(0); + CompletableFuture data = res.getData(); + interceptFuture(RangeNotSatisfiableEOFException.class, + "416", + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); + } + } + + @Test public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception { Configuration conf = getFileSystem().getConf(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java index 8d0feed502d24..0cc5b5b1ed5fd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -43,8 +43,6 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.http.NoHttpResponseException; - -import static java.lang.Math.min; import static org.apache.hadoop.fs.s3a.S3ATestUtils.requestRange; import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE; @@ -54,10 +52,13 @@ /** * Tests S3AInputStream retry behavior on read failure. + *
 * These tests are for validating expected behavior of retrying the
 * S3AInputStream read() and read(b, off, len); they test that the read should
 * reopen the input stream and retry the read when an IOException is thrown
 * during the read process.
+ * <p>
+ * This includes handling of out of range requests. */ public class TestS3AInputStreamRetry extends AbstractS3AMockTest { @@ -111,7 +112,6 @@ public void testInputStreamReadFullyRetryForException() throws IOException { */ @Test public void testReadMultipleSeeksNoHttpResponse() throws Throwable { - final int l = INPUT.length(); final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response")); // fail on even reads S3AInputStream stream = getMockedS3AInputStream( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 4aae84dca8e53..e7783f1bdd2e5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -28,11 +28,14 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.statistics.IOStatistics; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream; @@ -56,6 +59,8 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { private static final Logger LOG = LoggerFactory.getLogger(ITestS3AOpenCost.class); + public static final String TEXT = "0123456789ABCDEF"; + private Path testFile; private FileStatus testFileStatus; @@ -76,7 +81,7 @@ public void setup() throws Exception { S3AFileSystem fs = getFileSystem(); testFile = methodPath(); - writeTextFile(fs, testFile, "openfile", true); + writeTextFile(fs, testFile, TEXT, true); testFileStatus = fs.getFileStatus(testFile); fileLength = testFileStatus.getLen(); } @@ -137,15 +142,8 @@ public void testOpenFileShorterLength() throws Throwable { int offset = 2; long shortLen = fileLength - offset; // open the file - FSDataInputStream in2 = verifyMetrics(() -> - fs.openFile(testFile) - .must(FS_OPTION_OPENFILE_READ_POLICY, - FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) - .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen) - .build() - .get(), - always(NO_HEAD_OR_LIST), - with(STREAM_READ_OPENED, 0)); + FSDataInputStream in2 = openFile(shortLen, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL); // verify that the statistics are in range IOStatistics ioStatistics = extractStatistics(in2); @@ -171,39 +169,118 @@ public void testOpenFileShorterLength() throws Throwable { } @Test - public void testOpenFileLongerLength() throws Throwable { - // do a second read with the length declared as longer + public void testOpenFileLongerLengthReadFully() throws Throwable { + // do a read with the length declared as longer // than it is. // An EOF will be read on readFully(), -1 on a read() + final int extra = 10; + long longLen = fileLength + extra; + + + // assert behaviors of seeking/reading past the file length. + // there is no attempt at recovery. 
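+    // (the 416 from the out-of-range GET is mapped to -1 at the stream
+    // level; readFully() then converts the short read into an EOFException)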
+ verifyMetrics(() -> { + try (FSDataInputStream in = openFile(longLen, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { + byte[] out = new byte[(int) (longLen)]; + intercept(EOFException.class, + () -> in.readFully(0, out)); + in.seek(longLen - 1); + assertEquals("read past real EOF on " + in, + -1, in.read()); + return in.toString(); + } + }, + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + with(STREAM_READ_OPENED, 2)); + + // now on a new stream, try a full read from after the EOF + verifyMetrics(() -> { + try (FSDataInputStream in = openFile(longLen, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { + byte[] out = new byte[extra]; + intercept(EOFException.class, + () -> in.readFully(fileLength, out)); + return in.toString(); + } + }, + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + with(STREAM_READ_OPENED, 2)); + + + } + + /** + * Open a file. + * @param longLen length to declare + * @param policy read policy + * @return file handle + */ + private FSDataInputStream openFile(final long longLen, String policy) + throws Exception { S3AFileSystem fs = getFileSystem(); // set a length past the actual file length - long longLen = fileLength + 10; - FSDataInputStream in3 = verifyMetrics(() -> + return verifyMetrics(() -> fs.openFile(testFile) - .must(FS_OPTION_OPENFILE_READ_POLICY, - FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) + .must(FS_OPTION_OPENFILE_READ_POLICY, policy) .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen) .build() .get(), always(NO_HEAD_OR_LIST)); + } + + /** + * Read a file read() by read and expect it all to work through the file, -1 afterwards. + */ + @Test + public void testOpenFileLongerLengthReadCalls() throws Throwable { + + // set a length past the actual file length + final int extra = 10; + long longLen = fileLength + extra; + try (FSDataInputStream in = openFile(longLen, + FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + for (int i = 0; i < fileLength; i++) { + assertEquals("read() at " + i, + TEXT.charAt(i), in.read()); + } + } + + // now open and read after the EOF; this is + // expected to return -1 on each read; there's a GET per call. + // as the counters are updated on close(), the stream must be closed + // within the verification clause. + // note how there's no attempt to alter file expected length... + // instead the call always goes to S3. + // there's no information in the exception from the SDK + describe("reading past the end of the file"); - // assert behaviors of seeking/reading past the file length. - // there is no attempt at recovery. verifyMetrics(() -> { - byte[] out = new byte[(int) longLen]; - intercept(EOFException.class, - () -> in3.readFully(0, out)); - in3.seek(longLen - 1); - assertEquals("read past real EOF on " + in3, - -1, in3.read()); - in3.close(); - return in3.toString(); - }, - // two GET calls were made, one for readFully, - // the second on the read() past the EOF - // the operation has got as far as S3 - with(STREAM_READ_OPENED, 2)); + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + for (int i = 0; i < extra; i++) { + final long p = fileLength + i; + in.seek(p); + + assertEquals("read() at " + p, + -1, in.read()); + } + return in.toString(); + } + + }, + with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); + + // now, next corner case. Do a block read() of more bytes than the file length. 
+ try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + + } } } From 64c64433b74e3d038d4b52c709699d4d4ee55c53 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 10 Jan 2024 14:22:14 +0000 Subject: [PATCH 3/7] HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions If a read() on the inner wrapped stream returned -1 then it is closed. There is no attempt to actually recover within a readFully(); there's a switch to turn this on, but if anyone does it a test will spin forever as the inner PositionedReadable.read(position, buffer, len) downgrades all EOF exceptions to -1. A new method would need to be added which controls whether to downgrade/rethrow exceptions, which makes for more complex work. What does that mean? Possibly reduced resilience to non-retried failures on the inner stream, even though more channel exceptions are retried on. +Split out tests of different read methods into their own test cases. Change-Id: I8f7e75ba73a376e7dfd06c5bb9ebc4138bc80394 --- .../apache/hadoop/fs/s3a/S3AInputStream.java | 50 ++++-- .../s3a/ITestS3AContractVectoredRead.java | 22 ++- .../fs/s3a/TestS3AInputStreamRetry.java | 22 ++- .../fs/s3a/performance/ITestS3AOpenCost.java | 152 +++++++++++++++--- 4 files changed, 212 insertions(+), 34 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index ea3040549fbaf..3d2ecc77376bf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, public static final String OPERATION_OPEN = "open"; public static final String OPERATION_REOPEN = "re-open"; + /** + * Switch for behavior on when wrappedStream.read() + * returns -1 or raises an EOF; the original semantics + * are that the stream is kept open. + * Value {@value}. + */ + private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true; + /** * This is the maximum temporary buffer size we use while * populating the data in direct byte buffers during a vectored IO @@ -333,7 +341,7 @@ private void seekQuietly(long positiveTargetPos) { @Retries.OnceTranslated private void seekInStream(long targetPos, long length) throws IOException { checkNotClosed(); - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { return; } // compute how much more to skip @@ -428,7 +436,7 @@ private void lazySeek(long targetPos, long len) throws IOException { seekInStream(targetPos, len); //re-open at specific location if needed - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { reopen("read from new offset", targetPos, len, false); } }); @@ -469,7 +477,7 @@ public synchronized int read() throws IOException { // When exception happens before re-setting wrappedStream in "reopen" called // by onReadFailure, then wrappedStream will be null. 
But the **retry** may // re-execute this block and cause NPE if we don't check wrappedStream - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { reopen("failure recovery", getPos(), 1, false); } try { @@ -487,10 +495,9 @@ public synchronized int read() throws IOException { if (byteRead >= 0) { pos++; nextReadPos++; - } - - if (byteRead >= 0) { incrementBytesRead(1); + } else { + streamReadResultNegative(); } return byteRead; } @@ -516,6 +523,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) { closeStream("failure recovery", forceAbort, false); } + /** + * the read() call returned -1. + * this means "the connection has gone past the end of the object" or + * the stream has broken for some reason. + * so close stream (without an abort). + */ + private void streamReadResultNegative() { + if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) { + closeStream("wrappedStream.read() returned -1", false, false); + } + } + /** * {@inheritDoc} * @@ -555,16 +574,18 @@ public synchronized int read(byte[] buf, int off, int len) // When exception happens before re-setting wrappedStream in "reopen" called // by onReadFailure, then wrappedStream will be null. But the **retry** may // re-execute this block and cause NPE if we don't check wrappedStream - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { reopen("failure recovery", getPos(), 1, false); } try { + // read data; will block until there is data or the end of the stream is reached. + // returns 0 for "stream is open but no data yet" and -1 for "end of stream". bytes = wrappedStream.read(buf, off, len); } catch (HttpChannelEOFException | SocketTimeoutException e) { onReadFailure(e, true); throw e; } catch (EOFException e) { - // the base implementation swallows EOFs. + LOG.debug("EOFException raised by http stream read(); downgrading to a -1 response", e); return -1; } catch (IOException e) { onReadFailure(e, false); @@ -576,8 +597,10 @@ public synchronized int read(byte[] buf, int off, int len) if (bytesRead > 0) { pos += bytesRead; nextReadPos += bytesRead; + incrementBytesRead(bytesRead); + } else { + streamReadResultNegative(); } - incrementBytesRead(bytesRead); streamStatistics.readOperationCompleted(len, bytesRead); return bytesRead; } @@ -825,6 +848,9 @@ public void readFully(long position, byte[] buffer, int offset, int length) while (nread < length) { int nbytes = read(buffer, offset + nread, length - nread); if (nbytes < 0) { + // no attempt is currently made to recover from stream read problems; + // a lazy seek to the offset is probably the solution. + // but it will need more qualification against failure handling throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); } nread += nbytes; @@ -1264,8 +1290,12 @@ public boolean hasCapability(String capability) { } } + /** + * Is the inner object stream open? + * @return true if there is an active HTTP request to S3. 
+ */ @VisibleForTesting - boolean isObjectStreamOpen() { + public boolean isObjectStreamOpen() { return wrappedStream != null; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 5c91b7340491a..9966393d41fdb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -23,6 +23,7 @@ import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -48,6 +49,7 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; @@ -92,13 +94,15 @@ public void testEOFRanges() throws Exception { public void testEOFRanges416Handling() throws Exception { FileSystem fs = getFileSystem(); + final int extendedLen = DATASET_LEN + 1024; CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) - .mustLong(FS_OPTION_OPENFILE_LENGTH, DATASET_LEN + 1024) + .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen) .build(); List fileRanges = new ArrayList<>(); fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + describe("Read starting from past EOF"); try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, getAllocate()); FileRange res = fileRanges.get(0); @@ -109,8 +113,22 @@ public void testEOFRanges416Handling() throws Exception { TimeUnit.SECONDS, data); } - } + describe("Read starting 0 continuing past EOF"); + try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME)) + .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen) + .build().get()) { + final FileRange range = FileRange.createFileRange(0, extendedLen); + in.readVectored(Arrays.asList(range), getAllocate()); + CompletableFuture data = range.getData(); + interceptFuture(EOFException.class, + EOF_IN_READ_FULLY, + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); + } + + } @Test public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java index 0cc5b5b1ed5fd..6eccdc23dd5d5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -107,8 +107,8 @@ public void testInputStreamReadFullyRetryForException() throws IOException { } /** - * seek and read repeatedly with every second GET failing; this is effective - * in simulating reopen() failures. + * Seek and read repeatedly with every second GET failing with {@link NoHttpResponseException}. 
+ * This should be effective in simulating {@code reopen()} failures caused by network problems. */ @Test public void testReadMultipleSeeksNoHttpResponse() throws Throwable { @@ -124,6 +124,24 @@ public void testReadMultipleSeeksNoHttpResponse() throws Throwable { } } + /** + * Seek and read repeatedly with every second GET failing with {@link NoHttpResponseException}. + * This should be effective in simulating {@code reopen()} failures caused by network problems. + */ + @Test + public void testReadMultipleSeeksStreamClosed() throws Throwable { + final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response")); + // fail on even reads + S3AInputStream stream = getMockedS3AInputStream( + maybeFailInGetCallback(ex, (index) -> (index % 2 == 0))); + // 10 reads with repeated failures. + for (int i = 0; i < 10; i++) { + stream.seek(0); + final int r = stream.read(); + assertReadValueMatchesOffset(r, 0, "read attempt " + i + " of " + stream); + } + } + /** * Assert that the result of read() matches the char at the expected offset. * @param r read result diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index e7783f1bdd2e5..5de0534b9e3c0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -20,16 +20,20 @@ import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -65,7 +69,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { private FileStatus testFileStatus; - private long fileLength; + private int fileLength; public ITestS3AOpenCost() { super(true); @@ -83,7 +87,7 @@ public void setup() throws Exception { writeTextFile(fs, testFile, TEXT, true); testFileStatus = fs.getFileStatus(testFile); - fileLength = testFileStatus.getLen(); + fileLength = (int)testFileStatus.getLen(); } /** @@ -195,7 +199,7 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { // two GET calls were made, one for readFully, // the second on the read() past the EOF // the operation has got as far as S3 - with(STREAM_READ_OPENED, 2)); + with(STREAM_READ_OPENED, 1 + 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -210,9 +214,7 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { // two GET calls were made, one for readFully, // the second on the read() past the EOF // the operation has got as far as S3 - with(STREAM_READ_OPENED, 2)); - - + with(STREAM_READ_OPENED, 1)); } /** @@ -235,19 +237,21 @@ private FSDataInputStream openFile(final long longLen, String policy) } /** - * Read a file read() by read and expect it all to work through the file, -1 afterwards. 
+ * Open a file with a length declared as longer than the actual file length. + * Validate input stream.read() semantics. */ @Test - public void testOpenFileLongerLengthReadCalls() throws Throwable { + public void testReadPastEOF() throws Throwable { // set a length past the actual file length final int extra = 10; - long longLen = fileLength + extra; + int longLen = fileLength + extra; try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { for (int i = 0; i < fileLength; i++) { - assertEquals("read() at " + i, - TEXT.charAt(i), in.read()); + Assertions.assertThat(in.read()) + .describedAs("read() at %d", i) + .isEqualTo(TEXT.charAt(i)); } } @@ -264,23 +268,131 @@ public void testOpenFileLongerLengthReadCalls() throws Throwable { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { for (int i = 0; i < extra; i++) { - final long p = fileLength + i; + final int p = fileLength + i; in.seek(p); - - assertEquals("read() at " + p, - -1, in.read()); + Assertions.assertThat(in.read()) + .describedAs("read() at %d", p) + .isEqualTo(-1); } return in.toString(); } }, with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); + } - // now, next corner case. Do a block read() of more bytes than the file length. - try (FSDataInputStream in = - openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + /** + * Test {@code PositionedReadable.readFully()} past EOF in a file. + */ + @Test + public void testPositionedReadableReadFullyPastEOF() throws Throwable { + // now, next corner case. Do a readFully() of more bytes than the file length. + // we expect failure. + // this codepath does a GET to the end of the (expected) file length, and when + // that GET returns -1 from the read because the bytes returned is less than + // expected then the readFully call fails. + describe("PositionedReadable.readFully() past the end of the file"); + // set a length past the actual file length + final int extra = 10; + int longLen = fileLength + extra; + verifyMetrics(() -> { + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + byte[] buf = new byte[(int) (longLen + 1)]; + + // readFully will fail + intercept(EOFException.class, () -> { + in.readFully(0, buf); + return in; + }); + assertS3StreamClosed(in); + return "readFully past EOF"; + } + }, + with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open + } - } + /** + * Test {@code PositionedReadable#read()} past EOF in a file. + */ + @Test + public void testPositionedReadableReadPastEOF() throws Throwable { + + // set a length past the actual file length + final int extra = 10; + int longLen = fileLength + extra; + + describe("PositionedReadable.read() past the end of the file"); + + verifyMetrics(() -> { + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + byte[] buf = new byte[(int) (longLen + 1)]; + + // readFully will read to the end of the file + Assertions.assertThat(in.read(0, buf, 0, buf.length)) + .isEqualTo(fileLength); + assertS3StreamOpen(in); + + // now attempt to read after EOF + Assertions.assertThat(in.read(fileLength, buf, 0, buf.length)) + .describedAs("PositionedReadable.read() past EOF") + .isEqualTo(-1); + // stream is closed as part of this failure + assertS3StreamClosed(in); + + return "PositionedReadable.read()) past EOF"; + } + }, + with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open + } + + /** + * Test Vector Read past EOF in a file. 
+ */ + @Test + public void testVectorReadPastEOF() throws Throwable { + + // set a length past the actual file length + final int extra = 10; + int longLen = fileLength + extra; + + describe("Vector read past the end of the file"); + verifyMetrics(() -> { + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + byte[] buf = new byte[(int) (longLen + 1)]; + ByteBuffer bb = ByteBuffer.wrap(buf); + final FileRange range = FileRange.createFileRange(0, longLen); + in.readVectored(Arrays.asList(range), (i) -> bb); + assertS3StreamClosed(in); + return "vector read past EOF"; + } + + }, + with(Statistic.ACTION_HTTP_GET_REQUEST, 0)); // vector stats don't add this + } + + /** + * Assert that the inner S3 Stream is closed. + * @param in input stream + */ + private static void assertS3StreamClosed(final FSDataInputStream in) { + S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); + Assertions.assertThat(s3ain.isObjectStreamOpen()) + .describedAs("stream is open") + .isFalse(); + } + + /** + * Assert that the inner S3 Stream is open. + * @param in input stream + */ + private static void assertS3StreamOpen(final FSDataInputStream in) { + S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); + Assertions.assertThat(s3ain.isObjectStreamOpen()) + .describedAs("stream is closed") + .isTrue(); } } From ba19dabaf06ff0be929c405950afcbe456708cde Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 10 Jan 2024 14:46:08 +0000 Subject: [PATCH 4/7] HADOOP-19027. checkstyles Change-Id: Ided5c0c3967199b7485641a40b06fba92aeea948 --- .../java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java | 3 ++- .../org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java index 74d2e28ffc307..7934a5c7d4d5c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java @@ -54,7 +54,8 @@ public final class ErrorTranslation { /** * Classname of unshaded Http Client exception: {@value}. */ - private static final String RAW_NO_HTTP_RESPONSE_EXCEPTION = "org.apache.http.NoHttpResponseException"; + private static final String RAW_NO_HTTP_RESPONSE_EXCEPTION = + "org.apache.http.NoHttpResponseException"; /** * Classname of shaded Http Client exception: {@value}. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 5de0534b9e3c0..fc56b0474a53c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -276,7 +276,6 @@ public void testReadPastEOF() throws Throwable { } return in.toString(); } - }, with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); } From dcf1ac22c48e5fdf7de7cf3ff52f1dcd03c0fc0e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 10 Jan 2024 15:05:35 +0000 Subject: [PATCH 5/7] HADOOP-19027. checkstyle and code review. 
Change-Id: I37f05880cad808db9f53035d4d49af48a333f2c7 --- .../fs/s3a/audit/AWSRequestAnalyzer.java | 7 ++++++- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 2 +- .../fs/s3a/TestS3AExceptionTranslation.java | 1 - .../fs/s3a/performance/ITestS3AOpenCost.java | 20 ++++++++++++++----- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index 3cb8d97532448..3df862055d197 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -294,6 +294,11 @@ private static long toSafeLong(final Number size) { private static final String BYTES_PREFIX = "bytes="; + /** + * Given a range header, determine the size of the request. + * @param rangeHeader header string + * @return parsed size or -1 for problems + */ private static Number sizeFromRangeHeader(String rangeHeader) { if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) { String[] values = rangeHeader @@ -302,7 +307,7 @@ private static Number sizeFromRangeHeader(String rangeHeader) { if (values.length == 2) { try { long start = Long.parseUnsignedLong(values[0]); - long end = Long.parseUnsignedLong(values[0]); + long end = Long.parseUnsignedLong(values[1]); return end - start; } catch(NumberFormatException e) { } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index b679e757f60e7..6dc3ca11028a6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -1760,7 +1760,7 @@ public static Pair requestRange(String rangeHeader) { if (values.length == 2) { try { long start = Long.parseUnsignedLong(values[0]); - long end = Long.parseUnsignedLong(values[0]); + long end = Long.parseUnsignedLong(values[1]); return Pair.of(start, end); } catch (NumberFormatException e) { LOG.warn("Failed to parse range header {}", rangeHeader, e); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java index 7690349f722d9..6b894a6813704 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java @@ -278,7 +278,6 @@ public void testExtractInterruptedIO() throws Throwable { .build())); } - @Test public void testTranslateCredentialException() throws Throwable { verifyExceptionClass(AccessDeniedException.class, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index fc56b0474a53c..5b60da74a430f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -22,6 +22,8 @@ import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Arrays; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -32,12 +34,14 @@ import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.statistics.IOStatistics; +import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; @@ -54,6 +58,7 @@ import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; /** * Cost of openFile(). @@ -311,9 +316,8 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable { with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } - /** - * Test {@code PositionedReadable#read()} past EOF in a file. + * Test {@code PositionedReadable.read()} past EOF in a file. */ @Test public void testPositionedReadableReadPastEOF() throws Throwable { @@ -349,6 +353,7 @@ public void testPositionedReadableReadPastEOF() throws Throwable { /** * Test Vector Read past EOF in a file. + * See related tests in {@code ITestS3AContractVectoredRead} */ @Test public void testVectorReadPastEOF() throws Throwable { @@ -361,16 +366,21 @@ public void testVectorReadPastEOF() throws Throwable { verifyMetrics(() -> { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { - byte[] buf = new byte[(int) (longLen + 1)]; + assertS3StreamClosed(in); + byte[] buf = new byte[(int) (longLen)]; ByteBuffer bb = ByteBuffer.wrap(buf); final FileRange range = FileRange.createFileRange(0, longLen); in.readVectored(Arrays.asList(range), (i) -> bb); + interceptFuture(EOFException.class, + EOF_IN_READ_FULLY, + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + range.getData()); assertS3StreamClosed(in); return "vector read past EOF"; } - }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 0)); // vector stats don't add this + with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); } /** From fc29efba9ad0fc17c71a8d30b416cc73a363b29d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 10 Jan 2024 15:50:55 +0000 Subject: [PATCH 6/7] HADOOP-19027. 
unused import Change-Id: I51d860b8ecc774149307ebba2d2b6ec393671fff --- .../org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 5b60da74a430f..72d2a2eec380e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -22,7 +22,6 @@ import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.assertj.core.api.Assertions; From 776ac8b7e9aec690427b4457f28f57da1b6d8f54 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 12 Jan 2024 15:58:32 +0000 Subject: [PATCH 7/7] HADOOP-19027. checkstyle Change-Id: I483d0aa29d45956a4cf9b1efd939978b550fd722 --- .../fs/s3a/performance/ITestS3AOpenCost.java | 155 +++++++++--------- 1 file changed, 76 insertions(+), 79 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 72d2a2eec380e..361c376cffd7f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -189,17 +189,15 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { // assert behaviors of seeking/reading past the file length. // there is no attempt at recovery. 
verifyMetrics(() -> { - try (FSDataInputStream in = openFile(longLen, - FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { - byte[] out = new byte[(int) (longLen)]; - intercept(EOFException.class, - () -> in.readFully(0, out)); - in.seek(longLen - 1); - assertEquals("read past real EOF on " + in, - -1, in.read()); - return in.toString(); - } - }, + try (FSDataInputStream in = openFile(longLen, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { + byte[] out = new byte[(int) (longLen)]; + intercept(EOFException.class, () -> in.readFully(0, out)); + in.seek(longLen - 1); + assertEquals("read past real EOF on " + in, -1, in.read()); + return in.toString(); + } + }, // two GET calls were made, one for readFully, // the second on the read() past the EOF // the operation has got as far as S3 @@ -207,17 +205,17 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { - try (FSDataInputStream in = openFile(longLen, - FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { - byte[] out = new byte[extra]; - intercept(EOFException.class, - () -> in.readFully(fileLength, out)); - return in.toString(); - } - }, + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { + byte[] out = new byte[extra]; + intercept(EOFException.class, () -> in.readFully(fileLength, out)); + return in.toString(); + } + }, // two GET calls were made, one for readFully, // the second on the read() past the EOF // the operation has got as far as S3 + with(STREAM_READ_OPENED, 1)); } @@ -269,18 +267,18 @@ public void testReadPastEOF() throws Throwable { describe("reading past the end of the file"); verifyMetrics(() -> { - try (FSDataInputStream in = - openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { - for (int i = 0; i < extra; i++) { - final int p = fileLength + i; - in.seek(p); - Assertions.assertThat(in.read()) - .describedAs("read() at %d", p) - .isEqualTo(-1); - } - return in.toString(); - } - }, + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + for (int i = 0; i < extra; i++) { + final int p = fileLength + i; + in.seek(p); + Assertions.assertThat(in.read()) + .describedAs("read() at %d", p) + .isEqualTo(-1); + } + return in.toString(); + } + }, with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); } @@ -299,19 +297,18 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable { final int extra = 10; int longLen = fileLength + extra; verifyMetrics(() -> { - try (FSDataInputStream in = - openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { - byte[] buf = new byte[(int) (longLen + 1)]; - - // readFully will fail - intercept(EOFException.class, () -> { - in.readFully(0, buf); - return in; - }); - assertS3StreamClosed(in); - return "readFully past EOF"; - } - }, + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + byte[] buf = new byte[(int) (longLen + 1)]; + // readFully will fail + intercept(EOFException.class, () -> { + in.readFully(0, buf); + return in; + }); + assertS3StreamClosed(in); + return "readFully past EOF"; + } + }, with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } @@ -328,25 +325,25 @@ public void testPositionedReadableReadPastEOF() throws Throwable { describe("PositionedReadable.read() past the end of the file"); verifyMetrics(() -> { - try (FSDataInputStream in = - openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { - byte[] buf = new byte[(int) (longLen + 1)]; 
- - // readFully will read to the end of the file - Assertions.assertThat(in.read(0, buf, 0, buf.length)) - .isEqualTo(fileLength); - assertS3StreamOpen(in); - - // now attempt to read after EOF - Assertions.assertThat(in.read(fileLength, buf, 0, buf.length)) - .describedAs("PositionedReadable.read() past EOF") - .isEqualTo(-1); - // stream is closed as part of this failure - assertS3StreamClosed(in); - - return "PositionedReadable.read()) past EOF"; - } - }, + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + byte[] buf = new byte[(int) (longLen + 1)]; + + // readFully will read to the end of the file + Assertions.assertThat(in.read(0, buf, 0, buf.length)) + .isEqualTo(fileLength); + assertS3StreamOpen(in); + + // now attempt to read after EOF + Assertions.assertThat(in.read(fileLength, buf, 0, buf.length)) + .describedAs("PositionedReadable.read() past EOF") + .isEqualTo(-1); + // stream is closed as part of this failure + assertS3StreamClosed(in); + + return "PositionedReadable.read()) past EOF"; + } + }, with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } @@ -363,22 +360,22 @@ public void testVectorReadPastEOF() throws Throwable { describe("Vector read past the end of the file"); verifyMetrics(() -> { - try (FSDataInputStream in = - openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { - assertS3StreamClosed(in); - byte[] buf = new byte[(int) (longLen)]; - ByteBuffer bb = ByteBuffer.wrap(buf); - final FileRange range = FileRange.createFileRange(0, longLen); - in.readVectored(Arrays.asList(range), (i) -> bb); - interceptFuture(EOFException.class, - EOF_IN_READ_FULLY, - ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, - TimeUnit.SECONDS, - range.getData()); - assertS3StreamClosed(in); - return "vector read past EOF"; - } - }, + try (FSDataInputStream in = + openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { + assertS3StreamClosed(in); + byte[] buf = new byte[longLen]; + ByteBuffer bb = ByteBuffer.wrap(buf); + final FileRange range = FileRange.createFileRange(0, longLen); + in.readVectored(Arrays.asList(range), (i) -> bb); + interceptFuture(EOFException.class, + EOF_IN_READ_FULLY, + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + range.getData()); + assertS3StreamClosed(in); + return "vector read past EOF"; + } + }, with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); }
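As a standalone illustration of the read-past-EOF semantics these tests pin down, the sketch below opens a file whose declared length is past the real EOF and probes the two code paths: read() downgrading the range failure to -1, and readFully() surfacing the EOFException with no recovery attempt. This is an illustrative sketch only, not part of the patch: the ReadPastEofProbe class and the fs/path/realLen parameters are invented for the example, while the openFile() options are the ones exercised in ITestS3AOpenCost above.

import java.io.EOFException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;

/**
 * Illustrative sketch (not part of the patch): expected stream semantics
 * when the declared file length is longer than the object in S3.
 */
public final class ReadPastEofProbe {

  private ReadPastEofProbe() {
  }

  /**
   * @param fs target filesystem (assumed to be S3A)
   * @param path an existing file (hypothetical)
   * @param realLen the file's actual length
   */
  public static void probe(FileSystem fs, Path path, long realLen)
      throws Exception {
    // declare a length ten bytes past the real EOF; with a declared
    // length no HEAD request is issued on open
    long declaredLen = realLen + 10;
    try (FSDataInputStream in = fs.openFile(path)
        .must(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_RANDOM)
        .mustLong(FS_OPTION_OPENFILE_LENGTH, declaredLen)
        .build().get()) {

      // a single-byte read() past the real EOF maps the 416/short GET
      // to the historical -1 response
      in.seek(realLen);
      if (in.read() != -1) {
        throw new AssertionError("expected -1 past the real EOF");
      }

      // readFully() across the declared length must surface the EOF;
      // no recovery is attempted
      byte[] buf = new byte[(int) declaredLen];
      try {
        in.readFully(0, buf);
        throw new AssertionError("expected EOFException");
      } catch (EOFException expected) {
        // expected outcome: EOF_IN_READ_FULLY
      }
    }
  }
}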