diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index c77f7a0e65..dd85211cca 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -50,7 +50,6 @@ public class NettyConnection implements Connection private final Clock clock; private final AtomicBoolean open = new AtomicBoolean( true ); - private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true ); public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) { @@ -72,8 +71,7 @@ public boolean isOpen() @Override public void enableAutoRead() { - assertOpen(); - if ( autoReadEnabled.compareAndSet( false, true ) ) + if ( isOpen() ) { setAutoRead( true ); } @@ -82,8 +80,7 @@ public void enableAutoRead() @Override public void disableAutoRead() { - assertOpen(); - if ( autoReadEnabled.compareAndSet( true, false ) ) + if ( isOpen() ) { setAutoRead( false ); } @@ -110,6 +107,9 @@ public CompletionStage release() { if ( open.compareAndSet( true, false ) ) { + // auto-read could've been disabled, re-enable it to automatically receive response for RESET + setAutoRead( true ); + reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) ); } return releaseFuture; @@ -180,7 +180,7 @@ private void setAutoRead( boolean value ) private void assertOpen() { - if ( !open.get() ) + if ( !isOpen() ) { throw new IllegalStateException( "Connection has been released to the pool and can't be reused" ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index e957fcd3a6..c76025f329 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -18,9 +18,7 @@ */ package org.neo4j.driver.internal.handlers; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.ArrayDeque; import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; @@ -29,36 +27,27 @@ import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.summary.InternalNotification; -import org.neo4j.driver.internal.summary.InternalPlan; -import org.neo4j.driver.internal.summary.InternalProfiledPlan; -import org.neo4j.driver.internal.summary.InternalResultSummary; -import org.neo4j.driver.internal.summary.InternalServerInfo; -import org.neo4j.driver.internal.summary.InternalSummaryCounters; +import org.neo4j.driver.internal.util.MetadataUtil; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.Value; -import org.neo4j.driver.v1.summary.Notification; -import org.neo4j.driver.v1.summary.Plan; -import org.neo4j.driver.v1.summary.ProfiledPlan; import org.neo4j.driver.v1.summary.ResultSummary; -import org.neo4j.driver.v1.summary.StatementType; import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.neo4j.driver.internal.util.Futures.failedFuture; -// todo: unit tests public abstract class PullAllResponseHandler implements ResponseHandler { - private static final boolean TOUCH_AUTO_READ = false; + static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 ); + static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 ); private final Statement statement; private final RunResponseHandler runResponseHandler; protected final Connection connection; - private final Queue records = new LinkedList<>(); + private final Queue records = new ArrayDeque<>(); private boolean finished; private Throwable failure; @@ -210,25 +199,31 @@ else if ( finished ) private void queueRecord( Record record ) { records.add( record ); - if ( TOUCH_AUTO_READ ) + + boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null; + // when summary or failure is requested we have to buffer all remaining records and then return summary/failure + // do not disable auto-read in this case, otherwise records will not be consumed and trailing + // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure + if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK ) { - if ( records.size() > 10_000 ) - { - connection.disableAutoRead(); - } + // more than high watermark records are already queued, tell connection to stop auto-reading from network + // this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are + // fetched from network faster than consumed + connection.disableAutoRead(); } } private Record dequeueRecord() { Record record = records.poll(); - if ( TOUCH_AUTO_READ ) + + if ( records.size() < RECORD_BUFFER_LOW_WATERMARK ) { - if ( record != null && records.size() < 100 ) - { - connection.enableAutoRead(); - } + // less than low watermark records are now available in the buffer, tell connection to pre-fetch more + // and populate queue with new records from network + connection.enableAutoRead(); } + return record; } @@ -302,89 +297,7 @@ private boolean completeFailureFuture( Throwable error ) private ResultSummary extractResultSummary( Map metadata ) { - InternalServerInfo serverInfo = new InternalServerInfo( connection.serverAddress(), - connection.serverVersion() ); - return new InternalResultSummary( statement, serverInfo, extractStatementType( metadata ), - extractCounters( metadata ), extractPlan( metadata ), extractProfiledPlan( metadata ), - extractNotifications( metadata ), runResponseHandler.resultAvailableAfter(), - extractResultConsumedAfter( metadata ) ); - } - - private static StatementType extractStatementType( Map metadata ) - { - Value typeValue = metadata.get( "type" ); - if ( typeValue != null ) - { - return StatementType.fromCode( typeValue.asString() ); - } - return null; - } - - private static InternalSummaryCounters extractCounters( Map metadata ) - { - Value countersValue = metadata.get( "stats" ); - if ( countersValue != null ) - { - return new InternalSummaryCounters( - counterValue( countersValue, "nodes-created" ), - counterValue( countersValue, "nodes-deleted" ), - counterValue( countersValue, "relationships-created" ), - counterValue( countersValue, "relationships-deleted" ), - counterValue( countersValue, "properties-set" ), - counterValue( countersValue, "labels-added" ), - counterValue( countersValue, "labels-removed" ), - counterValue( countersValue, "indexes-added" ), - counterValue( countersValue, "indexes-removed" ), - counterValue( countersValue, "constraints-added" ), - counterValue( countersValue, "constraints-removed" ) - ); - } - return null; - } - - private static int counterValue( Value countersValue, String name ) - { - Value value = countersValue.get( name ); - return value.isNull() ? 0 : value.asInt(); - } - - private static Plan extractPlan( Map metadata ) - { - Value planValue = metadata.get( "plan" ); - if ( planValue != null ) - { - return InternalPlan.EXPLAIN_PLAN_FROM_VALUE.apply( planValue ); - } - return null; - } - - private static ProfiledPlan extractProfiledPlan( Map metadata ) - { - Value profiledPlanValue = metadata.get( "profile" ); - if ( profiledPlanValue != null ) - { - return InternalProfiledPlan.PROFILED_PLAN_FROM_VALUE.apply( profiledPlanValue ); - } - return null; - } - - private static List extractNotifications( Map metadata ) - { - Value notificationsValue = metadata.get( "notifications" ); - if ( notificationsValue != null ) - { - return notificationsValue.asList( InternalNotification.VALUE_TO_NOTIFICATION ); - } - return Collections.emptyList(); - } - - private static long extractResultConsumedAfter( Map metadata ) - { - Value resultConsumedAfterValue = metadata.get( "result_consumed_after" ); - if ( resultConsumedAfterValue != null ) - { - return resultConsumedAfterValue.asLong(); - } - return -1; + long resultAvailableAfter = runResponseHandler.resultAvailableAfter(); + return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java index 3b2c1dc130..bb9bc2dccf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal.handlers; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -27,13 +26,15 @@ import org.neo4j.driver.v1.Value; import static java.util.Collections.emptyList; +import static org.neo4j.driver.internal.util.MetadataUtil.extractResultAvailableAfter; +import static org.neo4j.driver.internal.util.MetadataUtil.extractStatementKeys; public class RunResponseHandler implements ResponseHandler { private final CompletableFuture runCompletedFuture; private List statementKeys = emptyList(); - private long resultAvailableAfter; + private long resultAvailableAfter = -1; public RunResponseHandler( CompletableFuture runCompletedFuture ) { @@ -43,7 +44,7 @@ public RunResponseHandler( CompletableFuture runCompletedFuture ) @Override public void onSuccess( Map metadata ) { - statementKeys = extractKeys( metadata ); + statementKeys = extractStatementKeys( metadata ); resultAvailableAfter = extractResultAvailableAfter( metadata ); completeRunFuture(); @@ -80,33 +81,4 @@ private void completeRunFuture() { runCompletedFuture.complete( null ); } - - private static List extractKeys( Map metadata ) - { - Value keysValue = metadata.get( "fields" ); - if ( keysValue != null ) - { - if ( !keysValue.isEmpty() ) - { - List keys = new ArrayList<>( keysValue.size() ); - for ( Value value : keysValue.values() ) - { - keys.add( value.asString() ); - } - - return keys; - } - } - return emptyList(); - } - - private static long extractResultAvailableAfter( Map metadata ) - { - Value resultAvailableAfterValue = metadata.get( "result_available_after" ); - if ( resultAvailableAfterValue != null ) - { - return resultAvailableAfterValue.asLong(); - } - return -1; - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalInputPosition.java b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalInputPosition.java index d9cd9ad48c..443b9838f5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalInputPosition.java +++ b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalInputPosition.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.summary; +import java.util.Objects; + import org.neo4j.driver.v1.summary.InputPosition; /** @@ -61,6 +63,29 @@ public int column() return column; } + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + InternalInputPosition that = (InternalInputPosition) o; + return offset == that.offset && + line == that.line && + column == that.column; + } + + @Override + public int hashCode() + { + return Objects.hash( offset, line, column ); + } + @Override public String toString() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalResultSummary.java b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalResultSummary.java index ed299e6bd4..1fb5986818 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalResultSummary.java +++ b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalResultSummary.java @@ -110,13 +110,15 @@ public List notifications() @Override public long resultAvailableAfter( TimeUnit unit ) { - return unit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS ); + return resultAvailableAfter == -1 ? resultAvailableAfter + : unit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS ); } @Override public long resultConsumedAfter( TimeUnit unit ) { - return unit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS ); + return resultConsumedAfter == -1 ? resultConsumedAfter + : unit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java index 482a97595c..2f7d59d8b8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java +++ b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java @@ -25,29 +25,19 @@ public class InternalServerInfo implements ServerInfo { - private final BoltServerAddress address; + private final String address; private final String version; public InternalServerInfo( BoltServerAddress address, ServerVersion version ) { - this( address, version.toString() ); - } - - public InternalServerInfo( BoltServerAddress address, String version ) - { - this.address = address; - this.version = version; - } - - public BoltServerAddress boltServerAddress() - { - return this.address; + this.address = address.toString(); + this.version = version.toString(); } @Override public String address() { - return this.address.toString(); + return address; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/MetadataUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/MetadataUtil.java new file mode 100644 index 0000000000..9700233483 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/MetadataUtil.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.summary.InternalNotification; +import org.neo4j.driver.internal.summary.InternalPlan; +import org.neo4j.driver.internal.summary.InternalProfiledPlan; +import org.neo4j.driver.internal.summary.InternalResultSummary; +import org.neo4j.driver.internal.summary.InternalServerInfo; +import org.neo4j.driver.internal.summary.InternalSummaryCounters; +import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.summary.Notification; +import org.neo4j.driver.v1.summary.Plan; +import org.neo4j.driver.v1.summary.ProfiledPlan; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.summary.ServerInfo; +import org.neo4j.driver.v1.summary.StatementType; + +import static java.util.Collections.emptyList; + +public final class MetadataUtil +{ + private MetadataUtil() + { + } + + public static List extractStatementKeys( Map metadata ) + { + Value keysValue = metadata.get( "fields" ); + if ( keysValue != null ) + { + if ( !keysValue.isEmpty() ) + { + List keys = new ArrayList<>( keysValue.size() ); + for ( Value value : keysValue.values() ) + { + keys.add( value.asString() ); + } + + return keys; + } + } + return emptyList(); + } + + public static long extractResultAvailableAfter( Map metadata ) + { + Value resultAvailableAfterValue = metadata.get( "result_available_after" ); + if ( resultAvailableAfterValue != null ) + { + return resultAvailableAfterValue.asLong(); + } + return -1; + } + + public static ResultSummary extractSummary( Statement statement, Connection connection, long resultAvailableAfter, + Map metadata ) + { + ServerInfo serverInfo = new InternalServerInfo( connection.serverAddress(), connection.serverVersion() ); + return new InternalResultSummary( statement, serverInfo, extractStatementType( metadata ), + extractCounters( metadata ), extractPlan( metadata ), extractProfiledPlan( metadata ), + extractNotifications( metadata ), resultAvailableAfter, extractResultConsumedAfter( metadata ) ); + } + + private static StatementType extractStatementType( Map metadata ) + { + Value typeValue = metadata.get( "type" ); + if ( typeValue != null ) + { + return StatementType.fromCode( typeValue.asString() ); + } + return null; + } + + private static InternalSummaryCounters extractCounters( Map metadata ) + { + Value countersValue = metadata.get( "stats" ); + if ( countersValue != null ) + { + return new InternalSummaryCounters( + counterValue( countersValue, "nodes-created" ), + counterValue( countersValue, "nodes-deleted" ), + counterValue( countersValue, "relationships-created" ), + counterValue( countersValue, "relationships-deleted" ), + counterValue( countersValue, "properties-set" ), + counterValue( countersValue, "labels-added" ), + counterValue( countersValue, "labels-removed" ), + counterValue( countersValue, "indexes-added" ), + counterValue( countersValue, "indexes-removed" ), + counterValue( countersValue, "constraints-added" ), + counterValue( countersValue, "constraints-removed" ) + ); + } + return null; + } + + private static int counterValue( Value countersValue, String name ) + { + Value value = countersValue.get( name ); + return value.isNull() ? 0 : value.asInt(); + } + + private static Plan extractPlan( Map metadata ) + { + Value planValue = metadata.get( "plan" ); + if ( planValue != null ) + { + return InternalPlan.EXPLAIN_PLAN_FROM_VALUE.apply( planValue ); + } + return null; + } + + private static ProfiledPlan extractProfiledPlan( Map metadata ) + { + Value profiledPlanValue = metadata.get( "profile" ); + if ( profiledPlanValue != null ) + { + return InternalProfiledPlan.PROFILED_PLAN_FROM_VALUE.apply( profiledPlanValue ); + } + return null; + } + + private static List extractNotifications( Map metadata ) + { + Value notificationsValue = metadata.get( "notifications" ); + if ( notificationsValue != null ) + { + return notificationsValue.asList( InternalNotification.VALUE_TO_NOTIFICATION ); + } + return Collections.emptyList(); + } + + private static long extractResultConsumedAfter( Map metadata ) + { + Value resultConsumedAfterValue = metadata.get( "result_consumed_after" ); + if ( resultConsumedAfterValue != null ) + { + return resultConsumedAfterValue.asLong(); + } + return -1; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 7938b9bee5..1791520704 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -90,6 +90,7 @@ public void setUp() { connection = connectionMock(); when( connection.release() ).thenReturn( completedFuture( null ) ); + when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index d2ab115426..44c076a783 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -122,7 +122,7 @@ public void shouldWriteForceReleaseInEventLoopThread() throws Exception } @Test - public void shouldNotEnableAutoReadWhenReleased() + public void shouldEnableAutoReadWhenReleased() { EmbeddedChannel channel = new EmbeddedChannel(); channel.config().setAutoRead( false ); @@ -130,17 +130,7 @@ public void shouldNotEnableAutoReadWhenReleased() NettyConnection connection = newConnection( channel ); connection.release(); - - try - { - connection.enableAutoRead(); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertConnectionReleasedError( e ); - } - assertFalse( channel.config().isAutoRead() ); + assertTrue( channel.config().isAutoRead() ); } @Test @@ -152,16 +142,7 @@ public void shouldNotDisableAutoReadWhenReleased() NettyConnection connection = newConnection( channel ); connection.release(); - - try - { - connection.disableAutoRead(); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertConnectionReleasedError( e ); - } + connection.disableAutoRead(); // does nothing on released connection assertTrue( channel.config().isAutoRead() ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java new file mode 100644 index 0000000000..0f15368b52 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java @@ -0,0 +1,822 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.summary.StatementType; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.v1.Values.value; +import static org.neo4j.driver.v1.Values.values; +import static org.neo4j.driver.v1.util.TestUtil.await; + +public class PullAllResponseHandlerTest +{ + @Test + public void shouldReturnNoFailureWhenAlreadySucceeded() + { + PullAllResponseHandler handler = newHandler(); + handler.onSuccess( emptyMap() ); + + Throwable failure = await( handler.failureAsync() ); + + assertNull( failure ); + } + + @Test + public void shouldReturnNoFailureWhenSucceededAfterFailureRequested() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture failureFuture = handler.failureAsync().toCompletableFuture(); + assertFalse( failureFuture.isDone() ); + + handler.onSuccess( emptyMap() ); + + assertTrue( failureFuture.isDone() ); + assertNull( await( failureFuture ) ); + } + + @Test + public void shouldReturnFailureWhenAlreadyFailed() + { + PullAllResponseHandler handler = newHandler(); + + RuntimeException failure = new RuntimeException( "Ops" ); + handler.onFailure( failure ); + + Throwable receivedFailure = await( handler.failureAsync() ); + assertEquals( failure, receivedFailure ); + } + + @Test + public void shouldReturnFailureWhenFailedAfterFailureRequested() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture failureFuture = handler.failureAsync().toCompletableFuture(); + assertFalse( failureFuture.isDone() ); + + IOException failure = new IOException( "Broken pipe" ); + handler.onFailure( failure ); + + assertTrue( failureFuture.isDone() ); + assertEquals( failure, await( failureFuture ) ); + } + + @Test + public void shouldReturnFailureWhenRequestedMultipleTimes() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture failureFuture1 = handler.failureAsync().toCompletableFuture(); + CompletableFuture failureFuture2 = handler.failureAsync().toCompletableFuture(); + + assertFalse( failureFuture1.isDone() ); + assertFalse( failureFuture2.isDone() ); + + RuntimeException failure = new RuntimeException( "Unable to contact database" ); + handler.onFailure( failure ); + + assertTrue( failureFuture1.isDone() ); + assertTrue( failureFuture2.isDone() ); + + assertEquals( failure, await( failureFuture1 ) ); + assertEquals( failure, await( failureFuture2 ) ); + } + + @Test + public void shouldReturnFailureOnlyOnceWhenFailedBeforeFailureRequested() + { + PullAllResponseHandler handler = newHandler(); + + ServiceUnavailableException failure = new ServiceUnavailableException( "Connection terminated" ); + handler.onFailure( failure ); + + assertEquals( failure, await( handler.failureAsync() ) ); + assertNull( await( handler.failureAsync() ) ); + } + + @Test + public void shouldReturnFailureOnlyOnceWhenFailedAfterFailureRequested() + { + PullAllResponseHandler handler = newHandler(); + + CompletionStage failureFuture = handler.failureAsync(); + + SessionExpiredException failure = new SessionExpiredException( "Network unreachable" ); + handler.onFailure( failure ); + assertEquals( failure, await( failureFuture ) ); + + assertNull( await( handler.failureAsync() ) ); + } + + @Test + public void shouldReturnSummaryWhenAlreadySucceeded() + { + Statement statement = new Statement( "CREATE () RETURN 42" ); + PullAllResponseHandler handler = newHandler( statement ); + handler.onSuccess( singletonMap( "type", value( "rw" ) ) ); + + ResultSummary summary = await( handler.summaryAsync() ); + + assertEquals( statement, summary.statement() ); + assertEquals( StatementType.READ_WRITE, summary.statementType() ); + } + + @Test + public void shouldReturnSummaryWhenSucceededAfterSummaryRequested() + { + Statement statement = new Statement( "RETURN 'Hi!" ); + PullAllResponseHandler handler = newHandler( statement ); + + CompletableFuture summaryFuture = handler.summaryAsync().toCompletableFuture(); + assertFalse( summaryFuture.isDone() ); + + handler.onSuccess( singletonMap( "type", value( "r" ) ) ); + + assertTrue( summaryFuture.isDone() ); + ResultSummary summary = await( summaryFuture ); + + assertEquals( statement, summary.statement() ); + assertEquals( StatementType.READ_ONLY, summary.statementType() ); + } + + @Test + public void shouldReturnFailureWhenSummaryRequestedWhenAlreadyFailed() + { + PullAllResponseHandler handler = newHandler(); + + RuntimeException failure = new RuntimeException( "Computer is burning" ); + handler.onFailure( failure ); + + try + { + await( handler.summaryAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( failure, e ); + } + } + + @Test + public void shouldReturnFailureWhenFailedAfterSummaryRequested() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture summaryFuture = handler.summaryAsync().toCompletableFuture(); + assertFalse( summaryFuture.isDone() ); + + IOException failure = new IOException( "Failed to write" ); + handler.onFailure( failure ); + + assertTrue( summaryFuture.isDone() ); + + try + { + await( summaryFuture ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( failure, e ); + } + } + + @Test + public void shouldFailSummaryWhenRequestedMultipleTimes() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture summaryFuture1 = handler.summaryAsync().toCompletableFuture(); + CompletableFuture summaryFuture2 = handler.summaryAsync().toCompletableFuture(); + assertFalse( summaryFuture1.isDone() ); + assertFalse( summaryFuture2.isDone() ); + + ClosedChannelException failure = new ClosedChannelException(); + handler.onFailure( failure ); + + assertTrue( summaryFuture1.isDone() ); + assertTrue( summaryFuture2.isDone() ); + + try + { + await( summaryFuture1 ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( failure, e ); + } + + try + { + await( summaryFuture2 ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( failure, e ); + } + } + + @Test + public void shouldPropagateFailureOnlyOnceFromSummary() + { + Statement statement = new Statement( "CREATE INDEX ON :Person(name)" ); + PullAllResponseHandler handler = newHandler( statement ); + + IllegalStateException failure = new IllegalStateException( "Some state is illegal :(" ); + handler.onFailure( failure ); + + try + { + await( handler.summaryAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( failure, e ); + } + + ResultSummary summary = await( handler.summaryAsync() ); + assertNotNull( summary ); + assertEquals( statement, summary.statement() ); + } + + @Test + public void shouldReturnSummaryWhenAlreadyFailedAndFailureConsumed() + { + Statement statement = new Statement( "CREATE ()" ); + PullAllResponseHandler handler = newHandler( statement ); + + ServiceUnavailableException failure = new ServiceUnavailableException( "Neo4j unreachable" ); + handler.onFailure( failure ); + + assertEquals( failure, await( handler.failureAsync() ) ); + + ResultSummary summary = await( handler.summaryAsync() ); + assertNotNull( summary ); + assertEquals( statement, summary.statement() ); + } + + @Test + public void shouldPeekSingleAvailableRecord() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + handler.onRecord( values( "a", "b" ) ); + + Record record = await( handler.peekAsync() ); + + assertEquals( keys, record.keys() ); + assertEquals( "a", record.get( "key1" ).asString() ); + assertEquals( "b", record.get( "key2" ).asString() ); + } + + @Test + public void shouldPeekFirstRecordWhenMultipleAvailable() + { + List keys = asList( "key1", "key2", "key3" ); + PullAllResponseHandler handler = newHandler( keys ); + + handler.onRecord( values( "a1", "b1", "c1" ) ); + handler.onRecord( values( "a2", "b2", "c2" ) ); + handler.onRecord( values( "a3", "b3", "c3" ) ); + + Record record = await( handler.peekAsync() ); + + assertEquals( keys, record.keys() ); + assertEquals( "a1", record.get( "key1" ).asString() ); + assertEquals( "b1", record.get( "key2" ).asString() ); + assertEquals( "c1", record.get( "key3" ).asString() ); + } + + @Test + public void shouldPeekRecordThatBecomesAvailableLater() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + + CompletableFuture recordFuture = handler.peekAsync().toCompletableFuture(); + assertFalse( recordFuture.isDone() ); + + handler.onRecord( values( 24, 42 ) ); + assertTrue( recordFuture.isDone() ); + + Record record = await( recordFuture ); + assertEquals( keys, record.keys() ); + assertEquals( 24, record.get( "key1" ).asInt() ); + assertEquals( 42, record.get( "key2" ).asInt() ); + } + + @Test + public void shouldPeekAvailableNothingAfterSuccess() + { + List keys = asList( "key1", "key2", "key3" ); + PullAllResponseHandler handler = newHandler( keys ); + + handler.onRecord( values( 1, 2, 3 ) ); + handler.onSuccess( emptyMap() ); + + Record record = await( handler.peekAsync() ); + assertEquals( keys, record.keys() ); + assertEquals( 1, record.get( "key1" ).asInt() ); + assertEquals( 2, record.get( "key2" ).asInt() ); + assertEquals( 3, record.get( "key3" ).asInt() ); + } + + @Test + public void shouldPeekNothingAfterSuccess() + { + PullAllResponseHandler handler = newHandler(); + handler.onSuccess( emptyMap() ); + + assertNull( await( handler.peekAsync() ) ); + } + + @Test + public void shouldPeekWhenRequestedMultipleTimes() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + + CompletableFuture recordFuture1 = handler.peekAsync().toCompletableFuture(); + CompletableFuture recordFuture2 = handler.peekAsync().toCompletableFuture(); + CompletableFuture recordFuture3 = handler.peekAsync().toCompletableFuture(); + + assertFalse( recordFuture1.isDone() ); + assertFalse( recordFuture2.isDone() ); + assertFalse( recordFuture3.isDone() ); + + handler.onRecord( values( 2, 1 ) ); + + assertTrue( recordFuture1.isDone() ); + assertTrue( recordFuture2.isDone() ); + assertTrue( recordFuture3.isDone() ); + + Record record1 = await( recordFuture1 ); + Record record2 = await( recordFuture2 ); + Record record3 = await( recordFuture3 ); + + assertEquals( keys, record1.keys() ); + assertEquals( keys, record2.keys() ); + assertEquals( keys, record3.keys() ); + + assertEquals( 2, record1.get( "key1" ).asInt() ); + assertEquals( 1, record1.get( "key2" ).asInt() ); + + assertEquals( 2, record2.get( "key1" ).asInt() ); + assertEquals( 1, record2.get( "key2" ).asInt() ); + + assertEquals( 2, record3.get( "key1" ).asInt() ); + assertEquals( 1, record3.get( "key2" ).asInt() ); + } + + @Test + public void shouldPropagateNotConsumedFailureInPeek() + { + PullAllResponseHandler handler = newHandler(); + + RuntimeException failure = new RuntimeException( "Something is wrong" ); + handler.onFailure( failure ); + + try + { + await( handler.peekAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( failure, e ); + } + } + + @Test + public void shouldPropagateFailureInPeekWhenItBecomesAvailable() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture recordFuture = handler.peekAsync().toCompletableFuture(); + assertFalse( recordFuture.isDone() ); + + RuntimeException failure = new RuntimeException( "Error" ); + handler.onFailure( failure ); + + try + { + await( recordFuture ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( failure, e ); + } + } + + @Test + public void shouldPropagateFailureInPeekOnlyOnce() + { + PullAllResponseHandler handler = newHandler(); + + RuntimeException failure = new RuntimeException( "Something is wrong" ); + handler.onFailure( failure ); + + try + { + await( handler.peekAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( failure, e ); + } + + assertNull( await( handler.peekAsync() ) ); + } + + @Test + public void shouldReturnSingleAvailableRecordInNextAsync() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + handler.onRecord( values( "1", "2" ) ); + + Record record = await( handler.nextAsync() ); + + assertNotNull( record ); + assertEquals( keys, record.keys() ); + assertEquals( "1", record.get( "key1" ).asString() ); + assertEquals( "2", record.get( "key2" ).asString() ); + } + + @Test + public void shouldReturnNoRecordsWhenNoneAvailableInNextAsync() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + handler.onSuccess( emptyMap() ); + + assertNull( await( handler.nextAsync() ) ); + } + + @Test + public void shouldReturnNoRecordsWhenSuccessComesAfterNextAsync() + { + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ) ); + + CompletableFuture recordFuture = handler.nextAsync().toCompletableFuture(); + assertFalse( recordFuture.isDone() ); + + handler.onSuccess( emptyMap() ); + assertTrue( recordFuture.isDone() ); + + assertNull( await( recordFuture ) ); + } + + @Test + public void shouldPullAllAvailableRecordsWithNextAsync() + { + List keys = asList( "key1", "key2", "key3" ); + PullAllResponseHandler handler = newHandler( keys ); + + handler.onRecord( values( 1, 2, 3 ) ); + handler.onRecord( values( 11, 22, 33 ) ); + handler.onRecord( values( 111, 222, 333 ) ); + handler.onRecord( values( 1111, 2222, 3333 ) ); + handler.onSuccess( emptyMap() ); + + Record record1 = await( handler.nextAsync() ); + assertNotNull( record1 ); + assertEquals( keys, record1.keys() ); + assertEquals( 1, record1.get( "key1" ).asInt() ); + assertEquals( 2, record1.get( "key2" ).asInt() ); + assertEquals( 3, record1.get( "key3" ).asInt() ); + + Record record2 = await( handler.nextAsync() ); + assertNotNull( record2 ); + assertEquals( keys, record2.keys() ); + assertEquals( 11, record2.get( "key1" ).asInt() ); + assertEquals( 22, record2.get( "key2" ).asInt() ); + assertEquals( 33, record2.get( "key3" ).asInt() ); + + Record record3 = await( handler.nextAsync() ); + assertNotNull( record3 ); + assertEquals( keys, record3.keys() ); + assertEquals( 111, record3.get( "key1" ).asInt() ); + assertEquals( 222, record3.get( "key2" ).asInt() ); + assertEquals( 333, record3.get( "key3" ).asInt() ); + + Record record4 = await( handler.nextAsync() ); + assertNotNull( record4 ); + assertEquals( keys, record4.keys() ); + assertEquals( 1111, record4.get( "key1" ).asInt() ); + assertEquals( 2222, record4.get( "key2" ).asInt() ); + assertEquals( 3333, record4.get( "key3" ).asInt() ); + + assertNull( await( handler.nextAsync() ) ); + assertNull( await( handler.nextAsync() ) ); + } + + @Test + public void shouldReturnRecordInNextAsyncWhenItBecomesAvailableLater() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + + CompletableFuture recordFuture = handler.nextAsync().toCompletableFuture(); + assertFalse( recordFuture.isDone() ); + + handler.onRecord( values( 24, 42 ) ); + assertTrue( recordFuture.isDone() ); + + Record record = await( recordFuture ); + assertNotNull( record ); + assertEquals( keys, record.keys() ); + assertEquals( 24, record.get( "key1" ).asInt() ); + assertEquals( 42, record.get( "key2" ).asInt() ); + } + + @Test + public void shouldReturnSameRecordOnceWhenRequestedMultipleTimesInNextAsync() + { + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys ); + + CompletableFuture recordFuture1 = handler.nextAsync().toCompletableFuture(); + CompletableFuture recordFuture2 = handler.nextAsync().toCompletableFuture(); + assertFalse( recordFuture1.isDone() ); + assertFalse( recordFuture2.isDone() ); + + handler.onRecord( values( "A", "B" ) ); + assertTrue( recordFuture1.isDone() ); + assertTrue( recordFuture2.isDone() ); + + Record record1 = await( recordFuture1 ); + Record record2 = await( recordFuture2 ); + + // record should be returned only once because #nextAsync() polls it + assertTrue( record1 != null || record2 != null ); + Record record = record1 != null ? record1 : record2; + + assertNotNull( record ); + assertEquals( keys, record.keys() ); + assertEquals( "A", record.get( "key1" ).asString() ); + assertEquals( "B", record.get( "key2" ).asString() ); + } + + @Test + public void shouldPropagateExistingFailureInNextAsync() + { + PullAllResponseHandler handler = newHandler(); + RuntimeException error = new RuntimeException( "Failed to read" ); + handler.onFailure( error ); + + try + { + await( handler.nextAsync() ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldPropagateFailureInNextAsyncWhenFailureMessagesArrivesLater() + { + PullAllResponseHandler handler = newHandler(); + + CompletableFuture recordFuture = handler.nextAsync().toCompletableFuture(); + assertFalse( recordFuture.isDone() ); + + RuntimeException error = new RuntimeException( "Network failed" ); + handler.onFailure( error ); + + assertTrue( recordFuture.isDone() ); + try + { + await( recordFuture ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldDisableAutoReadWhenTooManyRecordsArrive() + { + Connection connection = connectionMock(); + PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ), connection ); + + for ( int i = 0; i < PullAllResponseHandler.RECORD_BUFFER_HIGH_WATERMARK + 1; i++ ) + { + handler.onRecord( values( 100, 200 ) ); + } + + verify( connection ).disableAutoRead(); + } + + @Test + public void shouldEnableAutoReadWhenRecordsRetrievedFromBuffer() + { + Connection connection = connectionMock(); + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys, connection ); + + int i; + for ( i = 0; i < PullAllResponseHandler.RECORD_BUFFER_HIGH_WATERMARK + 1; i++ ) + { + handler.onRecord( values( 100, 200 ) ); + } + + verify( connection, never() ).enableAutoRead(); + verify( connection ).disableAutoRead(); + + while ( i-- > PullAllResponseHandler.RECORD_BUFFER_LOW_WATERMARK - 1 ) + { + Record record = await( handler.nextAsync() ); + assertNotNull( record ); + assertEquals( keys, record.keys() ); + assertEquals( 100, record.get( "key1" ).asInt() ); + assertEquals( 200, record.get( "key2" ).asInt() ); + } + verify( connection ).enableAutoRead(); + } + + @Test + public void shouldNotDisableAutoReadWhenSummaryRequested() + { + Connection connection = connectionMock(); + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys, connection ); + + CompletableFuture summaryFuture = handler.summaryAsync().toCompletableFuture(); + assertFalse( summaryFuture.isDone() ); + + int recordCount = PullAllResponseHandler.RECORD_BUFFER_HIGH_WATERMARK + 10; + for ( int i = 0; i < recordCount; i++ ) + { + handler.onRecord( values( "a", "b" ) ); + } + + verify( connection, never() ).enableAutoRead(); + verify( connection, never() ).disableAutoRead(); + + handler.onSuccess( emptyMap() ); + assertTrue( summaryFuture.isDone() ); + + ResultSummary summary = await( summaryFuture ); + assertNotNull( summary ); + + for ( int i = 0; i < recordCount; i++ ) + { + Record record = await( handler.nextAsync() ); + assertNotNull( record ); + assertEquals( keys, record.keys() ); + assertEquals( "a", record.get( "key1" ).asString() ); + assertEquals( "b", record.get( "key2" ).asString() ); + } + + assertNull( await( handler.nextAsync() ) ); + } + + @Test + public void shouldNotDisableAutoReadWhenFailureRequested() + { + Connection connection = connectionMock(); + List keys = asList( "key1", "key2" ); + PullAllResponseHandler handler = newHandler( keys, connection ); + + CompletableFuture failureFuture = handler.failureAsync().toCompletableFuture(); + assertFalse( failureFuture.isDone() ); + + int recordCount = PullAllResponseHandler.RECORD_BUFFER_HIGH_WATERMARK + 5; + for ( int i = 0; i < recordCount; i++ ) + { + handler.onRecord( values( 123, 456 ) ); + } + + verify( connection, never() ).enableAutoRead(); + verify( connection, never() ).disableAutoRead(); + + IllegalStateException error = new IllegalStateException( "Wrong config" ); + handler.onFailure( error ); + + assertTrue( failureFuture.isDone() ); + assertEquals( error, await( failureFuture ) ); + + for ( int i = 0; i < recordCount; i++ ) + { + Record record = await( handler.nextAsync() ); + assertNotNull( record ); + assertEquals( keys, record.keys() ); + assertEquals( 123, record.get( "key1" ).asInt() ); + assertEquals( 456, record.get( "key2" ).asInt() ); + } + + assertNull( await( handler.nextAsync() ) ); + } + + private static PullAllResponseHandler newHandler() + { + return newHandler( new Statement( "RETURN 1" ) ); + } + + private static PullAllResponseHandler newHandler( Statement statement ) + { + return newHandler( statement, emptyList(), connectionMock() ); + } + + private static PullAllResponseHandler newHandler( List statementKeys ) + { + return newHandler( new Statement( "RETURN 1" ), statementKeys, connectionMock() ); + } + + private static PullAllResponseHandler newHandler( List statementKeys, Connection connection ) + { + return newHandler( new Statement( "RETURN 1" ), statementKeys, connection ); + } + + private static PullAllResponseHandler newHandler( Statement statement, List statementKeys, + Connection connection ) + { + RunResponseHandler runResponseHandler = new RunResponseHandler( new CompletableFuture<>() ); + runResponseHandler.onSuccess( singletonMap( "fields", value( statementKeys ) ) ); + return new TestPullAllResponseHandler( statement, runResponseHandler, connection ); + } + + private static Connection connectionMock() + { + Connection connection = mock( Connection.class ); + when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); + when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); + return connection; + } + + private static class TestPullAllResponseHandler extends PullAllResponseHandler + { + TestPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection ) + { + super( statement, runResponseHandler, connection ); + } + + @Override + protected void afterSuccess() + { + } + + @Override + protected void afterFailure( Throwable error ) + { + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java new file mode 100644 index 0000000000..5c3d77d1b0 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.neo4j.driver.v1.Values.value; +import static org.neo4j.driver.v1.Values.values; + +public class RunResponseHandlerTest +{ + @Test + public void shouldNotifyCompletionFutureOnSuccess() throws Exception + { + CompletableFuture runCompletedFuture = new CompletableFuture<>(); + RunResponseHandler handler = newHandler( runCompletedFuture ); + + assertFalse( runCompletedFuture.isDone() ); + handler.onSuccess( emptyMap() ); + + assertTrue( runCompletedFuture.isDone() ); + assertNull( runCompletedFuture.get() ); + } + + @Test + public void shouldNotifyCompletionFutureOnFailure() throws Exception + { + CompletableFuture runCompletedFuture = new CompletableFuture<>(); + RunResponseHandler handler = newHandler( runCompletedFuture ); + + assertFalse( runCompletedFuture.isDone() ); + handler.onFailure( new RuntimeException() ); + + assertTrue( runCompletedFuture.isDone() ); + assertNull( runCompletedFuture.get() ); + } + + @Test + public void shouldThrowOnRecord() + { + RunResponseHandler handler = newHandler(); + + try + { + handler.onRecord( values( "a", "b", "c" ) ); + fail( "Exception expected" ); + } + catch ( UnsupportedOperationException ignore ) + { + } + } + + @Test + public void shouldReturnNoKeysWhenFailed() + { + RunResponseHandler handler = newHandler(); + + handler.onFailure( new RuntimeException() ); + + assertEquals( emptyList(), handler.statementKeys() ); + } + + @Test + public void shouldReturnDefaultResultAvailableAfterWhenFailed() + { + RunResponseHandler handler = newHandler(); + + handler.onFailure( new RuntimeException() ); + + assertEquals( -1, handler.resultAvailableAfter() ); + } + + @Test + public void shouldReturnKeysWhenSucceeded() + { + RunResponseHandler handler = newHandler(); + + List keys = asList( "key1", "key2", "key3" ); + handler.onSuccess( singletonMap( "fields", value( keys ) ) ); + + assertEquals( keys, handler.statementKeys() ); + } + + @Test + public void shouldReturnResultAvailableAfterWhenSucceeded() + { + RunResponseHandler handler = newHandler(); + + handler.onSuccess( singletonMap( "result_available_after", value( 42 ) ) ); + + assertEquals( 42L, handler.resultAvailableAfter() ); + } + + private static RunResponseHandler newHandler() + { + return new RunResponseHandler( new CompletableFuture<>() ); + } + + private static RunResponseHandler newHandler( CompletableFuture runCompletedFuture ) + { + return new RunResponseHandler( runCompletedFuture ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java index 793003bca4..720676bbeb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandlerTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; +import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Statement; @@ -64,6 +65,7 @@ private SessionPullAllResponseHandler newHandler( Connection connection ) private static Connection newConnectionMock() { Connection connection = mock( Connection.class ); + when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); return connection; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java index 057801d85c..28079924c8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; +import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.ServerVersion; @@ -37,6 +38,7 @@ public class TransactionPullAllResponseHandlerTest public void shouldMarkTransactionAsFailedOnFailure() { Connection connection = mock( Connection.class ); + when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); ExplicitTransaction tx = mock( ExplicitTransaction.class ); TransactionPullAllResponseHandler handler = new TransactionPullAllResponseHandler( new Statement( "RETURN 1" ), diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/MetadataUtilTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/MetadataUtilTest.java new file mode 100644 index 0000000000..901f9c6518 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/MetadataUtilTest.java @@ -0,0 +1,365 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.summary.InternalInputPosition; +import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.summary.Notification; +import org.neo4j.driver.v1.summary.Plan; +import org.neo4j.driver.v1.summary.ProfiledPlan; +import org.neo4j.driver.v1.summary.ResultSummary; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.summary.InternalSummaryCounters.EMPTY_STATS; +import static org.neo4j.driver.internal.util.MetadataUtil.extractResultAvailableAfter; +import static org.neo4j.driver.internal.util.MetadataUtil.extractStatementKeys; +import static org.neo4j.driver.internal.util.MetadataUtil.extractSummary; +import static org.neo4j.driver.v1.Values.parameters; +import static org.neo4j.driver.v1.Values.value; +import static org.neo4j.driver.v1.Values.values; +import static org.neo4j.driver.v1.summary.StatementType.READ_ONLY; +import static org.neo4j.driver.v1.summary.StatementType.READ_WRITE; +import static org.neo4j.driver.v1.summary.StatementType.SCHEMA_WRITE; +import static org.neo4j.driver.v1.summary.StatementType.WRITE_ONLY; + +public class MetadataUtilTest +{ + @Test + public void shouldExtractStatementKeys() + { + List keys = asList( "hello", " ", "world", "!" ); + List extractedKeys = extractStatementKeys( singletonMap( "fields", value( keys ) ) ); + assertEquals( keys, extractedKeys ); + } + + @Test + public void shouldExtractEmptyStatementKeysWhenNoneInMetadata() + { + List extractedKeys = extractStatementKeys( emptyMap() ); + assertEquals( emptyList(), extractedKeys ); + } + + @Test + public void shouldExtractResultAvailableAfter() + { + long extractedResultAvailableAfter = extractResultAvailableAfter( + singletonMap( "result_available_after", value( 424242 ) ) ); + assertEquals( 424242L, extractedResultAvailableAfter ); + } + + @Test + public void shouldExtractNoResultAvailableAfterWhenNoneInMetadata() + { + long extractedResultAvailableAfter = extractResultAvailableAfter( emptyMap() ); + assertEquals( -1, extractedResultAvailableAfter ); + } + + @Test + public void shouldBuildResultSummaryWithStatement() + { + Statement statement = new Statement( "UNWIND range(10, 100) AS x CREATE (:Node {name: $name, x: x})", + singletonMap( "name", "Apa" ) ); + + ResultSummary summary = extractSummary( statement, connectionMock(), 42, emptyMap() ); + + assertEquals( statement, summary.statement() ); + } + + @Test + public void shouldBuildResultSummaryWithServerInfo() + { + Connection connection = connectionMock( new BoltServerAddress( "server:42" ), ServerVersion.v3_2_0 ); + + ResultSummary summary = extractSummary( statement(), connection, 42, emptyMap() ); + + assertEquals( "server:42", summary.server().address() ); + assertEquals( "Neo4j/3.2.0", summary.server().version() ); + } + + @Test + public void shouldBuildResultSummaryWithStatementType() + { + assertEquals( READ_ONLY, createWithStatementType( value( "r" ) ).statementType() ); + assertEquals( READ_WRITE, createWithStatementType( value( "rw" ) ).statementType() ); + assertEquals( WRITE_ONLY, createWithStatementType( value( "w" ) ).statementType() ); + assertEquals( SCHEMA_WRITE, createWithStatementType( value( "s" ) ).statementType() ); + + assertNull( createWithStatementType( null ).statementType() ); + } + + @Test + public void shouldBuildResultSummaryWithCounters() + { + Value stats = parameters( + "nodes-created", value( 42 ), + "nodes-deleted", value( 4242 ), + "relationships-created", value( 24 ), + "relationships-deleted", value( 24 ), + "properties-set", null, + "labels-added", value( 5 ), + "labels-removed", value( 10 ), + "indexes-added", null, + "indexes-removed", value( 0 ), + "constraints-added", null, + "constraints-removed", value( 2 ) + ); + + Map metadata = singletonMap( "stats", stats ); + + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, metadata ); + + assertEquals( 42, summary.counters().nodesCreated() ); + assertEquals( 4242, summary.counters().nodesDeleted() ); + assertEquals( 24, summary.counters().relationshipsCreated() ); + assertEquals( 24, summary.counters().relationshipsDeleted() ); + assertEquals( 0, summary.counters().propertiesSet() ); + assertEquals( 5, summary.counters().labelsAdded() ); + assertEquals( 10, summary.counters().labelsRemoved() ); + assertEquals( 0, summary.counters().indexesAdded() ); + assertEquals( 0, summary.counters().indexesRemoved() ); + assertEquals( 0, summary.counters().constraintsAdded() ); + assertEquals( 2, summary.counters().constraintsRemoved() ); + } + + @Test + public void shouldBuildResultSummaryWithoutCounters() + { + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, emptyMap() ); + assertEquals( EMPTY_STATS, summary.counters() ); + } + + @Test + public void shouldBuildResultSummaryWithPlan() + { + Value plan = value( parameters( + "operatorType", "Projection", + "args", parameters( "n", 42 ), + "identifiers", values( "a", "b" ), + "children", values( + parameters( + "operatorType", "AllNodeScan", + "args", parameters( "x", 4242 ), + "identifiers", values( "n", "t", "f" ) + ) + ) + ) ); + Map metadata = singletonMap( "plan", plan ); + + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, metadata ); + + assertTrue( summary.hasPlan() ); + assertEquals( "Projection", summary.plan().operatorType() ); + assertEquals( singletonMap( "n", value( 42 ) ), summary.plan().arguments() ); + assertEquals( asList( "a", "b" ), summary.plan().identifiers() ); + + List children = summary.plan().children(); + assertEquals( 1, children.size() ); + Plan child = children.get( 0 ); + + assertEquals( "AllNodeScan", child.operatorType() ); + assertEquals( singletonMap( "x", value( 4242 ) ), child.arguments() ); + assertEquals( asList( "n", "t", "f" ), child.identifiers() ); + assertEquals( 0, child.children().size() ); + } + + @Test + public void shouldBuildResultSummaryWithoutPlan() + { + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, emptyMap() ); + assertFalse( summary.hasPlan() ); + assertNull( summary.plan() ); + } + + @Test + public void shouldBuildResultSummaryWithProfiledPlan() + { + Value profile = value( parameters( + "operatorType", "ProduceResult", + "args", parameters( "a", 42 ), + "identifiers", values( "a", "b" ), + "rows", value( 424242 ), + "dbHits", value( 242424 ), + "children", values( + parameters( + "operatorType", "LabelScan", + "args", parameters( "x", 1 ), + "identifiers", values( "y", "z" ), + "rows", value( 2 ), + "dbHits", value( 4 ) + ) + ) + ) ); + Map metadata = singletonMap( "profile", profile ); + + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, metadata ); + + assertTrue( summary.hasPlan() ); + assertTrue( summary.hasProfile() ); + assertEquals( "ProduceResult", summary.profile().operatorType() ); + assertEquals( singletonMap( "a", value( 42 ) ), summary.profile().arguments() ); + assertEquals( asList( "a", "b" ), summary.profile().identifiers() ); + assertEquals( 424242, summary.profile().records() ); + assertEquals( 242424, summary.profile().dbHits() ); + + List children = summary.profile().children(); + assertEquals( 1, children.size() ); + ProfiledPlan child = children.get( 0 ); + + assertEquals( "LabelScan", child.operatorType() ); + assertEquals( singletonMap( "x", value( 1 ) ), child.arguments() ); + assertEquals( asList( "y", "z" ), child.identifiers() ); + assertEquals( 2, child.records() ); + assertEquals( 4, child.dbHits() ); + } + + @Test + public void shouldBuildResultSummaryWithoutProfiledPlan() + { + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, emptyMap() ); + assertFalse( summary.hasProfile() ); + assertNull( summary.profile() ); + } + + @Test + public void shouldBuildResultSummaryWithNotifications() + { + Value notification1 = parameters( + "description", "Almost bad thing", + "code", "Neo.DummyNotification", + "title", "A title", + "severity", "WARNING", + "position", parameters( + "offset", 42, + "line", 4242, + "column", 424242 + ) + ); + Value notification2 = parameters( + "description", "Almost good thing", + "code", "Neo.GoodNotification", + "title", "Good", + "severity", "INFO", + "position", parameters( + "offset", 1, + "line", 2, + "column", 3 + ) + ); + Value notifications = value( notification1, notification2 ); + Map metadata = singletonMap( "notifications", notifications ); + + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, metadata ); + + assertEquals( 2, summary.notifications().size() ); + Notification firstNotification = summary.notifications().get( 0 ); + Notification secondNotification = summary.notifications().get( 1 ); + + assertEquals( "Almost bad thing", firstNotification.description() ); + assertEquals( "Neo.DummyNotification", firstNotification.code() ); + assertEquals( "A title", firstNotification.title() ); + assertEquals( "WARNING", firstNotification.severity() ); + assertEquals( new InternalInputPosition( 42, 4242, 424242 ), firstNotification.position() ); + + assertEquals( "Almost good thing", secondNotification.description() ); + assertEquals( "Neo.GoodNotification", secondNotification.code() ); + assertEquals( "Good", secondNotification.title() ); + assertEquals( "INFO", secondNotification.severity() ); + assertEquals( new InternalInputPosition( 1, 2, 3 ), secondNotification.position() ); + } + + @Test + public void shouldBuildResultSummaryWithoutNotifications() + { + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, emptyMap() ); + assertEquals( 0, summary.notifications().size() ); + } + + @Test + public void shouldBuildResultSummaryWithResultAvailableAfter() + { + int value = 42_000; + + ResultSummary summary = extractSummary( statement(), connectionMock(), value, emptyMap() ); + + assertEquals( 42, summary.resultAvailableAfter( TimeUnit.SECONDS ) ); + assertEquals( value, summary.resultAvailableAfter( TimeUnit.MILLISECONDS ) ); + } + + @Test + public void shouldBuildResultSummaryWithResultConsumedAfter() + { + int value = 42_000; + Map metadata = singletonMap( "result_consumed_after", value( value ) ); + + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, metadata ); + + assertEquals( 42, summary.resultConsumedAfter( TimeUnit.SECONDS ) ); + assertEquals( value, summary.resultConsumedAfter( TimeUnit.MILLISECONDS ) ); + } + + @Test + public void shouldBuildResultSummaryWithoutResultConsumedAfter() + { + ResultSummary summary = extractSummary( statement(), connectionMock(), 42, emptyMap() ); + assertEquals( -1, summary.resultConsumedAfter( TimeUnit.SECONDS ) ); + assertEquals( -1, summary.resultConsumedAfter( TimeUnit.MILLISECONDS ) ); + } + + private static ResultSummary createWithStatementType( Value typeValue ) + { + Map metadata = singletonMap( "type", typeValue ); + return extractSummary( statement(), connectionMock(), 42, metadata ); + } + + private static Statement statement() + { + return new Statement( "RETURN 1" ); + } + + private static Connection connectionMock() + { + return connectionMock( BoltServerAddress.LOCAL_DEFAULT, ServerVersion.v3_1_0 ); + } + + private static Connection connectionMock( BoltServerAddress address, ServerVersion version ) + { + Connection connection = mock( Connection.class ); + when( connection.serverAddress() ).thenReturn( address ); + when( connection.serverVersion() ).thenReturn( version ); + return connection; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 11613a3453..c280e97c92 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -1104,6 +1104,55 @@ public void shouldAllowBlockingOperationInCommonPoolWhenChaining() assertEquals( 1, countNodesByLabel( "Node" ) ); } + @Test + public void shouldAllowAccessingRecordsAfterSummary() + { + int recordCount = 10_000; + String query = "UNWIND range(1, " + recordCount + ") AS x RETURN 'Hello-' + x"; + + CompletionStage summaryAndRecordsStage = session.runAsync( query ) + .thenCompose( cursor -> cursor.summaryAsync().thenCompose( summary -> cursor.listAsync() + .thenApply( records -> new SummaryAndRecords( summary, records ) ) ) ); + + SummaryAndRecords summaryAndRecords = await( summaryAndRecordsStage ); + ResultSummary summary = summaryAndRecords.summary; + List records = summaryAndRecords.records; + + assertNotNull( summary ); + assertNotNull( records ); + + assertEquals( neo4j.address().toString(), summary.server().address() ); + assertEquals( query, summary.statement().text() ); + assertEquals( StatementType.READ_ONLY, summary.statementType() ); + + assertEquals( recordCount, records.size() ); + for ( int i = 1; i <= recordCount; i++ ) + { + Record record = records.get( i - 1 ); + assertEquals( "Hello-" + i, record.get( 0 ).asString() ); + } + } + + @Test + public void shouldAllowAccessingRecordsAfterSessionClosed() + { + int recordCount = 7_500; + String query = "UNWIND range(1, " + recordCount + ") AS x RETURN x"; + + CompletionStage> recordsStage = session.runAsync( query ) + .thenCompose( cursor -> session.closeAsync().thenApply( ignore -> cursor ) ) + .thenCompose( StatementResultCursor::listAsync ); + + List records = await( recordsStage ); + + assertEquals( recordCount, records.size() ); + for ( int i = 1; i <= recordCount; i++ ) + { + Record record = records.get( i - 1 ); + assertEquals( i, record.get( 0 ).asInt() ); + } + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); @@ -1293,4 +1342,16 @@ private void processFetchResult( Record record, Throwable error, CompletableFutu } } } + + private static class SummaryAndRecords + { + final ResultSummary summary; + final List records; + + SummaryAndRecords( ResultSummary summary, List records ) + { + this.summary = summary; + this.records = records; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 1c460a5be1..be532aaa37 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -63,6 +63,8 @@ import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.summary.StatementType; import org.neo4j.driver.v1.util.TestNeo4j; import static java.lang.String.format; @@ -89,6 +91,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.internal.util.Matchers.arithmeticError; import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0; import static org.neo4j.driver.v1.Values.parameters; @@ -1331,6 +1334,77 @@ public void shouldNotRetryOnConnectionAcquisitionTimeout() assertEquals( 0, invocations.get() ); } + @Test + public void shouldAllowConsumingRecordsAfterFailureInSessionClose() + { + Session session = neo4j.driver().session(); + + StatementResult result = session.run( "UNWIND [2, 4, 8, 0] AS x RETURN 32 / x" ); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e, is( arithmeticError() ) ); + } + + assertTrue( result.hasNext() ); + assertEquals( 16, result.next().get( 0 ).asInt() ); + assertTrue( result.hasNext() ); + assertEquals( 8, result.next().get( 0 ).asInt() ); + assertTrue( result.hasNext() ); + assertEquals( 4, result.next().get( 0 ).asInt() ); + assertFalse( result.hasNext() ); + } + + @Test + public void shouldAllowAccessingRecordsAfterSummary() + { + int recordCount = 10_000; + String query = "UNWIND range(1, " + recordCount + ") AS x RETURN x"; + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( query ); + + ResultSummary summary = result.summary(); + assertEquals( query, summary.statement().text() ); + assertEquals( StatementType.READ_ONLY, summary.statementType() ); + + List records = result.list(); + assertEquals( recordCount, records.size() ); + for ( int i = 1; i <= recordCount; i++ ) + { + Record record = records.get( i - 1 ); + assertEquals( i, record.get( 0 ).asInt() ); + } + } + } + + @Test + public void shouldAllowAccessingRecordsAfterSessionClosed() + { + int recordCount = 11_333; + String query = "UNWIND range(1, " + recordCount + ") AS x RETURN 'Result-' + x"; + + StatementResult result; + try ( Session session = neo4j.driver().session() ) + { + result = session.run( query ); + } + + List records = result.list(); + assertEquals( recordCount, records.size() ); + for ( int i = 1; i <= recordCount; i++ ) + { + Record record = records.get( i - 1 ); + assertEquals( "Result-" + i, record.get( 0 ).asString() ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index be641e508a..ddbf8ad9b2 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -26,6 +26,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -74,6 +75,11 @@ public static T await( CompletionStage stage ) return await( future ); } + public static T await( CompletableFuture future ) + { + return await( (Future) future ); + } + public static > T await( U future ) { try