diff --git a/gulpfile.babel.js b/gulpfile.babel.js index 031dc5e6e..5850a60f3 100644 --- a/gulpfile.babel.js +++ b/gulpfile.babel.js @@ -164,23 +164,17 @@ gulp.task('test', function(cb){ gulp.task('test-nodejs', ['install-driver-into-sandbox'], function () { return gulp.src('test/**/*.test.js') - .pipe(jasmine({ - // reporter: new reporters.JUnitXmlReporter({ - // savePath: "build/nodejs-test-reports", - // consolidateAll: false - // }), - includeStackTrace: true - })); + .pipe(jasmine({ + includeStackTrace: true, + verbose: true + })); }); gulp.task('test-boltkit', ['nodejs'], function () { return gulp.src('test/**/*.boltkit.it.js') .pipe(jasmine({ - // reporter: new reporters.JUnitXmlReporter({ - // savePath: "build/nodejs-test-reports", - // consolidateAll: false - // }), - includeStackTrace: true + includeStackTrace: true, + verbose: true })); }); diff --git a/src/v1/driver.js b/src/v1/driver.js index 2cede31c8..c5a732d41 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -115,7 +115,7 @@ class Driver { */ session(mode, bookmark) { const sessionMode = Driver._validateSessionMode(mode); - return this._createSession(sessionMode, this._connectionProvider, bookmark); + return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config); } static _validateSessionMode(rawMode) { @@ -132,8 +132,8 @@ class Driver { } //Extension point - _createSession(mode, connectionProvider, bookmark) { - return new Session(mode, connectionProvider, bookmark); + _createSession(mode, connectionProvider, bookmark, config) { + return new Session(mode, connectionProvider, bookmark, config); } _driverOnErrorCallback(error) { diff --git a/src/v1/index.js b/src/v1/index.js index bf5031e79..a9425642e 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -100,6 +100,16 @@ let USER_AGENT = "neo4j-javascript/" + VERSION; * // port, and this is then used to verify the host certificate does not change. * // This setting has no effect unless TRUST_ON_FIRST_USE is enabled. * knownHosts:"~/.neo4j/known_hosts", + * + * // The max number of connections that are allowed idle in the pool at any time. + * // Connection will be destroyed if this threshold is exceeded. + * connectionPoolSize: 50, + * + * // Specify the maximum time in milliseconds transactions are allowed to retry via + * // {@link Session#readTransaction()} and {@link Session#writeTransaction()} functions. These functions + * // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with + * // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds. + * maxTransactionRetryTime: 30000, * } * * @param {string} url The URL for the Neo4j database, for instance "bolt://localhost" diff --git a/src/v1/internal/transaction-executor.js b/src/v1/internal/transaction-executor.js new file mode 100644 index 000000000..43a889707 --- /dev/null +++ b/src/v1/internal/transaction-executor.js @@ -0,0 +1,149 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error'; + +const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000; // 30 seconds +const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000; // 1 seconds +const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0; +const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2; + +export default class TransactionExecutor { + + constructor(maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor) { + this._maxRetryTimeMs = _valueOrDefault(maxRetryTimeMs, DEFAULT_MAX_RETRY_TIME_MS); + this._initialRetryDelayMs = _valueOrDefault(initialRetryDelayMs, DEFAULT_INITIAL_RETRY_DELAY_MS); + this._multiplier = _valueOrDefault(multiplier, DEFAULT_RETRY_DELAY_MULTIPLIER); + this._jitterFactor = _valueOrDefault(jitterFactor, DEFAULT_RETRY_DELAY_JITTER_FACTOR); + + this._inFlightTimeoutIds = []; + + this._verifyAfterConstruction(); + } + + execute(transactionCreator, transactionWork) { + return new Promise((resolve, reject) => { + this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject); + }).catch(error => { + const retryStartTimeMs = Date.now(); + const retryDelayMs = this._initialRetryDelayMs; + return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTimeMs, retryDelayMs); + }); + } + + close() { + // cancel all existing timeouts to prevent further retries + this._inFlightTimeoutIds.forEach(timeoutId => clearTimeout(timeoutId)); + this._inFlightTimeoutIds = []; + } + + _retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, retryDelayMs) { + const elapsedTimeMs = Date.now() - retryStartTime; + + if (elapsedTimeMs > this._maxRetryTimeMs || !TransactionExecutor._canRetryOn(error)) { + return Promise.reject(error); + } + + return new Promise((resolve, reject) => { + const nextRetryTime = this._computeDelayWithJitter(retryDelayMs); + const timeoutId = setTimeout(() => { + // filter out this timeoutId when time has come and function is being executed + this._inFlightTimeoutIds = this._inFlightTimeoutIds.filter(id => id !== timeoutId); + this._executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject); + }, nextRetryTime); + // add newly created timeoutId to the list of all in-flight timeouts + this._inFlightTimeoutIds.push(timeoutId); + }).catch(error => { + const nextRetryDelayMs = retryDelayMs * this._multiplier; + return this._retryTransactionPromise(transactionCreator, transactionWork, error, retryStartTime, nextRetryDelayMs); + }); + } + + _executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) { + try { + const tx = transactionCreator(); + const transactionWorkResult = transactionWork(tx); + + // user defined callback is supposed to return a promise, but it might not; so to protect against an + // incorrect API usage we wrap the returned value with a resolved promise; this is effectively a + // validation step without type checks + const resultPromise = Promise.resolve(transactionWorkResult); + + resultPromise.then(result => { + if (tx.isOpen()) { + // transaction work returned resolved promise and transaction has not been committed/rolled back + // try to commit the transaction + tx.commit().then(() => { + // transaction was committed, return result to the user + resolve(result); + }).catch(error => { + // transaction failed to commit, propagate the failure + reject(error); + }); + } else { + // transaction work returned resolved promise and transaction is already committed/rolled back + // return the result returned by given transaction work + resolve(result); + } + }).catch(error => { + // transaction work returned rejected promise, propagate the failure + reject(error); + }); + + } catch (error) { + reject(error); + } + } + + _computeDelayWithJitter(delayMs) { + const jitter = (delayMs * this._jitterFactor); + const min = delayMs - jitter; + const max = delayMs + jitter; + return Math.random() * (max - min) + min; + } + + static _canRetryOn(error) { + return error && error.code && + (error.code === SERVICE_UNAVAILABLE || + error.code === SESSION_EXPIRED || + error.code.indexOf('TransientError') >= 0); + } + + _verifyAfterConstruction() { + if (this._maxRetryTimeMs < 0) { + throw newError('Max retry time should be >= 0: ' + this._maxRetryTimeMs); + } + if (this._initialRetryDelayMs < 0) { + throw newError('Initial retry delay should >= 0: ' + this._initialRetryDelayMs); + } + if (this._multiplier < 1.0) { + throw newError('Multiplier should be >= 1.0: ' + this._multiplier); + } + if (this._jitterFactor < 0 || this._jitterFactor > 1) { + throw newError('Jitter factor should be in [0.0, 1.0]: ' + this._jitterFactor); + } + } +}; + +function _valueOrDefault(value, defaultValue) { + if (value || value === 0) { + return value; + } + return defaultValue; +} diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index e3be9a561..0a3a2e17a 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -35,8 +35,8 @@ class RoutingDriver extends Driver { return new LoadBalancer(address, connectionPool, driverOnErrorCallback); } - _createSession(mode, connectionProvider, bookmark) { - return new RoutingSession(mode, connectionProvider, bookmark, (error, conn) => { + _createSession(mode, connectionProvider, bookmark, config) { + return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => { if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) { // connection is undefined if error happened before connection was acquired if (conn) { @@ -66,8 +66,8 @@ class RoutingDriver extends Driver { } class RoutingSession extends Session { - constructor(mode, connectionProvider, bookmark, onFailedConnection) { - super(mode, connectionProvider, bookmark); + constructor(mode, connectionProvider, bookmark, config, onFailedConnection) { + super(mode, connectionProvider, bookmark, config); this._onFailedConnection = onFailedConnection; } diff --git a/src/v1/session.js b/src/v1/session.js index d7c22a620..aec5a3504 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -22,7 +22,8 @@ import Transaction from './transaction'; import {newError} from './error'; import {assertString} from './internal/util'; import ConnectionHolder from './internal/connection-holder'; -import {READ, WRITE} from './driver'; +import Driver, {READ, WRITE} from './driver'; +import TransactionExecutor from './internal/transaction-executor'; /** * A Session instance is used for handling the connection and @@ -36,15 +37,17 @@ class Session { * @constructor * @param {string} mode the default access mode for this session. * @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from. - * @param {string} bookmark - the initial bookmark for this session. + * @param {string} [bookmark=undefined] - the initial bookmark for this session. + * @param {Object} [config={}] - this driver configuration. */ - constructor(mode, connectionProvider, bookmark) { + constructor(mode, connectionProvider, bookmark, config) { this._mode = mode; this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider); this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider); this._open = true; this._hasTx = false; this._lastBookmark = bookmark; + this._transactionExecutor = _createTransactionExecutor(config); } /** @@ -92,21 +95,24 @@ class Session { * @returns {Transaction} - New Transaction */ beginTransaction(bookmark) { + return this._beginTransaction(this._mode, bookmark); + } + + _beginTransaction(accessMode, bookmark) { if (bookmark) { assertString(bookmark, 'Bookmark'); this._updateBookmark(bookmark); } if (this._hasTx) { - throw newError("You cannot begin a transaction on a session with an " - + "open transaction; either run from within the transaction or use a " - + "different session.") + throw newError('You cannot begin a transaction on a session with an open transaction; ' + + 'either run from within the transaction or use a different session.'); } - this._hasTx = true; - - const connectionHolder = this._connectionHolderWithMode(this._mode); + const mode = Driver._validateSessionMode(accessMode); + const connectionHolder = this._connectionHolderWithMode(mode); connectionHolder.initializeConnection(); + this._hasTx = true; return new Transaction(connectionHolder, () => { this._hasTx = false; @@ -114,10 +120,56 @@ class Session { this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this)); } + /** + * Return the bookmark received following the last completed {@link Transaction}. + * + * @return a reference to a previous transac'tion + */ lastBookmark() { return this._lastBookmark; } + /** + * Execute given unit of work in a {@link Driver#READ} transaction. + * + * Transaction will automatically be committed unless the given function throws or returns a rejected promise. + * Some failures of the given function or the commit itself will be retried with exponential backoff with initial + * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's + * maxTransactionRetryTime property in milliseconds. + * + * @param {function(Transaction)} transactionWork - callback that executes operations against + * a given {@link Transaction}. + * @return {Promise} resolved promise as returned by the given function or rejected promise when given + * function or commit fails. + */ + readTransaction(transactionWork) { + return this._runTransaction(READ, transactionWork); + } + + /** + * Execute given unit of work in a {@link Driver#WRITE} transaction. + * + * Transaction will automatically be committed unless the given function throws or returns a rejected promise. + * Some failures of the given function or the commit itself will be retried with exponential backoff with initial + * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's + * maxTransactionRetryTime property in milliseconds. + * + * @param {function(Transaction)} transactionWork - callback that executes operations against + * a given {@link Transaction}. + * @return {Promise} resolved promise as returned by the given function or rejected promise when given + * function or commit fails. + */ + writeTransaction(transactionWork) { + return this._runTransaction(WRITE, transactionWork); + } + + _runTransaction(accessMode, transactionWork) { + return this._transactionExecutor.execute( + () => this._beginTransaction(accessMode, this.lastBookmark()), + transactionWork + ); + } + _updateBookmark(newBookmark) { if (newBookmark) { this._lastBookmark = newBookmark; @@ -132,6 +184,7 @@ class Session { close(callback = (() => null)) { if (this._open) { this._open = false; + this._transactionExecutor.close(); this._readConnectionHolder.close().then(() => { this._writeConnectionHolder.close().then(() => { callback(); @@ -180,4 +233,9 @@ class _RunObserver extends StreamObserver { } } +function _createTransactionExecutor(config) { + const maxRetryTimeMs = (config && config.maxTransactionRetryTime) ? config.maxTransactionRetryTime : null; + return new TransactionExecutor(maxRetryTimeMs); +} + export default Session; diff --git a/src/v1/transaction.js b/src/v1/transaction.js index cb3cd6892..cfbb3d003 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -61,7 +61,7 @@ class Transaction { * or with the statement and parameters as separate arguments. * @param {mixed} statement - Cypher statement to execute * @param {Object} parameters - Map with parameters to use in statement - * @return {Result} - New Result + * @return {Result} New Result */ run(statement, parameters) { if(typeof statement === 'object' && statement.text) { @@ -78,7 +78,7 @@ class Transaction { * * After committing the transaction can no longer be used. * - * @returns {Result} - New Result + * @returns {Result} New Result */ commit() { let committed = this._state.commit(this._connectionHolder, new _TransactionStreamObserver(this)); @@ -93,7 +93,7 @@ class Transaction { * * After rolling back, the transaction can no longer be used. * - * @returns {Result} - New Result + * @returns {Result} New Result */ rollback() { let committed = this._state.rollback(this._connectionHolder, new _TransactionStreamObserver(this)); @@ -103,15 +103,30 @@ class Transaction { return committed.result; } + /** + * Check if this transaction is active, which means commit and rollback did not happen. + * @return {boolean} true when not committed and not rolled back, false otherwise. + */ + isOpen() { + return this._state == _states.ACTIVE; + } + _onError() { - // rollback explicitly if tx.run failed, rollback - if (this._state == _states.ACTIVE) { - this.rollback(); + if (this.isOpen()) { + // attempt to rollback, useful when Transaction#run() failed + return this.rollback().catch(ignoredError => { + // ignore all errors because it is best effort and transaction might already be rolled back + }).then(() => { + // after rollback attempt change this transaction's state to FAILED + this._state = _states.FAILED; + }); } else { - // else just do the cleanup - this._onClose(); + // error happened in in-active transaction, just to the cleanup and change state to FAILED + this._state = _states.FAILED; + this._onClose(); + // no async actions needed - return resolved promise + return Promise.resolve(); } - this._state = _states.FAILED; } } @@ -126,9 +141,10 @@ class _TransactionStreamObserver extends StreamObserver { onError(error) { if (!this._hasFailed) { - this._tx._onError(); - super.onError(error); - this._hasFailed = true; + this._tx._onError().then(() => { + super.onError(error); + this._hasFailed = true; + }); } } @@ -149,11 +165,11 @@ let _states = { //The transaction is running with no explicit success or failure marked ACTIVE: { commit: (connectionHolder, observer) => { - return {result: _runDiscardAll("COMMIT", connectionHolder, observer), + return {result: _runPullAll("COMMIT", connectionHolder, observer), state: _states.SUCCEEDED} }, rollback: (connectionHolder, observer) => { - return {result: _runDiscardAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK}; + return {result: _runPullAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK}; }, run: (connectionHolder, observer, statement, parameters) => { connectionHolder.getConnection().then(conn => { @@ -234,7 +250,7 @@ let _states = { } }; -function _runDiscardAll(msg, connectionHolder, observer) { +function _runPullAll(msg, connectionHolder, observer) { connectionHolder.getConnection().then( conn => { observer.resolveConnection(conn); diff --git a/test/internal/timers-util.js b/test/internal/timers-util.js new file mode 100644 index 000000000..a3452363f --- /dev/null +++ b/test/internal/timers-util.js @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +class SetTimeoutMock { + + constructor() { + this._clearState(); + } + + install() { + this._originalSetTimeout = global.setTimeout; + global.setTimeout = (code, delay) => { + if (!this._paused) { + code(); + this.invocationDelays.push(delay); + } + return this._timeoutIdCounter++; + }; + + this._originalClearTimeout = global.clearTimeout; + global.clearTimeout = id => { + this.clearedTimeouts.push(id); + }; + + return this; + } + + pause() { + this._paused = true; + } + + uninstall() { + global.setTimeout = this._originalSetTimeout; + global.clearTimeout = this._originalClearTimeout; + this._clearState(); + } + + setTimeoutOriginal(code, delay) { + return this._originalSetTimeout.call(null, code, delay); + } + + _clearState() { + this._originalSetTimeout = null; + this._originalClearTimeout = null; + this._paused = false; + this._timeoutIdCounter = 0; + + this.invocationDelays = []; + this.clearedTimeouts = []; + } +} + +export const setTimeoutMock = new SetTimeoutMock(); + +export function hijackNextDateNowCall(newValue) { + const originalDate = global.Date; + global.Date = new FakeDate(originalDate, newValue); +} + +class FakeDate { + + constructor(originalDate, nextNowValue) { + this._originalDate = originalDate; + this._nextNowValue = nextNowValue; + } + + now() { + global.Date = this._originalDate; + return this._nextNowValue; + } +} diff --git a/test/internal/transaction-executor.test.js b/test/internal/transaction-executor.test.js new file mode 100644 index 000000000..5ee75b738 --- /dev/null +++ b/test/internal/transaction-executor.test.js @@ -0,0 +1,349 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import TransactionExecutor from '../../src/v1/internal/transaction-executor'; +import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; +import {hijackNextDateNowCall, setTimeoutMock} from './timers-util'; + +const TRANSIENT_ERROR_1 = 'Neo.TransientError.Transaction.DeadlockDetected'; +const TRANSIENT_ERROR_2 = 'Neo.TransientError.Network.CommunicationError'; +const UNKNOWN_ERROR = 'Neo.DatabaseError.General.UnknownError'; +const OOM_ERROR = 'Neo.DatabaseError.General.OutOfMemoryError'; + +describe('TransactionExecutor', () => { + + let fakeSetTimeout; + + beforeEach(() => { + fakeSetTimeout = setTimeoutMock.install(); + }); + + afterEach(() => { + fakeSetTimeout.uninstall(); + }); + + it('should retry when transaction work returns promise rejected with SERVICE_UNAVAILABLE', done => { + testRetryWhenTransactionWorkReturnsRejectedPromise([SERVICE_UNAVAILABLE], done); + }); + + it('should retry when transaction work returns promise rejected with SESSION_EXPIRED', done => { + testRetryWhenTransactionWorkReturnsRejectedPromise([SESSION_EXPIRED], done); + }); + + it('should retry when transaction work returns promise rejected with deadlock error', done => { + testRetryWhenTransactionWorkReturnsRejectedPromise([TRANSIENT_ERROR_1], done); + }); + + it('should retry when transaction work returns promise rejected with communication error', done => { + testRetryWhenTransactionWorkReturnsRejectedPromise([TRANSIENT_ERROR_2], done); + }); + + it('should not retry when transaction work returns promise rejected with OOM error', done => { + testNoRetryOnUnknownError([OOM_ERROR], 1, done); + }); + + it('should not retry when transaction work returns promise rejected with unknown error', done => { + testNoRetryOnUnknownError([UNKNOWN_ERROR], 1, done); + }); + + it('should stop retrying when time expires', done => { + const executor = new TransactionExecutor(); + let workInvocationCounter = 0; + const realWork = transactionWork([SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_1, TRANSIENT_ERROR_2], 42); + + const result = executor.execute(transactionCreator(), tx => { + expect(tx).toBeDefined(); + workInvocationCounter++; + if (workInvocationCounter === 3) { + hijackNextDateNowCall(Date.now() + 30001); // move next `Date.now()` call forward by 30 seconds + } + return realWork(); + }); + + result.catch(error => { + expect(workInvocationCounter).toEqual(3); + expect(error.code).toEqual(TRANSIENT_ERROR_1); + done(); + }); + }); + + it('should retry when given transaction creator throws once', done => { + testRetryWhenTransactionCreatorFails( + [SERVICE_UNAVAILABLE], + done + ); + }); + + it('should retry when given transaction creator throws many times', done => { + testRetryWhenTransactionCreatorFails( + [SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_2, SESSION_EXPIRED, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_1], + done + ); + }); + + it('should retry when given transaction work throws once', done => { + testRetryWhenTransactionWorkThrows([SERVICE_UNAVAILABLE], done); + }); + + it('should retry when given transaction work throws many times', done => { + testRetryWhenTransactionWorkThrows( + [SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, TRANSIENT_ERROR_2, SESSION_EXPIRED], + done + ); + }); + + it('should retry when given transaction work returns rejected promise many times', done => { + testRetryWhenTransactionWorkReturnsRejectedPromise( + [SERVICE_UNAVAILABLE, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, TRANSIENT_ERROR_1, SESSION_EXPIRED], + done + ); + }); + + it('should retry when transaction commit returns rejected promise once', done => { + testRetryWhenTransactionCommitReturnsRejectedPromise([TRANSIENT_ERROR_1], done); + }); + + it('should retry when transaction commit returns rejected promise multiple times', done => { + testRetryWhenTransactionCommitReturnsRejectedPromise( + [TRANSIENT_ERROR_1, TRANSIENT_ERROR_1, SESSION_EXPIRED, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2], + done + ); + }); + + it('should retry until database error happens', done => { + testNoRetryOnUnknownError( + [SERVICE_UNAVAILABLE, SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, UNKNOWN_ERROR, SESSION_EXPIRED], + 5, + done + ); + }); + + it('should cancel in-flight timeouts when closed', done => { + const executor = new TransactionExecutor(); + // do not execute setTimeout callbacks + fakeSetTimeout.pause(); + + executor.execute(transactionCreator([SERVICE_UNAVAILABLE]), () => Promise.resolve(42)); + executor.execute(transactionCreator([TRANSIENT_ERROR_1]), () => Promise.resolve(4242)); + executor.execute(transactionCreator([SESSION_EXPIRED]), () => Promise.resolve(424242)); + + fakeSetTimeout.setTimeoutOriginal(() => { + executor.close(); + expect(fakeSetTimeout.clearedTimeouts.length).toEqual(3); + done(); + }, 1000); + }); + + it('should allow zero max retry time', () => { + const executor = new TransactionExecutor(0); + expect(executor._maxRetryTimeMs).toEqual(0); + }); + + it('should allow zero initial delay', () => { + const executor = new TransactionExecutor(42, 0); + expect(executor._initialRetryDelayMs).toEqual(0); + }); + + it('should disallow zero multiplier', () => { + expect(() => new TransactionExecutor(42, 42, 0)).toThrow(); + }); + + it('should allow zero jitter factor', () => { + const executor = new TransactionExecutor(42, 42, 42, 0); + expect(executor._jitterFactor).toEqual(0); + }); + + function testRetryWhenTransactionCreatorFails(errorCodes, done) { + const executor = new TransactionExecutor(); + const transactionCreator = throwingTransactionCreator(errorCodes, new FakeTransaction()); + let workInvocationCounter = 0; + + const result = executor.execute(transactionCreator, tx => { + expect(tx).toBeDefined(); + workInvocationCounter++; + return Promise.resolve(42); + }); + + result.then(value => { + expect(workInvocationCounter).toEqual(1); + expect(value).toEqual(42); + verifyRetryDelays(fakeSetTimeout, errorCodes.length); + done(); + }); + } + + function testRetryWhenTransactionWorkReturnsRejectedPromise(errorCodes, done) { + const executor = new TransactionExecutor(); + let workInvocationCounter = 0; + const realWork = transactionWork(errorCodes, 42); + + const result = executor.execute(transactionCreator(), tx => { + expect(tx).toBeDefined(); + workInvocationCounter++; + return realWork(); + }); + + result.then(value => { + // work should have failed 'failures.length' times and succeeded 1 time + expect(workInvocationCounter).toEqual(errorCodes.length + 1); + expect(value).toEqual(42); + verifyRetryDelays(fakeSetTimeout, errorCodes.length); + done(); + }); + } + + function testRetryWhenTransactionCommitReturnsRejectedPromise(errorCodes, done) { + const executor = new TransactionExecutor(); + let workInvocationCounter = 0; + const realWork = () => Promise.resolve(4242); + + const result = executor.execute(transactionCreator(errorCodes), tx => { + expect(tx).toBeDefined(); + workInvocationCounter++; + return realWork(); + }); + + result.then(value => { + // work should have failed 'failures.length' times and succeeded 1 time + expect(workInvocationCounter).toEqual(errorCodes.length + 1); + expect(value).toEqual(4242); + verifyRetryDelays(fakeSetTimeout, errorCodes.length); + done(); + }); + } + + function testRetryWhenTransactionWorkThrows(errorCodes, done) { + const executor = new TransactionExecutor(); + let workInvocationCounter = 0; + const realWork = throwingTransactionWork(errorCodes, 42); + + const result = executor.execute(transactionCreator(), tx => { + expect(tx).toBeDefined(); + workInvocationCounter++; + return realWork(); + }); + + result.then(value => { + // work should have failed 'failures.length' times and succeeded 1 time + expect(workInvocationCounter).toEqual(errorCodes.length + 1); + expect(value).toEqual(42); + verifyRetryDelays(fakeSetTimeout, errorCodes.length); + done(); + }); + } + + function testNoRetryOnUnknownError(errorCodes, expectedWorkInvocationCount, done) { + const executor = new TransactionExecutor(); + let workInvocationCounter = 0; + const realWork = transactionWork(errorCodes, 42); + + const result = executor.execute(transactionCreator(), tx => { + expect(tx).toBeDefined(); + workInvocationCounter++; + return realWork(); + }); + + result.catch(error => { + expect(workInvocationCounter).toEqual(expectedWorkInvocationCount); + if (errorCodes.length === 1) { + expect(error.code).toEqual(errorCodes[0]); + } else { + expect(error.code).toEqual(errorCodes[expectedWorkInvocationCount - 1]); + } + done(); + }); + } + +}); + +function transactionCreator(commitErrorCodes) { + const remainingErrorCodes = (commitErrorCodes || []).slice().reverse(); + return () => new FakeTransaction(remainingErrorCodes.pop()); +} + +function throwingTransactionCreator(errorCodes, result) { + const remainingErrorCodes = errorCodes.slice().reverse(); + return () => { + if (remainingErrorCodes.length === 0) { + return result; + } + const errorCode = remainingErrorCodes.pop(); + throw error(errorCode); + }; +} + +function throwingTransactionWork(errorCodes, result) { + const remainingErrorCodes = errorCodes.slice().reverse(); + return () => { + if (remainingErrorCodes.length === 0) { + return Promise.resolve(result); + } + const errorCode = remainingErrorCodes.pop(); + throw error(errorCode); + }; +} + +function transactionWork(errorCodes, result) { + const remainingErrorCodes = errorCodes.slice().reverse(); + return () => { + if (remainingErrorCodes.length === 0) { + return Promise.resolve(result); + } + const errorCode = remainingErrorCodes.pop(); + return Promise.reject(error(errorCode)); + }; +} + +function error(code) { + return newError('', code); +} + +function verifyRetryDelays(fakeSetTimeout, expectedInvocationCount) { + const delays = fakeSetTimeout.invocationDelays; + expect(delays.length).toEqual(expectedInvocationCount); + delays.forEach((delay, index) => { + // delays make a geometric progression with fist element 1000 and multiplier 2.0 + // so expected delay can be calculated as n-th element: `firstElement * pow(multiplier, n - 1)` + const expectedDelayWithoutJitter = 1000 * Math.pow(2.0, index); + const jitter = expectedDelayWithoutJitter * 0.2; + const min = expectedDelayWithoutJitter - jitter; + const max = expectedDelayWithoutJitter + jitter; + + expect(delay >= min).toBeTruthy(); + expect(delay <= max).toBeTruthy(); + }); +} + +class FakeTransaction { + + constructor(commitErrorCode) { + this._commitErrorCode = commitErrorCode; + } + + isOpen() { + return true; + } + + commit() { + if (this._commitErrorCode) { + return Promise.reject(error(this._commitErrorCode)); + } + return Promise.resolve(); + } +} diff --git a/test/resources/boltkit/dead_server.script b/test/resources/boltkit/dead_read_server.script similarity index 73% rename from test/resources/boltkit/dead_server.script rename to test/resources/boltkit/dead_read_server.script index 037a6be65..4af7ef3b6 100644 --- a/test/resources/boltkit/dead_server.script +++ b/test/resources/boltkit/dead_read_server.script @@ -1,7 +1,8 @@ !: AUTO INIT !: AUTO RESET !: AUTO PULL_ALL +!: AUTO RUN "BEGIN" {} C: RUN "MATCH (n) RETURN n.name" {} C: PULL_ALL -S: \ No newline at end of file +S: diff --git a/test/resources/boltkit/dead_write_server.script b/test/resources/boltkit/dead_write_server.script new file mode 100644 index 000000000..ac36bc249 --- /dev/null +++ b/test/resources/boltkit/dead_write_server.script @@ -0,0 +1,8 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL +!: AUTO RUN "BEGIN" {} + +C: RUN "CREATE (n {name:'Bob'})" {} +C: PULL_ALL +S: diff --git a/test/resources/boltkit/discover_servers.script b/test/resources/boltkit/discover_servers.script index 6f92d458f..c1538995a 100644 --- a/test/resources/boltkit/discover_servers.script +++ b/test/resources/boltkit/discover_servers.script @@ -5,10 +5,5 @@ C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} - RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9009"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] SUCCESS {} -C: RUN "MATCH (n) RETURN n.name" {} - PULL_ALL -S: SUCCESS {"fields": ["n.name"]} - SUCCESS {} - diff --git a/test/resources/boltkit/discover_servers_and_read.script b/test/resources/boltkit/discover_servers_and_read.script new file mode 100644 index 000000000..6f92d458f --- /dev/null +++ b/test/resources/boltkit/discover_servers_and_read.script @@ -0,0 +1,14 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + SUCCESS {} + diff --git a/test/resources/boltkit/read_server.script b/test/resources/boltkit/read_server.script index 0b4d44748..7fb3d6c81 100644 --- a/test/resources/boltkit/read_server.script +++ b/test/resources/boltkit/read_server.script @@ -1,6 +1,9 @@ !: AUTO INIT !: AUTO RESET !: AUTO PULL_ALL +!: AUTO RUN "COMMIT" {} +!: AUTO RUN "ROLLBACK" {} +!: AUTO RUN "BEGIN" {} C: RUN "MATCH (n) RETURN n.name" {} PULL_ALL @@ -8,4 +11,4 @@ S: SUCCESS {"fields": ["n.name"]} RECORD ["Bob"] RECORD ["Alice"] RECORD ["Tina"] - SUCCESS {} \ No newline at end of file + SUCCESS {} diff --git a/test/resources/boltkit/write_server.script b/test/resources/boltkit/write_server.script index 993910f6c..4667e3609 100644 --- a/test/resources/boltkit/write_server.script +++ b/test/resources/boltkit/write_server.script @@ -1,8 +1,11 @@ !: AUTO INIT !: AUTO RESET !: AUTO PULL_ALL +!: AUTO RUN "COMMIT" {} +!: AUTO RUN "ROLLBACK" {} +!: AUTO RUN "BEGIN" {} C: RUN "CREATE (n {name:'Bob'})" {} PULL_ALL S: SUCCESS {} - SUCCESS {} \ No newline at end of file + SUCCESS {} diff --git a/test/v1/examples.test.js b/test/v1/examples.test.js index c41db24e8..136722999 100644 --- a/test/v1/examples.test.js +++ b/test/v1/examples.test.js @@ -97,7 +97,7 @@ describe('examples', function() { }); }); - it('should be able to configure session pool size', function (done) { + it('should be able to configure connection pool size', function (done) { var neo4j = neo4jv1; // tag::configuration[] var driver = neo4j.driver("bolt://localhost:7687", neo4j.auth.basic("neo4j", "neo4j"), {connectionPoolSize: 50}); @@ -118,6 +118,17 @@ describe('examples', function() { }); }); + it('should be able to configure maximum transaction retry time', function () { + var neo4j = neo4jv1; + // tag::configuration-transaction-retry-time[] + var maxRetryTimeMs = 45 * 1000; // 45 seconds + var driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'neo4j'), {maxTransactionRetryTime: maxRetryTimeMs}); + //end::configuration-transaction-retry-time[] + + var session = driver.session(); + expect(session._transactionExecutor._maxRetryTimeMs).toBe(maxRetryTimeMs); + }); + it('should document a statement', function(done) { var session = driverGlobal.session(); // tag::statement[] diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 75f4721e9..90d1d16a5 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -21,13 +21,15 @@ import neo4j from '../../src/v1'; import {READ, WRITE} from '../../src/v1/driver'; import boltkit from './boltkit'; import RoutingTable from '../../src/v1/internal/routing-table'; +import {SESSION_EXPIRED} from '../../src/v1/error'; +import {hijackNextDateNowCall} from '../internal/timers-util'; describe('routing driver', () => { let originalTimeout; beforeAll(() => { originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; - jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000; + jasmine.DEFAULT_TIMEOUT_INTERVAL = 60000; }); afterAll(() => { @@ -41,7 +43,7 @@ describe('routing driver', () => { } // Given const kit = new boltkit.BoltKit(); - const server = kit.start('./test/resources/boltkit/discover_servers.script', 9001); + const server = kit.start('./test/resources/boltkit/discover_servers_and_read.script', 9001); kit.run(() => { const driver = newDriver("bolt+routing://127.0.0.1:9001"); @@ -323,7 +325,7 @@ describe('routing driver', () => { // Given const kit = new boltkit.BoltKit(); const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005); + const readServer = kit.start('./test/resources/boltkit/dead_read_server.script', 9005); kit.run(() => { const driver = newDriver("bolt+routing://127.0.0.1:9001"); @@ -415,7 +417,7 @@ describe('routing driver', () => { // Given const kit = new boltkit.BoltKit(); const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9007); + const readServer = kit.start('./test/resources/boltkit/dead_read_server.script', 9007); kit.run(() => { const driver = newDriver("bolt+routing://127.0.0.1:9001"); @@ -475,7 +477,7 @@ describe('routing driver', () => { // Given const kit = new boltkit.BoltKit(); const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005); + const readServer = kit.start('./test/resources/boltkit/dead_read_server.script', 9005); kit.run(() => { const driver = newDriver("bolt+routing://127.0.0.1:9001"); @@ -1181,6 +1183,287 @@ describe('routing driver', () => { }); }); + it('should retry read transaction until success', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const brokenReader = kit.start('./test/resources/boltkit/dead_read_server.script', 9005); + const reader = kit.start('./test/resources/boltkit/read_server.script', 9006); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + const session = driver.session(); + + let invocations = 0; + const resultPromise = session.readTransaction(tx => { + invocations++; + return tx.run('MATCH (n) RETURN n.name'); + }); + + resultPromise.then(result => { + expect(result.records.length).toEqual(3); + expect(invocations).toEqual(2); + + session.close(() => { + driver.close(); + router.exit(code1 => { + brokenReader.exit(code2 => { + reader.exit(code3 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + + it('should retry write transaction until success', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const brokenWriter = kit.start('./test/resources/boltkit/dead_write_server.script', 9007); + const writer = kit.start('./test/resources/boltkit/write_server.script', 9008); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + const session = driver.session(); + + let invocations = 0; + const resultPromise = session.writeTransaction(tx => { + invocations++; + return tx.run('CREATE (n {name:\'Bob\'})'); + }); + + resultPromise.then(result => { + expect(result.records.length).toEqual(0); + expect(invocations).toEqual(2); + + session.close(() => { + driver.close(); + router.exit(code1 => { + brokenWriter.exit(code2 => { + writer.exit(code3 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + + it('should retry read transaction until failure', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const brokenReader1 = kit.start('./test/resources/boltkit/dead_read_server.script', 9005); + const brokenReader2 = kit.start('./test/resources/boltkit/dead_read_server.script', 9006); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + const session = driver.session(); + + let invocations = 0; + const resultPromise = session.readTransaction(tx => { + invocations++; + if (invocations === 2) { + // make retries stop after two invocations + moveNextDateNow30SecondsForward(); + } + return tx.run('MATCH (n) RETURN n.name'); + }); + + resultPromise.catch(error => { + expect(error.code).toEqual(SESSION_EXPIRED); + expect(invocations).toEqual(2); + + session.close(() => { + driver.close(); + router.exit(code1 => { + brokenReader1.exit(code2 => { + brokenReader2.exit(code3 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + + it('should retry write transaction until failure', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const brokenWriter1 = kit.start('./test/resources/boltkit/dead_write_server.script', 9007); + const brokenWriter2 = kit.start('./test/resources/boltkit/dead_write_server.script', 9008); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + const session = driver.session(); + + let invocations = 0; + const resultPromise = session.writeTransaction(tx => { + invocations++; + if (invocations === 2) { + // make retries stop after two invocations + moveNextDateNow30SecondsForward(); + } + return tx.run('CREATE (n {name:\'Bob\'})'); + }); + + resultPromise.catch(error => { + expect(error.code).toEqual(SESSION_EXPIRED); + expect(invocations).toEqual(2); + + session.close(() => { + driver.close(); + router.exit(code1 => { + brokenWriter1.exit(code2 => { + brokenWriter2.exit(code3 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + + it('should retry read transaction and perform rediscovery until success', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router1 = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010); + const brokenReader1 = kit.start('./test/resources/boltkit/dead_read_server.script', 9005); + const brokenReader2 = kit.start('./test/resources/boltkit/dead_read_server.script', 9006); + const router2 = kit.start('./test/resources/boltkit/discover_servers.script', 9001); + const reader = kit.start('./test/resources/boltkit/read_server.script', 9002); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9010'); + const session = driver.session(); + + let invocations = 0; + const resultPromise = session.readTransaction(tx => { + invocations++; + return tx.run('MATCH (n) RETURN n.name'); + }); + + resultPromise.then(result => { + expect(result.records.length).toEqual(3); + expect(invocations).toEqual(3); + + session.close(() => { + driver.close(); + router1.exit(code1 => { + brokenReader1.exit(code2 => { + brokenReader2.exit(code3 => { + router2.exit(code4 => { + reader.exit(code5 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + expect(code4).toEqual(0); + expect(code5).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + }); + }); + + it('should retry write transaction and perform rediscovery until success', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router1 = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010); + const brokenWriter1 = kit.start('./test/resources/boltkit/dead_write_server.script', 9007); + const brokenWriter2 = kit.start('./test/resources/boltkit/dead_write_server.script', 9008); + const router2 = kit.start('./test/resources/boltkit/discover_servers.script', 9002); + const writer = kit.start('./test/resources/boltkit/write_server.script', 9009); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9010'); + const session = driver.session(); + + let invocations = 0; + const resultPromise = session.writeTransaction(tx => { + invocations++; + return tx.run('CREATE (n {name:\'Bob\'})'); + }); + + resultPromise.then(result => { + expect(result.records.length).toEqual(0); + expect(invocations).toEqual(3); + + session.close(() => { + driver.close(); + router1.exit(code1 => { + brokenWriter1.exit(code2 => { + brokenWriter2.exit(code3 => { + router2.exit(code4 => { + writer.exit(code5 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + expect(code4).toEqual(0); + expect(code5).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + }); + }); + + function moveNextDateNow30SecondsForward() { + const currentTime = Date.now(); + hijackNextDateNowCall(currentTime + 30 * 1000 + 1); + } + function testWriteSessionWithAccessModeAndBookmark(accessMode, bookmark, done) { if (!boltkit.BoltKitSupport) { done(); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 22402e36a..10084a6b5 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -28,13 +28,13 @@ describe('session', () => { let driver; let session; - let server; + let serverMetadata; let originalTimeout; beforeEach(done => { driver = neo4j.driver('bolt://localhost', neo4j.auth.basic('neo4j', 'neo4j')); driver.onCompleted = meta => { - server = meta['server']; + serverMetadata = meta['server']; }; session = driver.session(); originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; @@ -86,6 +86,23 @@ describe('session', () => { }); }); + it('should close transaction executor', done => { + const session = newSessionWithConnection(new FakeConnection()); + + let closeCalledTimes = 0; + const transactionExecutor = session._transactionExecutor; + const originalClose = transactionExecutor.close; + transactionExecutor.close = () => { + closeCalledTimes++; + originalClose.call(transactionExecutor); + }; + + session.close(() => { + expect(closeCalledTimes).toEqual(1); + done(); + }); + }); + it('should be possible to close driver after closing session with failed tx ', done => { const driver = neo4j.driver('bolt://localhost', neo4j.auth.basic('neo4j', 'neo4j')); const session = driver.session(); @@ -211,11 +228,7 @@ describe('session', () => { }); it('should expose server info on successful query', done => { - //lazy way of checking the version number - //if server has been set we know it is at least - //3.1 (todo actually parse the version string) - if (!server) { - done(); + if (!serverIs31OrLater(done)) { return; } @@ -234,14 +247,10 @@ describe('session', () => { }); it('should expose execution time information when using 3.1 and onwards', done => { - - //lazy way of checking the version number - //if server has been set we know it is at least - //3.1 (todo actually parse the version string) - if (!server) { - done(); + if (!serverIs31OrLater(done)) { return; } + // Given const statement = 'UNWIND range(1,10000) AS n RETURN n AS number'; // When & Then @@ -563,6 +572,10 @@ describe('session', () => { }); it('should update last bookmark after every read tx commit', done => { + if (!serverIs31OrLater(done)) { + return; + } + const bookmarkBefore = session.lastBookmark(); const tx = session.beginTransaction(); @@ -583,6 +596,10 @@ describe('session', () => { }); it('should update last bookmark after every write tx commit', done => { + if (!serverIs31OrLater(done)) { + return; + } + const bookmarkBefore = session.lastBookmark(); const tx = session.beginTransaction(); @@ -599,6 +616,10 @@ describe('session', () => { }); it('should not lose last bookmark after run', done => { + if (!serverIs31OrLater(done)) { + return; + } + const tx = session.beginTransaction(); tx.run('CREATE ()').then(() => { tx.commit().then(() => { @@ -615,6 +636,184 @@ describe('session', () => { }); }); + it('should commit read transaction', done => { + if (!serverIs31OrLater(done)) { + return; + } + + const bookmarkBefore = session.lastBookmark(); + const resultPromise = session.readTransaction(tx => tx.run('RETURN 42 AS answer')); + + resultPromise.then(result => { + expect(result.records.length).toEqual(1); + expect(result.records[0].get('answer').toNumber()).toEqual(42); + + const bookmarkAfter = session.lastBookmark(); + verifyBookmark(bookmarkAfter); + expect(bookmarkAfter).not.toEqual(bookmarkBefore); + + done(); + }); + }); + + it('should commit write transaction', done => { + if (!serverIs31OrLater(done)) { + return; + } + + const bookmarkBefore = session.lastBookmark(); + const resultPromise = session.writeTransaction(tx => tx.run('CREATE (n:Node {id: 42}) RETURN n.id AS answer')); + + resultPromise.then(result => { + expect(result.records.length).toEqual(1); + expect(result.records[0].get('answer').toNumber()).toEqual(42); + expect(result.summary.counters.nodesCreated()).toEqual(1); + + const bookmarkAfter = session.lastBookmark(); + verifyBookmark(bookmarkAfter); + expect(bookmarkAfter).not.toEqual(bookmarkBefore); + + countNodes('Node', 'id', 42).then(count => { + expect(count).toEqual(1); + done(); + }); + }); + }); + + it('should not commit already committed read transaction', done => { + if (!serverIs31OrLater(done)) { + return; + } + + const resultPromise = session.readTransaction(tx => { + return new Promise((resolve, reject) => { + tx.run('RETURN 42 AS answer').then(result => { + tx.commit().then(() => { + resolve({result: result, bookmark: session.lastBookmark()}); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); + }); + + resultPromise.then(outcome => { + const bookmark = outcome.bookmark; + const result = outcome.result; + + verifyBookmark(bookmark); + expect(session.lastBookmark()).toEqual(bookmark); // expect bookmark to not change + + expect(result.records.length).toEqual(1); + expect(result.records[0].get('answer').toNumber()).toEqual(42); + + done(); + }); + }); + + it('should not commit already committed write transaction', done => { + if (!serverIs31OrLater(done)) { + return; + } + + const resultPromise = session.readTransaction(tx => { + return new Promise((resolve, reject) => { + tx.run('CREATE (n:Node {id: 42}) RETURN n.id AS answer').then(result => { + tx.commit().then(() => { + resolve({result: result, bookmark: session.lastBookmark()}); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); + }); + + resultPromise.then(outcome => { + const bookmark = outcome.bookmark; + const result = outcome.result; + + verifyBookmark(bookmark); + expect(session.lastBookmark()).toEqual(bookmark); // expect bookmark to not change + + expect(result.records.length).toEqual(1); + expect(result.records[0].get('answer').toNumber()).toEqual(42); + expect(result.summary.counters.nodesCreated()).toEqual(1); + + countNodes('Node', 'id', 42).then(count => { + expect(count).toEqual(1); + done(); + }); + }); + }); + + it('should not commit rolled back read transaction', done => { + if (!serverIs31OrLater(done)) { + return; + } + + const bookmarkBefore = session.lastBookmark(); + const resultPromise = session.readTransaction(tx => { + return new Promise((resolve, reject) => { + tx.run('RETURN 42 AS answer').then(result => { + tx.rollback().then(() => { + resolve(result); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); + }); + + resultPromise.then(result => { + expect(result.records.length).toEqual(1); + expect(result.records[0].get('answer').toNumber()).toEqual(42); + expect(session.lastBookmark()).toBe(bookmarkBefore); // expect bookmark to not change + + done(); + }); + }); + + it('should not commit rolled back write transaction', done => { + if (!serverIs31OrLater(done)) { + return; + } + + const bookmarkBefore = session.lastBookmark(); + const resultPromise = session.readTransaction(tx => { + return new Promise((resolve, reject) => { + tx.run('CREATE (n:Node {id: 42}) RETURN n.id AS answer').then(result => { + tx.rollback().then(() => { + resolve(result); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); + }); + + resultPromise.then(result => { + expect(result.records.length).toEqual(1); + expect(result.records[0].get('answer').toNumber()).toEqual(42); + expect(result.summary.counters.nodesCreated()).toEqual(1); + expect(session.lastBookmark()).toBe(bookmarkBefore); // expect bookmark to not change + + countNodes('Node', 'id', 42).then(count => { + expect(count).toEqual(0); + done(); + }); + }); + }); + + function serverIs31OrLater(done) { + // lazy way of checking the version number + // if server has been set we know it is at least 3.1 + if (!serverMetadata) { + done(); + return false; + } + return true; + } + + function countNodes(label, propertyKey, propertyValue) { + return new Promise((resolve, reject) => { + session.run(`MATCH (n: ${label} {${propertyKey}: ${propertyValue}}) RETURN count(n) AS count`).then(result => { + resolve(result.records[0].get('count').toNumber()); + }).catch(error => reject(error)); + }); + } + function withQueryInTmpSession(driver, callback) { const tmpSession = driver.session(); return tmpSession.run('RETURN 1').then(() => { @@ -636,4 +835,9 @@ describe('session', () => { const idleConnections = connectionPool._pools[address]; return idleConnections.length; } + + function verifyBookmark(bookmark) { + expect(bookmark).toBeDefined(); + expect(bookmark).not.toBeNull(); + } }); diff --git a/test/v1/transaction.test.js b/test/v1/transaction.test.js index e742d9753..80091ca98 100644 --- a/test/v1/transaction.test.js +++ b/test/v1/transaction.test.js @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import neo4j from "../../lib/v1"; +import neo4j from '../../lib/v1'; describe('transaction', () => { @@ -449,6 +449,42 @@ describe('transaction', () => { }).catch(console.log); }); + it('should be open when neither committed nor rolled back', () => { + const tx = session.beginTransaction(); + expect(tx.isOpen()).toBeTruthy(); + }); + + it('should not be open after commit', done => { + const tx = session.beginTransaction(); + + tx.run('CREATE ()').then(() => { + tx.commit().then(() => { + expect(tx.isOpen()).toBeFalsy(); + done(); + }); + }); + }); + + it('should not be open after rollback', done => { + const tx = session.beginTransaction(); + + tx.run('CREATE ()').then(() => { + tx.rollback().then(() => { + expect(tx.isOpen()).toBeFalsy(); + done(); + }); + }); + }); + + it('should not be open after run error', done => { + const tx = session.beginTransaction(); + + tx.run('RETURN').catch(() => { + expect(tx.isOpen()).toBeFalsy(); + done(); + }); + }); + function expectSyntaxError(error) { expect(error.code).toBe('Neo.ClientError.Statement.SyntaxError'); }