Skip to content

routing: Fix default database name in the routing connection provider #653

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 115 additions & 32 deletions src/internal/connection-provider-routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
})

this._seedRouter = address
this._routingTables = {}
this._rediscovery = new Rediscovery(routingContext, address.toString())
this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(
this._connectionPool
Expand All @@ -72,9 +71,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._dnsResolver = new HostNameResolver()
this._log = log
this._useSeedRouter = true
this._routingTablePurgeDelay = routingTablePurgeDelay
? int(routingTablePurgeDelay)
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
this._routingTableRegistry = new RoutingTableRegistry(
routingTablePurgeDelay
? int(routingTablePurgeDelay)
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
)
}

_createConnectionErrorHandler () {
Expand All @@ -87,15 +88,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._log.warn(
`Routing driver ${this._id} will forget ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
)
this.forget(address, database || '')
this.forget(address, database || DEFAULT_DB_NAME)
return error
}

_handleWriteFailure (error, address, database) {
this._log.warn(
`Routing driver ${this._id} will forget writer ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
)
this.forgetWriter(address, database || '')
this.forgetWriter(address, database || DEFAULT_DB_NAME)
return newError(
'No longer possible to write to server at ' + address,
SESSION_EXPIRED
Expand Down Expand Up @@ -206,36 +207,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
}

forget (address, database) {
if (database || database === '') {
this._routingTables[database].forget(address)
} else {
Object.values(this._routingTables).forEach(routingTable =>
routingTable.forget(address)
)
}
this._routingTableRegistry.apply(database, {
applyWhenExists: routingTable => routingTable.forget(address)
})

// We're firing and forgetting this operation explicitly and listening for any
// errors to avoid unhandled promise rejection
this._connectionPool.purge(address).catch(() => {})
}

forgetWriter (address, database) {
if (database || database === '') {
this._routingTables[database].forgetWriter(address)
} else {
Object.values(this._routingTables).forEach(routingTable =>
routingTable.forgetWriter(address)
)
}
this._routingTableRegistry.apply(database, {
applyWhenExists: routingTable => routingTable.forgetWriter(address)
})
}

_acquireConnectionToServer (address, serverName, routingTable) {
return this._connectionPool.acquire(address)
}

_freshRoutingTable ({ accessMode, database, bookmark } = {}) {
const currentRoutingTable =
this._routingTables[database] || new RoutingTable({ database })
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)

if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
Expand Down Expand Up @@ -481,16 +476,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
async _updateRoutingTable (newRoutingTable) {
// close old connections to servers not present in the new routing table
await this._connectionPool.keepAll(newRoutingTable.allServers())

// filter out expired to purge (expired for a pre-configured amount of time) routing table entries
Object.values(this._routingTables).forEach(value => {
if (value.isExpiredFor(this._routingTablePurgeDelay)) {
delete this._routingTables[value.database]
}
})

// make this driver instance aware of the new table
this._routingTables[newRoutingTable.database] = newRoutingTable
this._routingTableRegistry.removeExpired()
this._routingTableRegistry.register(
newRoutingTable.database,
newRoutingTable
)
this._log.info(`Updated routing table ${newRoutingTable}`)
}

Expand All @@ -501,3 +491,96 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
}
}
}

/**
* Responsible for keeping track of the existing routing tables
*/
class RoutingTableRegistry {
/**
* Constructor
* @param {int} routingTablePurgeDelay The routing table purge delay
*/
constructor (routingTablePurgeDelay) {
this._tables = new Map()
this._routingTablePurgeDelay = routingTablePurgeDelay
}

/**
* Put a routing table in the registry
*
* @param {string} database The database name
* @param {RoutingTable} table The routing table
* @returns {RoutingTableRegistry} this
*/
register (database, table) {
this._tables.set(database, table)
return this
}

/**
* Apply function in the routing table for an specific database. If the database name is not defined, the function will
* be applied for each element
*
* @param {string} database The database name
* @param {object} callbacks The actions
* @param {function (RoutingTable)} callbacks.applyWhenExists Call when the db exists or when the database property is not informed
* @param {function ()} callbacks.applyWhenDontExists Call when the database doesn't have the routing table registred
* @returns {RoutingTableRegistry} this
*/
apply (database, { applyWhenExists, applyWhenDontExists = () => {} } = {}) {
if (this._tables.has(database)) {
applyWhenExists(this._tables.get(database))
} else if (typeof database === 'string' || database === null) {
applyWhenDontExists()
} else {
this._forEach(applyWhenExists)
}
return this
}

/**
* Retrieves a routing table from a given database name
* @param {string} database The database name
* @param {function()|RoutingTable} defaultSupplier The routing table supplier, if it's not a function or not exists, it will return itself as default value
* @returns {RoutingTable} The routing table for the respective database
*/
get (database, defaultSupplier) {
if (this._tables.has(database)) {
return this._tables.get(database)
}
return typeof defaultSupplier === 'function'
? defaultSupplier()
: defaultSupplier
}

/**
* Remove the routing table which is already expired
* @returns {RoutingTableRegistry} this
*/
removeExpired () {
return this._removeIf(value =>
value.isExpiredFor(this._routingTablePurgeDelay)
)
}

_forEach (apply) {
for (const [, value] of this._tables) {
apply(value)
}
return this
}

_remove (key) {
this._tables.delete(key)
return this
}

_removeIf (predicate) {
for (const [key, value] of this._tables) {
if (predicate(value)) {
this._remove(key)
}
}
return this
}
}
106 changes: 100 additions & 6 deletions test/internal/connection-provider-routing.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,99 @@ describe('#unit RoutingConnectionProvider', () => {
)
})

it('should forget write server from the default database routing table on availability error', async () => {
const pool = newPool()
const connectionProvider = newRoutingConnectionProvider(
[
newRoutingTable(
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3]
),
newRoutingTable(
null,
[serverA, serverB, serverC],
[serverA, serverB],
[serverA, serverC]
)
],
pool
)

const conn1 = await connectionProvider.acquireConnection({
accessMode: WRITE,
database: null
})

// when
conn1._errorHandler.handleAndTransformError(
newError('connection error', SERVICE_UNAVAILABLE),
conn1.address
)

expectRoutingTable(
connectionProvider,
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3]
)
expectRoutingTable(
connectionProvider,
null,
[serverA, serverB, serverC],
[serverB],
[serverC]
)
})

it('should forget write server from the default database routing table on availability error when db not informed', async () => {
const pool = newPool()
const connectionProvider = newRoutingConnectionProvider(
[
newRoutingTable(
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3]
),
newRoutingTable(
null,
[serverA, serverB, serverC],
[serverA, serverB],
[serverA, serverC]
)
],
pool
)

const conn1 = await connectionProvider.acquireConnection({
accessMode: WRITE
})

// when
conn1._errorHandler.handleAndTransformError(
newError('connection error', SERVICE_UNAVAILABLE),
conn1.address
)

expectRoutingTable(
connectionProvider,
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3]
)
expectRoutingTable(
connectionProvider,
null,
[serverA, serverB, serverC],
[serverB],
[serverC]
)
})

it('should forget write server from correct routing table on write error', async () => {
const pool = newPool()
const connectionProvider = newRoutingConnectionProvider(
Expand Down Expand Up @@ -1847,7 +1940,7 @@ function newRoutingConnectionProviderWithSeedRouter (
})
connectionProvider._connectionPool = pool
routingTables.forEach(r => {
connectionProvider._routingTables[r.database] = r
connectionProvider._routingTableRegistry.register(r.database, r)
})
connectionProvider._rediscovery = new FakeRediscovery(routerToRoutingTable)
connectionProvider._hostNameResolver = new FakeDnsResolver(seedRouterResolved)
Expand Down Expand Up @@ -1903,14 +1996,15 @@ function expectRoutingTable (
readers,
writers
) {
expect(connectionProvider._routingTables[database].database).toEqual(database)
expect(connectionProvider._routingTables[database].routers).toEqual(routers)
expect(connectionProvider._routingTables[database].readers).toEqual(readers)
expect(connectionProvider._routingTables[database].writers).toEqual(writers)
const routingTable = connectionProvider._routingTableRegistry.get(database)
expect(routingTable.database).toEqual(database)
expect(routingTable.routers).toEqual(routers)
expect(routingTable.readers).toEqual(readers)
expect(routingTable.writers).toEqual(writers)
}

function expectNoRoutingTable (connectionProvider, database) {
expect(connectionProvider._routingTables[database]).toBeFalsy()
expect(connectionProvider._routingTableRegistry.get(database)).toBeFalsy()
}

function expectPoolToContain (pool, addresses) {
Expand Down