stream: pipeline with end option

Currently pipeline cannot fully replace pipe due
to the missing end option. This PR adds the end
option to the promisified pipeline method.

PR-URL: https://github.com/nodejs/node/pull/40886
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Robert Nagy
2021-11-19 19:14:32 +01:00
parent cb75dec299
commit 8ee4e672ec
3 changed files with 50 additions and 17 deletions

View File

@@ -109,7 +109,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}
async function pump(iterable, writable, finish) {
async function pump(iterable, writable, finish, opts) {
let error;
let onresolve = null;
@@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
}
}
writable.end();
if (opts?.end !== false) {
writable.end();
}
await wait();
@@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
if (end) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
} else {
stream.on('error', finish);
}
}
if (i === 0) {
@@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) {
then.call(ret,
(val) => {
value = val;
pt.end(val);
pt.write(val);
if (end) {
pt.end();
}
}, (err) => {
pt.destroy(err);
},
);
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish);
pump(ret, pt, finish, { end });
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
@@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);
ret.pipe(stream, { end });
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
@@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) {
ret = makeAsyncIterable(ret);
finishCount++;
pump(ret, stream, finish);
pump(ret, stream, finish, { end });
}
ret = stream;
} else {

View File

@@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
function pipeline(...streams) {
return new Promise((resolve, reject) => {
let signal;
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
}
pl(streams, (err, value) => {
@@ -29,7 +31,7 @@ function pipeline(...streams) {
} else {
resolve(value);
}
}, { signal });
}, { signal, end });
});
}

View File

@@ -1465,5 +1465,26 @@ const tsp = require('timers/promises');
assert.strictEqual(duplex.destroyed, true);
}
run();
run().then(common.mustCall());
}
{
const pipelinePromise = promisify(pipeline);
async function run() {
const read = new Readable({
read() {}
});
const duplex = new PassThrough();
read.push(null);
await pipelinePromise(read, duplex, { end: false });
assert.strictEqual(duplex.destroyed, false);
assert.strictEqual(duplex.writableEnded, false);
}
run().then(common.mustCall());
}