Skip to content

Commit 37f2e5e

Browse files
committed
Leave Client drainloop if current conversation has no demand
The drainloop is now left if the current conversation has no demand. [#242]
1 parent 4924d40 commit 37f2e5e

File tree

1 file changed

+36
-9
lines changed

1 file changed

+36
-9
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+36-9
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,10 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
761761

762762
public void onRequest(Conversation conversation, long n) {
763763
conversation.incrementDemand(n);
764-
drainLoop();
764+
765+
while (hasBufferedItems() && hasDownstreamDemand()) {
766+
drainLoop();
767+
}
765768
}
766769

767770
private void demandMore() {
@@ -776,6 +779,13 @@ public void onSubscribe(Subscription s) {
776779
this.demandMore();
777780
}
778781

782+
private boolean hasDownstreamDemand() {
783+
784+
Conversation conversation = this.conversations.peek();
785+
786+
return conversation != null && conversation.hasDemand();
787+
}
788+
779789
@Override
780790
public void onNext(BackendMessage message) {
781791

@@ -788,11 +798,15 @@ public void onNext(BackendMessage message) {
788798
this.demand.decrementAndGet();
789799

790800
if (!this.buffer.offer(message)) {
801+
ReferenceCountUtil.release(message);
802+
Operators.onNextDropped(message, currentContext());
791803
onError(new ResponseQueueException("Response queue is full"));
792804
return;
793805
}
794806

795-
drainLoop();
807+
while (hasBufferedItems() && hasDownstreamDemand()) {
808+
this.drainLoop();
809+
}
796810
}
797811

798812
private void drainLoop() {
@@ -802,7 +816,7 @@ private void drainLoop() {
802816
if (this.drain.compareAndSet(false, true)) {
803817

804818
try {
805-
while (!this.buffer.isEmpty()) {
819+
while (hasBufferedItems()) {
806820

807821
Conversation conversation = this.conversations.peek();
808822
lastConversation = conversation;
@@ -818,25 +832,38 @@ private void drainLoop() {
818832
break;
819833
}
820834

821-
if (conversation.canComplete(item)) {
822-
this.conversations.poll();
823-
conversation.complete(item);
824-
} else {
825-
conversation.emit(item);
826-
}
835+
emit(conversation, item);
836+
} else {
837+
break;
827838
}
828839
}
829840
} finally {
830841
this.drain.compareAndSet(true, false);
831842
}
832843
}
833844

845+
potentiallyDemandMore(lastConversation);
846+
}
834847

848+
private void potentiallyDemandMore(@Nullable Conversation lastConversation) {
835849
if (lastConversation == null || lastConversation.hasDemand() || lastConversation.isCancelled()) {
836850
this.demandMore();
837851
}
838852
}
839853

854+
private void emit(Conversation conversation, BackendMessage item) {
855+
if (conversation.canComplete(item)) {
856+
this.conversations.poll();
857+
conversation.complete(item);
858+
} else {
859+
conversation.emit(item);
860+
}
861+
}
862+
863+
private boolean hasBufferedItems() {
864+
return !this.buffer.isEmpty();
865+
}
866+
840867
@Override
841868
public void onError(Throwable throwable) {
842869

0 commit comments

Comments
 (0)