diff --git a/src/internal/connection-holder-readonly.js b/src/internal/connection-holder-readonly.js new file mode 100644 index 000000000..831c37477 --- /dev/null +++ b/src/internal/connection-holder-readonly.js @@ -0,0 +1,77 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import ConnectionHolder from './connection-holder' + +/** + * Provides a interaction with a ConnectionHolder without change it state by + * releasing or initilizing + */ +export default class ReadOnlyConnectionHolder extends ConnectionHolder { + /** + * Contructor + * @param {ConnectionHolder} connectionHolder the connection holder which will treat the requests + */ + constructor (connectionHolder) { + super({ + mode: connectionHolder._mode, + database: connectionHolder._database, + bookmark: connectionHolder._bookmark, + connectionProvider: connectionHolder._connectionProvider + }) + this._connectionHolder = connectionHolder + } + + /** + * Return the true if the connection is suppose to be initilized with the command. + * + * @return {boolean} + */ + initializeConnection () { + if (this._connectionHolder._referenceCount === 0) { + return false + } + return true + } + + /** + * Get the current connection promise. + * @return {Promise} promise resolved with the current connection. + */ + getConnection () { + return this._connectionHolder.getConnection() + } + + /** + * Get the current connection promise, doesn't performs the release + * @return {Promise} promise with the resolved current connection + */ + releaseConnection () { + return this._connectionHolder.getConnection().catch(() => Promise.resolve()) + } + + /** + * Get the current connection promise, doesn't performs the connection close + * @return {Promise} promise with the resolved current connection + */ + close () { + return this._connectionHolder + .getConnection() + .catch(() => () => Promise.resolve()) + } +} diff --git a/src/result.js b/src/result.js index 6dc77859f..565b3ba19 100644 --- a/src/result.js +++ b/src/result.js @@ -166,26 +166,34 @@ class Result { const query = this._query const parameters = this._parameters - function release (protocolVersion) { + function complete (protocolVersion) { + onCompletedOriginal.call( + observer, + new ResultSummary(query, parameters, metadata, protocolVersion) + ) + } + + function release () { // notify connection holder that the used connection is not needed any more because result has // been fully consumed; call the original onCompleted callback after that - connectionHolder.releaseConnection().then(() => { - onCompletedOriginal.call( - observer, - new ResultSummary(query, parameters, metadata, protocolVersion) - ) - }) + return connectionHolder.releaseConnection() } connectionHolder.getConnection().then( // onFulfilled: connection => { - release(connection ? connection.protocol().version : undefined) + release().then(() => + complete( + connection !== undefined + ? connection.protocol().version + : undefined + ) + ) }, // onRejected: _ => { - release() + complete() } ) } diff --git a/src/transaction.js b/src/transaction.js index ccb02a8a6..2e7e57a0e 100644 --- a/src/transaction.js +++ b/src/transaction.js @@ -21,6 +21,7 @@ import { validateQueryAndParameters } from './internal/util' import ConnectionHolder, { EMPTY_CONNECTION_HOLDER } from './internal/connection-holder' +import ReadOnlyConnectionHolder from './internal/connection-holder-readonly' import Bookmark from './internal/bookmark' import TxConfig from './internal/tx-config' @@ -257,7 +258,12 @@ const _states = { }) .catch(error => new FailedObserver({ error, onError })) - return newCompletedResult(observerPromise, query, parameters) + return newCompletedResult( + observerPromise, + query, + parameters, + connectionHolder + ) } }, @@ -274,14 +280,20 @@ const _states = { onError }), 'COMMIT', - {} + {}, + connectionHolder ), state: _states.FAILED } }, rollback: ({ connectionHolder, onError, onComplete }) => { return { - result: newCompletedResult(new CompletedObserver(), 'ROLLBACK', {}), + result: newCompletedResult( + new CompletedObserver(), + 'ROLLBACK', + {}, + connectionHolder + ), state: _states.FAILED } }, @@ -294,7 +306,8 @@ const _states = { onError }), query, - parameters + parameters, + connectionHolder ) } }, @@ -313,7 +326,8 @@ const _states = { 'COMMIT', {} ), - state: _states.SUCCEEDED + state: _states.SUCCEEDED, + connectionHolder } }, rollback: ({ connectionHolder, onError, onComplete }) => { @@ -328,7 +342,8 @@ const _states = { 'ROLLBACK', {} ), - state: _states.SUCCEEDED + state: _states.SUCCEEDED, + connectionHolder } }, run: (query, parameters, { connectionHolder, onError, onComplete }) => { @@ -340,7 +355,8 @@ const _states = { onError }), query, - parameters + parameters, + connectionHolder ) } }, @@ -357,7 +373,8 @@ const _states = { onError }), 'COMMIT', - {} + {}, + connectionHolder ), state: _states.ROLLED_BACK } @@ -371,7 +388,8 @@ const _states = { ) }), 'ROLLBACK', - {} + {}, + connectionHolder ), state: _states.ROLLED_BACK } @@ -385,7 +403,8 @@ const _states = { onError }), query, - parameters + parameters, + connectionHolder ) } } @@ -446,15 +465,21 @@ function finishTransaction ( * @param {ResultStreamObserver} observer - an observer for the created result. * @param {string} query - the cypher query that produced the result. * @param {Object} parameters - the parameters for cypher query that produced the result. + * @param {ConnectionHolder} connectionHolder - the connection holder used to get the result * @return {Result} new result. * @private */ -function newCompletedResult (observerPromise, query, parameters) { +function newCompletedResult ( + observerPromise, + query, + parameters, + connectionHolder = EMPTY_CONNECTION_HOLDER +) { return new Result( Promise.resolve(observerPromise), query, parameters, - EMPTY_CONNECTION_HOLDER + new ReadOnlyConnectionHolder(connectionHolder || EMPTY_CONNECTION_HOLDER) ) } diff --git a/test/internal/connection-holder-readonly.test.js b/test/internal/connection-holder-readonly.test.js new file mode 100644 index 000000000..442da07d2 --- /dev/null +++ b/test/internal/connection-holder-readonly.test.js @@ -0,0 +1,327 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionHolder, { + EMPTY_CONNECTION_HOLDER +} from '../../src/internal/connection-holder' +import SingleConnectionProvider from '../../src/internal/connection-provider-single' +import { READ, WRITE } from '../../src/driver' +import FakeConnection from './fake-connection' +import Connection from '../../src/internal/connection' +import ReadOnlyConnectionHolder from '../../src/internal/connection-holder-readonly' + +describe('#unit ReadOnlyConnectionHolder wrapping EmptyConnectionHolder', () => { + it('should return rejected promise instead of connection', done => { + EMPTY_CONNECTION_HOLDER.getConnection().catch(() => { + done() + }) + }) + + it('should return resolved promise on release', done => { + EMPTY_CONNECTION_HOLDER.releaseConnection().then(() => { + done() + }) + }) + + it('should return resolved promise on close', done => { + EMPTY_CONNECTION_HOLDER.close().then(() => { + done() + }) + }) +}) + +describe('#unit ReadOnlyConnectionHolder wrapping ConnectionHolder', () => { + it('should return connection promise', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.getConnection().then(conn => { + expect(conn).toBe(connection) + done() + }) + }) + + it('should return connection promise with version', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/9.9.9') + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.getConnection().then(conn => { + verifyConnection(conn, 'Neo4j/9.9.9') + done() + }) + }) + + it('should propagate connection acquisition failure', done => { + const errorMessage = 'Failed to acquire or initialize the connection' + const connectionPromise = Promise.reject(new Error(errorMessage)) + const connectionProvider = newSingleConnectionProvider(connectionPromise) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.getConnection().catch(error => { + expect(error.message).toEqual(errorMessage) + done() + }) + }) + + it('should release not connection with single user', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should not release connection with multiple users', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => { + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + } + ) + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should not release connection with multiple users when all users release', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => { + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + } + ) + + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + }) + }) + + it('should do nothing when closed and not initialized', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should not close even when users exist', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => { + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + } + ) + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should initialize new connection after releasing current one', done => { + const connection1 = new FakeConnection() + const connection2 = new FakeConnection() + const connectionProvider = new RecordingConnectionProvider([ + connection1, + connection2 + ]) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.releaseConnection().then(() => { + expect(connection1.isReleasedOnce()).toBeFalsy() + + connectionHolder.initializeConnection() + connectionHolder.releaseConnection().then(() => { + expect(connection2.isReleasedOnce()).toBeFalsy() + done() + }) + }) + }) + + it('should initialize new connection after being closed', done => { + const connection1 = new FakeConnection() + const connection2 = new FakeConnection() + const connectionProvider = new RecordingConnectionProvider([ + connection1, + connection2 + ]) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.close().then(() => { + expect(connection1.isNeverReleased()).toBeTruthy() + + connectionHolder.initializeConnection() + connectionHolder.close().then(() => { + expect(connection2.isNeverReleased()).toBeTruthy() + done() + }) + }) + }) + + it('should return passed mode', () => { + function verifyMode (connectionProvider, mode) { + expect(connectionProvider.mode()).toBe(mode) + } + + verifyMode(newConnectionHolder(), WRITE) + verifyMode(newConnectionHolder({ mode: WRITE }), WRITE) + verifyMode(newConnectionHolder({ mode: READ }), READ) + }) + + it('should default to empty database', () => { + function verifyDefault (connectionProvider) { + expect(connectionProvider.database()).toBe('') + } + + const connectionProvider = newSingleConnectionProvider(new FakeConnection()) + + verifyDefault(newConnectionHolder()) + verifyDefault(newConnectionHolder({ mode: READ, connectionProvider })) + verifyDefault(newConnectionHolder({ mode: WRITE, connectionProvider })) + verifyDefault( + newConnectionHolder({ mode: WRITE, database: '', connectionProvider }) + ) + verifyDefault( + newConnectionHolder({ mode: WRITE, database: null, connectionProvider }) + ) + verifyDefault( + newConnectionHolder({ + mode: WRITE, + database: undefined, + connectionProvider + }) + ) + }) + + it('should return passed database', () => { + const connectionProvider = newSingleConnectionProvider(new FakeConnection()) + const connectionHolder = newConnectionHolder({ + database: 'testdb', + connectionProvider + }) + + expect(connectionHolder.database()).toBe('testdb') + }) +}) + +function newConnectionHolder (params, connectionHolderInit = () => {}) { + const connectionHolder = new ConnectionHolder(params) + connectionHolderInit(connectionHolder) + return new ReadOnlyConnectionHolder(connectionHolder) +} + +class RecordingConnectionProvider extends SingleConnectionProvider { + constructor (connections) { + super(Promise.resolve()) + this.connectionPromises = connections.map(conn => Promise.resolve(conn)) + this.acquireConnectionInvoked = 0 + } + + acquireConnection (mode, database) { + return this.connectionPromises[this.acquireConnectionInvoked++] + } +} + +function newSingleConnectionProvider (connection) { + return new SingleConnectionProvider(Promise.resolve(connection)) +} + +/** + * @param {Connection} connection + * @param {*} expectedServerVersion + */ +function verifyConnection (connection, expectedServerVersion) { + expect(connection).toBeDefined() + expect(connection.server).toBeDefined() + expect(connection.server.version).toEqual(expectedServerVersion) +} diff --git a/test/internal/shared-neo4j.js b/test/internal/shared-neo4j.js index 0c7e56cc4..13dffc88c 100644 --- a/test/internal/shared-neo4j.js +++ b/test/internal/shared-neo4j.js @@ -310,7 +310,9 @@ function restart (configOverride) { async function cleanupAndGetProtocolVersion (driver) { const session = driver.session({ defaultAccessMode: neo4j.session.WRITE }) try { - const result = await session.run('MATCH (n) DETACH DELETE n') + const result = await session.writeTransaction(tx => + tx.run('MATCH (n) DETACH DELETE n') + ) return result.summary.server.protocolVersion } finally { await session.close() @@ -320,7 +322,9 @@ async function cleanupAndGetProtocolVersion (driver) { async function getEdition (driver) { const session = driver.session({ defaultAccessMode: neo4j.session.READ }) try { - const result = await session.run('CALL dbms.components() YIELD edition') + const result = await session.readTransaction(tx => + tx.run('CALL dbms.components() YIELD edition') + ) const singleRecord = result.records[0] return singleRecord.get(0) } finally { diff --git a/test/transaction.test.js b/test/transaction.test.js index ed0b397f1..954c976a2 100644 --- a/test/transaction.test.js +++ b/test/transaction.test.js @@ -87,6 +87,21 @@ describe('#integration transaction', () => { .catch(console.log) }, 60000) + it('should populate result.summary.server.protocolVersion for transaction#run', done => { + const tx = session.beginTransaction() + tx.run('CREATE (:TXNode1)') + .then(result => { + tx.commit() + .then(() => { + expect(result.summary.server.protocolVersion).toBeDefined() + expect(result.summary.server.protocolVersion).not.toBeLessThan(0) + done() + }) + .catch(done.fail.bind(done)) + }) + .catch(done.fail.bind(done)) + }, 60000) + it('should handle interactive session', done => { const tx = session.beginTransaction() tx.run("RETURN 'foo' AS res")