Skip to content

Commit c1a1856

Browse files
authored
Merge pull request #645 from zhenlineo/4.0-fix-failing-rx-tck
Ensure subscriber references are freed.
2 parents ef22283 + 96c6d95 commit c1a1856

File tree

2 files changed

+66
-25
lines changed

2 files changed

+66
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
3131
import org.neo4j.driver.summary.ResultSummary;
3232

33+
import static org.neo4j.driver.internal.cursor.RxStatementResultCursorImpl.RecordConsumerStatus.DISCARD_INSTALLED;
34+
import static org.neo4j.driver.internal.cursor.RxStatementResultCursorImpl.RecordConsumerStatus.INSTALLED;
35+
import static org.neo4j.driver.internal.cursor.RxStatementResultCursorImpl.RecordConsumerStatus.NOT_INSTALLED;
3336
import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
3437

3538
public class RxStatementResultCursorImpl implements RxStatementResultCursor
@@ -39,8 +42,8 @@ public class RxStatementResultCursorImpl implements RxStatementResultCursor
3942
private final PullResponseHandler pullHandler;
4043
private final Throwable runResponseError;
4144
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
42-
private BiConsumer<Record,Throwable> recordConsumer;
4345
private boolean resultConsumed;
46+
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;
4447

4548
public RxStatementResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
4649
{
@@ -72,20 +75,17 @@ public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
7275
{
7376
throw newResultConsumedError();
7477
}
75-
if ( isRecordConsumerInstalled() )
78+
79+
if ( consumerStatus.isInstalled() )
7680
{
7781
return;
7882
}
79-
this.recordConsumer = recordConsumer;
80-
pullHandler.installRecordConsumer( this.recordConsumer );
83+
consumerStatus = recordConsumer == DISCARD_RECORD_CONSUMER ?
84+
DISCARD_INSTALLED : INSTALLED;
85+
pullHandler.installRecordConsumer( recordConsumer );
8186
assertRunCompletedSuccessfully();
8287
}
8388

84-
private boolean isRecordConsumerInstalled()
85-
{
86-
return this.recordConsumer != null;
87-
}
88-
8989
@Override
9090
public void request( long n )
9191
{
@@ -108,7 +108,7 @@ public CompletionStage<Throwable> discardAllFailureAsync()
108108
@Override
109109
public CompletionStage<Throwable> pullAllFailureAsync()
110110
{
111-
if ( isRecordConsumerInstalled() && !isDone() )
111+
if ( consumerStatus.isInstalled() && !isDone() )
112112
{
113113
return CompletableFuture.completedFuture( new TransactionNestingException(
114114
"You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result." ) );
@@ -146,7 +146,7 @@ private void assertRunCompletedSuccessfully()
146146
private void installSummaryConsumer()
147147
{
148148
pullHandler.installSummaryConsumer( ( summary, error ) -> {
149-
if ( error != null && recordConsumer == DISCARD_RECORD_CONSUMER )
149+
if ( error != null && consumerStatus.isDiscardConsumer() )
150150
{
151151
// We will only report the error to summary if there is no user record consumer installed
152152
// When a user record consumer is installed, the error will be reported to record consumer instead.
@@ -167,4 +167,30 @@ private void assertRunResponseArrived( RunResponseHandler runHandler )
167167
throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
168168
}
169169
}
170+
171+
enum RecordConsumerStatus
172+
{
173+
NOT_INSTALLED( false, false ),
174+
INSTALLED( true, false ),
175+
DISCARD_INSTALLED( true, true );
176+
177+
private final boolean isInstalled;
178+
private final boolean isDiscardConsumer;
179+
180+
RecordConsumerStatus( boolean isInstalled, boolean isDiscardConsumer )
181+
{
182+
this.isInstalled = isInstalled;
183+
this.isDiscardConsumer = isDiscardConsumer;
184+
}
185+
186+
boolean isInstalled()
187+
{
188+
return isInstalled;
189+
}
190+
191+
boolean isDiscardConsumer()
192+
{
193+
return isDiscardConsumer;
194+
}
195+
}
170196
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import org.reactivestreams.Publisher;
2222
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.FluxSink;
2324
import reactor.core.publisher.Mono;
2425

2526
import java.util.List;
2627
import java.util.concurrent.CompletionStage;
28+
import java.util.function.BiConsumer;
2729
import java.util.function.Supplier;
2830

2931
import org.neo4j.driver.Record;
@@ -63,20 +65,7 @@ public Publisher<Record> records()
6365
}
6466
else
6567
{
66-
cursor.installRecordConsumer( ( r, e ) -> {
67-
if ( r != null )
68-
{
69-
sink.next( r );
70-
}
71-
else if ( e != null )
72-
{
73-
sink.error( e );
74-
}
75-
else
76-
{
77-
sink.complete();
78-
}
79-
} );
68+
cursor.installRecordConsumer( createRecordConsumer( sink ) );
8069
sink.onCancel( cursor::cancel );
8170
sink.onRequest( cursor::request );
8271
}
@@ -89,6 +78,32 @@ else if ( e != null )
8978
} ), IGNORE );
9079
}
9180

81+
/**
82+
* Defines how a subscriber shall consume records.
83+
* A record consumer holds a reference to a subscriber.
84+
* A publisher and/or a subscription who holds a reference to this consumer shall release the reference to this object
85+
* after subscription is done or cancelled so that the subscriber can be garbage collected.
86+
* @param sink the subscriber
87+
* @return a record consumer.
88+
*/
89+
private BiConsumer<Record,Throwable> createRecordConsumer( FluxSink<Record> sink )
90+
{
91+
return ( r, e ) -> {
92+
if ( r != null )
93+
{
94+
sink.next( r );
95+
}
96+
else if ( e != null )
97+
{
98+
sink.error( e );
99+
}
100+
else
101+
{
102+
sink.complete();
103+
}
104+
};
105+
}
106+
92107
private CompletionStage<RxStatementResultCursor> getCursorFuture()
93108
{
94109
if ( cursorFuture != null )

0 commit comments

Comments
 (0)