-
Notifications
You must be signed in to change notification settings - Fork 534
Initial implementation #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,14 @@ | ||
target/ | ||
.project | ||
.classpath | ||
.cache | ||
.target/ | ||
*~ | ||
.#* | ||
.*.swp | ||
.DS_Store | ||
.codefellow | ||
.ensime* | ||
.eprj | ||
.history | ||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
Creative Commons Legal Code | ||
|
||
CC0 1.0 Universal | ||
|
||
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE | ||
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN | ||
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS | ||
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES | ||
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS | ||
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM | ||
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED | ||
HEREUNDER. | ||
|
||
Statement of Purpose | ||
|
||
The laws of most jurisdictions throughout the world automatically confer | ||
exclusive Copyright and Related Rights (defined below) upon the creator | ||
and subsequent owner(s) (each and all, an "owner") of an original work of | ||
authorship and/or a database (each, a "Work"). | ||
|
||
Certain owners wish to permanently relinquish those rights to a Work for | ||
the purpose of contributing to a commons of creative, cultural and | ||
scientific works ("Commons") that the public can reliably and without fear | ||
of later claims of infringement build upon, modify, incorporate in other | ||
works, reuse and redistribute as freely as possible in any form whatsoever | ||
and for any purposes, including without limitation commercial purposes. | ||
These owners may contribute to the Commons to promote the ideal of a free | ||
culture and the further production of creative, cultural and scientific | ||
works, or to gain reputation or greater distribution for their Work in | ||
part through the use and efforts of others. | ||
|
||
For these and/or other purposes and motivations, and without any | ||
expectation of additional consideration or compensation, the person | ||
associating CC0 with a Work (the "Affirmer"), to the extent that he or she | ||
is an owner of Copyright and Related Rights in the Work, voluntarily | ||
elects to apply CC0 to the Work and publicly distribute the Work under its | ||
terms, with knowledge of his or her Copyright and Related Rights in the | ||
Work and the meaning and intended legal effect of CC0 on those rights. | ||
|
||
1. Copyright and Related Rights. A Work made available under CC0 may be | ||
protected by copyright and related or neighboring rights ("Copyright and | ||
Related Rights"). Copyright and Related Rights include, but are not | ||
limited to, the following: | ||
|
||
i. the right to reproduce, adapt, distribute, perform, display, | ||
communicate, and translate a Work; | ||
ii. moral rights retained by the original author(s) and/or performer(s); | ||
iii. publicity and privacy rights pertaining to a person's image or | ||
likeness depicted in a Work; | ||
iv. rights protecting against unfair competition in regards to a Work, | ||
subject to the limitations in paragraph 4(a), below; | ||
v. rights protecting the extraction, dissemination, use and reuse of data | ||
in a Work; | ||
vi. database rights (such as those arising under Directive 96/9/EC of the | ||
European Parliament and of the Council of 11 March 1996 on the legal | ||
protection of databases, and under any national implementation | ||
thereof, including any amended or successor version of such | ||
directive); and | ||
vii. other similar, equivalent or corresponding rights throughout the | ||
world based on applicable law or treaty, and any national | ||
implementations thereof. | ||
|
||
2. Waiver. To the greatest extent permitted by, but not in contravention | ||
of, applicable law, Affirmer hereby overtly, fully, permanently, | ||
irrevocably and unconditionally waives, abandons, and surrenders all of | ||
Affirmer's Copyright and Related Rights and associated claims and causes | ||
of action, whether now known or unknown (including existing as well as | ||
future claims and causes of action), in the Work (i) in all territories | ||
worldwide, (ii) for the maximum duration provided by applicable law or | ||
treaty (including future time extensions), (iii) in any current or future | ||
medium and for any number of copies, and (iv) for any purpose whatsoever, | ||
including without limitation commercial, advertising or promotional | ||
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each | ||
member of the public at large and to the detriment of Affirmer's heirs and | ||
successors, fully intending that such Waiver shall not be subject to | ||
revocation, rescission, cancellation, termination, or any other legal or | ||
equitable action to disrupt the quiet enjoyment of the Work by the public | ||
as contemplated by Affirmer's express Statement of Purpose. | ||
|
||
3. Public License Fallback. Should any part of the Waiver for any reason | ||
be judged legally invalid or ineffective under applicable law, then the | ||
Waiver shall be preserved to the maximum extent permitted taking into | ||
account Affirmer's express Statement of Purpose. In addition, to the | ||
extent the Waiver is so judged Affirmer hereby grants to each affected | ||
person a royalty-free, non transferable, non sublicensable, non exclusive, | ||
irrevocable and unconditional license to exercise Affirmer's Copyright and | ||
Related Rights in the Work (i) in all territories worldwide, (ii) for the | ||
maximum duration provided by applicable law or treaty (including future | ||
time extensions), (iii) in any current or future medium and for any number | ||
of copies, and (iv) for any purpose whatsoever, including without | ||
limitation commercial, advertising or promotional purposes (the | ||
"License"). The License shall be deemed effective as of the date CC0 was | ||
applied by Affirmer to the Work. Should any part of the License for any | ||
reason be judged legally invalid or ineffective under applicable law, such | ||
partial invalidity or ineffectiveness shall not invalidate the remainder | ||
of the License, and in such case Affirmer hereby affirms that he or she | ||
will not (i) exercise any of his or her remaining Copyright and Related | ||
Rights in the Work or (ii) assert any associated claims and causes of | ||
action with respect to the Work, in either case contrary to Affirmer's | ||
express Statement of Purpose. | ||
|
||
4. Limitations and Disclaimers. | ||
|
||
a. No trademark or patent rights held by Affirmer are waived, abandoned, | ||
surrendered, licensed or otherwise affected by this document. | ||
b. Affirmer offers the Work as-is and makes no representations or | ||
warranties of any kind concerning the Work, express, implied, | ||
statutory or otherwise, including without limitation warranties of | ||
title, merchantability, fitness for a particular purpose, non | ||
infringement, or the absence of latent or other defects, accuracy, or | ||
the present or absence of errors, whether or not discoverable, all to | ||
the greatest extent permissible under applicable law. | ||
c. Affirmer disclaims responsibility for clearing rights of other persons | ||
that may apply to the Work or any use thereof, including without | ||
limitation any person's Copyright and Related Rights in the Work. | ||
Further, Affirmer disclaims responsibility for obtaining any necessary | ||
consents, permissions or other rights required for any use of the | ||
Work. | ||
d. Affirmer understands and acknowledges that Creative Commons is not a | ||
party to this document and has no duty or obligation with respect to | ||
this CC0 or use of the Work. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
Licensed under Public Domain (CC0) | ||
|
||
To the extent possible under law, the person who associated CC0 with | ||
this code has waived all copyright and related or neighboring | ||
rights to this code. | ||
|
||
You should have received a copy of the CC0 legalcode along with this | ||
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
## Reactive Streams | ||
|
||
This project consists of a set of interfaces and a TCK defining Reactive Streams. The textual description of the semantics verified by the TCK can be found in `tck/src/main/resources/spec.txt`. | ||
|
||
### Legal | ||
|
||
This project is a collaboration between Netflix, Twitter, RedHat, Pivotal and Typesafe. The code is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see `COPYING`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
organization := "org.asyncrx" | ||
|
||
version := "0.1-SNAPSHOT" | ||
|
||
licenses := Seq("CC0" -> url("http://creativecommons.org/publicdomain/zero/1.0/")) | ||
|
||
homepage := Some(url("https://groups.google.com/forum/?hl=en#!forum/reactive-streams")) | ||
|
||
scalaVersion := "2.10.3" | ||
|
||
lazy val spi = project | ||
|
||
lazy val tck = project.dependsOn(spi) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
sbt.version=0.13.1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
organization := "asyncrx" | ||
|
||
name := "reactive-streams-spi" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package asyncrx.api | ||
|
||
/** | ||
* A Processor is a stand-alone representation of a transformation for | ||
* elements from In to Out types. Implementations of this API will provide | ||
* factory methods for creating Processors and connecting them to | ||
* [[Producer]] and [[Consumer]]. | ||
*/ | ||
trait Processor[In, Out] extends Consumer[In] with Producer[Out] | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package asyncrx | ||
package api | ||
|
||
/** | ||
* A Producer is the logical source of elements of a given type. | ||
* The underlying implementation is done by way of a [[asyncrx.spi.Publisher]]. | ||
* This interface is the user-level API for a source while a Publisher is the | ||
* SPI. | ||
* | ||
* Implementations of this interface will typically offer domain- or language-specific | ||
* methods for transforming or otherwise interacting with the produced stream of elements. | ||
*/ | ||
trait Producer[T] { | ||
|
||
/** | ||
* Get the underlying Publisher for this Producer. This method should only be used by | ||
* implementations of this API. | ||
*/ | ||
def getPublisher: spi.Publisher[T] | ||
} | ||
|
||
/** | ||
* A Consumer is the logical sink of elements of a given type. | ||
* The underlying implementation is done by way of a [[asyncrx.spi.Subscriber]]. | ||
* This interface is the user-level API for a sink while a Subscriber is the SPI. | ||
* | ||
* Implementations of this interface will typically offer domain- or language-specific | ||
* methods for transforming or otherwise interacting with the stream of elements. | ||
*/ | ||
trait Consumer[T] { | ||
|
||
/** | ||
* Get the underlying Subscriber for this Consumer. This method should only be used by | ||
* implementations of this API. | ||
*/ | ||
def getSubscriber: spi.Subscriber[T] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package asyncrx.spi | ||
|
||
/** | ||
* A Subscription models the relationship between a [[Publisher]] and a [[Subscriber]]. | ||
* The Subscriber receives a Subscription so that it can ask for elements to be delivered | ||
* using [[Subscription#requestMore]]. The Subscription can be disposed of by canceling it. | ||
*/ | ||
trait Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our experiments have taken us a slightly different direction to allow the Don't focus on the The use cases we are seeking to achieve are: 1) Synchronous or Asynchronous source data We should be able to handle synchronous reading of Files and Iterables without needing to put them inside their own artificially needed thread and do a push/pull model with them or support Async data sources are easy to handle 2) Different Strategies We should be able to achieve different strategies such as: a) stop work Synchronous, cold sources such as files and iterables can be stopped and their state stored in a function for later resumption. b) drop Ignore data for a period of time without tearing down the network connections or other resources. Using something like drop with a pause/resume would have the effect of a dynamic throttle which receives sampled data as fast as it can receive. c) buffer Buffer data to handle spikes and keep delivering, or buffer to a point and then drop. 3) Composable It should work with Operators that are purely synchronous pass-thru (such as Thus, a 4) Resume When Ready The Following is an early version of a prototype that @abersnaze put together just the past couple days (presented it Friday) working with public interface Subscription {
/**
* Stop receiving notifications on the {@link Subscriber} that was
* registered when this Subscription was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has
* finished receiving all events (ie. before onCompleted is called).
*/
public void unsubscribe();
public boolean isUnsubscribed();
/**
* Request that events stop being sent. If the pause is accepted then the `resumeWith`
* method will be invoked with a function for resuming.
*
* It is up to the Observable (producer) to choose if they can be paused and what behavior
* happens when paused.
*
* The pause() request does not need to be immediately fulfilled as it may be unable to do so until
* after the next event is emitted.
*
* Examples:
*
* 1) Iterable
*
* The source can be paused by retaining the position in the Iterable in the `resumeAction` and
* when requested resume emitting data from that same place.
*
* 2) File
*
* The source can be paused by retaining the position in the File in the `resumeAction` and
* when requested resume emitting data from that same place.
*
* 3) Network Stream (such as metrics, stocks, etc)
*
* a) Buffer
*
* Data could be buffered for later delivery. It could be buffered to a limit and then dropped.
*
* b) Drop
*
* All data could be dropped (ignored) while paused.
*
* c) Unsubscribe
*
* The network connection could be disconnected while paused and reconnected when resumed.
*
*/
public void pause();
public boolean isPaused();
/**
* If the source supports and accepts the pause request an Action0 will be sent.
*
* After receipt of this `resumeAction` it should not be expected to receive any more events
* from the source until invoking `resumeAction.call()` to request resuming.
*
* This is sent back asynchronously instead of returned from `pause()` since it is a request
* that could take some time for the source to fulfill.
*
* @param resume
*/
public void resumeWith(Action0 resumeAction);
} In addition to the ability this gives for the source to be pausable, if a source does not support pausing then a new For example, if a source doesn't support pausing it could be done like this: source.whilePaused(Strategy.DROP).map({t -> doStuff(t)}).subscribe(subscriber); or source.whilePaused(Strategy.BUFFER).map({t -> doStuff(t)}).subscribe(subscriber); The strategies considered thus far are: DROP, BUFFER, UNSUBSCRIBE, BLOCK (when blocking the thread is okay). Of course it is preferable for the source data to natively support pausing, but if it doesn't this gives a way to deal with it. I'd like to understand if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intention of pause/resume is the same as that of requestMore: it is a means to signal to the upstream that it may or may not send more data. As such, both approaches support the same use cases and everything you show is possible with either API. The strategy to apply when back-pressure is exerted is also configurable in either scenario, the same kind of strategy element would be used in the API (with the default being to pause and back-pressure transitively and automatically once the allowance signaled via requestMore has been used up). The difference between the two approaches is that the recipient needs to actively raise the pause signal in your proposal while that happens automatically at a recipient-defined point in our proposal. We have experimented with a manual SuspendReading/ResumeReading scheme in our Akka IO implementation, and we found that for asynchronous participants this can lead to problems: the Publisher may push a lot of data before it gets around to the pause message, overwhelming the Subscriber even though it tried to cry for help. For this reason we have chosen a mechanism which puts the Subscriber precisely in control over how many elements it is willing to buffer in the worst case—if processing takes long or it does not get CPU time it will still be protected from overflows. Now the criticism that “element” does not equate “effort” is valid, and what we envision is that this buffering strategy should (within limits) auto-tune itself by measuring the throughput of the stream and its own idle time. For this it should be noted that the Subscription demarcates only the asynchronous boundaries in your stream, which should be coarse-grained. Small transformation steps will typically be done synchronously within a Processor as described in the comment there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This sounds reasonable approach. An implementation can drive the requestMore automatically I suppose. I quite like the idea of auto-tuning the buffer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, exactly. Autotuning buffer sizes and "demand" upstream based on live statistics is quite cool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Do you envision async operators starting off at |
||
|
||
/** | ||
* Cancel this subscription. The [[Publisher]] to which produced this Subscription | ||
* will eventually stop sending more elements to the [[Subscriber]] which owns | ||
* this Subscription. This may happen before the requested number of elements has | ||
* been delivered, even if the Publisher would still have more elements. | ||
*/ | ||
def cancel(): Unit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to pass an optional ‘reason’ here? If the subscriber failed it often makes sense for the upstream to know. |
||
|
||
/** | ||
* Request more data from the [[Publisher]] which produced this Subscription. | ||
* The number of requested elements is cumulative to the number requested previously. | ||
* The Publisher may eventually publish up to the requested number of elements to | ||
* the [[Subscriber]] which owns this Subscription. | ||
*/ | ||
def requestMore(elements: Int): Unit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find the int argument a bit constraining. What about no-arg (request until completed) too ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are several use cases that I don't think it can accomplish. See my comment at #1 (comment) for use cases and a possible alternative. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my response to that other comment, let’s keep the discussion up there. |
||
} | ||
|
||
/** | ||
* A Publisher is a source of elements of a given type. One or more [[Subscriber]] may be connected | ||
* to this Publisher in order to receive the published elements, contingent on availability of these | ||
* elements as well as the presence of demand signaled by the Subscriber via [[Subscription#requestMore]]. | ||
*/ | ||
trait Publisher[T] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Publisher is confusing me with Producer. Observable maybe ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The rationale for having two distinct types—one at the implementation level and one presented to users—is given in the Producer scaladoc (possibly too terse there). The methods on Publisher/Subscription/Subscriber have complicated semantics which shall not be exposed to users, and users should never get into a situation where they would implement the SPI directly; if they ever do then
Observable was deliberately avoided since none of the types in this proposal are directly equivalent to it. The intention was that RxJava’s implementation of this SPI will be able to keep its established style of describing stream transformations as usual, and the API in this proposal is then only used to hook up Rx streams with Akka streams or Reactor streams and so on. In fact, what we included as API here are only those parts which relate to this interoperability, concerning the transformations and stream combinators no assumptions are made. |
||
|
||
/** | ||
* Subscribe the given [[Subscriber]] to this Publisher. A Subscriber can at most be subscribed once | ||
* to a given Publisher, and to at most one Publisher in total. | ||
*/ | ||
def subscribe(subscriber: Subscriber[T]): Unit | ||
} | ||
|
||
/** | ||
* A Subscriber receives elements from a [[Publisher]] based on the [[Subscription]] it has. | ||
* The Publisher may supply elements as they become available, the Subscriber signals demand via | ||
* [[Subscription#requestMore]] and elements from when supply and demand are both present. | ||
*/ | ||
trait Subscriber[T] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like a candidate to extend Consumer[T], removing onNext. It would promote reuse I guess. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please see my response above: we submit that none of the Subscriber methods should be interesting to API users (i.e. exposed via Consumer). |
||
|
||
/** | ||
* The [[Publisher]] generates a [[Subscription]] upon [[Publisher#subscribe]] and passes | ||
* it on to the Subscriber named there using this method. The Publisher may choose to reject | ||
* the subscription request by calling [[#onError]] instead. | ||
*/ | ||
def onSubscribe(subscription: Subscription): Unit | ||
|
||
/** | ||
* The [[Publisher]] calls this method to pass one element to this Subscriber. The element | ||
* must not be <code>null</code>. The Publisher must not call this method more often than | ||
* the Subscriber has signaled demand for via the corresponding [[Subscription]]. | ||
*/ | ||
def onNext(element: T): Unit | ||
|
||
/** | ||
* The [[Publisher]] calls this method in order to signal that it terminated normally. | ||
* No more elements will be forthcoming and none of the Subscriber’s methods will be | ||
* called hereafter. | ||
*/ | ||
def onComplete(): Unit | ||
|
||
/** | ||
* The [[Publisher]] calls this method to signal that the stream of elements has failed | ||
* and is being aborted. The Subscriber should abort its processing as soon as possible. | ||
* No more elements will be forthcoming and none of the Subscriber’s methods will be | ||
* called hereafter. | ||
* | ||
* This method is not intended to pass validation errors or similar from Publisher to Subscriber | ||
* in order to initiate an orderly shutdown of the exchange; it is intended only for fatal | ||
* failure conditions which make it impossible to continue processing further elements. | ||
*/ | ||
def onError(cause: Throwable): Unit | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
organization := "asyncrx" | ||
|
||
name := "reactive-streams-tck" | ||
|
||
libraryDependencies += "org.testng" % "testng" % "5.14.10" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In RxJava 0.17 we have significantly improved the distinction of an
Operator
which appears to match thisProcessor
. It does not require each step of the chain subscribing to each other, instead it lifts and composes the functions within a singleObservable
that is subscribed to only once at the end of the chain.If is defined as:
It gets lifted into an
Observable
like this (I removed error handling and plugin hooks for clarity):Forgive the verbosity, Java8 isn't an option yet ... still on Java 6 to support Android et al.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comparison to RxJava is not quite right, which is the underlying reason for several of your other questions below: the Publisher/Subscriber/Subscription SPI is not directly equivalent to anything in Rx as far as I know, since this triplet of interfaces describes only exactly the asynchronous boundaries within your stream processing. Building up (synchronous) transformations is something which is supposed to be done in an implementation-specific fashion.
Translated to RxJava this could mean that a Publisher is somewhat like a Subject which supports back-pressure (and which knows that its downstream clients will never block its callback methods). A Processor is like a Subject-Observable-Observable-…-Subject structure that is not yet subscribed to any upstream source and which also does not necessarily have a sink connected; it is like a free-standing piece of pipe waiting to be flanged to other pipes. A flange itself is exactly the triplet Publisher/Subscription/Subscriber, which transports data from one pipe element to the next.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Roland, it clarified the spi and the scope for me 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then should the
Processor
interface be removed from this if this proposal is only for defining the async boundaries?