Skip to content

Update to using the local variable type inference in backend, examples #1449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public AsyncTransactionFunctionExample(String uri, String user, String password)

// tag::async-transaction-function[]
public CompletionStage<ResultSummary> printAllProducts() {
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
var query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String, Object> parameters = Collections.singletonMap("id", 0);

AsyncSession session = driver.session(AsyncSession.class);
var session = driver.session(AsyncSession.class);

return session.executeReadAsync(tx -> tx.runAsync(query, parameters)
.thenCompose(cursor -> cursor.forEachAsync(record ->
Expand Down
100 changes: 50 additions & 50 deletions examples/src/test/java/org/neo4j/docs/driver/ExamplesIT.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class RoutingExamplesIT {
@Test
void testShouldRunConfigCustomResolverExample() {
// Given
URI boltUri = URI.create(NEO4J_CONTAINER.getBoltUrl());
String neo4jUrl = String.format("neo4j://%s:%d", boltUri.getHost(), boltUri.getPort());
try (ConfigCustomResolverExample example = new ConfigCustomResolverExample(
var boltUri = URI.create(NEO4J_CONTAINER.getBoltUrl());
var neo4jUrl = String.format("neo4j://%s:%d", boltUri.getHost(), boltUri.getPort());
try (var example = new ConfigCustomResolverExample(
neo4jUrl, AuthTokens.none(), ServerAddress.of(boltUri.getHost(), boltUri.getPort()))) {
// Then
assertTrue(example.canConnect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package neo4j.org.testkit.backend;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
Expand All @@ -35,7 +34,7 @@
public class Runner {
public static void main(String[] args) throws InterruptedException {
TestkitRequestProcessorHandler.BackendMode backendMode;
String modeArg = args.length > 0 ? args[0] : null;
var modeArg = args.length > 0 ? args[0] : null;
if ("async".equals(modeArg)) {
backendMode = TestkitRequestProcessorHandler.BackendMode.ASYNC;
} else if ("reactive-legacy".equals(modeArg)) {
Expand All @@ -54,7 +53,7 @@ public static void main(String[] args) throws InterruptedException {

EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
var bootstrap = new ServerBootstrap();
bootstrap
.group(group)
.channel(NioServerSocketChannel.class)
Expand All @@ -68,7 +67,7 @@ protected void initChannel(SocketChannel channel) {
channel.pipeline().addLast(new TestkitRequestProcessorHandler(backendMode, logging));
}
});
ChannelFuture server = bootstrap.bind().sync();
var server = bootstrap.bind().sync();
server.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class RxBufferedSubscriber<T> extends BaseSubscriber<T> {

public RxBufferedSubscriber(long fetchSize) {
this.fetchSize = fetchSize;
AtomicReference<FluxSink<T>> sinkRef = new AtomicReference<>();
var sinkRef = new AtomicReference<FluxSink<T>>();
itemsSubscriber = new OneSignalSubscriber<>();
Flux.<T>create(fluxSink -> {
sinkRef.set(fluxSink);
Expand Down Expand Up @@ -121,7 +121,7 @@ private void requestFromUpstream() {
if (moreItemsPending) {
return;
}
Subscription subscription = subscriptionFuture.getNow(null);
var subscription = subscriptionFuture.getNow(null);
if (subscription == null) {
throw new IllegalStateException("Upstream subscription must not be null at this stage");
}
Expand Down Expand Up @@ -169,7 +169,7 @@ protected void hookOnSubscribe(Subscription subscription) {

@Override
protected void hookOnNext(T value) {
MonoSink<T> sink = executeWithLock(lock, () -> {
var sink = executeWithLock(lock, () -> {
emitted = true;
return this.sink;
});
Expand All @@ -178,7 +178,7 @@ protected void hookOnNext(T value) {

@Override
protected void hookOnComplete() {
MonoSink<T> sink = executeWithLock(lock, () -> {
var sink = executeWithLock(lock, () -> {
completionFuture.complete(null);
return !emitted ? this.sink : null;
});
Expand All @@ -189,7 +189,7 @@ protected void hookOnComplete() {

@Override
protected void hookOnError(Throwable throwable) {
MonoSink<T> sink = executeWithLock(lock, () -> {
var sink = executeWithLock(lock, () -> {
completionFuture.completeExceptionally(throwable);
return !emitted ? this.sink : null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,22 @@ public void removeAuthProvider(String id) {
}

private <T> String add(T value, Map<String, T> idToT) {
String id = newId();
var id = newId();
idToT.put(id, value);
return id;
}

private <T> T get(String id, Map<String, T> idToT, String notFoundMessage) {
T value = idToT.get(id);
var value = idToT.get(id);
if (value == null) {
throw new RuntimeException(notFoundMessage);
}
return value;
}

private <T> CompletableFuture<T> getAsync(String id, Map<String, T> idToT, String notFoundMessage) {
CompletableFuture<T> result = new CompletableFuture<>();
T value = idToT.get(id);
var result = new CompletableFuture<T>();
var value = idToT.get(id);
if (value == null) {
result.completeExceptionally(new RuntimeException(notFoundMessage));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public class TestkitMessageInboundHandler extends SimpleChannelInboundHandler<By

@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
String requestStr = byteBuf.toString(CharsetUtil.UTF_8);
var requestStr = byteBuf.toString(CharsetUtil.UTF_8);
requestBuffer.append(requestStr);

List<String> testkitMessages = new ArrayList<>();
Optional<String> testkitMessageOpt = extractTestkitMessage();
var testkitMessageOpt = extractTestkitMessage();
while (testkitMessageOpt.isPresent()) {
testkitMessages.add(testkitMessageOpt.get());
testkitMessageOpt = extractTestkitMessage();
Expand All @@ -45,18 +45,18 @@ public void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
}

private Optional<String> extractTestkitMessage() {
String requestEndMarker = "#request end\n";
int endMarkerIndex = requestBuffer.indexOf(requestEndMarker);
var requestEndMarker = "#request end\n";
var endMarkerIndex = requestBuffer.indexOf(requestEndMarker);
if (endMarkerIndex < 0) {
return Optional.empty();
}
String requestBeginMarker = "#request begin\n";
int beginMarkerIndex = requestBuffer.indexOf(requestBeginMarker);
var requestBeginMarker = "#request begin\n";
var beginMarkerIndex = requestBuffer.indexOf(requestBeginMarker);
if (beginMarkerIndex != 0) {
throw new RuntimeException("Unexpected data in message buffer");
}
// extract Testkit message without markers
String testkitMessage = requestBuffer.substring(requestBeginMarker.length(), endMarkerIndex);
var testkitMessage = requestBuffer.substring(requestBeginMarker.length(), endMarkerIndex);
if (testkitMessage.contains(requestBeginMarker) || testkitMessage.contains(requestEndMarker)) {
throw new RuntimeException("Testkit message contains request markers");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package neo4j.org.testkit.backend.channel.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
Expand All @@ -28,9 +27,9 @@
public class TestkitMessageOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
String testkitResponseStr = (String) msg;
String testkitMessage = String.format("#response begin\n%s\n#response end\n", testkitResponseStr);
ByteBuf byteBuf = Unpooled.copiedBuffer(testkitMessage, StandardCharsets.UTF_8);
var testkitResponseStr = (String) msg;
var testkitMessage = String.format("#response begin\n%s\n#response end\n", testkitResponseStr);
var byteBuf = Unpooled.copiedBuffer(testkitMessage, StandardCharsets.UTF_8);
ctx.writeAndFlush(byteBuf, promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// resolvers support, is blocking.
requestExecutorService.execute(() -> {
try {
TestkitRequest request = (TestkitRequest) msg;
CompletionStage<TestkitResponse> responseStage = processorImpl.apply(request, testkitState);
var request = (TestkitRequest) msg;
var responseStage = processorImpl.apply(request, testkitState);
responseStage.whenComplete((response, throwable) -> {
if (throwable != null) {
ctx.writeAndFlush(createErrorResponse(throwable));
Expand All @@ -96,7 +96,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

private static CompletionStage<TestkitResponse> wrapSyncRequest(
TestkitRequest testkitRequest, TestkitState testkitState) {
CompletableFuture<TestkitResponse> result = new CompletableFuture<>();
var result = new CompletableFuture<TestkitResponse>();
try {
result.complete(testkitRequest.process(testkitState));
} catch (Throwable t) {
Expand All @@ -115,8 +115,8 @@ private TestkitResponse createErrorResponse(Throwable throwable) {
throwable = throwable.getCause();
}
if (throwable instanceof Neo4jException) {
String id = testkitState.newId();
Neo4jException e = (Neo4jException) throwable;
var id = testkitState.newId();
var e = (Neo4jException) throwable;
testkitState.getErrors().put(id, e);
return DriverError.builder()
.data(DriverError.DriverErrorBody.builder()
Expand All @@ -130,7 +130,7 @@ private TestkitResponse createErrorResponse(Throwable throwable) {
|| throwable instanceof UntrustedServerException
|| throwable instanceof NoSuchRecordException
|| throwable instanceof ZoneRulesException) {
String id = testkitState.newId();
var id = testkitState.newId();
testkitState.getErrors().put(id, (Exception) throwable);
return DriverError.builder()
.data(DriverError.DriverErrorBody.builder()
Expand All @@ -141,7 +141,7 @@ private TestkitResponse createErrorResponse(Throwable throwable) {
.build();
} else if (throwable instanceof CustomDriverError) {
throwable = throwable.getCause();
String id = testkitState.newId();
var id = testkitState.newId();
return DriverError.builder()
.data(DriverError.DriverErrorBody.builder()
.id(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public TestkitRequestResponseMapperHandler(Logging logging) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String testkitMessage = (String) msg;
var testkitMessage = (String) msg;
log.debug("Inbound Testkit message '%s'", testkitMessage.trim());
TestkitRequest testkitRequest;
testkitRequest = objectMapper.readValue(testkitMessage, TestkitRequest.class);
Expand All @@ -48,15 +48,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
TestkitResponse testkitResponse = (TestkitResponse) msg;
String responseStr = objectMapper.writeValueAsString(testkitResponse);
var testkitResponse = (TestkitResponse) msg;
var responseStr = objectMapper.writeValueAsString(testkitResponse);
log.debug("Outbound Testkit message '%s'", responseStr.trim());
ctx.writeAndFlush(responseStr, promise);
}

public static ObjectMapper newObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
TestkitModule testkitModule = new TestkitModule();
var objectMapper = new ObjectMapper();
var testkitModule = new TestkitModule();
objectMapper.registerModule(testkitModule);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
return objectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@
import neo4j.org.testkit.backend.messages.responses.NullRecord;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import reactor.core.publisher.Mono;

public abstract class AbstractResultNext implements TestkitRequest {
@Override
public TestkitResponse process(TestkitState testkitState) {
try {
Result result = testkitState.getResultHolder(getResultId()).getResult();
var result = testkitState.getResultHolder(getResultId()).getResult();
return createResponse(result.next());
} catch (NoSuchRecordException ignored) {
return NullRecord.builder().build();
Expand All @@ -57,15 +56,12 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
@Override
public Mono<TestkitResponse> processRx(TestkitState testkitState) {
return testkitState.getRxResultHolder(getResultId()).flatMap(resultHolder -> {
RxBufferedSubscriber<Record> subscriber = resultHolder
.getSubscriber()
.orElseGet(() -> {
RxBufferedSubscriber<Record> subscriberInstance =
new RxBufferedSubscriber<>(getFetchSize(resultHolder));
resultHolder.setSubscriber(subscriberInstance);
resultHolder.getResult().records().subscribe(subscriberInstance);
return subscriberInstance;
});
var subscriber = resultHolder.getSubscriber().orElseGet(() -> {
var subscriberInstance = new RxBufferedSubscriber<Record>(getFetchSize(resultHolder));
resultHolder.setSubscriber(subscriberInstance);
resultHolder.getResult().records().subscribe(subscriberInstance);
return subscriberInstance;
});
return subscriber
.next()
.map(this::createResponse)
Expand All @@ -76,15 +72,12 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
@Override
public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
return testkitState.getReactiveResultHolder(getResultId()).flatMap(resultHolder -> {
RxBufferedSubscriber<Record> subscriber = resultHolder
.getSubscriber()
.orElseGet(() -> {
RxBufferedSubscriber<Record> subscriberInstance =
new RxBufferedSubscriber<>(getFetchSize(resultHolder));
resultHolder.setSubscriber(subscriberInstance);
resultHolder.getResult().records().subscribe(toFlowSubscriber(subscriberInstance));
return subscriberInstance;
});
var subscriber = resultHolder.getSubscriber().orElseGet(() -> {
var subscriberInstance = new RxBufferedSubscriber<Record>(getFetchSize(resultHolder));
resultHolder.setSubscriber(subscriberInstance);
resultHolder.getResult().records().subscribe(toFlowSubscriber(subscriberInstance));
return subscriberInstance;
});
return subscriber
.next()
.map(this::createResponse)
Expand All @@ -95,15 +88,12 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
@Override
public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
return testkitState.getReactiveResultStreamsHolder(getResultId()).flatMap(resultHolder -> {
RxBufferedSubscriber<Record> subscriber = resultHolder
.getSubscriber()
.orElseGet(() -> {
RxBufferedSubscriber<Record> subscriberInstance =
new RxBufferedSubscriber<>(getFetchSize(resultHolder));
resultHolder.setSubscriber(subscriberInstance);
resultHolder.getResult().records().subscribe(subscriberInstance);
return subscriberInstance;
});
var subscriber = resultHolder.getSubscriber().orElseGet(() -> {
var subscriberInstance = new RxBufferedSubscriber<Record>(getFetchSize(resultHolder));
resultHolder.setSubscriber(subscriberInstance);
resultHolder.getResult().records().subscribe(subscriberInstance);
return subscriberInstance;
});
return subscriber
.next()
.map(this::createResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.holder.DriverHolder;
import neo4j.org.testkit.backend.messages.responses.DriverIsEncrypted;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -59,7 +58,7 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
}

private DriverIsEncrypted createResponse(TestkitState testkitState) {
DriverHolder driverHolder = testkitState.getDriverHolder(data.getDriverId());
var driverHolder = testkitState.getDriverHolder(data.getDriverId());
return DriverIsEncrypted.builder()
.data(DriverIsEncrypted.DriverIsEncryptedBody.builder()
.encrypted(driverHolder.getDriver().isEncrypted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class CheckMultiDBSupport implements TestkitRequest {

@Override
public TestkitResponse process(TestkitState testkitState) {
String driverId = data.getDriverId();
boolean available = testkitState.getDriverHolder(driverId).getDriver().supportsMultiDb();
var driverId = data.getDriverId();
var available = testkitState.getDriverHolder(driverId).getDriver().supportsMultiDb();
return createResponse(available);
}

Expand Down
Loading