Skip to content

Add DiscardOnCancel operator #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 27, 2020
Merged

Add DiscardOnCancel operator #228

merged 2 commits into from
Jan 27, 2020

Conversation

mp911de
Copy link
Collaborator

@mp911de mp911de commented Jan 21, 2020

FluxDiscardOnCancel replays source signals unless cancelling
the subscription. On cancellation, the subscriber requests Long.MAX_VALUE
to drain the source and discard elements that are emitted afterwards.

[closes #222]

/cc @Squiry

@mp911de mp911de added this to the 0.8.1.RELEASE milestone Jan 21, 2020
@Squiry
Copy link
Collaborator

Squiry commented Jan 21, 2020

I believe my changes here kind of almost fixed this issue since I've totally ignored subscription status and pushed all the frames anyway.
Your tests are not failing on master, but there's an obvious leak that can be fixed without all those operators, we just need to change that:

receiver.sink.next(message);

to that

if (receiver.sink.isCancelled()) {
    ReferenceCountUtil.release(message);
} else {
    receiver.sink.next(message);
}

FluxDiscardOnCancel replays source signals unless cancelling
the subscription. On cancellation, the subscriber requests Long.MAX_VALUE
to drain the source and discard elements that are emitted afterwards.

[closes #222]
@mp911de
Copy link
Collaborator Author

mp911de commented Jan 22, 2020

It makes sense to fix the loophole. There is another issue here that we should address: FluxDiscardOnCancel works on a proper backpressure implementation. The blocked statement issue was caused because the demand (as seen by Netty Inbound) went to zero.

Now with handle((message, sink) -> { (introduced with #225), we now no longer bind the backpressure from the R2DBC SPI to the netty channel which can overflow the actual sink.

@gregturn gregturn merged commit 98b4031 into 0.8.x Jan 27, 2020
@mp911de mp911de deleted the discard-on-cancel branch April 30, 2020 14:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants