@@ -476,7 +476,7 @@ private ClientSubscriptionsManager(
476
476
stream );
477
477
478
478
Set <SubscriptionTracker > affectedSubscriptions ;
479
- synchronized (ClientSubscriptionsManager . this ) {
479
+ synchronized (this . owner ) {
480
480
Set <SubscriptionTracker > subscriptions = streamToStreamSubscriptions .remove (stream );
481
481
if (subscriptions != null && !subscriptions .isEmpty ()) {
482
482
List <SubscriptionTracker > newSubscriptions =
@@ -616,129 +616,137 @@ private void assignConsumersToStream(
616
616
});
617
617
}
618
618
619
- synchronized void add (
619
+ void add (
620
620
SubscriptionTracker subscriptionTracker ,
621
621
OffsetSpecification offsetSpecification ,
622
622
boolean isInitialSubscription ) {
623
- // FIXME check manager is still open (not closed because of connection failure)
624
- byte subscriptionId = 0 ;
625
- for (int i = 0 ; i < MAX_SUBSCRIPTIONS_PER_CLIENT ; i ++) {
626
- if (subscriptionTrackers .get (i ) == null ) {
627
- subscriptionId = (byte ) i ;
628
- break ;
623
+ synchronized (this .owner ) {
624
+
625
+ // FIXME check manager is still open (not closed because of connection failure)
626
+ byte subscriptionId = 0 ;
627
+ for (int i = 0 ; i < MAX_SUBSCRIPTIONS_PER_CLIENT ; i ++) {
628
+ if (subscriptionTrackers .get (i ) == null ) {
629
+ subscriptionId = (byte ) i ;
630
+ break ;
631
+ }
629
632
}
630
- }
631
633
632
- List <SubscriptionTracker > previousSubscriptions = this .subscriptionTrackers ;
633
-
634
- LOGGER .debug (
635
- "Subscribing to {}, requested offset specification is {}, offset tracking reference is {}" ,
636
- subscriptionTracker .stream ,
637
- offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification ,
638
- subscriptionTracker .offsetTrackingReference );
639
- try {
640
- // updating data structures before subscribing
641
- // (to make sure they are up-to-date in case message would arrive super fast)
642
- subscriptionTracker .assign (subscriptionId , this );
643
- streamToStreamSubscriptions
644
- .computeIfAbsent (subscriptionTracker .stream , s -> ConcurrentHashMap .newKeySet ())
645
- .add (subscriptionTracker );
646
- this .subscriptionTrackers =
647
- update (previousSubscriptions , subscriptionId , subscriptionTracker );
648
-
649
- String offsetTrackingReference = subscriptionTracker .offsetTrackingReference ;
650
- if (offsetTrackingReference != null ) {
651
- QueryOffsetResponse queryOffsetResponse =
652
- client .queryOffset (offsetTrackingReference , subscriptionTracker .stream );
653
- if (queryOffsetResponse .isOk () && queryOffsetResponse .getOffset () != 0 ) {
654
- if (offsetSpecification != null && isInitialSubscription ) {
655
- // subscription call (not recovery), so telling the user their offset specification is
656
- // ignored
657
- LOGGER .info (
658
- "Requested offset specification {} not used in favor of stored offset found for reference {}" ,
659
- offsetSpecification ,
660
- offsetTrackingReference );
634
+ List <SubscriptionTracker > previousSubscriptions = this .subscriptionTrackers ;
635
+
636
+ LOGGER .debug (
637
+ "Subscribing to {}, requested offset specification is {}, offset tracking reference is {}" ,
638
+ subscriptionTracker .stream ,
639
+ offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification ,
640
+ subscriptionTracker .offsetTrackingReference );
641
+ try {
642
+ // updating data structures before subscribing
643
+ // (to make sure they are up-to-date in case message would arrive super fast)
644
+ subscriptionTracker .assign (subscriptionId , this );
645
+ streamToStreamSubscriptions
646
+ .computeIfAbsent (subscriptionTracker .stream , s -> ConcurrentHashMap .newKeySet ())
647
+ .add (subscriptionTracker );
648
+ this .subscriptionTrackers =
649
+ update (previousSubscriptions , subscriptionId , subscriptionTracker );
650
+
651
+ String offsetTrackingReference = subscriptionTracker .offsetTrackingReference ;
652
+ if (offsetTrackingReference != null ) {
653
+ QueryOffsetResponse queryOffsetResponse =
654
+ client .queryOffset (offsetTrackingReference , subscriptionTracker .stream );
655
+ if (queryOffsetResponse .isOk () && queryOffsetResponse .getOffset () != 0 ) {
656
+ if (offsetSpecification != null && isInitialSubscription ) {
657
+ // subscription call (not recovery), so telling the user their offset specification
658
+ // is
659
+ // ignored
660
+ LOGGER .info (
661
+ "Requested offset specification {} not used in favor of stored offset found for reference {}" ,
662
+ offsetSpecification ,
663
+ offsetTrackingReference );
664
+ }
665
+ LOGGER .debug (
666
+ "Using offset {} to start consuming from {} with consumer {} "
667
+ + "(instead of {})" ,
668
+ queryOffsetResponse .getOffset (),
669
+ subscriptionTracker .stream ,
670
+ offsetTrackingReference ,
671
+ offsetSpecification );
672
+ offsetSpecification = OffsetSpecification .offset (queryOffsetResponse .getOffset () + 1 );
661
673
}
662
- LOGGER .debug (
663
- "Using offset {} to start consuming from {} with consumer {} " + "(instead of {})" ,
664
- queryOffsetResponse .getOffset (),
665
- subscriptionTracker .stream ,
666
- offsetTrackingReference ,
667
- offsetSpecification );
668
- offsetSpecification = OffsetSpecification .offset (queryOffsetResponse .getOffset () + 1 );
669
674
}
670
- }
671
675
672
- offsetSpecification =
673
- offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification ;
676
+ offsetSpecification =
677
+ offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification ;
674
678
675
- Map <String , String > subscriptionProperties = Collections .emptyMap ();
676
- if (subscriptionTracker .offsetTrackingReference != null ) {
677
- subscriptionProperties = new HashMap <>(1 );
678
- subscriptionProperties .put ("name" , subscriptionTracker .offsetTrackingReference );
679
- }
679
+ Map <String , String > subscriptionProperties = Collections .emptyMap ();
680
+ if (subscriptionTracker .offsetTrackingReference != null ) {
681
+ subscriptionProperties = new HashMap <>(1 );
682
+ subscriptionProperties .put ("name" , subscriptionTracker .offsetTrackingReference );
683
+ }
680
684
681
- SubscriptionContext subscriptionContext =
682
- new DefaultSubscriptionContext (offsetSpecification );
683
- subscriptionTracker .subscriptionListener .preSubscribe (subscriptionContext );
684
- LOGGER .info (
685
- "Computed offset specification {}, offset specification used after subscription listener {}" ,
686
- offsetSpecification ,
687
- subscriptionContext .offsetSpecification ());
688
-
689
- // FIXME consider using fewer initial credits
690
- Client .Response subscribeResponse =
691
- client .subscribe (
692
- subscriptionId ,
693
- subscriptionTracker .stream ,
694
- subscriptionContext .offsetSpecification (),
695
- 10 ,
696
- subscriptionProperties );
697
- if (!subscribeResponse .isOk ()) {
698
- String message =
699
- "Subscription to stream "
700
- + subscriptionTracker .stream
701
- + " failed with code "
702
- + formatConstant (subscribeResponse .getResponseCode ());
703
- LOGGER .debug (message );
704
- throw new StreamException (message );
685
+ SubscriptionContext subscriptionContext =
686
+ new DefaultSubscriptionContext (offsetSpecification );
687
+ subscriptionTracker .subscriptionListener .preSubscribe (subscriptionContext );
688
+ LOGGER .info (
689
+ "Computed offset specification {}, offset specification used after subscription listener {}" ,
690
+ offsetSpecification ,
691
+ subscriptionContext .offsetSpecification ());
692
+
693
+ // FIXME consider using fewer initial credits
694
+ Client .Response subscribeResponse =
695
+ client .subscribe (
696
+ subscriptionId ,
697
+ subscriptionTracker .stream ,
698
+ subscriptionContext .offsetSpecification (),
699
+ 10 ,
700
+ subscriptionProperties );
701
+ if (!subscribeResponse .isOk ()) {
702
+ String message =
703
+ "Subscription to stream "
704
+ + subscriptionTracker .stream
705
+ + " failed with code "
706
+ + formatConstant (subscribeResponse .getResponseCode ());
707
+ LOGGER .debug (message );
708
+ throw new StreamException (message );
709
+ }
710
+ } catch (RuntimeException e ) {
711
+ subscriptionTracker .assign ((byte ) -1 , null );
712
+ this .subscriptionTrackers = previousSubscriptions ;
713
+ streamToStreamSubscriptions
714
+ .computeIfAbsent (subscriptionTracker .stream , s -> ConcurrentHashMap .newKeySet ())
715
+ .remove (subscriptionTracker );
716
+ throw e ;
705
717
}
706
- } catch (RuntimeException e ) {
707
- subscriptionTracker .assign ((byte ) -1 , null );
708
- this .subscriptionTrackers = previousSubscriptions ;
709
- streamToStreamSubscriptions
710
- .computeIfAbsent (subscriptionTracker .stream , s -> ConcurrentHashMap .newKeySet ())
711
- .remove (subscriptionTracker );
712
- throw e ;
713
- }
714
718
715
- LOGGER .debug ("Subscribed to {}" , subscriptionTracker .stream );
719
+ LOGGER .debug ("Subscribed to {}" , subscriptionTracker .stream );
720
+ }
716
721
}
717
722
718
723
synchronized void remove (SubscriptionTracker subscriptionTracker ) {
719
- // FIXME check manager is still open (not closed because of connection failure)
720
- byte subscriptionIdInClient = subscriptionTracker .subscriptionIdInClient ;
721
- Client .Response unsubscribeResponse = client .unsubscribe (subscriptionIdInClient );
722
- if (!unsubscribeResponse .isOk ()) {
723
- LOGGER .warn (
724
- "Unexpected response code when unsubscribing from {}: {} (subscription ID {})" ,
724
+ synchronized (this .owner ) {
725
+
726
+ // FIXME check manager is still open (not closed because of connection failure)
727
+ byte subscriptionIdInClient = subscriptionTracker .subscriptionIdInClient ;
728
+ Client .Response unsubscribeResponse = client .unsubscribe (subscriptionIdInClient );
729
+ if (!unsubscribeResponse .isOk ()) {
730
+ LOGGER .warn (
731
+ "Unexpected response code when unsubscribing from {}: {} (subscription ID {})" ,
732
+ subscriptionTracker .stream ,
733
+ formatConstant (unsubscribeResponse .getResponseCode ()),
734
+ subscriptionIdInClient );
735
+ }
736
+ this .subscriptionTrackers = update (this .subscriptionTrackers , subscriptionIdInClient , null );
737
+ streamToStreamSubscriptions .compute (
725
738
subscriptionTracker .stream ,
726
- formatConstant (unsubscribeResponse .getResponseCode ()),
727
- subscriptionIdInClient );
739
+ (stream , subscriptionsForThisStream ) -> {
740
+ if (subscriptionsForThisStream == null || subscriptionsForThisStream .isEmpty ()) {
741
+ // should not happen
742
+ return null ;
743
+ } else {
744
+ subscriptionsForThisStream .remove (subscriptionTracker );
745
+ return subscriptionsForThisStream .isEmpty () ? null : subscriptionsForThisStream ;
746
+ }
747
+ });
748
+ this .owner .maybeDisposeManager (this );
728
749
}
729
- this .subscriptionTrackers = update (this .subscriptionTrackers , subscriptionIdInClient , null );
730
- streamToStreamSubscriptions .compute (
731
- subscriptionTracker .stream ,
732
- (stream , subscriptionsForThisStream ) -> {
733
- if (subscriptionsForThisStream == null || subscriptionsForThisStream .isEmpty ()) {
734
- // should not happen
735
- return null ;
736
- } else {
737
- subscriptionsForThisStream .remove (subscriptionTracker );
738
- return subscriptionsForThisStream .isEmpty () ? null : subscriptionsForThisStream ;
739
- }
740
- });
741
- this .owner .maybeDisposeManager (this );
742
750
}
743
751
744
752
private List <SubscriptionTracker > update (
0 commit comments