Skip to content

Initial implementation #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 26, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
target/
.project
.classpath
.cache
.target/
*~
.#*
.*.swp
.DS_Store
.codefellow
.ensime*
.eprj
.history
.idea
121 changes: 121 additions & 0 deletions COPYING
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
Creative Commons Legal Code

CC0 1.0 Universal

CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.

Statement of Purpose

The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").

Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.

For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.

1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:

i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.

2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.

3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.

4. Limitations and Disclaimers.

a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.
8 changes: 8 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Licensed under Public Domain (CC0)

To the extent possible under law, the person who associated CC0 with
this code has waived all copyright and related or neighboring
rights to this code.

You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Reactive Streams

This project consists of a set of interfaces and a TCK defining Reactive Streams. The textual description of the semantics verified by the TCK can be found in `tck/src/main/resources/spec.txt`.

### Legal

This project is a collaboration between Netflix, Twitter, RedHat, Pivotal and Typesafe. The code is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see `COPYING`.
13 changes: 13 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
organization := "org.asyncrx"

version := "0.1-SNAPSHOT"

licenses := Seq("CC0" -> url("http://creativecommons.org/publicdomain/zero/1.0/"))

homepage := Some(url("https://groups.google.com/forum/?hl=en#!forum/reactive-streams"))

scalaVersion := "2.10.3"

lazy val spi = project

lazy val tck = project.dependsOn(spi)
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=0.13.1
3 changes: 3 additions & 0 deletions spi/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
organization := "asyncrx"

name := "reactive-streams-spi"
9 changes: 9 additions & 0 deletions spi/src/main/scala/asyncrx/api/Processor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package asyncrx.api

/**
* A Processor is a stand-alone representation of a transformation for
* elements from In to Out types. Implementations of this API will provide
* factory methods for creating Processors and connecting them to
* [[Producer]] and [[Consumer]].
*/
trait Processor[In, Out] extends Consumer[In] with Producer[Out]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In RxJava 0.17 we have significantly improved the distinction of an Operator which appears to match this Processor. It does not require each step of the chain subscribing to each other, instead it lifts and composes the functions within a single Observable that is subscribed to only once at the end of the chain.

If is defined as:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

It gets lifted into an Observable like this (I removed error handling and plugin hooks for clarity):

    /**
     * Lift a function to the current Observable and return a new Observable that when subscribed to will pass
     * the values of the current Observable through the function.
     * <p>
     * In other words, this allows chaining Observers together on an Observable for acting on the values within
     * the Observable.
     * <p> {@code
     * observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
     * }
     * 
     * @param bind
     * @return an Observable that emits values that are the result of applying the bind function to the values
     *         of the current Observable
     */
    public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                    onSubscribe.call(lift.call(o));
            }
        });
    }

Forgive the verbosity, Java8 isn't an option yet ... still on Java 6 to support Android et al.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comparison to RxJava is not quite right, which is the underlying reason for several of your other questions below: the Publisher/Subscriber/Subscription SPI is not directly equivalent to anything in Rx as far as I know, since this triplet of interfaces describes only exactly the asynchronous boundaries within your stream processing. Building up (synchronous) transformations is something which is supposed to be done in an implementation-specific fashion.

Translated to RxJava this could mean that a Publisher is somewhat like a Subject which supports back-pressure (and which knows that its downstream clients will never block its callback methods). A Processor is like a Subject-Observable-Observable-…-Subject structure that is not yet subscribed to any upstream source and which also does not necessarily have a sink connected; it is like a free-standing piece of pipe waiting to be flanged to other pipes. A flange itself is exactly the triplet Publisher/Subscription/Subscriber, which transports data from one pipe element to the next.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Roland, it clarified the spi and the scope for me 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

describes only exactly the asynchronous boundaries within your stream processing

Then should the Processor interface be removed from this if this proposal is only for defining the async boundaries?

37 changes: 37 additions & 0 deletions spi/src/main/scala/asyncrx/api/Producer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package asyncrx
package api

/**
* A Producer is the logical source of elements of a given type.
* The underlying implementation is done by way of a [[asyncrx.spi.Publisher]].
* This interface is the user-level API for a source while a Publisher is the
* SPI.
*
* Implementations of this interface will typically offer domain- or language-specific
* methods for transforming or otherwise interacting with the produced stream of elements.
*/
trait Producer[T] {

/**
* Get the underlying Publisher for this Producer. This method should only be used by
* implementations of this API.
*/
def getPublisher: spi.Publisher[T]
}

/**
* A Consumer is the logical sink of elements of a given type.
* The underlying implementation is done by way of a [[asyncrx.spi.Subscriber]].
* This interface is the user-level API for a sink while a Subscriber is the SPI.
*
* Implementations of this interface will typically offer domain- or language-specific
* methods for transforming or otherwise interacting with the stream of elements.
*/
trait Consumer[T] {

/**
* Get the underlying Subscriber for this Consumer. This method should only be used by
* implementations of this API.
*/
def getSubscriber: spi.Subscriber[T]
}
80 changes: 80 additions & 0 deletions spi/src/main/scala/asyncrx/spi/Publisher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package asyncrx.spi

/**
* A Subscription models the relationship between a [[Publisher]] and a [[Subscriber]].
* The Subscriber receives a Subscription so that it can ask for elements to be delivered
* using [[Subscription#requestMore]]. The Subscription can be disposed of by canceling it.
*/
trait Subscription {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our experiments have taken us a slightly different direction to allow the Subscriber to request the Subscription to pause the Producer (i.e. an Observable).

Don't focus on the pause name, just the concept, though it does make semantic sense to "pause a Subscription" and later "resume the Subscription".

The use cases we are seeking to achieve are:

1) Synchronous or Asynchronous source data

We should be able to handle synchronous reading of Files and Iterables without needing to put them inside their own artificially needed thread and do a push/pull model with them or support unsubscribe.

Async data sources are easy to handle unsubscribe behavior, but it's better for both sync and async sources to be dealt with the same for simplicity.

2) Different Strategies

We should be able to achieve different strategies such as:

a) stop work

Synchronous, cold sources such as files and iterables can be stopped and their state stored in a function for later resumption.

b) drop

Ignore data for a period of time without tearing down the network connections or other resources.

Using something like drop with a pause/resume would have the effect of a dynamic throttle which receives sampled data as fast as it can receive.

c) buffer

Buffer data to handle spikes and keep delivering, or buffer to a point and then drop.

3) Composable

It should work with flatMap, zip, observeOn, subscribeOn and other operators that traditionally require unbounded buffering to work and remain non-blocking.

Operators that are purely synchronous pass-thru (such as map, take, etc) don't need to involve themselves at all, but the flow of pause/resume behavior should compose through them the same as subscriptions do.

Thus, a zip operator may end up pausing some or all as it loops over the sources so as to allow waiting for each to emit data before continuing. A merge (such as in flatMap) may need to pause all, or be smart enough to track which one is overwhelming the rest and pause it. The merge use case under flatMap is actually one of the trickiest as the code at that point has no specific knowledge about each data source and therefore must treat each equally. It can probably only track whether the combination of all results in the need to queue and at that point request the sources to pause.

4) Resume When Ready

The Subscriber should be in control of when it resumes the Subscription, such as when it has finished draining the queue of events it has already received.

Following is an early version of a prototype that @abersnaze put together just the past couple days (presented it Friday) working with zip, take, map, etc:

public interface Subscription {

    /**
     * Stop receiving notifications on the {@link Subscriber} that was 
     * registered when this Subscription was received.
     * <p>
     * This allows unregistering an {@link Subscriber} before it has 
     * finished receiving all events (ie. before onCompleted is called).
     */
    public void unsubscribe();

    public boolean isUnsubscribed();

    /**
     * Request that events stop being sent. If the pause is accepted then the `resumeWith` 
     * method will be invoked with a function for resuming.
     * 
     * It is up to the Observable (producer) to choose if they can be paused and what behavior
     * happens when paused. 
     * 
     * The pause() request does not need to be immediately fulfilled as it may be unable to do so until
     * after the next event is emitted.
     * 
     * Examples:
     * 
     * 1) Iterable
     * 
     * The source can be paused by retaining the position in the Iterable in the `resumeAction` and
     * when requested resume emitting data from that same place.
     * 
     * 2) File
     * 
     * The source can be paused by retaining the position in the File in the `resumeAction` and
     * when requested resume emitting data from that same place.
     * 
     * 3) Network Stream (such as metrics, stocks, etc)
     * 
     * a) Buffer
     * 
     * Data could be buffered for later delivery. It could be buffered to a limit and then dropped.
     * 
     * b) Drop
     * 
     * All data could be dropped (ignored) while paused.
     * 
     * c) Unsubscribe
     * 
     * The network connection could be disconnected while paused and reconnected when resumed.
     * 
     */
    public void pause();

    public boolean isPaused();

    /**
     * If the source supports and accepts the pause request an Action0 will be sent.
     *  
     * After receipt of this `resumeAction` it should not be expected to receive any more events
     * from the source until invoking `resumeAction.call()` to request resuming.
     * 
     * This is sent back asynchronously instead of returned from `pause()` since it is a request
     * that could take some time for the source to fulfill.
     * 
     * @param resume
     */
    public void resumeWith(Action0 resumeAction);

}

In addition to the ability this gives for the source to be pausable, if a source does not support pausing then a new whilePaused operator can be used to choose a strategy.

For example, if a source doesn't support pausing it could be done like this:

source.whilePaused(Strategy.DROP).map({t -> doStuff(t)}).subscribe(subscriber);

or

source.whilePaused(Strategy.BUFFER).map({t -> doStuff(t)}).subscribe(subscriber);

The strategies considered thus far are: DROP, BUFFER, UNSUBSCRIBE, BLOCK (when blocking the thread is okay). Of course it is preferable for the source data to natively support pausing, but if it doesn't this gives a way to deal with it.

I'd like to understand if requestMore can satisfy the use cases we have been working to solve similar to how pause does, if both result in the same, if one is more performant than the other, or if they are solving different use cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention of pause/resume is the same as that of requestMore: it is a means to signal to the upstream that it may or may not send more data. As such, both approaches support the same use cases and everything you show is possible with either API. The strategy to apply when back-pressure is exerted is also configurable in either scenario, the same kind of strategy element would be used in the API (with the default being to pause and back-pressure transitively and automatically once the allowance signaled via requestMore has been used up).

The difference between the two approaches is that the recipient needs to actively raise the pause signal in your proposal while that happens automatically at a recipient-defined point in our proposal. We have experimented with a manual SuspendReading/ResumeReading scheme in our Akka IO implementation, and we found that for asynchronous participants this can lead to problems: the Publisher may push a lot of data before it gets around to the pause message, overwhelming the Subscriber even though it tried to cry for help.

For this reason we have chosen a mechanism which puts the Subscriber precisely in control over how many elements it is willing to buffer in the worst case—if processing takes long or it does not get CPU time it will still be protected from overflows. Now the criticism that “element” does not equate “effort” is valid, and what we envision is that this buffering strategy should (within limits) auto-tune itself by measuring the throughput of the stream and its own idle time. For this it should be noted that the Subscription demarcates only the asynchronous boundaries in your stream, which should be coarse-grained. Small transformation steps will typically be done synchronously within a Processor as described in the comment there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds reasonable approach. An implementation can drive the requestMore automatically I suppose. I quite like the idea of auto-tuning the buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. Autotuning buffer sizes and "demand" upstream based on live statistics is quite cool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

puts the Subscriber precisely in control over how many elements it is willing to buffer in the worst case

Do you envision async operators starting off at requestMore(100) or some other arbitrary number and then moving that number up and down?


/**
* Cancel this subscription. The [[Publisher]] to which produced this Subscription
* will eventually stop sending more elements to the [[Subscriber]] which owns
* this Subscription. This may happen before the requested number of elements has
* been delivered, even if the Publisher would still have more elements.
*/
def cancel(): Unit
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to pass an optional ‘reason’ here? If the subscriber failed it often makes sense for the upstream to know.


/**
* Request more data from the [[Publisher]] which produced this Subscription.
* The number of requested elements is cumulative to the number requested previously.
* The Publisher may eventually publish up to the requested number of elements to
* the [[Subscriber]] which owns this Subscription.
*/
def requestMore(elements: Int): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the int argument a bit constraining. What about no-arg (request until completed) too ?
Also I would have read it "flush()" rather than "requestMore()" but this would be closer to Stream vocabulary maybe ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several use cases that I don't think it can accomplish. See my comment at #1 (comment) for use cases and a possible alternative.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my response to that other comment, let’s keep the discussion up there.

}

/**
* A Publisher is a source of elements of a given type. One or more [[Subscriber]] may be connected
* to this Publisher in order to receive the published elements, contingent on availability of these
* elements as well as the presence of demand signaled by the Subscriber via [[Subscription#requestMore]].
*/
trait Publisher[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Publisher is confusing me with Producer. Observable maybe ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way Publisher is described in this document suggests it is more akin to what Subject is since it allows multiple Subscribers and is retaining history (like ReplaySubject) rather than just emitting data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale for having two distinct types—one at the implementation level and one presented to users—is given in the Producer scaladoc (possibly too terse there). The methods on Publisher/Subscription/Subscriber have complicated semantics which shall not be exposed to users, and users should never get into a situation where they would implement the SPI directly; if they ever do then

  • we have made a mistake in not providing all combinators which are needed, and
  • they will need to use the TCK to validate their implementation.

Observable was deliberately avoided since none of the types in this proposal are directly equivalent to it. The intention was that RxJava’s implementation of this SPI will be able to keep its established style of describing stream transformations as usual, and the API in this proposal is then only used to hook up Rx streams with Akka streams or Reactor streams and so on. In fact, what we included as API here are only those parts which relate to this interoperability, concerning the transformations and stream combinators no assumptions are made.


/**
* Subscribe the given [[Subscriber]] to this Publisher. A Subscriber can at most be subscribed once
* to a given Publisher, and to at most one Publisher in total.
*/
def subscribe(subscriber: Subscriber[T]): Unit
}

/**
* A Subscriber receives elements from a [[Publisher]] based on the [[Subscription]] it has.
* The Publisher may supply elements as they become available, the Subscriber signals demand via
* [[Subscription#requestMore]] and elements from when supply and demand are both present.
*/
trait Subscriber[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a candidate to extend Consumer[T], removing onNext. It would promote reuse I guess.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my response above: we submit that none of the Subscriber methods should be interesting to API users (i.e. exposed via Consumer).


/**
* The [[Publisher]] generates a [[Subscription]] upon [[Publisher#subscribe]] and passes
* it on to the Subscriber named there using this method. The Publisher may choose to reject
* the subscription request by calling [[#onError]] instead.
*/
def onSubscribe(subscription: Subscription): Unit

/**
* The [[Publisher]] calls this method to pass one element to this Subscriber. The element
* must not be <code>null</code>. The Publisher must not call this method more often than
* the Subscriber has signaled demand for via the corresponding [[Subscription]].
*/
def onNext(element: T): Unit

/**
* The [[Publisher]] calls this method in order to signal that it terminated normally.
* No more elements will be forthcoming and none of the Subscriber’s methods will be
* called hereafter.
*/
def onComplete(): Unit

/**
* The [[Publisher]] calls this method to signal that the stream of elements has failed
* and is being aborted. The Subscriber should abort its processing as soon as possible.
* No more elements will be forthcoming and none of the Subscriber’s methods will be
* called hereafter.
*
* This method is not intended to pass validation errors or similar from Publisher to Subscriber
* in order to initiate an orderly shutdown of the exchange; it is intended only for fatal
* failure conditions which make it impossible to continue processing further elements.
*/
def onError(cause: Throwable): Unit
}
5 changes: 5 additions & 0 deletions tck/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
organization := "asyncrx"

name := "reactive-streams-tck"

libraryDependencies += "org.testng" % "testng" % "5.14.10"
Loading