Skip to content

Adding tx with built-in retries support for RxSession #582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions driver/src/main/java/org/neo4j/driver/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.neo4j.driver;

import java.util.Map;
import java.util.function.Consumer;

import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.util.Resource;
Expand Down Expand Up @@ -85,20 +84,6 @@ public interface Session extends Resource, StatementRunner
*/
Transaction beginTransaction( TransactionConfig config );

/**
* Begin a new <em>explicit {@linkplain Transaction transaction}</em>,
* requiring that the server hosting is at least as up-to-date as the
* transaction referenced by the supplied <em>bookmark</em>.
*
* @param bookmark a reference to a previous transaction
* @return a new {@link Transaction}
* @deprecated This method is deprecated in favour of {@link Driver#session(Consumer)} that accepts an initial
* bookmark. Session will ensure that all nested transactions are chained with bookmarks to guarantee
* causal consistency. <b>This method will be removed in the next major release.</b>
*/
@Deprecated
Transaction beginTransaction( String bookmark );

/**
* Execute given unit of work in a {@link AccessMode#READ read} transaction.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.neo4j.driver.async;

import org.neo4j.driver.Transaction;

/**
* Callback that executes operations against a given {@link Transaction}.
* Callback that executes operations against a given {@link AsyncTransaction}.
* To be used with {@link AsyncSession#readTransactionAsync(AsyncTransactionWork)} and
* {@link AsyncSession#writeTransactionAsync(AsyncTransactionWork)} (AsyncTransactionWork)} methods.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,44 @@
package org.neo4j.driver.internal;

import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.async.AsyncStatementRunner;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Extract;
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.Record;
import org.neo4j.driver.Statement;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.StatementRunner;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Extract;
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.types.TypeSystem;

public abstract class AbstractStatementRunner implements StatementRunner, AsyncStatementRunner
public abstract class AbstractStatementRunner implements StatementRunner
{
@Override
public final StatementResult run( String statementTemplate, Value parameters )
{
return run( new Statement( statementTemplate, parameters ) );
}

@Override
public final CompletionStage<StatementResultCursor> runAsync( String statementTemplate, Value parameters )
{
return runAsync( new Statement( statementTemplate, parameters ) );
}

@Override
public final StatementResult run( String statementTemplate, Map<String,Object> statementParameters )
{
return run( statementTemplate, parameters( statementParameters ) );
}

@Override
public final CompletionStage<StatementResultCursor> runAsync( String statementTemplate, Map<String,Object> statementParameters )
{
return runAsync( statementTemplate, parameters( statementParameters ) );
}

@Override
public final StatementResult run( String statementTemplate, Record statementParameters )
{
return run( statementTemplate, parameters( statementParameters ) );
}

@Override
public final CompletionStage<StatementResultCursor> runAsync( String statementTemplate, Record statementParameters )
{
return runAsync( statementTemplate, parameters( statementParameters ) );
}

@Override
public final StatementResult run( String statementText )
{
return run( statementText, Values.EmptyMap );
}

@Override
public final CompletionStage<StatementResultCursor> runAsync( String statementText )
{
return runAsync( statementText, Values.EmptyMap );
}

@Override
public final TypeSystem typeSystem()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.internal.async.DecoratedConnection;
import org.neo4j.driver.internal.async.connection.DecoratedConnection;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
import org.neo4j.driver.internal.async.connection.BootstrapFactory;
import org.neo4j.driver.internal.async.connection.ChannelConnector;
import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.DnsResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionParametersTemplate;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.internal.async.InternalAsyncSession;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.metrics.MetricsProvider;
import org.neo4j.driver.internal.reactive.InternalRxSession;
import org.neo4j.driver.internal.security.SecurityPlan;
Expand Down Expand Up @@ -57,15 +59,15 @@ public class InternalDriver implements Driver
@Override
public Session session()
{
return newSession( SessionParameters.empty() );
return new InternalSession( newSession( SessionParameters.empty() ) );
}

@Override
public Session session( Consumer<SessionParametersTemplate> templateConsumer )
{
SessionParameters.Template template = SessionParameters.template();
templateConsumer.accept( template );
return newSession( template.build() );
return new InternalSession( newSession( template.build() ) );
}

@Override
Expand All @@ -85,15 +87,15 @@ public RxSession rxSession( Consumer<SessionParametersTemplate> templateConsumer
@Override
public AsyncSession asyncSession()
{
return newSession( SessionParameters.empty() );
return new InternalAsyncSession( newSession( SessionParameters.empty() ) );
}

@Override
public AsyncSession asyncSession( Consumer<SessionParametersTemplate> templateConsumer )
{
SessionParameters.Template template = SessionParameters.template();
templateConsumer.accept( template );
return newSession( template.build() );
return new InternalAsyncSession( newSession( template.build() ) );
}

@Override
Expand Down Expand Up @@ -148,7 +150,7 @@ private static RuntimeException driverCloseException()
return new IllegalStateException( "This driver instance has already been closed" );
}

private NetworkSession newSession( SessionParameters parameters )
public NetworkSession newSession( SessionParameters parameters )
{
assertOpen();
NetworkSession session = sessionFactory.newInstance( parameters );
Expand Down
190 changes: 190 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal;

import java.util.Map;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Session;
import org.neo4j.driver.Statement;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.TransactionWork;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.internal.async.ExplicitTransaction;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;

import static java.util.Collections.emptyMap;

public class InternalSession extends AbstractStatementRunner implements Session
{
private final NetworkSession session;

public InternalSession( NetworkSession session )
{
this.session = session;
}

@Override
public StatementResult run( Statement statement )
{
return run( statement, TransactionConfig.empty() );
}

@Override
public StatementResult run( String statement, TransactionConfig config )
{
return run( statement, emptyMap(), config );
}

@Override
public StatementResult run( String statement, Map<String,Object> parameters, TransactionConfig config )
{
return run( new Statement( statement, parameters ), config );
}

@Override
public StatementResult run( Statement statement, TransactionConfig config )
{
StatementResultCursor cursor = Futures.blockingGet( session.runAsync( statement, config, false ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );

// query executed, it is safe to obtain a connection in a blocking way
Connection connection = Futures.getNow( session.connectionAsync() );
return new InternalStatementResult( connection, cursor );
}

@Override
public boolean isOpen()
{
return session.isOpen();
}

@Override
public void close()
{
Futures.blockingGet( session.closeAsync(), () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) );
}

@Override
public Transaction beginTransaction()
{
return beginTransaction( TransactionConfig.empty() );
}

@Override
public Transaction beginTransaction( TransactionConfig config )
{
ExplicitTransaction tx = Futures.blockingGet( session.beginTransactionAsync( config ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
return new InternalTransaction( tx );
}

@Override
public <T> T readTransaction( TransactionWork<T> work )
{
return readTransaction( work, TransactionConfig.empty() );
}

@Override
public <T> T readTransaction( TransactionWork<T> work, TransactionConfig config )
{
return transaction( AccessMode.READ, work, config );
}

@Override
public <T> T writeTransaction( TransactionWork<T> work )
{
return writeTransaction( work, TransactionConfig.empty() );
}

@Override
public <T> T writeTransaction( TransactionWork<T> work, TransactionConfig config )
{
return transaction( AccessMode.WRITE, work, config );
}

@Override
public String lastBookmark()
{
return session.lastBookmark();
}

@Override
@SuppressWarnings( "deprecation" )
public void reset()
{
Futures.blockingGet( session.resetAsync(), () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
}

private <T> T transaction( AccessMode mode, TransactionWork<T> work, TransactionConfig config )
{
// use different code path compared to async so that work is executed in the caller thread
// caller thread will also be the one who sleeps between retries;
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
return session.retryLogic().retry( () -> {
try ( Transaction tx = beginTransaction( mode, config ) )
{
try
{
T result = work.execute( tx );
tx.success();
return result;
}
catch ( Throwable t )
{
// mark transaction for failure if the given unit of work threw exception
// this will override any success marks that were made by the unit of work
tx.failure();
throw t;
}
}
} );
}

private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
{
ExplicitTransaction tx = Futures.blockingGet( session.beginTransactionAsync( mode, config ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
return new InternalTransaction( tx );
}

private void terminateConnectionOnThreadInterrupt( String reason )
{
// try to get current connection if it has been acquired
Connection connection = null;
try
{
connection = Futures.getNow( session.connectionAsync() );
}
catch ( Throwable ignore )
{
// ignore errors because handing interruptions is best effort
}

if ( connection != null )
{
connection.terminateAndRelease( reason );
}
}
}
Loading