Skip to content

Commit 48e0649

Browse files
committed
Lazy connection initialization and eager release
This commit effectively decouples session from connection. Session now holds two `ConnectionHolder`s that lazily pick connection from the pool when it is needed and try to release connection back to the pool when it does not have any users. Session keeps one connection holder for read connection and one for write connection. Connection is acquired for `Session#run()` and released when the returned result is either fully consumed or error happens. Also connection is acquired for `Session#beginTransaction()` and released when transaction commits or rolls back.
1 parent b01d369 commit 48e0649

14 files changed

+751
-148
lines changed

src/v1/internal/connection-holder.js

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {newError} from '../error';
21+
22+
/**
23+
* Utility to lazily initialize connections and return them back to the pool when unused.
24+
*/
25+
export default class ConnectionHolder {
26+
27+
/**
28+
* @constructor
29+
* @param {string} mode - the access mode for new connection holder.
30+
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
31+
*/
32+
constructor(mode, connectionProvider) {
33+
this._mode = mode;
34+
this._connectionProvider = connectionProvider;
35+
this._referenceCount = 0;
36+
this._connectionPromise = Promise.resolve(null);
37+
}
38+
39+
/**
40+
* Make this holder initialize new connection if none exists already.
41+
* @return {undefined}
42+
*/
43+
initializeConnection() {
44+
if (this._referenceCount === 0) {
45+
this._connectionPromise = this._connectionProvider.acquireConnection(this._mode);
46+
}
47+
this._referenceCount++;
48+
}
49+
50+
/**
51+
* Get the current connection promise.
52+
* @return {Promise<Connection>} promise resolved with the current connection.
53+
*/
54+
getConnection() {
55+
return this._connectionPromise;
56+
}
57+
58+
/**
59+
* Notify this holder that single party does not require current connection any more.
60+
* @return {Promise<Connection>} promise resolved with the current connection.
61+
*/
62+
releaseConnection() {
63+
if (this._referenceCount === 0) {
64+
return this._connectionPromise;
65+
}
66+
67+
this._referenceCount--;
68+
if (this._referenceCount === 0) {
69+
// release a connection without muting ACK_FAILURE, this is the last action on this connection
70+
return this._releaseConnection(true);
71+
}
72+
return this._connectionPromise;
73+
}
74+
75+
/**
76+
* Closes this holder and releases current connection (if any) despite any existing users.
77+
* @return {Promise<Connection>} promise resolved when current connection is released to the pool.
78+
*/
79+
close() {
80+
if (this._referenceCount === 0) {
81+
return this._connectionPromise;
82+
}
83+
this._referenceCount = 0;
84+
// release a connection and mute ACK_FAILURE, this might be called concurrently with other
85+
// operations and thus should ignore failure handling
86+
return this._releaseConnection(false);
87+
}
88+
89+
/**
90+
* Return the current pooled connection instance to the connection pool.
91+
* We don't pool Session instances, to avoid users using the Session after they've called close.
92+
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
93+
* @return {Promise} - promise resolved then connection is returned to the pool.
94+
* @private
95+
*/
96+
_releaseConnection(sync) {
97+
this._connectionPromise = this._connectionPromise.then(connection => {
98+
if (connection) {
99+
if(sync) {
100+
connection.reset();
101+
} else {
102+
connection.resetAsync();
103+
}
104+
connection.sync();
105+
connection._release();
106+
}
107+
}).catch(ignoredError => {
108+
});
109+
110+
return this._connectionPromise;
111+
}
112+
}
113+
114+
class EmptyConnectionHolder extends ConnectionHolder {
115+
116+
initializeConnection() {
117+
// nothing to initialize
118+
}
119+
120+
getConnection() {
121+
return Promise.reject(newError('This connection holder does not serve connections'));
122+
}
123+
124+
releaseConnection() {
125+
return Promise.resolve();
126+
}
127+
128+
close() {
129+
return Promise.resolve();
130+
}
131+
}
132+
133+
/**
134+
* Connection holder that does not manage any connections.
135+
* @type {ConnectionHolder}
136+
*/
137+
export const EMPTY_CONNECTION_HOLDER = new EmptyConnectionHolder();

src/v1/internal/connection-providers.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ class ConnectionProvider {
3333
_withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) {
3434
// install error handler from the driver on the connection promise; this callback is installed separately
3535
// so that it does not handle errors, instead it is just an additional error reporting facility.
36-
connectionPromise.catch(error => driverOnErrorCallback(error));
36+
connectionPromise.catch(error => {
37+
driverOnErrorCallback(error)
38+
});
3739
// return the original connection promise
3840
return connectionPromise;
3941
}
@@ -49,7 +51,8 @@ export class DirectConnectionProvider extends ConnectionProvider {
4951
}
5052

5153
acquireConnection(mode) {
52-
const connectionPromise = Promise.resolve(this._connectionPool.acquire(this._address));
54+
const connection = this._connectionPool.acquire(this._address);
55+
const connectionPromise = Promise.resolve(connection);
5356
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
5457
}
5558
}

src/v1/internal/connector.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ function log(actor, msg) {
9696
}
9797
}
9898

99-
10099
function NO_OP(){}
101100

102101
let NO_OP_OBSERVER = {
@@ -384,9 +383,9 @@ class Connection {
384383
this._chunker.messageBoundary();
385384
}
386385

387-
/** Queue a RESET-message to be sent to the database */
388-
reset( observer ) {
389-
log("C", "RESET");
386+
/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
387+
resetAsync( observer ) {
388+
log("C", "RESET_ASYNC");
390389
this._isHandlingFailure = true;
391390
let self = this;
392391
let wrappedObs = {
@@ -404,6 +403,14 @@ class Connection {
404403
this._chunker.messageBoundary();
405404
}
406405

406+
/** Queue a RESET-message to be sent to the database */
407+
reset(observer) {
408+
log('C', 'RESET');
409+
this._queueObserver(observer);
410+
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
411+
this._chunker.messageBoundary();
412+
}
413+
407414
/** Queue a ACK_FAILURE-message to be sent to the database */
408415
_ackFailure( observer ) {
409416
log("C", "ACK_FAILURE");

src/v1/internal/get-servers-util.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
* limitations under the License.
1818
*/
1919

20-
import RoundRobinArray from "./round-robin-array";
21-
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error";
22-
import Integer, {int} from "../integer";
20+
import RoundRobinArray from './round-robin-array';
21+
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error';
22+
import Integer, {int} from '../integer';
2323

2424
const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers';
2525
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';

src/v1/result.js

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
import ResultSummary from './result-summary';
21+
import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder';
2122

2223
/**
2324
* A stream of {@link Record} representing the result of a statement.
@@ -32,13 +33,15 @@ class Result {
3233
* @param {mixed} statement - Cypher statement to execute
3334
* @param {Object} parameters - Map with parameters to use in statement
3435
* @param metaSupplier function, when called provides metadata
36+
* @param {ConnectionHolder} connectionHolder - to be notified when result is either fully consumed or error happened.
3537
*/
36-
constructor(streamObserver, statement, parameters, metaSupplier) {
38+
constructor(streamObserver, statement, parameters, metaSupplier, connectionHolder) {
3739
this._streamObserver = streamObserver;
3840
this._p = null;
3941
this._statement = statement;
4042
this._parameters = parameters || {};
4143
this._metaSupplier = metaSupplier || function(){return {};};
44+
this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER;
4245
}
4346

4447
/**
@@ -99,23 +102,39 @@ class Result {
99102
* @return
100103
*/
101104
subscribe(observer) {
102-
let onCompletedOriginal = observer.onCompleted;
103-
let self = this;
104-
let onCompletedWrapper = (metadata) => {
105+
const onCompletedOriginal = observer.onCompleted;
106+
const self = this;
107+
const onCompletedWrapper = (metadata) => {
105108

106-
let additionalMeta = self._metaSupplier();
107-
for(var key in additionalMeta) {
109+
const additionalMeta = self._metaSupplier();
110+
for(let key in additionalMeta) {
108111
if (additionalMeta.hasOwnProperty(key)) {
109112
metadata[key] = additionalMeta[key];
110113
}
111114
}
112-
let sum = new ResultSummary(this._statement, this._parameters, metadata);
113-
onCompletedOriginal.call(observer, sum);
115+
const sum = new ResultSummary(this._statement, this._parameters, metadata);
116+
117+
// notify connection holder that the used connection is not needed any more because result has
118+
// been fully consumed; call the original onCompleted callback after that
119+
self._connectionHolder.releaseConnection().then(() => {
120+
onCompletedOriginal.call(observer, sum);
121+
});
114122
};
115123
observer.onCompleted = onCompletedWrapper;
116-
observer.onError = observer.onError || ((err) => {
117-
console.log("Uncaught error when processing result: " + err);
124+
125+
const onErrorOriginal = observer.onError || (error => {
126+
console.log("Uncaught error when processing result: " + error);
118127
});
128+
129+
const onErrorWrapper = error => {
130+
// notify connection holder that the used connection is not needed any more because error happened
131+
// and result can't bee consumed any further; call the original onError callback after that
132+
self._connectionHolder.releaseConnection().then(() => {
133+
onErrorOriginal.call(observer, error);
134+
});
135+
};
136+
observer.onError = onErrorWrapper;
137+
119138
this._streamObserver.subscribe(observer);
120139
}
121140
}

0 commit comments

Comments
 (0)