Skip to content

Commit ef6f910

Browse files
committed
[PERF] S3BlobStoreDAO: readBytes copies too much data
CF aws/aws-sdk-java-v2#3193 `AsyncResponseTransformer.toBytes()` relies internally on an unsized ByteArrayOutputStream (which will thus expand many times), the result is copied when calling `toByteArray()`, and another defensive copy is carried out when transforming the future. We thus implement a response transformer variation: - Remove needless defensive copies inside `ByteArrayAsyncResponseTransformer`. The byte array is passed to the caller, who then becomes responsible for it, and nobody else references the old byte array once the publisher completes. This can be an instant win coming at a very low price. - Rely on `GetObjectResponse::contentLength` to size a byte array and copy incoming buffers into it in place. This requires knowledge about the response type... Thus this might be hard to do in a generic fashion. On a typical IMAP benchmark, the S3 getObject toBytes transformation takes 1.66% of overall memory allocation.
1 parent 07de730 commit ef6f910

File tree

2 files changed

+106
-1
lines changed

2 files changed

+106
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/****************************************************************
2+
* Licensed to the Apache Software Foundation (ASF) under one *
3+
* or more contributor license agreements. See the NOTICE file *
4+
* distributed with this work for additional information *
5+
* regarding copyright ownership. The ASF licenses this file *
6+
* to you under the Apache License, Version 2.0 (the *
7+
* "License"); you may not use this file except in compliance *
8+
* with the License. You may obtain a copy of the License at *
9+
* *
10+
* http://www.apache.org/licenses/LICENSE-2.0 *
11+
* *
12+
* Unless required by applicable law or agreed to in writing, *
13+
* software distributed under the License is distributed on an *
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
15+
* KIND, either express or implied. See the License for the *
16+
* specific language governing permissions and limitations *
17+
* under the License. *
18+
****************************************************************/
19+
20+
package org.apache.james.blob.objectstorage.aws;
21+
22+
import java.nio.ByteBuffer;
23+
import java.util.concurrent.CompletableFuture;
24+
25+
import org.reactivestreams.Subscriber;
26+
import org.reactivestreams.Subscription;
27+
28+
import software.amazon.awssdk.core.ResponseBytes;
29+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
30+
import software.amazon.awssdk.core.async.SdkPublisher;
31+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
32+
33+
/**
34+
* Class copied for {@link software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer}
35+
*
36+
* Modified to take advantage of the content length of the get response in order to use a sized array
37+
* upon content copy. This avoids the usage of a ByteArrayOutputStream that yields additional copies
38+
* (resizing upon copy, copy of the resulting byte array).
39+
*
40+
* A defensive copy upon returning the result is also removed (responsibility transfered to the caller, no other usages)
41+
*/
42+
public class MinimalCopyBytesResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> {
43+
private volatile CompletableFuture<byte[]> cf;
44+
private volatile GetObjectResponse response;
45+
46+
public MinimalCopyBytesResponseTransformer() {
47+
48+
}
49+
50+
public CompletableFuture<ResponseBytes<GetObjectResponse>> prepare() {
51+
this.cf = new CompletableFuture();
52+
// Modifcation: Remove a defensive copy of the buffer upon completion: the caller is now the sole user of the array
53+
return this.cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
54+
}
55+
56+
public void onResponse(GetObjectResponse response) {
57+
this.response = response;
58+
}
59+
60+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
61+
publisher.subscribe(new BaosSubscriber(this.cf, response.contentLength().intValue()));
62+
}
63+
64+
public void exceptionOccurred(Throwable throwable) {
65+
this.cf.completeExceptionally(throwable);
66+
}
67+
68+
static class BaosSubscriber implements Subscriber<ByteBuffer> {
69+
private final CompletableFuture<byte[]> resultFuture;
70+
// Modification: use a byte array instead of the ByteArrayInputStream and track position
71+
private final byte[] buffer;
72+
private int pos = 0;
73+
private Subscription subscription;
74+
75+
BaosSubscriber(CompletableFuture<byte[]> resultFuture, int size) {
76+
this.resultFuture = resultFuture;
77+
this.buffer = new byte[size];
78+
}
79+
80+
public void onSubscribe(Subscription s) {
81+
if (this.subscription != null) {
82+
s.cancel();
83+
} else {
84+
this.subscription = s;
85+
this.subscription.request(9223372036854775807L);
86+
}
87+
}
88+
89+
public void onNext(ByteBuffer byteBuffer) {
90+
// Modification: copy the response part in place into the result buffer and track position
91+
int written = byteBuffer.remaining();
92+
byteBuffer.get(buffer, pos, written);
93+
pos += written;
94+
this.subscription.request(1L);
95+
}
96+
97+
public void onError(Throwable throwable) {
98+
this.resultFuture.completeExceptionally(throwable);
99+
}
100+
101+
public void onComplete() {
102+
this.resultFuture.complete(this.buffer);
103+
}
104+
}
105+
}

server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
213213
return Mono.fromFuture(() ->
214214
client.getObject(
215215
builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
216-
AsyncResponseTransformer.toBytes()))
216+
new MinimalCopyBytesResponseTransformer()))
217217
.onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
218218
.onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
219219
.publishOn(Schedulers.parallel())

0 commit comments

Comments
 (0)