Skip to content

Commit 966149f

Browse files
committed
wip
1 parent 118c85f commit 966149f

File tree

11 files changed

+930
-64
lines changed

11 files changed

+930
-64
lines changed

core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java

Lines changed: 156 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@
2020
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD;
2121
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER;
2222
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_UNSIGNED_PAYLOAD_TRAILER;
23+
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_DECODED_CONTENT_LENGTH;
2324
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_TRAILER;
25+
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils.computeAndMoveContentLength;
2426

2527
import java.io.InputStream;
28+
import java.nio.ByteBuffer;
2629
import java.nio.charset.StandardCharsets;
2730
import java.util.ArrayList;
2831
import java.util.Collections;
2932
import java.util.List;
33+
import java.util.Optional;
34+
import java.util.concurrent.CompletableFuture;
35+
import org.reactivestreams.Publisher;
3036
import software.amazon.awssdk.annotations.SdkInternalApi;
3137
import software.amazon.awssdk.checksums.SdkChecksum;
3238
import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm;
@@ -35,8 +41,11 @@
3541
import software.amazon.awssdk.http.SdkHttpRequest;
3642
import software.amazon.awssdk.http.auth.aws.internal.signer.CredentialScope;
3743
import software.amazon.awssdk.http.auth.aws.internal.signer.NoOpPayloadChecksumStore;
44+
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.AsyncChunkEncodedPayload;
3845
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChecksumTrailerProvider;
3946
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedInputStream;
47+
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPayload;
48+
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPublisher;
4049
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.TrailerProvider;
4150
import software.amazon.awssdk.http.auth.aws.internal.signer.io.ChecksumInputStream;
4251
import software.amazon.awssdk.http.auth.aws.internal.signer.io.ResettableContentStreamProvider;
@@ -112,10 +121,66 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni
112121
return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build);
113122
}
114123

124+
/**
125+
* Given a payload and result of request signing, sign the payload via the SigV4 process.
126+
*/
127+
@Override
128+
public Publisher<ByteBuffer> signAsync(Publisher<ByteBuffer> payload, V4aRequestSigningResult requestSigningResult) {
129+
ChunkedEncodedPublisher.Builder chunkedStreamBuilder = ChunkedEncodedPublisher.builder()
130+
.publisher(payload)
131+
.chunkSize(chunkSize)
132+
.addEmptyTrailingChunk(true);
133+
AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload(chunkedStreamBuilder);
134+
135+
signCommon(chunkedPayload, requestSigningResult);
136+
137+
return chunkedStreamBuilder.build();
138+
}
139+
140+
private ChunkedEncodedPayload signCommon(ChunkedEncodedPayload payload, V4aRequestSigningResult requestSigningResult) {
141+
SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest();
142+
143+
payload.decodedContentLength(request.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH)
144+
.map(Long::parseLong)
145+
.orElseThrow(() -> {
146+
String msg = String.format("Expected header '%s' to be present",
147+
X_AMZ_DECODED_CONTENT_LENGTH);
148+
return new RuntimeException(msg);
149+
}));
150+
151+
preExistingTrailers.forEach(trailer -> payload.addTrailer(() -> trailer));
152+
153+
switch (requestSigningResult.getSigningConfig().getSignedBodyValue()) {
154+
case STREAMING_ECDSA_SIGNED_PAYLOAD: {
155+
RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(),
156+
requestSigningResult.getSigningConfig());
157+
payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope));
158+
break;
159+
}
160+
case STREAMING_UNSIGNED_PAYLOAD_TRAILER:
161+
setupChecksumTrailerIfNeeded(payload);
162+
break;
163+
case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: {
164+
RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(),
165+
requestSigningResult.getSigningConfig());
166+
payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope));
167+
setupChecksumTrailerIfNeeded(payload);
168+
payload.addTrailer(
169+
new SigV4aTrailerProvider(payload.trailers(), rollingSigner, credentialScope)
170+
);
171+
break;
172+
}
173+
default:
174+
throw new UnsupportedOperationException();
175+
}
176+
177+
return payload;
178+
}
179+
115180
@Override
116181
public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload, String checksum) {
117182
long encodedContentLength = 0;
118-
long contentLength = SignerUtils.computeAndMoveContentLength(request, payload);
183+
long contentLength = computeAndMoveContentLength(request, payload);
119184
setupPreExistingTrailers(request);
120185

121186
// pre-existing trailers
@@ -157,6 +222,72 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
157222
// CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it
158223
}
159224

225+
@Override
226+
public CompletableFuture<Pair<SdkHttpRequest.Builder, Optional<Publisher<ByteBuffer>>>> beforeSigningAsync(
227+
SdkHttpRequest.Builder request, Publisher<ByteBuffer> payload, String checksum) {
228+
229+
return SignerUtils.moveContentLength(request, payload)
230+
.thenApply(p -> {
231+
SdkHttpRequest.Builder requestBuilder = p.left();
232+
setupPreExistingTrailers(requestBuilder);
233+
234+
long decodedContentLength =
235+
requestBuilder.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH)
236+
.map(Long::parseLong)
237+
// should not happen, this header is added by
238+
// moveContentLength
239+
.orElseThrow(() -> new RuntimeException(
240+
X_AMZ_DECODED_CONTENT_LENGTH + " header not present"));
241+
242+
long encodedContentLength = calculateEncodedContentLength(request, decodedContentLength, checksum);
243+
244+
if (checksumAlgorithm != null) {
245+
String checksumHeaderName = checksumHeaderName(checksumAlgorithm);
246+
request.appendHeader(X_AMZ_TRAILER, checksumHeaderName);
247+
}
248+
request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength));
249+
250+
return Pair.of(requestBuilder, p.right());
251+
});
252+
}
253+
254+
private long calculateEncodedContentLength(SdkHttpRequest.Builder requestBuilder, long decodedContentLength,
255+
String checksum) {
256+
long encodedContentLength = 0;
257+
258+
encodedContentLength += calculateExistingTrailersLength();
259+
260+
switch (checksum) {
261+
case STREAMING_ECDSA_SIGNED_PAYLOAD: {
262+
long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
263+
encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength);
264+
break;
265+
}
266+
case STREAMING_UNSIGNED_PAYLOAD_TRAILER:
267+
if (checksumAlgorithm != null) {
268+
encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm));
269+
}
270+
encodedContentLength += calculateChunksLength(decodedContentLength, 0);
271+
break;
272+
case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: {
273+
long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
274+
encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength);
275+
if (checksumAlgorithm != null) {
276+
encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm));
277+
}
278+
encodedContentLength += 170; // x-amz-trailer-signature:<sigv4a-ecsda hex signature, 144 bytes>\r\n
279+
break;
280+
}
281+
default:
282+
throw new UnsupportedOperationException();
283+
}
284+
285+
// terminating \r\n
286+
encodedContentLength += 2;
287+
288+
return encodedContentLength;
289+
}
290+
160291
/**
161292
* Set up a map of pre-existing trailer (headers) for the given request to be used when chunk-encoding the payload.
162293
* <p>
@@ -270,6 +401,30 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil
270401
builder.inputStream(checksumInputStream).addTrailer(checksumTrailer);
271402
}
272403

404+
private void setupChecksumTrailerIfNeeded(ChunkedEncodedPayload payload) {
405+
if (checksumAlgorithm == null) {
406+
return;
407+
}
408+
String checksumHeaderName = checksumHeaderName(checksumAlgorithm);
409+
410+
String cachedChecksum = getCachedChecksum();
411+
412+
if (cachedChecksum != null) {
413+
LOG.debug(() -> String.format("Cached payload checksum available for algorithm %s: %s. Using cached value",
414+
checksumAlgorithm.algorithmId(), checksumHeaderName));
415+
payload.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum)));
416+
return;
417+
}
418+
419+
SdkChecksum sdkChecksum = fromChecksumAlgorithm(checksumAlgorithm);
420+
payload.checksumPayload(sdkChecksum);
421+
422+
TrailerProvider checksumTrailer =
423+
new ChecksumTrailerProvider(sdkChecksum, checksumHeaderName, checksumAlgorithm, payloadChecksumStore);
424+
425+
payload.addTrailer(checksumTrailer);
426+
}
427+
273428
private String getCachedChecksum() {
274429
byte[] checksumBytes = payloadChecksumStore.getChecksumValue(checksumAlgorithm);
275430
if (checksumBytes != null) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws.crt.internal.signer;
17+
18+
import java.nio.ByteBuffer;
19+
import org.reactivestreams.Publisher;
20+
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
22+
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
23+
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
24+
25+
@SdkInternalApi
26+
public final class CrtRequestBodyAdapter implements HttpRequestBodyStream {
27+
private static final int BUFFER_SIZE = 4 * 1024 * 1024; // 4 MB
28+
private final Publisher<ByteBuffer> requestPublisher;
29+
private final long contentLength;
30+
private ByteBufferStoringSubscriber requestBodySubscriber;
31+
32+
public CrtRequestBodyAdapter(Publisher<ByteBuffer> requestPublisher, long contentLength) {
33+
this.requestPublisher = requestPublisher;
34+
this.contentLength = contentLength;
35+
this.requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE);
36+
}
37+
38+
@Override
39+
public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
40+
return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM;
41+
}
42+
43+
@Override
44+
public boolean resetPosition() {
45+
requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE);
46+
requestPublisher.subscribe(requestBodySubscriber);
47+
return true;
48+
}
49+
50+
@Override
51+
public long getLength() {
52+
return contentLength;
53+
}
54+
}

0 commit comments

Comments
 (0)