Skip to content

Commit 7882609

Browse files
committed
GH-9214: Fix subscription identifier logic in Mqttv5PahoMessageDrivenChannelAdapter
Fixes: #9214 The `MqttAsyncClient` can set `subscriptionIdentifier` from session if it is enabled and available from connection * Remove manual `subscriptionIdentifierCounter` from the `Mqttv5PahoMessageDrivenChannelAdapter`. Instead, use `subscriptionProperties.setSubscriptionIdentifiers(List.of(0));` to make the `this.mqttSession.getNextSubscriptionIdentifier();` to work when `if (connOpts.useSubscriptionIdentifiers() && this.mqttConnection.isSubscriptionIdentifiersAvailable()) {` is `true` (cherry picked from commit 3dcbdef) # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
1 parent bf6c4f6 commit 7882609

File tree

2 files changed

+19
-22
lines changed

2 files changed

+19
-22
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
package org.springframework.integration.mqtt.inbound;
1818

1919
import java.util.Arrays;
20+
import java.util.List;
2021
import java.util.Map;
21-
import java.util.concurrent.atomic.AtomicInteger;
2222
import java.util.concurrent.locks.Lock;
2323
import java.util.concurrent.locks.ReentrantLock;
2424
import java.util.stream.IntStream;
2525

2626
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
27-
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
2827
import org.eclipse.paho.mqttv5.client.IMqttToken;
2928
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
3029
import org.eclipse.paho.mqttv5.client.MqttCallback;
@@ -85,7 +84,7 @@ public class Mqttv5PahoMessageDrivenChannelAdapter
8584
extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectionOptions>
8685
implements MqttCallback, MqttComponent<MqttConnectionOptions> {
8786

88-
private final Lock lock = new ReentrantLock();
87+
private final Lock lock = new ReentrantLock();
8988

9089
private final MqttConnectionOptions connectionOptions;
9190

@@ -102,7 +101,7 @@ public class Mqttv5PahoMessageDrivenChannelAdapter
102101

103102
private volatile boolean readyToSubscribeOnStart;
104103

105-
private final AtomicInteger subscriptionIdentifierCounter = new AtomicInteger(0); public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
104+
public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
106105
super(url, clientId, topic);
107106
Assert.hasText(url, "'url' cannot be null or empty");
108107
this.connectionOptions = new MqttConnectionOptions();
@@ -282,9 +281,10 @@ public void addTopic(String topic, int qos) {
282281
super.addTopic(topic, qos);
283282
if (this.mqttClient != null && this.mqttClient.isConnected()) {
284283
MqttProperties subscriptionProperties = new MqttProperties();
285-
subscriptionProperties.setSubscriptionIdentifier(this.subscriptionIdentifierCounter.incrementAndGet());
286-
this.mqttClient.subscribe(new MqttSubscription[] { new MqttSubscription(topic, qos) },
287-
null, null, new IMqttMessageListener[] { this::messageArrived }, subscriptionProperties)
284+
// Make use of mqttSession.getNextSubscriptionIdentifier() if available in connection
285+
subscriptionProperties.setSubscriptionIdentifiers(List.of(0));
286+
this.mqttClient.subscribe(new MqttSubscription[] {new MqttSubscription(topic, qos)},
287+
null, null, this::messageArrived, subscriptionProperties)
288288
.waitForCompletion(getCompletionTimeout());
289289
}
290290
}
@@ -409,18 +409,13 @@ private void subscribe() {
409409
}
410410

411411
int[] requestedQos = getQos();
412-
MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
412+
MqttSubscription[] mqttSubscriptions = IntStream.range(0, topics.length)
413413
.mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
414414
.toArray(MqttSubscription[]::new);
415-
IMqttMessageListener listener = this::messageArrived;
416-
IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
417-
.mapToObj(t -> listener)
418-
.toArray(IMqttMessageListener[]::new);
419415
MqttProperties subscriptionProperties = new MqttProperties();
420-
subscriptionProperties.setSubscriptionIdentifiers(IntStream.range(0, topics.length)
421-
.mapToObj(i -> this.subscriptionIdentifierCounter.incrementAndGet())
422-
.toList());
423-
this.mqttClient.subscribe(subscriptions, null, null, listeners, new MqttProperties())
416+
// Make use of mqttSession.getNextSubscriptionIdentifier() if available in connection
417+
subscriptionProperties.setSubscriptionIdentifiers(List.of(0));
418+
this.mqttClient.subscribe(mqttSubscriptions, null, null, this::messageArrived, subscriptionProperties)
424419
.waitForCompletion(getCompletionTimeout());
425420
String message = "Connected and subscribed to " + Arrays.toString(topics);
426421
logger.debug(message);
@@ -451,7 +446,6 @@ private static String obtainServerUrlFromOptions(MqttConnectionOptions connectio
451446
return serverURIs[0];
452447
}
453448

454-
455449
/**
456450
* Used to complete message arrival when {@link #isManualAcks()} is true.
457451
*/

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5AdapterTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void testStop() throws Exception {
5858
adapter.stop();
5959

6060
verify(client).connect(any(MqttConnectionOptions.class));
61-
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
61+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener.class), any());
6262
verify(client).unsubscribe(any(String[].class));
6363
}
6464

@@ -72,11 +72,12 @@ public void testStopNotClean() throws Exception {
7272
adapter.stop();
7373

7474
verify(client).connect(any(MqttConnectionOptions.class));
75-
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
75+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener.class), any());
7676
verify(client, never()).unsubscribe(any(String[].class));
7777
}
7878

79-
private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, boolean cleanStart) throws MqttException {
79+
private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client,
80+
boolean cleanStart) throws MqttException {
8081

8182
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
8283
connectionOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
@@ -86,9 +87,11 @@ private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttA
8687
IMqttToken token = mock(IMqttToken.class);
8788
given(client.disconnect()).willReturn(token);
8889
given(client.connect(any(MqttConnectionOptions.class))).willReturn(token);
89-
given(client.subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any())).willReturn(token);
90+
given(client.subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener.class), any()))
91+
.willReturn(token);
9092
given(client.unsubscribe(any(String[].class))).willReturn(token);
91-
Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(connectionOptions, "client", "foo");
93+
Mqttv5PahoMessageDrivenChannelAdapter adapter =
94+
new Mqttv5PahoMessageDrivenChannelAdapter(connectionOptions, "client", "foo");
9295
ReflectionTestUtils.setField(adapter, "mqttClient", client);
9396
adapter.setBeanFactory(mock(BeanFactory.class));
9497
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));

0 commit comments

Comments
 (0)