Skip to content

Commit bf2cac8

Browse files
MattiasBuelensricea
authored andcommitted
Add @@asyncIterator to ReadableStream
Enable using a ReadableStream with the JavaScript "for await" syntax.
1 parent 1d3eee8 commit bf2cac8

File tree

7 files changed

+253
-66
lines changed

7 files changed

+253
-66
lines changed

index.bs

+109-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ spec:promises-guide; type:dfn;
1818
<pre class="anchors">
1919
urlPrefix: https://tc39.github.io/ecma262/; spec: ECMASCRIPT
2020
text: %Uint8Array%; url: #sec-typedarray-objects; type: constructor
21+
text: %AsyncIteratorPrototype%; url: #sec-asynciteratorprototype; type: interface
22+
text: AsyncIterator; url: #sec-asynciterator-interface; type: interface
2123
text: ArrayBuffer; url: #sec-arraybuffer-objects; type: interface
2224
text: DataView; url: #sec-dataview-objects; type: interface
2325
text: Number; url: #sec-ecmascript-language-types-number-type; type: interface
@@ -399,11 +401,14 @@ like
399401
get <a href="#rs-locked">locked</a>()
400402

401403
<a href="#rs-cancel">cancel</a>(reason)
402-
<a href="#rs-get-reader">getReader</a>()
404+
<a href="#rs-get-iterator">getIterator</a>({ preventCancel } = {})
405+
<a href="#rs-get-reader">getReader</a>({ mode } = {})
403406
<a href="#rs-pipe-through">pipeThrough</a>({ writable, readable },
404407
{ preventClose, preventAbort, preventCancel, signal } = {})
405408
<a href="#rs-pipe-to">pipeTo</a>(dest, { preventClose, preventAbort, preventCancel, signal } = {})
406409
<a href="#rs-tee">tee</a>()
410+
411+
<a href="#rs-asynciterator">[@@asyncIterator]</a>({ preventCancel } = {})
407412
}
408413
</code></pre>
409414

@@ -602,6 +607,23 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
602607
1. Return ! ReadableStreamCancel(*this*, _reason_).
603608
</emu-alg>
604609

610+
<h5 id="rs-get-iterator" method for="ReadableStream">getIterator({ <var>preventCancel</var> } = {})</h5>
611+
612+
<div class="note">
613+
The <code>getIterator</code> method returns an async iterator which can be used to consume the stream. The
614+
{{ReadableStreamAsyncIteratorPrototype/return()}} method of this iterator object will, by default,
615+
<a lt="cancel a readable stream">cancel</a> the stream; it will also release the reader.
616+
</div>
617+
618+
<emu-alg>
619+
1. If ! IsReadableStream(*this*) is *false*, throw a *TypeError* exception.
620+
1. Let _reader_ be ? AcquireReadableStreamDefaultReader(*this*).
621+
1. Let _iterator_ be ! ObjectCreate(`<a idl>ReadableStreamAsyncIteratorPrototype</a>`).
622+
1. Set _iterator_.[[asyncIteratorReader]] to _reader_.
623+
1. Set _iterator_.[[preventCancel]] to ! ToBoolean(_preventCancel_).
624+
1. Return _iterator_.
625+
</emu-alg>
626+
605627
<h5 id="rs-get-reader" method for="ReadableStream">getReader({ <var ignore>mode</var> } = {})</h5>
606628

607629
<div class="note">
@@ -792,6 +814,82 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
792814
</code></pre>
793815
</div>
794816

817+
<!-- Bikeshed doesn't let us mark this up correctly: https://github.com/tabatkins/bikeshed/issues/1344 -->
818+
<h5 id="rs-asynciterator" iterator for="ReadableStream">[@@asyncIterator]({ <var>preventCancel</var> } = {})</h5>
819+
820+
<p class="note">
821+
The <code>@@asyncIterator</code> method is an alias of {{ReadableStream/getIterator()}}.
822+
</p>
823+
824+
The initial value of the <code>@@asyncIterator</code> method is the same function object as the initial value of the
825+
{{ReadableStream/getIterator()}} method.
826+
827+
<h3 id="rs-asynciterator-prototype" interface
828+
lt="ReadableStreamAsyncIteratorPrototype">ReadableStreamAsyncIteratorPrototype</h3>
829+
830+
{{ReadableStreamAsyncIteratorPrototype}} is an ordinary object that is used by {{ReadableStream/getIterator()}} to
831+
construct the objects it returns. Instances of {{ReadableStreamAsyncIteratorPrototype}} implement the {{AsyncIterator}}
832+
abstract interface from the JavaScript specification. [[!ECMASCRIPT]]
833+
834+
The {{ReadableStreamAsyncIteratorPrototype}} object must have its \[[Prototype]] internal slot set to
835+
{{%AsyncIteratorPrototype%}}.
836+
837+
<h4 id="default-reader-asynciterator-prototype-internal-slots">Internal slots</h4>
838+
839+
Objects created by {{ReadableStream/getIterator()}}, using {{ReadableStreamAsyncIteratorPrototype}} as their
840+
prototype, are created with the internal slots described in the following table:
841+
842+
<table>
843+
<thead>
844+
<tr>
845+
<th>Internal Slot</th>
846+
<th>Description (<em>non-normative</em>)</th>
847+
</tr>
848+
</thead>
849+
<tr>
850+
<td>\[[asyncIteratorReader]]
851+
<td class="non-normative">A {{ReadableStreamDefaultReader}} instance
852+
</tr>
853+
<tr>
854+
<td>\[[preventCancel]]
855+
<td class="non-normative">A boolean value indicating if the stream will be <a lt="cancel a readable
856+
stream">canceled</a> when the async iterator's {{ReadableStreamAsyncIteratorPrototype/return()}} method is called
857+
</tr>
858+
</table>
859+
860+
<h4 id="rs-asynciterator-prototype-next" method for="ReadableStreamAsyncIteratorPrototype">next()</h4>
861+
862+
<emu-alg>
863+
1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
864+
1. Let _reader_ be *this*.[[asyncIteratorReader]].
865+
1. If _reader_.[[ownerReadableStream]] is *undefined*, return <a>a promise rejected with</a> a *TypeError* exception.
866+
1. Return the result of <a>transforming</a> ! ReadableStreamDefaultReaderRead(_reader_) with a fulfillment handler
867+
which takes the argument _result_ and performs the following steps:
868+
1. Assert: Type(_result_) is Object.
869+
1. Let _value_ be ? Get(_result_, `"value"`).
870+
1. Let _done_ be ? Get(_result_, `"done"`).
871+
1. Assert: Type(_done_) is Boolean.
872+
1. If _done_ is *true*, perform ! ReadableStreamReaderGenericRelease(_reader_).
873+
1. Return ! ReadableStreamCreateReadResult(_value_, _done_, *true*).
874+
</emu-alg>
875+
876+
<h4 id="rs-asynciterator-prototype-return" method
877+
for="ReadableStreamAsyncIteratorPrototype">return( <var>value</var> )</h4>
878+
879+
<emu-alg>
880+
1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
881+
1. Let _reader_ be *this*.[[asyncIteratorReader]].
882+
1. If _reader_.[[ownerReadableStream]] is *undefined*, return <a>a promise rejected with</a> a *TypeError* exception.
883+
1. If _reader_.[[readRequests]] is not empty, return <a>a promise rejected with</a> a *TypeError* exception.
884+
1. If *this*.[[preventCancel]] is *false*, then:
885+
1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_).
886+
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
887+
1. Return the result of <a>transforming</a> _result_ by a fulfillment handler that returns !
888+
ReadableStreamCreateReadResult(_value_, *true*, *true*).
889+
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
890+
1. Return <a>a promise resolved with</a> ! ReadableStreamCreateReadResult(_value_, *true*, *true*).
891+
</emu-alg>
892+
795893
<h3 id="rs-abstract-ops">General readable stream abstract operations</h3>
796894

797895
The following abstract operations, unlike most in this specification, are meant to be generally useful by other
@@ -910,6 +1008,15 @@ readable stream is <a>locked to a reader</a>.
9101008
1. Return *true*.
9111009
</emu-alg>
9121010

1011+
<h4 id="is-readable-stream-asynciterator" aoid="IsReadableStreamAsyncIterator" nothrow
1012+
export>IsReadableStreamAsyncIterator ( <var>x</var> )</h4>
1013+
1014+
<emu-alg>
1015+
1. If Type(_x_) is not Object, return *false*.
1016+
1. If _x_ does not have a [[asyncIteratorReader]] internal slot, return *false*.
1017+
1. Return *true*.
1018+
</emu-alg>
1019+
9131020
<h4 id="readable-stream-tee" aoid="ReadableStreamTee" throws export>ReadableStreamTee ( <var>stream</var>,
9141021
<var>cloneForBranch2</var> )</h4>
9151022

@@ -5985,6 +6092,7 @@ Forbes Lindesay,
59856092
Forrest Norvell,
59866093
Gary Blackwood,
59876094
Gorgi Kosev,
6095+
Gus Caplan,
59886096
贺师俊 (hax),
59896097
Isaac Schlueter,
59906098
isonmad,

reference-implementation/.eslintrc.json

+9-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,15 @@
160160
"id-blacklist": "off",
161161
"id-length": "off",
162162
"id-match": "off",
163-
"indent": ["error", 2, { "SwitchCase": 1 }],
163+
"indent": ["error", 2, {
164+
"SwitchCase": 1,
165+
"FunctionDeclaration": { "parameters": "first" },
166+
"FunctionExpression": { "parameters": "first" },
167+
"CallExpression": { "arguments": "first" },
168+
"ArrayExpression": "first",
169+
"ObjectExpression": "first",
170+
"ImportDeclaration": "first"
171+
}],
164172
"jsx-quotes": "off",
165173
"key-spacing": ["error", { "beforeColon": false, "afterColon": true, "mode": "strict" }],
166174
"keyword-spacing": ["error", { "before": true, "after": true }],

reference-implementation/lib/readable-stream.js

+103-28
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class ReadableStream {
129129
}
130130
if (IsWritableStream(dest) === false) {
131131
return Promise.reject(
132-
new TypeError('ReadableStream.prototype.pipeTo\'s first argument must be a WritableStream'));
132+
new TypeError('ReadableStream.prototype.pipeTo\'s first argument must be a WritableStream'));
133133
}
134134

135135
preventClose = Boolean(preventClose);
@@ -158,8 +158,72 @@ class ReadableStream {
158158
const branches = ReadableStreamTee(this, false);
159159
return createArrayFromList(branches);
160160
}
161+
162+
getIterator({ preventCancel = false } = {}) {
163+
if (IsReadableStream(this) === false) {
164+
throw streamBrandCheckException('getIterator');
165+
}
166+
const reader = AcquireReadableStreamDefaultReader(this);
167+
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
168+
iterator._asyncIteratorReader = reader;
169+
iterator._preventCancel = Boolean(preventCancel);
170+
return iterator;
171+
}
161172
}
162173

174+
const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
175+
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
176+
next() {
177+
if (IsReadableStreamAsyncIterator(this) === false) {
178+
return Promise.reject(streamAsyncIteratorBrandCheckException('next'));
179+
}
180+
const reader = this._asyncIteratorReader;
181+
if (reader._ownerReadableStream === undefined) {
182+
return Promise.reject(readerLockException('iterate'));
183+
}
184+
return ReadableStreamDefaultReaderRead(reader).then(result => {
185+
assert(typeIsObject(result));
186+
const value = result.value;
187+
const done = result.done;
188+
assert(typeof done === 'boolean');
189+
if (done) {
190+
ReadableStreamReaderGenericRelease(reader);
191+
}
192+
return ReadableStreamCreateReadResult(value, done, true);
193+
});
194+
},
195+
196+
return(value) {
197+
if (IsReadableStreamAsyncIterator(this) === false) {
198+
return Promise.reject(streamAsyncIteratorBrandCheckException('next'));
199+
}
200+
const reader = this._asyncIteratorReader;
201+
if (reader._ownerReadableStream === undefined) {
202+
return Promise.reject(readerLockException('finish iterating'));
203+
}
204+
if (reader._readRequests.length > 0) {
205+
return Promise.reject(new TypeError(
206+
'Tried to release a reader lock when that reader has pending read() calls un-settled'));
207+
}
208+
if (this._preventCancel === false) {
209+
const result = ReadableStreamReaderGenericCancel(reader, value);
210+
ReadableStreamReaderGenericRelease(reader);
211+
return result.then(() => ReadableStreamCreateReadResult(value, true, true));
212+
}
213+
ReadableStreamReaderGenericRelease(reader);
214+
return Promise.resolve(ReadableStreamCreateReadResult(value, true, true));
215+
}
216+
}, AsyncIteratorPrototype);
217+
Object.defineProperty(ReadableStreamAsyncIteratorPrototype, 'next', { enumerable: false });
218+
Object.defineProperty(ReadableStreamAsyncIteratorPrototype, 'return', { enumerable: false });
219+
220+
Object.defineProperty(ReadableStream.prototype, Symbol.asyncIterator, {
221+
value: ReadableStream.prototype.getIterator,
222+
enumerable: false,
223+
writable: true,
224+
configurable: true
225+
});
226+
163227
module.exports = {
164228
CreateReadableByteStream,
165229
CreateReadableStream,
@@ -194,7 +258,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi
194258
const controller = Object.create(ReadableStreamDefaultController.prototype);
195259

196260
SetUpReadableStreamDefaultController(
197-
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
261+
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
198262
);
199263

200264
return stream;
@@ -255,6 +319,18 @@ function IsReadableStreamLocked(stream) {
255319
return true;
256320
}
257321

322+
function IsReadableStreamAsyncIterator(x) {
323+
if (!typeIsObject(x)) {
324+
return false;
325+
}
326+
327+
if (!Object.prototype.hasOwnProperty.call(x, '_asyncIteratorReader')) {
328+
return false;
329+
}
330+
331+
return true;
332+
}
333+
258334
function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventCancel, signal) {
259335
assert(IsReadableStream(source) === true);
260336
assert(IsWritableStream(dest) === true);
@@ -420,10 +496,9 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
420496

421497
function doTheRest() {
422498
action().then(
423-
() => finalize(originalIsError, originalError),
424-
newError => finalize(true, newError)
425-
)
426-
.catch(rethrowAssertionErrorRejection);
499+
() => finalize(originalIsError, originalError),
500+
newError => finalize(true, newError)
501+
).catch(rethrowAssertionErrorRejection);
427502
}
428503
}
429504

@@ -931,12 +1006,12 @@ function ReadableStreamReaderGenericRelease(reader) {
9311006

9321007
if (reader._ownerReadableStream._state === 'readable') {
9331008
defaultReaderClosedPromiseReject(
934-
reader,
935-
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
1009+
reader,
1010+
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
9361011
} else {
9371012
defaultReaderClosedPromiseResetToRejected(
938-
reader,
939-
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
1013+
reader,
1014+
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
9401015
}
9411016
reader._closedPromise.catch(() => {});
9421017

@@ -1098,8 +1173,7 @@ function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
10981173
e => {
10991174
ReadableStreamDefaultControllerError(controller, e);
11001175
}
1101-
)
1102-
.catch(rethrowAssertionErrorRejection);
1176+
).catch(rethrowAssertionErrorRejection);
11031177

11041178
return undefined;
11051179
}
@@ -1260,8 +1334,7 @@ function SetUpReadableStreamDefaultController(
12601334
r => {
12611335
ReadableStreamDefaultControllerError(controller, r);
12621336
}
1263-
)
1264-
.catch(rethrowAssertionErrorRejection);
1337+
).catch(rethrowAssertionErrorRejection);
12651338
}
12661339

12671340
function SetUpReadableStreamDefaultControllerFromUnderlyingSource(stream, underlyingSource, highWaterMark,
@@ -1533,8 +1606,7 @@ function ReadableByteStreamControllerCallPullIfNeeded(controller) {
15331606
e => {
15341607
ReadableByteStreamControllerError(controller, e);
15351608
}
1536-
)
1537-
.catch(rethrowAssertionErrorRejection);
1609+
).catch(rethrowAssertionErrorRejection);
15381610

15391611
return undefined;
15401612
}
@@ -1570,7 +1642,7 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto
15701642
assert(bytesFilled % elementSize === 0);
15711643

15721644
return new pullIntoDescriptor.ctor(
1573-
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
1645+
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
15741646
}
15751647

15761648
function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) {
@@ -1994,19 +2066,18 @@ function SetUpReadableByteStreamController(stream, controller, startAlgorithm, p
19942066

19952067
const startResult = startAlgorithm();
19962068
Promise.resolve(startResult).then(
1997-
() => {
1998-
controller._started = true;
2069+
() => {
2070+
controller._started = true;
19992071

2000-
assert(controller._pulling === false);
2001-
assert(controller._pullAgain === false);
2072+
assert(controller._pulling === false);
2073+
assert(controller._pullAgain === false);
20022074

2003-
ReadableByteStreamControllerCallPullIfNeeded(controller);
2004-
},
2005-
r => {
2006-
ReadableByteStreamControllerError(controller, r);
2007-
}
2008-
)
2009-
.catch(rethrowAssertionErrorRejection);
2075+
ReadableByteStreamControllerCallPullIfNeeded(controller);
2076+
},
2077+
r => {
2078+
ReadableByteStreamControllerError(controller, r);
2079+
}
2080+
).catch(rethrowAssertionErrorRejection);
20102081
}
20112082

20122083
function SetUpReadableByteStreamControllerFromUnderlyingSource(stream, underlyingByteSource, highWaterMark) {
@@ -2063,6 +2134,10 @@ function streamBrandCheckException(name) {
20632134
return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`);
20642135
}
20652136

2137+
function streamAsyncIteratorBrandCheckException(name) {
2138+
return new TypeError(`ReadableStreamAsyncIterator.${name} can only be used on a ReadableSteamAsyncIterator`);
2139+
}
2140+
20662141
// Helper functions for the readers.
20672142

20682143
function readerLockException(name) {

0 commit comments

Comments
 (0)