Skip to content

Prepare for 0.4.0.M1 #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jun 18, 2014
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CopyrightWaivers.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ github name | Real Name, Email Address used for git commits, Company
rkuhn | Roland Kuhn, [email protected], Typesafe Inc.
benjchristensen| Ben Christensen, [email protected], Netflix Inc.
viktorklang | Viktor Klang, [email protected], Typesafe Inc.
smaldini | Stephane Maldini, [email protected], Pivotal Software Inc.
49 changes: 42 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ The latest preview release is available on Maven Central as

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-spi</artifactId>
<version>0.3</version>
<artifactId>reactive-streams</artifactId>
<version>0.4.0.M1</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>0.3</version>
<version>0.4.0.M1</version>
</dependency>

## Goals, Design and Scope ##
Expand Down Expand Up @@ -47,6 +47,7 @@ The API consists of the following components that are required to be provided by
- Publisher
- Subscriber
- Subscription
- Processor

A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose that we give identifiers to ALL spec rules so it is easy to refer to them from other rules.


Expand All @@ -73,6 +74,16 @@ public interface Subscriber<T> {
- A `Subscriber` MUST NOT block a `Publisher` thread.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have a footnote to define what blocking means in this context.

- A `Subscriber` MUST signal demand via `Subscription.request` to receive notifications.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true—onError and onComplete can happen without demand.
Needs to be clarified as:
"- A Subscriber MUST signal demand via Subscription.request to receive onNext notifications."

- A `Subscriber` MAY behave synchronously or asynchronously but SHOULD NOT synchronously perform heavy computations in its methods (`onNext`, `onError`, `onComplete`, `onSubscribe`).
- A `Subscriber.onNext(T t)` and `Subscriber.onSubscribe(Subscription s)` MUST NOT call any methods on the `Subscription`, the `Publisher` or any other `Publishers` or `Subscribers`.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about that @viktorklang @benjchristensen @jbrisbin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the Subscriber is allowed synchronous, then this would be a weird rule (to me).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

- A `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription`, the `Publisher` or any other `Publishers` or `Subscribers`.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, at least in the Subscription-case it is weird. If onError or onComplete is called, then the Subscription has to be considered invalidated—so calling methods on it from onError on onComplete should be considered a bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the next line:

- A `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the event
- A `Subscriber` MUST NOT accept an `onSubscribe` event if it already has an active Subscription. What exactly "not accepting" means is left to the implementation but should include behavior that makes the user aware of the usage error (e.g. by logging, throwing an exception or similar).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we "refactor" this part: "What exactly "not accepting" means is left to the implementation but should include behavior that makes the user aware of the usage error (e.g. by logging, throwing an exception or similar)." into a general recommendation as to what should be done for spec violations?

- A `Subscriber` MUST call `Subscription.cancel()` during shutdown if it still has an active `Subscription`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify this to mean "A Subscriber MUST call Subscription.cancel() if it is no longer valid to the Publisher" without thePublisherhaving signalledonErrororonComplete`?

- A `Subscriber` MUST ensure that all calls on a `Subscription` take place from the same thread or provide for respective external synchronization.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace "a Subscription" with "its Subscription"?

- A `Subscriber` MUST be prepared to receive one or more `onNext` events after having called `Subscription.cancel()` if there are still requested elements pending.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add footnote explaining why this is needed?

- A `Subscriber` MUST be prepared to receive an `onComplete` event with or without a preceding `Subscription.request(int n)` call.
- A `Subscriber` MUST be prepared to receive an `onError` event with or without a preceding `Subscription.request(int n)` call.
- A `Subscriber` MUST make sure that all calls on its `onXXX` methods happen-before the processing of the respective events. I.e. the Subscriber must take care of properly publishing the event to its processing logic.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still relevant with the latest discussions about Subscriber @reactive-streams/contributors

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe this is still valid. If one uses an AsyncSubscriber the invocation of onX must happen-before the processing of that signal.



#### Publisher ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Publisher.java))
Expand All @@ -90,14 +101,19 @@ public interface Publisher<T> {
- If a `Publisher` terminates successfully (finite stream) it MUST emit an `onComplete`.
- If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered canceled.
- Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further events can be sent.
- Upon receiving a `Subscription.cancel` request it SHOULD stop sending events as soon as it can.
- Upon receiving a `Subscription.cancel` request it SHOULD stop sending events as soon as it can.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd want to rephrase this (since the SHOULD is ambiguous about whether it is allowed to continue sending events indefinitely, or whether it is allowed to be slow in stopping):

Upon receiving a Subscription.cancel request it SHOULD, as soon as it can, stop sending events.

- `Subscription`'s which have been canceled SHOULD NOT receive subsequent `onError` or `onComplete` events, but implementations will not be able to strictly guarantee this in all cases due to the intrinsic race condition between actions taken concurrently by `Publisher` and `Subscriber`.
- A `Publisher` SHOULD NOT throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscriber.onSubscribe` method on a given `Subscriber` instance MUST NOT be called more than once.
- The `Publisher.subscribe` method MAY be called as many times as wanted but MUST be with a different `Subscriber` each time.
- The `Publisher.subscribe` method MAY be called as many times as wanted but MUST be with a different `Subscriber` each time, based on object equality. It MUST reject the Subscription with a `java.lang.IllegalStateException` if the same Subscriber already has an active `Subscription` with this `Publisher`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds nicer to me:

"The Publisher.subscribe method MAY be called as many times as wanted but MUST be with a different (based on object equality) Subscriber each time."

- A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast.
- A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them (e.g. because it is overwhelmed or bounded by a finite number of underlying resources, etc...). If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`".

- A `Publisher` in `completed` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onComplete` on the given `Subscriber`
- A `Publisher` in `error` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onError` with the error cause on the given `Subscriber`
- A `Publisher` in `shut-down` state MUST NOT call `Subscriber.onSubscribe` and MUST emit an `Subscriber.onError` with `java.lang.IllegalStateException` on the given `Subscriber`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all these spec violation IllegalArgumentException it would be nice if their message referred to the spec rule they signal to be violated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even better if it also quoted the spec

- A `Publisher` MUST support a pending element count up to 2^63-1 (java.lang.Long.MAX_VALUE) and provide for overflow protection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be made explicit what overflow protection in this case entails.

- A `Publisher` MUST produce the same elements in the same sequence for all its subscribers. Producing the stream elements at (temporarily) differing rates to different subscribers is allowed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if still relevant @reactive-streams/contributors

- A `Publisher` MUST start producing with the oldest element still available for a new subscriber.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if still relevant @reactive-streams/contributors

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is still relevant.



#### Subscription ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Subscription.java))
Expand All @@ -114,9 +130,28 @@ public interface Subscription {
- The `Subscription.request` method MUST assume that it will be invoked synchronously and MUST NOT allow unbounded recursion such as `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext`.
- The `Subscription.request` method SHOULD NOT synchronously perform heavy computations.
- The `Subscription.cancel` method MUST assume that it will be invoked synchronously and SHOULD NOT synchronously perform heavy computations.
- When the `Subscription` is cancelled, `Subscription.request(int n)` MUST ignore the call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"When the Subscription is cancelled, Subscription.request(int n) MUST ignore the call."

"After the Subscription is cancelled, additional Subscription.request(int n) MUST be NOPs." <- I find this less confusing then "ignore the call"

- When the `Subscription` is cancelled, `Subscription.cancel()` MUST ignore the call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs." <- I find this less confusing then "ignore the call"

- When the `Subscription` is not cancelled, `Subscription.request(int n)` MUST register the given number of additional elements to be produced to the respective subscriber.
- When the `Subscription` is not cancelled, `Subscription.request(int n)` MUST throw a `java.lang.IllegalArgumentException` if the argument is <= 0.
- When the `Subscription` is not cancelled, `Subscription.request(int n)` COULD synchronously call `onNext` on this (or other) subscriber(s) if and only if the next element is already available.
- When the `Subscription` is not cancelled, `Subscription.request(int n)` COULD synchronously call `onComplete` or `onError` on this (or other) subscriber(s).
- When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST eventually cease to call any methods on the corresponding subscriber.
- When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` instance is discouraged, but this specification does not mandate that it is disallowed since that would mean having to store previously canceled subscriptions indefinitely.
- When the `Subscription` is not cancelled, `Subscription.cancel()` the `Publisher` MUST shut itself down if the given Subscription is the last downstream `Subscription`. Explicitly adding "keep-alive" Subscribers SHOULD prevent automatic shutdown if required.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird wording around "Subscription.cancel() the Publisher MUST shut itself down"


#### Processor ([Code](https://github.com/reactive-streams/reactive-streams/blob/master/api/src/main/java/org/reactivestreams/Processor.java))

```java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
````


- A `Processor` represents a processing stage—which is both a `Subscriber` and a `Publisher` and obeys the contracts of both.
- A `Processor` must cancel its upstream Subscription if its last downstream Subscription has been cancelled
- A `Processor` must immediately pass on `onError` events received from its upstream to its downstream
- A `Processor` must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this rule

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that much too, it is from the previous TCK tho. My understanding is simply that the initial capacity on a Processor might be positive (the spec assumes the opposite).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it is to express that it may behave like an active Subscriber before it behaves like an active Publisher

requested anything yet

### Asynchronous vs Synchronous Processing ###

Expand Down
3 changes: 0 additions & 3 deletions tck/src/main/resources/spec.md

This file was deleted.