Skip to content

Commit d813c08

Browse files
committed
Compose CompletableFuture in async test
References #215
1 parent 05dcfc4 commit d813c08

File tree

1 file changed

+52
-23
lines changed

1 file changed

+52
-23
lines changed

src/test/java/com/rabbitmq/client/test/ChannelAsyncCompletableFutureTest.java

+52-23
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.rabbitmq.client.test;
1717

1818
import com.rabbitmq.client.AMQP;
19-
import com.rabbitmq.client.Command;
2019
import com.rabbitmq.client.DefaultConsumer;
2120
import com.rabbitmq.client.Envelope;
2221
import com.rabbitmq.client.impl.AMQImpl;
@@ -26,7 +25,10 @@
2625

2726
import java.io.IOException;
2827
import java.util.UUID;
29-
import java.util.concurrent.*;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
3032

3133
import static org.junit.Assert.assertTrue;
3234

@@ -35,46 +37,73 @@ public class ChannelAsyncCompletableFutureTest extends BrokerTestCase {
3537
ExecutorService executor;
3638

3739
String queue;
40+
String exchange;
3841

3942
@Before public void init() {
40-
executor = Executors.newSingleThreadExecutor();
43+
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
4144
queue = UUID.randomUUID().toString();
45+
exchange = UUID.randomUUID().toString();
4246
}
4347

4448
@After public void tearDown() throws IOException {
4549
executor.shutdownNow();
4650
channel.queueDelete(queue);
51+
channel.exchangeDelete(exchange);
4752
}
4853

4954
@Test
5055
public void async() throws Exception {
5156
CountDownLatch latch = new CountDownLatch(1);
52-
AMQP.Queue.Declare method = new AMQImpl.Queue.Declare.Builder()
57+
AMQP.Queue.Declare queueDeclare = new AMQImpl.Queue.Declare.Builder()
5358
.queue(queue)
5459
.durable(true)
5560
.exclusive(false)
5661
.autoDelete(false)
5762
.arguments(null)
5863
.build();
59-
CompletableFuture<Command> future = channel.asyncCompletableRpc(method);
60-
future.thenAcceptAsync(action -> {
61-
try {
62-
channel.basicPublish("", queue, null, "dummy".getBytes());
63-
} catch (IOException e) {
64-
throw new RuntimeException(e);
65-
}
66-
}, executor).thenAccept((whatever) -> {
67-
try {
68-
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
69-
@Override
70-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
71-
latch.countDown();
72-
}
73-
});
74-
} catch (IOException e) {
75-
throw new RuntimeException(e);
76-
}
77-
});
64+
65+
channel.asyncCompletableRpc(queueDeclare)
66+
.thenComposeAsync(action -> {
67+
try {
68+
return channel.asyncCompletableRpc(new AMQImpl.Exchange.Declare.Builder()
69+
.exchange(exchange)
70+
.type("fanout")
71+
.durable(false)
72+
.autoDelete(false)
73+
.arguments(null)
74+
.build());
75+
} catch (IOException e) {
76+
throw new RuntimeException(e);
77+
}
78+
}, executor).thenComposeAsync(action -> {
79+
try {
80+
return channel.asyncCompletableRpc(new AMQImpl.Queue.Bind.Builder()
81+
.queue(queue)
82+
.exchange(exchange)
83+
.routingKey("")
84+
.arguments(null)
85+
.build());
86+
} catch (IOException e) {
87+
throw new RuntimeException(e);
88+
}
89+
}, executor).thenAcceptAsync(action -> {
90+
try {
91+
channel.basicPublish("", queue, null, "dummy".getBytes());
92+
} catch (IOException e) {
93+
throw new RuntimeException(e);
94+
}
95+
}, executor).thenAcceptAsync((whatever) -> {
96+
try {
97+
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
98+
@Override
99+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
100+
latch.countDown();
101+
}
102+
});
103+
} catch (IOException e) {
104+
throw new RuntimeException(e);
105+
}
106+
}, executor);
78107
assertTrue(latch.await(1, TimeUnit.SECONDS));
79108
}
80109

0 commit comments

Comments
 (0)