Skip to content

Commit 0eb508f

Browse files
committed
Ensure BoltConnection does not send signals after summaries are finished
1 parent f6cd08c commit 0eb508f

File tree

1 file changed

+60
-32
lines changed

1 file changed

+60
-32
lines changed

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/BoltConnectionImpl.java

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -563,94 +563,122 @@ private ResponseHandleImpl(ResponseHandler delegate, int expectedSummaries) {
563563
@Override
564564
public void onError(Throwable throwable) {
565565
if (!(throwable instanceof MessageIgnoredException)) {
566-
runIgnoringError(() -> delegate.onError(throwable));
567-
if (!(throwable instanceof Neo4jException)
568-
|| throwable instanceof ServiceUnavailableException
569-
|| throwable instanceof ProtocolException) {
570-
// assume unrecoverable error, ensure onComplete
571-
expectedSummaries = 1;
566+
if (!summariesFuture.isDone()) {
567+
runIgnoringError(() -> delegate.onError(throwable));
568+
if (!(throwable instanceof Neo4jException)
569+
|| throwable instanceof ServiceUnavailableException
570+
|| throwable instanceof ProtocolException) {
571+
// assume unrecoverable error, ensure onComplete
572+
expectedSummaries = 1;
573+
}
574+
handleSummary();
572575
}
573-
handleSummary();
574576
} else {
575577
onIgnored();
576578
}
577579
}
578580

579581
@Override
580582
public void onBeginSummary(BeginSummary summary) {
581-
runIgnoringError(() -> delegate.onBeginSummary(summary));
582-
handleSummary();
583+
if (!summariesFuture.isDone()) {
584+
runIgnoringError(() -> delegate.onBeginSummary(summary));
585+
handleSummary();
586+
}
583587
}
584588

585589
@Override
586590
public void onRunSummary(RunSummary summary) {
587-
runIgnoringError(() -> delegate.onRunSummary(summary));
588-
handleSummary();
591+
if (!summariesFuture.isDone()) {
592+
runIgnoringError(() -> delegate.onRunSummary(summary));
593+
handleSummary();
594+
}
589595
}
590596

591597
@Override
592598
public void onRecord(Value[] fields) {
593-
runIgnoringError(() -> delegate.onRecord(fields));
599+
if (!summariesFuture.isDone()) {
600+
runIgnoringError(() -> delegate.onRecord(fields));
601+
}
594602
}
595603

596604
@Override
597605
public void onPullSummary(PullSummary summary) {
598-
runIgnoringError(() -> delegate.onPullSummary(summary));
599-
handleSummary();
606+
if (!summariesFuture.isDone()) {
607+
runIgnoringError(() -> delegate.onPullSummary(summary));
608+
handleSummary();
609+
}
600610
}
601611

602612
@Override
603613
public void onDiscardSummary(DiscardSummary summary) {
604-
runIgnoringError(() -> delegate.onDiscardSummary(summary));
605-
handleSummary();
614+
if (!summariesFuture.isDone()) {
615+
runIgnoringError(() -> delegate.onDiscardSummary(summary));
616+
handleSummary();
617+
}
606618
}
607619

608620
@Override
609621
public void onCommitSummary(CommitSummary summary) {
610-
runIgnoringError(() -> delegate.onCommitSummary(summary));
611-
handleSummary();
622+
if (!summariesFuture.isDone()) {
623+
runIgnoringError(() -> delegate.onCommitSummary(summary));
624+
handleSummary();
625+
}
612626
}
613627

614628
@Override
615629
public void onRollbackSummary(RollbackSummary summary) {
616-
runIgnoringError(() -> delegate.onRollbackSummary(summary));
617-
handleSummary();
630+
if (!summariesFuture.isDone()) {
631+
runIgnoringError(() -> delegate.onRollbackSummary(summary));
632+
handleSummary();
633+
}
618634
}
619635

620636
@Override
621637
public void onResetSummary(ResetSummary summary) {
622-
runIgnoringError(() -> delegate.onResetSummary(summary));
623-
handleSummary();
638+
if (!summariesFuture.isDone()) {
639+
runIgnoringError(() -> delegate.onResetSummary(summary));
640+
handleSummary();
641+
}
624642
}
625643

626644
@Override
627645
public void onRouteSummary(RouteSummary summary) {
628-
runIgnoringError(() -> delegate.onRouteSummary(summary));
629-
handleSummary();
646+
if (!summariesFuture.isDone()) {
647+
runIgnoringError(() -> delegate.onRouteSummary(summary));
648+
handleSummary();
649+
}
630650
}
631651

632652
@Override
633653
public void onLogoffSummary(LogoffSummary summary) {
634-
runIgnoringError(() -> delegate.onLogoffSummary(summary));
635-
handleSummary();
654+
if (!summariesFuture.isDone()) {
655+
runIgnoringError(() -> delegate.onLogoffSummary(summary));
656+
handleSummary();
657+
}
636658
}
637659

638660
@Override
639661
public void onLogonSummary(LogonSummary summary) {
640-
runIgnoringError(() -> delegate.onLogonSummary(summary));
641-
handleSummary();
662+
if (!summariesFuture.isDone()) {
663+
runIgnoringError(() -> delegate.onLogonSummary(summary));
664+
handleSummary();
665+
}
642666
}
643667

644668
@Override
645669
public void onTelemetrySummary(TelemetrySummary summary) {
646-
runIgnoringError(() -> delegate.onTelemetrySummary(summary));
647-
handleSummary();
670+
if (!summariesFuture.isDone()) {
671+
runIgnoringError(() -> delegate.onTelemetrySummary(summary));
672+
handleSummary();
673+
}
648674
}
649675

650676
@Override
651677
public void onIgnored() {
652-
runIgnoringError(delegate::onIgnored);
653-
handleSummary();
678+
if (!summariesFuture.isDone()) {
679+
runIgnoringError(delegate::onIgnored);
680+
handleSummary();
681+
}
654682
}
655683

656684
@Override

0 commit comments

Comments
 (0)