Skip to content

Fix connection getting timeout while idle #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions packages/bolt-connection/src/channel/browser/browser-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export default class WebSocketChannel {
this._receiveTimeout = null
this._receiveTimeoutStarted = false
this._receiveTimeoutId = null
this._closingPromise = null

const { scheme, error } = determineWebSocketScheme(config, protocolSupplier)
if (error) {
Expand Down Expand Up @@ -163,17 +164,23 @@ export default class WebSocketChannel {
* @returns {Promise} A promise that will be resolved after channel is closed
*/
close () {
return new Promise((resolve, reject) => {
this._clearConnectionTimeout()
if (this._ws && this._ws.readyState !== WS_CLOSED) {
this._open = false
this.stopReceiveTimeout()
this._ws.onclose = () => resolve()
this._ws.close()
} else {
resolve()
}
})
if (this._closingPromise === null) {
this._closingPromise = new Promise((resolve, reject) => {
this._clearConnectionTimeout()
if (this._ws && this._ws.readyState !== WS_CLOSED) {
this._open = false
this.stopReceiveTimeout()
this._ws.onclose = () => {
resolve()
}
this._ws.close()
} else {
resolve()
}
})
}

return this._closingPromise
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}

static _installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
conn._setIdle(observer)
}

static _removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
conn._unsetIdle()
}

_handleSecurityError (error, address, connection) {
Expand Down
28 changes: 26 additions & 2 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
) {
super(errorHandler)
this._authToken = null
this._idle = false
this._reseting = false
this._resetObservers = []
this._id = idGenerator++
Expand Down Expand Up @@ -393,7 +394,27 @@ export default class ChannelConnection extends Connection {
}

/**
* This method still here because it's used by the {@link PooledConnectionProvider}
* This method is used by the {@link PooledConnectionProvider}
*
* @param {any} observer
*/
_setIdle (observer) {
this._idle = true
this._ch.stopReceiveTimeout()
return this._protocol.queueObserverIfProtocolIsNotBroken(observer)
}

/**
* This method is used by the {@link PooledConnectionProvider}
*
*/
_unsetIdle () {
this._idle = false
this._updateCurrentObserver()
}

/**
* This method still here because of the connection-channel.tests.js
*
* @param {any} observer
*/
Expand All @@ -402,7 +423,7 @@ export default class ChannelConnection extends Connection {
}

hasOngoingObservableRequests () {
return this._protocol.hasOngoingObservableRequests()
return !this._idle && this._protocol.hasOngoingObservableRequests()
}

/**
Expand Down Expand Up @@ -500,6 +521,9 @@ export default class ChannelConnection extends Connection {
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange (requestsNumber) {
if (this._idle) {
return
}
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,41 @@ describe('WebSocketChannel', () => {
fakeSetTimeout.uninstall()
}
})

it('should return always the same promise', async () => {
const fakeSetTimeout = setTimeoutMock.install()
try {
// do not execute setTimeout callbacks
fakeSetTimeout.pause()
const address = ServerAddress.fromUrl('bolt://localhost:8989')
const driverConfig = { connectionTimeout: 4242 }
const channelConfig = new ChannelConfig(
address,
driverConfig,
SERVICE_UNAVAILABLE
)
webSocketChannel = new WebSocketChannel(
channelConfig,
undefined,
createWebSocketFactory(WS_OPEN)
)

const promise1 = webSocketChannel.close()
const promise2 = webSocketChannel.close()

expect(promise1).toBe(promise2)

await Promise.all([promise1, promise2])

const promise3 = webSocketChannel.close()

expect(promise3).toBe(promise2)

await promise3
} finally {
fakeSetTimeout.uninstall()
}
})
})

describe('.setupReceiveTimeout()', () => {
Expand Down
143 changes: 139 additions & 4 deletions packages/bolt-connection/test/connection/connection-channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ describe('ChannelConnection', () => {
})

describe('.__handleOngoingRequestsNumberChange()', () => {
it('should call channel.stopReceiveTimeout when requets number equals to 0', () => {
it('should call channel.stopReceiveTimeout when requests number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -571,7 +571,7 @@ describe('ChannelConnection', () => {
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should not call channel.startReceiveTimeout when requets number equals to 0', () => {
it('should not call channel.startReceiveTimeout when requests number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -585,7 +585,7 @@ describe('ChannelConnection', () => {

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => {
])('should call channel.startReceiveTimeout when requests number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -599,7 +599,7 @@ describe('ChannelConnection', () => {

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => {
])('should not call channel.stopReceiveTimeout when requests number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -610,6 +610,68 @@ describe('ChannelConnection', () => {

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[0], [1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout or startReceiveTimeout when requests number equals to %d and connection is idle', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requests number equals to %d and connection is not idle anymore', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
connection._unsetIdle()
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should call channel.stopReceiveTimeout when requests number equals to 0 and connection is not idle anymore', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
connection._unsetIdle()
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(0)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})
})

describe('.resetAndFlush()', () => {
Expand Down Expand Up @@ -1181,6 +1243,44 @@ describe('ChannelConnection', () => {
})

describe('.hasOngoingObservableRequests()', () => {
it('should return false if connection is idle', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true),
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
connection._setIdle({})

const result = connection.hasOngoingObservableRequests()

expect(result).toBe(false)
expect(protocol.hasOngoingObservableRequests).not.toBeCalledWith()
})

it('should redirect request to the protocol when connection is not idle anymore', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true),
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
connection._setIdle({})
connection._unsetIdle()

const result = connection.hasOngoingObservableRequests()

expect(result).toBe(true)
expect(protocol.hasOngoingObservableRequests).toBeCalledWith()
})

it('should call redirect request to the protocol', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true)
Expand All @@ -1195,6 +1295,41 @@ describe('ChannelConnection', () => {
})
})

describe('._setIdle()', () => {
it('should stop receive timeout and enqueue observer', () => {
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}
const observer = {
onComplete: () => {}
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })

connection._setIdle(observer)

expect(channel.stopReceiveTimeout).toBeCalledTimes(1)
expect(protocol.queueObserverIfProtocolIsNotBroken).toBeCalledWith(observer)
})
})

describe('._unsetIdle()', () => {
it('should update current observer', () => {
const protocol = {
updateCurrentObserver: jest.fn(() => {})
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol })

connection._unsetIdle()

expect(protocol.updateCurrentObserver).toBeCalledTimes(1)
})
})

function spyOnConnectionChannel ({
channel,
errorHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export default class WebSocketChannel {
this._receiveTimeout = null
this._receiveTimeoutStarted = false
this._receiveTimeoutId = null
this._closingPromise = null

const { scheme, error } = determineWebSocketScheme(config, protocolSupplier)
if (error) {
Expand Down Expand Up @@ -163,17 +164,23 @@ export default class WebSocketChannel {
* @returns {Promise} A promise that will be resolved after channel is closed
*/
close () {
return new Promise((resolve, reject) => {
this._clearConnectionTimeout()
if (this._ws && this._ws.readyState !== WS_CLOSED) {
this._open = false
this.stopReceiveTimeout()
this._ws.onclose = () => resolve()
this._ws.close()
} else {
resolve()
}
})
if (this._closingPromise === null) {
this._closingPromise = new Promise((resolve, reject) => {
this._clearConnectionTimeout()
if (this._ws && this._ws.readyState !== WS_CLOSED) {
this._open = false
this.stopReceiveTimeout()
this._ws.onclose = () => {
resolve()
}
this._ws.close()
} else {
resolve()
}
})
}

return this._closingPromise
}

/**
Expand Down
Loading