
S3 putObject (async) is hanging when using ByteArrayAsyncRequestBody #2576

Closed
@jmassiot

Description


From time to time, we encounter calls to the S3 async putObject API that hang.

Describe the bug

We use the S3 async putObject API intensively (~400 puts/s), with both FileAsyncRequestBody and ByteArrayAsyncRequestBody.
About once a day, some putObject calls hang:

  • With FileAsyncRequestBody, we are most likely hitting the issue fixed by PR Fix a race condition in FileAsyncRequestBody #2536.
  • With ByteArrayAsyncRequestBody, however, we see a similar hang whose cause is unclear; that case is the subject of this issue.

Expected Behavior

Calls to putObject with a ByteArrayAsyncRequestBody should not hang.

Current Behavior

Calls to putObject with a ByteArrayAsyncRequestBody occasionally hang.

Steps to Reproduce

As mentioned above, the issue is very difficult to reproduce.
In our case it happens approximately once a day, under production traffic of several hundred puts per second.
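Because the hang is rare, one way to surface it outside production might be a harness that launches many operations and counts the futures that never complete within a deadline. The sketch below is hypothetical: `HangDetector` and `countHung` are illustrative names, and the fake operation (every 100th call never completes) stands in for a real call such as putObject with a ByteArrayAsyncRequestBody.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;

public class HangDetector {

    // Starts n operations, waits up to `deadline` for all of them, and returns how many
    // futures are still incomplete. `operation` is a placeholder for the call under test.
    public static int countHung(IntFunction<CompletableFuture<?>> operation, int n, Duration deadline) {
        List<CompletableFuture<?>> futures = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            futures.add(operation.apply(i));
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .get(deadline.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // At least one operation did not finish in time; counted below.
        } catch (ExecutionException e) {
            // Every operation finished, but some failed: failures are not hangs.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int hung = 0;
        for (CompletableFuture<?> f : futures) {
            if (!f.isDone()) {
                hung++;
            }
        }
        return hung;
    }

    public static void main(String[] args) {
        // Fake operation: every 100th call never completes, mimicking a rare hang.
        int hung = countHung(
                i -> i % 100 == 0 ? new CompletableFuture<Void>() : CompletableFuture.<Void>completedFuture(null),
                1_000,
                Duration.ofMillis(200));
        System.out.println("hung operations: " + hung); // prints "hung operations: 10"
    }
}
```

A harness like this only detects hangs; it says nothing about their cause, so it would be most useful combined with thread dumps taken while the stuck futures are still pending.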

Possible Solution

Comparing FileAsyncRequestBody and ByteArrayAsyncRequestBody, I found the following difference, which I am unsure about.

  • In FileAsyncRequestBody, the subscribe method contains this comment:
            // We need to synchronize here because the subscriber could call
            // request() from within onSubscribe which would potentially
            // trigger onNext before onSubscribe is finished.
            Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes);
            synchronized (subscription) {
                s.onSubscribe(subscription);
            }
  • whereas ByteArrayAsyncRequestBody has the following:
        // As per 2.13, this method must return normally (i.e. not throw).
        try {
            s.onSubscribe(
                    new Subscription() {
                        private boolean done = false;

                        @Override
                        public void request(long n) {
                            if (done) {
                                return;
                            }
                            if (n > 0) {
                                done = true;
                                s.onNext(ByteBuffer.wrap(bytes));
                                s.onComplete();
                            } else {
                                s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
                            }
                        }

So ByteArrayAsyncRequestBody does not have the synchronization that FileAsyncRequestBody has, which means that onNext may be called on another thread while onSubscribe has not yet finished.
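The re-entrancy described in the FileAsyncRequestBody comment can be illustrated with `java.util.concurrent.Flow`, whose interfaces mirror the Reactive Streams ones the SDK uses. `byteArrayPublisher` is a simplified, hypothetical stand-in for the quoted ByteArrayAsyncRequestBody subscription, not SDK code: when the subscriber calls `request(1)` inside `onSubscribe`, `onNext` and `onComplete` run before `onSubscribe` returns. On a single thread this ordering is deterministic; the concern raised here is what happens when delivery and the rest of `onSubscribe` end up on different threads.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ReentrantDeliveryDemo {

    // Hypothetical stand-in for ByteArrayAsyncRequestBody's subscription logic (not SDK code).
    static Flow.Publisher<ByteBuffer> byteArrayPublisher(byte[] bytes) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n > 0) {
                    done = true;
                    // Delivered synchronously on the thread that called request().
                    subscriber.onNext(ByteBuffer.wrap(bytes));
                    subscriber.onComplete();
                } else {
                    subscriber.onError(new IllegalArgumentException("Rule 3.9: non-positive requests are not allowed"));
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Records the order in which the subscriber's callbacks run.
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        byteArrayPublisher(new byte[] {1, 2, 3}).subscribe(new Flow.Subscriber<ByteBuffer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                events.add("onSubscribe:start");
                s.request(1); // re-enters the publisher before onSubscribe returns
                events.add("onSubscribe:end");
            }

            @Override
            public void onNext(ByteBuffer item) {
                events.add("onNext:" + item.remaining());
            }

            @Override
            public void onError(Throwable t) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onSubscribe:start, onNext:3, onComplete, onSubscribe:end]
    }
}
```

FileAsyncRequestBody guards against the cross-thread variant of this by holding the subscription's monitor during `onSubscribe`; the demo only shows the same-thread ordering.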

Furthermore, StreamedRequest within NettyRequestExecutor contains the following:

    private static class StreamedRequest extends DelegateHttpRequest implements StreamedHttpRequest {

        private final Publisher<ByteBuffer> publisher;
        private final Optional<Long> requestContentLength;
        private long written = 0L;
        private boolean done;
        private Subscription subscription;   // <=== the subscription field is not volatile
...
...
        @Override
        public void subscribe(Subscriber<? super HttpContent> subscriber) {
            publisher.subscribe(new Subscriber<ByteBuffer>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    StreamedRequest.this.subscription = subscription;   // <=== the subscription field is set by one thread
                    subscriber.onSubscribe(subscription);
                }

                @Override
                public void onNext(ByteBuffer contentBytes) {
                    if (done) {
                        return;
                    }

                    try {
                        int newLimit = clampedBufferLimit(contentBytes.remaining());
                        contentBytes.limit(newLimit);
                        ByteBuf contentByteBuf = Unpooled.wrappedBuffer(contentBytes);
                        HttpContent content = new DefaultHttpContent(contentByteBuf);

                        subscriber.onNext(content);
                        written += newLimit;

                        if (!shouldContinuePublishing()) {
                            done = true;
                            subscription.cancel();                    // <=== and potentially read by another thread?
                            subscriber.onComplete();
                        }
                    } catch (Throwable t) {
                        onError(t);
                    }
                }

Shouldn't the subscription field be volatile here?

  • If one thread sets this field and another thread reads it, couldn't the reading thread see null and throw a NullPointerException? That exception would go to onError, which would report nothing, because done has already been set to true by the thread calling onNext.
  • If so, onComplete would never be called on the subscriber, wouldn't it?
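A defensive version of the pattern in question might look like the following sketch, written against `java.util.concurrent.Flow` rather than the SDK's `org.reactivestreams` types. `LimitedForwarder` is a hypothetical, simplified analogue of StreamedRequest's inner subscriber (it skips buffer clamping and the Netty types): it keeps the subscription in a `volatile` field so the write in `onSubscribe` is visible to whichever thread runs `onNext`, uses an `AtomicBoolean` for `done`, and calls `onComplete` in a `finally` block so downstream still completes even if `cancel()` throws. The demo delivers items through a `SubmissionPublisher` on a thread pool, so `onSubscribe` and `onNext` may run on different threads. It sketches the proposed change; it is not a confirmed fix for the hang.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class VolatileSubscriptionSketch {

    // Hypothetical, simplified analogue of StreamedRequest's inner subscriber.
    static final class LimitedForwarder implements Flow.Subscriber<ByteBuffer> {
        private final Flow.Subscriber<? super ByteBuffer> downstream;
        private final long contentLength;
        private final AtomicBoolean done = new AtomicBoolean(false);
        private volatile Flow.Subscription subscription; // visible to whichever thread runs onNext
        private long written = 0L; // only touched from onNext, which must be signaled serially

        LimitedForwarder(Flow.Subscriber<? super ByteBuffer> downstream, long contentLength) {
            this.downstream = downstream;
            this.contentLength = contentLength;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s; // written before downstream can call request()
            downstream.onSubscribe(s);
        }

        @Override
        public void onNext(ByteBuffer bytes) {
            if (done.get()) {
                return;
            }
            try {
                int n = bytes.remaining(); // read before downstream may consume the buffer
                downstream.onNext(bytes);
                written += n;
                if (written >= contentLength && done.compareAndSet(false, true)) {
                    try {
                        subscription.cancel();
                    } finally {
                        downstream.onComplete(); // reached even if cancel() throws
                    }
                }
            } catch (Throwable t) {
                onError(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (done.compareAndSet(false, true)) {
                downstream.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (done.compareAndSet(false, true)) {
                downstream.onComplete();
            }
        }
    }

    // Sends four 4-byte chunks through a forwarder limited to 8 bytes; returns the chunk sizes received.
    public static List<Integer> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);
        Flow.Subscriber<ByteBuffer> sink = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(ByteBuffer b) { received.add(b.remaining()); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { completed.countDown(); }
        };
        try (SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>(pool, Flow.defaultBufferSize())) {
            publisher.subscribe(new LimitedForwarder(sink, 8));
            for (int i = 0; i < 4; i++) {
                publisher.submit(ByteBuffer.allocate(4));
            }
        }
        try {
            if (!completed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("onComplete was never delivered");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [4, 4]
    }
}
```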

Context

We are hitting this issue in production.
Workarounds exist, such as adding timeouts and retrying the put, but it would be better to avoid this behavior in the first place.
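A minimal sketch of the timeout-and-retry workaround, using only the JDK's `CompletableFuture.orTimeout` (Java 9+). `withTimeoutAndRetry` is a hypothetical helper, and the attempt supplier stands in for the real putObject call; it retries only on timeout and propagates other failures unchanged.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class TimeoutRetrySketch {

    // Runs attempt.get() with a per-attempt timeout and retries only on timeout,
    // up to maxAttempts in total. Other failures are propagated unchanged.
    public static <T> CompletableFuture<T> withTimeoutAndRetry(
            Supplier<CompletableFuture<T>> attempt, Duration timeout, int maxAttempts) {
        return attempt.get()
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .<CompletableFuture<T>>handle((value, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                            ? error.getCause() : error;
                    if (cause instanceof TimeoutException && maxAttempts > 1) {
                        // The timed-out attempt is abandoned, not explicitly cancelled, in this sketch.
                        return withTimeoutAndRetry(attempt, timeout, maxAttempts - 1);
                    }
                    return CompletableFuture.<T>failedFuture(cause);
                })
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical operation: the first call hangs forever, the second succeeds.
        Supplier<CompletableFuture<String>> op = () -> calls.incrementAndGet() == 1
                ? new CompletableFuture<String>()
                : CompletableFuture.completedFuture("ok");
        String result = withTimeoutAndRetry(op, Duration.ofMillis(100), 3).join();
        System.out.println(result + " after " + calls.get() + " attempts"); // prints "ok after 2 attempts"
    }
}
```

With the SDK, the supplier would presumably build a fresh request body on every attempt, for example `() -> s3.putObject(request, AsyncRequestBody.fromBytes(bytes))` with `s3` an `S3AsyncClient`; the timeout value is a placeholder to tune for your object sizes. This only masks the hang; it does not address its root cause.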

Your Environment

  • AWS Java SDK version used: 2.10.62
  • JDK version used: jdk16
  • Operating System and version: Linux

Labels: bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions