Skip to content

Commit a6a74d8

Browse files
committed
Make reactive subscription request with Long.MAX_VALUE unbounded (neo4j#1003)
* Make reactive subscription request with Long.MAX_VALUE unbounded This update ensures that unbounded reactive request is translated to unbounded Bolt request. * Update ListBasedPullHandler test class to support unbounded request
1 parent 499e6c9 commit a6a74d8

File tree

5 files changed

+26
-12
lines changed

5 files changed

+26
-12
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
8888
@Override
8989
public void request( long n )
9090
{
91+
if ( n == Long.MAX_VALUE )
92+
{
93+
n = -1;
94+
}
9195
pullHandler.request( n );
9296
}
9397

driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,21 @@ void shouldPull()
122122
verify( pullHandler ).request( 100 );
123123
}
124124

125+
@Test
126+
void shouldPullUnboundedOnLongMax()
127+
{
128+
// Given
129+
RunResponseHandler runHandler = newRunResponseHandler();
130+
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
131+
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
132+
133+
// When
134+
cursor.request( Long.MAX_VALUE );
135+
136+
// Then
137+
verify( pullHandler ).request( -1 );
138+
}
139+
125140
@Test
126141
void shouldCancel()
127142
{

driver/src/test/java/org/neo4j/driver/internal/reactive/util/ListBasedPullHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@
2222
import java.util.Map;
2323

2424
import org.neo4j.driver.Query;
25+
import org.neo4j.driver.Record;
26+
import org.neo4j.driver.Value;
2527
import org.neo4j.driver.internal.handlers.PullResponseCompletionListener;
2628
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2729
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
2830
import org.neo4j.driver.internal.spi.Connection;
2931
import org.neo4j.driver.internal.util.MetadataExtractor;
3032
import org.neo4j.driver.internal.util.QueryKeys;
3133
import org.neo4j.driver.internal.value.BooleanValue;
32-
import org.neo4j.driver.Record;
33-
import org.neo4j.driver.Value;
3434
import org.neo4j.driver.summary.ResultSummary;
3535

3636
import static java.util.Collections.emptyList;
@@ -81,7 +81,7 @@ private ListBasedPullHandler( List<Record> list, Throwable error )
8181
public void request( long n )
8282
{
8383
super.request( n );
84-
while ( index < list.size() && n-- > 0 )
84+
while ( index < list.size() && (n == -1 || n-- > 0) )
8585
{
8686
onRecord( list.get( index++ ).values().toArray( new Value[0] ) );
8787
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ private CompletionStage<Record> consumeNextRecordOrCompletionSignal( RxBlockingS
132132

133133
private long getFetchSize( RxResultHolder resultHolder )
134134
{
135-
return resultHolder.getSessionHolder().getConfig()
136-
.fetchSize()
137-
.orElse( resultHolder.getSessionHolder().getDriverHolder().getConfig().fetchSize() );
135+
long fetchSize = resultHolder.getSessionHolder().getConfig()
136+
.fetchSize()
137+
.orElse( resultHolder.getSessionHolder().getDriverHolder().getConfig().fetchSize() );
138+
return fetchSize == -1 ? Long.MAX_VALUE : fetchSize;
138139
}
139140

140141
private neo4j.org.testkit.backend.messages.responses.Record createResponse( Record record )

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ public class StartTest implements TestkitRequest
8080
skipMessage );
8181
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_change_using_tx_function$",
8282
skipMessage );
83-
skipMessage = "Fetch size -1 not supported";
84-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_pull_all_when_fetch_is_minus_one_using_driver_configuration$", skipMessage );
85-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationSessionRun\\.test_all$", skipMessage );
86-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationSessionRun\\.test_all_slow_connection$", skipMessage );
87-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationTxRun\\.test_all$", skipMessage );
88-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationTxRun\\.test_all_slow_connection$", skipMessage );
8983
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$",
9084
"Does not support partially consumed state" );
9185
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );

0 commit comments

Comments
 (0)