|
41 | 41 | import java.util.concurrent.CountDownLatch;
|
42 | 42 | import java.util.concurrent.ExecutorService;
|
43 | 43 | import java.util.concurrent.Executors;
|
44 |
| -import java.util.concurrent.Semaphore; |
45 | 44 | import java.util.concurrent.TimeUnit;
|
46 | 45 | import java.util.concurrent.atomic.AtomicBoolean;
|
47 | 46 | import java.util.concurrent.atomic.AtomicInteger;
|
|
58 | 57 | import org.junit.jupiter.api.extension.ExtendWith;
|
59 | 58 | import org.junit.jupiter.params.ParameterizedTest;
|
60 | 59 | import org.junit.jupiter.params.provider.ValueSource;
|
61 |
| -import wiremock.org.checkerframework.checker.units.qual.A; |
62 | 60 |
|
63 | 61 | @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
|
64 | 62 | public class StreamProducerTest {
|
@@ -90,67 +88,6 @@ void tearDown() {
|
90 | 88 | environment.close();
|
91 | 89 | }
|
92 | 90 |
|
93 |
| - private static AtomicLong rate() { |
94 |
| - AtomicLong count = new AtomicLong(); |
95 |
| - AtomicLong tick = new AtomicLong(System.nanoTime()); |
96 |
| - |
97 |
| - Executors.newSingleThreadScheduledExecutor() |
98 |
| - .scheduleAtFixedRate( |
99 |
| - () -> { |
100 |
| - long now = System.nanoTime(); |
101 |
| - long before = tick.getAndSet(now); |
102 |
| - long elapsed = now - before; |
103 |
| - long sent = count.getAndSet(0); |
104 |
| - System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); |
105 |
| - }, |
106 |
| - 1, |
107 |
| - 1, |
108 |
| - TimeUnit.SECONDS); |
109 |
| - return count; |
110 |
| - } |
111 |
| - |
112 |
| - @Test |
113 |
| - void test() { |
114 |
| - AtomicLong count = rate(); |
115 |
| - Producer producer = environment.producerBuilder().stream(stream) |
116 |
| - .maxUnconfirmedMessages(10) |
117 |
| - .build(); |
118 |
| - |
119 |
| - while(true) { |
120 |
| - producer.send(producer.messageBuilder().build(), s -> { }); |
121 |
| - count.incrementAndGet(); |
122 |
| - } |
123 |
| - |
124 |
| - } |
125 |
| - |
126 |
| - @Test |
127 |
| - void client() throws Exception { |
128 |
| - int permits = 10; |
129 |
| - Semaphore semaphore = new Semaphore(permits); |
130 |
| - Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { |
131 |
| - @Override |
132 |
| - public void handle(byte publisherId, long publishingId) { |
133 |
| - semaphore.release(); |
134 |
| - } |
135 |
| - })); |
136 |
| - |
137 |
| - byte pubId = (byte) 0; |
138 |
| - client.declarePublisher(pubId, null, stream); |
139 |
| - |
140 |
| - AtomicLong count = rate(); |
141 |
| - |
142 |
| - List<Message> messages = IntStream.range(0, permits).mapToObj(ignored -> client |
143 |
| - .messageBuilder() |
144 |
| - .addData("hello".getBytes(StandardCharsets.UTF_8)) |
145 |
| - .build()).collect(Collectors.toList()); |
146 |
| - while (true) { |
147 |
| - semaphore.acquire(permits); |
148 |
| - client.publish(pubId, messages); |
149 |
| - count.addAndGet(permits); |
150 |
| - } |
151 |
| - |
152 |
| - } |
153 |
| - |
154 | 91 | @Test
|
155 | 92 | void send() throws Exception {
|
156 | 93 | int batchSize = 10;
|
|
0 commit comments