Skip to content

Commit f8a6ba0

Browse files
committed
Merge pull request reactive-streams#14 from typesafehub/wip-apispi-in-java-√
Reimplements the SPI and API in Java
2 parents aea767a + 26aca12 commit f8a6ba0

File tree

9 files changed

+158
-150
lines changed

9 files changed

+158
-150
lines changed

spi/build.sbt

+10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
11
name := "reactive-streams-spi"
22

3+
javacOptions in compile ++= Seq("-encoding", "UTF-8", "-source", "1.6", "-target", "1.6", "-Xlint:unchecked", "-Xlint:deprecation")
4+
5+
javacOptions in (Compile,doc) ++= Seq("-encoding","UTF-8","-docencoding", "UTF-8", "-charset", "UTF-8", "-notimestamp", "-linksource")
6+
7+
autoScalaLibrary := false
8+
9+
crossPaths := false
10+
11+
publishMavenStyle := true
12+
313
Common.javadocSettings
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.reactivestreams.api;
2+
3+
import org.reactivestreams.spi.Subscriber;
4+
5+
/**
6+
* A Consumer is the logical sink of elements of a given type.
7+
* The underlying implementation is done by way of a {@link org.reactivestreams.spi.Subscriber Subscriber}.
8+
* This interface is the user-level API for a sink while a Subscriber is the SPI.
9+
* <p>
10+
* Implementations of this interface will typically offer domain- or language-specific
11+
* methods for transforming or otherwise interacting with the stream of elements.
12+
*/
13+
public interface Consumer<T> {
14+
15+
/**
16+
* Get the underlying {@link org.reactivestreams.spi.Subscriber Subscriber} for this Consumer. This method should only be used by
17+
* implementations of this API.
18+
* @return the underlying subscriber for this consumer
19+
*/
20+
public Subscriber<T> getSubscriber();
21+
}
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
package org.reactivestreams.api
1+
package org.reactivestreams.api;
22

33
/**
44
* A Processor is a stand-alone representation of a transformation for
55
* elements from In to Out types. Implementations of this API will provide
66
* factory methods for creating Processors and connecting them to
7-
* [[Producer]] and [[Consumer]].
7+
* {@link org.reactivestreams.api.Producer Producer} and {@link org.reactivestreams.api.Consumer Consumer}.
88
*/
9-
trait Processor[In, Out] extends Consumer[In] with Producer[Out]
9+
public interface Processor<I, O> extends Consumer<I>, Producer<O> {
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.reactivestreams.api;
2+
3+
import org.reactivestreams.spi.Publisher;
4+
5+
/**
6+
* A Producer is the logical source of elements of a given type.
7+
* The underlying implementation is done by way of a {@link org.reactivestreams.spi.Publisher Publisher}.
8+
* This interface is the user-level API for a source while a Publisher is the
9+
* SPI.
10+
* <p>
11+
* Implementations of this interface will typically offer domain- or language-specific
12+
* methods for transforming or otherwise interacting with the produced stream of elements.
13+
*/
14+
public interface Producer<T> {
15+
16+
/**
17+
* Get the underlying {@link org.reactivestreams.spi.Publisher Publisher} for this Producer. This method should only be used by
18+
* implementations of this API.
19+
* @return the underlying publisher for this producer
20+
*/
21+
public Publisher<T> getPublisher();
22+
23+
/**
24+
* Connect the given consumer to this producer. This means that the
25+
* Subscriber underlying the {@link org.reactivestreams.api.Consumer Consumer} subscribes to this Producer’s
26+
* underlying {@link org.reactivestreams.spi.Publisher Publisher}, which will initiate the transfer of the produced
27+
* stream of elements from producer to consumer until either of three things
28+
* happen:
29+
* <p>
30+
* <ul>
31+
* <li>The stream ends normally (no more elements available).</li>
32+
* <li>The producer encounters a fatal error condition.</li>
33+
* <li>The consumer cancels the reception of more elements.</li>
34+
* </ul>
35+
* @param consumer The consumer to register with this producer.
36+
*/
37+
public void produceTo(Consumer<T> consumer);
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.reactivestreams.spi;
2+
3+
/**
4+
* A Publisher is a source of elements of a given type. One or more {@link org.reactivestreams.spi.Subscriber Subscriber} may be connected
5+
* to this Publisher in order to receive the published elements, contingent on availability of these
6+
* elements as well as the presence of demand signaled by the Subscriber via {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}.
7+
*/
8+
public interface Publisher<T> {
9+
10+
/**
11+
* Subscribe the given {@link org.reactivestreams.spi.Subscriber Subscriber} to this Publisher. A Subscriber can at most be subscribed once
12+
* to a given Publisher, and to at most one Publisher in total.
13+
* @param subscriber The subscriber to register with this publisher.
14+
*/
15+
public void subscribe(Subscriber<T> subscriber);
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.reactivestreams.spi;
2+
3+
/**
4+
* A Subscriber receives elements from a {@link org.reactivestreams.spi.Publisher Publisher} based on the {@link org.reactivestreams.spi.Subscription Subscription} it has.
5+
* The Publisher may supply elements as they become available, the Subscriber signals demand via
6+
* {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore} and elements from when supply and demand are both present.
7+
*/
8+
public interface Subscriber<T> {
9+
10+
/**
11+
* The {@link org.reactivestreams.spi.Publisher Publisher} generates a {@link org.reactivestreams.spi.Subscription Subscription} upon {@link org.reactivestreams.spi.Publisher#subscribe(org.reactivestreams.spi.Subscriber) subscribe} and passes
12+
* it on to the Subscriber named there using this method. The Publisher may choose to reject
13+
* the subscription request by calling {@link #onError onError} instead.
14+
* @param subscription The subscription which connects this subscriber to its publisher.
15+
*/
16+
public void onSubscribe(Subscription subscription);
17+
18+
/**
19+
* The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to pass one element to this Subscriber. The element
20+
* must not be <code>null</code>. The Publisher must not call this method more often than
21+
* the Subscriber has signaled demand for via the corresponding {@link org.reactivestreams.spi.Subscription Subscription}.
22+
* @param element The element that is passed from publisher to subscriber.
23+
*/
24+
public void onNext(T element);
25+
26+
/**
27+
* The {@link org.reactivestreams.spi.Publisher Publisher} calls this method in order to signal that it terminated normally.
28+
* No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter.
29+
*/
30+
public void onComplete();
31+
32+
/**
33+
* The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to signal that the stream of elements has failed
34+
* and is being aborted. The Subscriber should abort its processing as soon as possible.
35+
* No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter.
36+
* <p>
37+
* This method is not intended to pass validation errors or similar from Publisher to Subscriber
38+
* in order to initiate an orderly shutdown of the exchange; it is intended only for fatal
39+
* failure conditions which make it impossible to continue processing further elements.
40+
* @param cause An exception which describes the reason for tearing down this stream.
41+
*/
42+
public void onError(Throwable cause);
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.reactivestreams.spi;
2+
3+
/**
4+
* A Subscription models the relationship between a {@link org.reactivestreams.spi.Publisher Publisher} and a {@link org.reactivestreams.spi.Subscriber Subscriber}.
5+
* The Subscriber receives a Subscription so that it can ask for elements to be delivered
6+
* using {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. The Subscription can be disposed of by canceling it.
7+
*/
8+
public interface Subscription {
9+
10+
/**
11+
* Cancel this subscription. The {@link org.reactivestreams.spi.Publisher Publisher} to which produced this Subscription
12+
* will eventually stop sending more elements to the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns
13+
* this Subscription. This may happen before the requested number of elements has
14+
* been delivered, even if the Publisher would still have more elements.
15+
*/
16+
public void cancel();
17+
18+
/**
19+
* Request more data from the {@link org.reactivestreams.spi.Publisher Publisher} which produced this Subscription.
20+
* The number of requested elements is cumulative to the number requested previously.
21+
* The Publisher may eventually publish up to the requested number of elements to
22+
* the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns this Subscription.
23+
* @param elements The number of elements requested.
24+
*/
25+
public void requestMore(int elements);
26+
}

spi/src/main/scala/org/reactivestreams/api/Producer.scala

-57
This file was deleted.

spi/src/main/scala/org/reactivestreams/spi/Publisher.scala

-90
This file was deleted.

0 commit comments

Comments
 (0)