Skip to content

Commit d6b29d9

Browse files
committed
Fixed an issue where "Error thrown from TransformingAsyncResponseHandler#onError, ignoring" could be logged during an unrelated failure.
Fixes #1812
1 parent 3304935 commit d6b29d9

File tree

4 files changed

+59
-4
lines changed

4 files changed

+59
-4
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"description": "Fixed an issue where \"Error thrown from TransformingAsyncResponseHandler#onError, ignoring\" could be logged during an unrelated failure."
5+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/TransformingAsyncResponseHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package software.amazon.awssdk.core.internal.http;
1717

1818
import java.util.concurrent.CompletableFuture;
19+
import org.reactivestreams.Publisher;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.http.SdkHttpResponse;
2022
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
2123

2224
/**
@@ -29,8 +31,11 @@ public interface TransformingAsyncResponseHandler<ResultT> extends SdkAsyncHttpR
2931
/**
3032
* Return the future holding the transformed response.
3133
* <p>
32-
* This method is guaranteed to be called before the request is executed, and before {@link
33-
* SdkAsyncHttpResponseHandler#onHeaders(software.amazon.awssdk.http.SdkHttpResponse)} is signaled.
34+
* This method is guaranteed to be called before the request is executed, before {@link
35+
* #onHeaders(SdkHttpResponse)} and before {@link #onStream(Publisher)} is signaled.
36+
* <p>
37+
* Note: {@link #onError(Throwable)} *may* be invoked before this method is invoked, but it will
38+
* never be invoked in parallel with this method.
3439
*
3540
* @return The future holding the transformed response.
3641
*/

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/AsyncResponseHandler.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,26 @@ public void onStream(Publisher<ByteBuffer> publisher) {
7373

7474
@Override
7575
public void onError(Throwable err) {
76-
streamFuture.completeExceptionally(err);
76+
// Method assumes onError will be never be executed concurrently with prepare.
77+
if (streamFuture != null) {
78+
streamFuture.completeExceptionally(err);
79+
} else {
80+
CompletableFuture<ByteArrayOutputStream> streamFuture = new CompletableFuture<>();
81+
this.streamFuture = streamFuture;
82+
streamFuture.completeExceptionally(err);
83+
}
7784
}
7885

7986
@Override
8087
public CompletableFuture<T> prepare() {
81-
streamFuture = new CompletableFuture<>();
88+
// Method assumes onError will be never be executed concurrently with prepare.
89+
CompletableFuture<ByteArrayOutputStream> streamFuture = this.streamFuture;
90+
91+
if (streamFuture == null) {
92+
streamFuture = new CompletableFuture<>();
93+
this.streamFuture = streamFuture;
94+
}
95+
8296
return streamFuture.thenCompose(baos -> {
8397
ByteArrayInputStream content = new ByteArrayInputStream(baos.toByteArray());
8498
// Ignore aborts - we already have all of the content.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package software.amazon.awssdk.core.internal.http.async;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import org.junit.Test;
7+
8+
public class AsyncResponseHandlerTest {
9+
@Test
10+
public void onErrorAfterPrepareForwardsErrorToFuture() {
11+
AsyncResponseHandler<Object> responseHandler = new AsyncResponseHandler<>(null, null, null);
12+
CompletableFuture<Object> future = responseHandler.prepare();
13+
14+
assertThat(future).isNotCompleted();
15+
16+
Throwable t = new Throwable();
17+
responseHandler.onError(t);
18+
19+
assertThat(future).hasFailedWithThrowableThat().isEqualTo(t);
20+
}
21+
22+
@Test
23+
public void onErrorBeforePrepareStoresExceptionForPrepare() {
24+
AsyncResponseHandler<Object> responseHandler = new AsyncResponseHandler<>(null, null, null);
25+
26+
Throwable t = new Throwable();
27+
responseHandler.onError(t);
28+
29+
assertThat(responseHandler.prepare()).hasFailedWithThrowableThat().isEqualTo(t);
30+
}
31+
}

0 commit comments

Comments
 (0)