Skip to content

Commit 3cee287

Browse files
L-Applinzoewangg
andauthored
Java based S3 Multipart Client (#4254)
* Implement multipart upload in Java-based S3 async client (#4052) * Implement multipart upload in Java-based S3 async client Co-authored-by: Matthew Miller <[email protected]> * Iterate SdkFields to convert requests (#4177) * Iterate SdkFields to convert requests * Fix flaky test * Rename convertion utils class * Fix null content length in SplittingPublisher (#4173) * Implement multipart copy in Java-based S3 async client (#4189) * Create split method in AsyncRequestBody to return SplittingPublisher (#4188) * Create split method in AsyncRequestBody to return SplittingPublisher * Fix Javadoc and build * Add more tests with ByteArrayAsyncRequestBody (#4214) * Handle null response metadata (#4215) * Handle null response metadata * Fix build * Support streaming with unknown content length (#4226) * Support uploading with unknown content length * Refactoring * Create a configuration class for SdkPublisher#split (#4236) * S3 Multipart API implementation (#4235) * Multipart API fix merge conflicts * getObject(...) throw UnsupportedOperationException * Use user agent for all requests in MultipartS3Client * MultipartS3AsyncClient javadoc + API_NAME private * use `maximumMemoryUsageInBytes` * fix problem with UserAgent, cleanup * move contextParam keys to S3AsyncClientDecorator * javadoc * more javadoc * Use 4x part size as default apiCallBufferSize * Fix test * Guard against re-subscription in SplittingPublisher (#4253) * guard against re-subscription in SplittingPublisher * fix checkstyle * Error msg * Fix a race condition where the third upload part request was sent before the second one (#4260) --------- Co-authored-by: Zoe Wang <[email protected]>
1 parent 21bdc95 commit 3cee287

File tree

37 files changed

+3422
-344
lines changed

37 files changed

+3422
-344
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/model/config/customization/CustomizationConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ public class CustomizationConfig {
227227
*/
228228
private String asyncClientDecorator;
229229

230+
/**
231+
* Only for s3. A set of customization to related to multipart operations.
232+
*/
233+
private MultipartCustomization multipartCustomization;
234+
230235
/**
231236
* Whether to skip generating endpoint tests from endpoint-tests.json
232237
*/
@@ -665,4 +670,12 @@ public Map<String, ClientContextParam> getCustomClientContextParams() {
665670
public void setCustomClientContextParams(Map<String, ClientContextParam> customClientContextParams) {
666671
this.customClientContextParams = customClientContextParams;
667672
}
673+
674+
public MultipartCustomization getMultipartCustomization() {
675+
return this.multipartCustomization;
676+
}
677+
678+
public void setMultipartCustomization(MultipartCustomization multipartCustomization) {
679+
this.multipartCustomization = multipartCustomization;
680+
}
668681
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.codegen.model.config.customization;
17+
18+
public class MultipartCustomization {
19+
private String multipartConfigurationClass;
20+
private String multipartConfigMethodDoc;
21+
private String multipartEnableMethodDoc;
22+
private String contextParamEnabledKey;
23+
private String contextParamConfigKey;
24+
25+
public String getMultipartConfigurationClass() {
26+
return multipartConfigurationClass;
27+
}
28+
29+
public void setMultipartConfigurationClass(String multipartConfigurationClass) {
30+
this.multipartConfigurationClass = multipartConfigurationClass;
31+
}
32+
33+
public String getMultipartConfigMethodDoc() {
34+
return multipartConfigMethodDoc;
35+
}
36+
37+
public void setMultipartConfigMethodDoc(String multipartMethodDoc) {
38+
this.multipartConfigMethodDoc = multipartMethodDoc;
39+
}
40+
41+
public String getMultipartEnableMethodDoc() {
42+
return multipartEnableMethodDoc;
43+
}
44+
45+
public void setMultipartEnableMethodDoc(String multipartEnableMethodDoc) {
46+
this.multipartEnableMethodDoc = multipartEnableMethodDoc;
47+
}
48+
49+
public String getContextParamEnabledKey() {
50+
return contextParamEnabledKey;
51+
}
52+
53+
public void setContextParamEnabledKey(String contextParamEnabledKey) {
54+
this.contextParamEnabledKey = contextParamEnabledKey;
55+
}
56+
57+
public String getContextParamConfigKey() {
58+
return contextParamConfigKey;
59+
}
60+
61+
public void setContextParamConfigKey(String contextParamConfigKey) {
62+
this.contextParamConfigKey = contextParamConfigKey;
63+
}
64+
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/ClassSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Collections;
2121

2222
/**
23-
* Represents the a Poet generated class
23+
* Represents a Poet generated class
2424
*/
2525
public interface ClassSpec {
2626

codegen/src/main/java/software/amazon/awssdk/codegen/poet/builder/AsyncClientBuilderClass.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import com.squareup.javapoet.ClassName;
1919
import com.squareup.javapoet.MethodSpec;
20+
import com.squareup.javapoet.ParameterSpec;
2021
import com.squareup.javapoet.ParameterizedTypeName;
2122
import com.squareup.javapoet.TypeSpec;
2223
import java.net.URI;
2324
import javax.lang.model.element.Modifier;
2425
import software.amazon.awssdk.annotations.SdkInternalApi;
2526
import software.amazon.awssdk.auth.token.credentials.SdkTokenProvider;
2627
import software.amazon.awssdk.awscore.client.config.AwsClientOption;
28+
import software.amazon.awssdk.codegen.model.config.customization.MultipartCustomization;
2729
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
2830
import software.amazon.awssdk.codegen.poet.ClassSpec;
2931
import software.amazon.awssdk.codegen.poet.PoetExtension;
@@ -59,12 +61,12 @@ public AsyncClientBuilderClass(IntermediateModel model) {
5961
@Override
6062
public TypeSpec poetSpec() {
6163
TypeSpec.Builder builder =
62-
PoetUtils.createClassBuilder(builderClassName)
63-
.addAnnotation(SdkInternalApi.class)
64-
.addModifiers(Modifier.FINAL)
65-
.superclass(ParameterizedTypeName.get(builderBaseClassName, builderInterfaceName, clientInterfaceName))
66-
.addSuperinterface(builderInterfaceName)
67-
.addJavadoc("Internal implementation of {@link $T}.", builderInterfaceName);
64+
PoetUtils.createClassBuilder(builderClassName)
65+
.addAnnotation(SdkInternalApi.class)
66+
.addModifiers(Modifier.FINAL)
67+
.superclass(ParameterizedTypeName.get(builderBaseClassName, builderInterfaceName, clientInterfaceName))
68+
.addSuperinterface(builderInterfaceName)
69+
.addJavadoc("Internal implementation of {@link $T}.", builderInterfaceName);
6870

6971
if (model.getEndpointOperation().isPresent()) {
7072
builder.addMethod(endpointDiscoveryEnabled());
@@ -80,6 +82,12 @@ public TypeSpec poetSpec() {
8082
builder.addMethod(bearerTokenProviderMethod());
8183
}
8284

85+
MultipartCustomization multipartCustomization = model.getCustomizationConfig().getMultipartCustomization();
86+
if (multipartCustomization != null) {
87+
builder.addMethod(multipartEnabledMethod(multipartCustomization));
88+
builder.addMethod(multipartConfigMethods(multipartCustomization));
89+
}
90+
8391
builder.addMethod(buildClientMethod());
8492
builder.addMethod(initializeServiceClientConfigMethod());
8593

@@ -124,15 +132,15 @@ private MethodSpec endpointProviderMethod() {
124132

125133
private MethodSpec buildClientMethod() {
126134
MethodSpec.Builder builder = MethodSpec.methodBuilder("buildClient")
127-
.addAnnotation(Override.class)
128-
.addModifiers(Modifier.PROTECTED, Modifier.FINAL)
129-
.returns(clientInterfaceName)
130-
.addStatement("$T clientConfiguration = super.asyncClientConfiguration()",
131-
SdkClientConfiguration.class).addStatement("this.validateClientOptions"
132-
+ "(clientConfiguration)")
133-
.addStatement("$T serviceClientConfiguration = initializeServiceClientConfig"
134-
+ "(clientConfiguration)",
135-
serviceConfigClassName);
135+
.addAnnotation(Override.class)
136+
.addModifiers(Modifier.PROTECTED, Modifier.FINAL)
137+
.returns(clientInterfaceName)
138+
.addStatement("$T clientConfiguration = super.asyncClientConfiguration()",
139+
SdkClientConfiguration.class)
140+
.addStatement("this.validateClientOptions(clientConfiguration)")
141+
.addStatement("$T serviceClientConfiguration = initializeServiceClientConfig"
142+
+ "(clientConfiguration)",
143+
serviceConfigClassName);
136144

137145
builder.addStatement("$1T client = new $2T(serviceClientConfiguration, clientConfiguration)",
138146
clientInterfaceName, clientClassName);
@@ -156,6 +164,32 @@ private MethodSpec bearerTokenProviderMethod() {
156164
.build();
157165
}
158166

167+
private MethodSpec multipartEnabledMethod(MultipartCustomization multipartCustomization) {
168+
return MethodSpec.methodBuilder("multipartEnabled")
169+
.addAnnotation(Override.class)
170+
.addModifiers(Modifier.PUBLIC)
171+
.returns(builderInterfaceName)
172+
.addParameter(Boolean.class, "enabled")
173+
.addStatement("clientContextParams.put($N, enabled)",
174+
multipartCustomization.getContextParamEnabledKey())
175+
.addStatement("return this")
176+
.build();
177+
}
178+
179+
private MethodSpec multipartConfigMethods(MultipartCustomization multipartCustomization) {
180+
ClassName mulitpartConfigClassName =
181+
PoetUtils.classNameFromFqcn(multipartCustomization.getMultipartConfigurationClass());
182+
return MethodSpec.methodBuilder("multipartConfiguration")
183+
.addAnnotation(Override.class)
184+
.addModifiers(Modifier.PUBLIC)
185+
.addParameter(ParameterSpec.builder(mulitpartConfigClassName, "multipartConfig").build())
186+
.returns(builderInterfaceName)
187+
.addStatement("clientContextParams.put($N, multipartConfig)",
188+
multipartCustomization.getContextParamConfigKey())
189+
.addStatement("return this")
190+
.build();
191+
}
192+
159193
private MethodSpec initializeServiceClientConfigMethod() {
160194
return MethodSpec.methodBuilder("initializeServiceClientConfig").addModifiers(Modifier.PRIVATE)
161195
.addParameter(SdkClientConfiguration.class, "clientConfig")

codegen/src/main/java/software/amazon/awssdk/codegen/poet/builder/AsyncClientBuilderInterface.java

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,97 @@
1717

1818
import com.squareup.javapoet.ClassName;
1919
import com.squareup.javapoet.CodeBlock;
20+
import com.squareup.javapoet.MethodSpec;
21+
import com.squareup.javapoet.ParameterSpec;
2022
import com.squareup.javapoet.ParameterizedTypeName;
2123
import com.squareup.javapoet.TypeSpec;
24+
import java.util.function.Consumer;
25+
import javax.lang.model.element.Modifier;
2226
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
27+
import software.amazon.awssdk.codegen.model.config.customization.MultipartCustomization;
2328
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
2429
import software.amazon.awssdk.codegen.poet.ClassSpec;
2530
import software.amazon.awssdk.codegen.poet.PoetUtils;
31+
import software.amazon.awssdk.utils.Logger;
32+
import software.amazon.awssdk.utils.Validate;
2633

2734
public class AsyncClientBuilderInterface implements ClassSpec {
35+
private static final Logger log = Logger.loggerFor(AsyncClientBuilderInterface.class);
36+
2837
private final ClassName builderInterfaceName;
2938
private final ClassName clientInterfaceName;
3039
private final ClassName baseBuilderInterfaceName;
40+
private final IntermediateModel model;
3141

3242
public AsyncClientBuilderInterface(IntermediateModel model) {
3343
String basePackage = model.getMetadata().getFullClientPackageName();
3444
this.clientInterfaceName = ClassName.get(basePackage, model.getMetadata().getAsyncInterface());
3545
this.builderInterfaceName = ClassName.get(basePackage, model.getMetadata().getAsyncBuilderInterface());
3646
this.baseBuilderInterfaceName = ClassName.get(basePackage, model.getMetadata().getBaseBuilderInterface());
47+
this.model = model;
3748
}
3849

3950
@Override
4051
public TypeSpec poetSpec() {
41-
return PoetUtils.createInterfaceBuilder(builderInterfaceName)
42-
.addSuperinterface(ParameterizedTypeName.get(ClassName.get(AwsAsyncClientBuilder.class),
43-
builderInterfaceName, clientInterfaceName))
44-
.addSuperinterface(ParameterizedTypeName.get(baseBuilderInterfaceName,
45-
builderInterfaceName, clientInterfaceName))
46-
.addJavadoc(getJavadoc())
47-
.build();
52+
TypeSpec.Builder builder = PoetUtils
53+
.createInterfaceBuilder(builderInterfaceName)
54+
.addSuperinterface(ParameterizedTypeName.get(ClassName.get(AwsAsyncClientBuilder.class),
55+
builderInterfaceName, clientInterfaceName))
56+
.addSuperinterface(ParameterizedTypeName.get(baseBuilderInterfaceName,
57+
builderInterfaceName, clientInterfaceName))
58+
.addJavadoc(getJavadoc());
59+
60+
MultipartCustomization multipartCustomization = model.getCustomizationConfig().getMultipartCustomization();
61+
if (multipartCustomization != null) {
62+
includeMultipartMethod(builder, multipartCustomization);
63+
}
64+
return builder.build();
65+
}
66+
67+
private void includeMultipartMethod(TypeSpec.Builder builder, MultipartCustomization multipartCustomization) {
68+
log.debug(() -> String.format("Adding multipart config methods to builder interface for service '%s'",
69+
model.getMetadata().getServiceId()));
70+
71+
// .multipartEnabled(Boolean)
72+
builder.addMethod(
73+
MethodSpec.methodBuilder("multipartEnabled")
74+
.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
75+
.returns(builderInterfaceName)
76+
.addParameter(Boolean.class, "enabled")
77+
.addCode("throw new $T();", UnsupportedOperationException.class)
78+
.addJavadoc(CodeBlock.of(multipartCustomization.getMultipartEnableMethodDoc()))
79+
.build());
80+
81+
// .multipartConfiguration(MultipartConfiguration)
82+
String multiPartConfigMethodName = "multipartConfiguration";
83+
String multipartConfigClass = Validate.notNull(multipartCustomization.getMultipartConfigurationClass(),
84+
"'multipartConfigurationClass' must be defined");
85+
ClassName mulitpartConfigClassName = PoetUtils.classNameFromFqcn(multipartConfigClass);
86+
builder.addMethod(
87+
MethodSpec.methodBuilder(multiPartConfigMethodName)
88+
.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
89+
.returns(builderInterfaceName)
90+
.addParameter(ParameterSpec.builder(mulitpartConfigClassName, "multipartConfiguration").build())
91+
.addCode("throw new $T();", UnsupportedOperationException.class)
92+
.addJavadoc(CodeBlock.of(multipartCustomization.getMultipartConfigMethodDoc()))
93+
.build());
94+
95+
// .multipartConfiguration(Consumer<MultipartConfiguration>)
96+
ClassName mulitpartConfigBuilderClassName = PoetUtils.classNameFromFqcn(multipartConfigClass + ".Builder");
97+
ParameterizedTypeName consumerBuilderType = ParameterizedTypeName.get(ClassName.get(Consumer.class),
98+
mulitpartConfigBuilderClassName);
99+
builder.addMethod(
100+
MethodSpec.methodBuilder(multiPartConfigMethodName)
101+
.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
102+
.returns(builderInterfaceName)
103+
.addParameter(ParameterSpec.builder(consumerBuilderType, "multipartConfiguration").build())
104+
.addStatement("$T builder = $T.builder()",
105+
mulitpartConfigBuilderClassName,
106+
mulitpartConfigClassName)
107+
.addStatement("multipartConfiguration.accept(builder)")
108+
.addStatement("return multipartConfiguration(builder.build())")
109+
.addJavadoc(CodeBlock.of(multipartCustomization.getMultipartConfigMethodDoc()))
110+
.build());
48111
}
49112

50113
@Override

core/aws-core/src/main/java/software/amazon/awssdk/awscore/AwsResponseMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
1919

2020
import java.util.Collections;
21+
import java.util.HashMap;
2122
import java.util.Map;
2223
import java.util.Objects;
2324
import java.util.Optional;
@@ -48,7 +49,7 @@ protected AwsResponseMetadata(Map<String, String> metadata) {
4849
}
4950

5051
protected AwsResponseMetadata(AwsResponseMetadata responseMetadata) {
51-
this(responseMetadata.metadata);
52+
this(responseMetadata == null ? new HashMap<>() : responseMetadata.metadata);
5253
}
5354

5455
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@
2525
import java.util.Arrays;
2626
import java.util.Optional;
2727
import java.util.concurrent.ExecutorService;
28+
import java.util.function.Consumer;
2829
import org.reactivestreams.Publisher;
2930
import org.reactivestreams.Subscriber;
3031
import software.amazon.awssdk.annotations.SdkPublicApi;
3132
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
3233
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3334
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
35+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
3436
import software.amazon.awssdk.core.internal.util.Mimetype;
3537
import software.amazon.awssdk.utils.BinaryUtils;
38+
import software.amazon.awssdk.utils.Validate;
3639

3740
/**
3841
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
@@ -399,4 +402,40 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
399402
static AsyncRequestBody empty() {
400403
return fromBytes(new byte[0]);
401404
}
405+
406+
407+
/**
408+
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
409+
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
410+
* is 2MB and the default buffer size is 8MB.
411+
*
412+
* <p>
413+
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
414+
* subscriber right after it's initialized.
415+
* <p>
416+
* If content length is null, it is sent after the entire content for that chunk is buffered.
417+
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
418+
*
419+
* @see AsyncRequestBodySplitConfiguration
420+
*/
421+
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
422+
Validate.notNull(splitConfiguration, "splitConfiguration");
423+
424+
return SplittingPublisher.builder()
425+
.asyncRequestBody(this)
426+
.chunkSizeInBytes(splitConfiguration.chunkSizeInBytes())
427+
.bufferSizeInBytes(splitConfiguration.bufferSizeInBytes())
428+
.build();
429+
}
430+
431+
/**
432+
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
433+
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
434+
*
435+
* @see #split(AsyncRequestBodySplitConfiguration)
436+
*/
437+
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
438+
Validate.notNull(splitConfiguration, "splitConfiguration");
439+
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
440+
}
402441
}

0 commit comments

Comments
 (0)