From d999e8c8a4ee79c51841b94b3a634affad49da42 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 4 Feb 2022 13:45:01 +0100 Subject: [PATCH 1/2] Improve back-pressure control in `Result[Symbol.asyncIterator]` The previous implemenation only controls the flow while is iterating over the records, but if the interation is done in a really low pace, the control flow method won't be called in time for pausing the flow. Changing the flow control for observing the queue size instead of check on each time it iterates solve this issue. --- packages/core/src/result.ts | 41 +++++++++++++++++++++---------- packages/core/test/result.test.ts | 25 ++++++++++++++++++- 2 files changed, 52 insertions(+), 14 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index bc011e701..de905c3ce 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -241,19 +241,30 @@ class Result implements Promise { summary?: ResultSummary, } = { paused: true, firstRun: true, finished: false } + const controlFlow = () => { + if (!state.streaming) { + return; + } + const queueSizeIsOverHighOrEqualWatermark = state.queuedObserver!.size >= this._watermarks.high + const queueSizeIsBellowOrEqualLowWatermark = state.queuedObserver!.size <= this._watermarks.low + + if (queueSizeIsOverHighOrEqualWatermark && !state.paused) { + state.paused = true + state.streaming.pause() + } else if (queueSizeIsBellowOrEqualLowWatermark && state.paused || state.firstRun && !queueSizeIsOverHighOrEqualWatermark ) { + if (state.streaming) { + state.firstRun = false + state.paused = false + state.streaming.resume() + } + } + } - const controlFlow = async () => { + const initializeObserver = async () => { if (state.queuedObserver === undefined) { - state.queuedObserver = this._createQueuedResultObserver() + state.queuedObserver = this._createQueuedResultObserver(controlFlow) state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined) - } - if (state.queuedObserver.size >= this._watermarks.high && !state.paused) { - state.paused = true - state.streaming?.pause() - } else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) { - state.firstRun = false - state.paused = false - state.streaming?.resume() + controlFlow() } } @@ -262,7 +273,7 @@ class Result implements Promise { if (state.finished) { return { done: true, value: state.summary! } } - await controlFlow() + await initializeObserver() const next = await state.queuedObserver!.dequeue() if (next.done) { state.finished = next.done @@ -280,7 +291,7 @@ class Result implements Promise { if (state.finished) { return { done: true, value: state.summary! } } - await controlFlow() + await initializeObserver() return await state.queuedObserver!.head() } } @@ -461,7 +472,7 @@ class Result implements Promise { /** * @access private */ - private _createQueuedResultObserver (): QueuedResultObserver { + private _createQueuedResultObserver (onQueueSizeChanged: () => void): QueuedResultObserver { interface ResolvablePromise { promise: Promise resolve: (arg: T) => any | undefined @@ -509,11 +520,13 @@ class Result implements Promise { } } else { buffer.push(element) + onQueueSizeChanged() } }, dequeue: async () => { if (buffer.length > 0) { const element = buffer.shift()! + onQueueSizeChanged() if (isError(element)) { throw element } @@ -538,6 +551,8 @@ class Result implements Promise { } catch (error) { buffer.unshift(error) throw error + } finally { + onQueueSizeChanged() } }, get size (): number { diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index db60b50d0..674734394 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -598,9 +598,10 @@ describe('Result', () => { it('should pause the stream if queue is bigger than high watermark', async () => { const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onKeys(['a']) - for (let i = 0; i <= watermarks.high; i++) { + for (let i = 0; i < watermarks.high + 3; i++) { streamObserverMock.onNext([i]) } @@ -608,6 +609,28 @@ describe('Result', () => { await it.next() expect(pause).toBeCalledTimes(1) + expect(resume).toBeCalledTimes(0) + }) + + it('should pause the stream if queue is bigger than high watermark and not iteraction with the stream', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + streamObserverMock.onNext([-1]) + + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(pause).toBeCalledTimes(1) + expect(resume).toBeCalledTimes(1) + + for (let i = 0; i <= watermarks.high + 1; i++) { + streamObserverMock.onNext([i]) + } + + expect(pause).toBeCalledTimes(2) + expect(resume).toBeCalledTimes(1) }) it('should call resume if queue is smaller than low watermark', async () => { From 04a8b52f895fc5594c270d0c8c00f67c3d7a70b1 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 7 Feb 2022 12:56:06 +0100 Subject: [PATCH 2/2] Remove unnecessary if --- packages/core/src/result.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index de905c3ce..302f7d56c 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -252,11 +252,9 @@ class Result implements Promise { state.paused = true state.streaming.pause() } else if (queueSizeIsBellowOrEqualLowWatermark && state.paused || state.firstRun && !queueSizeIsOverHighOrEqualWatermark ) { - if (state.streaming) { - state.firstRun = false - state.paused = false - state.streaming.resume() - } + state.firstRun = false + state.paused = false + state.streaming.resume() } }