Skip to content

SqsAsyncClient is failing unpredictably during streaming #1466

Closed
@jaylynstoesz

Description

@jaylynstoesz

I'm reading from SQS via Alpakka using the async client. It appears to randomly stop receiving messages from the queue, anywhere from a few minutes to a few hours apart. It appears to happen more frequently when volume is higher, though not necessarily during peak times.

Expected Behavior

My service continuously polls messages from SQS, processes them, and deletes them from the queue. Invalid messages are sent to a dead letter queue. In the attached screenshot of the queue monitoring dashboard, I would expect the NumberOfMessagesReceived graph to match the NumberOfMessagesSent graph.

Current Behavior

The SQS client randomly throws java.io.IOException: Server failed to send complete response and stops polling messages.
Screen Shot 2019-10-09 at 1 29 35 PM

Steps to Reproduce (for bugs)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.javadsl.SqsSource
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.javadsl.Sink
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.SqsAsyncClient

//...

fun <T> subscribe() {
    /* Initialize Akka */
    val actorSystem = ActorSystem.create()

    /* Provision async SQS client */
    val sqsAsyncClient = SqsAsyncClient
        .builder()
        .httpClientBuilder(NettyNioAsyncHttpClient.builder())
        .region(Region.US_EAST_1)
        .build()

    /* Provision Alpakka SQS module (Akka Streams) */
    val source = SqsSource.create(
        "https://sqs.us-east-1.amazonaws.com/000000000000/queue-name",
        SqsSourceSettings.Defaults(),
        sqsAsyncClient
    )

    /* Akka receives between 1k - 1.5k messages per minute from SQS */
    source
        .map { message: Message -> message.messageId() }
        .runWith(Sink.ignore(), ActorMaterializer.create(actorSystem))
        .handle { _, error ->
            /* Unpredictably returns java.io.IOException: Server failed to send complete response */
            error?.printStackTrace()
        }
}

Error stack trace:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException
	at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:61)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryErrorIfNeeded(AsyncRetryableStage.java:175)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:126)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.onError(MakeAsyncHttpRequestStage.java:249)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.lambda$notifyIfResponseNotCompleted$2(ResponseHandler.java:397)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:180)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.notifyIfResponseNotCompleted(ResponseHandler.java:397)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.channelInactive(ResponseHandler.java:148)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)	
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:223)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1078)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1403)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:827)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
	at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125)
	... 52 more
Caused by: java.io.IOException: Server failed to send complete response
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.notifyIfResponseNotCompleted(ResponseHandler.java:396)
	... 43 more

Context

This service is designed to stream business-critical accounting events in order to respond to charges against customer accounts in real time. So, if the service stops polling SQS messages, this prevents us from responding as events occur and could incur significant costs to the business.

Your Environment

Kotlin: 1.3.41
Netty & SQS Java SDK: 2.9.15

CC: @debora-ito re: #1122 (comment)

Metadata

Metadata

Assignees

No one assigned

    Labels

    closing-soonThis issue will close in 4 days unless further comments are made.investigatingThis issue is being investigated and/or work is in progress to resolve the issue.response-requestedWaiting on additional info and feedback. Will move to "closing-soon" in 10 days.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions