-
Notifications
You must be signed in to change notification settings - Fork 910
Bugfix: s3 asynchronous client GetBucketPolicy now works and does not… #1586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"type": "bugfix", | ||
"category": "Amazon S3", | ||
"description": "Fixed bug prevent GetBucketBolicy from ever being successful using the asynchronous S3 client." | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
154 changes: 154 additions & 0 deletions
154
...src/main/java/software/amazon/awssdk/core/internal/async/EnvelopeWrappedSdkPublisher.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
/* | ||
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.core.internal.async; | ||
|
||
import java.util.function.BiFunction; | ||
|
||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.core.async.SdkPublisher; | ||
|
||
/** | ||
* Publisher implementation that wraps the content of another publisher in an envelope with an optional prefix (or | ||
* header) and suffix (or footer). The prefix content will be prepended to the first published object from the | ||
* wrapped publisher, and the suffix content will be published when the wrapped publisher signals completion. | ||
* <p> | ||
* The envelope prefix will not be published until the wrapped publisher publishes something or is completed. | ||
* The envelope suffix will not be published until the wrapped publisher is completed. | ||
* <p> | ||
* This class can be used in an asynchronous interceptor in the AWS SDK to wrap content around the incoming | ||
* bytestream from a response. | ||
* <p> | ||
* A function must be supplied that can be used to concatenate the envelope content with the content being published by | ||
* the wrapped publisher. Example usage: | ||
* {@code | ||
* Publisher<String> wrappedPublisher = ContentEnvelopeWrappingPublisher.of(publisher, "S", "E", (s1, s2) -> s1 + s2); | ||
* } | ||
* If publisher publishes a single string "1", wrappedPublisher will publish "S1" (prepending the envelop prefix). If | ||
* publisher then publishes a second string "2", wrappedPublisher will then publish "2" (no added content). If | ||
* publisher then completes, wrappedPublisher will then publish "E" and then complete. | ||
* <p> | ||
* WARNING: This publisher implementation does not comply with the complete flow spec (as it inserts data into the | ||
* middle of a flow between a third-party publisher and subscriber rather than acting as a fully featured | ||
* independent publisher) and therefore should only be used in a limited fashion when we have complete control over | ||
* how data is being published to the publisher it wraps. | ||
* | ||
* @param <T> The type of objects being published | ||
*/ | ||
|
||
@SdkInternalApi | ||
public class EnvelopeWrappedSdkPublisher<T> implements SdkPublisher<T> { | ||
bmaizels marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private final Publisher<T> wrappedPublisher; | ||
private final T contentPrefix; | ||
private final T contentSuffix; | ||
private final BiFunction<T, T, T> mergeContentFunction; | ||
|
||
private EnvelopeWrappedSdkPublisher(Publisher<T> wrappedPublisher, | ||
T contentPrefix, | ||
T contentSuffix, | ||
BiFunction<T, T, T> mergeContentFunction) { | ||
this.wrappedPublisher = wrappedPublisher; | ||
this.contentPrefix = contentPrefix; | ||
this.contentSuffix = contentSuffix; | ||
this.mergeContentFunction = mergeContentFunction; | ||
} | ||
|
||
/** | ||
* Create a new publisher that wraps the content of an existing publisher. | ||
* @param wrappedPublisher The publisher who's content will be wrapped. | ||
* @param contentPrefix The content to be inserted in front of the wrapped content. | ||
* @param contentSuffix The content to be inserted at the back of the wrapped content. | ||
* @param mergeContentFunction A function that will be used to merge the inserted content into the wrapped content. | ||
* @param <T> The content type. | ||
* @return A newly initialized instance of this class. | ||
*/ | ||
public static <T> EnvelopeWrappedSdkPublisher<T> of(Publisher<T> wrappedPublisher, | ||
T contentPrefix, | ||
T contentSuffix, | ||
BiFunction<T, T, T> mergeContentFunction) { | ||
return new EnvelopeWrappedSdkPublisher<>(wrappedPublisher, contentPrefix, contentSuffix, mergeContentFunction); | ||
} | ||
|
||
/** | ||
* See {@link Publisher#subscribe(Subscriber)} | ||
*/ | ||
@Override | ||
public void subscribe(Subscriber<? super T> subscriber) { | ||
if (subscriber == null) { | ||
throw new NullPointerException("subscriber must be non-null on call to subscribe()"); | ||
} | ||
|
||
wrappedPublisher.subscribe(new ContentWrappedSubscriber(subscriber)); | ||
} | ||
|
||
private class ContentWrappedSubscriber implements Subscriber<T> { | ||
private final Subscriber<? super T> wrappedSubscriber; | ||
private boolean prefixApplied = false; | ||
private boolean suffixApplied = false; | ||
|
||
private ContentWrappedSubscriber(Subscriber<? super T> wrappedSubscriber) { | ||
this.wrappedSubscriber = wrappedSubscriber; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription subscription) { | ||
wrappedSubscriber.onSubscribe(subscription); | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
T contentToSend = t; | ||
|
||
if (!prefixApplied) { | ||
prefixApplied = true; | ||
|
||
if (contentPrefix != null) { | ||
contentToSend = mergeContentFunction.apply(contentPrefix, t); | ||
} | ||
} | ||
|
||
wrappedSubscriber.onNext(contentToSend); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable throwable) { | ||
wrappedSubscriber.onError(throwable); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
bmaizels marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
// Only transmit the close of the envelope once and only if the prefix has been previously sent. | ||
if (!suffixApplied && prefixApplied) { | ||
suffixApplied = true; | ||
|
||
if (contentSuffix != null) { | ||
// TODO: This should respect the demand from the subscriber as technically an onComplete | ||
// signal could be received even if there is no demand. We have minimized the impact of this | ||
// by only making it applicable in situations where there data has already been transmitted | ||
// over the stream. | ||
wrappedSubscriber.onNext(contentSuffix); | ||
} | ||
} | ||
} finally { | ||
wrappedSubscriber.onComplete(); | ||
} | ||
} | ||
} | ||
} |
67 changes: 67 additions & 0 deletions
67
core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SdkPublishers.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.core.internal.async; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.core.async.SdkPublisher; | ||
|
||
/** | ||
* Common implementations of {@link SdkPublisher} that are provided for convenience when building asynchronous | ||
* interceptors to be used with specific clients. | ||
*/ | ||
@SdkInternalApi | ||
public final class SdkPublishers { | ||
private SdkPublishers() { | ||
} | ||
|
||
/** | ||
* Constructs an {@link SdkPublisher} that wraps a {@link ByteBuffer} publisher and inserts additional content | ||
* that wraps the published content like an envelope. This can be used when you want to transform the content of | ||
* an asynchronous SDK response by putting it in an envelope. This publisher implementation does not comply with | ||
* the complete flow spec (as it inserts data into the middle of a flow between a third-party publisher and | ||
* subscriber rather than acting as a fully featured independent publisher) and therefore should only be used in a | ||
* limited fashion when we have complete control over how data is being published to the publisher it wraps. | ||
* @param publisher The underlying publisher to wrap the content of. | ||
* @param envelopePrefix A string representing the content to be inserted as the head of the containing envelope. | ||
* @param envelopeSuffix A string representing the content to be inserted as the tail of containing envelope. | ||
* @return An {@link SdkPublisher} that wraps the provided publisher. | ||
*/ | ||
public static SdkPublisher<ByteBuffer> envelopeWrappedPublisher(Publisher<ByteBuffer> publisher, | ||
String envelopePrefix, | ||
String envelopeSuffix) { | ||
return EnvelopeWrappedSdkPublisher.of(publisher, | ||
wrapUtf8(envelopePrefix), | ||
wrapUtf8(envelopeSuffix), | ||
SdkPublishers::concat); | ||
} | ||
|
||
private static ByteBuffer wrapUtf8(String s) { | ||
return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); | ||
} | ||
|
||
private static ByteBuffer concat(ByteBuffer b1, ByteBuffer b2) { | ||
ByteBuffer result = ByteBuffer.allocate(b1.remaining() + b2.remaining()); | ||
result.put(b1); | ||
result.put(b2); | ||
result.rewind(); | ||
return result; | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.core.async; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import org.junit.Test; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
import software.amazon.awssdk.core.internal.async.SdkPublishers; | ||
import utils.FakePublisher; | ||
|
||
public class SdkPublishersTest { | ||
@Test | ||
public void envelopeWrappedPublisher() { | ||
FakePublisher<ByteBuffer> fakePublisher = new FakePublisher<>(); | ||
Publisher<ByteBuffer> wrappedPublisher = | ||
SdkPublishers.envelopeWrappedPublisher(fakePublisher, "prefix:", ":suffix"); | ||
|
||
FakeByteBufferSubscriber fakeSubscriber = new FakeByteBufferSubscriber(); | ||
wrappedPublisher.subscribe(fakeSubscriber); | ||
fakePublisher.publish(ByteBuffer.wrap("content".getBytes(StandardCharsets.UTF_8))); | ||
fakePublisher.complete(); | ||
|
||
assertThat(fakeSubscriber.recordedEvents()).containsExactly("prefix:content", ":suffix"); | ||
} | ||
|
||
private final static class FakeByteBufferSubscriber implements Subscriber<ByteBuffer> { | ||
private final List<String> recordedEvents = new ArrayList<>(); | ||
|
||
@Override | ||
public void onSubscribe(Subscription s) { | ||
|
||
} | ||
|
||
@Override | ||
public void onNext(ByteBuffer byteBuffer) { | ||
String s = StandardCharsets.UTF_8.decode(byteBuffer).toString(); | ||
recordedEvents.add(s); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
|
||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
|
||
} | ||
|
||
public List<String> recordedEvents() { | ||
return this.recordedEvents; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.