Skip to content

Commit 3af53df

Browse files
artembilanspring-builds
authored andcommitted
GH-9214: Fix subscription identifier logic in Mqttv5PahoMessageDrivenChannelAdapter
Fixes: #9214 The `MqttAsyncClient` can set `subscriptionIdentifier` from session if it is enabled and available from connection * Remove manual `subscriptionIdentifierCounter` from the `Mqttv5PahoMessageDrivenChannelAdapter`. Instead, use `subscriptionProperties.setSubscriptionIdentifiers(List.of(0));` to make the `this.mqttSession.getNextSubscriptionIdentifier();` to work when `if (connOpts.useSubscriptionIdentifiers() && this.mqttConnection.isSubscriptionIdentifiersAvailable()) {` is `true` (cherry picked from commit 3dcbdef)
1 parent f3ac134 commit 3af53df

File tree

2 files changed

+14
-19
lines changed

2 files changed

+14
-19
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Map;
24-
import java.util.concurrent.atomic.AtomicInteger;
2524
import java.util.concurrent.locks.Lock;
2625
import java.util.concurrent.locks.ReentrantLock;
2726
import java.util.stream.IntStream;
2827

2928
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
30-
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
3129
import org.eclipse.paho.mqttv5.client.IMqttToken;
3230
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
3331
import org.eclipse.paho.mqttv5.client.MqttCallback;
@@ -109,8 +107,6 @@ public class Mqttv5PahoMessageDrivenChannelAdapter
109107

110108
private volatile boolean readyToSubscribeOnStart;
111109

112-
private final AtomicInteger subscriptionIdentifierCounter = new AtomicInteger(0);
113-
114110
/**
115111
* Create an instance based on the MQTT url, client id and subscriptions.
116112
* @param url the MQTT url to connect.
@@ -344,9 +340,10 @@ public void addTopic(String topic, int qos) {
344340
}
345341
if (this.mqttClient != null && this.mqttClient.isConnected()) {
346342
MqttProperties subscriptionProperties = new MqttProperties();
347-
subscriptionProperties.setSubscriptionIdentifier(this.subscriptionIdentifierCounter.incrementAndGet());
343+
// Make use of mqttSession.getNextSubscriptionIdentifier() if available in connection
344+
subscriptionProperties.setSubscriptionIdentifiers(List.of(0));
348345
this.mqttClient.subscribe(new MqttSubscription[] {subscription},
349-
null, null, new IMqttMessageListener[] {this::messageArrived}, subscriptionProperties)
346+
null, null, this::messageArrived, subscriptionProperties)
350347
.waitForCompletion(getCompletionTimeout());
351348
}
352349
}
@@ -472,15 +469,10 @@ private void subscribe() {
472469
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
473470
this.topicLock.lock();
474471
try {
475-
IMqttMessageListener listener = this::messageArrived;
476-
IMqttMessageListener[] listeners = IntStream.range(0, mqttSubscriptions.length)
477-
.mapToObj(t -> listener)
478-
.toArray(IMqttMessageListener[]::new);
479472
MqttProperties subscriptionProperties = new MqttProperties();
480-
subscriptionProperties.setSubscriptionIdentifiers(IntStream.range(0, mqttSubscriptions.length)
481-
.mapToObj(i -> this.subscriptionIdentifierCounter.incrementAndGet())
482-
.toList());
483-
this.mqttClient.subscribe(mqttSubscriptions, null, null, listeners, subscriptionProperties)
473+
// Make use of mqttSession.getNextSubscriptionIdentifier() if available in connection
474+
subscriptionProperties.setSubscriptionIdentifiers(List.of(0));
475+
this.mqttClient.subscribe(mqttSubscriptions, null, null, this::messageArrived, subscriptionProperties)
484476
.waitForCompletion(getCompletionTimeout());
485477
String message = "Connected and subscribed to " + Arrays.toString(mqttSubscriptions);
486478
logger.debug(message);

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5AdapterTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void testStop() throws Exception {
5757
adapter.stop();
5858

5959
verify(client).connect(any(MqttConnectionOptions.class));
60-
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
60+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener.class), any());
6161
verify(client).unsubscribe(any(String[].class));
6262
}
6363

@@ -71,11 +71,12 @@ public void testStopNotClean() throws Exception {
7171
adapter.stop();
7272

7373
verify(client).connect(any(MqttConnectionOptions.class));
74-
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
74+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener.class), any());
7575
verify(client, never()).unsubscribe(any(String[].class));
7676
}
7777

78-
private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, boolean cleanStart) throws MqttException {
78+
private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client,
79+
boolean cleanStart) throws MqttException {
7980

8081
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
8182
connectionOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
@@ -85,9 +86,11 @@ private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttA
8586
IMqttToken token = mock(IMqttToken.class);
8687
given(client.disconnect()).willReturn(token);
8788
given(client.connect(any(MqttConnectionOptions.class))).willReturn(token);
88-
given(client.subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any())).willReturn(token);
89+
given(client.subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener.class), any()))
90+
.willReturn(token);
8991
given(client.unsubscribe(any(String[].class))).willReturn(token);
90-
Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(connectionOptions, "client", "foo");
92+
Mqttv5PahoMessageDrivenChannelAdapter adapter =
93+
new Mqttv5PahoMessageDrivenChannelAdapter(connectionOptions, "client", "foo");
9194
ReflectionTestUtils.setField(adapter, "mqttClient", client);
9295
adapter.setBeanFactory(mock(BeanFactory.class));
9396
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));

0 commit comments

Comments
 (0)