Skip to content

Commit 4924d40

Browse files
committed
Polishing
Eagerly remove Conversation on complete to avoid cancellation if the conversation is already completed (e.g. concatMap(close())). Remove error logging in FluxDiscardOnCancel to avoid unnecessary noise. [#231]
1 parent 0d41973 commit 4924d40

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+43-11
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,10 @@ public Mono<Void> close() {
182182

183183
drainError(EXPECTED);
184184

185+
boolean connected = isConnected();
185186
if (this.isClosed.compareAndSet(false, true)) {
186187

187-
if (!isConnected() || this.processId == null) {
188+
if (!connected || this.processId == null) {
188189
this.connection.dispose();
189190
return this.connection.onDispose();
190191
}
@@ -452,7 +453,7 @@ private void drainError(Supplier<? extends Throwable> supplier) {
452453
Conversation receiver;
453454

454455
while ((receiver = this.messageSubscriber.conversations.poll()) != null) {
455-
receiver.sink.error(supplier.get());
456+
receiver.onError(supplier.get());
456457
}
457458

458459
if (!this.notificationProcessor.isTerminated()) {
@@ -642,27 +643,55 @@ private long decrementDemand() {
642643
}
643644

644645
/**
645-
* Emit a {@link BackendMessage}. Returns whether the emission should be continued by returning {@literal true} or whether the conversation is complete by returning {@literal false}.
646+
* Check whether the {@link BackendMessage} can complete the conversation.
646647
*
647648
* @param item
648649
* @return
649650
*/
650-
public boolean emit(BackendMessage item) {
651+
public boolean canComplete(BackendMessage item) {
652+
return this.takeUntil.test(item);
653+
}
651654

652-
if (this.sink.isCancelled()) {
653-
ReferenceCountUtil.release(item);
655+
/**
656+
* Complete the conversation.
657+
*
658+
* @param item
659+
* @return
660+
*/
661+
public void complete(BackendMessage item) {
662+
663+
ReferenceCountUtil.release(item);
664+
if (!this.sink.isCancelled()) {
665+
this.sink.complete();
654666
}
667+
}
655668

656-
if (this.takeUntil.test(item)) {
669+
/**
670+
* Emit a {@link BackendMessage}.
671+
*
672+
* @param item
673+
* @return
674+
*/
675+
public void emit(BackendMessage item) {
676+
677+
if (this.sink.isCancelled()) {
657678
ReferenceCountUtil.release(item);
658-
this.sink.complete();
659-
return false;
660679
}
661680

662681
decrementDemand();
663682
this.sink.next(item);
683+
}
664684

665-
return true;
685+
/**
686+
* Notify the conversation about an error. Drops errors silently if the conversation is finished.
687+
*
688+
* @param throwable
689+
*/
690+
public void onError(Throwable throwable) {
691+
692+
if (!this.sink.isCancelled()) {
693+
this.sink.error(throwable);
694+
}
666695
}
667696

668697
public boolean hasDemand() {
@@ -789,8 +818,11 @@ private void drainLoop() {
789818
break;
790819
}
791820

792-
if (!conversation.emit(item)) {
821+
if (conversation.canComplete(item)) {
793822
this.conversations.poll();
823+
conversation.complete(item);
824+
} else {
825+
conversation.emit(item);
794826
}
795827
}
796828
}

src/main/java/io/r2dbc/postgresql/util/FluxDiscardOnCancel.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ public void onNext(T t) {
8888

8989
@Override
9090
public void onError(Throwable t) {
91-
if (this.get()) {
92-
Operators.onErrorDropped(t, this.ctx);
93-
} else {
91+
if (!this.get()) {
9492
this.actual.onError(t);
9593
}
9694
}

0 commit comments

Comments
 (0)