Sending SQS message using async client sometimes never completes the Future #1207


Closed
ochrons opened this issue Apr 12, 2019 · 6 comments · Fixed by #1217
Labels
bug This issue is a bug. investigating This issue is being investigated and/or work is in progress to resolve the issue.

Comments

@ochrons

ochrons commented Apr 12, 2019

Expected Behavior

Sending messages with SqsAsyncClient.send(req) should return a Future that eventually completes.

Current Behavior

Sending thousands of SQS messages in quick succession triggers a very rare case where a single send(req) never completes the Future. There is no result and no error; the request just disappears.

Possible Solution

There have been Netty-related issues in this space before, so I would look there first. With a previous version of the SDK (2.4.14) this was more common than in the 2.5 version.

Steps to Reproduce (for bugs)

Send a few hundred thousand SQS messages using the SqsAsyncClient.send method, race each returned Future against a Future that times out (say, after 10 seconds), and see which one completes first.

A Scala example for retrying the send after a timeout:

  // Race the SDK future against an application-level timeout and retry on timeout.
  private def sendRetry(request: SendMessageRequest, retryCount: Int = 3): Future[SendMessageResponse] = {
    val res     = sqs.sendMessage(request).toScala
    // APIErrorJVM.delayFuture is an application helper that completes with the given
    // Failure after the given delay (here: a TimeoutException after 10 seconds).
    val timeout = APIErrorJVM.delayFuture[SendMessageResponse](Failure(new TimeoutException()), 10.seconds)
    Future.firstCompletedOf(List(res, timeout)) recoverWith {
      case _: TimeoutException if retryCount > 0 =>
        log.error(s"Timeout while sending message $request, retry count = $retryCount")
        sendRetry(request, retryCount - 1)
    }
  }

Context

SQS is used as the ground truth in our application, so if sending SQS messages fails invisibly, the whole application logic is in jeopardy. We had to add an application-level timeout around the SDK call to work around this.

Your Environment

  • AWS Java SDK version used: 2.5.25
  • JDK version used: 1.8.0_172
  • Operating System and version: Linux in AWS
@zoewangg
Contributor

Thank you for reporting!

I think commit 066e65d (released in 2.5.0) might reduce the occurrences of the issue, but it looks like there are more cases that could leave the future uncompletable. We will investigate.

As a side note, the SDK supports timeout features out of the box; see https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations
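
For example, those client-level timeouts can be set when building the async client. A minimal sketch (the durations below are arbitrary placeholders, not recommended values):

import java.time.Duration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

// Bound every call so the returned future eventually completes exceptionally
// instead of hanging indefinitely.
SqsAsyncClient sqs = SqsAsyncClient.builder()
        .overrideConfiguration(ClientOverrideConfiguration.builder()
                // Upper bound for the entire API call, including retries.
                .apiCallTimeout(Duration.ofSeconds(30))
                // Upper bound for each individual HTTP attempt.
                .apiCallAttemptTimeout(Duration.ofSeconds(10))
                .build())
        .build();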

@ochrons
Author

ochrons commented Apr 14, 2019

Actually, my comment about 2.4.14 was incorrect; there was another issue that got fixed in 2.5 (received messages were rarely left "in flight" but never delivered to the application for processing). So I cannot say for sure whether this non-completing call behavior changed from 2.4 to 2.5.

@spfink spfink added the investigating This issue is being investigated and/or work is in progress to resolve the issue. label Apr 15, 2019
@millems millems added the bug This issue is a bug. label Apr 17, 2019
@millems
Contributor

millems commented Apr 17, 2019

I've been able to reproduce this issue...

@millems
Contributor

millems commented Apr 17, 2019

Reproduction code:

/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.services.sqs;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;

public class Issue1207 {
    @Test
    public void test() throws InterruptedException {
        try (SqsAsyncClient client = SqsAsyncClient.create()) {
            String queueName = UUID.randomUUID().toString();
            CreateQueueResponse queue = client.createQueue(r -> r.queueName(queueName)).join();
            try {
                loadTest(client, queue.queueUrl());
            } finally {
                client.deleteQueue(r -> r.queueUrl(queue.queueUrl())).join();
            }
        }
    }

    private void loadTest(SqsAsyncClient client, String queueUrl) throws InterruptedException {
        int concurrentRequests = 100;
        Semaphore concurrencySemaphore = new Semaphore(concurrentRequests);
        Instant endTime = Instant.now().plusSeconds(60);

        System.out.println("Starting...");

        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                while (true) {
                    long timeLeft = Duration.between(Instant.now(), endTime).getSeconds();
                    System.out.println("Seconds left in test: " + timeLeft + ", Open permits: " + concurrencySemaphore.availablePermits());
                    Thread.sleep(5_000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        while (endTime.isAfter(Instant.now())) {
            concurrencySemaphore.acquire(1);

            client.sendMessage(r -> r.queueUrl(queueUrl).messageBody("{}"))
                  .whenComplete((r, t) -> {
                      if (t != null) {
                          t.printStackTrace();
                      }
                      concurrencySemaphore.release(1);
                  });
        }

        System.out.println("Spinning down...");

        if (!concurrencySemaphore.tryAcquire(concurrentRequests, 30, TimeUnit.SECONDS)) {
            int missingResponses = concurrentRequests - concurrencySemaphore.availablePermits();
            throw new IllegalStateException(missingResponses + " requests didn't complete.");
        }
    }
}

@millems
Contributor

millems commented Apr 19, 2019

This was a tricky one.

It looks like in some rare edge cases, the connection we acquire from the connection pool isn't active, and the health checks at the Netty level don't catch it for us. Fixing the Netty-level health check (it looks like it's broken?) improves things slightly, but the problem still happened occasionally when the connection was closed between us acquiring it from the pool and attaching the handlers that monitor for the close.

I've moved the health check fully up the stack, to after we've added our connection-close monitors, and that seems to have fixed the problem.

I'll be running some longer-term tests to make sure it's definitely licked before putting out a PR.
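
For anyone following along, the general shape of the fix, expressed with plain Netty types, looks roughly like the sketch below. This is illustrative only, not the SDK's actual code; the helper name and retry count are made up.

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;

// Illustrative sketch only -- not the SDK's real implementation.
// Acquire a channel, attach the close monitor first, then verify the channel is
// still active; if it was closed in between, release it and try another one.
Channel acquireHealthyChannel(ChannelPool pool, int attemptsLeft) throws Exception {
    Future<Channel> acquireFuture = pool.acquire();
    Channel channel = acquireFuture.sync().getNow();

    // Attach the close listener before trusting the channel, so any close from
    // this point on is observed instead of silently dropping the write.
    channel.closeFuture().addListener(f -> {
        // In the real client this is where the in-flight response future
        // would be completed exceptionally.
    });

    if (channel.isActive()) {
        return channel;
    }

    // The service closed the connection between pool acquisition and handler
    // attachment; hand the dead channel back and retry with a fresh one.
    pool.release(channel);
    if (attemptsLeft > 0) {
        return acquireHealthyChannel(pool, attemptsLeft - 1);
    }
    throw new IllegalStateException("Could not acquire an active channel");
}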

millems added a commit that referenced this issue Apr 19, 2019
…pleted.

If a service closes a connection between when a channel is acquired and handlers are attached, channel writes could disappear and the response future would never be completed. This change introduces health checks and retries for channel acquisition to fix the majority of cases without failing requests, as well as one last check after handlers are added to ensure the channel hasn't been closed since the channel pool health check. Fixes #1207.
@millems
Contributor

millems commented Apr 19, 2019

A fix will go out for this on Monday's release. Please reopen this issue if you're still seeing the problem at that time. Our tests are no longer able to reproduce it after this change.

aws-sdk-java-automation pushed a commit that referenced this issue Feb 26, 2021
…97697f093

Pull request: release <- staging/5efdb921-642c-46b1-bada-f7b97697f093