
Reinstantiate backpressure propagation to NettyInbound #231


Closed
mp911de opened this issue Jan 22, 2020 · 5 comments
Labels
for: team-attention An issue we need to discuss as a team to make progress type: bug A general bug
Milestone
0.8.1.RELEASE

Comments

@mp911de
Collaborator

mp911de commented Jan 22, 2020

With a recent change, we no longer propagate backpressure from the R2DBC SPI (e.g. Result.map(…)) to NettyInbound. Previously, we used windowing and passed the Flux<Message> directly to the consumer. Now we use handle(…) and send the messages to the conversation receiver instead of to a window that is directly connected to NettyInbound.

The missing backpressure propagation can lead to overflows.
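As a rough illustration of the difference (BackendMessage, isBoundary(…), and the Sinks usage are placeholders, not the driver's actual internals): a window is just a view onto the inbound Flux, so the consumer's request(n) reaches NettyInbound, while re-emitting through a separate sink decouples that demand from the inbound channel.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class BackpressureSketch {

    interface BackendMessage {}

    void illustrate(Flux<BackendMessage> inbound) {

        // Window-based (previous approach): the inner Flux is a view onto the inbound
        // stream, so the consumer's request(n) propagates straight to NettyInbound.
        Flux<Flux<BackendMessage>> perConversation = inbound.windowUntil(this::isBoundary);

        // handle(...)-based (current approach): messages are re-emitted through a separate
        // sink; the consumer's demand no longer controls how fast we read from NettyInbound.
        Sinks.Many<BackendMessage> conversation = Sinks.many().unicast().onBackpressureBuffer();
        inbound.handle((message, ignored) -> conversation.tryEmitNext(message)).subscribe();
    }

    boolean isBoundary(BackendMessage message) {
        return false; // placeholder for "message completes the current conversation"
    }
}
```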

@mp911de mp911de added type: bug A general bug for: team-attention An issue we need to discuss as a team to make progress labels Jan 22, 2020
@mp911de mp911de added this to the 0.8.1.RELEASE milestone Jan 22, 2020
@Squiry
Collaborator

Squiry commented Jan 22, 2020

Well, it's not like the backend will stop producing messages if we propagate backpressure to NettyInbound, so why should we care?

@mp911de
Collaborator Author

mp911de commented Jan 22, 2020

Not reading messages from the channel leaves them in the receive buffers. Once these buffers are full, the receiver advertises a shrinking (eventually zero) TCP window and the sending side has to slow down. This effect is called TCP backpressure and is part of TCP flow control. See also Wikipedia. The subscriber is guaranteed not to be overloaded, which protects the client application.

In blocking I/O we get the same level of protection if we properly configure e.g. the JDBC ResultSet fetch size. As long as ResultSet.next() is not called, results are not read from the network.
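For comparison, a minimal JDBC sketch of that blocking-I/O analogy (URL, query, and fetch size are illustrative): with a cursor-based fetch size, the PostgreSQL JDBC driver only pulls the next batch of rows from the connection when ResultSet.next() crosses a batch boundary.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class FetchSizeSketch {

    void streamRows() throws SQLException {
        try (Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost/db")) {
            connection.setAutoCommit(false);      // pgjdbc only uses a cursor inside a transaction
            try (Statement statement = connection.createStatement()) {
                statement.setFetchSize(50);       // fetch at most 50 rows per round trip
                try (ResultSet rows = statement.executeQuery("SELECT * FROM big_table")) {
                    while (rows.next()) {
                        // rows beyond the current batch are fetched lazily; as long as next()
                        // does not cross a batch boundary, they are not read from the connection
                    }
                }
            }
        }
    }
}
```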

@Squiry
Collaborator

Squiry commented Jan 22, 2020

Well, Reactor Netty kind of doesn't care either: it will store them all in a queue, not in a TCP buffer.

@mp911de
Collaborator Author

mp911de commented Jan 22, 2020

Right, Reactor Netty can buffer a few messages. If FluxReceive sees that there is no demand, it disables auto-read on the channel, and that is how additional TCP packets are kept outside of the JVM process.
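A sketch of the underlying Netty mechanism (not FluxReceive's actual code): turning auto-read off stops the event loop from reading the socket, so unread TCP segments remain in the kernel receive buffer and TCP flow control throttles the sender.

```java
import io.netty.channel.Channel;

class AutoReadSketch {

    void pauseReading(Channel channel) {
        channel.config().setAutoRead(false); // stop reading; the receive buffer fills, the peer slows down
    }

    void resumeReading(Channel channel) {
        channel.config().setAutoRead(true);  // demand arrived again; resume reading from the socket
    }
}
```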

@mp911de mp911de added type: blocker An issue that is blocking us from releasing status: in-progress An issue that is currently being worked on labels Jan 24, 2020
@mp911de
Collaborator Author

mp911de commented Jan 29, 2020

@Squiry I took your commits and refactored BackendMessageSubscriber and Conversation to clean them up. As of now, all tests run green.

I'm still worried about the impact this change may have and all the side effects that can come with it.

mp911de pushed a commit that referenced this issue Jan 29, 2020
Constant size requests to reactor-netty connection

[#231]
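A hedged sketch of what constant-size requests can look like (class name and prefetch value are illustrative, not the commit's actual code): the subscriber keeps a fixed number of inbound messages requested from the reactor-netty connection, independent of downstream demand.

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

class ConstantRequestSubscriber<T> extends BaseSubscriber<T> {

    private static final int PREFETCH = 256;  // illustrative request size

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        subscription.request(PREFETCH);        // initial constant-size request
    }

    @Override
    protected void hookOnNext(T message) {
        // dispatch the message to the active conversation, then top the demand back up
        request(1);
    }
}
```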
mp911de added a commit that referenced this issue Jan 29, 2020
Encapsulate access to nested sink and demand fields with methods. Replace Atomic* fields with volatile ones that are updated through AtomicLongFieldUpdater. Use Operators.addCap(…) to accumulate demand and to prevent overflow.
Introduce a buffer for received Conversations that do not have demand. Introduce a drain loop to drain the buffer on request(n). Extract constants. Use Operators.on…Dropped(…) to propagate dropped signals.

[resolves #231]
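A hedged sketch of the demand-accounting pattern described above (field and class names are illustrative, not the actual BackendMessageSubscriber code): a volatile long updated through an AtomicLongFieldUpdater, accumulated with Operators.addCap(…) so that request(n) saturates at Long.MAX_VALUE instead of overflowing, plus a buffer that is drained whenever demand arrives. A real drain loop would also guard against concurrent invocation with a work-in-progress counter, which is omitted here.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import reactor.core.publisher.Operators;

class DemandSketch {

    volatile long demand;
    static final AtomicLongFieldUpdater<DemandSketch> DEMAND =
            AtomicLongFieldUpdater.newUpdater(DemandSketch.class, "demand");

    final Queue<Object> buffer = new ConcurrentLinkedQueue<>();

    void request(long n) {
        Operators.addCap(DEMAND, this, n);     // saturating add, capped at Long.MAX_VALUE
        drain();
    }

    void drain() {
        while (DEMAND.get(this) > 0) {
            Object next = buffer.poll();
            if (next == null) {
                return;                        // nothing buffered; keep the remaining demand
            }
            DEMAND.decrementAndGet(this);
            // emit(next);                     // hand the item to the downstream sink (omitted)
        }
    }
}
```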
@mp911de mp911de removed status: in-progress An issue that is currently being worked on type: blocker An issue that is blocking us from releasing labels Jan 29, 2020
mp911de added a commit that referenced this issue Jan 30, 2020
Eagerly remove Conversation on complete to avoid cancellation if the conversation is already completed (e.g. concatMap(close())).
Remove error logging in FluxDiscardOnCancel to avoid unnecessary noise.

[#231]