Skip to content

Commit c93587e

Browse files
authored
Add support for Resolver and DomainNameResolver to async Testkit backend (#992)
1 parent 249fee9 commit c93587e

File tree

10 files changed

+183
-87
lines changed

10 files changed

+183
-87
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
package neo4j.org.testkit.backend;
2020

2121
import lombok.Getter;
22+
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
23+
import neo4j.org.testkit.backend.messages.responses.TestkitCallback;
2224
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2325

24-
import java.net.InetAddress;
2526
import java.util.HashMap;
2627
import java.util.Map;
27-
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionStage;
2830
import java.util.function.Consumer;
2931
import java.util.function.Supplier;
3032

@@ -35,7 +37,6 @@
3537
import org.neo4j.driver.async.ResultCursor;
3638
import org.neo4j.driver.exceptions.Neo4jException;
3739
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
38-
import org.neo4j.driver.net.ServerAddress;
3940

4041
@Getter
4142
public class TestkitState
@@ -52,15 +53,24 @@ public class TestkitState
5253
private int idGenerator = 0;
5354
private final Consumer<TestkitResponse> responseWriter;
5455
private final Supplier<Boolean> processor;
55-
private final Map<String,Set<ServerAddress>> idToServerAddresses = new HashMap<>();
56-
private final Map<String,InetAddress[]> idToResolvedAddresses = new HashMap<>();
56+
private final Map<String,CompletableFuture<TestkitCallbackResult>> callbackIdToFuture = new HashMap<>();
5757

5858
public TestkitState( Consumer<TestkitResponse> responseWriter, Supplier<Boolean> processor )
5959
{
6060
this.responseWriter = responseWriter;
6161
this.processor = processor;
6262
}
6363

64+
public CompletionStage<TestkitCallbackResult> dispatchTestkitCallback( TestkitCallback response )
65+
{
66+
CompletableFuture<TestkitCallbackResult> future = new CompletableFuture<>();
67+
callbackIdToFuture.put( response.getCallbackId(), future );
68+
responseWriter.accept( response );
69+
// This is required for sync backend, but should be removed during migration to Netty implementation.
70+
processor.get();
71+
return future;
72+
}
73+
6474
public String newId()
6575
{
6676
return String.valueOf( idGenerator++ );

testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import neo4j.org.testkit.backend.messages.responses.DriverError;
2828
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2929

30+
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.CompletionException;
3132

3233
import org.neo4j.driver.exceptions.Neo4jException;
@@ -49,20 +50,25 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception
4950
public void channelRead( ChannelHandlerContext ctx, Object msg )
5051
{
5152
TestkitRequest testkitRequest = (TestkitRequest) msg;
52-
try
53-
{
54-
testkitRequest.processAsync( testkitState )
55-
.thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) )
56-
.exceptionally( throwable ->
57-
{
58-
ctx.writeAndFlush( createErrorResponse( throwable ) );
59-
return null;
60-
} );
61-
}
62-
catch ( Throwable throwable )
63-
{
64-
ctx.writeAndFlush( createErrorResponse( throwable ) );
65-
}
53+
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking.
54+
CompletableFuture.runAsync(
55+
() ->
56+
{
57+
try
58+
{
59+
testkitRequest.processAsync( testkitState )
60+
.thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) )
61+
.exceptionally( throwable ->
62+
{
63+
ctx.writeAndFlush( createErrorResponse( throwable ) );
64+
return null;
65+
} );
66+
}
67+
catch ( Throwable throwable )
68+
{
69+
ctx.writeAndFlush( createErrorResponse( throwable ) );
70+
}
71+
} );
6672
}
6773

6874
private TestkitResponse createErrorResponse( Throwable throwable )

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,55 +21,26 @@
2121
import lombok.Getter;
2222
import lombok.NoArgsConstructor;
2323
import lombok.Setter;
24-
import neo4j.org.testkit.backend.TestkitState;
25-
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2624

27-
import java.net.InetAddress;
28-
import java.net.UnknownHostException;
2925
import java.util.List;
30-
import java.util.Optional;
31-
import java.util.concurrent.CompletionStage;
3226

3327
@Setter
3428
@Getter
3529
@NoArgsConstructor
36-
public class DomainNameResolutionCompleted implements TestkitRequest
30+
public class DomainNameResolutionCompleted implements TestkitCallbackResult
3731
{
3832
private DomainNameResolutionCompletedBody data;
3933

4034
@Override
41-
public TestkitResponse process( TestkitState testkitState )
35+
public String getCallbackId()
4236
{
43-
testkitState.getIdToResolvedAddresses().put(
44-
data.getRequestId(),
45-
data.getAddresses()
46-
.stream()
47-
.map(
48-
addr ->
49-
{
50-
try
51-
{
52-
return InetAddress.getByName( addr );
53-
}
54-
catch ( UnknownHostException e )
55-
{
56-
throw new RuntimeException( e );
57-
}
58-
} )
59-
.toArray( InetAddress[]::new ) );
60-
return null;
61-
}
62-
63-
@Override
64-
public CompletionStage<Optional<TestkitResponse>> processAsync( TestkitState testkitState )
65-
{
66-
throw new UnsupportedOperationException();
37+
return data.getRequestId();
6738
}
6839

6940
@Setter
7041
@Getter
7142
@NoArgsConstructor
72-
private static class DomainNameResolutionCompletedBody
43+
public static class DomainNameResolutionCompletedBody
7344
{
7445
private String requestId;
7546
private List<String> addresses;

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,20 @@
3030
import neo4j.org.testkit.backend.messages.responses.ResolverResolutionRequired;
3131
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
3232

33+
import java.net.InetAddress;
3334
import java.net.URI;
35+
import java.net.UnknownHostException;
36+
import java.util.LinkedHashSet;
3437
import java.util.Optional;
3538
import java.util.concurrent.CompletableFuture;
3639
import java.util.concurrent.CompletionStage;
3740
import java.util.concurrent.TimeUnit;
41+
import java.util.stream.Collectors;
3842

3943
import org.neo4j.driver.AuthToken;
4044
import org.neo4j.driver.AuthTokens;
4145
import org.neo4j.driver.Config;
46+
import org.neo4j.driver.internal.BoltServerAddress;
4247
import org.neo4j.driver.internal.DefaultDomainNameResolver;
4348
import org.neo4j.driver.internal.DomainNameResolver;
4449
import org.neo4j.driver.internal.DriverFactory;
@@ -128,9 +133,20 @@ private ServerAddressResolver callbackResolver( TestkitState testkitState )
128133
ResolverResolutionRequired.builder()
129134
.data( body )
130135
.build();
131-
testkitState.getResponseWriter().accept( response );
132-
testkitState.getProcessor().get();
133-
return testkitState.getIdToServerAddresses().remove( callbackId );
136+
CompletionStage<TestkitCallbackResult> c = testkitState.dispatchTestkitCallback( response );
137+
ResolverResolutionCompleted resolutionCompleted;
138+
try
139+
{
140+
resolutionCompleted = (ResolverResolutionCompleted) c.toCompletableFuture().get();
141+
}
142+
catch ( Exception e )
143+
{
144+
throw new RuntimeException( e );
145+
}
146+
return resolutionCompleted.getData().getAddresses()
147+
.stream()
148+
.map( BoltServerAddress::new )
149+
.collect( Collectors.toCollection( LinkedHashSet::new ) );
134150
};
135151
}
136152

@@ -144,13 +160,37 @@ private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState
144160
.id( callbackId )
145161
.name( address )
146162
.build();
147-
DomainNameResolutionRequired response =
163+
DomainNameResolutionRequired callback =
148164
DomainNameResolutionRequired.builder()
149165
.data( body )
150166
.build();
151-
testkitState.getResponseWriter().accept( response );
152-
testkitState.getProcessor().get();
153-
return testkitState.getIdToResolvedAddresses().remove( callbackId );
167+
168+
CompletionStage<TestkitCallbackResult> callbackStage = testkitState.dispatchTestkitCallback( callback );
169+
DomainNameResolutionCompleted resolutionCompleted;
170+
try
171+
{
172+
resolutionCompleted = (DomainNameResolutionCompleted) callbackStage.toCompletableFuture().get();
173+
}
174+
catch ( Exception e )
175+
{
176+
throw new RuntimeException( "Unexpected failure during Testkit callback", e );
177+
}
178+
179+
return resolutionCompleted.getData().getAddresses()
180+
.stream()
181+
.map(
182+
addr ->
183+
{
184+
try
185+
{
186+
return InetAddress.getByName( addr );
187+
}
188+
catch ( UnknownHostException e )
189+
{
190+
throw new RuntimeException( e );
191+
}
192+
} )
193+
.toArray( InetAddress[]::new );
154194
};
155195
}
156196

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,20 @@
2121
import lombok.Getter;
2222
import lombok.NoArgsConstructor;
2323
import lombok.Setter;
24-
import neo4j.org.testkit.backend.TestkitState;
25-
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2624

27-
import java.util.LinkedHashSet;
2825
import java.util.List;
29-
import java.util.Optional;
30-
import java.util.concurrent.CompletionStage;
31-
import java.util.stream.Collectors;
32-
33-
import org.neo4j.driver.internal.BoltServerAddress;
3426

3527
@Setter
3628
@Getter
3729
@NoArgsConstructor
38-
public class ResolverResolutionCompleted implements TestkitRequest
30+
public class ResolverResolutionCompleted implements TestkitCallbackResult
3931
{
4032
private ResolverResolutionCompletedBody data;
4133

4234
@Override
43-
public TestkitResponse process( TestkitState testkitState )
44-
{
45-
testkitState.getIdToServerAddresses().put( data.getRequestId(), data.getAddresses().stream().map( BoltServerAddress::new )
46-
.collect( Collectors.toCollection( LinkedHashSet::new ) ) );
47-
return null;
48-
}
49-
50-
@Override
51-
public CompletionStage<Optional<TestkitResponse>> processAsync( TestkitState testkitState )
35+
public String getCallbackId()
5236
{
53-
throw new UnsupportedOperationException();
37+
return data.getRequestId();
5438
}
5539

5640
@Setter

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,7 @@ public class StartTest implements TestkitRequest
4141

4242
static
4343
{
44-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_fail_when_driver_closed_using_session_run$", "Does not throw error" );
45-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_read_successfully_on_empty_discovery_result_using_session_run$", "Resolver not implemented" );
46-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_request_rt_from_all_initial_routers_until_successful", "Resolver not implemented" );
47-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors", "Resolver not implemented" );
48-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_successfully_acquire_rt_when_router_ip_changes$", "Resolver not implemented" );
49-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_use_resolver_during_rediscovery_when_existing_routers_fail$", "Resolver not implemented" );
50-
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0", "Does not error as expected" );
44+
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0$", "Does not error as expected" );
5145
}
5246

5347
private StartTestBody data;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend.messages.requests;
20+
21+
import neo4j.org.testkit.backend.TestkitState;
22+
import neo4j.org.testkit.backend.messages.responses.TestkitCallback;
23+
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
24+
25+
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CompletionStage;
28+
29+
/**
30+
* This request is sent by Testkit in response to previously sent {@link TestkitCallback}.
31+
*/
32+
public interface TestkitCallbackResult extends TestkitRequest
33+
{
34+
String getCallbackId();
35+
36+
@Override
37+
default TestkitResponse process( TestkitState testkitState )
38+
{
39+
testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this );
40+
return null;
41+
}
42+
43+
@Override
44+
default CompletionStage<Optional<TestkitResponse>> processAsync( TestkitState testkitState )
45+
{
46+
testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this );
47+
return CompletableFuture.completedFuture( Optional.empty() );
48+
}
49+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
@Setter
2626
@Getter
2727
@Builder
28-
public class DomainNameResolutionRequired implements TestkitResponse
28+
public class DomainNameResolutionRequired implements TestkitCallback
2929
{
3030
private DomainNameResolutionRequiredBody data;
3131

@@ -35,6 +35,12 @@ public String testkitName()
3535
return "DomainNameResolutionRequired";
3636
}
3737

38+
@Override
39+
public String getCallbackId()
40+
{
41+
return data.getId();
42+
}
43+
3844
@Setter
3945
@Getter
4046
@Builder

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ResolverResolutionRequired.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
@Setter
2626
@Getter
2727
@Builder
28-
public class ResolverResolutionRequired implements TestkitResponse
28+
public class ResolverResolutionRequired implements TestkitCallback
2929
{
3030
private ResolverResolutionRequiredBody data;
3131

@@ -35,6 +35,12 @@ public String testkitName()
3535
return "ResolverResolutionRequired";
3636
}
3737

38+
@Override
39+
public String getCallbackId()
40+
{
41+
return data.getId();
42+
}
43+
3844
@Setter
3945
@Getter
4046
@Builder

0 commit comments

Comments
 (0)