Skip to content

Commit 920b8ae

Browse files
committed
ZeroMQ test: Add sleep between SUB & PUB
1 parent a577652 commit 920b8ae

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void testMessageHandlerForPair() {
8686
}
8787

8888
@Test
89-
void testMessageHandlerForPubSub() {
89+
void testMessageHandlerForPubSub() throws InterruptedException {
9090
ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB);
9191
subSocket.setReceiveTimeOut(20_000);
9292
int port = subSocket.bindToRandomPort("tcp://*");
@@ -98,12 +98,15 @@ void testMessageHandlerForPubSub() {
9898
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
9999
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
100100
messageHandler.afterPropertiesSet();
101+
subSocket.subscribe("test");
102+
103+
// Give it some time to connect and subscribe
104+
Thread.sleep(2000);
101105

102106
Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();
103107

104108
await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100))
105109
.untilAsserted(() -> {
106-
subSocket.subscribe("test");
107110
messageHandler.handleMessage(testMessage).subscribe();
108111
ZMsg msg = ZMsg.recvMsg(subSocket);
109112
assertThat(msg).isNotNull();

0 commit comments

Comments
 (0)