Skip to content

Commit 39c3acb

Browse files
committed
Fix ZMQMessageHandlerTests race condition
Use `receiveTimeOut = 0` on SUB socket and rely on the retries from the `await().atMost()`. Also resubscribe in the retry callback before publishing the next attempt
1 parent 0297f6b commit 39c3acb

File tree

1 file changed

+4
-6
lines changed

1 file changed

+4
-6
lines changed

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,11 @@ void testMessageHandlerForPair() {
8686
}
8787

8888
@Test
89-
void testMessageHandlerForPubSub() throws InterruptedException {
89+
void testMessageHandlerForPubSub() {
9090
ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB);
91-
subSocket.setReceiveTimeOut(20_000);
91+
subSocket.setReceiveTimeOut(0);
9292
int port = subSocket.bindToRandomPort("tcp://*");
93+
subSocket.subscribe("test");
9394

9495
ZeroMqMessageHandler messageHandler =
9596
new ZeroMqMessageHandler(CONTEXT, "tcp://localhost:" + port, SocketType.PUB);
@@ -98,15 +99,12 @@ void testMessageHandlerForPubSub() throws InterruptedException {
9899
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
99100
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
100101
messageHandler.afterPropertiesSet();
101-
subSocket.subscribe("test");
102-
103-
// Give it some time to connect and subscribe
104-
Thread.sleep(2000);
105102

106103
Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();
107104

108105
await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100))
109106
.untilAsserted(() -> {
107+
subSocket.subscribe("test");
110108
messageHandler.handleMessage(testMessage).subscribe();
111109
ZMsg msg = ZMsg.recvMsg(subSocket);
112110
assertThat(msg).isNotNull();

0 commit comments

Comments
 (0)