Skip to content

Commit 22b45ac

Browse files
authored
Merge pull request #223 from nigelsmall/bookmarking
Bookmarking
2 parents eb4087e + 9a44a81 commit 22b45ac

25 files changed

+386
-163
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import org.neo4j.driver.internal.spi.Collector.NoOperationCollector;
22+
23+
class BookmarkCollector extends NoOperationCollector
24+
{
25+
private final ExplicitTransaction transaction;
26+
27+
BookmarkCollector( ExplicitTransaction transaction )
28+
{
29+
this.transaction = transaction;
30+
}
31+
32+
@Override
33+
public void bookmark( String bookmark )
34+
{
35+
transaction.setBookmark( bookmark );
36+
}
37+
38+
}

driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public DirectDriver( BoltServerAddress address, ConnectionSettings connectionSet
4545
@Override
4646
public Session session()
4747
{
48-
return new InternalSession( connections.acquire( address ), log );
48+
return new NetworkSession( connections.acquire( address ), log );
4949
}
5050

5151
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java renamed to driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.Map;
2323

2424
import org.neo4j.driver.internal.spi.Connection;
25-
import org.neo4j.driver.internal.spi.StreamCollector;
25+
import org.neo4j.driver.internal.spi.Collector;
2626
import org.neo4j.driver.internal.types.InternalTypeSystem;
2727
import org.neo4j.driver.v1.Record;
2828
import org.neo4j.driver.v1.Statement;
@@ -34,10 +34,13 @@
3434
import org.neo4j.driver.v1.exceptions.Neo4jException;
3535
import org.neo4j.driver.v1.types.TypeSystem;
3636

37+
import static java.util.Collections.emptyMap;
38+
import static java.util.Collections.singletonMap;
39+
3740
import static org.neo4j.driver.v1.Values.ofValue;
3841
import static org.neo4j.driver.v1.Values.value;
3942

40-
public class InternalTransaction implements Transaction
43+
class ExplicitTransaction implements Transaction
4144
{
4245
private enum State
4346
{
@@ -66,16 +69,30 @@ private enum State
6669
private final Runnable cleanup;
6770
private final Connection conn;
6871

72+
private String bookmark = null;
6973
private State state = State.ACTIVE;
7074

71-
public InternalTransaction( Connection conn, Runnable cleanup )
75+
ExplicitTransaction( Connection conn, Runnable cleanup )
76+
{
77+
this( conn, cleanup, null );
78+
}
79+
80+
ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark )
7281
{
7382
this.conn = conn;
7483
this.cleanup = cleanup;
7584

76-
// Note there is no sync here, so this will just value queued locally
77-
conn.run( "BEGIN", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
78-
conn.discardAll();
85+
final Map<String, Value> parameters;
86+
if ( bookmark == null )
87+
{
88+
parameters = emptyMap();
89+
}
90+
else
91+
{
92+
parameters = singletonMap( "bookmark", value( bookmark ) );
93+
}
94+
conn.run( "BEGIN", parameters, Collector.NO_OP );
95+
conn.discardAll( Collector.NO_OP );
7996
}
8097

8198
@Override
@@ -105,15 +122,15 @@ public void close()
105122
{
106123
if ( state == State.MARKED_SUCCESS )
107124
{
108-
conn.run( "COMMIT", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
109-
conn.discardAll();
125+
conn.run( "COMMIT", Collections.<String, Value>emptyMap(), Collector.NO_OP );
126+
conn.discardAll( new BookmarkCollector( this ) );
110127
conn.sync();
111128
state = State.SUCCEEDED;
112129
}
113130
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
114131
{
115-
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
116-
conn.discardAll();
132+
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), Collector.NO_OP );
133+
conn.discardAll( new BookmarkCollector( this ) );
117134
conn.sync();
118135
state = State.ROLLED_BACK;
119136
}
@@ -159,7 +176,7 @@ public StatementResult run( Statement statement )
159176

160177
try
161178
{
162-
InternalStatementResult cursor = new InternalStatementResult( conn, statement );
179+
InternalStatementResult cursor = new InternalStatementResult( conn, this, statement );
163180
conn.run( statement.text(),
164181
statement.parameters().asMap( ofValue() ),
165182
cursor.runResponseCollector() );
@@ -204,4 +221,15 @@ public void markToClose()
204221
{
205222
state = State.FAILED;
206223
}
224+
225+
public String bookmark()
226+
{
227+
return bookmark;
228+
}
229+
230+
void setBookmark( String bookmark )
231+
{
232+
this.bookmark = bookmark;
233+
}
234+
207235
}

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.Queue;
2626

2727
import org.neo4j.driver.internal.spi.Connection;
28-
import org.neo4j.driver.internal.spi.StreamCollector;
28+
import org.neo4j.driver.internal.spi.Collector;
2929
import org.neo4j.driver.internal.summary.SummaryBuilder;
3030
import org.neo4j.driver.v1.Record;
3131
import org.neo4j.driver.v1.Statement;
@@ -47,8 +47,8 @@
4747
public class InternalStatementResult implements StatementResult
4848
{
4949
private final Connection connection;
50-
private final StreamCollector runResponseCollector;
51-
private final StreamCollector pullAllResponseCollector;
50+
private final Collector runResponseCollector;
51+
private final Collector pullAllResponseCollector;
5252
private final Queue<Record> recordBuffer = new LinkedList<>();
5353

5454
private List<String> keys = null;
@@ -57,16 +57,16 @@ public class InternalStatementResult implements StatementResult
5757
private long position = -1;
5858
private boolean done = false;
5959

60-
public InternalStatementResult( Connection connection, Statement statement )
60+
InternalStatementResult( Connection connection, ExplicitTransaction transaction, Statement statement )
6161
{
6262
this.connection = connection;
6363
this.runResponseCollector = newRunResponseCollector();
64-
this.pullAllResponseCollector = newPullAllResponseCollector( statement );
64+
this.pullAllResponseCollector = newStreamResponseCollector( transaction, statement );
6565
}
6666

67-
private StreamCollector newRunResponseCollector()
67+
private Collector newRunResponseCollector()
6868
{
69-
return new StreamCollector.NoOperationStreamCollector()
69+
return new Collector.NoOperationCollector()
7070
{
7171
@Override
7272
public void keys( String[] names )
@@ -91,11 +91,11 @@ public void resultAvailableAfter( long l )
9191
};
9292
}
9393

94-
private StreamCollector newPullAllResponseCollector( Statement statement )
94+
private Collector newStreamResponseCollector( final ExplicitTransaction transaction, final Statement statement )
9595
{
9696
final SummaryBuilder summaryBuilder = new SummaryBuilder( statement );
9797

98-
return new StreamCollector.NoOperationStreamCollector()
98+
return new Collector.NoOperationCollector()
9999
{
100100
@Override
101101
public void record( Value[] fields )
@@ -134,7 +134,17 @@ public void notifications( List<Notification> notifications )
134134
}
135135

136136
@Override
137-
public void done() {
137+
public void bookmark( String bookmark )
138+
{
139+
if ( transaction != null )
140+
{
141+
transaction.setBookmark( bookmark );
142+
}
143+
}
144+
145+
@Override
146+
public void done()
147+
{
138148
summary = summaryBuilder.build();
139149
done = true;
140150
}
@@ -153,12 +163,12 @@ public void resultConsumedAfter(long l)
153163
};
154164
}
155165

156-
StreamCollector runResponseCollector()
166+
Collector runResponseCollector()
157167
{
158168
return runResponseCollector;
159169
}
160170

161-
StreamCollector pullAllResponseCollector()
171+
Collector pullAllResponseCollector()
162172
{
163173
return pullAllResponseCollector;
164174
}

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java renamed to driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,31 @@
3636

3737
import static org.neo4j.driver.v1.Values.value;
3838

39-
public class InternalSession implements Session
39+
public class NetworkSession implements Session
4040
{
4141
private final Connection connection;
42-
4342
private final Logger logger;
4443

45-
/** Called when a transaction object is closed */
44+
private String lastBookmark = null;
45+
46+
// Called when a transaction object is closed
4647
private final Runnable txCleanup = new Runnable()
4748
{
4849
@Override
4950
public void run()
5051
{
51-
currentTransaction = null;
52+
if ( currentTransaction != null )
53+
{
54+
lastBookmark = currentTransaction.bookmark();
55+
currentTransaction = null;
56+
}
5257
}
5358
};
5459

55-
private InternalTransaction currentTransaction;
60+
private ExplicitTransaction currentTransaction;
5661
private AtomicBoolean isOpen = new AtomicBoolean( true );
5762

58-
public InternalSession( Connection connection, Logger logger )
63+
NetworkSession( Connection connection, Logger logger )
5964
{
6065
this.connection = connection;
6166
this.logger = logger;
@@ -91,7 +96,7 @@ public StatementResult run( String statementText, Value statementParameters )
9196
public StatementResult run( Statement statement )
9297
{
9398
ensureConnectionIsValidBeforeRunningSession();
94-
InternalStatementResult cursor = new InternalStatementResult( connection, statement );
99+
InternalStatementResult cursor = new InternalStatementResult( connection, null, statement );
95100
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() );
96101
connection.pullAll( cursor.pullAllResponseCollector() );
97102
connection.flush();
@@ -152,17 +157,24 @@ public String server()
152157

153158
@Override
154159
public Transaction beginTransaction()
160+
{
161+
return beginTransaction( null );
162+
}
163+
164+
@Override
165+
public Transaction beginTransaction( String bookmark )
155166
{
156167
ensureConnectionIsValidBeforeOpeningTransaction();
157-
currentTransaction = new InternalTransaction( connection, txCleanup );
158-
connection.onError( new Runnable() {
168+
currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark );
169+
connection.onError( new Runnable()
170+
{
159171
@Override
160172
public void run()
161173
{
162-
//must check if transaction has been closed
163-
if (currentTransaction != null)
174+
// must check if transaction has been closed
175+
if ( currentTransaction != null )
164176
{
165-
if( connection.hasUnrecoverableErrors() )
177+
if ( connection.hasUnrecoverableErrors() )
166178
{
167179
currentTransaction.markToClose();
168180
}
@@ -172,10 +184,16 @@ public void run()
172184
}
173185
}
174186
}
175-
});
187+
} );
176188
return currentTransaction;
177189
}
178190

191+
@Override
192+
public String lastBookmark()
193+
{
194+
return lastBookmark;
195+
}
196+
179197
@Override
180198
public TypeSystem typeSystem()
181199
{

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323

2424
import org.neo4j.driver.internal.spi.Connection;
25-
import org.neo4j.driver.internal.spi.StreamCollector;
25+
import org.neo4j.driver.internal.spi.Collector;
2626
import org.neo4j.driver.v1.Value;
2727
import org.neo4j.driver.v1.exceptions.ClientException;
2828

@@ -58,7 +58,7 @@ public void init( String clientName, Map<String,Value> authToken )
5858

5959
@Override
6060
public void run( String statement, Map<String,Value> parameters,
61-
StreamCollector collector )
61+
Collector collector )
6262
{
6363
try
6464
{
@@ -72,12 +72,12 @@ public void run( String statement, Map<String,Value> parameters,
7272
}
7373

7474
@Override
75-
public void discardAll()
75+
public void discardAll( Collector collector )
7676
{
7777
try
7878
{
7979
markAsInUse();
80-
delegate.discardAll();
80+
delegate.discardAll( collector );
8181
}
8282
finally
8383
{
@@ -86,7 +86,7 @@ public void discardAll()
8686
}
8787

8888
@Override
89-
public void pullAll( StreamCollector collector )
89+
public void pullAll( Collector collector )
9090
{
9191
try
9292
{

0 commit comments

Comments
 (0)