mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
stream: add AbortSignal to promisified pipeline
add support for AbortSignal to promisified pipeline. Resolves: https://github.com/nodejs/node/issues/37321 PR-URL: https://github.com/nodejs/node/pull/37359 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Zijian Liu <lxxyxzj@gmail.com>
This commit is contained in:
committed by
Benjamin Gruenbaum
parent
bb35b6efa6
commit
38f6e5a789
@@ -1719,7 +1719,11 @@ pipeline(
|
||||
);
|
||||
```
|
||||
|
||||
The `pipeline` API provides promise version:
|
||||
The `pipeline` API provides a promise version, which can also
|
||||
receive an options argument as the last parameter with a
|
||||
`signal` {AbortSignal} property. When the signal is aborted,
|
||||
`destroy` will be called on the underlying pipeline, with an
|
||||
`AbortError`.
|
||||
|
||||
```js
|
||||
const { pipeline } = require('stream/promises');
|
||||
@@ -1736,6 +1740,30 @@ async function run() {
|
||||
run().catch(console.error);
|
||||
```
|
||||
|
||||
To use an `AbortSignal`, pass it inside an options object,
|
||||
as the last argument:
|
||||
|
||||
```js
|
||||
const { pipeline } = require('stream/promises');
|
||||
|
||||
async function run() {
|
||||
const ac = new AbortController();
|
||||
const options = {
|
||||
signal: ac.signal,
|
||||
};
|
||||
|
||||
setTimeout(() => ac.abort(), 1);
|
||||
await pipeline(
|
||||
fs.createReadStream('archive.tar'),
|
||||
zlib.createGzip(),
|
||||
fs.createWriteStream('archive.tar.gz'),
|
||||
options,
|
||||
);
|
||||
}
|
||||
|
||||
run().catch(console.error); // AbortError
|
||||
```
|
||||
|
||||
The `pipeline` API also supports async generators:
|
||||
|
||||
```js
|
||||
|
||||
@@ -1,22 +1,65 @@
|
||||
'use strict';
|
||||
|
||||
const {
|
||||
ArrayPrototypePop,
|
||||
Promise,
|
||||
SymbolAsyncIterator,
|
||||
SymbolIterator,
|
||||
} = primordials;
|
||||
|
||||
const {
|
||||
addAbortSignalNoValidate,
|
||||
} = require('internal/streams/add-abort-signal');
|
||||
|
||||
const {
|
||||
validateAbortSignal,
|
||||
} = require('internal/validators');
|
||||
|
||||
let pl;
|
||||
let eos;
|
||||
|
||||
function isReadable(obj) {
|
||||
return !!(obj && typeof obj.pipe === 'function');
|
||||
}
|
||||
|
||||
function isWritable(obj) {
|
||||
return !!(obj && typeof obj.write === 'function');
|
||||
}
|
||||
|
||||
function isStream(obj) {
|
||||
return isReadable(obj) || isWritable(obj);
|
||||
}
|
||||
|
||||
function isIterable(obj, isAsync) {
|
||||
if (!obj) return false;
|
||||
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
|
||||
if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
|
||||
return typeof obj[SymbolAsyncIterator] === 'function' ||
|
||||
typeof obj[SymbolIterator] === 'function';
|
||||
}
|
||||
|
||||
function pipeline(...streams) {
|
||||
if (!pl) pl = require('internal/streams/pipeline');
|
||||
return new Promise((resolve, reject) => {
|
||||
pl(...streams, (err, value) => {
|
||||
let signal;
|
||||
const lastArg = streams[streams.length - 1];
|
||||
if (lastArg && typeof lastArg === 'object' &&
|
||||
!isStream(lastArg) && !isIterable(lastArg)) {
|
||||
const options = ArrayPrototypePop(streams);
|
||||
signal = options.signal;
|
||||
validateAbortSignal(signal, 'options.signal');
|
||||
}
|
||||
|
||||
const pipe = pl(...streams, (err, value) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(value);
|
||||
}
|
||||
});
|
||||
if (signal) {
|
||||
addAbortSignalNoValidate(signal, pipe);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -469,6 +469,78 @@ const net = require('net');
|
||||
run();
|
||||
}
|
||||
|
||||
{
|
||||
// Check aborted signal without values
|
||||
const pipelinePromise = promisify(pipeline);
|
||||
async function run() {
|
||||
const ac = new AbortController();
|
||||
const { signal } = ac;
|
||||
async function* producer() {
|
||||
ac.abort();
|
||||
await Promise.resolve();
|
||||
yield '8';
|
||||
}
|
||||
|
||||
const w = new Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
await pipelinePromise(producer, w, { signal });
|
||||
}
|
||||
|
||||
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Check aborted signal after init.
|
||||
const pipelinePromise = promisify(pipeline);
|
||||
async function run() {
|
||||
const ac = new AbortController();
|
||||
const { signal } = ac;
|
||||
async function* producer() {
|
||||
yield '5';
|
||||
await Promise.resolve();
|
||||
ac.abort();
|
||||
await Promise.resolve();
|
||||
yield '8';
|
||||
}
|
||||
|
||||
const w = new Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
await pipelinePromise(producer, w, { signal });
|
||||
}
|
||||
|
||||
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Check pre-aborted signal
|
||||
const pipelinePromise = promisify(pipeline);
|
||||
async function run() {
|
||||
const ac = new AbortController();
|
||||
const { signal } = ac;
|
||||
ac.abort();
|
||||
async function* producer() {
|
||||
yield '5';
|
||||
await Promise.resolve();
|
||||
yield '8';
|
||||
}
|
||||
|
||||
const w = new Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
await pipelinePromise(producer, w, { signal });
|
||||
}
|
||||
|
||||
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
|
||||
Reference in New Issue
Block a user