|
22 | 22 | import java.util.Collections;
|
23 | 23 | import java.util.List;
|
24 | 24 | import java.util.Random;
|
| 25 | +import java.util.concurrent.atomic.AtomicInteger; |
25 | 26 | import java.util.concurrent.atomic.AtomicReference;
|
26 | 27 |
|
27 | 28 | import static org.testng.Assert.assertEquals;
|
@@ -246,9 +247,9 @@ public Void apply(final Integer runNumber) throws Throwable {
|
246 | 247 | public void run(Publisher<T> pub) throws Throwable {
|
247 | 248 | final Latch completionLatch = new Latch(env);
|
248 | 249 |
|
| 250 | + final AtomicInteger gotElements = new AtomicInteger(0); |
249 | 251 | pub.subscribe(new Subscriber<T>() {
|
250 | 252 | private Subscription subs;
|
251 |
| - private long gotElements = 0; |
252 | 253 |
|
253 | 254 | private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();
|
254 | 255 |
|
@@ -302,8 +303,7 @@ public void onNext(T ignore) {
|
302 | 303 | final String signal = String.format("onNext(%s)", ignore);
|
303 | 304 | concurrentAccessBarrier.enterSignal(signal);
|
304 | 305 |
|
305 |
| - gotElements += 1; |
306 |
| - if (gotElements <= elements) // requesting one more than we know are in the stream (some Publishers need this) |
| 306 | + if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this) |
307 | 307 | subs.request(1);
|
308 | 308 |
|
309 | 309 | concurrentAccessBarrier.leaveSignal(signal);
|
@@ -331,7 +331,10 @@ public void onComplete() {
|
331 | 331 | }
|
332 | 332 | });
|
333 | 333 |
|
334 |
| - completionLatch.expectClose(elements * env.defaultTimeoutMillis(), "Expected 10 elements to be drained"); |
| 334 | + completionLatch.expectClose( |
| 335 | + elements * env.defaultTimeoutMillis(), |
| 336 | + String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it", |
| 337 | + runNumber, iterations, elements, gotElements.get())); |
335 | 338 | }
|
336 | 339 | });
|
337 | 340 | return null;
|
|
0 commit comments