diff --git a/src/session.js b/src/session.js index 36bee60a2..82051b0ae 100644 --- a/src/session.js +++ b/src/session.js @@ -103,8 +103,9 @@ class Session { ? new TxConfig(transactionConfig) : TxConfig.empty() - return this._run(validatedQuery, params, connection => - connection.protocol().run(validatedQuery, params, { + return this._run(validatedQuery, params, connection => { + this._assertSessionIsOpen() + return connection.protocol().run(validatedQuery, params, { bookmark: this._lastBookmark, txConfig: autoCommitTxConfig, mode: this._mode, @@ -113,7 +114,7 @@ class Session { reactive: this._reactive, fetchSize: this._fetchSize }) - ) + }) } _run (query, parameters, customRunner) { @@ -188,6 +189,7 @@ class Session { connectionHolder, onClose: this._transactionClosed.bind(this), onBookmark: this._updateBookmark.bind(this), + onConnection: this._assertSessionIsOpen.bind(this), reactive: this._reactive, fetchSize: this._fetchSize }) @@ -195,6 +197,12 @@ class Session { return tx } + _assertSessionIsOpen () { + if (!this._open) { + throw newError('You cannot run more transactions on a closed session.') + } + } + _transactionClosed () { this._hasTx = false } diff --git a/src/transaction.js b/src/transaction.js index 18d2b618c..ccb02a8a6 100644 --- a/src/transaction.js +++ b/src/transaction.js @@ -42,15 +42,25 @@ class Transaction { * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced. + * * @param {function()} onConnection - Function to be called when a connection is obtained to ensure the conneciton + * is not yet released. * @param {boolean} reactive whether this transaction generates reactive streams * @param {number} fetchSize - the record fetch size in each pulling batch. */ - constructor ({ connectionHolder, onClose, onBookmark, reactive, fetchSize }) { + constructor ({ + connectionHolder, + onClose, + onBookmark, + onConnection, + reactive, + fetchSize + }) { this._connectionHolder = connectionHolder this._reactive = reactive this._state = _states.ACTIVE this._onClose = onClose this._onBookmark = onBookmark + this._onConnection = onConnection this._onError = this._onErrorCallback.bind(this) this._onComplete = this._onCompleteCallback.bind(this) this._fetchSize = fetchSize @@ -60,8 +70,9 @@ class Transaction { _begin (bookmark, txConfig) { this._connectionHolder .getConnection() - .then(conn => - conn.protocol().beginTransaction({ + .then(conn => { + this._onConnection() + return conn.protocol().beginTransaction({ bookmark: bookmark, txConfig: txConfig, mode: this._connectionHolder.mode(), @@ -69,7 +80,7 @@ class Transaction { beforeError: this._onError, afterComplete: this._onComplete }) - ) + }) .catch(error => this._onError(error)) } @@ -91,6 +102,7 @@ class Transaction { connectionHolder: this._connectionHolder, onError: this._onError, onComplete: this._onComplete, + onConnection: this._onConnection, reactive: this._reactive, fetchSize: this._fetchSize }) @@ -110,6 +122,7 @@ class Transaction { connectionHolder: this._connectionHolder, onError: this._onError, onComplete: this._onComplete, + onConnection: this._onConnection, pendingResults: this._results }) this._state = committed.state @@ -136,6 +149,7 @@ class Transaction { connectionHolder: this._connectionHolder, onError: this._onError, onComplete: this._onComplete, + onConnection: this._onConnection, pendingResults: this._results }) this._state = rolledback.state @@ -176,25 +190,39 @@ class Transaction { const _states = { // The transaction is running with no explicit success or failure marked ACTIVE: { - commit: ({ connectionHolder, onError, onComplete, pendingResults }) => { + commit: ({ + connectionHolder, + onError, + onComplete, + onConnection, + pendingResults + }) => { return { result: finishTransaction( true, connectionHolder, onError, onComplete, + onConnection, pendingResults ), state: _states.SUCCEEDED } }, - rollback: ({ connectionHolder, onError, onComplete, pendingResults }) => { + rollback: ({ + connectionHolder, + onError, + onComplete, + onConnection, + pendingResults + }) => { return { result: finishTransaction( false, connectionHolder, onError, onComplete, + onConnection, pendingResults ), state: _states.ROLLED_BACK @@ -203,14 +231,22 @@ const _states = { run: ( query, parameters, - { connectionHolder, onError, onComplete, reactive, fetchSize } + { + connectionHolder, + onError, + onComplete, + onConnection, + reactive, + fetchSize + } ) => { // RUN in explicit transaction can't contain bookmarks and transaction configuration // No need to include mode and database name as it shall be inclued in begin const observerPromise = connectionHolder .getConnection() - .then(conn => - conn.protocol().run(query, parameters, { + .then(conn => { + onConnection() + return conn.protocol().run(query, parameters, { bookmark: Bookmark.empty(), txConfig: TxConfig.empty(), beforeError: onError, @@ -218,7 +254,7 @@ const _states = { reactive: reactive, fetchSize: fetchSize }) - ) + }) .catch(error => new FailedObserver({ error, onError })) return newCompletedResult(observerPromise, query, parameters) @@ -249,11 +285,7 @@ const _states = { state: _states.FAILED } }, - run: ( - query, - parameters, - { connectionHolder, onError, onComplete, reactive } - ) => { + run: (query, parameters, { connectionHolder, onError, onComplete }) => { return newCompletedResult( new FailedObserver({ error: newError( @@ -299,11 +331,7 @@ const _states = { state: _states.SUCCEEDED } }, - run: ( - query, - parameters, - { connectionHolder, onError, onComplete, reactive } - ) => { + run: (query, parameters, { connectionHolder, onError, onComplete }) => { return newCompletedResult( new FailedObserver({ error: newError( @@ -348,11 +376,7 @@ const _states = { state: _states.ROLLED_BACK } }, - run: ( - query, - parameters, - { connectionHolder, onError, onComplete, reactive } - ) => { + run: (query, parameters, { connectionHolder, onError, onComplete }) => { return newCompletedResult( new FailedObserver({ error: newError( @@ -373,6 +397,7 @@ const _states = { * @param {ConnectionHolder} connectionHolder * @param {function(err:Error): any} onError * @param {function(metadata:object): any} onComplete + * @param {function() : any} onConnection * @param {list>}pendingResults all run results in this transaction */ function finishTransaction ( @@ -380,11 +405,13 @@ function finishTransaction ( connectionHolder, onError, onComplete, + onConnection, pendingResults ) { const observerPromise = connectionHolder .getConnection() .then(connection => { + onConnection() pendingResults.forEach(r => r._cancel()) return Promise.all(pendingResults).then(results => { if (commit) { diff --git a/test/transaction.test.js b/test/transaction.test.js index 83406a824..e3296b9f3 100644 --- a/test/transaction.test.js +++ b/test/transaction.test.js @@ -20,6 +20,7 @@ import neo4j from '../src' import sharedNeo4j from './internal/shared-neo4j' import { ServerVersion } from '../src/internal/server-version' import TxConfig from '../src/internal/tx-config' +import { READ } from '../src/driver' describe('#integration transaction', () => { let driver @@ -589,6 +590,23 @@ describe('#integration transaction', () => { expect(result).toBeUndefined() }) + it('should reset transaction', async done => { + const session = driver.session({ defaultAccessMode: READ }) + const tx = session.beginTransaction() + await tx.run('RETURN 1') + + const closePromise = session.close() + try { + await tx.run('Match (n:Person) RETURN n') + } catch (error) { + expect(error.message).toBe( + 'You cannot run more transactions on a closed session.' + ) + await closePromise + done() + } + }) + function expectSyntaxError (error) { expect(error.code).toBe('Neo.ClientError.Statement.SyntaxError') }