@@ -251,8 +251,8 @@ private static boolean isSslException(Throwable throwable) {
251
251
* Consume a {@link BackendMessage}. This method can either fully consume the message or it can signal by returning {@literal false} that the method wasn't able to fully consume the message and
252
252
* that the message needs to be passed to an active {@link Conversation}.
253
253
*
254
- * @param message
255
- * @return {@literal false} if the message could not be fully consumed and should be propagated to the active {@link Conversation}.
254
+ * @param message the {@link BackendMessage} to handle
255
+ * @return {@literal false} if the message could not be fully consumed and should be propagated to the active {@link Conversation}
256
256
*/
257
257
private boolean consumeMessage (BackendMessage message ) {
258
258
@@ -658,15 +658,15 @@ private Conversation(Predicate<BackendMessage> takeUntil, FluxSink<BackendMessag
658
658
this .takeUntil = takeUntil ;
659
659
}
660
660
661
- private long decrementDemand () {
662
- return Operators .addCap (DEMAND_UPDATER , this , -1 );
661
+ private void decrementDemand () {
662
+ Operators .addCap (DEMAND_UPDATER , this , -1 );
663
663
}
664
664
665
665
/**
666
666
* Check whether the {@link BackendMessage} can complete the conversation.
667
667
*
668
- * @param item
669
- * @return
668
+ * @param item the message to test whether it can complete the current conversation
669
+ * @return whether the {@link BackendMessage} can complete the current conversation
670
670
*/
671
671
public boolean canComplete (BackendMessage item ) {
672
672
return this .takeUntil .test (item );
@@ -675,8 +675,7 @@ public boolean canComplete(BackendMessage item) {
675
675
/**
676
676
* Complete the conversation.
677
677
*
678
- * @param item
679
- * @return
678
+ * @param item the message completing the conversation
680
679
*/
681
680
public void complete (BackendMessage item ) {
682
681
@@ -689,8 +688,7 @@ public void complete(BackendMessage item) {
689
688
/**
690
689
* Emit a {@link BackendMessage}.
691
690
*
692
- * @param item
693
- * @return
691
+ * @param item the item to emit
694
692
*/
695
693
public void emit (BackendMessage item ) {
696
694
@@ -705,7 +703,7 @@ public void emit(BackendMessage item) {
705
703
/**
706
704
* Notify the conversation about an error. Drops errors silently if the conversation is finished.
707
705
*
708
- * @param throwable
706
+ * @param throwable the error signal
709
707
*/
710
708
public void onError (Throwable throwable ) {
711
709
@@ -779,31 +777,35 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
779
777
});
780
778
}
781
779
780
+ /**
781
+ * {@link Subscription#request(long)} callback. Request more for a {@link Conversation}. Potentially, demands also more upstream elements.
782
+ *
783
+ * @param conversation the conversation subject
784
+ * @param n number of requested elements
785
+ */
782
786
public void onRequest (Conversation conversation , long n ) {
783
787
conversation .incrementDemand (n );
784
788
demandMore ();
785
789
tryDrainLoop ();
786
790
}
787
791
788
- private void demandMore () {
789
- if (!hasBufferedItems () && this .demand .compareAndSet (0 , DEMAND )) {
790
- this .upstream .request (DEMAND );
791
- }
792
- }
793
-
792
+ /**
793
+ * {@link Subscriber#onSubscribe(Subscription)} callback. Registers the {@link Subscription} and potentially requests more upstream elements.
794
+ *
795
+ * @param s the subscription
796
+ */
794
797
@ Override
795
798
public void onSubscribe (Subscription s ) {
796
799
this .upstream = s ;
797
- this .demandMore ();
798
- }
799
-
800
- private boolean hasDownstreamDemand () {
801
-
802
- Conversation conversation = this .conversations .peek ();
803
-
804
- return conversation != null && conversation .hasDemand ();
800
+ demandMore ();
805
801
}
806
802
803
+ /**
804
+ * {@link Subscriber#onNext(Object)} callback. Decrements upstream demand and attempts to emit {@link BackendMessage} to an active {@link Conversation}. If a conversation has no demand, it
805
+ * will be buffered.
806
+ *
807
+ * @param message the message to emit
808
+ */
807
809
@ Override
808
810
public void onNext (BackendMessage message ) {
809
811
@@ -836,14 +838,63 @@ public void onNext(BackendMessage message) {
836
838
tryDrainLoop ();
837
839
}
838
840
841
+ /**
842
+ * {@link Subscriber#onError(Throwable)} callback.
843
+ *
844
+ * @param throwable the error to emit
845
+ */
846
+ @ Override
847
+ public void onError (Throwable throwable ) {
848
+
849
+ if (this .terminated ) {
850
+ Operators .onErrorDropped (throwable , currentContext ());
851
+ return ;
852
+ }
853
+
854
+ handleConnectionError (throwable );
855
+ ReactorNettyClient .this .requestProcessor .onComplete ();
856
+ this .terminated = true ;
857
+
858
+ if (isSslException (throwable )) {
859
+ logger .debug ("Connection Error" , throwable );
860
+ } else {
861
+ logger .error ("Connection Error" , throwable );
862
+ }
863
+
864
+ ReactorNettyClient .this .close ().subscribe ();
865
+ }
866
+
867
+ /**
868
+ * {@link Subscriber#onComplete()} callback.
869
+ */
870
+ @ Override
871
+ public void onComplete () {
872
+ this .terminated = true ;
873
+ ReactorNettyClient .this .handleClose ();
874
+ }
875
+
876
+ /**
877
+ * Context propagation from an active {@link Conversation}.
878
+ */
879
+ @ Override
880
+ public Context currentContext () {
881
+ Conversation receiver = this .conversations .peek ();
882
+ return receiver != null ? receiver .sink .currentContext () : Context .empty ();
883
+ }
884
+
839
885
private void tryDrainLoop () {
840
886
while (hasBufferedItems () && hasDownstreamDemand ()) {
841
- if (!this . drainLoop ()) {
887
+ if (!drainLoop ()) {
842
888
return ;
843
889
}
844
890
}
845
891
}
846
892
893
+ /**
894
+ * Drains the buffer. Guarded for single-thread access.
895
+ *
896
+ * @return {@code true} if the drain loop was entered successfully. {@code false} otherwise (i.e. a different thread already works on the drain loop).
897
+ */
847
898
private boolean drainLoop () {
848
899
849
900
if (!this .drain .compareAndSet (false , true )) {
@@ -883,12 +934,11 @@ private boolean drainLoop() {
883
934
potentiallyDemandMore (lastConversation );
884
935
885
936
return true ;
886
-
887
937
}
888
938
889
939
private void potentiallyDemandMore (@ Nullable Conversation lastConversation ) {
890
940
if (lastConversation == null || lastConversation .hasDemand () || lastConversation .isCancelled ()) {
891
- this . demandMore ();
941
+ demandMore ();
892
942
}
893
943
}
894
944
@@ -901,51 +951,27 @@ private void emit(Conversation conversation, BackendMessage item) {
901
951
}
902
952
}
903
953
904
- private boolean hasBufferedItems () {
905
- return !this .buffer .isEmpty ();
906
- }
907
-
908
- @ Override
909
- public void onError (Throwable throwable ) {
910
-
911
- if (this .terminated ) {
912
- Operators .onErrorDropped (throwable , currentContext ());
913
- return ;
954
+ private void demandMore () {
955
+ if (!hasBufferedItems () && this .demand .compareAndSet (0 , DEMAND )) {
956
+ this .upstream .request (DEMAND );
914
957
}
958
+ }
915
959
916
- handleConnectionError (throwable );
917
- ReactorNettyClient .this .requestProcessor .onComplete ();
918
- this .terminated = true ;
919
-
920
- if (isSslException (throwable )) {
921
- logger .debug ("Connection Error" , throwable );
922
- } else {
923
- logger .error ("Connection Error" , throwable );
924
- }
960
+ private boolean hasDownstreamDemand () {
925
961
926
- ReactorNettyClient .this .close ().subscribe ();
927
- }
962
+ Conversation conversation = this .conversations .peek ();
928
963
929
- @ Override
930
- public void onComplete () {
931
- this .terminated = true ;
932
- ReactorNettyClient .this .handleClose ();
964
+ return conversation != null && conversation .hasDemand ();
933
965
}
934
966
935
- @ Override
936
- public Context currentContext () {
937
- Conversation receiver = this .conversations .peek ();
938
- if (receiver != null ) {
939
- return receiver .sink .currentContext ();
940
- } else {
941
- return Context .empty ();
942
- }
967
+ private boolean hasBufferedItems () {
968
+ return !this .buffer .isEmpty ();
943
969
}
944
970
945
971
/**
946
- * Cleanup the subscriber by terminating all {@link Conversation}s and purging the data buffer.
972
+ * Cleanup the subscriber by terminating all {@link Conversation}s and purging the data buffer. All conversations are completed with an error signal provided by {@code supplier}.
947
973
*
948
- * @param supplier
974
+ * @param supplier the error supplier
949
975
*/
950
976
public void close (Supplier <? extends Throwable > supplier ) {
951
977
0 commit comments