@@ -761,7 +761,10 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
761
761
762
762
public void onRequest (Conversation conversation , long n ) {
763
763
conversation .incrementDemand (n );
764
- drainLoop ();
764
+
765
+ while (hasBufferedItems () && hasDownstreamDemand ()) {
766
+ drainLoop ();
767
+ }
765
768
}
766
769
767
770
private void demandMore () {
@@ -776,6 +779,13 @@ public void onSubscribe(Subscription s) {
776
779
this .demandMore ();
777
780
}
778
781
782
+ private boolean hasDownstreamDemand () {
783
+
784
+ Conversation conversation = this .conversations .peek ();
785
+
786
+ return conversation != null && conversation .hasDemand ();
787
+ }
788
+
779
789
@ Override
780
790
public void onNext (BackendMessage message ) {
781
791
@@ -788,11 +798,15 @@ public void onNext(BackendMessage message) {
788
798
this .demand .decrementAndGet ();
789
799
790
800
if (!this .buffer .offer (message )) {
801
+ ReferenceCountUtil .release (message );
802
+ Operators .onNextDropped (message , currentContext ());
791
803
onError (new ResponseQueueException ("Response queue is full" ));
792
804
return ;
793
805
}
794
806
795
- drainLoop ();
807
+ while (hasBufferedItems () && hasDownstreamDemand ()) {
808
+ this .drainLoop ();
809
+ }
796
810
}
797
811
798
812
private void drainLoop () {
@@ -802,7 +816,7 @@ private void drainLoop() {
802
816
if (this .drain .compareAndSet (false , true )) {
803
817
804
818
try {
805
- while (! this . buffer . isEmpty ()) {
819
+ while (hasBufferedItems ()) {
806
820
807
821
Conversation conversation = this .conversations .peek ();
808
822
lastConversation = conversation ;
@@ -818,25 +832,38 @@ private void drainLoop() {
818
832
break ;
819
833
}
820
834
821
- if (conversation .canComplete (item )) {
822
- this .conversations .poll ();
823
- conversation .complete (item );
824
- } else {
825
- conversation .emit (item );
826
- }
835
+ emit (conversation , item );
836
+ } else {
837
+ break ;
827
838
}
828
839
}
829
840
} finally {
830
841
this .drain .compareAndSet (true , false );
831
842
}
832
843
}
833
844
845
+ potentiallyDemandMore (lastConversation );
846
+ }
834
847
848
+ private void potentiallyDemandMore (@ Nullable Conversation lastConversation ) {
835
849
if (lastConversation == null || lastConversation .hasDemand () || lastConversation .isCancelled ()) {
836
850
this .demandMore ();
837
851
}
838
852
}
839
853
854
+ private void emit (Conversation conversation , BackendMessage item ) {
855
+ if (conversation .canComplete (item )) {
856
+ this .conversations .poll ();
857
+ conversation .complete (item );
858
+ } else {
859
+ conversation .emit (item );
860
+ }
861
+ }
862
+
863
+ private boolean hasBufferedItems () {
864
+ return !this .buffer .isEmpty ();
865
+ }
866
+
840
867
@ Override
841
868
public void onError (Throwable throwable ) {
842
869
0 commit comments