Skip to content

Commit ee3b026

Browse files
authored
Cancel sub on finish and exception (#2619)
1 parent 911e9ef commit ee3b026

File tree

2 files changed

+74
-2
lines changed

2 files changed

+74
-2
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/RequestDataSupplierAdapter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.reactivestreams.Subscriber;
3131
import org.reactivestreams.Subscription;
3232
import software.amazon.awssdk.annotations.SdkInternalApi;
33+
import software.amazon.awssdk.crt.CrtRuntimeException;
3334
import software.amazon.awssdk.crt.http.HttpHeader;
3435
import software.amazon.awssdk.http.SdkHttpResponse;
3536
import software.amazon.awssdk.utils.Logger;
@@ -49,8 +50,7 @@ public final class RequestDataSupplierAdapter implements RequestDataSupplier {
4950

5051
private final Publisher<ByteBuffer> bodyPublisher;
5152

52-
// Not volatile, we synchronize on the subscriptionQueue
53-
private Subscription subscription;
53+
private volatile Subscription subscription;
5454

5555
// TODO: not volatile since it's read and written only by CRT thread(s). Need to
5656
// ensure that CRT actually ensures consistency across their threads...
@@ -175,6 +175,20 @@ public boolean resetPosition() {
175175
return true;
176176
}
177177

178+
@Override
179+
public void onException(CrtRuntimeException e) {
180+
if (subscription != null) {
181+
subscription.cancel();
182+
}
183+
}
184+
185+
@Override
186+
public void onFinished() {
187+
if (subscription != null) {
188+
subscription.cancel();
189+
}
190+
}
191+
178192
private Event takeFirstEvent() {
179193
try {
180194
return eventBuffer.takeFirst();

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/RequestDataSupplierAdapterTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,24 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1919
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.verify;
2022

2123
import io.reactivex.Flowable;
2224
import java.io.IOException;
2325
import java.nio.ByteBuffer;
2426
import java.util.Arrays;
2527
import java.util.List;
28+
import java.util.Optional;
29+
import java.util.concurrent.CompletableFuture;
2630
import java.util.stream.Collectors;
2731
import java.util.stream.Stream;
2832
import org.junit.Test;
2933
import org.reactivestreams.Publisher;
3034
import org.reactivestreams.Subscriber;
35+
import org.reactivestreams.Subscription;
3136
import software.amazon.awssdk.core.async.AsyncRequestBody;
37+
import software.amazon.awssdk.crt.CrtRuntimeException;
3238

3339
public class RequestDataSupplierAdapterTest {
3440

@@ -156,4 +162,56 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
156162
assertThat(readBuffer).isEqualTo(expectedBufferContent);
157163
}
158164
}
165+
166+
@Test
167+
public void onException_cancelsSubscription() {
168+
Subscription subscription = mock(Subscription.class);
169+
170+
AsyncRequestBody requestBody = new AsyncRequestBody() {
171+
@Override
172+
public Optional<Long> contentLength() {
173+
return Optional.empty();
174+
}
175+
176+
@Override
177+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
178+
subscriber.onSubscribe(subscription);
179+
}
180+
};
181+
182+
RequestDataSupplierAdapter adapter = new RequestDataSupplierAdapter(requestBody);
183+
184+
// getRequestBytes() triggers a subscribe() on the publisher
185+
adapter.getRequestBytes(ByteBuffer.allocate(0));
186+
187+
adapter.onException(new CrtRuntimeException("error"));
188+
189+
verify(subscription).cancel();
190+
}
191+
192+
@Test
193+
public void onFinished_cancelsSubscription() {
194+
Subscription subscription = mock(Subscription.class);
195+
196+
AsyncRequestBody requestBody = new AsyncRequestBody() {
197+
@Override
198+
public Optional<Long> contentLength() {
199+
return Optional.empty();
200+
}
201+
202+
@Override
203+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
204+
subscriber.onSubscribe(subscription);
205+
}
206+
};
207+
208+
RequestDataSupplierAdapter adapter = new RequestDataSupplierAdapter(requestBody);
209+
210+
// getRequestBytes() triggers a subscribe() on the publisher
211+
adapter.getRequestBytes(ByteBuffer.allocate(0));
212+
213+
adapter.onFinished();
214+
215+
verify(subscription).cancel();
216+
}
159217
}

0 commit comments

Comments
 (0)