The Mqttv5PahoMessageHandler.publish() fails with a Connect already in progress (32110) under high concurrency #9597

Closed
artembilan opened this issue Oct 25, 2024 · 0 comments

When the input channel for an endpoint with a Mqttv5PahoMessageHandler is an ExecutorChannel and many messages are sent concurrently, the following exception can be thrown:

2024-10-25 16:41:31.164 [][] ERROR o.s.i.handler.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutboundHandler' for component 'mqttOutboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/rms/config/MqttConfig.class]'; from source: 'com.rms.config.MqttConfig.mqttOutboundHandler(org.eclipse.paho.mqttv5.client.MqttConnectionOptions)'], failedMessage=GenericMessage [payload= MQTT MSG 510 from Spring Boot!, headers={replyChannel=nullChannel, errorChannel=, id=c38c5f3b-d4fb-56ac-6abb-f24e3c4d11d6, mqtt_topic=test/, timestamp=1729888891103}]
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:283)
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.handleMessageInternal(Mqttv5PahoMessageHandler.java:222)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
	at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
	at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
Caused by: Connect already in progress (32110)
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:734)
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:715)
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:269)
	... 9 more

The code:

if (!this.mqttClient.isConnected()) {
	this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
}

has to be guarded with a Lock to prevent concurrent connect() calls.
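
The guard described above can be pictured with the following minimal sketch. It deliberately uses neither Paho nor Spring Integration: `FakeClient` is a hypothetical stand-in that rejects overlapping `connect()` calls, roughly as the real client does, and `GuardedConnectSketch`, `publish` and `runConcurrently` are illustrative names only. The change made in the framework may differ in detail.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class GuardedConnectSketch {

    /** Hypothetical stand-in for the MQTT client: overlapping connect() calls fail. */
    static class FakeClient {
        private final AtomicBoolean connecting = new AtomicBoolean();
        private volatile boolean connected;
        final AtomicInteger connectCalls = new AtomicInteger();

        boolean isConnected() {
            return connected;
        }

        void connect() throws InterruptedException {
            if (!connecting.compareAndSet(false, true)) {
                throw new IllegalStateException("Connect already in progress");
            }
            try {
                connectCalls.incrementAndGet();
                Thread.sleep(50); // simulate the time a real handshake takes
                connected = true;
            } finally {
                connecting.set(false);
            }
        }
    }

    private final FakeClient client = new FakeClient();
    private final Lock lock = new ReentrantLock();

    void publish(String payload) throws InterruptedException {
        if (!client.isConnected()) {
            lock.lock();
            try {
                // Re-check under the lock: another thread may have connected meanwhile.
                if (!client.isConnected()) {
                    client.connect();
                }
            } finally {
                lock.unlock();
            }
        }
        // The actual publish of the payload would follow here.
    }

    /** Starts all publishers at once; returns {failures, connect() calls}. */
    static int[] runConcurrently(int threads) throws InterruptedException {
        GuardedConnectSketch handler = new GuardedConnectSketch();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            int n = i;
            pool.execute(() -> {
                try {
                    start.await();
                    handler.publish("msg " + n);
                } catch (Exception ex) {
                    failures.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] {failures.get(), handler.client.connectCalls.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = runConcurrently(16);
        System.out.println("failures=" + result[0] + ", connect calls=" + result[1]);
    }
}
```

With 16 threads this should report zero failures and a single `connect()` call: every thread that loses the race waits on the lock and then finds the client already connected. Without the lock, the same stand-in client would reject the overlapping calls.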

Until the fix is available, the workaround is to add a RequestHandlerRetryAdvice to this endpoint.
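
The issue does not show how the advice is wired, so the following is not Spring Integration code. It is a plain-Java sketch of the idea behind the workaround: a publish that fails because another thread is still connecting will typically succeed when tried again a moment later. `RetrySketch`, `withRetry` and `demo` are hypothetical names, and the attempt count and back-off are arbitrary assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {

    /** Calls the task up to maxAttempts times, pausing between failed attempts. */
    static <T> T withRetry(Callable<T> task, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last; // every attempt failed: surface the last error
    }

    /** Simulated publish that fails twice while a connect is in progress, then succeeds. */
    static String demo() throws Exception {
        AtomicInteger calls = new AtomicInteger();
        return withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("Connect already in progress (32110)");
            }
            return "published after " + calls.get() + " attempts";
        }, 3, 10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

A retry only hides the race and does not remove it, which is why the lock around `connect()` is the proper fix.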

artembilan added this to the 6.4.0 milestone Oct 25, 2024
spring-builds pushed a commit that referenced this issue Oct 25, 2024
Fixes: #9597
Issue link: #9597

When input channel for the endpoint with a `Mqttv5PahoMessageHandler` is an `ExecutorChannel` and a lot of concurrent messages are sent,
the `Connect already in progress (32110)` exception can be thrown

* Wrap `this.mqttClient.connect()` in the `Mqttv5PahoMessageHandler.publish()` into a `Lock`

(cherry picked from commit 334620f)