S3 Async GetObject toBytes: 3x memory reduction, less array-copying #4355

Closed

Conversation

rtyley
Contributor

@rtyley rtyley commented Aug 27, 2023

Motivation and Context

This fixes #4392 (duplicate of #3193): loading an S3 object into memory using AsyncResponseTransformer.toBytes() has these problems:

  • Memory: Excessive peak requirements - peak memory requirement is 4x the actual size of the S3 object - loading a 258MB object into memory takes 1070MB of RAM.
  • CPU: Unnecessary byte[] allocations, copies & GC

Modifications

Change A : Improve byte[] storage by using the known Content Length

When reading an S3 GetObject response, we have the S3 Content Length available to us, meaning we can create an AsyncS3ResponseTransformer which has these enhancements:

  • A1 Use the Content Length to initialise the ByteArrayOutputStream with a byte array of the right size.
    • Array copies removed: $\lceil \log_2(N/32) \rceil$ (e.g. a 40MB object takes 21 array copies)
    • Memory: 2N → N
  • A2 Use a simple fixed-size byte array in preference to a ByteArrayOutputStream
    • Storing a chunk of X bytes (where X <= N - ie may equal N)
      • Array copies removed: 1 per chunk
      • Memory: X → 0
    • Retrieving the final N bytes
      • Array copies removed: 1
      • Memory: 2N → N
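
The copy count quoted for A1 can be reproduced with a small model. This is a minimal sketch, not code from the PR: GrowthCopies and doublingCopies are hypothetical names, and the model assumes a buffer that starts at the default ByteArrayOutputStream capacity of 32 bytes and doubles each time it fills up. With large chunks the real stream can jump straight to the required size, so the actual count may be lower.

```java
/** Counts how often a doubling buffer must be reallocated (and copied) to hold n bytes. */
final class GrowthCopies {

    /**
     * Number of grow-and-copy steps for a buffer that starts at initialCapacity
     * and doubles whenever it is full, until it can hold n bytes.
     */
    static int doublingCopies(long n, long initialCapacity) {
        int copies = 0;
        long capacity = initialCapacity;
        while (capacity < n) {
            capacity *= 2; // each doubling allocates a new array and copies the old contents
            copies++;
        }
        return copies;
    }

    public static void main(String[] args) {
        long fortyMb = 40L * 1024 * 1024;
        // Default ByteArrayOutputStream capacity is 32 bytes.
        System.out.println(doublingCopies(fortyMb, 32));      // 21, i.e. ceil(log2(N/32))
        // Pre-sizing to the Content Length (change A1) needs no growth at all.
        System.out.println(doublingCopies(fortyMb, fortyMb)); // 0
    }
}
```

The last doubling is also where the 2N peak comes from: the old array and its larger replacement are both live while the contents are copied across.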

Change B : Use ResponseBytes.fromByteArrayUnsafe()

  • Avoid performing a byte array copy to create the ResponseBytes instance
    • Array copies removed: 1
    • Memory: 2N → N
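
The trade-off behind change B can be illustrated without the SDK. BytesHolder below is a hypothetical stand-in, not the SDK's ResponseBytes class: fromCopy models a factory that takes a defensive copy, and fromUnsafe models one that wraps the caller's array directly, which is the behaviour the name fromByteArrayUnsafe suggests.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a bytes holder; not the SDK's ResponseBytes class. */
final class BytesHolder {
    private final byte[] bytes;

    private BytesHolder(byte[] bytes) {
        this.bytes = bytes;
    }

    /** Defensive copy: immune to later changes by the caller, but costs a second N-byte array. */
    static BytesHolder fromCopy(byte[] source) {
        return new BytesHolder(Arrays.copyOf(source, source.length));
    }

    /** No copy: wraps the caller's array, so the caller must not modify it afterwards. */
    static BytesHolder fromUnsafe(byte[] source) {
        return new BytesHolder(source);
    }

    /** True if this holder is backed by exactly the given array instance. */
    boolean sharesArrayWith(byte[] candidate) {
        return bytes == candidate;
    }

    byte byteAt(int index) {
        return bytes[index];
    }

    public static void main(String[] args) {
        byte[] downloaded = {1, 2, 3};
        BytesHolder copied = fromCopy(downloaded);
        BytesHolder wrapped = fromUnsafe(downloaded);
        downloaded[0] = 9; // mutate the source after wrapping
        System.out.println(copied.byteAt(0));  // 1: the copy is isolated
        System.out.println(wrapped.byteAt(0)); // 9: the wrapper sees the change
    }
}
```

Skipping the copy is safe in the transformer because the freshly assembled array is handed over once and never touched again.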

Testing

In https://github.com/rtyley/aws-sdk-async-response-bytes I've added automated memory testing of the various enhancements above. These are the resulting memory requirements, in MB, for all possible permutations of the fixes:

            Exclude A   A1    A1+A2
Exclude B   1070        934   644
Include B   787         657   357

From the results, it's clear that we need all 3 changes (A1, A2, & B) in order to get the lowest memory usage.
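
To relate the table to the "4x" and "3x" figures quoted elsewhere in this PR, the ratios can be worked out directly. This is only arithmetic on the numbers reported above; MemoryRatios is a hypothetical name.

```java
/** Ratios derived from the memory figures reported in this PR (all values in MB). */
final class MemoryRatios {
    static final double OBJECT_MB = 258;    // size of the test object
    static final double BASELINE_MB = 1070; // Exclude A, Exclude B
    static final double ALL_FIXES_MB = 357; // A1+A2, Include B

    static double ratio(double peakMb, double referenceMb) {
        return peakMb / referenceMb;
    }

    public static void main(String[] args) {
        System.out.printf("baseline peak / object size:  %.2f%n", ratio(BASELINE_MB, OBJECT_MB));
        System.out.printf("all fixes peak / object size: %.2f%n", ratio(ALL_FIXES_MB, OBJECT_MB));
        System.out.printf("baseline / all fixes:         %.2f%n", ratio(BASELINE_MB, ALL_FIXES_MB));
    }
}
```

The baseline comes out a little above 4 times the object size, the fully fixed version at roughly 1.4 times, and the overall reduction at roughly 3 times.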

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have added tests to cover my changes
  • All new and existing tests passed
  • I have added a changelog entry. Adding a new entry must be accomplished by running the scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
  • My change is to implement 1.11 parity feature and I have updated LaunchChangelog

License

  • I confirm that this pull request can be released under the Apache 2 license

@rtyley rtyley force-pushed the reduce-memory-requirements-and-array-copying branch from 2ed164c to dc315da Compare September 4, 2023 20:44
@rtyley rtyley marked this pull request as ready for review September 5, 2023 11:17
@rtyley rtyley requested a review from a team as a code owner September 5, 2023 11:17
@debora-ito
Member

debora-ito commented Oct 18, 2023

A quick update on this PR: we want to improve the memory usage, but we're not sure that a new AsyncS3ResponseTransformer is the right solution. At this moment, we don't know what the right solution looks like, we have a task in our backlog to design a solution.

I'll move this PR to a draft state to keep the feature request on our radar. We might do the change ourselves later on, depending on the solution complexity.

@debora-ito debora-ito marked this pull request as draft October 18, 2023 22:46
@debora-ito debora-ito removed the needs-review This issue or PR needs review from the team. label Nov 11, 2023
@debora-ito
Member

@rtyley I'm closing this PR as stale, we didn't have the chance to go back to this issue. We still have the task in our backlog to research how to improve memory usage.

@debora-ito debora-ito closed this Mar 9, 2024
@rtyley
Contributor Author

rtyley commented Mar 9, 2024

@rtyley I'm closing this PR as stale, we didn't have the chance to go back to this issue. We still have the task in our backlog to research how to improve memory usage.

Oh, that's disappointing - I did try hard to make this PR clear and show that it would fix the issue. If you'd like individual explanations of the 3 changes (A1, A2 & B), they're available in these 3 PRs:

we want to improve the memory usage, but we're not sure that a new AsyncS3ResponseTransformer is the right solution. At this moment, we don't know what the right solution looks like [...] We might do the change ourselves later on, depending on the solution complexity.

Could you share the team's concerns around solving the problem by adding an AsyncS3ResponseTransformer? If I knew the constraints you're operating under I might be able to find a better place to make the fix.

@chibenwa

chibenwa commented Mar 9, 2024

Same.

From my standpoint, it's hard to understand why a driver used in performance-critical applications does not even take basic performance enhancement proposals that outsiders, with simple tools like flame graphs, report and fix.

@javafanboy

javafanboy commented May 2, 2024

Fully agree - this needs to be fixed. I need a way to stream large S3 objects with predictable and minimal CPU and in particular memory overhead and thought this method would be the right one for this - finding that it in fact is not was very disappointing...

@naderghanbari

I personally ended up implementing my own concrete subclass of AsyncResponseTransformer which does some of the improvements in this MR and also extracts the content-type header and returns it along with the contents. Motivation behind that is to save on an extra GetMetadata call.

It would be great if the SDK itself fixed these issues and provided an easier way of accessing the headers. It feels to me that the interfaces/abstractions can be more fluent/user friendly.

@chibenwa

chibenwa commented Sep 13, 2024

and also extracts the content-type header and returns it along with the contents. Motivation behind that is to save on an extra GetMetadata call.

@naderghanbari Piece of code / gist to share? This looks interesting...

@javafanboy

I would also be interested....

@naderghanbari

naderghanbari commented Oct 2, 2024

package my.storage.s3.internal;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.utils.BinaryUtils;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static java.lang.System.arraycopy;
import static software.amazon.awssdk.core.ResponseBytes.fromByteArrayUnsafe;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

/**
 * Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further
 * conversions into types, like strings. Unlike the official ByteArrayAsyncResponseTransformer from AWS SDK,
 * this implementation has a lower memory footprint, and avoids copying the array multiple times. It also returns the
 * content type stored in the "Content-Type" standard tag/metadata, which is returned as an optional response header by the GetObject calls.
 */
public final class ContentTypeAwareByteArrayAsyncResponseTransformer implements
        AsyncResponseTransformer<GetObjectResponse, ContentTypedResponse> {

    private volatile GetObjectResponse response;
    private volatile CompletableFuture<byte[]> cf;

    @Override
    public CompletableFuture<ContentTypedResponse> prepare() {
        cf = new CompletableFuture<>();
        Function<byte[], ContentTypedResponse> finalizer = arr -> new ContentTypedResponse(fromByteArrayUnsafe(response, arr), response.contentType());
        return cf.thenApply(finalizer);
    }

    @Override
    public void onResponse(GetObjectResponse response) {
        this.response = response;
    }

    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
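        // Math.toIntExact fails fast with an ArithmeticException for objects larger than
        // Integer.MAX_VALUE bytes, instead of silently truncating the length as intValue() would.
        ByteStore byteStore =
                (response != null && response.contentLength() != null) ?
                        new KnownLengthStore(Math.toIntExact(response.contentLength())) :
                        new BaosStore();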
        publisher.subscribe(new ByteSubscriber(cf, byteStore));
    }

    @Override
    public void exceptionOccurred(Throwable throwable) {
        cf.completeExceptionally(throwable);
    }

    private static class ByteSubscriber implements Subscriber<ByteBuffer> {
        private final CompletableFuture<byte[]> resultFuture;
        private ByteStore byteStore;
        private Subscription subscription;

        ByteSubscriber(CompletableFuture<byte[]> resultFuture, ByteStore byteStore) {
            this.resultFuture = resultFuture;
            this.byteStore = byteStore;
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (this.subscription != null) {
                s.cancel();
                return;
            }
            this.subscription = s;
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(ByteBuffer byteBuffer) {
            byteStore.append(byteBuffer);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            byteStore = null;
            resultFuture.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            resultFuture.complete(byteStore.toByteArray());
        }
    }

    private interface ByteStore {
        void append(ByteBuffer byteBuffer);

        byte[] toByteArray();
    }

    private static class BaosStore implements ByteStore {
        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

        public void append(ByteBuffer byteBuffer) {
            invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
        }

        public byte[] toByteArray() {
            return baos.toByteArray();
        }
    }

    private static class KnownLengthStore implements ByteStore {
        private final byte[] byteArray;
        private int offset = 0;

        KnownLengthStore(int contentSize) {
            this.byteArray = new byte[contentSize];
        }

        public void append(ByteBuffer byteBuffer) {
            offset += copyBytes(byteBuffer, byteArray, offset);
        }

        public byte[] toByteArray() {
            return byteArray;
        }

        /**
         * Behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except
         * that bytes are copied to the supplied destination array at the supplied destination offset.
         */
        private static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) {
            if (bb == null) return 0;
            int remaining = bb.remaining();
            if (bb.hasArray()) arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining);
            else bb.asReadOnlyBuffer().get(dest, destOffset, remaining);
            return remaining;
        }
    }

}

and usage site is

  s3Client.getObject(
    GetObjectRequest.builder().bucket(config.bucketName).key(objectKey).build(),
    new ContentTypeAwareByteArrayAsyncResponseTransformer()     // instantiate one instance per getObject call
  )

Courtesy of @rtyley with some minor changes. We have some integration and performance tests to make sure of correctness using LocalStack, but please do the same on your side if you end up using it.
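
For a check that needs neither S3 nor LocalStack, the chunk-copying logic can be exercised with plain JDK buffers. The sketch below is an illustration, not part of the posted class: ChunkAssembler is a hypothetical name that mirrors what KnownLengthStore and copyBytes do, and it feeds in one heap-backed and one direct ByteBuffer to cover both branches.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** JDK-only check of the chunk-reassembly logic; ChunkAssembler is a hypothetical name. */
final class ChunkAssembler {
    private final byte[] target;
    private int offset = 0;

    ChunkAssembler(int contentLength) {
        this.target = new byte[contentLength];
    }

    /** Copies the chunk's remaining bytes into the target without moving the chunk's position. */
    void append(ByteBuffer chunk) {
        int remaining = chunk.remaining();
        if (chunk.hasArray()) {
            System.arraycopy(chunk.array(), chunk.arrayOffset() + chunk.position(),
                    target, offset, remaining);
        } else {
            // Direct and read-only buffers expose no backing array; read through a separate view.
            chunk.asReadOnlyBuffer().get(target, offset, remaining);
        }
        offset += remaining;
    }

    byte[] result() {
        return target;
    }

    public static void main(String[] args) {
        byte[] expected = new byte[1000];
        for (int i = 0; i < expected.length; i++) {
            expected[i] = (byte) i;
        }

        ByteBuffer heap = ByteBuffer.wrap(expected, 0, 400); // first 400 bytes, heap-backed
        ByteBuffer direct = ByteBuffer.allocateDirect(600);  // last 600 bytes, off-heap
        direct.put(expected, 400, 600);
        direct.flip();

        ChunkAssembler assembler = new ChunkAssembler(expected.length);
        assembler.append(heap);
        assembler.append(direct);
        System.out.println(Arrays.equals(expected, assembler.result())); // true
    }
}
```

A check like this does not replace integration or memory tests. It only confirms that bytes land at the right offsets for both buffer kinds.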

Development

Successfully merging this pull request may close these issues.

S3 Async GetObject toBytes uses memory 4x the object size