1
- // Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1
+ // Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
2
2
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3
3
//
4
4
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
43
43
import java .util .concurrent .atomic .AtomicBoolean ;
44
44
import java .util .concurrent .atomic .AtomicLong ;
45
45
import java .util .concurrent .atomic .AtomicReference ;
46
+ import java .util .concurrent .locks .Lock ;
47
+ import java .util .concurrent .locks .ReentrantLock ;
46
48
import java .util .function .Function ;
47
49
import java .util .function .ToLongFunction ;
48
50
import org .slf4j .Logger ;
@@ -56,7 +58,6 @@ class StreamProducer implements Producer {
56
58
private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {};
57
59
private final long id ;
58
60
private final MessageAccumulator accumulator ;
59
- private final ToLongFunction <Message > accumulatorPublishSequenceFunction ;
60
61
// FIXME investigate a more optimized data structure to handle pending messages
61
62
private final ConcurrentMap <Long , AccumulatedEntity > unconfirmedMessages ;
62
63
private final int batchSize ;
@@ -79,6 +80,7 @@ class StreamProducer implements Producer {
79
80
private volatile Status status ;
80
81
private volatile ScheduledFuture <?> confirmTimeoutFuture ;
81
82
private final short publishVersion ;
83
+ private final Lock lock = new ReentrantLock ();
82
84
83
85
@ SuppressFBWarnings ("CT_CONSTRUCTOR_THROW" )
84
86
StreamProducer (
@@ -110,7 +112,7 @@ class StreamProducer implements Producer {
110
112
this .closingCallback = environment .registerProducer (this , name , this .stream );
111
113
final Client .OutboundEntityWriteCallback delegateWriteCallback ;
112
114
AtomicLong publishingSequence = new AtomicLong (computeFirstValueOfPublishingSequence ());
113
- this . accumulatorPublishSequenceFunction =
115
+ ToLongFunction < Message > accumulatorPublishSequenceFunction =
114
116
msg -> {
115
117
if (msg .hasPublishingId ()) {
116
118
return msg .getPublishingId ();
@@ -491,76 +493,80 @@ void unavailable() {
491
493
}
492
494
493
495
void running () {
494
- synchronized (this ) {
495
- LOGGER .debug (
496
- "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)" ,
497
- this .unconfirmedMessages .size (),
498
- this .accumulator .size ());
499
- if (this .retryOnRecovery ) {
500
- LOGGER .debug ("Re-publishing {} unconfirmed message(s)" , this .unconfirmedMessages .size ());
501
- if (!this .unconfirmedMessages .isEmpty ()) {
502
- Map <Long , AccumulatedEntity > messagesToResend = new TreeMap <>(this .unconfirmedMessages );
503
- this .unconfirmedMessages .clear ();
504
- Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
505
- messagesToResend .entrySet ().iterator ();
506
- while (resendIterator .hasNext ()) {
507
- List <Object > messages = new ArrayList <>(this .batchSize );
508
- int batchCount = 0 ;
509
- while (batchCount != this .batchSize ) {
510
- Object accMessage =
511
- resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
512
- if (accMessage == null ) {
513
- break ;
496
+ this .executeInLock (
497
+ () -> {
498
+ LOGGER .debug (
499
+ "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)" ,
500
+ this .unconfirmedMessages .size (),
501
+ this .accumulator .size ());
502
+ if (this .retryOnRecovery ) {
503
+ LOGGER .debug (
504
+ "Re-publishing {} unconfirmed message(s)" , this .unconfirmedMessages .size ());
505
+ if (!this .unconfirmedMessages .isEmpty ()) {
506
+ Map <Long , AccumulatedEntity > messagesToResend =
507
+ new TreeMap <>(this .unconfirmedMessages );
508
+ this .unconfirmedMessages .clear ();
509
+ Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
510
+ messagesToResend .entrySet ().iterator ();
511
+ while (resendIterator .hasNext ()) {
512
+ List <Object > messages = new ArrayList <>(this .batchSize );
513
+ int batchCount = 0 ;
514
+ while (batchCount != this .batchSize ) {
515
+ Object accMessage =
516
+ resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
517
+ if (accMessage == null ) {
518
+ break ;
519
+ }
520
+ messages .add (accMessage );
521
+ batchCount ++;
522
+ }
523
+ client .publishInternal (
524
+ this .publishVersion ,
525
+ this .publisherId ,
526
+ messages ,
527
+ this .writeCallback ,
528
+ this .publishSequenceFunction );
529
+ }
530
+ }
531
+ } else {
532
+ LOGGER .debug (
533
+ "Skipping republishing of {} unconfirmed messages" ,
534
+ this .unconfirmedMessages .size ());
535
+ Map <Long , AccumulatedEntity > messagesToFail = new TreeMap <>(this .unconfirmedMessages );
536
+ this .unconfirmedMessages .clear ();
537
+ for (AccumulatedEntity accumulatedEntity : messagesToFail .values ()) {
538
+ try {
539
+ int permits =
540
+ accumulatedEntity
541
+ .confirmationCallback ()
542
+ .handle (false , CODE_PUBLISH_CONFIRM_TIMEOUT );
543
+ this .unconfirmedMessagesSemaphore .release (permits );
544
+ } catch (Exception e ) {
545
+ LOGGER .debug ("Error while nack-ing outbound message: {}" , e .getMessage ());
546
+ this .unconfirmedMessagesSemaphore .release (1 );
514
547
}
515
- messages .add (accMessage );
516
- batchCount ++;
517
548
}
518
- client .publishInternal (
519
- this .publishVersion ,
520
- this .publisherId ,
521
- messages ,
522
- this .writeCallback ,
523
- this .publishSequenceFunction );
524
549
}
525
- }
526
- } else {
527
- LOGGER .debug (
528
- "Skipping republishing of {} unconfirmed messages" , this .unconfirmedMessages .size ());
529
- Map <Long , AccumulatedEntity > messagesToFail = new TreeMap <>(this .unconfirmedMessages );
530
- this .unconfirmedMessages .clear ();
531
- for (AccumulatedEntity accumulatedEntity : messagesToFail .values ()) {
532
- try {
533
- int permits =
534
- accumulatedEntity
535
- .confirmationCallback ()
536
- .handle (false , CODE_PUBLISH_CONFIRM_TIMEOUT );
537
- this .unconfirmedMessagesSemaphore .release (permits );
538
- } catch (Exception e ) {
539
- LOGGER .debug ("Error while nack-ing outbound message: {}" , e .getMessage ());
540
- this .unconfirmedMessagesSemaphore .release (1 );
550
+ this .accumulator .flush (true );
551
+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
552
+ if (toRelease > 0 ) {
553
+ unconfirmedMessagesSemaphore .release (toRelease );
554
+ if (!unconfirmedMessagesSemaphore .tryAcquire (this .unconfirmedMessages .size ())) {
555
+ LOGGER .debug (
556
+ "Could not acquire {} permit(s) for message republishing" ,
557
+ this .unconfirmedMessages .size ());
558
+ }
541
559
}
542
- }
543
- }
544
- this .accumulator .flush (true );
545
- int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
546
- if (toRelease > 0 ) {
547
- unconfirmedMessagesSemaphore .release (toRelease );
548
- if (!unconfirmedMessagesSemaphore .tryAcquire (this .unconfirmedMessages .size ())) {
549
- LOGGER .debug (
550
- "Could not acquire {} permit(s) for message republishing" ,
551
- this .unconfirmedMessages .size ());
552
- }
553
- }
554
- }
560
+ });
555
561
this .status = Status .RUNNING ;
556
562
}
557
563
558
- synchronized void setClient (Client client ) {
559
- this .client = client ;
564
+ void setClient (Client client ) {
565
+ this .executeInLock (() -> this . client = client ) ;
560
566
}
561
567
562
- synchronized void setPublisherId (byte publisherId ) {
563
- this .publisherId = publisherId ;
568
+ void setPublisherId (byte publisherId ) {
569
+ this .executeInLock (() -> this . publisherId = publisherId ) ;
564
570
}
565
571
566
572
Status status () {
@@ -646,4 +652,21 @@ public int fragmentLength(Object entity) {
646
652
}
647
653
}
648
654
}
655
+
656
+ void lock () {
657
+ this .lock .lock ();
658
+ }
659
+
660
+ void unlock () {
661
+ this .lock .unlock ();
662
+ }
663
+
664
+ private void executeInLock (Runnable action ) {
665
+ this .lock ();
666
+ try {
667
+ action .run ();
668
+ } finally {
669
+ this .unlock ();
670
+ }
671
+ }
649
672
}
0 commit comments