diff --git a/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/core/internal/async/ByteArrayAsyncResponseTransformerAlternative.java b/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/core/internal/async/ByteArrayAsyncResponseTransformerAlternative.java index 3f2a655..58645e5 100644 --- a/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/core/internal/async/ByteArrayAsyncResponseTransformerAlternative.java +++ b/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/core/internal/async/ByteArrayAsyncResponseTransformerAlternative.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import static com.madgag.aws.sdk.async.responsebytes.awssdk.utils.BinaryUtilsAlternative.copyBytes; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; /** @@ -65,9 +66,9 @@ public void onResponse(ResponseT response) { @Override public void onStream(SdkPublisher publisher) { - ByteArrayOutputStream baos = - knownSize.map(f -> new ByteArrayOutputStream(f.apply(response))).orElse(new ByteArrayOutputStream()); - publisher.subscribe(new BaosSubscriber(cf, baos)); + ByteStore byteStore = + knownSize.map(f -> new KnownLengthStore(f.apply(response))).orElseGet(BaosStore::new); + publisher.subscribe(new ByteSubscriber(cf, byteStore)); } @Override @@ -75,16 +76,17 @@ public void exceptionOccurred(Throwable throwable) { cf.completeExceptionally(throwable); } - static class BaosSubscriber implements Subscriber { + + static class ByteSubscriber implements Subscriber { private final CompletableFuture resultFuture; - private ByteArrayOutputStream baos; + private ByteStore byteStore; private Subscription subscription; - BaosSubscriber(CompletableFuture resultFuture, ByteArrayOutputStream baos) { + ByteSubscriber(CompletableFuture resultFuture, ByteStore byteStore) { this.resultFuture = resultFuture; - this.baos = baos; + this.byteStore = byteStore; } @Override @@ -99,19 +101,54 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuffer byteBuffer) { - invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer))); + byteStore.append(byteBuffer); subscription.request(1); } @Override public void onError(Throwable throwable) { - baos = null; + byteStore = null; resultFuture.completeExceptionally(throwable); } @Override public void onComplete() { - resultFuture.complete(baos.toByteArray()); + resultFuture.complete(byteStore.toByteArray()); + } + } + + interface ByteStore { + void append(ByteBuffer byteBuffer); + byte[] toByteArray(); + } + + static class BaosStore implements ByteStore { + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + public void append(ByteBuffer byteBuffer) { + invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer))); + } + + public byte[] toByteArray() { + return baos.toByteArray(); + } + } + + static class KnownLengthStore implements ByteStore { + private final byte[] byteArray; + private int offset = 0; + + KnownLengthStore(int contentSize) { + System.out.println("We know the size is "+contentSize); + this.byteArray = new byte[contentSize]; + } + + public void append(ByteBuffer byteBuffer) { + offset += copyBytes(byteBuffer, byteArray, offset); + } + + public byte[] toByteArray() { + return byteArray; } } } diff --git a/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/utils/BinaryUtilsAlternative.java b/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/utils/BinaryUtilsAlternative.java new file mode 100644 index 0000000..78cdbd0 --- /dev/null +++ b/src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/utils/BinaryUtilsAlternative.java @@ -0,0 +1,23 @@ +package com.madgag.aws.sdk.async.responsebytes.awssdk.utils; + +import java.nio.ByteBuffer; + +public class BinaryUtilsAlternative { + /** + * This behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except + * that the bytes are copied to the supplied destination array, at the supplied destination offset. + */ + public static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) { + if (bb == null) { + return 0; + } + + int remaining = bb.remaining(); + if (bb.hasArray()) { + System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining); + } else { + bb.asReadOnlyBuffer().get(dest, destOffset, remaining); + } + return remaining; + } +}