Skip to content

Bookmarking #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal;

import org.neo4j.driver.internal.spi.Collector.NoOperationCollector;

class BookmarkCollector extends NoOperationCollector
{
private final ExplicitTransaction transaction;

BookmarkCollector( ExplicitTransaction transaction )
{
this.transaction = transaction;
}

@Override
public void bookmark( String bookmark )
{
transaction.setBookmark( bookmark );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public DirectDriver( BoltServerAddress address, ConnectionSettings connectionSet
@Override
public Session session()
{
return new InternalSession( connections.acquire( address ), log );
return new NetworkSession( connections.acquire( address ), log );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Map;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.StreamCollector;
import org.neo4j.driver.internal.spi.Collector;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
Expand All @@ -34,10 +34,13 @@
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.types.TypeSystem;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;

import static org.neo4j.driver.v1.Values.ofValue;
import static org.neo4j.driver.v1.Values.value;

public class InternalTransaction implements Transaction
class ExplicitTransaction implements Transaction
{
private enum State
{
Expand Down Expand Up @@ -66,16 +69,30 @@ private enum State
private final Runnable cleanup;
private final Connection conn;

private String bookmark = null;
private State state = State.ACTIVE;

public InternalTransaction( Connection conn, Runnable cleanup )
ExplicitTransaction( Connection conn, Runnable cleanup )
{
this( conn, cleanup, null );
}

ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark )
{
this.conn = conn;
this.cleanup = cleanup;

// Note there is no sync here, so this will just value queued locally
conn.run( "BEGIN", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
conn.discardAll();
final Map<String, Value> parameters;
if ( bookmark == null )
{
parameters = emptyMap();
}
else
{
parameters = singletonMap( "bookmark", value( bookmark ) );
}
conn.run( "BEGIN", parameters, Collector.NO_OP );
conn.discardAll( Collector.NO_OP );
}

@Override
Expand Down Expand Up @@ -105,15 +122,15 @@ public void close()
{
if ( state == State.MARKED_SUCCESS )
{
conn.run( "COMMIT", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
conn.discardAll();
conn.run( "COMMIT", Collections.<String, Value>emptyMap(), Collector.NO_OP );
conn.discardAll( new BookmarkCollector( this ) );
conn.sync();
state = State.SUCCEEDED;
}
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
{
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
conn.discardAll();
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), Collector.NO_OP );
conn.discardAll( new BookmarkCollector( this ) );
conn.sync();
state = State.ROLLED_BACK;
}
Expand Down Expand Up @@ -159,7 +176,7 @@ public StatementResult run( Statement statement )

try
{
InternalStatementResult cursor = new InternalStatementResult( conn, statement );
InternalStatementResult cursor = new InternalStatementResult( conn, this, statement );
conn.run( statement.text(),
statement.parameters().asMap( ofValue() ),
cursor.runResponseCollector() );
Expand Down Expand Up @@ -204,4 +221,15 @@ public void markToClose()
{
state = State.FAILED;
}

public String bookmark()
{
return bookmark;
}

void setBookmark( String bookmark )
{
this.bookmark = bookmark;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Queue;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.StreamCollector;
import org.neo4j.driver.internal.spi.Collector;
import org.neo4j.driver.internal.summary.SummaryBuilder;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
Expand All @@ -47,8 +47,8 @@
public class InternalStatementResult implements StatementResult
{
private final Connection connection;
private final StreamCollector runResponseCollector;
private final StreamCollector pullAllResponseCollector;
private final Collector runResponseCollector;
private final Collector pullAllResponseCollector;
private final Queue<Record> recordBuffer = new LinkedList<>();

private List<String> keys = null;
Expand All @@ -57,16 +57,16 @@ public class InternalStatementResult implements StatementResult
private long position = -1;
private boolean done = false;

public InternalStatementResult( Connection connection, Statement statement )
InternalStatementResult( Connection connection, ExplicitTransaction transaction, Statement statement )
{
this.connection = connection;
this.runResponseCollector = newRunResponseCollector();
this.pullAllResponseCollector = newPullAllResponseCollector( statement );
this.pullAllResponseCollector = newStreamResponseCollector( transaction, statement );
}

private StreamCollector newRunResponseCollector()
private Collector newRunResponseCollector()
{
return new StreamCollector.NoOperationStreamCollector()
return new Collector.NoOperationCollector()
{
@Override
public void keys( String[] names )
Expand All @@ -91,11 +91,11 @@ public void resultAvailableAfter( long l )
};
}

private StreamCollector newPullAllResponseCollector( Statement statement )
private Collector newStreamResponseCollector( final ExplicitTransaction transaction, final Statement statement )
{
final SummaryBuilder summaryBuilder = new SummaryBuilder( statement );

return new StreamCollector.NoOperationStreamCollector()
return new Collector.NoOperationCollector()
{
@Override
public void record( Value[] fields )
Expand Down Expand Up @@ -134,7 +134,17 @@ public void notifications( List<Notification> notifications )
}

@Override
public void done() {
public void bookmark( String bookmark )
{
if ( transaction != null )
{
transaction.setBookmark( bookmark );
}
}

@Override
public void done()
{
summary = summaryBuilder.build();
done = true;
}
Expand All @@ -153,12 +163,12 @@ public void resultConsumedAfter(long l)
};
}

StreamCollector runResponseCollector()
Collector runResponseCollector()
{
return runResponseCollector;
}

StreamCollector pullAllResponseCollector()
Collector pullAllResponseCollector()
{
return pullAllResponseCollector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,31 @@

import static org.neo4j.driver.v1.Values.value;

public class InternalSession implements Session
public class NetworkSession implements Session
{
private final Connection connection;

private final Logger logger;

/** Called when a transaction object is closed */
private String lastBookmark = null;

// Called when a transaction object is closed
private final Runnable txCleanup = new Runnable()
{
@Override
public void run()
{
currentTransaction = null;
if ( currentTransaction != null )
{
lastBookmark = currentTransaction.bookmark();
currentTransaction = null;
}
}
};

private InternalTransaction currentTransaction;
private ExplicitTransaction currentTransaction;
private AtomicBoolean isOpen = new AtomicBoolean( true );

public InternalSession( Connection connection, Logger logger )
NetworkSession( Connection connection, Logger logger )
{
this.connection = connection;
this.logger = logger;
Expand Down Expand Up @@ -91,7 +96,7 @@ public StatementResult run( String statementText, Value statementParameters )
public StatementResult run( Statement statement )
{
ensureConnectionIsValidBeforeRunningSession();
InternalStatementResult cursor = new InternalStatementResult( connection, statement );
InternalStatementResult cursor = new InternalStatementResult( connection, null, statement );
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() );
connection.pullAll( cursor.pullAllResponseCollector() );
connection.flush();
Expand Down Expand Up @@ -152,17 +157,24 @@ public String server()

@Override
public Transaction beginTransaction()
{
return beginTransaction( null );
}

@Override
public Transaction beginTransaction( String bookmark )
{
ensureConnectionIsValidBeforeOpeningTransaction();
currentTransaction = new InternalTransaction( connection, txCleanup );
connection.onError( new Runnable() {
currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark );
connection.onError( new Runnable()
{
@Override
public void run()
{
//must check if transaction has been closed
if (currentTransaction != null)
// must check if transaction has been closed
if ( currentTransaction != null )
{
if( connection.hasUnrecoverableErrors() )
if ( connection.hasUnrecoverableErrors() )
{
currentTransaction.markToClose();
}
Expand All @@ -172,10 +184,16 @@ public void run()
}
}
}
});
} );
return currentTransaction;
}

@Override
public String lastBookmark()
{
return lastBookmark;
}

@Override
public TypeSystem typeSystem()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.StreamCollector;
import org.neo4j.driver.internal.spi.Collector;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

Expand Down Expand Up @@ -58,7 +58,7 @@ public void init( String clientName, Map<String,Value> authToken )

@Override
public void run( String statement, Map<String,Value> parameters,
StreamCollector collector )
Collector collector )
{
try
{
Expand All @@ -72,12 +72,12 @@ public void run( String statement, Map<String,Value> parameters,
}

@Override
public void discardAll()
public void discardAll( Collector collector )
{
try
{
markAsInUse();
delegate.discardAll();
delegate.discardAll( collector );
}
finally
{
Expand All @@ -86,7 +86,7 @@ public void discardAll()
}

@Override
public void pullAll( StreamCollector collector )
public void pullAll( Collector collector )
{
try
{
Expand Down
Loading