stream: add compose operator

PR-URL: https://github.com/nodejs/node/pull/44937
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Raz Luvaton
2022-10-31 15:57:02 +02:00
committed by GitHub
parent ffa2e964e8
commit ddb3ae7c30
4 changed files with 210 additions and 15 deletions


@@ -1681,6 +1681,41 @@ option. In the code example above, data will be in a single chunk if the file
has less than 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].
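
As a hedged aside (not part of this commit; `big.txt` is a hypothetical path), passing an explicit `highWaterMark` caps the chunk size instead:

```mjs
import { createReadStream } from 'node:fs';

// With a 1 KiB highWaterMark the loop sees many small chunks rather than
// a single chunk for files under the default 64 KiB.
for await (const chunk of createReadStream('big.txt', { highWaterMark: 1024 })) {
  console.log(chunk.length); // at most 1024 bytes
}
```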
##### `readable.compose(stream[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `stream` {Stream|Iterable|AsyncIterable|Function}
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Duplex} a stream composed with the stream `stream`.
```mjs
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
```
See [`stream.compose`][] for more information.
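
The `signal` option ties the composed stream's lifetime to an `AbortSignal`. A minimal sketch (not part of the committed docs, echoing the `AbortSignal` test added in this commit):

```mjs
import { Readable } from 'node:stream';

const ac = new AbortController();

const composed = Readable.from([1, 2, 3]).compose(async function* (source) {
  for await (const chunk of source) {
    // Simulate slow processing so the abort lands mid-stream.
    await new Promise((resolve) => setTimeout(resolve, 100));
    yield chunk;
  }
}, { signal: ac.signal });

setTimeout(() => ac.abort(), 10);

try {
  await composed.toArray();
} catch (err) {
  console.log(err.name); // prints 'AbortError'
}
```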
##### `readable.iterator([options])`
<!-- YAML
@@ -2720,6 +2755,8 @@ await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
```
See [`readable.compose(stream)`][] for using `stream.compose` as an operator.
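
For contrast, a brief sketch (not part of the committed docs) of the same pipeline written both ways; both forms return a `Duplex`:

```mjs
import { compose, Readable } from 'node:stream';

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

// Static form: stream.compose() combines the pieces into one stream.
const viaStatic = compose(Readable.from(['hello', 'world']), toUpper);

// Operator form: the same pipeline, written fluently off the readable.
const viaOperator = Readable.from(['hello', 'world']).compose(toUpper);

console.log(await viaStatic.toArray()); // prints ['HELLO', 'WORLD']
console.log(await viaOperator.toArray()); // prints ['HELLO', 'WORLD']
```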
### `stream.Readable.from(iterable[, options])`
<!-- YAML
@@ -4487,11 +4524,13 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.compose(stream)`]: #readablecomposestream-options
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
[`stream.compose`]: #streamcomposestreams
[`stream.cork()`]: #writablecork
[`stream.finished()`]: #streamfinishedstream-options-callback
[`stream.pipe()`]: #readablepipedestination-options


@@ -4,6 +4,7 @@ const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
@@ -17,6 +18,11 @@ const {
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
const staticCompose = require('internal/streams/compose');
const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const { isWritable, isNodeStream } = require('internal/streams/utils');
const {
ArrayPrototypePush,
@@ -32,6 +38,31 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');
function compose(stream, options) {
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
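// A Node stream passed here sits downstream of `this` in the composition,
// so it must be able to accept writes.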
if (isNodeStream(stream) && !isWritable(stream)) {
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
}
const composedStream = staticCompose(this, stream);
if (options?.signal) {
// Not validating as we already validated before
addAbortSignalNoValidate(
options.signal,
composedStream
);
}
return composedStream;
}
function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
@@ -392,6 +423,7 @@ module.exports.streamReturningOperators = {
flatMap,
map,
take,
compose,
};
module.exports.promiseReturningOperators = {


@@ -0,0 +1,127 @@
'use strict';
const common = require('../common');
const {
Readable, Transform,
} = require('stream');
const assert = require('assert');
{
// with async generator
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
let str = '';
for await (const chunk of stream) {
str += chunk;
if (str.length === 2) {
yield str;
str = '';
}
}
});
const result = ['ab', 'cd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// With Transformer
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
objectMode: true,
transform: common.mustCall((chunk, encoding, callback) => {
callback(null, chunk);
}, 4)
}));
const result = ['a', 'b', 'c', 'd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// Throwing an error during `compose` (before waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
throw new Error('boom');
});
assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, /boom/).then(common.mustCall());
}
{
// Throwing an error during `compose` (when waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
for await (const chunk of stream) {
if (chunk === 3) {
throw new Error('boom');
}
yield chunk;
}
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}
{
// Throwing an error during `compose` (after finishing all readable data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
// eslint-disable-next-line no-unused-vars,no-empty
for await (const chunk of stream) {
}
throw new Error('boom');
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}
{
// AbortSignal
const ac = new AbortController();
const stream = Readable.from([1, 2, 3, 4, 5])
.compose(async function *(source) {
// Should not reach here
for await (const chunk of source) {
yield chunk;
}
}, { signal: ac.signal });
ac.abort();
assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}
{
assert.throws(
() => Readable.from(['a']).compose(Readable.from(['b'])),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}
{
assert.throws(
() => Readable.from(['a']).compose(),
{ code: 'ERR_INVALID_ARG_TYPE' }
);
}


@@ -358,27 +358,24 @@ const assert = require('assert');
}
{
try {
compose();
} catch (err) {
assert.strictEqual(err.code, 'ERR_MISSING_ARGS');
}
assert.throws(
() => compose(),
{ code: 'ERR_MISSING_ARGS' }
);
}
{
try {
compose(new Writable(), new PassThrough());
} catch (err) {
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
}
assert.throws(
() => compose(new Writable(), new PassThrough()),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}
{
try {
compose(new PassThrough(), new Readable({ read() {} }), new PassThrough());
} catch (err) {
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
}
assert.throws(
() => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}
{