diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java index 64565a62a204..7bf28fb30e39 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java @@ -19,8 +19,10 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.function.Function; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.FileTransformerConfiguration; import software.amazon.awssdk.core.ResponseBytes; @@ -202,7 +204,11 @@ static AsyncResponseTransformer toFile( * @return AsyncResponseTransformer instance. */ static AsyncResponseTransformer> toBytes() { - return new ByteArrayAsyncResponseTransformer<>(); + return new ByteArrayAsyncResponseTransformer<>(Optional.empty()); + } + + static AsyncResponseTransformer> toBytes(Function f) { + return new ByteArrayAsyncResponseTransformer<>(Optional.of(f)); } /** diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java index 90d587cd5a36..bb06ecf8edc8 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java @@ -15,11 +15,14 @@ package software.amazon.awssdk.core.internal.async; +import static software.amazon.awssdk.utils.BinaryUtils.copyBytes; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -41,13 +44,18 @@ public final class ByteArrayAsyncResponseTransformer implements AsyncResponseTransformer> { + private final Optional> knownSize; private volatile CompletableFuture cf; private volatile ResponseT response; + public ByteArrayAsyncResponseTransformer(Optional> knownSize) { + this.knownSize = knownSize; + } + @Override public CompletableFuture> prepare() { cf = new CompletableFuture<>(); - return cf.thenApply(arr -> ResponseBytes.fromByteArray(response, arr)); + return cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr)); } @Override @@ -57,7 +65,9 @@ public void onResponse(ResponseT response) { @Override public void onStream(SdkPublisher publisher) { - publisher.subscribe(new BaosSubscriber(cf)); + ByteStore byteStore = + knownSize.map(f -> new KnownLengthStore(f.apply(response))).orElseGet(BaosStore::new); + publisher.subscribe(new ByteSubscriber(cf, byteStore)); } @Override @@ -65,15 +75,17 @@ public void exceptionOccurred(Throwable throwable) { cf.completeExceptionally(throwable); } - static class BaosSubscriber implements Subscriber { + + static class ByteSubscriber implements Subscriber { private final CompletableFuture resultFuture; - private ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private ByteStore byteStore; private Subscription subscription; - BaosSubscriber(CompletableFuture resultFuture) { + ByteSubscriber(CompletableFuture resultFuture, ByteStore byteStore) { this.resultFuture = resultFuture; + this.byteStore = byteStore; } @Override @@ -88,19 +100,54 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuffer byteBuffer) { - invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer))); + byteStore.append(byteBuffer); subscription.request(1); } @Override public void onError(Throwable throwable) { - baos = null; + byteStore = null; resultFuture.completeExceptionally(throwable); } @Override public void onComplete() { - resultFuture.complete(baos.toByteArray()); + resultFuture.complete(byteStore.toByteArray()); + } + } + + interface ByteStore { + void append(ByteBuffer byteBuffer); + + byte[] toByteArray(); + } + + static class BaosStore implements ByteStore { + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + public void append(ByteBuffer byteBuffer) { + invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer))); + } + + public byte[] toByteArray() { + return baos.toByteArray(); + } + } + + static class KnownLengthStore implements ByteStore { + private final byte[] byteArray; + private int offset = 0; + + KnownLengthStore(int contentSize) { + this.byteArray = new byte[contentSize]; + } + + public void append(ByteBuffer byteBuffer) { + offset += copyBytes(byteBuffer, byteArray, offset); + } + + public byte[] toByteArray() { + return byteArray; } } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/AsyncS3ResponseTransformer.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/AsyncS3ResponseTransformer.java new file mode 100644 index 000000000000..ced9917305bd --- /dev/null +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/AsyncS3ResponseTransformer.java @@ -0,0 +1,11 @@ +package software.amazon.awssdk.services.s3; + +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +public class AsyncS3ResponseTransformer { + public static AsyncResponseTransformer> toBytes() { + return AsyncResponseTransformer.toBytes(r -> r.contentLength().intValue()); + } +} diff --git a/test/sdk-native-image-test/src/main/java/software/amazon/awssdk/nativeimagetest/S3TestRunner.java b/test/sdk-native-image-test/src/main/java/software/amazon/awssdk/nativeimagetest/S3TestRunner.java index 4290edd7a87d..d34211fd0f08 100644 --- a/test/sdk-native-image-test/src/main/java/software/amazon/awssdk/nativeimagetest/S3TestRunner.java +++ b/test/sdk-native-image-test/src/main/java/software/amazon/awssdk/nativeimagetest/S3TestRunner.java @@ -19,8 +19,8 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.AsyncS3ResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; @@ -54,7 +54,7 @@ public void runTests() { requestBody); s3NettyClient.getObject(b -> b.bucket(BUCKET_NAME).key(KEY), - AsyncResponseTransformer.toBytes()).join(); + AsyncS3ResponseTransformer.toBytes()).join(); } finally { if (bucketResponse != null) { diff --git a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java index e8d9271f3b65..ebfdc1c344ec 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java @@ -304,4 +304,22 @@ public static byte[] copyBytesFrom(ByteBuffer bb, int readLimit) { return dst; } + + /** + * This behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except + * that the bytes are copied to the supplied destination array, at the supplied destination offset. + */ + public static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) { + if (bb == null) { + return 0; + } + + int remaining = bb.remaining(); + if (bb.hasArray()) { + System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining); + } else { + bb.asReadOnlyBuffer().get(dest, destOffset, remaining); + } + return remaining; + } }