diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProvider.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProvider.java index c881301cb693..2789ba19e921 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProvider.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProvider.java @@ -67,8 +67,9 @@ final class ClasspathSdkHttpServiceProvider implements SdkHttpServiceProvider @Override public Optional loadService() { - Queue impls = new PriorityQueue<>(Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass(), - Integer.MAX_VALUE))); + Queue impls = new PriorityQueue<>( + Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass().getName(), + Integer.MAX_VALUE))); Iterable iterable = () -> serviceLoader.loadServices(serviceClass); iterable.forEach(impl -> impls.add(impl)); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProviderTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProviderTest.java index 26264a79aac1..03e25c388625 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProviderTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProviderTest.java @@ -29,8 +29,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.core.SdkSystemSetting; -import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.http.SdkHttpService; import software.amazon.awssdk.http.apache.ApacheSdkHttpService; import software.amazon.awssdk.http.async.SdkAsyncHttpService; @@ -77,7 +75,7 @@ public void multipleSyncImplementationsFound_ReturnHighestPriority() { SdkHttpService mock = mock(SdkHttpService.class); when(serviceLoader.loadServices(SdkHttpService.class)) - .thenReturn(iteratorOf(apacheSdkHttpService, mock)); + .thenReturn(iteratorOf(mock, apacheSdkHttpService)); assertThat(provider.loadService()).contains(apacheSdkHttpService); SdkHttpService mock1 = mock(SdkHttpService.class); @@ -93,7 +91,7 @@ public void multipleAsyncImplementationsFound_ReturnHighestPriority() { SdkAsyncHttpService mock = mock(SdkAsyncHttpService.class); when(serviceLoader.loadServices(SdkAsyncHttpService.class)) - .thenReturn(iteratorOf(netty, mock)); + .thenReturn(iteratorOf(mock, netty)); assertThat(asyncProvider.loadService()).contains(netty); SdkAsyncHttpService mock1 = mock(SdkAsyncHttpService.class); diff --git a/http-client-spi/src/main/java/software/amazon/awssdk/http/HttpExecuteResponse.java b/http-client-spi/src/main/java/software/amazon/awssdk/http/HttpExecuteResponse.java index f8db3f2cd250..8f183be3e408 100644 --- a/http-client-spi/src/main/java/software/amazon/awssdk/http/HttpExecuteResponse.java +++ b/http-client-spi/src/main/java/software/amazon/awssdk/http/HttpExecuteResponse.java @@ -38,7 +38,7 @@ public SdkHttpResponse httpResponse() { } /** - * /** Get the {@link AbortableInputStream} associated with this response. + * Get the {@link AbortableInputStream} associated with this response. * *

Always close the "responseBody" input stream to release the underlying HTTP connection. * Even for error responses, the SDK creates an input stream for reading error data. It is essential to close the input stream diff --git a/http-clients/aws-crt-client/pom.xml b/http-clients/aws-crt-client/pom.xml index d5217348d0b3..d315406ecfe8 100644 --- a/http-clients/aws-crt-client/pom.xml +++ b/http-clients/aws-crt-client/pom.xml @@ -151,12 +151,6 @@ ${awsjavasdk.version} test - - software.amazon.awssdk - s3 - ${awsjavasdk.version} - test - software.amazon.awssdk auth diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index d48f0da5acb1..146233beb189 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -24,7 +24,6 @@ import static software.amazon.awssdk.utils.NumericUtils.saturatedCast; import java.io.IOException; -import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.CompletableFuture; import javax.net.ssl.SSLHandshakeException; @@ -33,25 +32,19 @@ import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpClientConnectionManager; import software.amazon.awssdk.crt.http.HttpException; -import software.amazon.awssdk.crt.http.HttpHeader; -import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpManagerMetrics; import software.amazon.awssdk.crt.http.HttpRequest; -import software.amazon.awssdk.crt.http.HttpStream; import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; -import software.amazon.awssdk.http.AbortableInputStream; -import software.amazon.awssdk.http.HttpStatusFamily; import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; +import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.metrics.MetricCollector; import software.amazon.awssdk.metrics.NoOpMetricCollector; import software.amazon.awssdk.utils.Logger; -import software.amazon.awssdk.utils.async.InputStreamSubscriber; -import software.amazon.awssdk.utils.async.SimplePublisher; @SdkInternalApi public final class CrtRequestExecutor { @@ -201,87 +194,6 @@ private void executeRequest(CrtAsyncRequestContext executionContext, } } - private static final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler { - private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder(); - private volatile InputStreamSubscriber inputStreamSubscriber; - private volatile SimplePublisher simplePublisher; - - private final CompletableFuture requestCompletionFuture; - private final HttpClientConnection crtConn; - - InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn, - CompletableFuture requestCompletionFuture) { - this.crtConn = crtConn; - this.requestCompletionFuture = requestCompletionFuture; - } - - @Override - public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, - HttpHeader[] nextHeaders) { - if (blockType == HttpHeaderBlock.MAIN.getValue()) { - for (HttpHeader h : nextHeaders) { - responseBuilder.appendHeader(h.getName(), h.getValue()); - } - } - - if (responseBuilder.statusCode() == 0) { - responseBuilder.statusCode(responseStatusCode); - } - - } - - @Override - public void onResponseComplete(HttpStream stream, int errorCode) { - // always close the connection on a 5XX response code. - if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) { - crtConn.shutdown(); - } - - if (errorCode == 0) { - requestCompletionFuture.complete(responseBuilder.build()); - } else { - Throwable toThrow = - wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); - requestCompletionFuture.completeExceptionally(toThrow); - } - stream.close(); - crtConn.close(); - - if (simplePublisher != null) { - simplePublisher.complete(); - } - } - - @Override - public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { - - // Per the client conformance tests, unless there's an actual body the response stream must be null. - // This is function is only called in one of the CRT's IO threads, or is otherwise sequentially ordered. - // It is volatile in the declaration for thread visibility reasons, NOT concurrency control. - if (inputStreamSubscriber == null && simplePublisher == null) { - inputStreamSubscriber = new InputStreamSubscriber(); - simplePublisher = new SimplePublisher<>(); - simplePublisher.subscribe(inputStreamSubscriber); - responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber)); - } - - CompletableFuture writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn)); - - writeFuture.whenComplete((result, failure) -> { - if (failure != null) { - requestCompletionFuture.completeExceptionally(failure); - return; - } - - // increment the window upon buffer consumption. - stream.incrementWindow(bodyBytesIn.length); - }); - - // the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing. - return 0; - } - } - private void executeRequest(CrtRequestContext executionContext, CompletableFuture requestFuture, HttpClientConnection crtConn) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index 816cb6a2c902..b2865ac49531 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -30,11 +30,14 @@ import software.amazon.awssdk.http.HttpStatusFamily; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.SimplePublisher; /** + * Response handler adaptor for {@link AwsCrtAsyncHttpClient}. + *

* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods */ @SdkInternalApi diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java new file mode 100644 index 000000000000..b1bf4462be89 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -0,0 +1,136 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.response; + +import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.http.HttpClientConnection; +import software.amazon.awssdk.crt.http.HttpException; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpHeaderBlock; +import software.amazon.awssdk.crt.http.HttpStream; +import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.HttpStatusFamily; +import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.utils.async.InputStreamSubscriber; +import software.amazon.awssdk.utils.async.SimplePublisher; + +/** + * Response handler adaptor for {@link AwsCrtHttpClient}. + */ +@SdkInternalApi +public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler { + private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder(); + private volatile InputStreamSubscriber inputStreamSubscriber; + private final SimplePublisher simplePublisher = new SimplePublisher<>(); + + private final CompletableFuture requestCompletionFuture; + private final HttpClientConnection crtConn; + + public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn, + CompletableFuture requestCompletionFuture) { + this.crtConn = crtConn; + this.requestCompletionFuture = requestCompletionFuture; + } + + @Override + public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + if (blockType == HttpHeaderBlock.MAIN.getValue()) { + for (HttpHeader h : nextHeaders) { + responseBuilder.appendHeader(h.getName(), h.getValue()); + } + } + + responseBuilder.statusCode(responseStatusCode); + } + + @Override + public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { + if (inputStreamSubscriber == null) { + inputStreamSubscriber = new InputStreamSubscriber(); + simplePublisher.subscribe(inputStreamSubscriber); + // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from + // the stream directly. + responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber)); + requestCompletionFuture.complete(responseBuilder.build()); + } + + CompletableFuture writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn)); + + if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) { + // Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT. + return bodyBytesIn.length; + } + + writeFuture.whenComplete((result, failure) -> { + if (failure != null) { + failFutureAndCloseConnection(stream, failure); + return; + } + + // increment the window upon buffer consumption. + stream.incrementWindow(bodyBytesIn.length); + }); + + // the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing. + return 0; + } + + @Override + public void onResponseComplete(HttpStream stream, int errorCode) { + if (errorCode == CRT.AWS_CRT_SUCCESS) { + onSuccessfulResponseComplete(stream); + } else { + onFailedResponseComplete(stream, errorCode); + } + } + + private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) { + requestCompletionFuture.completeExceptionally(failure); + crtConn.shutdown(); + crtConn.close(); + stream.close(); + } + + private void onFailedResponseComplete(HttpStream stream, int errorCode) { + Throwable toThrow = + wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); + + simplePublisher.error(toThrow); + failFutureAndCloseConnection(stream, toThrow); + } + + private void onSuccessfulResponseComplete(HttpStream stream) { + // always close the connection on a 5XX response code. + if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) { + crtConn.shutdown(); + } + + // For response without a payload, for example, S3 PutObjectResponse, we need to complete the future + // in onResponseComplete callback since onResponseBody will never be invoked. + requestCompletionFuture.complete(responseBuilder.build()); + simplePublisher.complete(); + crtConn.close(); + stream.close(); + } +} diff --git a/services/s3/pom.xml b/services/s3/pom.xml index 01382997897c..c02dd247a92b 100644 --- a/services/s3/pom.xml +++ b/services/s3/pom.xml @@ -174,6 +174,12 @@ ${awsjavasdk.version} test + + software.amazon.awssdk + aws-crt-client + ${awsjavasdk.version} + test + io.netty netty-transport diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/S3IntegrationTestBase.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/S3IntegrationTestBase.java index 03cf42afe5df..fcb037a578ce 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/S3IntegrationTestBase.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/S3IntegrationTestBase.java @@ -17,12 +17,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import org.junit.AfterClass; import org.junit.BeforeClass; import software.amazon.awssdk.core.ClientType; import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.crt.Log; +import software.amazon.awssdk.http.crt.AwsCrtHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.model.BucketLocationConstraint; import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; @@ -43,6 +45,8 @@ public class S3IntegrationTestBase extends AwsTestBase { */ protected static S3Client s3; + protected static S3Client s3WithCrtHttpClient; + protected static S3AsyncClient s3Async; /** @@ -51,9 +55,18 @@ public class S3IntegrationTestBase extends AwsTestBase { */ @BeforeClass public static void setUp() throws Exception { + System.setProperty("aws.crt.debugnative", "true"); Log.initLoggingToStdout(Log.LogLevel.Warn); s3 = s3ClientBuilder().build(); s3Async = s3AsyncClientBuilder().build(); + s3WithCrtHttpClient = s3ClientBuilderWithCrtHttpClient().build(); + } + + @AfterClass + public static void cleanUpResources() { + s3.close(); + s3Async.close(); + s3WithCrtHttpClient.close(); } protected static S3ClientBuilder s3ClientBuilder() { @@ -64,6 +77,15 @@ protected static S3ClientBuilder s3ClientBuilder() { new UserAgentVerifyingExecutionInterceptor("Apache", ClientType.SYNC))); } + protected static S3ClientBuilder s3ClientBuilderWithCrtHttpClient() { + return S3Client.builder() + .region(DEFAULT_REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .httpClientBuilder(AwsCrtHttpClient.builder()) + .overrideConfiguration(o -> o.addExecutionInterceptor( + new UserAgentVerifyingExecutionInterceptor("AwsCommonRuntime", ClientType.SYNC))); + } + protected static S3AsyncClientBuilder s3AsyncClientBuilder() { return S3AsyncClient.builder() .region(DEFAULT_REGION) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java new file mode 100644 index 000000000000..621ad2a33fab --- /dev/null +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java @@ -0,0 +1,81 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.crthttpclient; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.utils.ChecksumUtils; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.utils.Md5Utils; + +public class S3WithCrtHttpClientsIntegrationTests extends S3IntegrationTestBase { + private static final String TEST_BUCKET = temporaryBucketName(S3WithCrtHttpClientsIntegrationTests.class); + private static final String TEST_KEY = "2mib_file.dat"; + private static final int OBJ_SIZE = 2 * 1024 * 1024; + + private static RandomTempFile testFile; + + @BeforeAll + public static void setup() throws Exception { + S3IntegrationTestBase.setUp(); + S3IntegrationTestBase.createBucket(TEST_BUCKET); + testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE); + s3WithCrtHttpClient.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), RequestBody.fromFile(testFile.toPath())); + } + + @AfterAll + public static void teardown() throws IOException { + S3IntegrationTestBase.deleteBucketAndAllContents(TEST_BUCKET); + Files.delete(testFile.toPath()); + s3WithCrtHttpClient.close(); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + void getObject_toResponseStream_objectSentCorrectly() throws Exception { + ResponseInputStream objContent = + s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + + byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath())); + + assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); + } + + @Test + void getObject_toFile_objectSentCorrectly() throws Exception { + Path destination = RandomTempFile.randomUncreatedFile().toPath(); + GetObjectResponse response = s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toFile(destination)); + + assertThat(Md5Utils.md5AsBase64(destination.toFile())).isEqualTo(Md5Utils.md5AsBase64(testFile)); + } +} diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java index a33cc9b0b994..95f697b6d689 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java @@ -82,7 +82,10 @@ public void onError(Throwable t) { @Override public void onComplete() { - callQueue.add(new QueueEntry(true, delegate::onComplete)); + callQueue.add(new QueueEntry(true, () -> { + delegate.onComplete(); + inputStreamState.set(State.STREAMING_DONE); + })); drainQueue(); } @@ -125,6 +128,11 @@ public int read(byte[] bytes, int off, int len) { @Override public void close() { synchronized (subscribeLock) { + // If it is done, no-op + if (inputStreamState.get().equals(State.STREAMING_DONE)) { + return; + } + if (inputStreamState.compareAndSet(State.UNINITIALIZED, State.CLOSED)) { delegate.onSubscribe(new NoOpSubscription()); delegate.onError(new CancellationException()); @@ -153,6 +161,9 @@ private void doDrainQueue() { while (true) { QueueEntry entry = callQueue.poll(); if (done || entry == null) { + if (done) { + inputStreamState.set(State.STREAMING_DONE); + } return; } done = entry.terminal; @@ -173,7 +184,8 @@ private QueueEntry(boolean terminal, Runnable call) { private enum State { UNINITIALIZED, READABLE, - CLOSED + CLOSED, + STREAMING_DONE } private final class CancelWatcher implements Subscription {