-
Notifications
You must be signed in to change notification settings - Fork 13
Write Composition #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Not sure in use cases 1 and 3 what is subscribing to the concatWith, if you can elaborate on how you see this. Is concatWith an eager subscribe operation ? To me it seems we leak the write everywhere where the same stream could be unified and composed in a single connection.write call where the returned publisher would inform of completed writes and let its subscribers requesting manually for more data to write. connection.write(protocolRequest.concatWith(
connection.getInput()
.concatMap(response -> { //note that concatMap could happen after the concatWith too
// handshake response
if(response instanceof HANDSHAKE && response == ABC) {
// handshake ABC so output in "ABC" format for next write
return ABC-formatted-request()
} else if(response instanceof HANDSHAKE && response == DEF) {
// handshake DEF so output in "DEF" format for next write
return DEF-formatted-request()
} else {
// response data here
return doStuffAsynchronouslyWith(response);
}
})
).subscribe(postWritesSubscriber); //optional, would default to the IO writer to auto-request the user defined write publisher. |
For brevity I didn't include the outer handler code. These would all be returning a composed Publisher lazily to the server implementation which would subscribe and run the lifecycle. Thus, the Publisher emitted by concatWith is returned to the server which subscribes to invoke the lifecycle for that connection.
No, nothing here is eager. It all composes and waits to be subscribed to by the server. This is a handler definition.
In what way is it leaked? Just that the developer can call
That is elegant code and may work well for this example. The one concern I can think of right now is related to flushing as it would only work if it flushed on every With the independent calls to connection.write(protocolRequest)
connection.write(ABC-formatted-request) A single connection.write(protocolRequest.concatWith(ABC-formatted-request)) Thus, if they are all just emitted as This leads back to the discussion in #21. If there is a mechanism for a developer to easily and deterministically control flushing then my stance on |
Ah, talking with @NiteshKant reminded me of a key point I completely missed in my response, and far more important than flush semantics. The other key reason why individual If Using Equally important, conditional error handling can not be composed onto writes if just a single For example, |
Here are simple examples showing // this writes A and if it succeeds then writes B
connection.write(publisherA).concatWith(write(publisherB))
// which is the same as this
concat(connection.write(publisherA), connection.write(publisherB))
// this writes A and B concurrently
connection.write(publisherA).mergeWith(connection.write(publisherB))
// which is the same as this
merge(connection.write(publisherA), connection.write(publisherB))
// since merge is concurrent we could use a single write for it
connection.write(publisherA.mergeWith(publisherB));
// which is the same as this
connection.write(merge(publisherA, publisherB)); |
After a few discussions in #21 I realize that the usecase in sample 3 of this issue may not be clear. So, here is an attempt to clarify it. ProtocolThe sample is modeling a protocol that does the following:
(The two handshake flavors aren't required but provided just to demonstrate condition processing) In order to draw an analogy to an established protocol, with this hypothetical protocol. Let me take an example of HTTP/2. In HTTP/2 case,
Code explanationBased on the above protocol definition, I am explaining the code below: Step 1 in the protocol is done by: connection.write(protocolRequest) Step 2 says that if the request was successfully sent, then listen for handshake or requests. So, in order to make sure that input is subscribed only if the write was successful, we do: connection.write(protocolRequest)
.concatWith(connection.getInput() The sample now, intends to make the request/handshake processing sequential i.e. next message is processed only when the previous message was successfully processed and written back to the peer. For this reason, the sample uses connection.write(protocolRequest)
.concatWith(connection.getInput()
.concatMap(response -> { This behavior is surely just an approach we took in this sample, it is not prescribed by the protocol. Now, the processing of each message handshake/request is asynchronous and hence returns a connection.write(protocolRequest) // negotiate protocol
.concatWith(connection.getInput()
.concatMap(response -> {
// handshake response
if(response instanceof HANDSHAKE && response == ABC) {
// handshake ABC so output in "ABC" format for next write
return connection.write(ABC-formatted-request)
} else if(response instanceof HANDSHAKE && response == DEF) {
// handshake DEF so output in "DEF" format for next write
return connection.write(DEF-formatted-request)
} else {
// response data here
return doStuffAsynchronouslyWith(response);
}
})
})) AssumptionThere are a few assumptions here:
Need of compositionIn the above example there are two points where the write composition is used:
VarianceBy providing the ability to compose writes, we can achieve following variances:
connection.write(protocolRequest) // negotiate protocol
.mergeWith(connection.getInput() instead of connection.write(protocolRequest) // negotiate protocol
.concatWith(connection.getInput()
connection.getInput()
.flatMap(response -> { instead of connection.getInput()
.concatMap(response -> { |
Closing this as we all seem to agree on the need of write composition. |
It is required that writes be composable, particularly when everything is async since non-blocking writes can not be ordered or waited upon without an abstraction permitting composition.
Consider the following use cases:
The approach taken above involves
write
returning aPublisher
that can be composed. If that is not the desired implementation these same composition capabilities must be accommodated by whatever is done in its place.The text was updated successfully, but these errors were encountered: