S3 Async GetObject toBytes: 3x memory reduction, less array-copying #4355
Conversation
A quick update on this PR: we want to improve the memory usage, but we're not sure that a new AsyncS3ResponseTransformer is the right solution. At the moment we don't know what the right solution looks like; we have a task in our backlog to design one. I'll move this PR to a draft state to keep the feature request on our radar. We might make the change ourselves later on, depending on the solution's complexity.
@rtyley I'm closing this PR as stale; we didn't get the chance to come back to this issue. We still have a task in our backlog to research how to improve memory usage.
Oh, that's disappointing - I did try hard to make this PR clear and show that it would fix the issue. If you'd like individual explanations of the 3 changes (A1, A2 & B), they're available in these 3 PRs:
Could you share the team's concerns around solving the problem by adding an AsyncS3ResponseTransformer?
Same here. From my standpoint, it's hard to understand why a driver used in performance-critical applications doesn't adopt even the basic performance improvements that outsiders report, and fix, with simple tools like flame graphs.
Fully agree - this needs to be fixed. I need a way to stream large S3 objects with predictable and minimal CPU and, in particular, memory overhead, and I thought this method would be the right one - finding that it in fact is not was very disappointing...
I personally ended up implementing my own concrete AsyncResponseTransformer. It would be great if the SDK itself fixed these issues and provided an easier way of accessing the headers. It feels to me that the interfaces/abstractions could be more fluent/user-friendly.
@naderghanbari Piece of code / gist to share? This looks interesting...
I would also be interested.... |
package my.storage.s3.internal;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.utils.BinaryUtils;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import static java.lang.System.arraycopy;
import static software.amazon.awssdk.core.ResponseBytes.fromByteArrayUnsafe;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
/**
* Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further
* conversions into types, like strings. Unlike the official ByteArrayAsyncResponseTransformer from AWS SDK,
* this implementation has a lower memory footprint, and avoids copying the array multiple times. It also returns the
* content type stored in the "Content-Type" standard tag/metadata, which is returned as an optional response header by the GetObject calls.
*/
public final class ContentTypeAwareByteArrayAsyncResponseTransformer implements
AsyncResponseTransformer<GetObjectResponse, ContentTypedResponse> {
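// Volatile: the SDK may invoke prepare(), onResponse() and onStream() from different threads.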
private volatile GetObjectResponse response;
private volatile CompletableFuture<byte[]> cf;
@Override
public CompletableFuture<ContentTypedResponse> prepare() {
cf = new CompletableFuture<>();
Function<byte[], ContentTypedResponse> finalizer = arr -> new ContentTypedResponse(fromByteArrayUnsafe(response, arr), response.contentType());
return cf.thenApply(finalizer);
}
@Override
public void onResponse(GetObjectResponse response) {
this.response = response;
}
@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
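// Change A: when S3 supplies a Content-Length, write straight into a right-sized array
// (note: intValue() below limits this path to objects under 2 GiB); otherwise fall
// back to a growing ByteArrayOutputStream.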
ByteStore byteStore =
(response != null && response.contentLength() != null) ?
new KnownLengthStore(response.contentLength().intValue()) :
new BaosStore();
publisher.subscribe(new ByteSubscriber(cf, byteStore));
}
@Override
public void exceptionOccurred(Throwable throwable) {
cf.completeExceptionally(throwable);
}
private static class ByteSubscriber implements Subscriber<ByteBuffer> {
private final CompletableFuture<byte[]> resultFuture;
private ByteStore byteStore;
private Subscription subscription;
ByteSubscriber(CompletableFuture<byte[]> resultFuture, ByteStore byteStore) {
this.resultFuture = resultFuture;
this.byteStore = byteStore;
}
@Override
public void onSubscribe(Subscription s) {
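// Reactive Streams allows at most one active Subscription per Subscriber, so cancel any duplicate.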
if (this.subscription != null) {
s.cancel();
return;
}
this.subscription = s;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(ByteBuffer byteBuffer) {
byteStore.append(byteBuffer);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
byteStore = null;
resultFuture.completeExceptionally(throwable);
}
@Override
public void onComplete() {
resultFuture.complete(byteStore.toByteArray());
}
}
private interface ByteStore {
void append(ByteBuffer byteBuffer);
byte[] toByteArray();
}
private static class BaosStore implements ByteStore {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
public void append(ByteBuffer byteBuffer) {
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
}
public byte[] toByteArray() {
return baos.toByteArray();
}
}
private static class KnownLengthStore implements ByteStore {
private final byte[] byteArray;
private int offset = 0;
KnownLengthStore(int contentSize) {
this.byteArray = new byte[contentSize];
}
public void append(ByteBuffer byteBuffer) {
offset += copyBytes(byteBuffer, byteArray, offset);
}
public byte[] toByteArray() {
return byteArray;
}
/**
* Behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except
* that bytes are copied to the supplied destination array at the supplied destination offset.
*/
private static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) {
if (bb == null) return 0;
int remaining = bb.remaining();
if (bb.hasArray()) arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining);
else bb.asReadOnlyBuffer().get(dest, destOffset, remaining);
return remaining;
}
}
}

and the usage site is:

s3Client.getObject(
GetObjectRequest.builder().bucket(config.bucketName).key(objectKey).build(),
new ContentTypeAwareByteArrayAsyncResponseTransformer() // instantiate one instance per getObject call
)

Courtesy of @rtyley with some minor changes. We have some integration and performance tests (using LocalStack) to make sure of correctness, but please do the same on your side if you end up using it.
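The ContentTypedResponse type referenced above isn't shown in the thread. A minimal sketch of what such a holder might look like, assuming it only needs to pair the bytes with the optional Content-Type (hypothetical, not the commenter's actual class):

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

// Hypothetical holder matching the two-argument constructor used in prepare() above.
public final class ContentTypedResponse {
    private final ResponseBytes<GetObjectResponse> bytes;
    private final String contentType; // may be null if S3 returned no Content-Type header

    public ContentTypedResponse(ResponseBytes<GetObjectResponse> bytes, String contentType) {
        this.bytes = bytes;
        this.contentType = contentType;
    }

    public ResponseBytes<GetObjectResponse> bytes() {
        return bytes;
    }

    public String contentType() {
        return contentType;
    }
}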
Motivation and Context
This fixes #4392 (duplicate of #3193): loading an S3 object into memory using AsyncResponseTransformer.toBytes() has these problems: unnecessary byte[] allocations, copies & GC.
Modifications
Change A: Improve byte[] storage by using the known Content-Length
When reading an S3 GetObject response, we have the S3 Content-Length available to us, meaning we can create an AsyncS3ResponseTransformer which has these enhancements (sketched after this list):
A1: initialise the ByteArrayOutputStream with a byte array of the right size, so it never has to grow.
A2: dispense with the ByteArrayOutputStream altogether and write incoming chunks directly into a correctly-sized byte[].
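For illustration only (the names here are made up; the real implementation is the transformer shared earlier in this thread), the three storage strategies in miniature:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class StorageSketch {
    // Status quo: the stream starts small and doubles as it fills, copying each time it grows.
    static ByteArrayOutputStream unsized() {
        return new ByteArrayOutputStream();
    }

    // A1: seed the stream with the known Content-Length so its internal buffer never grows.
    static ByteArrayOutputStream preSized(int knownContentLength) {
        return new ByteArrayOutputStream(knownContentLength);
    }

    // A2: skip the stream entirely; copy each published chunk straight into place.
    static byte[] direct(Iterable<ByteBuffer> chunks, int knownContentLength) {
        byte[] out = new byte[knownContentLength];
        int offset = 0;
        for (ByteBuffer chunk : chunks) {
            int remaining = chunk.remaining();
            chunk.get(out, offset, remaining);
            offset += remaining;
        }
        return out;
    }
}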
Change B: Use ResponseBytes.fromByteArrayUnsafe()
Once the body has been fully read, create the ResponseBytes instance with ResponseBytes.fromByteArrayUnsafe() rather than fromByteArray(), so no additional defensive copy of the array is made (sketched below).
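The difference in miniature (these are real SDK methods; the wrapping class is illustrative):

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

final class WrapSketch {
    static ResponseBytes<GetObjectResponse> wrap(GetObjectResponse response, byte[] fullyReadBody) {
        // ResponseBytes.fromByteArray(response, fullyReadBody) would defensively copy
        // the array - one extra object-sized allocation.
        // fromByteArrayUnsafe() wraps the array as-is, which is safe as long as the
        // caller never mutates it afterwards.
        return ResponseBytes.fromByteArrayUnsafe(response, fullyReadBody);
    }
}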
Testing
In https://github.com/rtyley/aws-sdk-async-response-bytes I've added automated memory testing of the various enhancements above. These are the resulting memory requirements, in MB, for all possible permutations of the fixes:
From the results, it's clear that we need all 3 changes (A1, A2, & B) in order to get the lowest memory usage.
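The linked repo is the authoritative way to reproduce the numbers; as a crude stand-in, a heap snapshot before and after a download gives a rough signal (illustrative only):

final class HeapProbe {
    // Snapshot of currently-used heap. System.gc() is only a hint to the JVM,
    // so treat the numbers as indicative rather than exact.
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        System.gc();
        return rt.totalMemory() - rt.freeMemory();
    }
}

// Usage around any of the toBytes() variants:
//   long before = HeapProbe.usedHeapBytes();
//   byte[] body = ...; // e.g. via s3Client.getObject(request, transformer).join()
//   long after = HeapProbe.usedHeapBytes();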
Screenshots (if appropriate)
Types of changes
Bug fix (toBytes uses memory 4x the object size #4392)
New feature (adds AsyncS3ResponseTransformer, optimised for handling GetObjectResponse)
Checklist
Local run of mvn install succeeds
I have added a changelog entry by running the scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
License