Skip to content

Commit 7ea7047

Browse files
authored
Merge pull request #1655 from aws/millem/fix-my-messup
Revert "Fixed an issue where event streams might fail with ClassCastE…
2 parents 2150aa8 + fcb4ef3 commit 7ea7047

File tree

12 files changed

+402
-762
lines changed

12 files changed

+402
-762
lines changed

core/aws-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@
7373
<artifactId>utils</artifactId>
7474
<version>${awsjavasdk.version}</version>
7575
</dependency>
76+
<dependency>
77+
<groupId>org.slf4j</groupId>
78+
<artifactId>slf4j-api</artifactId>
79+
</dependency>
7680
<dependency>
7781
<groupId>software.amazon.eventstream</groupId>
7882
<artifactId>eventstream</artifactId>

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.java

Lines changed: 344 additions & 129 deletions
Large diffs are not rendered by default.

core/aws-core/src/test/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformerTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public void onComplete() {
8787
.executor(Executors.newSingleThreadExecutor())
8888
.future(new CompletableFuture<>())
8989
.build();
90-
transformer.prepare();
9190
transformer.onStream(SdkPublisher.adapt(bytePublisher));
9291
latch.await();
9392
assertThat(numEvents)
@@ -328,10 +327,9 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
328327

329328
Flowable<ByteBuffer> bytePublisher = Flowable.just(exceptionMessage.toByteBuffer());
330329

331-
SubscribingResponseHandler handler = new SubscribingResponseHandler();
332330
AsyncResponseTransformer<SdkResponse, Void> transformer =
333331
EventStreamAsyncResponseTransformer.builder()
334-
.eventStreamResponseHandler(handler)
332+
.eventStreamResponseHandler(new SubscribingResponseHandler())
335333
.exceptionResponseHandler((response, executionAttributes) -> exception)
336334
.executor(Executors.newSingleThreadExecutor())
337335
.future(new CompletableFuture<>())
@@ -345,16 +343,13 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
345343
cf.join();
346344
} catch (CompletionException e) {
347345
if (e.getCause() instanceof SdkServiceException) {
348-
throw e.getCause();
346+
throw ((SdkServiceException) e.getCause());
349347
}
350348
}
351349
}).isSameAs(exception);
352-
353-
assertThat(handler.exceptionOccurredCalled).isTrue();
354350
}
355351

356352
private static class SubscribingResponseHandler implements EventStreamResponseHandler<Object, Object> {
357-
private volatile boolean exceptionOccurredCalled = false;
358353

359354
@Override
360355
public void responseReceived(Object response) {
@@ -368,7 +363,6 @@ public void onEventStream(SdkPublisher<Object> publisher) {
368363

369364
@Override
370365
public void exceptionOccurred(Throwable throwable) {
371-
exceptionOccurredCalled = true;
372366
}
373367

374368
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SdkPublisher.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
import java.util.function.Predicate;
2323
import org.reactivestreams.Publisher;
2424
import org.reactivestreams.Subscriber;
25-
import org.reactivestreams.Subscription;
2625
import software.amazon.awssdk.annotations.SdkPublicApi;
2726
import software.amazon.awssdk.utils.async.BufferingSubscriber;
28-
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
2927
import software.amazon.awssdk.utils.async.FilteringSubscriber;
3028
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
3129
import software.amazon.awssdk.utils.async.LimitingSubscriber;
@@ -118,36 +116,6 @@ default SdkPublisher<T> limit(int limit) {
118116
return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
119117
}
120118

121-
/**
122-
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
123-
*
124-
* @param afterOnComplete The logic that should be run immediately after onComplete.
125-
* @return New publisher that invokes the requested callback.
126-
*/
127-
default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
128-
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null));
129-
}
130-
131-
/**
132-
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}.
133-
*
134-
* @param afterOnError The logic that should be run immediately after onError.
135-
* @return New publisher that invokes the requested callback.
136-
*/
137-
default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
138-
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null));
139-
}
140-
141-
/**
142-
* Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}.
143-
*
144-
* @param afterOnCancel The logic that should be run immediately after cancellation of the subscription.
145-
* @return New publisher that invokes the requested callback.
146-
*/
147-
default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
148-
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel));
149-
}
150-
151119
/**
152120
* Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event
153121
* published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more

services/kinesis/src/test/java/software/amazon/awssdk/services/kinesis/SubscribeToShardUnmarshallingTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import java.util.List;
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.CompletionException;
32-
import java.util.concurrent.ExecutionException;
33-
import java.util.concurrent.TimeUnit;
3432
import java.util.concurrent.atomic.AtomicInteger;
3533
import org.junit.Before;
3634
import org.junit.Test;
@@ -203,9 +201,9 @@ private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
203201
SubscribeToShardResponseHandler.builder()
204202
.subscriber(events::add)
205203
.build())
206-
.get(10, TimeUnit.SECONDS);
204+
.join();
207205
return events;
208-
} catch (ExecutionException e) {
206+
} catch (CompletionException e) {
209207
throw e.getCause();
210208
}
211209
}
@@ -236,6 +234,9 @@ public void request(long l) {
236234

237235
@Override
238236
public void cancel() {
237+
RuntimeException e = new RuntimeException();
238+
subscriber.onError(e);
239+
value.onError(e);
239240
}
240241
}));
241242
return cf;

utils/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,6 @@
8484
<artifactId>commons-io</artifactId>
8585
<scope>test</scope>
8686
</dependency>
87-
<dependency>
88-
<groupId>org.reactivestreams</groupId>
89-
<artifactId>reactive-streams-tck</artifactId>
90-
<scope>test</scope>
91-
</dependency>
9287
</dependencies>
9388

9489
<build>

utils/src/main/java/software/amazon/awssdk/utils/async/DelegatingSubscriber.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515

1616
package software.amazon.awssdk.utils.async;
1717

18-
import java.util.concurrent.atomic.AtomicBoolean;
1918
import org.reactivestreams.Subscriber;
2019
import org.reactivestreams.Subscription;
2120
import software.amazon.awssdk.annotations.SdkProtectedApi;
2221

2322
@SdkProtectedApi
2423
public abstract class DelegatingSubscriber<T, U> implements Subscriber<T> {
24+
2525
protected final Subscriber<? super U> subscriber;
26-
private final AtomicBoolean complete = new AtomicBoolean(false);
2726

2827
protected DelegatingSubscriber(Subscriber<? super U> subscriber) {
2928
this.subscriber = subscriber;
@@ -36,15 +35,12 @@ public void onSubscribe(Subscription subscription) {
3635

3736
@Override
3837
public void onError(Throwable throwable) {
39-
if (complete.compareAndSet(false, true)) {
40-
subscriber.onError(throwable);
41-
}
38+
subscriber.onError(throwable);
4239
}
4340

4441
@Override
4542
public void onComplete() {
46-
if (complete.compareAndSet(false, true)) {
47-
subscriber.onComplete();
48-
}
43+
subscriber.onComplete();
4944
}
45+
5046
}

utils/src/main/java/software/amazon/awssdk/utils/async/EventListeningSubscriber.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

0 commit comments

Comments
 (0)