Skip to content

Change A2: Use fixed-length byte array rather than ByteArrayOutputStream #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: change-A1_initialise-ByteArrayOutputStream-with-required-capacity
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static com.madgag.aws.sdk.async.responsebytes.awssdk.utils.BinaryUtilsAlternative.copyBytes;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

/**
Expand Down Expand Up @@ -65,26 +66,27 @@ public void onResponse(ResponseT response) {

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
ByteArrayOutputStream baos =
knownSize.map(f -> new ByteArrayOutputStream(f.apply(response))).orElse(new ByteArrayOutputStream());
publisher.subscribe(new BaosSubscriber(cf, baos));
ByteStore byteStore =
knownSize.<ByteStore>map(f -> new KnownLengthStore(f.apply(response))).orElseGet(BaosStore::new);
publisher.subscribe(new ByteSubscriber(cf, byteStore));
}

@Override
public void exceptionOccurred(Throwable throwable) {
cf.completeExceptionally(throwable);
}

static class BaosSubscriber implements Subscriber<ByteBuffer> {

static class ByteSubscriber implements Subscriber<ByteBuffer> {
private final CompletableFuture<byte[]> resultFuture;

private ByteArrayOutputStream baos;
private ByteStore byteStore;

private Subscription subscription;

BaosSubscriber(CompletableFuture<byte[]> resultFuture, ByteArrayOutputStream baos) {
ByteSubscriber(CompletableFuture<byte[]> resultFuture, ByteStore byteStore) {
this.resultFuture = resultFuture;
this.baos = baos;
this.byteStore = byteStore;
}

@Override
Expand All @@ -99,19 +101,54 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(ByteBuffer byteBuffer) {
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
byteStore.append(byteBuffer);
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
baos = null;
byteStore = null;
resultFuture.completeExceptionally(throwable);
}

@Override
public void onComplete() {
resultFuture.complete(baos.toByteArray());
resultFuture.complete(byteStore.toByteArray());
}
}

interface ByteStore {
void append(ByteBuffer byteBuffer);
byte[] toByteArray();
}

static class BaosStore implements ByteStore {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

public void append(ByteBuffer byteBuffer) {
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
}

public byte[] toByteArray() {
return baos.toByteArray();
}
}

static class KnownLengthStore implements ByteStore {
private final byte[] byteArray;
private int offset = 0;

KnownLengthStore(int contentSize) {
System.out.println("We know the size is "+contentSize);
this.byteArray = new byte[contentSize];
}

public void append(ByteBuffer byteBuffer) {
offset += copyBytes(byteBuffer, byteArray, offset);
}

public byte[] toByteArray() {
return byteArray;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.madgag.aws.sdk.async.responsebytes.awssdk.utils;

import java.nio.ByteBuffer;

public class BinaryUtilsAlternative {
/**
* This behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except
* that the bytes are copied to the supplied destination array, at the supplied destination offset.
*/
public static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) {
if (bb == null) {
return 0;
}

int remaining = bb.remaining();
if (bb.hasArray()) {
System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining);
} else {
bb.asReadOnlyBuffer().get(dest, destOffset, remaining);
}
return remaining;
}
}