Skip to content

Commit 46c3b89

Browse files
domenicricea
andauthored
Allow aborting an ongoing pipe operation using AbortSignals
Closes #446. Co-authored-by: Adam Rice <[email protected]>
1 parent 3197c7e commit 46c3b89

File tree

5 files changed

+152
-7
lines changed

5 files changed

+152
-7
lines changed

index.bs

+27-3
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ like
401401
<a href="#rs-cancel">cancel</a>(reason)
402402
<a href="#rs-get-reader">getReader</a>()
403403
<a href="#rs-pipe-through">pipeThrough</a>({ writable, readable }, options)
404-
<a href="#rs-pipe-to">pipeTo</a>(dest, { preventClose, preventAbort, preventCancel } = {})
404+
<a href="#rs-pipe-to">pipeTo</a>(dest, { preventClose, preventAbort, preventCancel, signal } = {})
405405
<a href="#rs-tee">tee</a>()
406406
}
407407
</code></pre>
@@ -701,8 +701,9 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
701701
</code></pre>
702702
</div>
703703

704-
<h5 id="rs-pipe-to" method for="ReadableStream" lt="pipeTo(dest, options)">pipeTo(<var ignore>dest</var>, {
705-
<var ignore>preventClose</var>, <var ignore>preventAbort</var>, <var ignore>preventCancel</var> } = {})</h5>
704+
<h5 id="rs-pipe-to" method for="ReadableStream" lt="pipeTo(dest, options)">pipeTo(<var ignore>dest</var>,
705+
{ <var ignore>preventClose</var>, <var ignore>preventAbort</var>, <var ignore>preventCancel</var>,
706+
<var ignore>signal</var> } = {})</h5>
706707

707708
<div class="note">
708709
The <code>pipeTo</code> method <a lt="piping">pipes</a> this <a>readable stream</a> to a given <a>writable
@@ -733,13 +734,21 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
733734
promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs
734735
during canceling the source.</p></li>
735736
</ul>
737+
738+
The <code>signal</code> option can be set to an {{AbortSignal}} to allow aborting an ongoing pipe operation via the
739+
corresponding {{AbortController}}. In this case, the source <a>readable stream</a> will be <a lt="cancel a readable
740+
stream">canceled</a>, and the destination <a>writable stream</a> <a lt="abort a writable stream">aborted</a>, unless
741+
the respective options <code>preventCancel</code> or <code>preventAbort</code> are set.
742+
736743
</div>
737744

738745
<emu-alg>
739746
1. If ! IsReadableStream(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
740747
1. If ! IsWritableStream(_dest_) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
741748
1. Set _preventClose_ to ! ToBoolean(_preventClose_), set _preventAbort_ to ! ToBoolean(_preventAbort_), and set
742749
_preventCancel_ to ! ToBoolean(_preventCancel_).
750+
1. If _signal_ is not *undefined*, and _signal_ is not an instance of the `<a idl>AbortSignal</a>` interface, return
751+
<a>a promise rejected with</a> a *TypeError* exception.
743752
1. If ! IsReadableStreamLocked(*this*) is *true*, return <a>a promise rejected with</a> a *TypeError* exception.
744753
1. If ! IsWritableStreamLocked(_dest_) is *true*, return <a>a promise rejected with</a> a *TypeError* exception.
745754
1. If ! IsReadableByteStreamController(*this*.[[readableStreamController]]) is *true*, let _reader_ be either !
@@ -749,6 +758,20 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
749758
1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
750759
1. Let _shuttingDown_ be *false*.
751760
1. Let _promise_ be <a>a new promise</a>.
761+
1. If _signal_ is not *undefined*,
762+
1. Let _abortAlgorithm_ be the following steps:
763+
1. Let _error_ be a new "`<a idl>AbortError</a>`" `<a idl>DOMException</a>`.
764+
1. Let _actions_ be an empty <a>ordered set</a>.
765+
1. If _preventAbort_ is *false*, <a for="set">append</a> the following action to _actions_:
766+
1. If _dest_.[[_state_]] is `"writable"`, return ! WritableStreamAbort(_dest_, _error_).
767+
1. Otherwise, return <a>a promise resolved with</a> *undefined*.
768+
1. If _preventCancel_ is *false*, <a for="set">append</a> the following action action to _actions_:
769+
1. If *this*.[[_state_]] is `"readable"`, return ! ReadableStreamCancel(*this*, _error_).
770+
1. Otherwise, return <a>a promise resolved with</a> *undefined*.
771+
1. <a href="#rs-pipeTo-shutdown-with-action">Shutdown with an action</a> consisting of <a>waiting for all</a>
772+
of the actions in _actions_, and with _error_.
773+
1. If _signal_'s <a for=AbortSignal>aborted flag</a> is set, perform _abortAlgorithm_ and return _promise_.
774+
1. <a for="AbortSignal">Add</a> _abortAlgorithm_ to _signal_.
752775
1. <a>In parallel</a> <span class="XXX">but not really; see <a
753776
href="https://github.com/whatwg/streams/issues/905">#905</a></span>, using _reader_ and _writer_, read all
754777
<a>chunks</a> from *this* and write them to _dest_. Due to the locking provided by the reader and writer, the exact
@@ -817,6 +840,7 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
817840
an error _error_, which means to perform the following steps:
818841
1. Perform ! WritableStreamDefaultWriterRelease(_writer_).
819842
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
843+
1. If _signal_ is not *undefined*, <a for="AbortSignal">remove</a> _abortAlgorithm_ from _signal_.
820844
1. If _error_ was given, <a>reject</a> _promise_ with _error_.
821845
1. Otherwise, <a>resolve</a> _promise_ with *undefined*.
822846
1. Return _promise_.

reference-implementation/lib/helpers.js

+64
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,67 @@ exports.MakeSizeAlgorithmFromSizeFunction = size => {
156156
}
157157
return chunk => size(chunk);
158158
};
159+
160+
exports.PerformPromiseThen = (promise, onFulfilled, onRejected) => {
161+
// There doesn't appear to be any way to correctly emulate the behaviour from JavaScript, so this is just an
162+
// approximation.
163+
return Promise.prototype.then.call(promise, onFulfilled, onRejected);
164+
};
165+
166+
exports.WaitForAll = (promises, successSteps, failureSteps) => {
167+
let rejected = false;
168+
const rejectionHandler = arg => {
169+
if (rejected === false) {
170+
rejected = true;
171+
failureSteps(arg);
172+
}
173+
};
174+
let index = 0;
175+
let fulfilledCount = 0;
176+
const total = promises.length;
177+
const result = new Array(total);
178+
for (const promise of promises) {
179+
const promiseIndex = index;
180+
const fulfillmentHandler = arg => {
181+
result[promiseIndex] = arg;
182+
++fulfilledCount;
183+
if (fulfilledCount === total) {
184+
successSteps(result);
185+
}
186+
};
187+
exports.PerformPromiseThen(promise, fulfillmentHandler, rejectionHandler);
188+
++index;
189+
}
190+
};
191+
192+
exports.WaitForAllPromise = (promises, successSteps, failureSteps = undefined) => {
193+
let resolvePromise;
194+
let rejectPromise;
195+
const promise = new Promise((resolve, reject) => {
196+
resolvePromise = resolve;
197+
rejectPromise = reject;
198+
});
199+
if (failureSteps === undefined) {
200+
failureSteps = arg => {
201+
throw arg;
202+
};
203+
}
204+
const successStepsWrapper = results => {
205+
try {
206+
const stepsResult = successSteps(results);
207+
resolvePromise(stepsResult);
208+
} catch (e) {
209+
rejectPromise(e);
210+
}
211+
};
212+
const failureStepsWrapper = reason => {
213+
try {
214+
const stepsResult = failureSteps(reason);
215+
resolvePromise(stepsResult);
216+
} catch (e) {
217+
rejectPromise(e);
218+
}
219+
};
220+
exports.WaitForAll(promises, successStepsWrapper, failureStepsWrapper);
221+
return promise;
222+
};

reference-implementation/lib/readable-stream.js

+59-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict';
2+
/* global AbortSignal:false */
3+
24
const assert = require('better-assert');
35
const { ArrayBufferCopy, CreateAlgorithmFromUnderlyingMethod, IsFiniteNonNegativeNumber, InvokeOrNoop,
46
IsDetachedBuffer, TransferArrayBuffer, ValidateAndNormalizeHighWaterMark, IsNonNegativeNumber,
5-
MakeSizeAlgorithmFromSizeFunction, createArrayFromList, typeIsObject } = require('./helpers.js');
7+
MakeSizeAlgorithmFromSizeFunction, createArrayFromList, typeIsObject, WaitForAllPromise } =
8+
require('./helpers.js');
69
const { rethrowAssertionErrorRejection } = require('./utils.js');
710
const { DequeueValue, EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
811
const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLocked,
@@ -97,7 +100,7 @@ class ReadableStream {
97100
return readable;
98101
}
99102

100-
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
103+
pipeTo(dest, { preventClose, preventAbort, preventCancel, signal } = {}) {
101104
if (IsReadableStream(this) === false) {
102105
return Promise.reject(streamBrandCheckException('pipeTo'));
103106
}
@@ -110,6 +113,10 @@ class ReadableStream {
110113
preventAbort = Boolean(preventAbort);
111114
preventCancel = Boolean(preventCancel);
112115

116+
if (signal !== undefined && !isAbortSignal(signal)) {
117+
return Promise.reject(new TypeError('ReadableStream.prototype.pipeTo\'s signal option must be an AbortSignal'));
118+
}
119+
113120
if (IsReadableStreamLocked(this) === true) {
114121
return Promise.reject(new TypeError('ReadableStream.prototype.pipeTo cannot be used on a locked ReadableStream'));
115122
}
@@ -126,6 +133,38 @@ class ReadableStream {
126133
let currentWrite = Promise.resolve();
127134

128135
return new Promise((resolve, reject) => {
136+
let abortAlgorithm;
137+
if (signal !== undefined) {
138+
abortAlgorithm = () => {
139+
const error = new DOMException('Aborted', 'AbortError');
140+
const actions = [];
141+
if (preventAbort === false) {
142+
actions.push(() => {
143+
if (dest._state === 'writable') {
144+
return WritableStreamAbort(dest, error);
145+
}
146+
return Promise.resolve();
147+
});
148+
}
149+
if (preventCancel === false) {
150+
actions.push(() => {
151+
if (this._state === 'readable') {
152+
return ReadableStreamCancel(this, error);
153+
}
154+
return Promise.resolve();
155+
});
156+
}
157+
shutdownWithAction(() => WaitForAllPromise(actions.map(action => action()), results => results), true, error);
158+
};
159+
160+
if (signal.aborted === true) {
161+
abortAlgorithm();
162+
return;
163+
}
164+
165+
signal.addEventListener('abort', abortAlgorithm);
166+
}
167+
129168
// Using reader and writer, read all chunks from this and write them to dest
130169
// - Backpressure must be enforced
131170
// - Shutdown must stop all activity
@@ -250,6 +289,9 @@ class ReadableStream {
250289
WritableStreamDefaultWriterRelease(writer);
251290
ReadableStreamReaderGenericRelease(reader);
252291

292+
if (signal !== undefined) {
293+
signal.removeEventListener('abort', abortAlgorithm);
294+
}
253295
if (isError) {
254296
reject(error);
255297
} else {
@@ -1952,6 +1994,21 @@ function SetUpReadableStreamBYOBRequest(request, controller, view) {
19521994

19531995
// Helper functions for the ReadableStream.
19541996

1997+
function isAbortSignal(value) {
1998+
if (typeof value !== 'object' || value === null) {
1999+
return false;
2000+
}
2001+
2002+
// Use the brand check to distinguish a real AbortSignal from a fake one.
2003+
const aborted = Object.getOwnPropertyDescriptor(AbortSignal.prototype, 'aborted').get;
2004+
try {
2005+
aborted.call(value);
2006+
return true;
2007+
} catch (e) {
2008+
return false;
2009+
}
2010+
}
2011+
19552012
function streamBrandCheckException(name) {
19562013
return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`);
19572014
}

reference-implementation/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"minimatch": "^3.0.4",
1919
"nyc": "^13.0.1",
2020
"opener": "^1.5.1",
21-
"wpt-runner": "^2.2.0"
21+
"wpt-runner": "^2.6.0"
2222
},
2323
"nyc": {
2424
"include": [

0 commit comments

Comments
 (0)