Description
I'm reading from SQS via Alpakka using the async client. The client stops receiving messages from the queue at seemingly random intervals, anywhere from a few minutes to a few hours apart. This seems to happen more often when volume is higher, though not necessarily during peak times.
Expected Behavior
My service continuously polls messages from SQS, processes them, and deletes them from the queue. Invalid messages are sent to a dead letter queue. In the attached screenshot of the queue monitoring dashboard, I would expect the NumberOfMessagesReceived graph to match the NumberOfMessagesSent graph.
Current Behavior
The SQS client randomly throws java.io.IOException: Server failed to send complete response, after which it stops polling messages.
Steps to Reproduce (for bugs)
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.javadsl.SqsSource
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.javadsl.Sink
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.SqsAsyncClient
//...
fun <T> subscribe() {
    /* Initialize Akka */
    val actorSystem = ActorSystem.create()

    /* Provision async SQS client */
    val sqsAsyncClient = SqsAsyncClient
        .builder()
        .httpClientBuilder(NettyNioAsyncHttpClient.builder())
        .region(Region.US_EAST_1)
        .build()

    /* Provision Alpakka SQS module (Akka Streams) */
    val source = SqsSource.create(
        "https://sqs.us-east-1.amazonaws.com/000000000000/queue-name",
        SqsSourceSettings.Defaults(),
        sqsAsyncClient
    )

    /* Akka receives between 1k - 1.5k messages per minute from SQS */
    source
        .map { message: Message -> message.messageId() }
        .runWith(Sink.ignore(), ActorMaterializer.create(actorSystem))
        .handle { _, error ->
            /* Unpredictably returns java.io.IOException: Server failed to send complete response */
            error?.printStackTrace()
        }
}
Error stack trace:
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:61)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:75)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryErrorIfNeeded(AsyncRetryableStage.java:175)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:126)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.onError(MakeAsyncHttpRequestStage.java:249)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.lambda$notifyIfResponseNotCompleted$2(ResponseHandler.java:397)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:180)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.notifyIfResponseNotCompleted(ResponseHandler.java:397)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.channelInactive(ResponseHandler.java:148)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:223)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1078)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1403)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:827)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125)
... 52 more
Caused by: java.io.IOException: Server failed to send complete response
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.notifyIfResponseNotCompleted(ResponseHandler.java:396)
... 43 more
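In the trace above, the java.io.IOException sits two levels down, wrapped in an SdkClientException and then a CompletionException, so the error passed to the .handle callback is not the IOException itself. As a minimal sketch of how the callback could detect this particular failure, the helper below walks the cause chain to the innermost exception. The name rootCause is hypothetical and not part of the AWS SDK or Alpakka, and the example uses a RuntimeException as a stand-in for SdkClientException so that it runs with the standard library alone.

```kotlin
import java.io.IOException
import java.util.concurrent.CompletionException

/* Hypothetical helper (not from the AWS SDK or Alpakka):
   follows the cause chain and returns the innermost exception. */
fun rootCause(error: Throwable): Throwable {
    var current = error
    while (true) {
        val next = current.cause
        /* Stop at the end of the chain, or if an exception names itself as its cause */
        if (next == null || next === current) return current
        current = next
    }
}

fun main() {
    /* Stand-in for the chain in the trace above; RuntimeException
       replaces SdkClientException to avoid a dependency on the SDK */
    val wrapped = CompletionException(
        RuntimeException(IOException("Server failed to send complete response"))
    )

    val root = rootCause(wrapped)
    println(root is IOException) // prints "true"
    println(root.message)        // prints "Server failed to send complete response"
}
```

Inside the .handle callback from the reproduction, a check such as rootCause(error) is IOException would separate this connection-level failure from other stream errors. It only identifies the failure and does not restart polling.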
Context
This service streams business-critical accounting events so that we can respond to charges against customer accounts in real time. If the service stops polling SQS messages, we cannot respond as events occur, which could incur significant costs to the business.
Your Environment
Kotlin: 1.3.41
Netty & SQS Java SDK: 2.9.15
CC: @debora-ito re: #1122 (comment)