Skip to content

Commit e3ca913

Browse files
committed
Leave drainLoop if loop cannot be entered
We now exit the drainLoop depending on whether the loop could be actually progress. Previously, the loop attempted to enter a guarded block that can be processed by a single thread only. If the drain loop block was already invoked by a different thread, then the parent caller of drainLoop would re-attempt to run the drainLoop until the subscriber had no demand or the buffer became empty. In case of recursion, a reentrant call to drainLoop could never succeed as the drain loop guard was engaged with the parent call. [resolves #285]
1 parent 6e642f5 commit e3ca913

File tree

1 file changed

+33
-23
lines changed

1 file changed

+33
-23
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+33-23
Original file line numberDiff line numberDiff line change
@@ -799,9 +799,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
799799
public void onRequest(Conversation conversation, long n) {
800800
conversation.incrementDemand(n);
801801

802-
while (hasBufferedItems() && hasDownstreamDemand()) {
803-
drainLoop();
804-
}
802+
tryDrainLoop();
805803
}
806804

807805
private void demandMore() {
@@ -852,45 +850,57 @@ public void onNext(BackendMessage message) {
852850
return;
853851
}
854852

853+
tryDrainLoop();
854+
}
855+
856+
private void tryDrainLoop() {
855857
while (hasBufferedItems() && hasDownstreamDemand()) {
856-
this.drainLoop();
858+
if (!this.drainLoop()) {
859+
return;
860+
}
857861
}
858862
}
859863

860-
private void drainLoop() {
864+
private boolean drainLoop() {
861865

862-
Conversation lastConversation = null;
866+
if (!this.drain.compareAndSet(false, true)) {
867+
return false;
868+
}
863869

864-
if (this.drain.compareAndSet(false, true)) {
870+
Conversation lastConversation = null;
865871

866-
try {
867-
while (hasBufferedItems()) {
872+
try {
868873

869-
Conversation conversation = this.conversations.peek();
870-
lastConversation = conversation;
871-
if (conversation == null) {
872-
break;
873-
}
874+
while (hasBufferedItems()) {
874875

875-
if (conversation.hasDemand()) {
876+
Conversation conversation = this.conversations.peek();
877+
lastConversation = conversation;
878+
if (conversation == null) {
879+
break;
880+
}
876881

877-
BackendMessage item = this.buffer.poll();
882+
if (conversation.hasDemand()) {
878883

879-
if (item == null) {
880-
break;
881-
}
884+
BackendMessage item = this.buffer.poll();
882885

883-
emit(conversation, item);
884-
} else {
886+
if (item == null) {
885887
break;
886888
}
889+
890+
emit(conversation, item);
891+
} else {
892+
break;
887893
}
888-
} finally {
889-
this.drain.compareAndSet(true, false);
890894
}
895+
896+
} finally {
897+
this.drain.compareAndSet(true, false);
891898
}
892899

893900
potentiallyDemandMore(lastConversation);
901+
902+
return true;
903+
894904
}
895905

896906
private void potentiallyDemandMore(@Nullable Conversation lastConversation) {

0 commit comments

Comments
 (0)