Skip to content

Commit 85a1fd7

Browse files
authored
Guard against re-subscription in SplittingPublisher (#4253)
* guard against re-subscription in SplittingPublisher * fix checkstyle * Error msg
1 parent 28c126d commit 85a1fd7

File tree

1 file changed

+16
-6
lines changed

1 file changed

+16
-6
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import software.amazon.awssdk.annotations.SdkInternalApi;
2626
import software.amazon.awssdk.core.async.AsyncRequestBody;
2727
import software.amazon.awssdk.core.async.SdkPublisher;
28+
import software.amazon.awssdk.core.exception.NonRetryableException;
29+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
2830
import software.amazon.awssdk.utils.Logger;
2931
import software.amazon.awssdk.utils.Validate;
3032
import software.amazon.awssdk.utils.async.SimplePublisher;
@@ -48,8 +50,8 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
4850
private final long bufferSizeInBytes;
4951

5052
private SplittingPublisher(Builder builder) {
51-
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
52-
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
53+
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
54+
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
5355
this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes;
5456
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
5557

@@ -234,13 +236,14 @@ private Long totalDataRemaining() {
234236
private final class DownstreamBody implements AsyncRequestBody {
235237

236238
/**
237-
* The maximum length of the content this AsyncRequestBody can hold.
238-
* If the upstream content length is known, this is the same as totalLength
239+
* The maximum length of the content this AsyncRequestBody can hold. If the upstream content length is known, this is
240+
* the same as totalLength
239241
*/
240242
private final long maxLength;
241243
private final Long totalLength;
242244
private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher<>();
243245
private final int chunkNumber;
246+
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
244247
private volatile long transferredLength = 0;
245248

246249
private DownstreamBody(boolean contentLengthKnown, long maxLength, int chunkNumber) {
@@ -282,7 +285,14 @@ public void error(Throwable error) {
282285

283286
@Override
284287
public void subscribe(Subscriber<? super ByteBuffer> s) {
285-
delegate.subscribe(s);
288+
if (subscribeCalled.compareAndSet(false, true)) {
289+
delegate.subscribe(s);
290+
} else {
291+
s.onSubscribe(new NoopSubscription(s));
292+
s.onError(NonRetryableException.create(
293+
"A retry was attempted, but AsyncRequestBody.split does not "
294+
+ "support retries."));
295+
}
286296
}
287297

288298
private void addDataBuffered(int length) {
@@ -293,7 +303,7 @@ private void addDataBuffered(int length) {
293303
}
294304
}
295305
}
296-
306+
297307
public static final class Builder {
298308
private AsyncRequestBody asyncRequestBody;
299309
private Long chunkSizeInBytes;

0 commit comments

Comments
 (0)