stream: add map method to Readable

Implement the map method on readable stream. This starts the alignment
with the tc39-iterator-helpers proposal and adds a `.map` method to
every Node.js readable stream.

Co-Authored-By: Robert Nagy <ronag@icloud.com>

PR-URL: https://github.com/nodejs/node/pull/40815
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Benjamin Gruenbaum
2021-11-15 15:39:05 +02:00
committed by Robert Nagy
parent f81c62704f
commit b97b81d4ec
6 changed files with 317 additions and 3 deletions

View File

@@ -1737,6 +1737,50 @@ async function showBoth() {
showBoth();
```
### `readable.map(fn[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream mapped with the function `fn`.
This method allows mapping over the stream. The `fn` function will be called
for every item in the stream. If the `fn` function returns a promise - that
promise will be `await`ed before being passed to the result stream.
```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';
// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(item); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
```
### Duplex and transform streams
#### Class: `stream.Duplex`

View File

@@ -0,0 +1,152 @@
'use strict';
const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const {
MathFloor,
Promise,
PromiseReject,
PromisePrototypeCatch,
Symbol,
} = primordials;
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');
async function * map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
}
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}
let concurrency = 1;
if (options?.concurrency != null) {
concurrency = MathFloor(options.concurrency);
}
validateInteger(concurrency, 'concurrency', 1);
const ac = new AbortController();
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };
const abort = () => ac.abort();
options?.signal?.addEventListener('abort', abort);
let next;
let resume;
let done = false;
function onDone() {
done = true;
}
async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}
if (signal.aborted) {
throw new AbortError();
}
try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}
if (val === kEmpty) {
continue;
}
if (typeof val?.catch === 'function') {
val.catch(onDone);
}
queue.push(val);
if (next) {
next();
next = null;
}
if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
}
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}
options?.signal?.removeEventListener('abort', abort);
}
}
pump();
try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];
if (val === kEof) {
return;
}
if (signal.aborted) {
throw new AbortError();
}
if (val !== kEmpty) {
yield val;
}
queue.shift();
if (resume) {
resume();
resume = null;
}
}
await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();
done = true;
if (resume) {
resume();
resume = null;
}
}
}
module.exports = {
map,
};

View File

@@ -23,12 +23,15 @@
const {
ObjectDefineProperty,
ObjectKeys,
ReflectApply,
} = primordials;
const {
promisify: { custom: customPromisify },
} = require('internal/util');
const operators = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
@@ -42,6 +45,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.isDisturbed = utils.isDisturbed;
Stream.isErrored = utils.isErrored;
Stream.Readable = require('internal/streams/readable');
for (const key of ObjectKeys(operators)) {
const op = operators[key];
Stream.Readable.prototype[key] = function(...args) {
return Stream.Readable.from(ReflectApply(op, this, args));
};
}
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');

View File

@@ -110,6 +110,7 @@ const expectedModules = new Set([
'NativeModule internal/streams/end-of-stream',
'NativeModule internal/streams/from',
'NativeModule internal/streams/legacy',
'NativeModule internal/streams/operators',
'NativeModule internal/streams/passthrough',
'NativeModule internal/streams/pipeline',
'NativeModule internal/streams/readable',

View File

@@ -0,0 +1,108 @@
'use strict';
const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// Map works on synchronous streams with an asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
await Promise.resolve();
return x + x;
});
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8, 12, 16, 20];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}
{
// Concurrency result order
const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
await setTimeout(10 - item, { signal });
return item;
}, { concurrency: 2 });
(async () => {
const expected = [1, 2];
for await (const item of stream) {
assert.strictEqual(item, expected.shift());
}
})().then(common.mustCall());
}
{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).map(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
assert.strictEqual(stream.readable, true);
}

View File

@@ -208,12 +208,12 @@ const customTypesMap = {
'Stream': 'stream.html#stream',
'stream.Duplex': 'stream.html#class-streamduplex',
'stream.Readable': 'stream.html#class-streamreadable',
'stream.Transform': 'stream.html#class-streamtransform',
'stream.Writable': 'stream.html#class-streamwritable',
'Duplex': 'stream.html#class-streamduplex',
'stream.Readable': 'stream.html#class-streamreadable',
'Readable': 'stream.html#class-streamreadable',
'stream.Transform': 'stream.html#class-streamtransform',
'Transform': 'stream.html#class-streamtransform',
'stream.Writable': 'stream.html#class-streamwritable',
'Writable': 'stream.html#class-streamwritable',
'Immediate': 'timers.html#class-immediate',