18
18
*/
19
19
package org .neo4j .driver .internal ;
20
20
21
+ import java .util .ArrayList ;
22
+ import java .util .List ;
21
23
import java .util .Map ;
22
24
import java .util .concurrent .atomic .AtomicBoolean ;
23
25
26
+ import org .neo4j .driver .internal .retry .RetryDecision ;
27
+ import org .neo4j .driver .internal .retry .RetryLogic ;
24
28
import org .neo4j .driver .internal .spi .Connection ;
25
29
import org .neo4j .driver .internal .spi .ConnectionProvider ;
26
30
import org .neo4j .driver .internal .spi .PooledConnection ;
37
41
import org .neo4j .driver .v1 .Values ;
38
42
import org .neo4j .driver .v1 .exceptions .ClientException ;
39
43
import org .neo4j .driver .v1 .types .TypeSystem ;
44
+ import org .neo4j .driver .v1 .util .Function ;
40
45
41
46
import static org .neo4j .driver .v1 .Values .value ;
42
47
43
48
public class NetworkSession implements Session , SessionResourcesHandler
44
49
{
45
50
private final ConnectionProvider connectionProvider ;
46
51
private final AccessMode mode ;
52
+ private final RetryLogic <RetryDecision > retryLogic ;
47
53
protected final Logger logger ;
48
54
49
55
private String lastBookmark ;
@@ -52,10 +58,12 @@ public class NetworkSession implements Session, SessionResourcesHandler
52
58
53
59
private final AtomicBoolean isOpen = new AtomicBoolean ( true );
54
60
55
- public NetworkSession ( ConnectionProvider connectionProvider , AccessMode mode , Logging logging )
61
+ public NetworkSession ( ConnectionProvider connectionProvider , AccessMode mode , RetryLogic <RetryDecision > retryLogic ,
62
+ Logging logging )
56
63
{
57
64
this .connectionProvider = connectionProvider ;
58
65
this .mode = mode ;
66
+ this .retryLogic = retryLogic ;
59
67
this .logger = logging .getLog ( "Session-" + hashCode () );
60
68
}
61
69
@@ -92,12 +100,13 @@ public StatementResult run( Statement statement )
92
100
ensureNoOpenTransactionBeforeRunningSession ();
93
101
94
102
syncAndCloseCurrentConnection ();
95
- currentConnection = acquireConnection ();
103
+ currentConnection = acquireConnection ( mode );
96
104
97
105
return run ( currentConnection , statement , this );
98
106
}
99
107
100
- public static StatementResult run ( Connection connection , Statement statement , SessionResourcesHandler resourcesHandler )
108
+ public static StatementResult run ( Connection connection , Statement statement ,
109
+ SessionResourcesHandler resourcesHandler )
101
110
{
102
111
InternalStatementResult result = new InternalStatementResult ( connection , resourcesHandler , null , statement );
103
112
connection .run ( statement .text (), statement .parameters ().asMap ( Values .ofValue () ),
@@ -116,7 +125,7 @@ public synchronized void reset()
116
125
if ( currentTransaction != null )
117
126
{
118
127
currentTransaction .markToClose ();
119
- lastBookmark = currentTransaction . bookmark ( );
128
+ updateLastBookmarkFrom ( currentTransaction );
120
129
currentTransaction = null ;
121
130
}
122
131
if ( currentConnection != null )
@@ -155,28 +164,38 @@ public void close()
155
164
}
156
165
}
157
166
}
158
-
167
+
159
168
syncAndCloseCurrentConnection ();
160
169
}
161
170
162
171
@ Override
163
- public Transaction beginTransaction ()
172
+ public synchronized Transaction beginTransaction ()
164
173
{
165
- return beginTransaction ( null );
174
+ return beginTransaction ( mode );
166
175
}
167
176
168
177
@ Override
169
178
public synchronized Transaction beginTransaction ( String bookmark )
170
179
{
171
- ensureSessionIsOpen ();
172
- ensureNoOpenTransactionBeforeOpeningTransaction ();
180
+ lastBookmark = bookmark ;
181
+ return beginTransaction ();
182
+ }
173
183
174
- syncAndCloseCurrentConnection ();
175
- currentConnection = acquireConnection ();
184
+ @ Override
185
+ public <T > T readTransaction ( Function <Transaction ,T > work )
186
+ {
187
+ return transaction ( AccessMode .READ , work );
188
+ }
176
189
177
- currentTransaction = new ExplicitTransaction ( currentConnection , this , bookmark );
178
- currentConnection .setResourcesHandler ( this );
179
- return currentTransaction ;
190
+ @ Override
191
+ public <T > T writeTransaction ( Function <Transaction ,T > work )
192
+ {
193
+ return transaction ( AccessMode .WRITE , work );
194
+ }
195
+
196
+ void setLastBookmark ( String bookmark )
197
+ {
198
+ lastBookmark = bookmark ;
180
199
}
181
200
182
201
@ Override
@@ -203,7 +222,7 @@ public synchronized void onTransactionClosed( ExplicitTransaction tx )
203
222
if ( currentTransaction != null && currentTransaction == tx )
204
223
{
205
224
closeCurrentConnection ();
206
- lastBookmark = currentTransaction . bookmark ( );
225
+ updateLastBookmarkFrom ( currentTransaction );
207
226
currentTransaction = null ;
208
227
}
209
228
}
@@ -225,6 +244,47 @@ public synchronized void onConnectionError( boolean recoverable )
225
244
}
226
245
}
227
246
247
+ private synchronized <T > T transaction ( AccessMode mode , Function <Transaction ,T > work )
248
+ {
249
+ RetryDecision decision = null ;
250
+ List <Throwable > errors = null ;
251
+
252
+ while ( true )
253
+ {
254
+ try ( Transaction tx = beginTransaction ( mode ) )
255
+ {
256
+ return work .apply ( tx );
257
+ }
258
+ catch ( Throwable newError )
259
+ {
260
+ decision = retryLogic .apply ( newError , decision );
261
+
262
+ if ( decision .shouldRetry () )
263
+ {
264
+ errors = recordError ( newError , errors );
265
+ }
266
+ else
267
+ {
268
+ addSuppressed ( newError , errors );
269
+ throw newError ;
270
+ }
271
+ }
272
+ }
273
+ }
274
+
275
+ private synchronized Transaction beginTransaction ( AccessMode mode )
276
+ {
277
+ ensureSessionIsOpen ();
278
+ ensureNoOpenTransactionBeforeOpeningTransaction ();
279
+
280
+ syncAndCloseCurrentConnection ();
281
+ currentConnection = acquireConnection ( mode );
282
+
283
+ currentTransaction = new ExplicitTransaction ( currentConnection , this , lastBookmark );
284
+ currentConnection .setResourcesHandler ( this );
285
+ return currentTransaction ;
286
+ }
287
+
228
288
private void ensureNoUnrecoverableError ()
229
289
{
230
290
if ( currentConnection != null && currentConnection .hasUnrecoverableErrors () )
@@ -268,7 +328,7 @@ private void ensureSessionIsOpen()
268
328
}
269
329
}
270
330
271
- private PooledConnection acquireConnection ()
331
+ private PooledConnection acquireConnection ( AccessMode mode )
272
332
{
273
333
PooledConnection connection = connectionProvider .acquireConnection ( mode );
274
334
logger .debug ( "Acquired connection " + connection .hashCode () );
@@ -312,4 +372,36 @@ private void closeCurrentConnection( boolean sync )
312
372
logger .debug ( "Released connection " + connection .hashCode () );
313
373
}
314
374
}
375
+
376
+ private void updateLastBookmarkFrom ( ExplicitTransaction tx )
377
+ {
378
+ if ( tx .bookmark () != null )
379
+ {
380
+ lastBookmark = tx .bookmark ();
381
+ }
382
+ }
383
+
384
+ private static List <Throwable > recordError ( Throwable error , List <Throwable > errors )
385
+ {
386
+ if ( errors == null )
387
+ {
388
+ errors = new ArrayList <>();
389
+ }
390
+ errors .add ( error );
391
+ return errors ;
392
+ }
393
+
394
+ private static void addSuppressed ( Throwable error , List <Throwable > suppressedErrors )
395
+ {
396
+ if ( suppressedErrors != null )
397
+ {
398
+ for ( Throwable suppressedError : suppressedErrors )
399
+ {
400
+ if ( error != suppressedError )
401
+ {
402
+ error .addSuppressed ( suppressedError );
403
+ }
404
+ }
405
+ }
406
+ }
315
407
}
0 commit comments