Skip to content

Commit 37c1c83

Browse files
author
Bennett Lynch
committed
Remove default wrapWithListener methods & make SdkProtectedApi
1 parent 02185c2 commit 37c1c83

File tree

7 files changed

+64
-72
lines changed

7 files changed

+64
-72
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import software.amazon.awssdk.annotations.SdkPublicApi;
2323
import software.amazon.awssdk.core.ResponseBytes;
2424
import software.amazon.awssdk.core.SdkResponse;
25-
import software.amazon.awssdk.core.async.listen.AsyncResponseTransformerListener;
2625
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
2726
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
2827
import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer;
@@ -109,14 +108,6 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> {
109108
*/
110109
void exceptionOccurred(Throwable error);
111110

112-
/**
113-
* Wrap this {@link AsyncResponseTransformer} with a new one that will notify a {@link AsyncResponseTransformerListener} of
114-
* important events occurring.
115-
*/
116-
default AsyncResponseTransformer<ResponseT, ResultT> wrapWithListener(AsyncResponseTransformerListener<ResponseT> listener) {
117-
return AsyncResponseTransformerListener.wrap(this, listener);
118-
}
119-
120111
/**
121112
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
122113
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SdkPublisher.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,15 @@ default SdkPublisher<List<T>> buffer(int bufferSize) {
117117
default SdkPublisher<T> limit(int limit) {
118118
return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
119119
}
120-
120+
121121
/**
122122
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
123123
*
124124
* @param afterOnComplete The logic that should be run immediately after onComplete.
125125
* @return New publisher that invokes the requested callback.
126126
*/
127127
default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
128-
return wrapWithListener(new PublisherListener<T>() {
128+
return PublisherListener.wrap(this, new PublisherListener<T>() {
129129
@Override
130130
public void subscriberOnComplete() {
131131
afterOnComplete.run();
@@ -140,7 +140,7 @@ public void subscriberOnComplete() {
140140
* @return New publisher that invokes the requested callback.
141141
*/
142142
default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
143-
return wrapWithListener(new PublisherListener<T>() {
143+
return PublisherListener.wrap(this, new PublisherListener<T>() {
144144
@Override
145145
public void subscriberOnError(Throwable t) {
146146
afterOnError.accept(t);
@@ -155,7 +155,7 @@ public void subscriberOnError(Throwable t) {
155155
* @return New publisher that invokes the requested callback.
156156
*/
157157
default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
158-
return wrapWithListener(new PublisherListener<T>() {
158+
return PublisherListener.wrap(this, new PublisherListener<T>() {
159159
@Override
160160
public void subscriptionCancel() {
161161
afterOnCancel.run();
@@ -177,10 +177,4 @@ default CompletableFuture<Void> subscribe(Consumer<T> consumer) {
177177
return future;
178178
}
179179

180-
/**
181-
* Wrap this {@link SdkPublisher} with a new one that will notify a {@link PublisherListener} of important events occurring.
182-
*/
183-
default SdkPublisher<T> wrapWithListener(PublisherListener<T> listener) {
184-
return PublisherListener.wrap(this, listener);
185-
}
186180
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listen/AsyncResponseTransformerListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
import software.amazon.awssdk.utils.Validate;
3030

3131
/**
32-
* Listener interface that invokes callbacks associated with this {@link AsyncResponseTransformer} and any resulting {@link
32+
* Listener interface that invokes callbacks associated with a {@link AsyncResponseTransformer} and any resulting {@link
3333
* SdkPublisher} and {@link Subscriber}.
3434
*
3535
* @see PublisherListener
3636
* @see SubscriberListener
3737
*/
38+
@SdkProtectedApi
3839
public interface AsyncResponseTransformerListener<ResponseT> extends PublisherListener<ByteBuffer> {
3940

4041
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listen/PublisherListener.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828
import software.amazon.awssdk.utils.Validate;
2929

3030
/**
31-
* Listener interface that invokes callbacks associated with this {@link Publisher} and any resulting {@link Subscriber}.
31+
* Listener interface that invokes callbacks associated with a {@link Publisher} and any resulting {@link Subscriber}.
3232
*
3333
* @see AsyncResponseTransformerListener
3434
* @see SubscriberListener
3535
*/
36+
@SdkProtectedApi
3637
public interface PublisherListener<T> extends SubscriberListener<T> {
3738
/**
3839
* Invoked after {@link Publisher#subscribe(Subscriber)}
@@ -41,7 +42,7 @@ default void publisherSubscribe(Subscriber<? super T> subscriber) {
4142
}
4243

4344
/**
44-
* Wrap this {@link SdkPublisher} with a new one that will notify a {@link PublisherListener} of important events occurring.
45+
* Wrap a {@link SdkPublisher} with a new one that will notify a {@link PublisherListener} of important events occurring.
4546
*/
4647
static <T> SdkPublisher<T> wrap(SdkPublisher<T> delegate, PublisherListener<T> listener) {
4748
return new NotifyingPublisher<>(delegate, listener);

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listen/SubscriberListener.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@
2626
import software.amazon.awssdk.utils.Validate;
2727

2828
/**
29-
* Listener interface that invokes callbacks associated with this {@link Subscriber}.
29+
* Listener interface that invokes callbacks associated with a {@link Subscriber}.
3030
*
3131
* @see AsyncResponseTransformerListener
3232
* @see PublisherListener
3333
*/
34+
@SdkProtectedApi
3435
public interface SubscriberListener<T> {
3536
/**
3637
* Invoked after {@link Subscriber#onNext(Object)}
@@ -57,7 +58,7 @@ default void subscriptionCancel() {
5758
}
5859

5960
/**
60-
* Wrap this {@link Subscriber} with a new one that will notify a {@link SubscriberListener} of important events occurring.
61+
* Wrap a {@link Subscriber} with a new one that will notify a {@link SubscriberListener} of important events occurring.
6162
*/
6263
static <T> Subscriber<T> wrap(Subscriber<? super T> delegate, SubscriberListener<? super T> listener) {
6364
return new NotifyingSubscriber<>(delegate, listener);
@@ -108,7 +109,7 @@ final class NotifyingSubscription implements Subscription {
108109
private final Subscription delegateSubscription;
109110

110111
NotifyingSubscription(Subscription delegateSubscription) {
111-
this.delegateSubscription = Validate.notNull(delegateSubscription, "delegate");
112+
this.delegateSubscription = Validate.notNull(delegateSubscription, "delegateSubscription");
112113
}
113114

114115
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listen/TmpUtil.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,24 @@ public class TmpUtil {
3131
public static <A, B> Pair<AsyncResponseTransformer<A, B>, CompletableFuture<Void>> wrapWithEndOfStreamFuture(
3232
AsyncResponseTransformer<A, B> responseTransformer) {
3333
CompletableFuture<Void> future = new CompletableFuture<>();
34-
AsyncResponseTransformer<A, B> wrapped = responseTransformer.wrapWithListener(new AsyncResponseTransformerListener<A>() {
35-
@Override
36-
public void transformerExceptionOccurred(Throwable t) {
37-
future.completeExceptionally(t);
38-
}
34+
AsyncResponseTransformer<A, B> wrapped = AsyncResponseTransformerListener.wrap(
35+
responseTransformer,
36+
new AsyncResponseTransformerListener<A>() {
37+
@Override
38+
public void transformerExceptionOccurred(Throwable t) {
39+
future.completeExceptionally(t);
40+
}
3941

40-
@Override
41-
public void subscriberOnError(Throwable t) {
42-
future.completeExceptionally(t);
43-
}
42+
@Override
43+
public void subscriberOnError(Throwable t) {
44+
future.completeExceptionally(t);
45+
}
4446

45-
@Override
46-
public void subscriberOnComplete() {
47-
future.complete(null);
48-
}
49-
});
47+
@Override
48+
public void subscriberOnComplete() {
49+
future.complete(null);
50+
}
51+
});
5052
return Pair.of(wrapped, future);
5153
}
5254
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -87,45 +87,47 @@ public void beforeOnNext(ByteBuffer byteBuffer) {
8787

8888
public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformer(
8989
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer) {
90-
return responseTransformer.wrapWithListener(new AsyncResponseTransformerListener<GetObjectResponse>() {
91-
@Override
92-
public void transformerOnResponse(GetObjectResponse response) {
93-
if (response.contentLength() != null) {
94-
progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength()));
90+
return AsyncResponseTransformerListener.wrap(
91+
responseTransformer,
92+
new AsyncResponseTransformerListener<GetObjectResponse>() {
93+
@Override
94+
public void transformerOnResponse(GetObjectResponse response) {
95+
if (response.contentLength() != null) {
96+
progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength()));
97+
}
9598
}
96-
}
9799

98-
@Override
99-
public void transformerExceptionOccurred(Throwable t) {
100-
transferFailed(t);
101-
}
100+
@Override
101+
public void transformerExceptionOccurred(Throwable t) {
102+
transferFailed(t);
103+
}
102104

103-
@Override
104-
public void publisherSubscribe(Subscriber<? super ByteBuffer> subscriber) {
105-
progress.updateAndGet(b -> b.bytesTransferred(0));
106-
}
105+
@Override
106+
public void publisherSubscribe(Subscriber<? super ByteBuffer> subscriber) {
107+
progress.updateAndGet(b -> b.bytesTransferred(0));
108+
}
107109

108-
@Override
109-
public void subscriberOnNext(ByteBuffer byteBuffer) {
110-
TransferProgressSnapshot snapshot = progress.updateAndGet(b -> {
111-
b.bytesTransferred(b.getBytesTransferred() + byteBuffer.limit());
112-
});
113-
listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot)));
114-
}
110+
@Override
111+
public void subscriberOnNext(ByteBuffer byteBuffer) {
112+
TransferProgressSnapshot snapshot = progress.updateAndGet(b -> {
113+
b.bytesTransferred(b.getBytesTransferred() + byteBuffer.limit());
114+
});
115+
listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot)));
116+
}
115117

116-
@Override
117-
public void subscriberOnError(Throwable t) {
118-
transferFailed(t);
119-
}
118+
@Override
119+
public void subscriberOnError(Throwable t) {
120+
transferFailed(t);
121+
}
120122

121-
@Override
122-
public void subscriberOnComplete() {
123-
listeners.transferComplete(context.copy(b -> {
124-
b.progressSnapshot(progress.snapshot());
125-
b.completedTransfer(completedTransfer);
126-
}));
127-
}
128-
});
123+
@Override
124+
public void subscriberOnComplete() {
125+
listeners.transferComplete(context.copy(b -> {
126+
b.progressSnapshot(progress.snapshot());
127+
b.completedTransfer(completedTransfer);
128+
}));
129+
}
130+
});
129131
}
130132

131133
public void registerCompletion(CompletableFuture<? extends CompletedObjectTransfer> future) {

0 commit comments

Comments
 (0)