33
33
import org .reactivestreams .Publisher ;
34
34
import org .reactivestreams .Subscriber ;
35
35
import org .reactivestreams .Subscription ;
36
- import software .amazon .awssdk .annotations .SdkPublicApi ;
36
+ import software .amazon .awssdk .annotations .SdkProtectedApi ;
37
37
import software .amazon .awssdk .utils .Logger ;
38
38
import software .amazon .awssdk .utils .Validate ;
39
39
40
40
/**
41
- * A {@link Publisher} to which callers can {@link #write (Object)} messages, simplifying the process of implementing a publisher.
41
+ * A {@link Publisher} to which callers can {@link #send (Object)} messages, simplifying the process of implementing a publisher.
42
42
*
43
43
* <p><b>Operations</b>
44
44
*
45
45
* <p>The {@code SimplePublisher} supports three simplified operations:
46
46
* <ol>
47
- * <li>{@link #write (Object)} for sending messages</li>
47
+ * <li>{@link #send (Object)} for sending messages</li>
48
48
* <li>{@link #complete()} for indicating the successful end of messages</li>
49
49
* <li>{@link #error(Throwable)} for indicating the unsuccessful end of messages</li>
50
50
* </ol>
51
51
*
52
52
* Each of these operations returns a {@link CompletableFuture} for indicating when the message has been successfully sent.
53
53
*
54
- * <p>Callers are expected to invoke a series of {@link #write (Object)}s followed by a single {@link #complete()} or
54
+ * <p>Callers are expected to invoke a series of {@link #send (Object)}s followed by a single {@link #complete()} or
55
55
* {@link #error(Throwable)}. See the documentation on each operation for more details.
56
56
*
57
57
* <p>This publisher will store an unbounded number of messages. It is recommended that callers limit the number of in-flight
58
- * {@link #write (Object)} operations in order to bound the amount of memory used by this publisher.
58
+ * {@link #send (Object)} operations in order to bound the amount of memory used by this publisher.
59
59
*/
60
- @ SdkPublicApi
60
+ @ SdkProtectedApi
61
61
public final class SimplePublisher <T > implements Publisher <T > {
62
62
private static final Logger log = Logger .loggerFor (SimplePublisher .class );
63
63
@@ -86,7 +86,7 @@ public final class SimplePublisher<T> implements Publisher<T> {
86
86
private final AtomicBoolean processingQueue = new AtomicBoolean (false );
87
87
88
88
/**
89
- * An exception that should be raised to any failed {@link #write (Object)}, {@link #complete()} or {@link #error(Throwable)}
89
+ * An exception that should be raised to any failed {@link #send (Object)}, {@link #complete()} or {@link #error(Throwable)}
90
90
* operations. This is used to stop accepting messages after the downstream subscription is cancelled or after the
91
91
* caller sends a {@code complete()} or {@code #error()}.
92
92
*
@@ -100,30 +100,30 @@ public final class SimplePublisher<T> implements Publisher<T> {
100
100
private Subscriber <? super T > subscriber ;
101
101
102
102
/**
103
- * Write a message to this publisher.
103
+ * Send a message using this publisher.
104
104
*
105
- * <p>Messages written to this publisher will eventually be sent to a downstream subscriber, in the order they were
105
+ * <p>Messages sent using this publisher will eventually be sent to a downstream subscriber, in the order they were
106
106
* written. When the message is sent to the subscriber, the returned future will be completed successfully.
107
107
*
108
108
* <p>This method may be invoked concurrently when the order of messages is not important.
109
109
*
110
110
* <p>In the time between when this method is invoked and the returned future is not completed, this publisher stores the
111
- * request message in memory. Callers are recommended to limit the number of writes in progress at a time to bound the
111
+ * request message in memory. Callers are recommended to limit the number of sends in progress at a time to bound the
112
112
* amount of memory used by this publisher.
113
113
*
114
114
* <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
115
- * if the write call was performed after a {@link #complete()} or {@link #error(Throwable)} call.
115
+ * if the {@code send} call was performed after a {@link #complete()} or {@link #error(Throwable)} call.
116
116
*
117
117
* @param value The message to send. Must not be null.
118
118
* @return A future that is completed when the message is sent to the subscriber.
119
119
*/
120
- public CompletableFuture <Void > write (T value ) {
121
- log .trace (() -> "Received write () with " + value );
120
+ public CompletableFuture <Void > send (T value ) {
121
+ log .trace (() -> "Received send () with " + value );
122
122
123
123
OnNextQueueEntry <T > entry = new OnNextQueueEntry <>(value );
124
124
try {
125
125
Validate .notNull (value , "Null cannot be written." );
126
- checkRejectException ();
126
+ validateRejectState ();
127
127
eventQueue .add (entry );
128
128
processEventQueue ();
129
129
} catch (RuntimeException t ) {
@@ -133,13 +133,13 @@ public CompletableFuture<Void> write(T value) {
133
133
}
134
134
135
135
/**
136
- * Indicate that no more {@link #write (Object)} calls will be made, and that stream of messages is completed successfully.
136
+ * Indicate that no more {@link #send (Object)} calls will be made, and that stream of messages is completed successfully.
137
137
*
138
- * <p>This can be called before any in-flight {@code write } calls are complete. Such messages will be processed before the
138
+ * <p>This can be called before any in-flight {@code send } calls are complete. Such messages will be processed before the
139
139
* stream is treated as complete. The returned future will be completed successfully when the {@code complete} is sent to
140
140
* the downstream subscriber.
141
141
*
142
- * <p>After this method is invoked, any future {@link #write (Object)}, {@code complete()} or {@link #error(Throwable)}
142
+ * <p>After this method is invoked, any future {@link #send (Object)}, {@code complete()} or {@link #error(Throwable)}
143
143
* calls will be completed exceptionally and not be processed.
144
144
*
145
145
* <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
@@ -153,7 +153,7 @@ public CompletableFuture<Void> complete() {
153
153
OnCompleteQueueEntry <T > entry = new OnCompleteQueueEntry <>();
154
154
155
155
try {
156
- checkRejectException ();
156
+ validateRejectState ();
157
157
setRejectExceptionOrThrow (() -> new IllegalStateException ("complete() has been invoked" ));
158
158
eventQueue .add (entry );
159
159
processEventQueue ();
@@ -164,13 +164,13 @@ public CompletableFuture<Void> complete() {
164
164
}
165
165
166
166
/**
167
- * Indicate that no more {@link #write (Object)} calls will be made, and that streaming of messages has failed.
167
+ * Indicate that no more {@link #send (Object)} calls will be made, and that streaming of messages has failed.
168
168
*
169
- * <p>This can be called before any in-flight {@code write } calls are complete. Such messages will be processed before the
169
+ * <p>This can be called before any in-flight {@code send } calls are complete. Such messages will be processed before the
170
170
* stream is treated as being in-error. The returned future will be completed successfully when the {@code error} is
171
171
* sent to the downstream subscriber.
172
172
*
173
- * <p>After this method is invoked, any future {@link #write (Object)}, {@link #complete()} or {@code #error(Throwable)}
173
+ * <p>After this method is invoked, any future {@link #send (Object)}, {@link #complete()} or {@code #error(Throwable)}
174
174
* calls will be completed exceptionally and not be processed.
175
175
*
176
176
* <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
@@ -185,7 +185,7 @@ public CompletableFuture<Void> error(Throwable error) {
185
185
OnErrorQueueEntry <T > entry = new OnErrorQueueEntry <>(error );
186
186
187
187
try {
188
- checkRejectException ();
188
+ validateRejectState ();
189
189
setRejectExceptionOrThrow (() -> new IllegalStateException ("error() has been invoked" ));
190
190
eventQueue .add (entry );
191
191
processEventQueue ();
@@ -270,17 +270,13 @@ private void doProcessQueue() {
270
270
entryTypesToFail .addAll (asList (ON_NEXT , ON_COMPLETE , ON_ERROR ));
271
271
log .trace (() -> "Calling onComplete()" );
272
272
subscriber .onComplete ();
273
- outstandingDemand .set (0 );
274
- log .trace (() -> "Set demand to 0" );
275
273
break ;
276
274
case ON_ERROR :
277
275
OnErrorQueueEntry <T > onErrorEntry = (OnErrorQueueEntry <T >) entry ;
278
276
279
277
entryTypesToFail .addAll (asList (ON_NEXT , ON_COMPLETE , ON_ERROR ));
280
278
log .trace (() -> "Calling onError() with " + onErrorEntry .failure , onErrorEntry .failure );
281
279
subscriber .onError (onErrorEntry .failure );
282
- outstandingDemand .set (0 );
283
- log .trace (() -> "Set demand to 0" );
284
280
break ;
285
281
case CANCEL :
286
282
subscriber = null ; // Allow subscriber to be garbage collected after cancellation.
@@ -338,7 +334,7 @@ private void panicAndDie(Throwable cause) {
338
334
RuntimeException failure = new IllegalStateException ("Encountered fatal error in publisher" , cause );
339
335
rejectException .compareAndSet (null , () -> failure );
340
336
entryTypesToFail .addAll (asList (QueueEntry .Type .values ()));
341
- subscriber .onError (failure );
337
+ subscriber .onError (cause instanceof Error ? cause : failure );
342
338
343
339
while (true ) {
344
340
QueueEntry <T > entry = eventQueue .poll ();
@@ -356,7 +352,7 @@ private void panicAndDie(Throwable cause) {
356
352
/**
357
353
* Ensure that {@link #rejectException} is null. If it is not, throw the exception.
358
354
*/
359
- private void checkRejectException () {
355
+ private void validateRejectState () {
360
356
if (rejectException .get () != null ) {
361
357
throw rejectException .get ().get ();
362
358
}
@@ -385,10 +381,16 @@ public void request(long n) {
385
381
+ "amount of data: " + n );
386
382
rejectException .compareAndSet (null , () -> failure );
387
383
eventQueue .add (new OnErrorQueueEntry <>(failure ));
388
- entryTypesToFail .add ( ON_NEXT );
384
+ entryTypesToFail .addAll ( asList ( ON_NEXT , ON_COMPLETE ) );
389
385
processEventQueue ();
390
386
} else {
391
- long newDemand = outstandingDemand .addAndGet (n );
387
+ long newDemand = outstandingDemand .updateAndGet (current -> {
388
+ if (Long .MAX_VALUE - current < n ) {
389
+ return Long .MAX_VALUE ;
390
+ }
391
+
392
+ return current + n ;
393
+ });
392
394
log .trace (() -> "Increased demand to " + newDemand );
393
395
processEventQueue ();
394
396
}
@@ -412,7 +414,7 @@ public void cancel() {
412
414
*/
413
415
abstract static class QueueEntry <T > {
414
416
/**
415
- * The future that was returned to a {@link #write (Object)}, {@link #complete()} or {@link #error(Throwable)} message.
417
+ * The future that was returned to a {@link #send (Object)}, {@link #complete()} or {@link #error(Throwable)} message.
416
418
*/
417
419
protected final CompletableFuture <Void > resultFuture = new CompletableFuture <>();
418
420
@@ -430,7 +432,7 @@ protected enum Type {
430
432
}
431
433
432
434
/**
433
- * An entry added when we get a {@link #write (Object)} call.
435
+ * An entry added when we get a {@link #send (Object)} call.
434
436
*/
435
437
private static final class OnNextQueueEntry <T > extends QueueEntry <T > {
436
438
private final T value ;
0 commit comments