|
49 | 49 | import java.util.concurrent.atomic.AtomicLong;
|
50 | 50 | import java.util.concurrent.atomic.AtomicReference;
|
51 | 51 | import java.util.function.Function;
|
| 52 | +import java.util.function.LongConsumer; |
52 | 53 | import java.util.stream.Collectors;
|
53 | 54 | import java.util.stream.IntStream;
|
54 | 55 | import org.junit.jupiter.api.Test;
|
@@ -512,30 +513,51 @@ void publishAndConsume(boolean directBuffer) throws Exception {
|
512 | 513 | client.subscribe(b(1), stream, OffsetSpecification.first(), credit);
|
513 | 514 |
|
514 | 515 | CountDownLatch confirmedLatch = new CountDownLatch(publishCount);
|
| 516 | + Set<Long> sent = ConcurrentHashMap.newKeySet(publishCount); |
| 517 | + Client publisher = |
| 518 | + cf.get( |
| 519 | + new Client.ClientParameters() |
| 520 | + .byteBufAllocator(allocator) |
| 521 | + .publishConfirmListener( |
| 522 | + (publisherId, correlationId) -> { |
| 523 | + sent.remove(correlationId); |
| 524 | + confirmedLatch.countDown(); |
| 525 | + })); |
| 526 | + publisher.declarePublisher(b(1), null, stream); |
| 527 | + LongConsumer publish = |
| 528 | + messageId -> { |
| 529 | + sent.add(messageId); |
| 530 | + publisher.publish( |
| 531 | + b(1), |
| 532 | + Collections.singletonList( |
| 533 | + publisher |
| 534 | + .messageBuilder() |
| 535 | + .addData(("message" + messageId).getBytes(StandardCharsets.UTF_8)) |
| 536 | + .build()), |
| 537 | + msg -> messageId); |
| 538 | + }; |
515 | 539 | new Thread(
|
516 | 540 | () -> {
|
517 |
| - Client publisher = |
518 |
| - cf.get( |
519 |
| - new Client.ClientParameters() |
520 |
| - .byteBufAllocator(allocator) |
521 |
| - .publishConfirmListener( |
522 |
| - (publisherId, correlationId) -> confirmedLatch.countDown())); |
523 | 541 | int messageId = 0;
|
524 |
| - publisher.declarePublisher(b(1), null, stream); |
525 | 542 | while (messageId < publishCount) {
|
526 | 543 | messageId++;
|
527 |
| - publisher.publish( |
528 |
| - b(1), |
529 |
| - Collections.singletonList( |
530 |
| - publisher |
531 |
| - .messageBuilder() |
532 |
| - .addData(("message" + messageId).getBytes(StandardCharsets.UTF_8)) |
533 |
| - .build())); |
| 544 | + publish.accept(messageId); |
534 | 545 | }
|
535 | 546 | })
|
536 | 547 | .start();
|
537 | 548 |
|
538 |
| - assertThat(confirmedLatch.await(15, SECONDS)).isTrue(); |
| 549 | + int attempt = 0; |
| 550 | + while (attempt < 3) { |
| 551 | + boolean allConfirmed = confirmedLatch.await(15, SECONDS); |
| 552 | + if (allConfirmed) { |
| 553 | + break; |
| 554 | + } else { |
| 555 | + attempt++; |
| 556 | + for (Long messageIdNotConfirmed : sent) { |
| 557 | + publish.accept(messageIdNotConfirmed); |
| 558 | + } |
| 559 | + } |
| 560 | + } |
539 | 561 | assertThat(consumedLatch.await(15, SECONDS)).isTrue();
|
540 | 562 | client.unsubscribe(b(1));
|
541 | 563 | }
|
|
0 commit comments