stream: add promises version to utility functions

PR-URL: https://github.com/nodejs/node/pull/33991
Fixes: https://github.com/nodejs/node/issues/33582
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
This commit is contained in:
rickyes
2020-06-28 16:29:01 +08:00
committed by Robert Nagy
parent 6ae1b9c457
commit 527e2147af
5 changed files with 193 additions and 8 deletions

View File

@@ -48,6 +48,13 @@ Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`stream.Readable.from()`][].
### Streams Promises API
The `stream/promises` API provides an alternative set of asynchronous utility
functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('stream/promises')`
or `require('stream').promises`.
### Object mode
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -1597,10 +1604,10 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.
The `finished` API is promisify-able as well;
The `finished` API provides promise version:
```js
const finished = util.promisify(stream.finished);
const { finished } = require('stream/promises');
const rs = fs.createReadStream('archive.tar');
@@ -1684,10 +1691,10 @@ pipeline(
);
```
The `pipeline` API is promisify-able as well:
The `pipeline` API provides promise version:
```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');
async function run() {
await pipeline(
@@ -1704,7 +1711,7 @@ run().catch(console.error);
The `pipeline` API also supports async generators:
```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');
const fs = require('fs');
async function run() {
@@ -2927,9 +2934,9 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
the handling of backpressure and backpressure-related errors:
```js
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');
const { pipeline } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const writable = fs.createWriteStream('./file');
@@ -2943,7 +2950,6 @@ pipeline(iterator, writable, (err, value) => {
});
// Promise Pattern
const pipelinePromise = util.promisify(pipeline);
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');

View File

@@ -21,10 +21,21 @@
'use strict';
const {
ObjectDefineProperty,
} = primordials;
const {
promisify: { custom: customPromisify },
} = require('internal/util');
const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
// Lazy loaded
let promises = null;
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
@@ -38,6 +49,31 @@ Stream.PassThrough = require('_stream_passthrough');
Stream.pipeline = pipeline;
Stream.finished = eos;
ObjectDefineProperty(Stream, 'promises', {
configurable: true,
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises;
}
});
ObjectDefineProperty(pipeline, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.pipeline;
}
});
ObjectDefineProperty(eos, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.finished;
}
});
// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

39
lib/stream/promises.js Normal file
View File

@@ -0,0 +1,39 @@
'use strict';
const {
Promise,
} = primordials;
let pl;
let eos;
function pipeline(...streams) {
if (!pl) pl = require('internal/streams/pipeline');
return new Promise((resolve, reject) => {
pl(...streams, (err, value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
});
}
function finished(stream, opts) {
if (!eos) eos = require('internal/streams/end-of-stream');
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
module.exports = {
finished,
pipeline,
};

View File

@@ -78,6 +78,7 @@
'lib/readline.js',
'lib/repl.js',
'lib/stream.js',
'lib/stream/promises.js',
'lib/_stream_readable.js',
'lib/_stream_writable.js',
'lib/_stream_duplex.js',

View File

@@ -0,0 +1,103 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const {
Readable,
Writable,
promises,
} = stream;
const {
finished,
pipeline,
} = require('stream/promises');
const fs = require('fs');
const assert = require('assert');
const { promisify } = require('util');
assert.strictEqual(promises.pipeline, pipeline);
assert.strictEqual(promises.finished, finished);
assert.strictEqual(pipeline, promisify(stream.pipeline));
assert.strictEqual(finished, promisify(stream.finished));
// pipeline success
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c')
];
const read = new Readable({
read() { }
});
const write = new Writable({
write(data, enc, cb) {
processed.push(data);
cb();
}
});
write.on('finish', () => {
finished = true;
});
for (let i = 0; i < expected.length; i++) {
read.push(expected[i]);
}
read.push(null);
pipeline(read, write).then(common.mustCall((value) => {
assert.ok(finished);
assert.deepStrictEqual(processed, expected);
}));
}
// pipeline error
{
const read = new Readable({
read() { }
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
setImmediate(() => read.destroy());
pipeline(read, write).catch(common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}
// finished success
{
async function run() {
const rs = fs.createReadStream(__filename);
let ended = false;
rs.resume();
rs.on('end', () => {
ended = true;
});
await finished(rs);
assert(ended);
}
run().then(common.mustCall());
}
// finished error
{
const rs = fs.createReadStream('file-does-not-exist');
assert.rejects(finished(rs), {
code: 'ENOENT'
}).then(common.mustCall());
}