diff --git a/gulpfile.babel.js b/gulpfile.babel.js
index 031dc5e6e..5850a60f3 100644
--- a/gulpfile.babel.js
+++ b/gulpfile.babel.js
@@ -164,23 +164,17 @@ gulp.task('test', function(cb){
gulp.task('test-nodejs', ['install-driver-into-sandbox'], function () {
return gulp.src('test/**/*.test.js')
- .pipe(jasmine({
- // reporter: new reporters.JUnitXmlReporter({
- // savePath: "build/nodejs-test-reports",
- // consolidateAll: false
- // }),
- includeStackTrace: true
- }));
+ .pipe(jasmine({
+ includeStackTrace: true,
+ verbose: true
+ }));
});
gulp.task('test-boltkit', ['nodejs'], function () {
return gulp.src('test/**/*.boltkit.it.js')
.pipe(jasmine({
- // reporter: new reporters.JUnitXmlReporter({
- // savePath: "build/nodejs-test-reports",
- // consolidateAll: false
- // }),
- includeStackTrace: true
+ includeStackTrace: true,
+ verbose: true
}));
});
diff --git a/src/v1/driver.js b/src/v1/driver.js
index 2cede31c8..c5a732d41 100644
--- a/src/v1/driver.js
+++ b/src/v1/driver.js
@@ -115,7 +115,7 @@ class Driver {
*/
session(mode, bookmark) {
const sessionMode = Driver._validateSessionMode(mode);
- return this._createSession(sessionMode, this._connectionProvider, bookmark);
+ return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
}
static _validateSessionMode(rawMode) {
@@ -132,8 +132,8 @@ class Driver {
}
//Extension point
- _createSession(mode, connectionProvider, bookmark) {
- return new Session(mode, connectionProvider, bookmark);
+ _createSession(mode, connectionProvider, bookmark, config) {
+ return new Session(mode, connectionProvider, bookmark, config);
}
_driverOnErrorCallback(error) {
diff --git a/src/v1/index.js b/src/v1/index.js
index bf5031e79..a9425642e 100644
--- a/src/v1/index.js
+++ b/src/v1/index.js
@@ -100,6 +100,16 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
* // port, and this is then used to verify the host certificate does not change.
* // This setting has no effect unless TRUST_ON_FIRST_USE is enabled.
* knownHosts:"~/.neo4j/known_hosts",
+ *
+ * // The max number of connections that are allowed idle in the pool at any time.
+ * // Connection will be destroyed if this threshold is exceeded.
+ * connectionPoolSize: 50,
+ *
+ * // Specify the maximum time in milliseconds transactions are allowed to retry via
+ * // {@link Session#readTransaction()} and {@link Session#writeTransaction()} functions. These functions
+ * // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with
+ * // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds.
+ * maxTransactionRetryTime: 30000,
* }
*
* @param {string} url The URL for the Neo4j database, for instance "bolt://localhost"
diff --git a/src/v1/internal/transaction-executor.js b/src/v1/internal/transaction-executor.js
new file mode 100644
index 000000000..43a889707
--- /dev/null
+++ b/src/v1/internal/transaction-executor.js
@@ -0,0 +1,149 @@
+/**
+ * Copyright (c) 2002-2017 "Neo Technology,","
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
+
+const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000; // 30 seconds
+const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000; // 1 seconds
+const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0;
+const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2;
+
+export default class TransactionExecutor {
+
+ constructor(maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor) {
+ this._maxRetryTimeMs = _valueOrDefault(maxRetryTimeMs, DEFAULT_MAX_RETRY_TIME_MS);
+ this._initialRetryDelayMs = _valueOrDefault(initialRetryDelayMs, DEFAULT_INITIAL_RETRY_DELAY_MS);
+ this._multiplier = _valueOrDefault(multiplier, DEFAULT_RETRY_DELAY_MULTIPLIER);
+ this._jitterFactor = _valueOrDefault(jitterFactor, DEFAULT_RETRY_DELAY_JITTER_FACTOR);
+
+ this._inFlightTimeoutIds = [];
+
+ this._verifyAfterConstruction();
+ }
+
+ execute(transactionCreator, transactionWork) {
+ return new Promise((resolve, reject) => {
+ this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
+ }).catch(error => {
+ const retryStartTimeMs = Date.now();
+ const retryDelayMs = this._initialRetryDelayMs;
+ return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTimeMs, retryDelayMs);
+ });
+ }
+
+ close() {
+ // cancel all existing timeouts to prevent further retries
+ this._inFlightTimeoutIds.forEach(timeoutId => clearTimeout(timeoutId));
+ this._inFlightTimeoutIds = [];
+ }
+
+ _retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, retryDelayMs) {
+ const elapsedTimeMs = Date.now() - retryStartTime;
+
+ if (elapsedTimeMs > this._maxRetryTimeMs || !TransactionExecutor._canRetryOn(error)) {
+ return Promise.reject(error);
+ }
+
+ return new Promise((resolve, reject) => {
+ const nextRetryTime = this._computeDelayWithJitter(retryDelayMs);
+ const timeoutId = setTimeout(() => {
+ // filter out this timeoutId when time has come and function is being executed
+ this._inFlightTimeoutIds = this._inFlightTimeoutIds.filter(id => id !== timeoutId);
+ this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject);
+ }, nextRetryTime);
+ // add newly created timeoutId to the list of all in-flight timeouts
+ this._inFlightTimeoutIds.push(timeoutId);
+ }).catch(error => {
+ const nextRetryDelayMs = retryDelayMs * this._multiplier;
+ return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, nextRetryDelayMs);
+ });
+ }
+
+ _executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) {
+ try {
+ const tx = transactionCreator();
+ const transactionWorkResult = transactionWork(tx);
+
+ // user defined callback is supposed to return a promise, but it might not; so to protect against an
+ // incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
+ // validation step without type checks
+ const resultPromise = Promise.resolve(transactionWorkResult);
+
+ resultPromise.then(result => {
+ if (tx.isOpen()) {
+ // transaction work returned resolved promise and transaction has not been committed/rolled back
+ // try to commit the transaction
+ tx.commit().then(() => {
+ // transaction was committed, return result to the user
+ resolve(result);
+ }).catch(error => {
+ // transaction failed to commit, propagate the failure
+ reject(error);
+ });
+ } else {
+ // transaction work returned resolved promise and transaction is already committed/rolled back
+ // return the result returned by given transaction work
+ resolve(result);
+ }
+ }).catch(error => {
+ // transaction work returned rejected promise, propagate the failure
+ reject(error);
+ });
+
+ } catch (error) {
+ reject(error);
+ }
+ }
+
+ _computeDelayWithJitter(delayMs) {
+ const jitter = (delayMs * this._jitterFactor);
+ const min = delayMs - jitter;
+ const max = delayMs + jitter;
+ return Math.random() * (max - min) + min;
+ }
+
+ static _canRetryOn(error) {
+ return error && error.code &&
+ (error.code === SERVICE_UNAVAILABLE ||
+ error.code === SESSION_EXPIRED ||
+ error.code.indexOf('TransientError') >= 0);
+ }
+
+ _verifyAfterConstruction() {
+ if (this._maxRetryTimeMs < 0) {
+ throw newError('Max retry time should be >= 0: ' + this._maxRetryTimeMs);
+ }
+ if (this._initialRetryDelayMs < 0) {
+ throw newError('Initial retry delay should >= 0: ' + this._initialRetryDelayMs);
+ }
+ if (this._multiplier < 1.0) {
+ throw newError('Multiplier should be >= 1.0: ' + this._multiplier);
+ }
+ if (this._jitterFactor < 0 || this._jitterFactor > 1) {
+ throw newError('Jitter factor should be in [0.0, 1.0]: ' + this._jitterFactor);
+ }
+ }
+};
+
+function _valueOrDefault(value, defaultValue) {
+ if (value || value === 0) {
+ return value;
+ }
+ return defaultValue;
+}
diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js
index e3be9a561..0a3a2e17a 100644
--- a/src/v1/routing-driver.js
+++ b/src/v1/routing-driver.js
@@ -35,8 +35,8 @@ class RoutingDriver extends Driver {
return new LoadBalancer(address, connectionPool, driverOnErrorCallback);
}
- _createSession(mode, connectionProvider, bookmark) {
- return new RoutingSession(mode, connectionProvider, bookmark, (error, conn) => {
+ _createSession(mode, connectionProvider, bookmark, config) {
+ return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => {
if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) {
// connection is undefined if error happened before connection was acquired
if (conn) {
@@ -66,8 +66,8 @@ class RoutingDriver extends Driver {
}
class RoutingSession extends Session {
- constructor(mode, connectionProvider, bookmark, onFailedConnection) {
- super(mode, connectionProvider, bookmark);
+ constructor(mode, connectionProvider, bookmark, config, onFailedConnection) {
+ super(mode, connectionProvider, bookmark, config);
this._onFailedConnection = onFailedConnection;
}
diff --git a/src/v1/session.js b/src/v1/session.js
index d7c22a620..aec5a3504 100644
--- a/src/v1/session.js
+++ b/src/v1/session.js
@@ -22,7 +22,8 @@ import Transaction from './transaction';
import {newError} from './error';
import {assertString} from './internal/util';
import ConnectionHolder from './internal/connection-holder';
-import {READ, WRITE} from './driver';
+import Driver, {READ, WRITE} from './driver';
+import TransactionExecutor from './internal/transaction-executor';
/**
* A Session instance is used for handling the connection and
@@ -36,15 +37,17 @@ class Session {
* @constructor
* @param {string} mode the default access mode for this session.
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
- * @param {string} bookmark - the initial bookmark for this session.
+ * @param {string} [bookmark=undefined] - the initial bookmark for this session.
+ * @param {Object} [config={}] - this driver configuration.
*/
- constructor(mode, connectionProvider, bookmark) {
+ constructor(mode, connectionProvider, bookmark, config) {
this._mode = mode;
this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider);
this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider);
this._open = true;
this._hasTx = false;
this._lastBookmark = bookmark;
+ this._transactionExecutor = _createTransactionExecutor(config);
}
/**
@@ -92,21 +95,24 @@ class Session {
* @returns {Transaction} - New Transaction
*/
beginTransaction(bookmark) {
+ return this._beginTransaction(this._mode, bookmark);
+ }
+
+ _beginTransaction(accessMode, bookmark) {
if (bookmark) {
assertString(bookmark, 'Bookmark');
this._updateBookmark(bookmark);
}
if (this._hasTx) {
- throw newError("You cannot begin a transaction on a session with an "
- + "open transaction; either run from within the transaction or use a "
- + "different session.")
+ throw newError('You cannot begin a transaction on a session with an open transaction; ' +
+ 'either run from within the transaction or use a different session.');
}
- this._hasTx = true;
-
- const connectionHolder = this._connectionHolderWithMode(this._mode);
+ const mode = Driver._validateSessionMode(accessMode);
+ const connectionHolder = this._connectionHolderWithMode(mode);
connectionHolder.initializeConnection();
+ this._hasTx = true;
return new Transaction(connectionHolder, () => {
this._hasTx = false;
@@ -114,10 +120,56 @@ class Session {
this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this));
}
+ /**
+ * Return the bookmark received following the last completed {@link Transaction}.
+ *
+ * @return a reference to a previous transac'tion
+ */
lastBookmark() {
return this._lastBookmark;
}
+ /**
+ * Execute given unit of work in a {@link Driver#READ} transaction.
+ *
+ * Transaction will automatically be committed unless the given function throws or returns a rejected promise.
+ * Some failures of the given function or the commit itself will be retried with exponential backoff with initial
+ * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
+ * maxTransactionRetryTime
property in milliseconds.
+ *
+ * @param {function(Transaction)} transactionWork - callback that executes operations against
+ * a given {@link Transaction}.
+ * @return {Promise} resolved promise as returned by the given function or rejected promise when given
+ * function or commit fails.
+ */
+ readTransaction(transactionWork) {
+ return this._runTransaction(READ, transactionWork);
+ }
+
+ /**
+ * Execute given unit of work in a {@link Driver#WRITE} transaction.
+ *
+ * Transaction will automatically be committed unless the given function throws or returns a rejected promise.
+ * Some failures of the given function or the commit itself will be retried with exponential backoff with initial
+ * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
+ * maxTransactionRetryTime
property in milliseconds.
+ *
+ * @param {function(Transaction)} transactionWork - callback that executes operations against
+ * a given {@link Transaction}.
+ * @return {Promise} resolved promise as returned by the given function or rejected promise when given
+ * function or commit fails.
+ */
+ writeTransaction(transactionWork) {
+ return this._runTransaction(WRITE, transactionWork);
+ }
+
+ _runTransaction(accessMode, transactionWork) {
+ return this._transactionExecutor.execute(
+ () => this._beginTransaction(accessMode, this.lastBookmark()),
+ transactionWork
+ );
+ }
+
_updateBookmark(newBookmark) {
if (newBookmark) {
this._lastBookmark = newBookmark;
@@ -132,6 +184,7 @@ class Session {
close(callback = (() => null)) {
if (this._open) {
this._open = false;
+ this._transactionExecutor.close();
this._readConnectionHolder.close().then(() => {
this._writeConnectionHolder.close().then(() => {
callback();
@@ -180,4 +233,9 @@ class _RunObserver extends StreamObserver {
}
}
+function _createTransactionExecutor(config) {
+ const maxRetryTimeMs = (config && config.maxTransactionRetryTime) ? config.maxTransactionRetryTime : null;
+ return new TransactionExecutor(maxRetryTimeMs);
+}
+
export default Session;
diff --git a/src/v1/transaction.js b/src/v1/transaction.js
index cb3cd6892..cfbb3d003 100644
--- a/src/v1/transaction.js
+++ b/src/v1/transaction.js
@@ -61,7 +61,7 @@ class Transaction {
* or with the statement and parameters as separate arguments.
* @param {mixed} statement - Cypher statement to execute
* @param {Object} parameters - Map with parameters to use in statement
- * @return {Result} - New Result
+ * @return {Result} New Result
*/
run(statement, parameters) {
if(typeof statement === 'object' && statement.text) {
@@ -78,7 +78,7 @@ class Transaction {
*
* After committing the transaction can no longer be used.
*
- * @returns {Result} - New Result
+ * @returns {Result} New Result
*/
commit() {
let committed = this._state.commit(this._connectionHolder, new _TransactionStreamObserver(this));
@@ -93,7 +93,7 @@ class Transaction {
*
* After rolling back, the transaction can no longer be used.
*
- * @returns {Result} - New Result
+ * @returns {Result} New Result
*/
rollback() {
let committed = this._state.rollback(this._connectionHolder, new _TransactionStreamObserver(this));
@@ -103,15 +103,30 @@ class Transaction {
return committed.result;
}
+ /**
+ * Check if this transaction is active, which means commit and rollback did not happen.
+ * @return {boolean} true
when not committed and not rolled back, false
otherwise.
+ */
+ isOpen() {
+ return this._state == _states.ACTIVE;
+ }
+
_onError() {
- // rollback explicitly if tx.run failed, rollback
- if (this._state == _states.ACTIVE) {
- this.rollback();
+ if (this.isOpen()) {
+ // attempt to rollback, useful when Transaction#run() failed
+ return this.rollback().catch(ignoredError => {
+ // ignore all errors because it is best effort and transaction might already be rolled back
+ }).then(() => {
+ // after rollback attempt change this transaction's state to FAILED
+ this._state = _states.FAILED;
+ });
} else {
- // else just do the cleanup
- this._onClose();
+ // error happened in in-active transaction, just to the cleanup and change state to FAILED
+ this._state = _states.FAILED;
+ this._onClose();
+ // no async actions needed - return resolved promise
+ return Promise.resolve();
}
- this._state = _states.FAILED;
}
}
@@ -126,9 +141,10 @@ class _TransactionStreamObserver extends StreamObserver {
onError(error) {
if (!this._hasFailed) {
- this._tx._onError();
- super.onError(error);
- this._hasFailed = true;
+ this._tx._onError().then(() => {
+ super.onError(error);
+ this._hasFailed = true;
+ });
}
}
@@ -149,11 +165,11 @@ let _states = {
//The transaction is running with no explicit success or failure marked
ACTIVE: {
commit: (connectionHolder, observer) => {
- return {result: _runDiscardAll("COMMIT", connectionHolder, observer),
+ return {result: _runPullAll("COMMIT", connectionHolder, observer),
state: _states.SUCCEEDED}
},
rollback: (connectionHolder, observer) => {
- return {result: _runDiscardAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK};
+ return {result: _runPullAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK};
},
run: (connectionHolder, observer, statement, parameters) => {
connectionHolder.getConnection().then(conn => {
@@ -234,7 +250,7 @@ let _states = {
}
};
-function _runDiscardAll(msg, connectionHolder, observer) {
+function _runPullAll(msg, connectionHolder, observer) {
connectionHolder.getConnection().then(
conn => {
observer.resolveConnection(conn);
diff --git a/test/internal/timers-util.js b/test/internal/timers-util.js
new file mode 100644
index 000000000..a3452363f
--- /dev/null
+++ b/test/internal/timers-util.js
@@ -0,0 +1,86 @@
+/**
+ * Copyright (c) 2002-2017 "Neo Technology,","
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+class SetTimeoutMock {
+
+ constructor() {
+ this._clearState();
+ }
+
+ install() {
+ this._originalSetTimeout = global.setTimeout;
+ global.setTimeout = (code, delay) => {
+ if (!this._paused) {
+ code();
+ this.invocationDelays.push(delay);
+ }
+ return this._timeoutIdCounter++;
+ };
+
+ this._originalClearTimeout = global.clearTimeout;
+ global.clearTimeout = id => {
+ this.clearedTimeouts.push(id);
+ };
+
+ return this;
+ }
+
+ pause() {
+ this._paused = true;
+ }
+
+ uninstall() {
+ global.setTimeout = this._originalSetTimeout;
+ global.clearTimeout = this._originalClearTimeout;
+ this._clearState();
+ }
+
+ setTimeoutOriginal(code, delay) {
+ return this._originalSetTimeout.call(null, code, delay);
+ }
+
+ _clearState() {
+ this._originalSetTimeout = null;
+ this._originalClearTimeout = null;
+ this._paused = false;
+ this._timeoutIdCounter = 0;
+
+ this.invocationDelays = [];
+ this.clearedTimeouts = [];
+ }
+}
+
+export const setTimeoutMock = new SetTimeoutMock();
+
+export function hijackNextDateNowCall(newValue) {
+ const originalDate = global.Date;
+ global.Date = new FakeDate(originalDate, newValue);
+}
+
+class FakeDate {
+
+ constructor(originalDate, nextNowValue) {
+ this._originalDate = originalDate;
+ this._nextNowValue = nextNowValue;
+ }
+
+ now() {
+ global.Date = this._originalDate;
+ return this._nextNowValue;
+ }
+}
diff --git a/test/internal/transaction-executor.test.js b/test/internal/transaction-executor.test.js
new file mode 100644
index 000000000..5ee75b738
--- /dev/null
+++ b/test/internal/transaction-executor.test.js
@@ -0,0 +1,349 @@
+/**
+ * Copyright (c) 2002-2017 "Neo Technology,","
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import TransactionExecutor from '../../src/v1/internal/transaction-executor';
+import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error';
+import {hijackNextDateNowCall, setTimeoutMock} from './timers-util';
+
+const TRANSIENT_ERROR_1 = 'Neo.TransientError.Transaction.DeadlockDetected';
+const TRANSIENT_ERROR_2 = 'Neo.TransientError.Network.CommunicationError';
+const UNKNOWN_ERROR = 'Neo.DatabaseError.General.UnknownError';
+const OOM_ERROR = 'Neo.DatabaseError.General.OutOfMemoryError';
+
+describe('TransactionExecutor', () => {
+
+ let fakeSetTimeout;
+
+ beforeEach(() => {
+ fakeSetTimeout = setTimeoutMock.install();
+ });
+
+ afterEach(() => {
+ fakeSetTimeout.uninstall();
+ });
+
+ it('should retry when transaction work returns promise rejected with SERVICE_UNAVAILABLE', done => {
+ testRetryWhenTransactionWorkReturnsRejectedPromise([SERVICE_UNAVAILABLE], done);
+ });
+
+ it('should retry when transaction work returns promise rejected with SESSION_EXPIRED', done => {
+ testRetryWhenTransactionWorkReturnsRejectedPromise([SESSION_EXPIRED], done);
+ });
+
+ it('should retry when transaction work returns promise rejected with deadlock error', done => {
+ testRetryWhenTransactionWorkReturnsRejectedPromise([TRANSIENT_ERROR_1], done);
+ });
+
+ it('should retry when transaction work returns promise rejected with communication error', done => {
+ testRetryWhenTransactionWorkReturnsRejectedPromise([TRANSIENT_ERROR_2], done);
+ });
+
+ it('should not retry when transaction work returns promise rejected with OOM error', done => {
+ testNoRetryOnUnknownError([OOM_ERROR], 1, done);
+ });
+
+ it('should not retry when transaction work returns promise rejected with unknown error', done => {
+ testNoRetryOnUnknownError([UNKNOWN_ERROR], 1, done);
+ });
+
+ it('should stop retrying when time expires', done => {
+ const executor = new TransactionExecutor();
+ let workInvocationCounter = 0;
+ const realWork = transactionWork([SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_1, TRANSIENT_ERROR_2], 42);
+
+ const result = executor.execute(transactionCreator(), tx => {
+ expect(tx).toBeDefined();
+ workInvocationCounter++;
+ if (workInvocationCounter === 3) {
+ hijackNextDateNowCall(Date.now() + 30001); // move next `Date.now()` call forward by 30 seconds
+ }
+ return realWork();
+ });
+
+ result.catch(error => {
+ expect(workInvocationCounter).toEqual(3);
+ expect(error.code).toEqual(TRANSIENT_ERROR_1);
+ done();
+ });
+ });
+
+ it('should retry when given transaction creator throws once', done => {
+ testRetryWhenTransactionCreatorFails(
+ [SERVICE_UNAVAILABLE],
+ done
+ );
+ });
+
+ it('should retry when given transaction creator throws many times', done => {
+ testRetryWhenTransactionCreatorFails(
+ [SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_2, SESSION_EXPIRED, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_1],
+ done
+ );
+ });
+
+ it('should retry when given transaction work throws once', done => {
+ testRetryWhenTransactionWorkThrows([SERVICE_UNAVAILABLE], done);
+ });
+
+ it('should retry when given transaction work throws many times', done => {
+ testRetryWhenTransactionWorkThrows(
+ [SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, TRANSIENT_ERROR_2, SESSION_EXPIRED],
+ done
+ );
+ });
+
+ it('should retry when given transaction work returns rejected promise many times', done => {
+ testRetryWhenTransactionWorkReturnsRejectedPromise(
+ [SERVICE_UNAVAILABLE, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, TRANSIENT_ERROR_1, SESSION_EXPIRED],
+ done
+ );
+ });
+
+ it('should retry when transaction commit returns rejected promise once', done => {
+ testRetryWhenTransactionCommitReturnsRejectedPromise([TRANSIENT_ERROR_1], done);
+ });
+
+ it('should retry when transaction commit returns rejected promise multiple times', done => {
+ testRetryWhenTransactionCommitReturnsRejectedPromise(
+ [TRANSIENT_ERROR_1, TRANSIENT_ERROR_1, SESSION_EXPIRED, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2],
+ done
+ );
+ });
+
+ it('should retry until database error happens', done => {
+ testNoRetryOnUnknownError(
+ [SERVICE_UNAVAILABLE, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, UNKNOWN_ERROR, SESSION_EXPIRED],
+ 5,
+ done
+ );
+ });
+
+ it('should cancel in-flight timeouts when closed', done => {
+ const executor = new TransactionExecutor();
+ // do not execute setTimeout callbacks
+ fakeSetTimeout.pause();
+
+ executor.execute(transactionCreator([SERVICE_UNAVAILABLE]), () => Promise.resolve(42));
+ executor.execute(transactionCreator([TRANSIENT_ERROR_1]), () => Promise.resolve(4242));
+ executor.execute(transactionCreator([SESSION_EXPIRED]), () => Promise.resolve(424242));
+
+ fakeSetTimeout.setTimeoutOriginal(() => {
+ executor.close();
+ expect(fakeSetTimeout.clearedTimeouts.length).toEqual(3);
+ done();
+ }, 1000);
+ });
+
+ it('should allow zero max retry time', () => {
+ const executor = new TransactionExecutor(0);
+ expect(executor._maxRetryTimeMs).toEqual(0);
+ });
+
+ it('should allow zero initial delay', () => {
+ const executor = new TransactionExecutor(42, 0);
+ expect(executor._initialRetryDelayMs).toEqual(0);
+ });
+
+ it('should disallow zero multiplier', () => {
+ expect(() => new TransactionExecutor(42, 42, 0)).toThrow();
+ });
+
+ it('should allow zero jitter factor', () => {
+ const executor = new TransactionExecutor(42, 42, 42, 0);
+ expect(executor._jitterFactor).toEqual(0);
+ });
+
+ function testRetryWhenTransactionCreatorFails(errorCodes, done) {
+ const executor = new TransactionExecutor();
+ const transactionCreator = throwingTransactionCreator(errorCodes, new FakeTransaction());
+ let workInvocationCounter = 0;
+
+ const result = executor.execute(transactionCreator, tx => {
+ expect(tx).toBeDefined();
+ workInvocationCounter++;
+ return Promise.resolve(42);
+ });
+
+ result.then(value => {
+ expect(workInvocationCounter).toEqual(1);
+ expect(value).toEqual(42);
+ verifyRetryDelays(fakeSetTimeout, errorCodes.length);
+ done();
+ });
+ }
+
+ function testRetryWhenTransactionWorkReturnsRejectedPromise(errorCodes, done) {
+ const executor = new TransactionExecutor();
+ let workInvocationCounter = 0;
+ const realWork = transactionWork(errorCodes, 42);
+
+ const result = executor.execute(transactionCreator(), tx => {
+ expect(tx).toBeDefined();
+ workInvocationCounter++;
+ return realWork();
+ });
+
+ result.then(value => {
+ // work should have failed 'failures.length' times and succeeded 1 time
+ expect(workInvocationCounter).toEqual(errorCodes.length + 1);
+ expect(value).toEqual(42);
+ verifyRetryDelays(fakeSetTimeout, errorCodes.length);
+ done();
+ });
+ }
+
+ function testRetryWhenTransactionCommitReturnsRejectedPromise(errorCodes, done) {
+ const executor = new TransactionExecutor();
+ let workInvocationCounter = 0;
+ const realWork = () => Promise.resolve(4242);
+
+ const result = executor.execute(transactionCreator(errorCodes), tx => {
+ expect(tx).toBeDefined();
+ workInvocationCounter++;
+ return realWork();
+ });
+
+ result.then(value => {
+ // work should have failed 'failures.length' times and succeeded 1 time
+ expect(workInvocationCounter).toEqual(errorCodes.length + 1);
+ expect(value).toEqual(4242);
+ verifyRetryDelays(fakeSetTimeout, errorCodes.length);
+ done();
+ });
+ }
+
+ function testRetryWhenTransactionWorkThrows(errorCodes, done) {
+ const executor = new TransactionExecutor();
+ let workInvocationCounter = 0;
+ const realWork = throwingTransactionWork(errorCodes, 42);
+
+ const result = executor.execute(transactionCreator(), tx => {
+ expect(tx).toBeDefined();
+ workInvocationCounter++;
+ return realWork();
+ });
+
+ result.then(value => {
+ // work should have failed 'failures.length' times and succeeded 1 time
+ expect(workInvocationCounter).toEqual(errorCodes.length + 1);
+ expect(value).toEqual(42);
+ verifyRetryDelays(fakeSetTimeout, errorCodes.length);
+ done();
+ });
+ }
+
+ function testNoRetryOnUnknownError(errorCodes, expectedWorkInvocationCount, done) {
+ const executor = new TransactionExecutor();
+ let workInvocationCounter = 0;
+ const realWork = transactionWork(errorCodes, 42);
+
+ const result = executor.execute(transactionCreator(), tx => {
+ expect(tx).toBeDefined();
+ workInvocationCounter++;
+ return realWork();
+ });
+
+ result.catch(error => {
+ expect(workInvocationCounter).toEqual(expectedWorkInvocationCount);
+ if (errorCodes.length === 1) {
+ expect(error.code).toEqual(errorCodes[0]);
+ } else {
+ expect(error.code).toEqual(errorCodes[expectedWorkInvocationCount - 1]);
+ }
+ done();
+ });
+ }
+
+});
+
+function transactionCreator(commitErrorCodes) {
+ const remainingErrorCodes = (commitErrorCodes || []).slice().reverse();
+ return () => new FakeTransaction(remainingErrorCodes.pop());
+}
+
+function throwingTransactionCreator(errorCodes, result) {
+ const remainingErrorCodes = errorCodes.slice().reverse();
+ return () => {
+ if (remainingErrorCodes.length === 0) {
+ return result;
+ }
+ const errorCode = remainingErrorCodes.pop();
+ throw error(errorCode);
+ };
+}
+
+function throwingTransactionWork(errorCodes, result) {
+ const remainingErrorCodes = errorCodes.slice().reverse();
+ return () => {
+ if (remainingErrorCodes.length === 0) {
+ return Promise.resolve(result);
+ }
+ const errorCode = remainingErrorCodes.pop();
+ throw error(errorCode);
+ };
+}
+
+function transactionWork(errorCodes, result) {
+ const remainingErrorCodes = errorCodes.slice().reverse();
+ return () => {
+ if (remainingErrorCodes.length === 0) {
+ return Promise.resolve(result);
+ }
+ const errorCode = remainingErrorCodes.pop();
+ return Promise.reject(error(errorCode));
+ };
+}
+
+function error(code) {
+ return newError('', code);
+}
+
+function verifyRetryDelays(fakeSetTimeout, expectedInvocationCount) {
+ const delays = fakeSetTimeout.invocationDelays;
+ expect(delays.length).toEqual(expectedInvocationCount);
+ delays.forEach((delay, index) => {
+ // delays make a geometric progression with fist element 1000 and multiplier 2.0
+ // so expected delay can be calculated as n-th element: `firstElement * pow(multiplier, n - 1)`
+ const expectedDelayWithoutJitter = 1000 * Math.pow(2.0, index);
+ const jitter = expectedDelayWithoutJitter * 0.2;
+ const min = expectedDelayWithoutJitter - jitter;
+ const max = expectedDelayWithoutJitter + jitter;
+
+ expect(delay >= min).toBeTruthy();
+ expect(delay <= max).toBeTruthy();
+ });
+}
+
+class FakeTransaction {
+
+ constructor(commitErrorCode) {
+ this._commitErrorCode = commitErrorCode;
+ }
+
+ isOpen() {
+ return true;
+ }
+
+ commit() {
+ if (this._commitErrorCode) {
+ return Promise.reject(error(this._commitErrorCode));
+ }
+ return Promise.resolve();
+ }
+}
diff --git a/test/resources/boltkit/dead_server.script b/test/resources/boltkit/dead_read_server.script
similarity index 73%
rename from test/resources/boltkit/dead_server.script
rename to test/resources/boltkit/dead_read_server.script
index 037a6be65..4af7ef3b6 100644
--- a/test/resources/boltkit/dead_server.script
+++ b/test/resources/boltkit/dead_read_server.script
@@ -1,7 +1,8 @@
!: AUTO INIT
!: AUTO RESET
!: AUTO PULL_ALL
+!: AUTO RUN "BEGIN" {}
C: RUN "MATCH (n) RETURN n.name" {}
C: PULL_ALL
-S:
\ No newline at end of file
+S:
diff --git a/test/resources/boltkit/dead_write_server.script b/test/resources/boltkit/dead_write_server.script
new file mode 100644
index 000000000..ac36bc249
--- /dev/null
+++ b/test/resources/boltkit/dead_write_server.script
@@ -0,0 +1,8 @@
+!: AUTO INIT
+!: AUTO RESET
+!: AUTO PULL_ALL
+!: AUTO RUN "BEGIN" {}
+
+C: RUN "CREATE (n {name:'Bob'})" {}
+C: PULL_ALL
+S:
diff --git a/test/resources/boltkit/discover_servers.script b/test/resources/boltkit/discover_servers.script
index 6f92d458f..c1538995a 100644
--- a/test/resources/boltkit/discover_servers.script
+++ b/test/resources/boltkit/discover_servers.script
@@ -5,10 +5,5 @@
C: RUN "CALL dbms.cluster.routing.getServers" {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
- RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
+ RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9009"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
SUCCESS {}
-C: RUN "MATCH (n) RETURN n.name" {}
- PULL_ALL
-S: SUCCESS {"fields": ["n.name"]}
- SUCCESS {}
-
diff --git a/test/resources/boltkit/discover_servers_and_read.script b/test/resources/boltkit/discover_servers_and_read.script
new file mode 100644
index 000000000..6f92d458f
--- /dev/null
+++ b/test/resources/boltkit/discover_servers_and_read.script
@@ -0,0 +1,14 @@
+!: AUTO INIT
+!: AUTO RESET
+!: AUTO PULL_ALL
+
+C: RUN "CALL dbms.cluster.routing.getServers" {}
+ PULL_ALL
+S: SUCCESS {"fields": ["ttl", "servers"]}
+ RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
+ SUCCESS {}
+C: RUN "MATCH (n) RETURN n.name" {}
+ PULL_ALL
+S: SUCCESS {"fields": ["n.name"]}
+ SUCCESS {}
+
diff --git a/test/resources/boltkit/read_server.script b/test/resources/boltkit/read_server.script
index 0b4d44748..7fb3d6c81 100644
--- a/test/resources/boltkit/read_server.script
+++ b/test/resources/boltkit/read_server.script
@@ -1,6 +1,9 @@
!: AUTO INIT
!: AUTO RESET
!: AUTO PULL_ALL
+!: AUTO RUN "COMMIT" {}
+!: AUTO RUN "ROLLBACK" {}
+!: AUTO RUN "BEGIN" {}
C: RUN "MATCH (n) RETURN n.name" {}
PULL_ALL
@@ -8,4 +11,4 @@ S: SUCCESS {"fields": ["n.name"]}
RECORD ["Bob"]
RECORD ["Alice"]
RECORD ["Tina"]
- SUCCESS {}
\ No newline at end of file
+ SUCCESS {}
diff --git a/test/resources/boltkit/write_server.script b/test/resources/boltkit/write_server.script
index 993910f6c..4667e3609 100644
--- a/test/resources/boltkit/write_server.script
+++ b/test/resources/boltkit/write_server.script
@@ -1,8 +1,11 @@
!: AUTO INIT
!: AUTO RESET
!: AUTO PULL_ALL
+!: AUTO RUN "COMMIT" {}
+!: AUTO RUN "ROLLBACK" {}
+!: AUTO RUN "BEGIN" {}
C: RUN "CREATE (n {name:'Bob'})" {}
PULL_ALL
S: SUCCESS {}
- SUCCESS {}
\ No newline at end of file
+ SUCCESS {}
diff --git a/test/v1/examples.test.js b/test/v1/examples.test.js
index c41db24e8..136722999 100644
--- a/test/v1/examples.test.js
+++ b/test/v1/examples.test.js
@@ -97,7 +97,7 @@ describe('examples', function() {
});
});
- it('should be able to configure session pool size', function (done) {
+ it('should be able to configure connection pool size', function (done) {
var neo4j = neo4jv1;
// tag::configuration[]
var driver = neo4j.driver("bolt://localhost:7687", neo4j.auth.basic("neo4j", "neo4j"), {connectionPoolSize: 50});
@@ -118,6 +118,17 @@ describe('examples', function() {
});
});
+ it('should be able to configure maximum transaction retry time', function () {
+ var neo4j = neo4jv1;
+ // tag::configuration-transaction-retry-time[]
+ var maxRetryTimeMs = 45 * 1000; // 45 seconds
+ var driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'neo4j'), {maxTransactionRetryTime: maxRetryTimeMs});
+ //end::configuration-transaction-retry-time[]
+
+ var session = driver.session();
+ expect(session._transactionExecutor._maxRetryTimeMs).toBe(maxRetryTimeMs);
+ });
+
it('should document a statement', function(done) {
var session = driverGlobal.session();
// tag::statement[]
diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js
index 75f4721e9..90d1d16a5 100644
--- a/test/v1/routing.driver.boltkit.it.js
+++ b/test/v1/routing.driver.boltkit.it.js
@@ -21,13 +21,15 @@ import neo4j from '../../src/v1';
import {READ, WRITE} from '../../src/v1/driver';
import boltkit from './boltkit';
import RoutingTable from '../../src/v1/internal/routing-table';
+import {SESSION_EXPIRED} from '../../src/v1/error';
+import {hijackNextDateNowCall} from '../internal/timers-util';
describe('routing driver', () => {
let originalTimeout;
beforeAll(() => {
originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL;
- jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000;
+ jasmine.DEFAULT_TIMEOUT_INTERVAL = 60000;
});
afterAll(() => {
@@ -41,7 +43,7 @@ describe('routing driver', () => {
}
// Given
const kit = new boltkit.BoltKit();
- const server = kit.start('./test/resources/boltkit/discover_servers.script', 9001);
+ const server = kit.start('./test/resources/boltkit/discover_servers_and_read.script', 9001);
kit.run(() => {
const driver = newDriver("bolt+routing://127.0.0.1:9001");
@@ -323,7 +325,7 @@ describe('routing driver', () => {
// Given
const kit = new boltkit.BoltKit();
const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
- const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005);
+ const readServer = kit.start('./test/resources/boltkit/dead_read_server.script', 9005);
kit.run(() => {
const driver = newDriver("bolt+routing://127.0.0.1:9001");
@@ -415,7 +417,7 @@ describe('routing driver', () => {
// Given
const kit = new boltkit.BoltKit();
const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
- const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9007);
+ const readServer = kit.start('./test/resources/boltkit/dead_read_server.script', 9007);
kit.run(() => {
const driver = newDriver("bolt+routing://127.0.0.1:9001");
@@ -475,7 +477,7 @@ describe('routing driver', () => {
// Given
const kit = new boltkit.BoltKit();
const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
- const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005);
+ const readServer = kit.start('./test/resources/boltkit/dead_read_server.script', 9005);
kit.run(() => {
const driver = newDriver("bolt+routing://127.0.0.1:9001");
@@ -1181,6 +1183,287 @@ describe('routing driver', () => {
});
});
+ it('should retry read transaction until success', done => {
+ if (!boltkit.BoltKitSupport) {
+ done();
+ return;
+ }
+
+ const kit = new boltkit.BoltKit();
+ const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
+ const brokenReader = kit.start('./test/resources/boltkit/dead_read_server.script', 9005);
+ const reader = kit.start('./test/resources/boltkit/read_server.script', 9006);
+
+ kit.run(() => {
+ const driver = newDriver('bolt+routing://127.0.0.1:9001');
+ const session = driver.session();
+
+ let invocations = 0;
+ const resultPromise = session.readTransaction(tx => {
+ invocations++;
+ return tx.run('MATCH (n) RETURN n.name');
+ });
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(3);
+ expect(invocations).toEqual(2);
+
+ session.close(() => {
+ driver.close();
+ router.exit(code1 => {
+ brokenReader.exit(code2 => {
+ reader.exit(code3 => {
+ expect(code1).toEqual(0);
+ expect(code2).toEqual(0);
+ expect(code3).toEqual(0);
+ done();
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+
+ it('should retry write transaction until success', done => {
+ if (!boltkit.BoltKitSupport) {
+ done();
+ return;
+ }
+
+ const kit = new boltkit.BoltKit();
+ const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
+ const brokenWriter = kit.start('./test/resources/boltkit/dead_write_server.script', 9007);
+ const writer = kit.start('./test/resources/boltkit/write_server.script', 9008);
+
+ kit.run(() => {
+ const driver = newDriver('bolt+routing://127.0.0.1:9001');
+ const session = driver.session();
+
+ let invocations = 0;
+ const resultPromise = session.writeTransaction(tx => {
+ invocations++;
+ return tx.run('CREATE (n {name:\'Bob\'})');
+ });
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(0);
+ expect(invocations).toEqual(2);
+
+ session.close(() => {
+ driver.close();
+ router.exit(code1 => {
+ brokenWriter.exit(code2 => {
+ writer.exit(code3 => {
+ expect(code1).toEqual(0);
+ expect(code2).toEqual(0);
+ expect(code3).toEqual(0);
+ done();
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+
+ it('should retry read transaction until failure', done => {
+ if (!boltkit.BoltKitSupport) {
+ done();
+ return;
+ }
+
+ const kit = new boltkit.BoltKit();
+ const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
+ const brokenReader1 = kit.start('./test/resources/boltkit/dead_read_server.script', 9005);
+ const brokenReader2 = kit.start('./test/resources/boltkit/dead_read_server.script', 9006);
+
+ kit.run(() => {
+ const driver = newDriver('bolt+routing://127.0.0.1:9001');
+ const session = driver.session();
+
+ let invocations = 0;
+ const resultPromise = session.readTransaction(tx => {
+ invocations++;
+ if (invocations === 2) {
+ // make retries stop after two invocations
+ moveNextDateNow30SecondsForward();
+ }
+ return tx.run('MATCH (n) RETURN n.name');
+ });
+
+ resultPromise.catch(error => {
+ expect(error.code).toEqual(SESSION_EXPIRED);
+ expect(invocations).toEqual(2);
+
+ session.close(() => {
+ driver.close();
+ router.exit(code1 => {
+ brokenReader1.exit(code2 => {
+ brokenReader2.exit(code3 => {
+ expect(code1).toEqual(0);
+ expect(code2).toEqual(0);
+ expect(code3).toEqual(0);
+ done();
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+
+ it('should retry write transaction until failure', done => {
+ if (!boltkit.BoltKitSupport) {
+ done();
+ return;
+ }
+
+ const kit = new boltkit.BoltKit();
+ const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
+ const brokenWriter1 = kit.start('./test/resources/boltkit/dead_write_server.script', 9007);
+ const brokenWriter2 = kit.start('./test/resources/boltkit/dead_write_server.script', 9008);
+
+ kit.run(() => {
+ const driver = newDriver('bolt+routing://127.0.0.1:9001');
+ const session = driver.session();
+
+ let invocations = 0;
+ const resultPromise = session.writeTransaction(tx => {
+ invocations++;
+ if (invocations === 2) {
+ // make retries stop after two invocations
+ moveNextDateNow30SecondsForward();
+ }
+ return tx.run('CREATE (n {name:\'Bob\'})');
+ });
+
+ resultPromise.catch(error => {
+ expect(error.code).toEqual(SESSION_EXPIRED);
+ expect(invocations).toEqual(2);
+
+ session.close(() => {
+ driver.close();
+ router.exit(code1 => {
+ brokenWriter1.exit(code2 => {
+ brokenWriter2.exit(code3 => {
+ expect(code1).toEqual(0);
+ expect(code2).toEqual(0);
+ expect(code3).toEqual(0);
+ done();
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+
+ it('should retry read transaction and perform rediscovery until success', done => {
+ if (!boltkit.BoltKitSupport) {
+ done();
+ return;
+ }
+
+ const kit = new boltkit.BoltKit();
+ const router1 = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010);
+ const brokenReader1 = kit.start('./test/resources/boltkit/dead_read_server.script', 9005);
+ const brokenReader2 = kit.start('./test/resources/boltkit/dead_read_server.script', 9006);
+ const router2 = kit.start('./test/resources/boltkit/discover_servers.script', 9001);
+ const reader = kit.start('./test/resources/boltkit/read_server.script', 9002);
+
+ kit.run(() => {
+ const driver = newDriver('bolt+routing://127.0.0.1:9010');
+ const session = driver.session();
+
+ let invocations = 0;
+ const resultPromise = session.readTransaction(tx => {
+ invocations++;
+ return tx.run('MATCH (n) RETURN n.name');
+ });
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(3);
+ expect(invocations).toEqual(3);
+
+ session.close(() => {
+ driver.close();
+ router1.exit(code1 => {
+ brokenReader1.exit(code2 => {
+ brokenReader2.exit(code3 => {
+ router2.exit(code4 => {
+ reader.exit(code5 => {
+ expect(code1).toEqual(0);
+ expect(code2).toEqual(0);
+ expect(code3).toEqual(0);
+ expect(code4).toEqual(0);
+ expect(code5).toEqual(0);
+ done();
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+
+ it('should retry write transaction and perform rediscovery until success', done => {
+ if (!boltkit.BoltKitSupport) {
+ done();
+ return;
+ }
+
+ const kit = new boltkit.BoltKit();
+ const router1 = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010);
+ const brokenWriter1 = kit.start('./test/resources/boltkit/dead_write_server.script', 9007);
+ const brokenWriter2 = kit.start('./test/resources/boltkit/dead_write_server.script', 9008);
+ const router2 = kit.start('./test/resources/boltkit/discover_servers.script', 9002);
+ const writer = kit.start('./test/resources/boltkit/write_server.script', 9009);
+
+ kit.run(() => {
+ const driver = newDriver('bolt+routing://127.0.0.1:9010');
+ const session = driver.session();
+
+ let invocations = 0;
+ const resultPromise = session.writeTransaction(tx => {
+ invocations++;
+ return tx.run('CREATE (n {name:\'Bob\'})');
+ });
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(0);
+ expect(invocations).toEqual(3);
+
+ session.close(() => {
+ driver.close();
+ router1.exit(code1 => {
+ brokenWriter1.exit(code2 => {
+ brokenWriter2.exit(code3 => {
+ router2.exit(code4 => {
+ writer.exit(code5 => {
+ expect(code1).toEqual(0);
+ expect(code2).toEqual(0);
+ expect(code3).toEqual(0);
+ expect(code4).toEqual(0);
+ expect(code5).toEqual(0);
+ done();
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+
+ function moveNextDateNow30SecondsForward() {
+ const currentTime = Date.now();
+ hijackNextDateNowCall(currentTime + 30 * 1000 + 1);
+ }
+
function testWriteSessionWithAccessModeAndBookmark(accessMode, bookmark, done) {
if (!boltkit.BoltKitSupport) {
done();
diff --git a/test/v1/session.test.js b/test/v1/session.test.js
index 22402e36a..10084a6b5 100644
--- a/test/v1/session.test.js
+++ b/test/v1/session.test.js
@@ -28,13 +28,13 @@ describe('session', () => {
let driver;
let session;
- let server;
+ let serverMetadata;
let originalTimeout;
beforeEach(done => {
driver = neo4j.driver('bolt://localhost', neo4j.auth.basic('neo4j', 'neo4j'));
driver.onCompleted = meta => {
- server = meta['server'];
+ serverMetadata = meta['server'];
};
session = driver.session();
originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL;
@@ -86,6 +86,23 @@ describe('session', () => {
});
});
+ it('should close transaction executor', done => {
+ const session = newSessionWithConnection(new FakeConnection());
+
+ let closeCalledTimes = 0;
+ const transactionExecutor = session._transactionExecutor;
+ const originalClose = transactionExecutor.close;
+ transactionExecutor.close = () => {
+ closeCalledTimes++;
+ originalClose.call(transactionExecutor);
+ };
+
+ session.close(() => {
+ expect(closeCalledTimes).toEqual(1);
+ done();
+ });
+ });
+
it('should be possible to close driver after closing session with failed tx ', done => {
const driver = neo4j.driver('bolt://localhost', neo4j.auth.basic('neo4j', 'neo4j'));
const session = driver.session();
@@ -211,11 +228,7 @@ describe('session', () => {
});
it('should expose server info on successful query', done => {
- //lazy way of checking the version number
- //if server has been set we know it is at least
- //3.1 (todo actually parse the version string)
- if (!server) {
- done();
+ if (!serverIs31OrLater(done)) {
return;
}
@@ -234,14 +247,10 @@ describe('session', () => {
});
it('should expose execution time information when using 3.1 and onwards', done => {
-
- //lazy way of checking the version number
- //if server has been set we know it is at least
- //3.1 (todo actually parse the version string)
- if (!server) {
- done();
+ if (!serverIs31OrLater(done)) {
return;
}
+
// Given
const statement = 'UNWIND range(1,10000) AS n RETURN n AS number';
// When & Then
@@ -563,6 +572,10 @@ describe('session', () => {
});
it('should update last bookmark after every read tx commit', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
const bookmarkBefore = session.lastBookmark();
const tx = session.beginTransaction();
@@ -583,6 +596,10 @@ describe('session', () => {
});
it('should update last bookmark after every write tx commit', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
const bookmarkBefore = session.lastBookmark();
const tx = session.beginTransaction();
@@ -599,6 +616,10 @@ describe('session', () => {
});
it('should not lose last bookmark after run', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
const tx = session.beginTransaction();
tx.run('CREATE ()').then(() => {
tx.commit().then(() => {
@@ -615,6 +636,184 @@ describe('session', () => {
});
});
+ it('should commit read transaction', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
+ const bookmarkBefore = session.lastBookmark();
+ const resultPromise = session.readTransaction(tx => tx.run('RETURN 42 AS answer'));
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(1);
+ expect(result.records[0].get('answer').toNumber()).toEqual(42);
+
+ const bookmarkAfter = session.lastBookmark();
+ verifyBookmark(bookmarkAfter);
+ expect(bookmarkAfter).not.toEqual(bookmarkBefore);
+
+ done();
+ });
+ });
+
+ it('should commit write transaction', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
+ const bookmarkBefore = session.lastBookmark();
+ const resultPromise = session.writeTransaction(tx => tx.run('CREATE (n:Node {id: 42}) RETURN n.id AS answer'));
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(1);
+ expect(result.records[0].get('answer').toNumber()).toEqual(42);
+ expect(result.summary.counters.nodesCreated()).toEqual(1);
+
+ const bookmarkAfter = session.lastBookmark();
+ verifyBookmark(bookmarkAfter);
+ expect(bookmarkAfter).not.toEqual(bookmarkBefore);
+
+ countNodes('Node', 'id', 42).then(count => {
+ expect(count).toEqual(1);
+ done();
+ });
+ });
+ });
+
+ it('should not commit already committed read transaction', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
+ const resultPromise = session.readTransaction(tx => {
+ return new Promise((resolve, reject) => {
+ tx.run('RETURN 42 AS answer').then(result => {
+ tx.commit().then(() => {
+ resolve({result: result, bookmark: session.lastBookmark()});
+ }).catch(error => reject(error));
+ }).catch(error => reject(error));
+ });
+ });
+
+ resultPromise.then(outcome => {
+ const bookmark = outcome.bookmark;
+ const result = outcome.result;
+
+ verifyBookmark(bookmark);
+ expect(session.lastBookmark()).toEqual(bookmark); // expect bookmark to not change
+
+ expect(result.records.length).toEqual(1);
+ expect(result.records[0].get('answer').toNumber()).toEqual(42);
+
+ done();
+ });
+ });
+
+ it('should not commit already committed write transaction', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
+ const resultPromise = session.readTransaction(tx => {
+ return new Promise((resolve, reject) => {
+ tx.run('CREATE (n:Node {id: 42}) RETURN n.id AS answer').then(result => {
+ tx.commit().then(() => {
+ resolve({result: result, bookmark: session.lastBookmark()});
+ }).catch(error => reject(error));
+ }).catch(error => reject(error));
+ });
+ });
+
+ resultPromise.then(outcome => {
+ const bookmark = outcome.bookmark;
+ const result = outcome.result;
+
+ verifyBookmark(bookmark);
+ expect(session.lastBookmark()).toEqual(bookmark); // expect bookmark to not change
+
+ expect(result.records.length).toEqual(1);
+ expect(result.records[0].get('answer').toNumber()).toEqual(42);
+ expect(result.summary.counters.nodesCreated()).toEqual(1);
+
+ countNodes('Node', 'id', 42).then(count => {
+ expect(count).toEqual(1);
+ done();
+ });
+ });
+ });
+
+ it('should not commit rolled back read transaction', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
+ const bookmarkBefore = session.lastBookmark();
+ const resultPromise = session.readTransaction(tx => {
+ return new Promise((resolve, reject) => {
+ tx.run('RETURN 42 AS answer').then(result => {
+ tx.rollback().then(() => {
+ resolve(result);
+ }).catch(error => reject(error));
+ }).catch(error => reject(error));
+ });
+ });
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(1);
+ expect(result.records[0].get('answer').toNumber()).toEqual(42);
+ expect(session.lastBookmark()).toBe(bookmarkBefore); // expect bookmark to not change
+
+ done();
+ });
+ });
+
+ it('should not commit rolled back write transaction', done => {
+ if (!serverIs31OrLater(done)) {
+ return;
+ }
+
+ const bookmarkBefore = session.lastBookmark();
+ const resultPromise = session.readTransaction(tx => {
+ return new Promise((resolve, reject) => {
+ tx.run('CREATE (n:Node {id: 42}) RETURN n.id AS answer').then(result => {
+ tx.rollback().then(() => {
+ resolve(result);
+ }).catch(error => reject(error));
+ }).catch(error => reject(error));
+ });
+ });
+
+ resultPromise.then(result => {
+ expect(result.records.length).toEqual(1);
+ expect(result.records[0].get('answer').toNumber()).toEqual(42);
+ expect(result.summary.counters.nodesCreated()).toEqual(1);
+ expect(session.lastBookmark()).toBe(bookmarkBefore); // expect bookmark to not change
+
+ countNodes('Node', 'id', 42).then(count => {
+ expect(count).toEqual(0);
+ done();
+ });
+ });
+ });
+
+ function serverIs31OrLater(done) {
+ // lazy way of checking the version number
+ // if server has been set we know it is at least 3.1
+ if (!serverMetadata) {
+ done();
+ return false;
+ }
+ return true;
+ }
+
+ function countNodes(label, propertyKey, propertyValue) {
+ return new Promise((resolve, reject) => {
+ session.run(`MATCH (n: ${label} {${propertyKey}: ${propertyValue}}) RETURN count(n) AS count`).then(result => {
+ resolve(result.records[0].get('count').toNumber());
+ }).catch(error => reject(error));
+ });
+ }
+
function withQueryInTmpSession(driver, callback) {
const tmpSession = driver.session();
return tmpSession.run('RETURN 1').then(() => {
@@ -636,4 +835,9 @@ describe('session', () => {
const idleConnections = connectionPool._pools[address];
return idleConnections.length;
}
+
+ function verifyBookmark(bookmark) {
+ expect(bookmark).toBeDefined();
+ expect(bookmark).not.toBeNull();
+ }
});
diff --git a/test/v1/transaction.test.js b/test/v1/transaction.test.js
index e742d9753..80091ca98 100644
--- a/test/v1/transaction.test.js
+++ b/test/v1/transaction.test.js
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import neo4j from "../../lib/v1";
+import neo4j from '../../lib/v1';
describe('transaction', () => {
@@ -449,6 +449,42 @@ describe('transaction', () => {
}).catch(console.log);
});
+ it('should be open when neither committed nor rolled back', () => {
+ const tx = session.beginTransaction();
+ expect(tx.isOpen()).toBeTruthy();
+ });
+
+ it('should not be open after commit', done => {
+ const tx = session.beginTransaction();
+
+ tx.run('CREATE ()').then(() => {
+ tx.commit().then(() => {
+ expect(tx.isOpen()).toBeFalsy();
+ done();
+ });
+ });
+ });
+
+ it('should not be open after rollback', done => {
+ const tx = session.beginTransaction();
+
+ tx.run('CREATE ()').then(() => {
+ tx.rollback().then(() => {
+ expect(tx.isOpen()).toBeFalsy();
+ done();
+ });
+ });
+ });
+
+ it('should not be open after run error', done => {
+ const tx = session.beginTransaction();
+
+ tx.run('RETURN').catch(() => {
+ expect(tx.isOpen()).toBeFalsy();
+ done();
+ });
+ });
+
function expectSyntaxError(error) {
expect(error.code).toBe('Neo.ClientError.Statement.SyntaxError');
}