Skip to content

Commit 18145a5

Browse files
committed
Ensure BoltConnection does not send signals after summaries are finished
1 parent f6cd08c commit 18145a5

File tree

1 file changed

+56
-30
lines changed

1 file changed

+56
-30
lines changed

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/BoltConnectionImpl.java

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -563,40 +563,50 @@ private ResponseHandleImpl(ResponseHandler delegate, int expectedSummaries) {
563563
@Override
564564
public void onError(Throwable throwable) {
565565
if (!(throwable instanceof MessageIgnoredException)) {
566-
runIgnoringError(() -> delegate.onError(throwable));
567-
if (!(throwable instanceof Neo4jException)
568-
|| throwable instanceof ServiceUnavailableException
569-
|| throwable instanceof ProtocolException) {
570-
// assume unrecoverable error, ensure onComplete
571-
expectedSummaries = 1;
566+
if (!summariesFuture.isDone()) {
567+
runIgnoringError(() -> delegate.onError(throwable));
568+
if (!(throwable instanceof Neo4jException)
569+
|| throwable instanceof ServiceUnavailableException
570+
|| throwable instanceof ProtocolException) {
571+
// assume unrecoverable error, ensure onComplete
572+
expectedSummaries = 1;
573+
}
574+
handleSummary();
572575
}
573-
handleSummary();
574576
} else {
575577
onIgnored();
576578
}
577579
}
578580

579581
@Override
580582
public void onBeginSummary(BeginSummary summary) {
581-
runIgnoringError(() -> delegate.onBeginSummary(summary));
582-
handleSummary();
583+
if (!summariesFuture.isDone()) {
584+
runIgnoringError(() -> delegate.onBeginSummary(summary));
585+
handleSummary();
586+
}
583587
}
584588

585589
@Override
586590
public void onRunSummary(RunSummary summary) {
587-
runIgnoringError(() -> delegate.onRunSummary(summary));
588-
handleSummary();
591+
if (!summariesFuture.isDone()) {
592+
runIgnoringError(() -> delegate.onRunSummary(summary));
593+
handleSummary();
594+
}
589595
}
590596

591597
@Override
592598
public void onRecord(Value[] fields) {
593-
runIgnoringError(() -> delegate.onRecord(fields));
599+
if (!summariesFuture.isDone()) {
600+
runIgnoringError(() -> delegate.onRecord(fields));
601+
}
594602
}
595603

596604
@Override
597605
public void onPullSummary(PullSummary summary) {
598-
runIgnoringError(() -> delegate.onPullSummary(summary));
599-
handleSummary();
606+
if (!summariesFuture.isDone()) {
607+
runIgnoringError(() -> delegate.onPullSummary(summary));
608+
handleSummary();
609+
}
600610
}
601611

602612
@Override
@@ -607,50 +617,66 @@ public void onDiscardSummary(DiscardSummary summary) {
607617

608618
@Override
609619
public void onCommitSummary(CommitSummary summary) {
610-
runIgnoringError(() -> delegate.onCommitSummary(summary));
611-
handleSummary();
620+
if (!summariesFuture.isDone()) {
621+
runIgnoringError(() -> delegate.onCommitSummary(summary));
622+
handleSummary();
623+
}
612624
}
613625

614626
@Override
615627
public void onRollbackSummary(RollbackSummary summary) {
616-
runIgnoringError(() -> delegate.onRollbackSummary(summary));
617-
handleSummary();
628+
if (!summariesFuture.isDone()) {
629+
runIgnoringError(() -> delegate.onRollbackSummary(summary));
630+
handleSummary();
631+
}
618632
}
619633

620634
@Override
621635
public void onResetSummary(ResetSummary summary) {
622-
runIgnoringError(() -> delegate.onResetSummary(summary));
623-
handleSummary();
636+
if (!summariesFuture.isDone()) {
637+
runIgnoringError(() -> delegate.onResetSummary(summary));
638+
handleSummary();
639+
}
624640
}
625641

626642
@Override
627643
public void onRouteSummary(RouteSummary summary) {
628-
runIgnoringError(() -> delegate.onRouteSummary(summary));
629-
handleSummary();
644+
if (!summariesFuture.isDone()) {
645+
runIgnoringError(() -> delegate.onRouteSummary(summary));
646+
handleSummary();
647+
}
630648
}
631649

632650
@Override
633651
public void onLogoffSummary(LogoffSummary summary) {
634-
runIgnoringError(() -> delegate.onLogoffSummary(summary));
635-
handleSummary();
652+
if (!summariesFuture.isDone()) {
653+
runIgnoringError(() -> delegate.onLogoffSummary(summary));
654+
handleSummary();
655+
}
636656
}
637657

638658
@Override
639659
public void onLogonSummary(LogonSummary summary) {
640-
runIgnoringError(() -> delegate.onLogonSummary(summary));
641-
handleSummary();
660+
if (!summariesFuture.isDone()) {
661+
runIgnoringError(() -> delegate.onLogonSummary(summary));
662+
handleSummary();
663+
}
642664
}
643665

644666
@Override
645667
public void onTelemetrySummary(TelemetrySummary summary) {
646-
runIgnoringError(() -> delegate.onTelemetrySummary(summary));
647-
handleSummary();
668+
if (!summariesFuture.isDone()) {
669+
runIgnoringError(() -> delegate.onTelemetrySummary(summary));
670+
handleSummary();
671+
}
648672
}
649673

650674
@Override
651675
public void onIgnored() {
652-
runIgnoringError(delegate::onIgnored);
653-
handleSummary();
676+
if (!summariesFuture.isDone()) {
677+
runIgnoringError(delegate::onIgnored);
678+
handleSummary();
679+
}
654680
}
655681

656682
@Override

0 commit comments

Comments
 (0)