Skip to content

Commit 6969fbe

Browse files
authored
Fixed an issue in that could cause NPE to be thrown when close and onSubscribed get invoked concurrently. (#4460)
1 parent 9eb921e commit 6969fbe

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed an issue in `InputStreamSubscriber` that could cause NPE to be thrown when close and onSubscribed get invoked concurrently. See [#4081](https://github.com/aws/aws-sdk-java-v2/issues/4081)"
6+
}

utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public final class InputStreamSubscriber extends InputStream implements Subscrib
4545
private final AtomicBoolean drainingCallQueue = new AtomicBoolean(false);
4646
private final Queue<QueueEntry> callQueue = new ConcurrentLinkedQueue<>();
4747

48+
private final Object subscribeLock = new Object();
49+
4850
private Subscription subscription;
4951

5052
private boolean done = false;
@@ -55,13 +57,15 @@ public InputStreamSubscriber() {
5557

5658
@Override
5759
public void onSubscribe(Subscription s) {
58-
if (!inputStreamState.compareAndSet(State.UNINITIALIZED, State.READABLE)) {
59-
close();
60-
return;
61-
}
60+
synchronized (subscribeLock) {
61+
if (!inputStreamState.compareAndSet(State.UNINITIALIZED, State.READABLE)) {
62+
close();
63+
return;
64+
}
6265

63-
this.subscription = new CancelWatcher(s);
64-
delegate.onSubscribe(subscription);
66+
this.subscription = new CancelWatcher(s);
67+
delegate.onSubscribe(subscription);
68+
}
6569
}
6670

6771
@Override
@@ -120,12 +124,14 @@ public int read(byte[] bytes, int off, int len) {
120124

121125
@Override
122126
public void close() {
123-
if (inputStreamState.compareAndSet(State.UNINITIALIZED, State.CLOSED)) {
124-
delegate.onSubscribe(new NoOpSubscription());
125-
delegate.onError(new CancellationException());
126-
} else if (inputStreamState.compareAndSet(State.READABLE, State.CLOSED)) {
127-
subscription.cancel();
128-
onError(new CancellationException());
127+
synchronized (subscribeLock) {
128+
if (inputStreamState.compareAndSet(State.UNINITIALIZED, State.CLOSED)) {
129+
delegate.onSubscribe(new NoOpSubscription());
130+
delegate.onError(new CancellationException());
131+
} else if (inputStreamState.compareAndSet(State.READABLE, State.CLOSED)) {
132+
subscription.cancel();
133+
onError(new CancellationException());
134+
}
129135
}
130136
}
131137

0 commit comments

Comments
 (0)