Skip to content

Commit 96c6d95

Browse files
author
Zhen Li
committed
Reactive TCK tests for records is broken because we forget to release the reference to the subscriber record consumer field.
Added more doc to ensure that we will not hold a reference to it by mistake next time.
1 parent ef22283 commit 96c6d95

File tree

2 files changed

+66
-25
lines changed

2 files changed

+66
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
3131
import org.neo4j.driver.summary.ResultSummary;
3232

33+
import static org.neo4j.driver.internal.cursor.RxStatementResultCursorImpl.RecordConsumerStatus.DISCARD_INSTALLED;
34+
import static org.neo4j.driver.internal.cursor.RxStatementResultCursorImpl.RecordConsumerStatus.INSTALLED;
35+
import static org.neo4j.driver.internal.cursor.RxStatementResultCursorImpl.RecordConsumerStatus.NOT_INSTALLED;
3336
import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
3437

3538
public class RxStatementResultCursorImpl implements RxStatementResultCursor
@@ -39,8 +42,8 @@ public class RxStatementResultCursorImpl implements RxStatementResultCursor
3942
private final PullResponseHandler pullHandler;
4043
private final Throwable runResponseError;
4144
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
42-
private BiConsumer<Record,Throwable> recordConsumer;
4345
private boolean resultConsumed;
46+
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;
4447

4548
public RxStatementResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
4649
{
@@ -72,20 +75,17 @@ public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
7275
{
7376
throw newResultConsumedError();
7477
}
75-
if ( isRecordConsumerInstalled() )
78+
79+
if ( consumerStatus.isInstalled() )
7680
{
7781
return;
7882
}
79-
this.recordConsumer = recordConsumer;
80-
pullHandler.installRecordConsumer( this.recordConsumer );
83+
consumerStatus = recordConsumer == DISCARD_RECORD_CONSUMER ?
84+
DISCARD_INSTALLED : INSTALLED;
85+
pullHandler.installRecordConsumer( recordConsumer );
8186
assertRunCompletedSuccessfully();
8287
}
8388

84-
private boolean isRecordConsumerInstalled()
85-
{
86-
return this.recordConsumer != null;
87-
}
88-
8989
@Override
9090
public void request( long n )
9191
{
@@ -108,7 +108,7 @@ public CompletionStage<Throwable> discardAllFailureAsync()
108108
@Override
109109
public CompletionStage<Throwable> pullAllFailureAsync()
110110
{
111-
if ( isRecordConsumerInstalled() && !isDone() )
111+
if ( consumerStatus.isInstalled() && !isDone() )
112112
{
113113
return CompletableFuture.completedFuture( new TransactionNestingException(
114114
"You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result." ) );
@@ -146,7 +146,7 @@ private void assertRunCompletedSuccessfully()
146146
private void installSummaryConsumer()
147147
{
148148
pullHandler.installSummaryConsumer( ( summary, error ) -> {
149-
if ( error != null && recordConsumer == DISCARD_RECORD_CONSUMER )
149+
if ( error != null && consumerStatus.isDiscardConsumer() )
150150
{
151151
// We will only report the error to summary if there is no user record consumer installed
152152
// When a user record consumer is installed, the error will be reported to record consumer instead.
@@ -167,4 +167,30 @@ private void assertRunResponseArrived( RunResponseHandler runHandler )
167167
throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
168168
}
169169
}
170+
171+
enum RecordConsumerStatus
172+
{
173+
NOT_INSTALLED( false, false ),
174+
INSTALLED( true, false ),
175+
DISCARD_INSTALLED( true, true );
176+
177+
private final boolean isInstalled;
178+
private final boolean isDiscardConsumer;
179+
180+
RecordConsumerStatus( boolean isInstalled, boolean isDiscardConsumer )
181+
{
182+
this.isInstalled = isInstalled;
183+
this.isDiscardConsumer = isDiscardConsumer;
184+
}
185+
186+
boolean isInstalled()
187+
{
188+
return isInstalled;
189+
}
190+
191+
boolean isDiscardConsumer()
192+
{
193+
return isDiscardConsumer;
194+
}
195+
}
170196
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import org.reactivestreams.Publisher;
2222
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.FluxSink;
2324
import reactor.core.publisher.Mono;
2425

2526
import java.util.List;
2627
import java.util.concurrent.CompletionStage;
28+
import java.util.function.BiConsumer;
2729
import java.util.function.Supplier;
2830

2931
import org.neo4j.driver.Record;
@@ -63,20 +65,7 @@ public Publisher<Record> records()
6365
}
6466
else
6567
{
66-
cursor.installRecordConsumer( ( r, e ) -> {
67-
if ( r != null )
68-
{
69-
sink.next( r );
70-
}
71-
else if ( e != null )
72-
{
73-
sink.error( e );
74-
}
75-
else
76-
{
77-
sink.complete();
78-
}
79-
} );
68+
cursor.installRecordConsumer( createRecordConsumer( sink ) );
8069
sink.onCancel( cursor::cancel );
8170
sink.onRequest( cursor::request );
8271
}
@@ -89,6 +78,32 @@ else if ( e != null )
8978
} ), IGNORE );
9079
}
9180

81+
/**
82+
* Defines how a subscriber shall consume records.
83+
* A record consumer holds a reference to a subscriber.
84+
* A publisher and/or a subscription who holds a reference to this consumer shall release the reference to this object
85+
* after subscription is done or cancelled so that the subscriber can be garbage collected.
86+
* @param sink the subscriber
87+
* @return a record consumer.
88+
*/
89+
private BiConsumer<Record,Throwable> createRecordConsumer( FluxSink<Record> sink )
90+
{
91+
return ( r, e ) -> {
92+
if ( r != null )
93+
{
94+
sink.next( r );
95+
}
96+
else if ( e != null )
97+
{
98+
sink.error( e );
99+
}
100+
else
101+
{
102+
sink.complete();
103+
}
104+
};
105+
}
106+
92107
private CompletionStage<RxStatementResultCursor> getCursorFuture()
93108
{
94109
if ( cursorFuture != null )

0 commit comments

Comments
 (0)