@@ -410,6 +410,7 @@ private static class ProducerTracker implements AgentTracker {
410
410
private volatile byte publisherId ;
411
411
private volatile ClientProducersManager clientProducersManager ;
412
412
private final AtomicBoolean recovering = new AtomicBoolean (false );
413
+ private final Lock trackerLock = new ReentrantLock ();
413
414
414
415
private ProducerTracker (
415
416
long uniqueId , String reference , String stream , StreamProducer producer ) {
@@ -421,10 +422,12 @@ private ProducerTracker(
421
422
422
423
@ Override
423
424
public void assign (byte producerId , Client client , ClientProducersManager manager ) {
424
- synchronized (ProducerTracker .this ) {
425
- this .publisherId = producerId ;
426
- this .clientProducersManager = manager ;
427
- }
425
+ lock (
426
+ this .trackerLock ,
427
+ () -> {
428
+ this .publisherId = producerId ;
429
+ this .clientProducersManager = manager ;
430
+ });
428
431
this .producer .setPublisherId (producerId );
429
432
this .producer .setClient (client );
430
433
}
@@ -451,9 +454,7 @@ public String stream() {
451
454
452
455
@ Override
453
456
public void unavailable () {
454
- synchronized (ProducerTracker .this ) {
455
- this .clientProducersManager = null ;
456
- }
457
+ lock (this .trackerLock , () -> this .clientProducersManager = null );
457
458
this .producer .unavailable ();
458
459
}
459
460
@@ -464,10 +465,14 @@ public void running() {
464
465
}
465
466
466
467
@ Override
467
- public synchronized void cancel () {
468
- if (this .clientProducersManager != null ) {
469
- this .clientProducersManager .unregister (this );
470
- }
468
+ public void cancel () {
469
+ lock (
470
+ this .trackerLock ,
471
+ () -> {
472
+ if (this .clientProducersManager != null ) {
473
+ this .clientProducersManager .unregister (this );
474
+ }
475
+ });
471
476
}
472
477
473
478
@ Override
@@ -503,6 +508,7 @@ private static class TrackingConsumerTracker implements AgentTracker {
503
508
private final StreamConsumer consumer ;
504
509
private volatile ClientProducersManager clientProducersManager ;
505
510
private final AtomicBoolean recovering = new AtomicBoolean (false );
511
+ private final Lock trackerLock = new ReentrantLock ();
506
512
507
513
private TrackingConsumerTracker (long uniqueId , String stream , StreamConsumer consumer ) {
508
514
this .uniqueId = uniqueId ;
@@ -512,9 +518,7 @@ private TrackingConsumerTracker(long uniqueId, String stream, StreamConsumer con
512
518
513
519
@ Override
514
520
public void assign (byte producerId , Client client , ClientProducersManager manager ) {
515
- synchronized (TrackingConsumerTracker .this ) {
516
- this .clientProducersManager = manager ;
517
- }
521
+ lock (this .trackerLock , () -> this .clientProducersManager = manager );
518
522
this .consumer .setTrackingClient (client );
519
523
}
520
524
@@ -540,9 +544,7 @@ public String stream() {
540
544
541
545
@ Override
542
546
public void unavailable () {
543
- synchronized (TrackingConsumerTracker .this ) {
544
- this .clientProducersManager = null ;
545
- }
547
+ lock (this .trackerLock , () -> this .clientProducersManager = null );
546
548
this .consumer .unavailable ();
547
549
}
548
550
@@ -553,10 +555,8 @@ public void running() {
553
555
}
554
556
555
557
@ Override
556
- public synchronized void cancel () {
557
- if (this .clientProducersManager != null ) {
558
- this .clientProducersManager .unregister (this );
559
- }
558
+ public void cancel () {
559
+ lock (this .trackerLock , () -> this .clientProducersManager .unregister (this ));
560
560
}
561
561
562
562
@ Override
@@ -599,6 +599,7 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
599
599
private final Map <String , Set <AgentTracker >> streamToTrackers = new ConcurrentHashMap <>();
600
600
private final Client client ;
601
601
private final AtomicBoolean closed = new AtomicBoolean (false );
602
+ private final Lock managerLock = new ReentrantLock ();
602
603
603
604
private ClientProducersManager (
604
605
Broker targetNode ,
@@ -670,7 +671,8 @@ private ClientProducersManager(
670
671
"Received metadata notification for '{}', stream is likely to have become unavailable" ,
671
672
stream );
672
673
Set <AgentTracker > affectedTrackers ;
673
- synchronized (ClientProducersManager .this ) {
674
+ this .managerLock .lock ();
675
+ try {
674
676
affectedTrackers = streamToTrackers .remove (stream );
675
677
LOGGER .debug (
676
678
"Affected publishers and consumer trackers after metadata update: {}" ,
@@ -686,6 +688,8 @@ private ClientProducersManager(
686
688
}
687
689
});
688
690
}
691
+ } finally {
692
+ this .managerLock .unlock ();
689
693
}
690
694
if (affectedTrackers != null && !affectedTrackers .isEmpty ()) {
691
695
environment
@@ -840,79 +844,97 @@ private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTrac
840
844
}
841
845
}
842
846
843
- private synchronized void register (AgentTracker tracker ) {
844
- if (this .isFullFor (tracker )) {
845
- throw new IllegalStateException ("Cannot add subscription tracker, the manager is full" );
846
- }
847
- if (this .isClosed ()) {
848
- throw new IllegalStateException ("Cannot add subscription tracker, the manager is closed" );
849
- }
850
- checkNotClosed ();
851
- if (tracker .identifiable ()) {
852
- ProducerTracker producerTracker = (ProducerTracker ) tracker ;
853
- int index = pickSlot (this .producers , producerTracker , this .producerIndexSequence );
854
- this .checkNotClosed ();
855
- Response response =
856
- callAndMaybeRetry (
857
- () ->
858
- this .client .declarePublisher (
859
- (byte ) index , tracker .reference (), tracker .stream ()),
860
- RETRY_ON_TIMEOUT ,
861
- "Declare publisher request for publisher %d on stream '%s'" ,
862
- producerTracker .uniqueId (),
863
- producerTracker .stream ());
864
- if (response .isOk ()) {
865
- tracker .assign ((byte ) index , this .client , this );
866
- } else {
867
- String message =
868
- "Error while declaring publisher: "
869
- + formatConstant (response .getResponseCode ())
870
- + ". Could not assign producer to client." ;
871
- LOGGER .info (message );
872
- throw new StreamException (message , response .getResponseCode ());
873
- }
874
- producers .put (tracker .id (), producerTracker );
875
- } else {
876
- tracker .assign ((byte ) 0 , this .client , this );
877
- trackingConsumerTrackers .add (tracker );
878
- }
879
- streamToTrackers
880
- .computeIfAbsent (tracker .stream (), s -> ConcurrentHashMap .newKeySet ())
881
- .add (tracker );
847
+ private void register (AgentTracker tracker ) {
848
+ lock (
849
+ this .managerLock ,
850
+ () -> {
851
+ if (this .isFullFor (tracker )) {
852
+ throw new IllegalStateException (
853
+ "Cannot add subscription tracker, the manager is full" );
854
+ }
855
+ if (this .isClosed ()) {
856
+ throw new IllegalStateException (
857
+ "Cannot add subscription tracker, the manager is closed" );
858
+ }
859
+ checkNotClosed ();
860
+ if (tracker .identifiable ()) {
861
+ ProducerTracker producerTracker = (ProducerTracker ) tracker ;
862
+ int index = pickSlot (this .producers , producerTracker , this .producerIndexSequence );
863
+ this .checkNotClosed ();
864
+ Response response =
865
+ callAndMaybeRetry (
866
+ () ->
867
+ this .client .declarePublisher (
868
+ (byte ) index , tracker .reference (), tracker .stream ()),
869
+ RETRY_ON_TIMEOUT ,
870
+ "Declare publisher request for publisher %d on stream '%s'" ,
871
+ producerTracker .uniqueId (),
872
+ producerTracker .stream ());
873
+ if (response .isOk ()) {
874
+ tracker .assign ((byte ) index , this .client , this );
875
+ } else {
876
+ String message =
877
+ "Error while declaring publisher: "
878
+ + formatConstant (response .getResponseCode ())
879
+ + ". Could not assign producer to client." ;
880
+ LOGGER .info (message );
881
+ throw new StreamException (message , response .getResponseCode ());
882
+ }
883
+ producers .put (tracker .id (), producerTracker );
884
+ } else {
885
+ tracker .assign ((byte ) 0 , this .client , this );
886
+ trackingConsumerTrackers .add (tracker );
887
+ }
888
+ streamToTrackers
889
+ .computeIfAbsent (tracker .stream (), s -> ConcurrentHashMap .newKeySet ())
890
+ .add (tracker );
891
+ });
882
892
}
883
893
884
- private synchronized void unregister (AgentTracker tracker ) {
885
- LOGGER .debug (
886
- "Unregistering {} {} from manager on {}" , tracker .type (), tracker .uniqueId (), this .name );
887
- if (tracker .identifiable ()) {
888
- producers .remove (tracker .id ());
889
- } else {
890
- trackingConsumerTrackers .remove (tracker );
891
- }
892
- streamToTrackers .compute (
893
- tracker .stream (),
894
- (s , trackersForThisStream ) -> {
895
- if (s == null || trackersForThisStream == null ) {
896
- // should not happen
897
- return null ;
894
+ private void unregister (AgentTracker tracker ) {
895
+ lock (
896
+ this .managerLock ,
897
+ () -> {
898
+ LOGGER .debug (
899
+ "Unregistering {} {} from manager on {}" ,
900
+ tracker .type (),
901
+ tracker .uniqueId (),
902
+ this .name );
903
+ if (tracker .identifiable ()) {
904
+ producers .remove (tracker .id ());
898
905
} else {
899
- trackersForThisStream .remove (tracker );
900
- return trackersForThisStream .isEmpty () ? null : trackersForThisStream ;
906
+ trackingConsumerTrackers .remove (tracker );
901
907
}
908
+ streamToTrackers .compute (
909
+ tracker .stream (),
910
+ (s , trackersForThisStream ) -> {
911
+ if (s == null || trackersForThisStream == null ) {
912
+ // should not happen
913
+ return null ;
914
+ } else {
915
+ trackersForThisStream .remove (tracker );
916
+ return trackersForThisStream .isEmpty () ? null : trackersForThisStream ;
917
+ }
918
+ });
919
+ closeIfEmpty ();
902
920
});
903
- closeIfEmpty ();
904
921
}
905
922
906
- synchronized boolean isFullFor (AgentTracker tracker ) {
907
- if (tracker .identifiable ()) {
908
- return producers .size () == maxProducersByClient ;
909
- } else {
910
- return trackingConsumerTrackers .size () == maxTrackingConsumersByClient ;
911
- }
923
+ boolean isFullFor (AgentTracker tracker ) {
924
+ return lock (
925
+ this .managerLock ,
926
+ () -> {
927
+ if (tracker .identifiable ()) {
928
+ return producers .size () == maxProducersByClient ;
929
+ } else {
930
+ return trackingConsumerTrackers .size () == maxTrackingConsumersByClient ;
931
+ }
932
+ });
912
933
}
913
934
914
- synchronized boolean isEmpty () {
915
- return producers .isEmpty () && trackingConsumerTrackers .isEmpty ();
935
+ boolean isEmpty () {
936
+ return lock (
937
+ this .managerLock , () -> producers .isEmpty () && trackingConsumerTrackers .isEmpty ());
916
938
}
917
939
918
940
private void checkNotClosed () {
@@ -930,13 +952,15 @@ boolean isClosed() {
930
952
931
953
private void closeIfEmpty () {
932
954
if (!closed .get ()) {
933
- synchronized (this ) {
934
- if (this .isEmpty ()) {
935
- this .close ();
936
- } else {
937
- LOGGER .debug ("Not closing producer manager {} because it is not empty" , this .id );
938
- }
939
- }
955
+ lock (
956
+ this .managerLock ,
957
+ () -> {
958
+ if (this .isEmpty ()) {
959
+ this .close ();
960
+ } else {
961
+ LOGGER .debug ("Not closing producer manager {} because it is not empty" , this .id );
962
+ }
963
+ });
940
964
}
941
965
}
942
966
0 commit comments