Skip to content

Commit fbf06e8

Browse files
authored
GH-3181: MQTT: Support MANUAL Acks
Resolves #3181 * Doc polishing * Rework acknowledgment into the existing `AcknowledgmentCallback`. * Fix javadocs and doc linFix javadocs and doc linkk * Doc polishing; explain uses of `ACKNOWLEDGMENT_CALLBACK` header.
1 parent 3eedda6 commit fbf06e8

File tree

18 files changed

+282
-18
lines changed

18 files changed

+282
-18
lines changed

spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor {
6565

6666
public static final String DELIVERY_ATTEMPT = "deliveryAttempt";
6767

68+
/**
69+
* A callback to acknowledge message delivery. The type of the header value depends on
70+
* the context in which the header is used. See the reference manual for more
71+
* information.
72+
*/
6873
public static final String ACKNOWLEDGMENT_CALLBACK = "acknowledgmentCallback";
6974

7075
/**

spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.atomic.AtomicInteger;
2222

2323
import org.springframework.integration.acks.AcknowledgmentCallback;
24+
import org.springframework.integration.acks.SimpleAcknowledgment;
2425
import org.springframework.lang.Nullable;
2526
import org.springframework.messaging.Message;
2627
import org.springframework.messaging.MessageHeaders;
@@ -107,6 +108,12 @@ public static AcknowledgmentCallback getAcknowledgmentCallback(Message<?> messag
107108
AcknowledgmentCallback.class);
108109
}
109110

111+
@Nullable
112+
public static SimpleAcknowledgment getAcknowledgment(Message<?> message) {
113+
return message.getHeaders().get(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
114+
SimpleAcknowledgment.class);
115+
}
116+
110117
@SuppressWarnings("unchecked")
111118
@Nullable
112119
public static <T> T getSourceData(Message<?> message) {

spring-integration-core/src/main/java/org/springframework/integration/acks/AcknowledgmentCallback.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,14 +25,19 @@
2525
*
2626
*/
2727
@FunctionalInterface
28-
public interface AcknowledgmentCallback {
28+
public interface AcknowledgmentCallback extends SimpleAcknowledgment {
2929

3030
/**
3131
* Acknowledge the message.
3232
* @param status the status.
3333
*/
3434
void acknowledge(Status status);
3535

36+
@Override
37+
default void acknowledge() {
38+
acknowledge(Status.ACCEPT);
39+
}
40+
3641
/**
3742
* Implementations must implement this to indicate when the ack has been
3843
* processed by the user so that the framework can auto-ack if needed.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.acks;
18+
19+
/**
20+
* Opaque object for manually acknowledging.
21+
*
22+
* @author Gary Russell
23+
* @since 5.3
24+
*
25+
*/
26+
@FunctionalInterface
27+
public interface SimpleAcknowledgment {
28+
29+
/**
30+
* Acknowledge the message delivery.
31+
*/
32+
void acknowledge();
33+
34+
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttMessageDrivenChannelAdapterParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
4848
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
4949
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
5050
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
51+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");
5152

5253
return builder.getBeanDefinition();
5354
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030

3131
import org.springframework.context.ApplicationEventPublisher;
3232
import org.springframework.context.ApplicationEventPublisherAware;
33+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
34+
import org.springframework.integration.acks.SimpleAcknowledgment;
3335
import org.springframework.integration.mqtt.core.ConsumerStopAction;
3436
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
3537
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
3638
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
3739
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
40+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3841
import org.springframework.messaging.Message;
3942
import org.springframework.messaging.MessagingException;
4043
import org.springframework.util.Assert;
@@ -71,6 +74,8 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv
7174

7275
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
7376

77+
private boolean manualAcks;
78+
7479
private volatile IMqttClient client;
7580

7681
private volatile ScheduledFuture<?> reconnectFuture;
@@ -152,6 +157,15 @@ public synchronized void setRecoveryInterval(int recoveryInterval) {
152157
this.recoveryInterval = recoveryInterval;
153158
}
154159

160+
/**
161+
* Set the acknowledgment mode to manual.
162+
* @param manualAcks true for manual acks.
163+
* @since 5.3
164+
*/
165+
public void setManualAcks(boolean manualAcks) {
166+
this.manualAcks = manualAcks;
167+
}
168+
155169
/**
156170
* @since 4.2.2
157171
*/
@@ -263,6 +277,7 @@ private synchronized void connectAndSubscribe() throws MqttException {
263277
String[] topics = getTopic();
264278
try {
265279
this.client.connect(connectionOptions);
280+
this.client.setManualAcks(this.manualAcks);
266281
int[] requestedQos = getQos();
267282
int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
268283
this.client.subscribe(topics, grantedQos);
@@ -365,7 +380,12 @@ public synchronized void connectionLost(Throwable cause) {
365380

366381
@Override
367382
public void messageArrived(String topic, MqttMessage mqttMessage) {
368-
Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
383+
AbstractIntegrationMessageBuilder<?> builder = getConverter().toMessageBuilder(topic, mqttMessage);
384+
if (this.manualAcks) {
385+
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
386+
new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
387+
}
388+
Message<?> message = builder.build();
369389
try {
370390
sendMessage(message);
371391
}
@@ -379,4 +399,46 @@ public void messageArrived(String topic, MqttMessage mqttMessage) {
379399
public void deliveryComplete(IMqttDeliveryToken token) {
380400
}
381401

402+
/**
403+
* Used to complete message arrival when {@link AckMode#MANUAL}.
404+
*
405+
* @since 5.3
406+
*/
407+
private static class AcknowledgmentImpl implements SimpleAcknowledgment {
408+
409+
private final int id;
410+
411+
private final int qos;
412+
413+
private final IMqttClient ackClient;
414+
415+
/**
416+
* Construct an instance with the provided properties.
417+
* @param id the message id.
418+
* @param qos the message QOS.
419+
* @param client the client.
420+
*/
421+
AcknowledgmentImpl(int id, int qos, IMqttClient client) {
422+
this.id = id;
423+
this.qos = qos;
424+
this.ackClient = client;
425+
}
426+
427+
@Override
428+
public void acknowledge() {
429+
if (this.ackClient != null) {
430+
try {
431+
this.ackClient.messageArrivedComplete(this.id, this.qos);
432+
}
433+
catch (MqttException e) {
434+
throw new IllegalStateException(e);
435+
}
436+
}
437+
else {
438+
throw new IllegalStateException("Client has changed");
439+
}
440+
}
441+
442+
}
443+
382444
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/DefaultPahoMessageConverter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -206,6 +206,11 @@ public Message<?> toMessage(Object mqttMessage, MessageHeaders headers) {
206206

207207
@Override
208208
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
209+
return toMessageBuilder(topic, mqttMessage).build();
210+
}
211+
212+
@Override
213+
public AbstractIntegrationMessageBuilder<?> toMessageBuilder(String topic, MqttMessage mqttMessage) {
209214
try {
210215
AbstractIntegrationMessageBuilder<?> messageBuilder;
211216
if (this.bytesMessageMapper != null) {
@@ -219,13 +224,14 @@ public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
219224
.withPayload(mqttBytesToPayload(mqttMessage));
220225
}
221226
messageBuilder
227+
.setHeader(MqttHeaders.ID, mqttMessage.getId())
222228
.setHeader(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos())
223229
.setHeader(MqttHeaders.DUPLICATE, mqttMessage.isDuplicate())
224230
.setHeader(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained());
225231
if (topic != null) {
226232
messageBuilder.setHeader(MqttHeaders.RECEIVED_TOPIC, topic);
227233
}
228-
return messageBuilder.build();
234+
return messageBuilder;
229235
}
230236
catch (Exception e) {
231237
throw new MessageConversionException("failed to convert object to Message", e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.support;
18+
19+
import org.springframework.lang.Nullable;
20+
import org.springframework.messaging.Message;
21+
22+
/**
23+
* Helper for typed access to incoming MQTT message headers.
24+
*
25+
* @author Gary Russell
26+
* @since 5.3
27+
*
28+
*/
29+
public final class MqttHeaderAccessor {
30+
31+
private MqttHeaderAccessor() {
32+
}
33+
34+
/**
35+
* Return the received topic header.
36+
* @param message the message.
37+
* @return the header.
38+
*/
39+
@Nullable
40+
public static String receivedTopic(Message<?> message) {
41+
return message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class);
42+
}
43+
44+
/**
45+
* Return the MQTT message id.
46+
* @param message the message.
47+
* @return the header.
48+
*/
49+
@Nullable
50+
public static Integer id(Message<?> message) {
51+
return message.getHeaders().get(MqttHeaders.ID, Integer.class);
52+
}
53+
54+
/**
55+
* Return the received QOS header.
56+
* @param message the message.
57+
* @return the header.
58+
*/
59+
@Nullable
60+
public static Integer receivedQos(Message<?> message) {
61+
return message.getHeaders().get(MqttHeaders.RECEIVED_QOS, Integer.class);
62+
}
63+
64+
/**
65+
* Return the received retained header.
66+
* @param message the message.
67+
* @return the header.
68+
*/
69+
@Nullable
70+
public static Boolean receivedRetained(Message<?> message) {
71+
return message.getHeaders().get(MqttHeaders.RECEIVED_RETAINED, Boolean.class);
72+
}
73+
74+
/**
75+
* Return the duplicate header.
76+
* @param message the message.
77+
* @return the header.
78+
*/
79+
@Nullable
80+
public static Boolean duplicate(Message<?> message) {
81+
return message.getHeaders().get(MqttHeaders.DUPLICATE, Boolean.class);
82+
}
83+
84+
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttHeaders.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,8 @@ public final class MqttHeaders {
3030

3131
public static final String QOS = PREFIX + "qos";
3232

33+
public static final String ID = PREFIX + "id";
34+
3335
public static final String RECEIVED_QOS = PREFIX + "receivedQos";
3436

3537
public static final String DUPLICATE = PREFIX + "duplicate";

0 commit comments

Comments
 (0)