Skip to content

Commit b54488a

Browse files
authored
Use StoringSubscriber in AsyncBufferingSubscriber (#3636)
1 parent 7968411 commit b54488a

File tree

4 files changed

+78
-73
lines changed

4 files changed

+78
-73
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515

1616
package software.amazon.awssdk.transfer.s3.internal;
1717

18-
import java.util.Queue;
18+
import java.util.Optional;
1919
import java.util.concurrent.CompletableFuture;
20-
import java.util.concurrent.ConcurrentLinkedQueue;
2120
import java.util.concurrent.atomic.AtomicBoolean;
2221
import java.util.concurrent.atomic.AtomicInteger;
2322
import java.util.function.Function;
@@ -26,6 +25,8 @@
2625
import software.amazon.awssdk.annotations.SdkInternalApi;
2726
import software.amazon.awssdk.utils.Logger;
2827
import software.amazon.awssdk.utils.Validate;
28+
import software.amazon.awssdk.utils.async.DemandIgnoringSubscription;
29+
import software.amazon.awssdk.utils.async.StoringSubscriber;
2930

3031
/**
3132
* An implementation of {@link Subscriber} that execute the provided function for every event and limits the number of concurrent
@@ -36,24 +37,24 @@
3637
@SdkInternalApi
3738
public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
3839
private static final Logger log = Logger.loggerFor(AsyncBufferingSubscriber.class);
39-
private static final Object COMPLETE_EVENT = new Object();
40-
private final Queue<Object> buffer;
4140
private final CompletableFuture<?> returnFuture;
4241
private final Function<T, CompletableFuture<?>> consumer;
4342
private final int maxConcurrentExecutions;
4443
private final AtomicInteger numRequestsInFlight;
4544
private final AtomicBoolean isDelivering = new AtomicBoolean(false);
4645
private volatile boolean isStreamingDone;
47-
private volatile Subscription subscription;
46+
private Subscription subscription;
47+
48+
private final StoringSubscriber<T> storingSubscriber;
4849

4950
public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> consumer,
5051
CompletableFuture<Void> returnFuture,
5152
int maxConcurrentExecutions) {
52-
this.buffer = new ConcurrentLinkedQueue<>();
5353
this.returnFuture = returnFuture;
5454
this.consumer = consumer;
5555
this.maxConcurrentExecutions = maxConcurrentExecutions;
5656
this.numRequestsInFlight = new AtomicInteger(0);
57+
this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
5758
}
5859

5960
@Override
@@ -64,68 +65,53 @@ public void onSubscribe(Subscription subscription) {
6465
subscription.cancel();
6566
return;
6667
}
68+
storingSubscriber.onSubscribe(new DemandIgnoringSubscription(subscription));
6769
this.subscription = subscription;
6870
subscription.request(maxConcurrentExecutions);
6971
}
7072

7173
@Override
7274
public void onNext(T item) {
73-
if (item == null) {
74-
subscription.cancel();
75-
NullPointerException exception = new NullPointerException("Item must not be null");
76-
returnFuture.completeExceptionally(exception);
77-
throw exception;
78-
}
79-
80-
try {
81-
buffer.add(item);
82-
flushBufferIfNeeded();
83-
} catch (Exception e) {
84-
isStreamingDone = true;
85-
subscription.cancel();
86-
returnFuture.completeExceptionally(e);
87-
}
75+
storingSubscriber.onNext(item);
76+
flushBufferIfNeeded();
8877
}
8978

9079
private void flushBufferIfNeeded() {
91-
if (buffer.isEmpty()) {
92-
if (isStreamingDone && numRequestsInFlight.get() == 0) {
93-
returnFuture.complete(null);
94-
} else {
95-
subscription.request(1);
96-
}
97-
return;
98-
}
99-
10080
if (isDelivering.compareAndSet(false, true)) {
10181
try {
102-
Object firstEvent = buffer.peek();
103-
if (isCompleteEvent(firstEvent)) {
104-
Object event = buffer.poll();
105-
handleCompleteEvent(event);
106-
return;
107-
}
108-
109-
while (!buffer.isEmpty() && numRequestsInFlight.get() < maxConcurrentExecutions) {
110-
Object item = buffer.poll();
111-
if (item == null) {
82+
Optional<StoringSubscriber.Event<T>> next = storingSubscriber.peek();
83+
while (numRequestsInFlight.get() < maxConcurrentExecutions) {
84+
if (!next.isPresent()) {
85+
subscription.request(1);
11286
break;
11387
}
11488

115-
if (isCompleteEvent(item)) {
116-
handleCompleteEvent(item);
117-
return;
89+
switch (next.get().type()) {
90+
case ON_COMPLETE:
91+
handleCompleteEvent();
92+
break;
93+
case ON_ERROR:
94+
handleError(next.get().runtimeError());
95+
break;
96+
case ON_NEXT:
97+
handleOnNext(next.get().value());
98+
break;
99+
default:
100+
handleError(new IllegalStateException("Unknown stored type: " + next.get().type()));
101+
break;
118102
}
119103

120-
deliverItem((T) item);
104+
next = storingSubscriber.peek();
121105
}
122106
} finally {
123107
isDelivering.set(false);
124108
}
125109
}
126110
}
127111

128-
private void deliverItem(T item) {
112+
private void handleOnNext(T item) {
113+
storingSubscriber.poll();
114+
129115
int numberOfRequestInFlight = numRequestsInFlight.incrementAndGet();
130116
log.debug(() -> "Delivering next item, numRequestInFlight=" + numberOfRequestInFlight);
131117

@@ -139,26 +125,28 @@ private void deliverItem(T item) {
139125
});
140126
}
141127

142-
private void handleCompleteEvent(Object event) {
143-
isStreamingDone = true;
128+
private void handleCompleteEvent() {
144129
if (numRequestsInFlight.get() == 0) {
145130
returnFuture.complete(null);
131+
storingSubscriber.poll();
146132
}
147133
}
148134

149135
@Override
150136
public void onError(Throwable t) {
151137
handleError(t);
138+
storingSubscriber.onError(t);
152139
}
153140

154141
private void handleError(Throwable t) {
155142
returnFuture.completeExceptionally(t);
156-
buffer.clear();
143+
storingSubscriber.poll();
157144
}
158145

159146
@Override
160147
public void onComplete() {
161-
buffer.add(COMPLETE_EVENT);
148+
isStreamingDone = true;
149+
storingSubscriber.onComplete();
162150
flushBufferIfNeeded();
163151
}
164152

@@ -168,8 +156,4 @@ public void onComplete() {
168156
public int numRequestsInFlight() {
169157
return numRequestsInFlight.get();
170158
}
171-
172-
private static boolean isCompleteEvent(Object event) {
173-
return COMPLETE_EVENT.equals(event);
174-
}
175159
}

utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -222,23 +222,4 @@ public enum TransferResult {
222222
*/
223223
SUCCESS
224224
}
225-
226-
private static final class DemandIgnoringSubscription implements Subscription {
227-
private final Subscription delegate;
228-
229-
private DemandIgnoringSubscription(Subscription delegate) {
230-
this.delegate = delegate;
231-
}
232-
233-
@Override
234-
public void request(long n) {
235-
// Ignore demand requests from downstream, they want too much.
236-
// We feed them the amount that we want.
237-
}
238-
239-
@Override
240-
public void cancel() {
241-
delegate.cancel();
242-
}
243-
}
244225
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
import org.reactivestreams.Subscription;
19+
import software.amazon.awssdk.annotations.SdkProtectedApi;
20+
21+
@SdkProtectedApi
22+
public final class DemandIgnoringSubscription implements Subscription {
23+
24+
private final Subscription delegate;
25+
26+
public DemandIgnoringSubscription(Subscription delegate) {
27+
this.delegate = delegate;
28+
}
29+
30+
@Override
31+
public void request(long n) {
32+
// Ignore demand requests from downstream, they want too much.
33+
// We feed them the amount that we want.
34+
}
35+
36+
@Override
37+
public void cancel() {
38+
delegate.cancel();
39+
}
40+
}

utils/src/main/java/software/amazon/awssdk/utils/async/StoringSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
/**
2929
* An implementation of {@link Subscriber} that stores the events it receives for retrieval.
3030
*
31-
* <p>Events can be observed via {@link #peek()} and {@link #drop()}. The number of events stored is limited by the
31+
* <p>Events can be observed via {@link #peek()} and {@link #poll()}. The number of events stored is limited by the
3232
* {@code maxElements} configured at construction.
3333
*/
3434
@SdkProtectedApi

0 commit comments

Comments
 (0)