Skip to content

Commit ac29b26

Browse files
alessiomatricardispring-builds
authored andcommitted
GH-9197: Optional ZeroMQ topic wrapping
Fixes: #9197 * Update `ZeroMqMessageHandler` for `wrapTopic` option * Add author, fix code style, add same logic also for `ZeroMqMessageProducer` * Add `wrapTopic` function also in DSL specs * Fix wrap topic test: duplicate socket address caused binding exception * Rewrite the `MessageProducer.wrapTopic()` test * Call `stop()` instead of `destroy()` method (cherry picked from commit 80c8a61)
1 parent 460a4b6 commit ac29b26

File tree

7 files changed

+156
-7
lines changed

7 files changed

+156
-7
lines changed

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* The {@link ReactiveMessageHandlerSpec} extension for {@link ZeroMqMessageHandler}.
3737
*
3838
* @author Artem Bilan
39+
* @author Alessio Matricardi
3940
*
4041
* @since 5.4
4142
*/
@@ -127,6 +128,20 @@ public ZeroMqMessageHandlerSpec topic(String topic) {
127128
return this;
128129
}
129130

131+
/**
132+
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
133+
* subscriptions must be wrapped with an additional empty frame.
134+
* It is ignored for all other {@link SocketType}s supported.
135+
* This attribute is set to {@code true} by default.
136+
* @param wrapTopic true if the topic must be wrapped with an additional empty frame.
137+
* @return the spec
138+
* @since 6.2.6
139+
*/
140+
public ZeroMqMessageHandlerSpec wrapTopic(boolean wrapTopic) {
141+
this.reactiveMessageHandler.wrapTopic(wrapTopic);
142+
return this;
143+
}
144+
130145
/**
131146
* Specify a {@link Function} to evaluate a topic a {@link SocketType#PUB}
132147
* is going to use for distributing messages into the

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030

3131
/**
3232
* @author Artem Bilan
33+
* @author Alessio Matricardi
3334
*
3435
* @since 5.4
3536
*/
@@ -108,6 +109,20 @@ public ZeroMqMessageProducerSpec topics(String... topics) {
108109
return this;
109110
}
110111

112+
/**
113+
* Specify if the topic
114+
* that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame.
115+
* It is ignored for all other {@link SocketType}s supported.
116+
* This attribute is set to {@code true} by default.
117+
* @param unwrapTopic true if the received topic is wrapped with an additional empty frame.
118+
* @return the spec
119+
* @since 6.2.6
120+
*/
121+
public ZeroMqMessageProducerSpec unwrapTopic(boolean unwrapTopic) {
122+
this.target.unwrapTopic(unwrapTopic);
123+
return this;
124+
}
125+
111126
/**
112127
* Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}.
113128
* @param connectUrl the URL to connect ZeroMq socket to.

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626

2727
import org.zeromq.SocketType;
2828
import org.zeromq.ZContext;
29+
import org.zeromq.ZFrame;
2930
import org.zeromq.ZMQ;
3031
import org.zeromq.ZMsg;
3132
import reactor.core.publisher.Flux;
@@ -54,6 +55,7 @@
5455
* When the {@link SocketType#SUB} is used, the received topic is stored in the {@link ZeroMqHeaders#TOPIC}.
5556
*
5657
* @author Artem Bilan
58+
* @author Alessio Matricardi
5759
*
5860
* @since 5.4
5961
*/
@@ -90,6 +92,8 @@ public class ZeroMqMessageProducer extends MessageProducerSupport {
9092

9193
private volatile Mono<ZMQ.Socket> socketMono;
9294

95+
private volatile boolean unwrapTopic = true;
96+
9397
public ZeroMqMessageProducer(ZContext context) {
9498
this(context, SocketType.PAIR);
9599
}
@@ -189,6 +193,18 @@ public int getBoundPort() {
189193
return this.bindPort.get();
190194
}
191195

196+
/**
197+
* Specify if the topic
198+
* that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame.
199+
* It is ignored for all other {@link SocketType}s supported.
200+
* This attribute is set to {@code true} by default.
201+
* @param unwrapTopic true if the received topic is wrapped with an additional empty frame.
202+
* @since 6.2.6
203+
*/
204+
public void unwrapTopic(boolean unwrapTopic) {
205+
this.unwrapTopic = unwrapTopic;
206+
}
207+
192208
@Override
193209
public String getComponentType() {
194210
return "zeromq:inbound-channel-adapter";
@@ -284,7 +300,8 @@ private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {
284300
return msgMono.map((msg) -> {
285301
Map<String, Object> headers = null;
286302
if (msg.size() > 1) {
287-
headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, msg.unwrap().getString(ZMQ.CHARSET));
303+
ZFrame topicFrame = this.unwrapTopic ? msg.unwrap() : msg.pop();
304+
headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, topicFrame.getString(ZMQ.CHARSET));
288305
}
289306
return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR
290307
});

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing.
6161
*
6262
* @author Artem Bilan
63+
* @author Alessio Matricardi
6364
*
6465
* @since 5.4
6566
*/
@@ -88,6 +89,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler
8889

8990
private volatile Disposable socketMonoSubscriber;
9091

92+
private volatile boolean wrapTopic = true;
93+
9194
/**
9295
* Create an instance based on the provided {@link ZContext} and connection string.
9396
* @param context the {@link ZContext} to use for creating sockets.
@@ -192,6 +195,18 @@ public void setTopicExpression(Expression topicExpression) {
192195
this.topicExpression = topicExpression;
193196
}
194197

198+
/**
199+
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
200+
* subscriptions must be wrapped with an additional empty frame.
201+
* It is ignored for all other {@link SocketType}s supported.
202+
* This attribute is set to {@code true} by default.
203+
* @param wrapTopic true if the topic must be wrapped with an additional empty frame.
204+
* @since 6.2.6
205+
*/
206+
public void wrapTopic(boolean wrapTopic) {
207+
this.wrapTopic = wrapTopic;
208+
}
209+
195210
@Override
196211
public String getComponentType() {
197212
return "zeromq:outbound-channel-adapter";
@@ -245,7 +260,13 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
245260
if (socket.base() instanceof Pub) {
246261
String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class);
247262
if (topic != null) {
248-
msg.wrap(new ZFrame(topic));
263+
ZFrame topicFrame = new ZFrame(topic);
264+
if (this.wrapTopic) {
265+
msg.wrap(topicFrame);
266+
}
267+
else {
268+
msg.push(topicFrame);
269+
}
249270
}
250271
}
251272
}

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
import org.springframework.beans.factory.BeanFactory;
3333
import org.springframework.integration.channel.FluxMessageChannel;
3434
import org.springframework.integration.test.util.TestUtils;
35+
import org.springframework.integration.zeromq.ZeroMqHeaders;
3536
import org.springframework.messaging.support.GenericMessage;
3637

3738
import static org.assertj.core.api.Assertions.assertThat;
@@ -40,6 +41,7 @@
4041

4142
/**
4243
* @author Artem Bilan
44+
* @author Alessio Matricardi
4345
*
4446
* @since 5.4
4547
*/
@@ -88,7 +90,7 @@ void testMessageProducerForPair() {
8890

8991
stepVerifier.verify();
9092

91-
messageProducer.destroy();
93+
messageProducer.stop();
9294
socket.close();
9395
}
9496

@@ -142,7 +144,42 @@ void testMessageProducerForPubSubReceiveRaw() {
142144

143145
stepVerifier.verify(Duration.ofSeconds(10));
144146

145-
messageProducer.destroy();
147+
messageProducer.stop();
148+
socket.close();
149+
}
150+
151+
@Test
152+
void testMessageProducerForPubSubDisabledWrapTopic() {
153+
String socketAddress = "inproc://messageProducerWrapTopic.test";
154+
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB);
155+
socket.bind(socketAddress);
156+
157+
FluxMessageChannel outputChannel = new FluxMessageChannel();
158+
159+
StepVerifier stepVerifier =
160+
StepVerifier.create(outputChannel)
161+
.assertNext((message) -> assertThat(message.getHeaders()).containsEntry(ZeroMqHeaders.TOPIC, "testTopicWithNonWrappedTopic"))
162+
.thenCancel()
163+
.verifyLater();
164+
165+
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(CONTEXT, SocketType.SUB);
166+
messageProducer.setOutputChannel(outputChannel);
167+
messageProducer.setTopics("test");
168+
messageProducer.setConnectUrl(socketAddress);
169+
messageProducer.setBeanFactory(mock(BeanFactory.class));
170+
messageProducer.unwrapTopic(false);
171+
messageProducer.afterPropertiesSet();
172+
messageProducer.start();
173+
174+
assertThat(socket.recv()).isNotNull();
175+
176+
ZMsg msg = ZMsg.newStringMsg("test");
177+
msg.push("testTopicWithNonWrappedTopic");
178+
msg.send(socket);
179+
180+
stepVerifier.verify();
181+
182+
messageProducer.stop();
146183
socket.close();
147184
}
148185

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
/**
4545
* @author Artem Bilan
46+
* @author Alessio Matricardi
4647
*
4748
* @since 5.4
4849
*/
@@ -151,4 +152,40 @@ void testMessageHandlerForPushPullOverProxy() {
151152
proxy.destroy();
152153
}
153154

155+
@Test
156+
void testMessageHandlerForPubSubDisabledWrapTopic() {
157+
ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB);
158+
subSocket.setReceiveTimeOut(0);
159+
int port = subSocket.bindToRandomPort("tcp://*");
160+
subSocket.subscribe("test");
161+
162+
ZeroMqMessageHandler messageHandler =
163+
new ZeroMqMessageHandler(CONTEXT, "tcp://localhost:" + port, SocketType.PUB);
164+
messageHandler.setBeanFactory(mock(BeanFactory.class));
165+
messageHandler.setTopicExpression(
166+
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
167+
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
168+
messageHandler.wrapTopic(false);
169+
messageHandler.afterPropertiesSet();
170+
messageHandler.start();
171+
172+
Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();
173+
174+
await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100))
175+
.untilAsserted(() -> {
176+
subSocket.subscribe("test");
177+
messageHandler.handleMessage(testMessage).subscribe();
178+
ZMsg msg = ZMsg.recvMsg(subSocket);
179+
assertThat(msg).isNotNull();
180+
assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
181+
Message<?> capturedMessage =
182+
new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
183+
assertThat(capturedMessage).isEqualTo(testMessage);
184+
msg.destroy();
185+
});
186+
187+
messageHandler.destroy();
188+
subSocket.close();
189+
}
190+
154191
}

src/reference/antora/modules/ROOT/pages/zeromq.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ If the `receiveRaw` option is set to `true`, a `ZMsg`, consumed from the socket,
119119
Otherwise, an `InboundMessageMapper` is used to convert the consumed data into a `Message`.
120120
If the received `ZMsg` is multi-frame, the first frame is treated as the `ZeroMqHeaders.TOPIC` header this ZeroMQ message was published to.
121121

122+
If the `unwrapTopic` option is set to `false`, the incoming message is considered to consist of two frames: the topic and the ZeroMQ message.
123+
Otherwise, by default, the `ZMsg` is considered to consist of three frames: the first one containing the topic, the last frame containing the message, with an empty frame in the middle.
124+
122125
With `SocketType.SUB`, the `ZeroMqMessageProducer` uses the provided `topics` option for subscriptions; defaults to subscribe to all.
123126
Subscriptions can be adjusted at runtime using `subscribeToTopics()` and `unsubscribeFromTopics()` `@ManagedOperation` s.
124127

@@ -146,6 +149,10 @@ Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported.
146149
The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported.
147150
When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null.
148151
The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data.
152+
153+
If the `wrapTopic` option is set to `false`, the ZeroMQ message frame is sent after the injected topic, if present.
154+
By default, an additional empty frame is sent between the topic and the message.
155+
149156
When the payload of the request message is a `ZMsg`, no conversion or topic extraction is performed: the `ZMsg` is sent into a socket as is and it is not destroyed for possible further reuse.
150157
Otherwise, an `OutboundMessageMapper<byte[]>` is used to convert a request message (or just its payload) into a ZeroMQ frame to publish.
151158
By default, a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`.

0 commit comments

Comments
 (0)