Skip to content

Commit d099622

Browse files
committed
Bugfix: s3 asynchronous client GetBucketPolicy now works and does not always through an XML parsing error
1 parent 308c95a commit d099622

File tree

18 files changed

+925
-16
lines changed

18 files changed

+925
-16
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"description": "Fixed bug prevent GetBucketBolicy from ever being successful using the asynchronous S3 client."
5+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.nio.charset.StandardCharsets;
20+
21+
import org.reactivestreams.Publisher;
22+
23+
import software.amazon.awssdk.annotations.SdkProtectedApi;
24+
import software.amazon.awssdk.core.internal.async.EnvelopeWrappedSdkPublisher;
25+
26+
/**
27+
* Common implementations of {@link SdkPublisher} that are provided for convenience when building asynchronous
28+
* interceptors to be used with specific clients.
29+
*/
30+
@SdkProtectedApi
31+
public final class SdkPublishers {
32+
private SdkPublishers() {
33+
}
34+
35+
/**
36+
* Constructs an {@link SdkPublisher} that wraps a {@link ByteBuffer} publisher and inserts additional content
37+
* that wraps the published content like an envelope. This can be used when you want to transform the content of
38+
* an asynchronous SDK response by putting it in an envelope.
39+
* @param publisher The underlying publisher to wrap the content of.
40+
* @param envelopePrefix A string representing the content to be inserted as the head of the containing envelope.
41+
* @param envelopeSuffix A string representing the content to be inserted as the tail of containing envelope.
42+
* @return An {@link SdkPublisher} that wraps the provided publisher.
43+
*/
44+
public static SdkPublisher<ByteBuffer> envelopeWrappedPublisher(Publisher<ByteBuffer> publisher,
45+
String envelopePrefix,
46+
String envelopeSuffix) {
47+
return EnvelopeWrappedSdkPublisher.of(publisher,
48+
wrap(envelopePrefix),
49+
wrap(envelopeSuffix),
50+
SdkPublishers::concat);
51+
}
52+
53+
private static ByteBuffer wrap(String s) {
54+
return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
55+
}
56+
57+
private static ByteBuffer concat(ByteBuffer b1, ByteBuffer b2) {
58+
ByteBuffer result = ByteBuffer.allocate(b1.limit() + b2.limit());
59+
result.put(b1);
60+
result.put(b2);
61+
result.rewind();
62+
return result;
63+
}
64+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/interceptor/ExecutionInterceptorChain.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
package software.amazon.awssdk.core.interceptor;
1717

1818
import java.io.InputStream;
19+
import java.nio.ByteBuffer;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Objects;
2223
import java.util.function.Consumer;
24+
25+
import org.reactivestreams.Publisher;
26+
2327
import software.amazon.awssdk.annotations.SdkProtectedApi;
2428
import software.amazon.awssdk.core.SdkRequest;
2529
import software.amazon.awssdk.core.SdkResponse;
@@ -134,10 +138,15 @@ public InterceptorContext modifyHttpResponse(InterceptorContext context,
134138
public InterceptorContext modifyAsyncHttpResponse(InterceptorContext context,
135139
ExecutionAttributes executionAttributes) {
136140
InterceptorContext result = context;
141+
137142
for (int i = interceptors.size() - 1; i >= 0; i--) {
143+
ExecutionInterceptor interceptor = interceptors.get(i);
144+
145+
Publisher<ByteBuffer> newResponsePublisher =
146+
interceptor.modifyAsyncHttpResponseContent(result, executionAttributes).orElse(null);
147+
138148
result = result.toBuilder()
139-
.responsePublisher(interceptors.get(i).modifyAsyncHttpResponseContent(result, executionAttributes)
140-
.orElse(null))
149+
.responsePublisher(newResponsePublisher)
141150
.build();
142151
}
143152

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import java.util.function.BiFunction;
20+
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.annotations.ThreadSafe;
27+
import software.amazon.awssdk.core.async.SdkPublisher;
28+
29+
/**
30+
* Publisher implementation that wraps the content of another publisher in an envelope with an optional prefix (or
31+
* header) and suffix (or footer). The prefix content will be prepended to the first published object from the
32+
* wrapped publisher, and the suffix content will be published when the wrapped publisher signals completion.
33+
* <p>
34+
* The envelope prefix will not be published until the wrapped publisher publishes something or is completed.
35+
* The envelope suffix will not be published until the wrapped publisher is completed.
36+
* <p>
37+
* This class can be used in an asynchronous interceptor in the AWS SDK to wrap content around the incoming
38+
* bytestream from a response.
39+
* <p>
40+
* A function must be supplied that can be used to concatinate the envelope content to the content being published by
41+
* the wrapped publisher. Example usage:
42+
* {@code
43+
* Publisher<String> wrappedPublisher = ContentEnvelopeWrappingPublisher.of(publisher, "S", "E", (s1, s2) -> s1 + s2);
44+
* }
45+
* If publisher publishes a single string "1", wrappedPublisher will publish "S1" (prepending the envelop prefix). If
46+
* publisher then publishes a second string "2", wrappedPublisher will then publish "2" (no added content). If
47+
* publisher then completes, wrappedPublisher will then publish "E" and then complete.
48+
*
49+
* @param <T> The type of objects being published
50+
*/
51+
52+
@ThreadSafe
53+
@SdkInternalApi
54+
public class EnvelopeWrappedSdkPublisher<T> implements SdkPublisher<T> {
55+
private final Publisher<T> wrappedPublisher;
56+
private final T contentPrefix;
57+
private final T contentSuffix;
58+
private final BiFunction<T, T, T> mergeContentFunction;
59+
60+
private EnvelopeWrappedSdkPublisher(Publisher<T> wrappedPublisher,
61+
T contentPrefix,
62+
T contentSuffix,
63+
BiFunction<T, T, T> mergeContentFunction) {
64+
this.wrappedPublisher = wrappedPublisher;
65+
this.contentPrefix = contentPrefix;
66+
this.contentSuffix = contentSuffix;
67+
this.mergeContentFunction = mergeContentFunction;
68+
}
69+
70+
/**
71+
* Create a new publisher that wraps the content of an existing publisher.
72+
* @param wrappedPublisher The publisher who's content will be wrapped.
73+
* @param contentPrefix The content to be inserted in front of the wrapped content.
74+
* @param contentSuffix The content to be inserted at the back of the wrapped content.
75+
* @param mergeContentFunction A function that will be used to merge the inserted content into the wrapped content.
76+
* @param <T> The content type.
77+
* @return A newly initialized instance of this class.
78+
*/
79+
public static <T> EnvelopeWrappedSdkPublisher<T> of(Publisher<T> wrappedPublisher,
80+
T contentPrefix,
81+
T contentSuffix,
82+
BiFunction<T, T, T> mergeContentFunction) {
83+
return new EnvelopeWrappedSdkPublisher<>(wrappedPublisher, contentPrefix, contentSuffix, mergeContentFunction);
84+
}
85+
86+
/**
87+
* See {@link Publisher#subscribe(Subscriber)}
88+
*/
89+
@Override
90+
public void subscribe(Subscriber<? super T> subscriber) {
91+
wrappedPublisher.subscribe(new ContentWrappedSubscriber(subscriber));
92+
}
93+
94+
private class ContentWrappedSubscriber implements Subscriber<T> {
95+
private final Subscriber<? super T> wrappedSubscriber;
96+
private final AtomicBoolean prefixApplied = new AtomicBoolean(false);
97+
private final AtomicBoolean suffixApplied = new AtomicBoolean(false);
98+
99+
private ContentWrappedSubscriber(Subscriber<? super T> wrappedSubscriber) {
100+
this.wrappedSubscriber = wrappedSubscriber;
101+
}
102+
103+
@Override
104+
public void onSubscribe(Subscription subscription) {
105+
wrappedSubscriber.onSubscribe(subscription);
106+
}
107+
108+
@Override
109+
public void onNext(T t) {
110+
if (contentPrefix != null && prefixApplied.compareAndSet(false, true)) {
111+
wrappedSubscriber.onNext(mergeContentFunction.apply(contentPrefix, t));
112+
} else {
113+
wrappedSubscriber.onNext(t);
114+
}
115+
}
116+
117+
@Override
118+
public void onError(Throwable throwable) {
119+
wrappedSubscriber.onError(throwable);
120+
}
121+
122+
@Override
123+
public void onComplete() {
124+
try {
125+
// In the event onComplete() is called multiple times, only transmit the envelope once
126+
if (suffixApplied.compareAndSet(false, true)) {
127+
T mergedContent = null;
128+
129+
// Handle the case where the prefix was never applied because no events were ever published
130+
if (prefixApplied.compareAndSet(false, true)) {
131+
mergedContent = contentPrefix;
132+
}
133+
134+
if (contentSuffix != null) {
135+
if (mergedContent == null) {
136+
mergedContent = contentSuffix;
137+
} else {
138+
mergedContent = mergeContentFunction.apply(mergedContent, contentSuffix);
139+
}
140+
}
141+
142+
if (mergedContent != null) {
143+
wrappedSubscriber.onNext(mergedContent);
144+
}
145+
}
146+
} finally {
147+
wrappedSubscriber.onComplete();
148+
}
149+
}
150+
}
151+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.nio.ByteBuffer;
21+
import java.nio.charset.StandardCharsets;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
import org.junit.Test;
26+
import org.reactivestreams.Publisher;
27+
import org.reactivestreams.Subscriber;
28+
import org.reactivestreams.Subscription;
29+
30+
import utils.FakePublisher;
31+
32+
public class SdkPublishersTest {
33+
@Test
34+
public void envelopeWrappedPublisher() {
35+
FakePublisher<ByteBuffer> fakePublisher = new FakePublisher<>();
36+
Publisher<ByteBuffer> wrappedPublisher =
37+
SdkPublishers.envelopeWrappedPublisher(fakePublisher, "prefix:", ":suffix");
38+
39+
FakeByteBufferSubscriber fakeSubscriber = new FakeByteBufferSubscriber();
40+
wrappedPublisher.subscribe(fakeSubscriber);
41+
fakePublisher.publish(ByteBuffer.wrap("content".getBytes(StandardCharsets.UTF_8)));
42+
fakePublisher.complete();
43+
44+
assertThat(fakeSubscriber.recordedEvents()).containsExactly("prefix:content", ":suffix");
45+
}
46+
47+
private final static class FakeByteBufferSubscriber implements Subscriber<ByteBuffer> {
48+
private final List<String> recordedEvents = new ArrayList<>();
49+
50+
@Override
51+
public void onSubscribe(Subscription s) {
52+
53+
}
54+
55+
@Override
56+
public void onNext(ByteBuffer byteBuffer) {
57+
String s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
58+
recordedEvents.add(s);
59+
}
60+
61+
@Override
62+
public void onError(Throwable t) {
63+
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
69+
}
70+
71+
public List<String> recordedEvents() {
72+
return this.recordedEvents;
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)