Supporting byte streams #47


Closed · danarmak opened this issue May 4, 2014 · 7 comments

danarmak commented May 4, 2014

I'm concerned that the API doesn't support a very common use case: streams of bytes. (Or, less commonly, chars, ints, bits, etc.) These types all have in common the fact that they are transported not one by one, but in arrays, buffers, strings, etc. And these buffer types don't have size limits as part of the type.

This creates a problem: if a Subscriber[ByteString] calls request(1), it will get one more ByteString - but of unknown size. This conflicts with the basic requirement of back pressure. Imagine a stream with an 'unzip' processor; a naive implementation would be vulnerable to zip bombs.

The programmer could manually configure all components in a reactive stream pipeline to emit arrays within min-max size limits. But this would mean hand-tuning for performance (since optimal buffer size varies with component), instead of relying on automatic back pressure communication.

Worse, if the programmer doesn't control both ends of a channel, he won't be able to rely on the behavior of the other side - which might be written using a different Reactive Streams implementation, a different language, or be across a network. This also limits the usability of a language-specific custom type like a size-limited ByteString.

I think this use case will be common, so I'm suggesting it should be addressed in the API. What do you think?

drewhk (Contributor) commented May 4, 2014

This is the same problem as with Lists, Maps, Sets, and basically every non-trivial data type. If you want to control the size of the emitted ByteStrings, you can always add a chunker element. While I agree that the use case of ByteString streams is common (I am writing a streaming decoder tool right now), I don't believe the problem outlined here is very relevant in practice. As for communication over the network, the emitted ByteString sizes will be limited by send/receive buffers anyway; it does not matter how large the originally sent chunk was.
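The chunker element suggested above could look roughly like this sketch. `Chunker`, its `maxChunk` parameter, and the eager splitting strategy are all illustrative assumptions, not part of any Reactive Streams API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical "chunker" stage: re-slices incoming byte arrays so that no
// emitted chunk exceeds maxChunk bytes. Downstream request(n) then bounds
// memory use to roughly n * maxChunk.
class Chunker {
    private final int maxChunk;

    Chunker(int maxChunk) {
        this.maxChunk = maxChunk;
    }

    // Split one upstream buffer into bounded-size pieces.
    List<byte[]> chunk(byte[] input) {
        List<byte[]> out = new ArrayList<>();
        for (int off = 0; off < input.length; off += maxChunk) {
            out.add(Arrays.copyOfRange(input, off, Math.min(off + maxChunk, input.length)));
        }
        return out;
    }
}
```

In a real pipeline this logic would sit inside a Processor between the producer and the consumer; the splitting itself is the only interesting part.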

danarmak (Author) commented May 5, 2014

I don't think it's quite the same problem. Lists, Maps and other data structures have semantic meaning: you often really do want a stream of Map[String, Int] and not just a stream of (String, Int). Whereas byte streams only use arrays because a stream of actual bytes would be very inefficient.

Network: I'm not sure I understand you correctly. If a 'reactive streams over sockets' protocol is implemented as proposed in #45, would it not need to communicate application-level messages equivalent to request(n), which would raise the problem of counting n bytes vs. counting buffers of unknown size? Are you saying it would rely entirely on network buffers and TCP? (#45 actually suggests UDP too, so that wouldn't work everywhere.)

@tmontgomery

TCP's semantics are problematic here. One issue is the difference in the units of request(n). Another, which I pointed out in #45, is initial reception on a new stream. While a stream abstraction is very powerful, the SPI as I have seen it needs framing, i.e. boundaries around objects. It can also benefit from channels (multiple concurrent streams) for complete coverage.

rkuhn (Member) commented May 7, 2014

@danarmak The problems of buffering and of back pressure are not as tightly coupled as you portray them: the request(n) mechanism allows the Subscriber to limit the number of data items it will get, and it is completely valid to accept only one at a time if you are paranoid. In the case of a TCP stream, the data will already reside in your heap at that point, so there is nothing to be gained by specifying a more fine-grained measure.

Buffering, on the other hand, is an orthogonal concern: you will build explicit buffers into your pipeline, and those are free to use any cost function you like. For example, you can create a Buffer element of exactly 1MB which takes in ByteStrings and hands out (fixed size) ByteStrings. Typically a Subscriber will have some sort of input buffer to support requesting items in batches, but that buffer's purpose is only to keep the system running; it is the grease between the cogs and not meant as a real "data buffer" (at least that is what we are doing in the Akka implementation).

One important thing to note is that back pressure is mediated locally at every asynchronous boundary, so your example of the ZIP bomb can be fixed quite easily by making the extraction stream-based and using a Publisher to hand out the resulting byte stream piece by piece, as requested. That way you retain full control over the memory usage. Having requested one unit on the input side can very well generate a large number of units on the output side.
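The demand-driven extraction idea can be sketched with a toy run-length "decompressor" standing in for real ZIP inflation; the class and method names here are hypothetical. Even if the input would expand to gigabytes, only as many bytes as the consumer requests are materialised at a time:

```java
import java.util.Arrays;

// Toy stand-in for a streaming decompressor: the "compressed" input is just
// (value, totalSize), which would expand to totalSize bytes if inflated in
// one go. Instead, request(demand) hands out at most `demand` bytes per
// call, so a highly compressible input (a zip bomb) never materialises in
// memory all at once. Not a real API; illustrative only.
class DemandDrivenInflater {
    private final byte value;
    private long remaining; // bytes of decompressed output still owed

    DemandDrivenInflater(byte value, long totalSize) {
        this.value = value;
        this.remaining = totalSize;
    }

    // Produce at most `demand` bytes; the caller requests again for more.
    byte[] request(int demand) {
        int n = (int) Math.min(demand, remaining);
        remaining -= n;
        byte[] out = new byte[n];
        Arrays.fill(out, value);
        return out;
    }

    boolean hasRemaining() {
        return remaining > 0;
    }
}
```

A Publisher wrapping such a decoder would call it once per unit of downstream demand, which is exactly the "one unit in, many bounded units out" shape described above.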

danarmak (Author) commented May 8, 2014

@rkuhn In the case of the zip bomb, we need to manually configure each producer to produce ByteStrings of the correct size (or in the correct size range). Consumers will also need to know this size in order to use request(n) correctly. Maybe manual configuration will be enough; I worried that doing so across different producer/consumer implementations might be a problem.

The TCP problem is different. IIUC, you say you'd rely on TCP's own back pressure mechanism and just ignore the 'n' parameter of request(n) over the TCP link. That still wouldn't work over UDP.

The question is really whether semi-manual configuration of 'block size range' for each producer/consumer will be enough. Probably it will be, it just won't be as automatic or convenient as with non-byte-streams. And maybe that's good enough to keep the API simple. If everyone feels that way, then please close this ticket.

danarmak closed this as completed May 8, 2014
danarmak reopened this May 8, 2014
@benjchristensen (Contributor)

Related to this, I could have a Publisher<T> where T is an object of arbitrarily large size, say a MovieFile that is gigabytes in size. Obviously this API wouldn't work well to prevent memory problems (one item of 200GB, for example), but if such a type is used I consider that a programmer error.

I see byte streams as similar, because in reality a stream is broken into discrete chunks (say byte[1024]) where request(n) then makes sense. I think this proposal offers a protocol for communicating capacity of n, but it's still up to the Publisher to provide intelligent data types and be clear on what the consumer should expect from each T it receives (for example, a byte array of 1024 versus arbitrary length).

There was discussion at some point of something like request(n, maxBytes), but in practice that is unusable in most application use cases other than trivial transfer of raw bytes, since most use cases can't break up a type T at arbitrary byte boundaries (even when serializing over a network). This is the primary reason why I think it's still up to the provider of a Publisher to be sensible and to document clearly what their type T will be, particularly if going over network boundaries (it is not such a big deal inside the same JVM).
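The point about documented chunk sizes can be made concrete: if each element is guaranteed to be at most some fixed size, request(n) becomes a hard memory bound, and a Subscriber can derive its demand from a byte budget. The names and the CHUNK value below are illustrative assumptions:

```java
// If a Publisher documents a fixed maximum chunk size (here CHUNK, an
// assumed value), request(n) implies a worst-case memory bound of
// n * CHUNK bytes, and a Subscriber can translate a byte budget into a
// demand count. All names here are hypothetical.
class DemandBudget {
    static final int CHUNK = 1024; // assumed documented max chunk size

    // How many chunks may be requested without exceeding byteBudget bytes.
    static long chunksFor(long byteBudget) {
        return byteBudget / CHUNK;
    }

    // Worst-case bytes in flight after calling request(n).
    static long bytesInFlight(long n) {
        return n * CHUNK;
    }
}
```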

Considering this, is there an API change that could be made to request(n) (or something else on Publisher/Subscriber/Subscription) that would make the contract (byte size?) of type T part of the interface rather than left to each implementation to communicate? I can't think of one for the reasons stated above.

@danarmak (Author)

I agree that there isn't a simple change to the API that would solve this problem. And a complex change is not warranted. So I'm closing this. Thanks for discussing.

Also, I'm writing a Reactive Streams implementation based on Futures (for internal use, but I'm trying to get permission to open-source it). We will use byte streams in many places, so I'll see for myself how this issue plays out. Maybe it will turn out to be enough to use sane defaults everywhere.
