Skip to content

Commit d3899b5

Browse files
committed
Use fixed-length byte array rather than ByteArrayOutputStream
Reading out of a `ByteArrayOutputStream` ---------------------------------------- There's no properly supported way to get the ultimate cumulative byte array stored in a `ByteArrayOutputStream` without doing an array copy - that's what `ByteArrayOutputStream.toByteArray()` does, and that means: * While copying, the JVM heap must briefly hold both the old & new byte arrays - roughly speaking, doubling the memory requirements. * Copying the bytes from one array to another takes a little bit of CPU time (obviously this varies: `System.arraycopy()` for 40MB of bytes takes ~2ms on my M1 machine). Writing into a `ByteArrayOutputStream` -------------------------------------- The new `KnownLengthStore` implementation does 1 array copy for a write. The `BaosStore` implementation: * Allocates a new array 'X' with the size of the new incoming data chunk * Copies data into that array 'X' * Copies array 'X' into the instance's cumulative byte array
1 parent 216249f commit d3899b5

File tree

2 files changed

+71
-10
lines changed

2 files changed

+71
-10
lines changed

src/main/java/com/madgag/aws/sdk/async/responsebytes/awssdk/core/internal/async/ByteArrayAsyncResponseTransformerAlternative.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.function.Function;
3131

32+
import static com.madgag.aws.sdk.async.responsebytes.awssdk.utils.BinaryUtilsAlternative.copyBytes;
3233
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
3334

3435
/**
@@ -65,26 +66,27 @@ public void onResponse(ResponseT response) {
6566

6667
@Override
6768
public void onStream(SdkPublisher<ByteBuffer> publisher) {
68-
ByteArrayOutputStream baos =
69-
knownSize.map(f -> new ByteArrayOutputStream(f.apply(response))).orElse(new ByteArrayOutputStream());
70-
publisher.subscribe(new BaosSubscriber(cf, baos));
69+
ByteStore byteStore =
70+
knownSize.<ByteStore>map(f -> new KnownLengthStore(f.apply(response))).orElse(new BaosStore());
71+
publisher.subscribe(new ByteSubscriber(cf, byteStore));
7172
}
7273

7374
@Override
7475
public void exceptionOccurred(Throwable throwable) {
7576
cf.completeExceptionally(throwable);
7677
}
7778

78-
static class BaosSubscriber implements Subscriber<ByteBuffer> {
79+
80+
static class ByteSubscriber implements Subscriber<ByteBuffer> {
7981
private final CompletableFuture<byte[]> resultFuture;
8082

81-
private ByteArrayOutputStream baos;
83+
private ByteStore byteStore;
8284

8385
private Subscription subscription;
8486

85-
BaosSubscriber(CompletableFuture<byte[]> resultFuture, ByteArrayOutputStream baos) {
87+
ByteSubscriber(CompletableFuture<byte[]> resultFuture, ByteStore byteStore) {
8688
this.resultFuture = resultFuture;
87-
this.baos = baos;
89+
this.byteStore = byteStore;
8890
}
8991

9092
@Override
@@ -99,19 +101,55 @@ public void onSubscribe(Subscription s) {
99101

100102
@Override
101103
public void onNext(ByteBuffer byteBuffer) {
102-
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
104+
byteStore.append(byteBuffer);
103105
subscription.request(1);
104106
}
105107

106108
@Override
107109
public void onError(Throwable throwable) {
108-
baos = null;
110+
byteStore = null;
109111
resultFuture.completeExceptionally(throwable);
110112
}
111113

112114
@Override
113115
public void onComplete() {
114-
resultFuture.complete(baos.toByteArray());
116+
resultFuture.complete(byteStore.toByteArray());
117+
}
118+
}
119+
120+
121+
interface ByteStore {
122+
void append(ByteBuffer byteBuffer);
123+
byte[] toByteArray();
124+
}
125+
126+
static class BaosStore implements ByteStore {
127+
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
128+
129+
public void append(ByteBuffer byteBuffer) {
130+
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
131+
}
132+
133+
public byte[] toByteArray() {
134+
return baos.toByteArray();
135+
}
136+
}
137+
138+
static class KnownLengthStore implements ByteStore {
139+
private final byte[] byteArray;
140+
private int offset = 0;
141+
142+
KnownLengthStore(int contentSize) {
143+
System.out.println("We know the size is "+contentSize);
144+
this.byteArray = new byte[contentSize];
145+
}
146+
147+
public void append(ByteBuffer byteBuffer) {
148+
offset += copyBytes(byteBuffer, byteArray, offset);
149+
}
150+
151+
public byte[] toByteArray() {
152+
return byteArray;
115153
}
116154
}
117155
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.madgag.aws.sdk.async.responsebytes.awssdk.utils;
2+
3+
import java.nio.ByteBuffer;
4+
5+
public class BinaryUtilsAlternative {
6+
/**
7+
* This behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except
8+
* that the bytes are copied to the supplied destination array, at the supplied destination offset.
9+
*/
10+
public static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) {
11+
if (bb == null) {
12+
return 0;
13+
}
14+
15+
int remaining = bb.remaining();
16+
if (bb.hasArray()) {
17+
System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining);
18+
} else {
19+
bb.asReadOnlyBuffer().get(dest, destOffset, remaining);
20+
}
21+
return remaining;
22+
}
23+
}

0 commit comments

Comments
 (0)