Skip to content

Commit 9a89a55

Browse files
authored
Request compression sync streaming (#4222)
* Refactor to common class AwsChunkedInputStream * Sync streaming compression * Sync streaming compression functional tests * Sync streaming compression integ tests * Fix integ test * Add unit tests
1 parent f613a2a commit 9a89a55

File tree

16 files changed

+930
-211
lines changed

16 files changed

+930
-211
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/traits/RequestCompressionTrait.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,9 @@ public static CodeBlock create(OperationModel operationModel, IntermediateModel
4242
return CodeBlock.of("");
4343
}
4444

45-
// TODO : remove once request compression for streaming operations is supported
46-
if (operationModel.isStreaming()) {
47-
throw new IllegalStateException("Request compression for streaming operations is not yet supported in the AWS SDK "
48-
+ "for Java.");
49-
}
50-
51-
// TODO : remove once S3 checksum interceptors are moved to occur after CompressRequestStage
45+
// TODO : remove once:
46+
// 1) S3 checksum interceptors are moved to occur after CompressRequestStage
47+
// 2) Transfer-Encoding:chunked is supported in S3
5248
if (model.getMetadata().getServiceName().equals("S3")) {
5349
throw new IllegalStateException("Request compression for S3 is not yet supported in the AWS SDK for Java.");
5450
}

core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
@SdkInternalApi
4141
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {
4242

43-
private static final String CRLF = "\r\n";
4443
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
4544
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
4645
private String previousChunkSignature;

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
3636
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
3737
import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline;
38+
import software.amazon.awssdk.core.internal.sync.CompressionContentStreamProvider;
3839
import software.amazon.awssdk.http.ContentStreamProvider;
3940
import software.amazon.awssdk.http.SdkHttpFullRequest;
4041
import software.amazon.awssdk.utils.IoUtils;
@@ -67,10 +68,21 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
6768
compressEntirePayload(input, compressor);
6869
updateContentEncodingHeader(input, compressor);
6970
updateContentLengthHeader(input);
71+
return input;
72+
}
73+
74+
if (!isTransferEncodingChunked(input)) {
75+
return input;
7076
}
7177

72-
// TODO : streaming - sync & async
78+
if (context.requestProvider() == null) {
79+
// sync streaming
80+
input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
81+
}
82+
83+
// TODO : streaming - async
7384

85+
updateContentEncodingHeader(input, compressor);
7486
return input;
7587
}
7688

@@ -123,6 +135,12 @@ private void updateContentLengthHeader(SdkHttpFullRequest.Builder input) {
123135
}
124136
}
125137

138+
private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) {
139+
return input.firstMatchingHeader("Transfer-Encoding")
140+
.map(headerValue -> headerValue.equals("chunked"))
141+
.orElse(false);
142+
}
143+
126144
private Compressor resolveCompressorType(ExecutionAttributes executionAttributes) {
127145
List<String> encodings =
128146
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings();

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java

Lines changed: 11 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
2323
import software.amazon.awssdk.core.checksums.SdkChecksum;
2424
import software.amazon.awssdk.core.internal.chunked.AwsChunkedEncodingConfig;
25-
import software.amazon.awssdk.core.io.SdkInputStream;
26-
import software.amazon.awssdk.utils.Logger;
2725
import software.amazon.awssdk.utils.Validate;
2826

2927
/**
@@ -37,37 +35,18 @@
3735
* the wrapped stream.
3836
*/
3937
@SdkInternalApi
40-
public abstract class AwsChunkedEncodingInputStream extends SdkInputStream {
38+
public abstract class AwsChunkedEncodingInputStream extends AwsChunkedInputStream {
4139

42-
public static final int DEFAULT_CHUNK_SIZE = 128 * 1024;
43-
protected static final int SKIP_BUFFER_SIZE = 256 * 1024;
4440
protected static final String CRLF = "\r\n";
4541
protected static final byte[] FINAL_CHUNK = new byte[0];
4642
protected static final String HEADER_COLON_SEPARATOR = ":";
47-
private static final Logger log = Logger.loggerFor(AwsChunkedEncodingInputStream.class);
4843
protected byte[] calculatedChecksum = null;
4944
protected final String checksumHeaderForTrailer;
5045
protected boolean isTrailingTerminated = true;
51-
private InputStream is = null;
5246
private final int chunkSize;
5347
private final int maxBufferSize;
5448
private final SdkChecksum sdkChecksum;
5549
private boolean isLastTrailingCrlf;
56-
/**
57-
* Iterator on the current chunk.
58-
*/
59-
private ChunkContentIterator currentChunkIterator;
60-
61-
/**
62-
* Iterator on the buffer of the decoded stream,
63-
* Null if the wrapped stream is marksupported,
64-
* otherwise it will be initialized when this wrapper is marked.
65-
*/
66-
private DecodedStreamBuffer decodedStreamBuffer;
67-
68-
private boolean isAtStart = true;
69-
private boolean isTerminating = false;
70-
7150

7251
/**
7352
* Creates a chunked encoding input stream initialized with the originating stream. The configuration allows
@@ -89,10 +68,10 @@ protected AwsChunkedEncodingInputStream(InputStream in,
8968
AwsChunkedEncodingInputStream originalChunkedStream = (AwsChunkedEncodingInputStream) in;
9069
providedMaxBufferSize = Math.max(originalChunkedStream.maxBufferSize, providedMaxBufferSize);
9170
is = originalChunkedStream.is;
92-
decodedStreamBuffer = originalChunkedStream.decodedStreamBuffer;
71+
underlyingStreamBuffer = originalChunkedStream.underlyingStreamBuffer;
9372
} else {
9473
is = in;
95-
decodedStreamBuffer = null;
74+
underlyingStreamBuffer = null;
9675
}
9776
this.chunkSize = awsChunkedEncodingConfig.chunkSize();
9877
this.maxBufferSize = providedMaxBufferSize;
@@ -153,19 +132,6 @@ public T checksumHeaderForTrailer(String checksumHeaderForTrailer) {
153132

154133
}
155134

156-
@Override
157-
public int read() throws IOException {
158-
byte[] tmp = new byte[1];
159-
int count = read(tmp, 0, 1);
160-
if (count > 0) {
161-
log.debug(() -> "One byte read from the stream.");
162-
int unsignedByte = (int) tmp[0] & 0xFF;
163-
return unsignedByte;
164-
} else {
165-
return count;
166-
}
167-
}
168-
169135
@Override
170136
public int read(byte[] b, int off, int len) throws IOException {
171137
abortIfNeeded();
@@ -211,32 +177,6 @@ private boolean setUpTrailingChunks() {
211177
return true;
212178
}
213179

214-
@Override
215-
public long skip(long n) throws IOException {
216-
if (n <= 0) {
217-
return 0;
218-
}
219-
long remaining = n;
220-
int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n);
221-
byte[] temp = new byte[toskip];
222-
while (remaining > 0) {
223-
int count = read(temp, 0, toskip);
224-
if (count < 0) {
225-
break;
226-
}
227-
remaining -= count;
228-
}
229-
return n - remaining;
230-
}
231-
232-
/**
233-
* @see java.io.InputStream#markSupported()
234-
*/
235-
@Override
236-
public boolean markSupported() {
237-
return true;
238-
}
239-
240180
/**
241181
* The readlimit parameter is ignored.
242182
*/
@@ -256,7 +196,7 @@ public void mark(int readlimit) {
256196
} else {
257197
log.debug(() -> "AwsChunkedEncodingInputStream marked at the start of the stream "
258198
+ "(initializing the buffer since the wrapped stream is not mark-supported).");
259-
decodedStreamBuffer = new DecodedStreamBuffer(maxBufferSize);
199+
underlyingStreamBuffer = new UnderlyingStreamBuffer(maxBufferSize);
260200
}
261201
}
262202

@@ -280,8 +220,8 @@ public void reset() throws IOException {
280220
is.reset();
281221
} else {
282222
log.debug(() -> "AwsChunkedEncodingInputStream reset (will use the buffer of the decoded stream).");
283-
Validate.notNull(decodedStreamBuffer, "Cannot reset the stream because the mark is not set.");
284-
decodedStreamBuffer.startReadBuffer();
223+
Validate.notNull(underlyingStreamBuffer, "Cannot reset the stream because the mark is not set.");
224+
underlyingStreamBuffer.startReadBuffer();
285225
}
286226
isAtStart = true;
287227
isTerminating = false;
@@ -298,14 +238,14 @@ private boolean setUpNextChunk() throws IOException {
298238
int chunkSizeInBytes = 0;
299239
while (chunkSizeInBytes < chunkSize) {
300240
/** Read from the buffer of the decoded stream */
301-
if (null != decodedStreamBuffer && decodedStreamBuffer.hasNext()) {
302-
chunkData[chunkSizeInBytes++] = decodedStreamBuffer.next();
241+
if (null != underlyingStreamBuffer && underlyingStreamBuffer.hasNext()) {
242+
chunkData[chunkSizeInBytes++] = underlyingStreamBuffer.next();
303243
} else { /** Read from the wrapped stream */
304244
int bytesToRead = chunkSize - chunkSizeInBytes;
305245
int count = is.read(chunkData, chunkSizeInBytes, bytesToRead);
306246
if (count != -1) {
307-
if (null != decodedStreamBuffer) {
308-
decodedStreamBuffer.buffer(chunkData, chunkSizeInBytes, count);
247+
if (null != underlyingStreamBuffer) {
248+
underlyingStreamBuffer.buffer(chunkData, chunkSizeInBytes, count);
309249
}
310250
chunkSizeInBytes += count;
311251
} else {
@@ -333,13 +273,6 @@ private boolean setUpNextChunk() throws IOException {
333273
}
334274
}
335275

336-
337-
@Override
338-
protected InputStream getWrappedInputStream() {
339-
return is;
340-
}
341-
342-
343276
/**
344277
* The final chunk.
345278
*
@@ -361,5 +294,4 @@ protected InputStream getWrappedInputStream() {
361294
* @return ChecksumChunkHeader in bytes based on the Header name field.
362295
*/
363296
protected abstract byte[] createChecksumChunkHeader();
364-
365-
}
297+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.io;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.core.io.SdkInputStream;
22+
import software.amazon.awssdk.utils.Logger;
23+
24+
/**
25+
* A wrapper of InputStream that implements streaming in chunks.
26+
*/
27+
@SdkInternalApi
28+
public abstract class AwsChunkedInputStream extends SdkInputStream {
29+
public static final int DEFAULT_CHUNK_SIZE = 128 * 1024;
30+
protected static final int SKIP_BUFFER_SIZE = 256 * 1024;
31+
protected static final Logger log = Logger.loggerFor(AwsChunkedInputStream.class);
32+
protected InputStream is;
33+
/**
34+
* Iterator on the current chunk.
35+
*/
36+
protected ChunkContentIterator currentChunkIterator;
37+
38+
/**
39+
* Iterator on the buffer of the underlying stream,
40+
* Null if the wrapped stream is marksupported,
41+
* otherwise it will be initialized when this wrapper is marked.
42+
*/
43+
protected UnderlyingStreamBuffer underlyingStreamBuffer;
44+
protected boolean isAtStart = true;
45+
protected boolean isTerminating = false;
46+
47+
@Override
48+
public int read() throws IOException {
49+
byte[] tmp = new byte[1];
50+
int count = read(tmp, 0, 1);
51+
if (count > 0) {
52+
log.debug(() -> "One byte read from the stream.");
53+
int unsignedByte = (int) tmp[0] & 0xFF;
54+
return unsignedByte;
55+
} else {
56+
return count;
57+
}
58+
}
59+
60+
@Override
61+
public long skip(long n) throws IOException {
62+
if (n <= 0) {
63+
return 0;
64+
}
65+
long remaining = n;
66+
int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n);
67+
byte[] temp = new byte[toskip];
68+
while (remaining > 0) {
69+
int count = read(temp, 0, toskip);
70+
if (count < 0) {
71+
break;
72+
}
73+
remaining -= count;
74+
}
75+
return n - remaining;
76+
}
77+
78+
/**
79+
* @see InputStream#markSupported()
80+
*/
81+
@Override
82+
public boolean markSupported() {
83+
return true;
84+
}
85+
86+
@Override
87+
protected InputStream getWrappedInputStream() {
88+
return is;
89+
}
90+
}

0 commit comments

Comments
 (0)