Skip to content

Commit bd087f9

Browse files
committed
Bugfix: s3 asynchronous client GetBucketPolicy now works and does not always through an XML parsing error
1 parent 308c95a commit bd087f9

File tree

18 files changed

+878
-16
lines changed

18 files changed

+878
-16
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"description": "Fixed bug prevent GetBucketBolicy from ever being successful using the asynchronous S3 client."
5+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/interceptor/ExecutionInterceptorChain.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
package software.amazon.awssdk.core.interceptor;
1717

1818
import java.io.InputStream;
19+
import java.nio.ByteBuffer;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Objects;
2223
import java.util.function.Consumer;
24+
25+
import org.reactivestreams.Publisher;
26+
2327
import software.amazon.awssdk.annotations.SdkProtectedApi;
2428
import software.amazon.awssdk.core.SdkRequest;
2529
import software.amazon.awssdk.core.SdkResponse;
@@ -134,10 +138,15 @@ public InterceptorContext modifyHttpResponse(InterceptorContext context,
134138
public InterceptorContext modifyAsyncHttpResponse(InterceptorContext context,
135139
ExecutionAttributes executionAttributes) {
136140
InterceptorContext result = context;
141+
137142
for (int i = interceptors.size() - 1; i >= 0; i--) {
143+
ExecutionInterceptor interceptor = interceptors.get(i);
144+
145+
Publisher<ByteBuffer> newResponsePublisher =
146+
interceptor.modifyAsyncHttpResponseContent(result, executionAttributes).orElse(null);
147+
138148
result = result.toBuilder()
139-
.responsePublisher(interceptors.get(i).modifyAsyncHttpResponseContent(result, executionAttributes)
140-
.orElse(null))
149+
.responsePublisher(newResponsePublisher)
141150
.build();
142151
}
143152

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.util.function.BiFunction;
19+
20+
import org.reactivestreams.Publisher;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.core.async.SdkPublisher;
26+
27+
/**
28+
* Publisher implementation that wraps the content of another publisher in an envelope with an optional prefix (or
29+
* header) and suffix (or footer). The prefix content will be prepended to the first published object from the
30+
* wrapped publisher, and the suffix content will be published when the wrapped publisher signals completion.
31+
* <p>
32+
* The envelope prefix will not be published until the wrapped publisher publishes something or is completed.
33+
* The envelope suffix will not be published until the wrapped publisher is completed.
34+
* <p>
35+
* This class can be used in an asynchronous interceptor in the AWS SDK to wrap content around the incoming
36+
* bytestream from a response.
37+
* <p>
38+
* A function must be supplied that can be used to concatenate the envelope content with the content being published by
39+
* the wrapped publisher. Example usage:
40+
* {@code
41+
* Publisher<String> wrappedPublisher = ContentEnvelopeWrappingPublisher.of(publisher, "S", "E", (s1, s2) -> s1 + s2);
42+
* }
43+
* If publisher publishes a single string "1", wrappedPublisher will publish "S1" (prepending the envelop prefix). If
44+
* publisher then publishes a second string "2", wrappedPublisher will then publish "2" (no added content). If
45+
* publisher then completes, wrappedPublisher will then publish "E" and then complete.
46+
* <p>
47+
* WARNING: This publisher implementation does not comply with the complete flow spec (as it inserts data into the
48+
* middle of a flow between a third-party publisher and subscriber rather than acting as a fully featured
49+
* independent publisher) and therefore should only be used in a limited fashion when we have complete control over
50+
* how data is being published to the publisher it wraps.
51+
*
52+
* @param <T> The type of objects being published
53+
*/
54+
55+
@SdkInternalApi
56+
public class EnvelopeWrappedSdkPublisher<T> implements SdkPublisher<T> {
57+
private final Publisher<T> wrappedPublisher;
58+
private final T contentPrefix;
59+
private final T contentSuffix;
60+
private final BiFunction<T, T, T> mergeContentFunction;
61+
62+
private EnvelopeWrappedSdkPublisher(Publisher<T> wrappedPublisher,
63+
T contentPrefix,
64+
T contentSuffix,
65+
BiFunction<T, T, T> mergeContentFunction) {
66+
this.wrappedPublisher = wrappedPublisher;
67+
this.contentPrefix = contentPrefix;
68+
this.contentSuffix = contentSuffix;
69+
this.mergeContentFunction = mergeContentFunction;
70+
}
71+
72+
/**
73+
* Create a new publisher that wraps the content of an existing publisher.
74+
* @param wrappedPublisher The publisher who's content will be wrapped.
75+
* @param contentPrefix The content to be inserted in front of the wrapped content.
76+
* @param contentSuffix The content to be inserted at the back of the wrapped content.
77+
* @param mergeContentFunction A function that will be used to merge the inserted content into the wrapped content.
78+
* @param <T> The content type.
79+
* @return A newly initialized instance of this class.
80+
*/
81+
public static <T> EnvelopeWrappedSdkPublisher<T> of(Publisher<T> wrappedPublisher,
82+
T contentPrefix,
83+
T contentSuffix,
84+
BiFunction<T, T, T> mergeContentFunction) {
85+
return new EnvelopeWrappedSdkPublisher<>(wrappedPublisher, contentPrefix, contentSuffix, mergeContentFunction);
86+
}
87+
88+
/**
89+
* See {@link Publisher#subscribe(Subscriber)}
90+
*/
91+
@Override
92+
public void subscribe(Subscriber<? super T> subscriber) {
93+
if (subscriber == null) {
94+
throw new NullPointerException("subscriber must be non-null on call to subscribe()");
95+
}
96+
97+
wrappedPublisher.subscribe(new ContentWrappedSubscriber(subscriber));
98+
}
99+
100+
private class ContentWrappedSubscriber implements Subscriber<T> {
101+
private final Subscriber<? super T> wrappedSubscriber;
102+
private boolean prefixApplied = false;
103+
private boolean suffixApplied = false;
104+
105+
private ContentWrappedSubscriber(Subscriber<? super T> wrappedSubscriber) {
106+
this.wrappedSubscriber = wrappedSubscriber;
107+
}
108+
109+
@Override
110+
public void onSubscribe(Subscription subscription) {
111+
wrappedSubscriber.onSubscribe(subscription);
112+
}
113+
114+
@Override
115+
public void onNext(T t) {
116+
T contentToSend = t;
117+
118+
if (!prefixApplied) {
119+
prefixApplied = true;
120+
121+
if (contentPrefix != null) {
122+
contentToSend = mergeContentFunction.apply(contentPrefix, t);
123+
}
124+
}
125+
126+
wrappedSubscriber.onNext(contentToSend);
127+
}
128+
129+
@Override
130+
public void onError(Throwable throwable) {
131+
wrappedSubscriber.onError(throwable);
132+
}
133+
134+
@Override
135+
public void onComplete() {
136+
try {
137+
// Only transmit the close of the envelope once and only if the prefix has been previously sent.
138+
if (!suffixApplied && prefixApplied) {
139+
suffixApplied = true;
140+
141+
if (contentSuffix != null) {
142+
// TODO: This should respect the demand from the subscriber as technically an onComplete
143+
// signal could be received even if there is no demand. We have minimized the impact of this
144+
// by only making it applicable in situations where there data has already been transmitted
145+
// over the stream.
146+
wrappedSubscriber.onNext(contentSuffix);
147+
}
148+
}
149+
} finally {
150+
wrappedSubscriber.onComplete();
151+
}
152+
}
153+
}
154+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.nio.charset.StandardCharsets;
20+
21+
import org.reactivestreams.Publisher;
22+
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.core.async.SdkPublisher;
25+
26+
/**
27+
* Common implementations of {@link SdkPublisher} that are provided for convenience when building asynchronous
28+
* interceptors to be used with specific clients.
29+
*/
30+
@SdkInternalApi
31+
public final class SdkPublishers {
32+
private SdkPublishers() {
33+
}
34+
35+
/**
36+
* Constructs an {@link SdkPublisher} that wraps a {@link ByteBuffer} publisher and inserts additional content
37+
* that wraps the published content like an envelope. This can be used when you want to transform the content of
38+
* an asynchronous SDK response by putting it in an envelope. This publisher implementation does not comply with
39+
* the complete flow spec (as it inserts data into the middle of a flow between a third-party publisher and
40+
* subscriber rather than acting as a fully featured independent publisher) and therefore should only be used in a
41+
* limited fashion when we have complete control over how data is being published to the publisher it wraps.
42+
* @param publisher The underlying publisher to wrap the content of.
43+
* @param envelopePrefix A string representing the content to be inserted as the head of the containing envelope.
44+
* @param envelopeSuffix A string representing the content to be inserted as the tail of containing envelope.
45+
* @return An {@link SdkPublisher} that wraps the provided publisher.
46+
*/
47+
public static SdkPublisher<ByteBuffer> envelopeWrappedPublisher(Publisher<ByteBuffer> publisher,
48+
String envelopePrefix,
49+
String envelopeSuffix) {
50+
return EnvelopeWrappedSdkPublisher.of(publisher,
51+
wrapUtf8(envelopePrefix),
52+
wrapUtf8(envelopeSuffix),
53+
SdkPublishers::concat);
54+
}
55+
56+
private static ByteBuffer wrapUtf8(String s) {
57+
return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
58+
}
59+
60+
private static ByteBuffer concat(ByteBuffer b1, ByteBuffer b2) {
61+
ByteBuffer result = ByteBuffer.allocate(b1.remaining() + b2.remaining());
62+
result.put(b1);
63+
result.put(b2);
64+
result.rewind();
65+
return result;
66+
}
67+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.nio.ByteBuffer;
21+
import java.nio.charset.StandardCharsets;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
import org.junit.Test;
26+
import org.reactivestreams.Publisher;
27+
import org.reactivestreams.Subscriber;
28+
import org.reactivestreams.Subscription;
29+
30+
import software.amazon.awssdk.core.internal.async.SdkPublishers;
31+
import utils.FakePublisher;
32+
33+
public class SdkPublishersTest {
34+
@Test
35+
public void envelopeWrappedPublisher() {
36+
FakePublisher<ByteBuffer> fakePublisher = new FakePublisher<>();
37+
Publisher<ByteBuffer> wrappedPublisher =
38+
SdkPublishers.envelopeWrappedPublisher(fakePublisher, "prefix:", ":suffix");
39+
40+
FakeByteBufferSubscriber fakeSubscriber = new FakeByteBufferSubscriber();
41+
wrappedPublisher.subscribe(fakeSubscriber);
42+
fakePublisher.publish(ByteBuffer.wrap("content".getBytes(StandardCharsets.UTF_8)));
43+
fakePublisher.complete();
44+
45+
assertThat(fakeSubscriber.recordedEvents()).containsExactly("prefix:content", ":suffix");
46+
}
47+
48+
private final static class FakeByteBufferSubscriber implements Subscriber<ByteBuffer> {
49+
private final List<String> recordedEvents = new ArrayList<>();
50+
51+
@Override
52+
public void onSubscribe(Subscription s) {
53+
54+
}
55+
56+
@Override
57+
public void onNext(ByteBuffer byteBuffer) {
58+
String s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
59+
recordedEvents.add(s);
60+
}
61+
62+
@Override
63+
public void onError(Throwable t) {
64+
65+
}
66+
67+
@Override
68+
public void onComplete() {
69+
70+
}
71+
72+
public List<String> recordedEvents() {
73+
return this.recordedEvents;
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)