Skip to content

Commit 5a2d122

Browse files
committed
Create a configuration class for SdkPublisher#split
1 parent 3e6e70f commit 5a2d122

File tree

8 files changed

+256
-74
lines changed

8 files changed

+256
-74
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Arrays;
2626
import java.util.Optional;
2727
import java.util.concurrent.ExecutorService;
28+
import java.util.function.Consumer;
2829
import org.reactivestreams.Publisher;
2930
import org.reactivestreams.Subscriber;
3031
import software.amazon.awssdk.annotations.SdkPublicApi;
@@ -405,34 +406,36 @@ static AsyncRequestBody empty() {
405406

406407
/**
407408
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
408-
* portion of the original data, based on the configured {code chunkSizeInBytes}.
409+
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
410+
* is 2MB and the default buffer size is 8MB.
409411
*
410412
* <p>
411413
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
412414
* subscriber right after it's initialized.
413415
* <p>
414-
* // TODO: API Surface Area review: should we make this behavior configurable?
415416
* If content length is null, it is sent after the entire content for that chunk is buffered.
416417
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
417418
*
418-
* @param chunkSizeInBytes the size for each divided chunk. The last chunk may be smaller than the configured size.
419-
* @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
420-
* @return SplitAsyncRequestBodyResult
419+
* @see AsyncRequestBodySplitConfiguration
421420
*/
422-
default SdkPublisher<AsyncRequestBody> split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
423-
Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
424-
Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
425-
426-
if (!contentLength().isPresent()) {
427-
Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
428-
"maxMemoryUsageInBytes must be larger than or equal to " +
429-
"chunkSizeInBytes if the content length is unknown");
430-
}
421+
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
422+
Validate.notNull(splitConfiguration, "splitConfiguration");
431423

432424
return SplittingPublisher.builder()
433425
.asyncRequestBody(this)
434-
.chunkSizeInBytes(chunkSizeInBytes)
435-
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
426+
.chunkSizeInBytes(splitConfiguration.chunkSizeInBytes())
427+
.bufferSizeInBytes(splitConfiguration.bufferSizeInBytes())
436428
.build();
437429
}
430+
431+
/**
432+
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
433+
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
434+
*
435+
* @see #split(AsyncRequestBodySplitConfiguration)
436+
*/
437+
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
438+
Validate.notNull(splitConfiguration, "splitConfiguration");
439+
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
440+
}
438441
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.util.Objects;
19+
import software.amazon.awssdk.annotations.SdkPublicApi;
20+
import software.amazon.awssdk.utils.Validate;
21+
import software.amazon.awssdk.utils.builder.CopyableBuilder;
22+
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
23+
24+
/**
25+
* Configuration options for {@link AsyncRequestBody#split} to configure how the SDK
26+
* should split an {@link SdkPublisher}.
27+
*/
28+
@SdkPublicApi
29+
public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
30+
AsyncRequestBodySplitConfiguration> {
31+
private final Long chunkSizeInBytes;
32+
private final Long bufferSizeInBytes;
33+
34+
private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) {
35+
this.chunkSizeInBytes = Validate.isPositiveOrNull(builder.chunkSizeInBytes, "chunkSizeInBytes");
36+
this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes");
37+
}
38+
39+
/**
40+
* The configured chunk size for each divided {@link AsyncRequestBody}.
41+
*/
42+
public Long chunkSizeInBytes() {
43+
return chunkSizeInBytes;
44+
}
45+
46+
/**
47+
* The configured maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}.
48+
*/
49+
public Long bufferSizeInBytes() {
50+
return bufferSizeInBytes;
51+
}
52+
53+
/**
54+
* Create a {@link Builder}, used to create a {@link AsyncRequestBodySplitConfiguration}.
55+
*/
56+
public static Builder builder() {
57+
return new DefaultBuilder();
58+
}
59+
60+
@Override
61+
public boolean equals(Object o) {
62+
if (this == o) {
63+
return true;
64+
}
65+
if (o == null || getClass() != o.getClass()) {
66+
return false;
67+
}
68+
69+
AsyncRequestBodySplitConfiguration that = (AsyncRequestBodySplitConfiguration) o;
70+
71+
if (!Objects.equals(chunkSizeInBytes, that.chunkSizeInBytes)) {
72+
return false;
73+
}
74+
return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes);
75+
}
76+
77+
@Override
78+
public int hashCode() {
79+
int result = chunkSizeInBytes != null ? chunkSizeInBytes.hashCode() : 0;
80+
result = 31 * result + (bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0);
81+
return result;
82+
}
83+
84+
@Override
85+
public AsyncRequestBodySplitConfiguration.Builder toBuilder() {
86+
return new DefaultBuilder(this);
87+
}
88+
89+
public interface Builder extends CopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
90+
AsyncRequestBodySplitConfiguration> {
91+
92+
/**
93+
* Configures the size for each divided chunk. The last chunk may be smaller than the configured size. The default value
94+
* is 2MB.
95+
*
96+
* @param chunkSizeInBytes the chunk size in bytes
97+
* @return This object for method chaining.
98+
*/
99+
Builder chunkSizeInBytes(Long chunkSizeInBytes);
100+
101+
/**
102+
* The maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}. The default value
103+
* is 8MB.
104+
*
105+
* @param bufferSizeInBytes the buffer size in bytes
106+
* @return This object for method chaining.
107+
*/
108+
Builder bufferSizeInBytes(Long bufferSizeInBytes);
109+
}
110+
111+
private static final class DefaultBuilder implements Builder {
112+
private Long chunkSizeInBytes;
113+
private Long bufferSizeInBytes;
114+
115+
private DefaultBuilder(AsyncRequestBodySplitConfiguration asyncRequestBodySplitConfiguration) {
116+
this.chunkSizeInBytes = asyncRequestBodySplitConfiguration.chunkSizeInBytes;
117+
this.bufferSizeInBytes = asyncRequestBodySplitConfiguration.bufferSizeInBytes;
118+
}
119+
120+
private DefaultBuilder() {
121+
122+
}
123+
124+
@Override
125+
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
126+
this.chunkSizeInBytes = chunkSizeInBytes;
127+
return this;
128+
}
129+
130+
@Override
131+
public Builder bufferSizeInBytes(Long bufferSizeInBytes) {
132+
this.bufferSizeInBytes = bufferSizeInBytes;
133+
return this;
134+
}
135+
136+
@Override
137+
public AsyncRequestBodySplitConfiguration build() {
138+
return new AsyncRequestBodySplitConfiguration(this);
139+
}
140+
}
141+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,25 @@
3939
@SdkInternalApi
4040
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
4141
private static final Logger log = Logger.loggerFor(SplittingPublisher.class);
42+
private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
43+
private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4;
4244
private final AsyncRequestBody upstreamPublisher;
4345
private final SplittingSubscriber splittingSubscriber;
4446
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
4547
private final long chunkSizeInBytes;
46-
private final long maxMemoryUsageInBytes;
48+
private final long bufferSizeInBytes;
4749

4850
private SplittingPublisher(Builder builder) {
4951
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
50-
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
52+
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
53+
this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes;
5154
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
52-
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
55+
56+
if (!upstreamPublisher.contentLength().isPresent()) {
57+
Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
58+
"bufferSizeInBytes must be larger than or equal to " +
59+
"chunkSizeInBytes if the content length is unknown");
60+
}
5361
}
5462

5563
public static Builder builder() {
@@ -213,7 +221,7 @@ private void maybeRequestMoreUpstreamData() {
213221
}
214222

215223
private boolean shouldRequestMoreData(long buffered) {
216-
return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes;
224+
return buffered == 0 || buffered + byteBufferSizeHint <= bufferSizeInBytes;
217225
}
218226

219227
private Long totalDataRemaining() {
@@ -289,42 +297,20 @@ private void addDataBuffered(int length) {
289297
public static final class Builder {
290298
private AsyncRequestBody asyncRequestBody;
291299
private Long chunkSizeInBytes;
292-
private Long maxMemoryUsageInBytes;
300+
private Long bufferSizeInBytes;
293301

294-
/**
295-
* Configures the asyncRequestBody to split
296-
*
297-
* @param asyncRequestBody The new asyncRequestBody value.
298-
* @return This object for method chaining.
299-
*/
300302
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
301303
this.asyncRequestBody = asyncRequestBody;
302304
return this;
303305
}
304306

305-
/**
306-
* Configures the size of the chunk for each {@link AsyncRequestBody} to publish
307-
*
308-
* @param chunkSizeInBytes The new chunkSizeInBytes value.
309-
* @return This object for method chaining.
310-
*/
311-
public Builder chunkSizeInBytes(long chunkSizeInBytes) {
307+
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
312308
this.chunkSizeInBytes = chunkSizeInBytes;
313309
return this;
314310
}
315311

316-
/**
317-
* Sets the maximum memory usage in bytes.
318-
*
319-
* @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
320-
* @return This object for method chaining.
321-
*/
322-
// TODO: max memory usage might not be the best name, since we may technically go a little above this limit when we add
323-
// on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
324-
// for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
325-
// buffer size instead?
326-
public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
327-
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
312+
public Builder bufferSizeInBytes(Long bufferSizeInBytes) {
313+
this.bufferSizeInBytes = bufferSizeInBytes;
328314
return this;
329315
}
330316

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import nl.jqno.equalsverifier.EqualsVerifier;
22+
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.params.ParameterizedTest;
24+
import org.junit.jupiter.params.provider.ValueSource;
25+
26+
public class AsyncRequestBodyConfigurationTest {
27+
28+
@Test
29+
void equalsHashCode() {
30+
EqualsVerifier.forClass(AsyncRequestBodySplitConfiguration.class)
31+
.verify();
32+
}
33+
34+
@ParameterizedTest
35+
@ValueSource(longs = {0, -1})
36+
void nonPositiveValue_shouldThrowException(long size) {
37+
assertThatThrownBy(() ->
38+
AsyncRequestBodySplitConfiguration.builder()
39+
.chunkSizeInBytes(size)
40+
.build())
41+
.hasMessageContaining("must be positive");
42+
assertThatThrownBy(() ->
43+
AsyncRequestBodySplitConfiguration.builder()
44+
.bufferSizeInBytes(size)
45+
.build())
46+
.hasMessageContaining("must be positive");
47+
}
48+
49+
@Test
50+
void toBuilder_shouldCopyAllFields() {
51+
AsyncRequestBodySplitConfiguration config = AsyncRequestBodySplitConfiguration.builder()
52+
.bufferSizeInBytes(1L)
53+
.chunkSizeInBytes(2L)
54+
.build();
55+
56+
assertThat(config.toBuilder().build()).isEqualTo(config);
57+
}
58+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -356,25 +356,4 @@ void publisherConstructorHasCorrectContentType() {
356356
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
357357
assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
358358
}
359-
360-
@Test
361-
public void split_nonPositiveInput_shouldThrowException() {
362-
AsyncRequestBody body = AsyncRequestBody.fromString("test");
363-
assertThatThrownBy(() -> body.split(0, 4)).hasMessageContaining("must be positive");
364-
assertThatThrownBy(() -> body.split(-1, 4)).hasMessageContaining("must be positive");
365-
assertThatThrownBy(() -> body.split(5, 0)).hasMessageContaining("must be positive");
366-
assertThatThrownBy(() -> body.split(5, -1)).hasMessageContaining("must be positive");
367-
}
368-
369-
@Test
370-
public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() {
371-
AsyncRequestBody body = AsyncRequestBody.fromPublisher(new Publisher<ByteBuffer>() {
372-
@Override
373-
public void subscribe(Subscriber<? super ByteBuffer> s) {
374-
375-
}
376-
});
377-
assertThatThrownBy(() -> body.split(10, 4))
378-
.hasMessageContaining("must be larger than or equal");
379-
}
380359
}

0 commit comments

Comments
 (0)