Skip to content

Commit 6326556

Browse files
committed
Addressed comments.
1 parent fe57c42 commit 6326556

File tree

4 files changed

+79
-61
lines changed

4 files changed

+79
-61
lines changed

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void onResponseHeadersDone(HttpStream stream, int headerType) {
8080

8181
@Override
8282
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
83-
CompletableFuture<Void> writeFuture = responsePublisher.write(ByteBuffer.wrap(bodyBytesIn));
83+
CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));
8484

8585
if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
8686
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
@@ -89,7 +89,7 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
8989

9090
writeFuture.whenComplete((result, failure) -> {
9191
if (failure != null) {
92-
die(stream, failure);
92+
failResponseHandlerAndFuture(stream, failure);
9393
return;
9494
}
9595

@@ -111,7 +111,7 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
111111
private void onSuccessfulResponseComplete(HttpStream stream) {
112112
responsePublisher.complete().whenComplete((result, failure) -> {
113113
if (failure != null) {
114-
die(stream, failure);
114+
failResponseHandlerAndFuture(stream, failure);
115115
return;
116116
}
117117

@@ -128,10 +128,10 @@ private void onSuccessfulResponseComplete(HttpStream stream) {
128128
private void onFailedResponseComplete(HttpStream stream, HttpException error) {
129129
log.error(() -> "HTTP response encountered an error.", error);
130130
responsePublisher.error(error);
131-
die(stream, error);
131+
failResponseHandlerAndFuture(stream, error);
132132
}
133133

134-
private void die(HttpStream stream, Throwable error) {
134+
private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) {
135135
callResponseHandlerOnError(error);
136136
completionFuture.completeExceptionally(error);
137137
connection.shutdown();

utils/src/main/java/software/amazon/awssdk/utils/async/SimplePublisher.java

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,31 @@
3333
import org.reactivestreams.Publisher;
3434
import org.reactivestreams.Subscriber;
3535
import org.reactivestreams.Subscription;
36-
import software.amazon.awssdk.annotations.SdkPublicApi;
36+
import software.amazon.awssdk.annotations.SdkProtectedApi;
3737
import software.amazon.awssdk.utils.Logger;
3838
import software.amazon.awssdk.utils.Validate;
3939

4040
/**
41-
* A {@link Publisher} to which callers can {@link #write(Object)} messages, simplifying the process of implementing a publisher.
41+
* A {@link Publisher} to which callers can {@link #send(Object)} messages, simplifying the process of implementing a publisher.
4242
*
4343
* <p><b>Operations</b>
4444
*
4545
* <p>The {@code SimplePublisher} supports three simplified operations:
4646
* <ol>
47-
* <li>{@link #write(Object)} for sending messages</li>
47+
* <li>{@link #send(Object)} for sending messages</li>
4848
* <li>{@link #complete()} for indicating the successful end of messages</li>
4949
* <li>{@link #error(Throwable)} for indicating the unsuccessful end of messages</li>
5050
* </ol>
5151
*
5252
* Each of these operations returns a {@link CompletableFuture} for indicating when the message has been successfully sent.
5353
*
54-
* <p>Callers are expected to invoke a series of {@link #write(Object)}s followed by a single {@link #complete()} or
54+
* <p>Callers are expected to invoke a series of {@link #send(Object)}s followed by a single {@link #complete()} or
5555
* {@link #error(Throwable)}. See the documentation on each operation for more details.
5656
*
5757
* <p>This publisher will store an unbounded number of messages. It is recommended that callers limit the number of in-flight
58-
* {@link #write(Object)} operations in order to bound the amount of memory used by this publisher.
58+
* {@link #send(Object)} operations in order to bound the amount of memory used by this publisher.
5959
*/
60-
@SdkPublicApi
60+
@SdkProtectedApi
6161
public final class SimplePublisher<T> implements Publisher<T> {
6262
private static final Logger log = Logger.loggerFor(SimplePublisher.class);
6363

@@ -86,7 +86,7 @@ public final class SimplePublisher<T> implements Publisher<T> {
8686
private final AtomicBoolean processingQueue = new AtomicBoolean(false);
8787

8888
/**
89-
* An exception that should be raised to any failed {@link #write(Object)}, {@link #complete()} or {@link #error(Throwable)}
89+
* An exception that should be raised to any failed {@link #send(Object)}, {@link #complete()} or {@link #error(Throwable)}
9090
* operations. This is used to stop accepting messages after the downstream subscription is cancelled or after the
9191
* caller sends a {@code complete()} or {@code #error()}.
9292
*
@@ -100,25 +100,25 @@ public final class SimplePublisher<T> implements Publisher<T> {
100100
private Subscriber<? super T> subscriber;
101101

102102
/**
103-
* Write a message to this publisher.
103+
* Send a message using this publisher.
104104
*
105-
* <p>Messages written to this publisher will eventually be sent to a downstream subscriber, in the order they were
105+
* <p>Messages sent using this publisher will eventually be sent to a downstream subscriber, in the order they were
106106
* written. When the message is sent to the subscriber, the returned future will be completed successfully.
107107
*
108108
* <p>This method may be invoked concurrently when the order of messages is not important.
109109
*
110110
* <p>In the time between when this method is invoked and the returned future is not completed, this publisher stores the
111-
* request message in memory. Callers are recommended to limit the number of writes in progress at a time to bound the
111+
* request message in memory. Callers are recommended to limit the number of sends in progress at a time to bound the
112112
* amount of memory used by this publisher.
113113
*
114114
* <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
115-
* if the write call was performed after a {@link #complete()} or {@link #error(Throwable)} call.
115+
* if the {@code send} call was performed after a {@link #complete()} or {@link #error(Throwable)} call.
116116
*
117117
* @param value The message to send. Must not be null.
118118
* @return A future that is completed when the message is sent to the subscriber.
119119
*/
120-
public CompletableFuture<Void> write(T value) {
121-
log.trace(() -> "Received write() with " + value);
120+
public CompletableFuture<Void> send(T value) {
121+
log.trace(() -> "Received send() with " + value);
122122

123123
OnNextQueueEntry<T> entry = new OnNextQueueEntry<>(value);
124124
try {
@@ -133,13 +133,13 @@ public CompletableFuture<Void> write(T value) {
133133
}
134134

135135
/**
136-
* Indicate that no more {@link #write(Object)} calls will be made, and that stream of messages is completed successfully.
136+
* Indicate that no more {@link #send(Object)} calls will be made, and that stream of messages is completed successfully.
137137
*
138-
* <p>This can be called before any in-flight {@code write} calls are complete. Such messages will be processed before the
138+
* <p>This can be called before any in-flight {@code send} calls are complete. Such messages will be processed before the
139139
* stream is treated as complete. The returned future will be completed successfully when the {@code complete} is sent to
140140
* the downstream subscriber.
141141
*
142-
* <p>After this method is invoked, any future {@link #write(Object)}, {@code complete()} or {@link #error(Throwable)}
142+
* <p>After this method is invoked, any future {@link #send(Object)}, {@code complete()} or {@link #error(Throwable)}
143143
* calls will be completed exceptionally and not be processed.
144144
*
145145
* <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
@@ -164,13 +164,13 @@ public CompletableFuture<Void> complete() {
164164
}
165165

166166
/**
167-
* Indicate that no more {@link #write(Object)} calls will be made, and that streaming of messages has failed.
167+
* Indicate that no more {@link #send(Object)} calls will be made, and that streaming of messages has failed.
168168
*
169-
* <p>This can be called before any in-flight {@code write} calls are complete. Such messages will be processed before the
169+
* <p>This can be called before any in-flight {@code send} calls are complete. Such messages will be processed before the
170170
* stream is treated as being in-error. The returned future will be completed successfully when the {@code error} is
171171
* sent to the downstream subscriber.
172172
*
173-
* <p>After this method is invoked, any future {@link #write(Object)}, {@link #complete()} or {@code #error(Throwable)}
173+
* <p>After this method is invoked, any future {@link #send(Object)}, {@link #complete()} or {@code #error(Throwable)}
174174
* calls will be completed exceptionally and not be processed.
175175
*
176176
* <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
@@ -270,17 +270,13 @@ private void doProcessQueue() {
270270
entryTypesToFail.addAll(asList(ON_NEXT, ON_COMPLETE, ON_ERROR));
271271
log.trace(() -> "Calling onComplete()");
272272
subscriber.onComplete();
273-
outstandingDemand.set(0);
274-
log.trace(() -> "Set demand to 0");
275273
break;
276274
case ON_ERROR:
277275
OnErrorQueueEntry<T> onErrorEntry = (OnErrorQueueEntry<T>) entry;
278276

279277
entryTypesToFail.addAll(asList(ON_NEXT, ON_COMPLETE, ON_ERROR));
280278
log.trace(() -> "Calling onError() with " + onErrorEntry.failure, onErrorEntry.failure);
281279
subscriber.onError(onErrorEntry.failure);
282-
outstandingDemand.set(0);
283-
log.trace(() -> "Set demand to 0");
284280
break;
285281
case CANCEL:
286282
subscriber = null; // Allow subscriber to be garbage collected after cancellation.
@@ -385,10 +381,16 @@ public void request(long n) {
385381
+ "amount of data: " + n);
386382
rejectException.compareAndSet(null, () -> failure);
387383
eventQueue.add(new OnErrorQueueEntry<>(failure));
388-
entryTypesToFail.add(ON_NEXT);
384+
entryTypesToFail.addAll(asList(ON_NEXT, ON_COMPLETE));
389385
processEventQueue();
390386
} else {
391-
long newDemand = outstandingDemand.addAndGet(n);
387+
long newDemand = outstandingDemand.updateAndGet(current -> {
388+
if (Long.MAX_VALUE - current < n) {
389+
return Long.MAX_VALUE;
390+
}
391+
392+
return current + n;
393+
});
392394
log.trace(() -> "Increased demand to " + newDemand);
393395
processEventQueue();
394396
}
@@ -412,7 +414,7 @@ public void cancel() {
412414
*/
413415
abstract static class QueueEntry<T> {
414416
/**
415-
* The future that was returned to a {@link #write(Object)}, {@link #complete()} or {@link #error(Throwable)} message.
417+
* The future that was returned to a {@link #send(Object)}, {@link #complete()} or {@link #error(Throwable)} message.
416418
*/
417419
protected final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
418420

@@ -430,7 +432,7 @@ protected enum Type {
430432
}
431433

432434
/**
433-
* An entry added when we get a {@link #write(Object)} call.
435+
* An entry added when we get a {@link #send(Object)} call.
434436
*/
435437
private static final class OnNextQueueEntry<T> extends QueueEntry<T> {
436438
private final T value;

utils/src/test/java/software/amazon/awssdk/utils/async/SimplePublisherTckTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
package software.amazon.awssdk.utils.async;
1717

1818
import org.reactivestreams.Publisher;
19-
import org.reactivestreams.Subscriber;
20-
import org.reactivestreams.Subscription;
2119
import org.reactivestreams.tck.PublisherVerification;
22-
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
2320
import org.reactivestreams.tck.TestEnvironment;
2421

2522
public class SimplePublisherTckTest extends PublisherVerification<Integer> {
@@ -31,7 +28,7 @@ public SimplePublisherTckTest() {
3128
public Publisher<Integer> createPublisher(long elements) {
3229
SimplePublisher<Integer> publisher = new SimplePublisher<>();
3330
for (int i = 0; i < elements; i++) {
34-
publisher.write(i);
31+
publisher.send(i);
3532
}
3633
publisher.complete();
3734
return publisher;

utils/src/test/java/software/amazon/awssdk/utils/async/SimplePublisherTest.java

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ public void immediateFailureWorks() {
8181
public void writeAfterCompleteFails() {
8282
SimplePublisher<Integer> publisher = new SimplePublisher<>();
8383
publisher.complete();
84-
assertThat(publisher.write(5)).isCompletedExceptionally();
84+
assertThat(publisher.send(5)).isCompletedExceptionally();
8585
}
8686

8787
@Test
8888
public void writeAfterErrorFails() {
8989
SimplePublisher<Integer> publisher = new SimplePublisher<>();
9090
publisher.error(new Throwable());
91-
assertThat(publisher.write(5)).isCompletedExceptionally();
91+
assertThat(publisher.send(5)).isCompletedExceptionally();
9292
}
9393

9494
@Test
@@ -125,8 +125,8 @@ public void oneDemandWorks() {
125125
StoringSubscriber<Integer> subscriber = new StoringSubscriber<>(1);
126126
publisher.subscribe(subscriber);
127127

128-
publisher.write(1);
129-
publisher.write(2);
128+
publisher.send(1);
129+
publisher.send(2);
130130
publisher.complete();
131131

132132
assertThat(subscriber.peek().get().type()).isEqualTo(EventType.ON_NEXT);
@@ -146,28 +146,32 @@ public void oneDemandWorks() {
146146
@Test
147147
public void highDemandWorks() {
148148
SimplePublisher<Integer> publisher = new SimplePublisher<>();
149-
StoringSubscriber<Integer> subscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
149+
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
150150
publisher.subscribe(subscriber);
151+
subscriber.subscription.request(Long.MAX_VALUE);
151152

152-
publisher.write(1);
153-
publisher.write(2);
153+
publisher.send(1);
154+
subscriber.subscription.request(Long.MAX_VALUE);
155+
publisher.send(2);
156+
subscriber.subscription.request(Long.MAX_VALUE);
154157
publisher.complete();
158+
subscriber.subscription.request(Long.MAX_VALUE);
155159

156-
assertThat(subscriber.peek().get().type()).isEqualTo(EventType.ON_NEXT);
157-
assertThat(subscriber.peek().get().value()).isEqualTo(1);
160+
assertThat(subscriber.eventQueue.peek().get().type()).isEqualTo(EventType.ON_NEXT);
161+
assertThat(subscriber.eventQueue.peek().get().value()).isEqualTo(1);
158162

159-
subscriber.poll();
163+
subscriber.eventQueue.poll();
160164

161-
assertThat(subscriber.peek().get().type()).isEqualTo(EventType.ON_NEXT);
162-
assertThat(subscriber.peek().get().value()).isEqualTo(2);
165+
assertThat(subscriber.eventQueue.peek().get().type()).isEqualTo(EventType.ON_NEXT);
166+
assertThat(subscriber.eventQueue.peek().get().value()).isEqualTo(2);
163167

164-
subscriber.poll();
168+
subscriber.eventQueue.poll();
165169

166-
assertThat(subscriber.peek().get().type()).isEqualTo(EventType.ON_COMPLETE);
170+
assertThat(subscriber.eventQueue.peek().get().type()).isEqualTo(EventType.ON_COMPLETE);
167171

168-
subscriber.poll();
172+
subscriber.eventQueue.poll();
169173

170-
assertThat(subscriber.poll()).isNotPresent();
174+
assertThat(subscriber.eventQueue.poll()).isNotPresent();
171175
}
172176

173177
@Test
@@ -176,7 +180,7 @@ public void writeFuturesDoNotCompleteUntilAfterOnNext() {
176180
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
177181
publisher.subscribe(subscriber);
178182

179-
CompletableFuture<Void> writeFuture = publisher.write(5);
183+
CompletableFuture<Void> writeFuture = publisher.send(5);
180184

181185
assertThat(subscriber.eventQueue.peek()).isNotPresent();
182186
assertThat(writeFuture).isNotCompleted();
@@ -194,7 +198,7 @@ public void completeFuturesDoNotCompleteUntilAfterOnComplete() {
194198
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
195199

196200
publisher.subscribe(subscriber);
197-
publisher.write(5);
201+
publisher.send(5);
198202
CompletableFuture<Void> completeFuture = publisher.complete();
199203

200204
assertThat(subscriber.eventQueue.peek()).isNotPresent();
@@ -215,7 +219,7 @@ public void errorFuturesDoNotCompleteUntilAfterOnError() {
215219
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
216220

217221
publisher.subscribe(subscriber);
218-
publisher.write(5);
222+
publisher.send(5);
219223
CompletableFuture<Void> errorFuture = publisher.error(error);
220224

221225
assertThat(subscriber.eventQueue.peek()).isNotPresent();
@@ -256,7 +260,7 @@ public void writeBeforeSubscribeIsDeliveredOnSubscribe() {
256260
SimplePublisher<Integer> publisher = new SimplePublisher<>();
257261
StoringSubscriber<Integer> subscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
258262

259-
publisher.write(5);
263+
publisher.send(5);
260264
publisher.subscribe(subscriber);
261265
assertThat(subscriber.peek().get().type()).isEqualTo(EventType.ON_NEXT);
262266
assertThat(subscriber.peek().get().value()).isEqualTo(5);
@@ -268,7 +272,7 @@ public void cancelFailsAnyInFlightFutures() {
268272
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
269273

270274
publisher.subscribe(subscriber);
271-
CompletableFuture<Void> writeFuture = publisher.write(5);
275+
CompletableFuture<Void> writeFuture = publisher.send(5);
272276
CompletableFuture<Void> completeFuture = publisher.complete();
273277

274278
subscriber.subscription.cancel();
@@ -285,18 +289,33 @@ public void newCallsAfterCancelFail() {
285289
publisher.subscribe(subscriber);
286290
subscriber.subscription.cancel();
287291

288-
assertThat(publisher.write(5)).isCompletedExceptionally();
292+
assertThat(publisher.send(5)).isCompletedExceptionally();
289293
assertThat(publisher.complete()).isCompletedExceptionally();
290294
assertThat(publisher.error(new Throwable())).isCompletedExceptionally();
291295
}
292296

297+
@Test
298+
public void negativeDemandSkipsOutstandingMessages() {
299+
SimplePublisher<Integer> publisher = new SimplePublisher<>();
300+
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
301+
302+
publisher.subscribe(subscriber);
303+
CompletableFuture<Void> sendFuture = publisher.send(0);
304+
CompletableFuture<Void> completeFuture = publisher.complete();
305+
subscriber.subscription.request(-1);
306+
307+
assertThat(sendFuture).isCompletedExceptionally();
308+
assertThat(completeFuture).isCompletedExceptionally();
309+
assertThat(subscriber.eventQueue.poll().get().type()).isEqualTo(EventType.ON_ERROR);
310+
}
311+
293312
@Test
294313
public void evilDownstreamPublisherThrowingInOnNextStillCancelsInFlightFutures() {
295314
SimplePublisher<Integer> publisher = new SimplePublisher<>();
296315
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
297316
subscriber.failureInOnNext = new RuntimeException();
298317

299-
CompletableFuture<Void> writeFuture = publisher.write(5);
318+
CompletableFuture<Void> writeFuture = publisher.send(5);
300319
CompletableFuture<Void> completeFuture = publisher.complete();
301320

302321
publisher.subscribe(subscriber);
@@ -343,7 +362,7 @@ public void stochastic_completeAndError_seemThreadSafe() throws Exception {
343362

344363
Future<?> writeCall = executor.submit(() -> {
345364
waitForStart.run();
346-
publisher.write(0).join();
365+
publisher.send(0).join();
347366
});
348367

349368
Future<?> completeCall = executor.submit(() -> {
@@ -404,7 +423,7 @@ private void seemsThreadSafeWithProducerCount(int producerCount) throws Interrup
404423
producers.add(executor.submit(() -> {
405424
while (runProducers.get()) {
406425
productionLimiter.acquire();
407-
publisher.write(messageCount.getAndIncrement());
426+
publisher.send(messageCount.getAndIncrement());
408427
}
409428
publisher.complete(); // All but one producer sending this will fail.
410429
return null;

0 commit comments

Comments
 (0)