Skip to content

Commit 7e6c5ff

Browse files
authored
Merge pull request #219 from lutovich/1.2-read-write-tx
Add transaction with retries API
2 parents 3568b43 + ecb34a7 commit 7e6c5ff

19 files changed

+1292
-72
lines changed

gulpfile.babel.js

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,23 +164,17 @@ gulp.task('test', function(cb){
164164

165165
gulp.task('test-nodejs', ['install-driver-into-sandbox'], function () {
166166
return gulp.src('test/**/*.test.js')
167-
.pipe(jasmine({
168-
// reporter: new reporters.JUnitXmlReporter({
169-
// savePath: "build/nodejs-test-reports",
170-
// consolidateAll: false
171-
// }),
172-
includeStackTrace: true
173-
}));
167+
.pipe(jasmine({
168+
includeStackTrace: true,
169+
verbose: true
170+
}));
174171
});
175172

176173
gulp.task('test-boltkit', ['nodejs'], function () {
177174
return gulp.src('test/**/*.boltkit.it.js')
178175
.pipe(jasmine({
179-
// reporter: new reporters.JUnitXmlReporter({
180-
// savePath: "build/nodejs-test-reports",
181-
// consolidateAll: false
182-
// }),
183-
includeStackTrace: true
176+
includeStackTrace: true,
177+
verbose: true
184178
}));
185179
});
186180

src/v1/driver.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class Driver {
115115
*/
116116
session(mode, bookmark) {
117117
const sessionMode = Driver._validateSessionMode(mode);
118-
return this._createSession(sessionMode, this._connectionProvider, bookmark);
118+
return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
119119
}
120120

121121
static _validateSessionMode(rawMode) {
@@ -132,8 +132,8 @@ class Driver {
132132
}
133133

134134
//Extension point
135-
_createSession(mode, connectionProvider, bookmark) {
136-
return new Session(mode, connectionProvider, bookmark);
135+
_createSession(mode, connectionProvider, bookmark, config) {
136+
return new Session(mode, connectionProvider, bookmark, config);
137137
}
138138

139139
_driverOnErrorCallback(error) {

src/v1/index.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
100100
* // port, and this is then used to verify the host certificate does not change.
101101
* // This setting has no effect unless TRUST_ON_FIRST_USE is enabled.
102102
* knownHosts:"~/.neo4j/known_hosts",
103+
*
104+
* // The max number of connections that are allowed idle in the pool at any time.
105+
* // Connection will be destroyed if this threshold is exceeded.
106+
* connectionPoolSize: 50,
107+
*
108+
* // Specify the maximum time in milliseconds transactions are allowed to retry via
109+
* // {@link Session#readTransaction()} and {@link Session#writeTransaction()} functions. These functions
110+
* // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with
111+
* // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds.
112+
* maxTransactionRetryTime: 30000,
103113
* }
104114
*
105115
* @param {string} url The URL for the Neo4j database, for instance "bolt://localhost"
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
21+
22+
const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000; // 30 seconds
23+
const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000; // 1 seconds
24+
const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0;
25+
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2;
26+
27+
export default class TransactionExecutor {
28+
29+
constructor(maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor) {
30+
this._maxRetryTimeMs = _valueOrDefault(maxRetryTimeMs, DEFAULT_MAX_RETRY_TIME_MS);
31+
this._initialRetryDelayMs = _valueOrDefault(initialRetryDelayMs, DEFAULT_INITIAL_RETRY_DELAY_MS);
32+
this._multiplier = _valueOrDefault(multiplier, DEFAULT_RETRY_DELAY_MULTIPLIER);
33+
this._jitterFactor = _valueOrDefault(jitterFactor, DEFAULT_RETRY_DELAY_JITTER_FACTOR);
34+
35+
this._inFlightTimeoutIds = [];
36+
37+
this._verifyAfterConstruction();
38+
}
39+
40+
execute(transactionCreator, transactionWork) {
41+
return new Promise((resolve, reject) => {
42+
this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
43+
}).catch(error => {
44+
const retryStartTimeMs = Date.now();
45+
const retryDelayMs = this._initialRetryDelayMs;
46+
return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTimeMs, retryDelayMs);
47+
});
48+
}
49+
50+
close() {
51+
// cancel all existing timeouts to prevent further retries
52+
this._inFlightTimeoutIds.forEach(timeoutId => clearTimeout(timeoutId));
53+
this._inFlightTimeoutIds = [];
54+
}
55+
56+
_retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, retryDelayMs) {
57+
const elapsedTimeMs = Date.now() - retryStartTime;
58+
59+
if (elapsedTimeMs > this._maxRetryTimeMs || !TransactionExecutor._canRetryOn(error)) {
60+
return Promise.reject(error);
61+
}
62+
63+
return new Promise((resolve, reject) => {
64+
const nextRetryTime = this._computeDelayWithJitter(retryDelayMs);
65+
const timeoutId = setTimeout(() => {
66+
// filter out this timeoutId when time has come and function is being executed
67+
this._inFlightTimeoutIds = this._inFlightTimeoutIds.filter(id => id !== timeoutId);
68+
this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
69+
}, nextRetryTime);
70+
// add newly created timeoutId to the list of all in-flight timeouts
71+
this._inFlightTimeoutIds.push(timeoutId);
72+
}).catch(error => {
73+
const nextRetryDelayMs = retryDelayMs * this._multiplier;
74+
return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, nextRetryDelayMs);
75+
});
76+
}
77+
78+
_executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) {
79+
try {
80+
const tx = transactionCreator();
81+
const transactionWorkResult = transactionWork(tx);
82+
83+
// user defined callback is supposed to return a promise, but it might not; so to protect against an
84+
// incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
85+
// validation step without type checks
86+
const resultPromise = Promise.resolve(transactionWorkResult);
87+
88+
resultPromise.then(result => {
89+
if (tx.isOpen()) {
90+
// transaction work returned resolved promise and transaction has not been committed/rolled back
91+
// try to commit the transaction
92+
tx.commit().then(() => {
93+
// transaction was committed, return result to the user
94+
resolve(result);
95+
}).catch(error => {
96+
// transaction failed to commit, propagate the failure
97+
reject(error);
98+
});
99+
} else {
100+
// transaction work returned resolved promise and transaction is already committed/rolled back
101+
// return the result returned by given transaction work
102+
resolve(result);
103+
}
104+
}).catch(error => {
105+
// transaction work returned rejected promise, propagate the failure
106+
reject(error);
107+
});
108+
109+
} catch (error) {
110+
reject(error);
111+
}
112+
}
113+
114+
_computeDelayWithJitter(delayMs) {
115+
const jitter = (delayMs * this._jitterFactor);
116+
const min = delayMs - jitter;
117+
const max = delayMs + jitter;
118+
return Math.random() * (max - min) + min;
119+
}
120+
121+
static _canRetryOn(error) {
122+
return error && error.code &&
123+
(error.code === SERVICE_UNAVAILABLE ||
124+
error.code === SESSION_EXPIRED ||
125+
error.code.indexOf('TransientError') >= 0);
126+
}
127+
128+
_verifyAfterConstruction() {
129+
if (this._maxRetryTimeMs < 0) {
130+
throw newError('Max retry time should be >= 0: ' + this._maxRetryTimeMs);
131+
}
132+
if (this._initialRetryDelayMs < 0) {
133+
throw newError('Initial retry delay should >= 0: ' + this._initialRetryDelayMs);
134+
}
135+
if (this._multiplier < 1.0) {
136+
throw newError('Multiplier should be >= 1.0: ' + this._multiplier);
137+
}
138+
if (this._jitterFactor < 0 || this._jitterFactor > 1) {
139+
throw newError('Jitter factor should be in [0.0, 1.0]: ' + this._jitterFactor);
140+
}
141+
}
142+
};
143+
144+
function _valueOrDefault(value, defaultValue) {
145+
if (value || value === 0) {
146+
return value;
147+
}
148+
return defaultValue;
149+
}

src/v1/routing-driver.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ class RoutingDriver extends Driver {
3535
return new LoadBalancer(address, connectionPool, driverOnErrorCallback);
3636
}
3737

38-
_createSession(mode, connectionProvider, bookmark) {
39-
return new RoutingSession(mode, connectionProvider, bookmark, (error, conn) => {
38+
_createSession(mode, connectionProvider, bookmark, config) {
39+
return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => {
4040
if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) {
4141
// connection is undefined if error happened before connection was acquired
4242
if (conn) {
@@ -66,8 +66,8 @@ class RoutingDriver extends Driver {
6666
}
6767

6868
class RoutingSession extends Session {
69-
constructor(mode, connectionProvider, bookmark, onFailedConnection) {
70-
super(mode, connectionProvider, bookmark);
69+
constructor(mode, connectionProvider, bookmark, config, onFailedConnection) {
70+
super(mode, connectionProvider, bookmark, config);
7171
this._onFailedConnection = onFailedConnection;
7272
}
7373

src/v1/session.js

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import Transaction from './transaction';
2222
import {newError} from './error';
2323
import {assertString} from './internal/util';
2424
import ConnectionHolder from './internal/connection-holder';
25-
import {READ, WRITE} from './driver';
25+
import Driver, {READ, WRITE} from './driver';
26+
import TransactionExecutor from './internal/transaction-executor';
2627

2728
/**
2829
* A Session instance is used for handling the connection and
@@ -36,15 +37,17 @@ class Session {
3637
* @constructor
3738
* @param {string} mode the default access mode for this session.
3839
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
39-
* @param {string} bookmark - the initial bookmark for this session.
40+
* @param {string} [bookmark=undefined] - the initial bookmark for this session.
41+
* @param {Object} [config={}] - this driver configuration.
4042
*/
41-
constructor(mode, connectionProvider, bookmark) {
43+
constructor(mode, connectionProvider, bookmark, config) {
4244
this._mode = mode;
4345
this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider);
4446
this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider);
4547
this._open = true;
4648
this._hasTx = false;
4749
this._lastBookmark = bookmark;
50+
this._transactionExecutor = _createTransactionExecutor(config);
4851
}
4952

5053
/**
@@ -92,32 +95,81 @@ class Session {
9295
* @returns {Transaction} - New Transaction
9396
*/
9497
beginTransaction(bookmark) {
98+
return this._beginTransaction(this._mode, bookmark);
99+
}
100+
101+
_beginTransaction(accessMode, bookmark) {
95102
if (bookmark) {
96103
assertString(bookmark, 'Bookmark');
97104
this._updateBookmark(bookmark);
98105
}
99106

100107
if (this._hasTx) {
101-
throw newError("You cannot begin a transaction on a session with an "
102-
+ "open transaction; either run from within the transaction or use a "
103-
+ "different session.")
108+
throw newError('You cannot begin a transaction on a session with an open transaction; ' +
109+
'either run from within the transaction or use a different session.');
104110
}
105111

106-
this._hasTx = true;
107-
108-
const connectionHolder = this._connectionHolderWithMode(this._mode);
112+
const mode = Driver._validateSessionMode(accessMode);
113+
const connectionHolder = this._connectionHolderWithMode(mode);
109114
connectionHolder.initializeConnection();
115+
this._hasTx = true;
110116

111117
return new Transaction(connectionHolder, () => {
112118
this._hasTx = false;
113119
},
114120
this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this));
115121
}
116122

123+
/**
124+
* Return the bookmark received following the last completed {@link Transaction}.
125+
*
126+
* @return a reference to a previous transac'tion
127+
*/
117128
lastBookmark() {
118129
return this._lastBookmark;
119130
}
120131

132+
/**
133+
* Execute given unit of work in a {@link Driver#READ} transaction.
134+
*
135+
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
136+
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
137+
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
138+
* <code>maxTransactionRetryTime</code> property in milliseconds.
139+
*
140+
* @param {function(Transaction)} transactionWork - callback that executes operations against
141+
* a given {@link Transaction}.
142+
* @return {Promise} resolved promise as returned by the given function or rejected promise when given
143+
* function or commit fails.
144+
*/
145+
readTransaction(transactionWork) {
146+
return this._runTransaction(READ, transactionWork);
147+
}
148+
149+
/**
150+
* Execute given unit of work in a {@link Driver#WRITE} transaction.
151+
*
152+
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
153+
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
154+
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
155+
* <code>maxTransactionRetryTime</code> property in milliseconds.
156+
*
157+
* @param {function(Transaction)} transactionWork - callback that executes operations against
158+
* a given {@link Transaction}.
159+
* @return {Promise} resolved promise as returned by the given function or rejected promise when given
160+
* function or commit fails.
161+
*/
162+
writeTransaction(transactionWork) {
163+
return this._runTransaction(WRITE, transactionWork);
164+
}
165+
166+
_runTransaction(accessMode, transactionWork) {
167+
return this._transactionExecutor.execute(
168+
() => this._beginTransaction(accessMode, this.lastBookmark()),
169+
transactionWork
170+
);
171+
}
172+
121173
_updateBookmark(newBookmark) {
122174
if (newBookmark) {
123175
this._lastBookmark = newBookmark;
@@ -132,6 +184,7 @@ class Session {
132184
close(callback = (() => null)) {
133185
if (this._open) {
134186
this._open = false;
187+
this._transactionExecutor.close();
135188
this._readConnectionHolder.close().then(() => {
136189
this._writeConnectionHolder.close().then(() => {
137190
callback();
@@ -180,4 +233,9 @@ class _RunObserver extends StreamObserver {
180233
}
181234
}
182235

236+
function _createTransactionExecutor(config) {
237+
const maxRetryTimeMs = (config && config.maxTransactionRetryTime) ? config.maxTransactionRetryTime : null;
238+
return new TransactionExecutor(maxRetryTimeMs);
239+
}
240+
183241
export default Session;

0 commit comments

Comments
 (0)