mirror of
https://github.com/zebrajr/node.git
synced 2026-01-15 12:15:26 +00:00
stream: add a non-destroying iterator to Readable
add a non-destroying iterator to Readable fixes: https://github.com/nodejs/node/issues/38491 PR-URL: https://github.com/nodejs/node/pull/38526 Fixes: https://github.com/nodejs/node/issues/38491 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
committed by
James M Snell
parent
5a67f1504f
commit
df85d37050
@@ -1508,13 +1508,69 @@ async function print(readable) {
|
||||
print(fs.createReadStream('file')).catch(console.error);
|
||||
```
|
||||
|
||||
If the loop terminates with a `break` or a `throw`, the stream will be
|
||||
destroyed. In other terms, iterating over a stream will consume the stream
|
||||
If the loop terminates with a `break`, `return`, or a `throw`, the stream will
|
||||
be destroyed. In other terms, iterating over a stream will consume the stream
|
||||
fully. The stream will be read in chunks of size equal to the `highWaterMark`
|
||||
option. In the code example above, data will be in a single chunk if the file
|
||||
has less then 64KB of data because no `highWaterMark` option is provided to
|
||||
[`fs.createReadStream()`][].
|
||||
|
||||
##### `readable.iterator([options])`
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `options` {Object}
|
||||
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
|
||||
async iterator, or exiting a `for await...of` iteration using a `break`,
|
||||
`return`, or `throw` will not destroy the stream. **Default:** `true`.
|
||||
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
|
||||
error while it's being iterated, the iterator will not destroy the stream.
|
||||
**Default:** `true`.
|
||||
* Returns: {AsyncIterator} to consume the stream.
|
||||
|
||||
The iterator created by this method gives users the option to cancel the
|
||||
destruction of the stream if the `for await...of` loop is exited by `return`,
|
||||
`break`, or `throw`, or if the iterator should destroy the stream if the stream
|
||||
emitted an error during iteration.
|
||||
|
||||
```js
|
||||
const { Readable } = require('stream');
|
||||
|
||||
async function printIterator(readable) {
|
||||
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
|
||||
console.log(chunk); // 1
|
||||
break;
|
||||
}
|
||||
|
||||
console.log(readable.destroyed); // false
|
||||
|
||||
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
|
||||
console.log(chunk); // Will print 2 and then 3
|
||||
}
|
||||
|
||||
console.log(readable.destroyed); // True, stream was totally consumed
|
||||
}
|
||||
|
||||
async function printSymbolAsyncIterator(readable) {
|
||||
for await (const chunk of readable) {
|
||||
console.log(chunk); // 1
|
||||
break;
|
||||
}
|
||||
|
||||
console.log(readable.destroyed); // true
|
||||
}
|
||||
|
||||
async function showBoth() {
|
||||
await printIterator(Readable.from([1, 2, 3]));
|
||||
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
|
||||
}
|
||||
|
||||
showBoth();
|
||||
```
|
||||
|
||||
### Duplex and transform streams
|
||||
|
||||
#### Class: `stream.Duplex`
|
||||
|
||||
@@ -62,6 +62,7 @@ const {
|
||||
ERR_METHOD_NOT_IMPLEMENTED,
|
||||
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
|
||||
} = require('internal/errors').codes;
|
||||
const { validateObject } = require('internal/validators');
|
||||
|
||||
const kPaused = Symbol('kPaused');
|
||||
|
||||
@@ -1062,8 +1063,17 @@ Readable.prototype.wrap = function(stream) {
|
||||
};
|
||||
|
||||
Readable.prototype[SymbolAsyncIterator] = function() {
|
||||
let stream = this;
|
||||
return streamToAsyncIterator(this);
|
||||
};
|
||||
|
||||
Readable.prototype.iterator = function(options) {
|
||||
if (options !== undefined) {
|
||||
validateObject(options, 'options');
|
||||
}
|
||||
return streamToAsyncIterator(this, options);
|
||||
};
|
||||
|
||||
function streamToAsyncIterator(stream, options) {
|
||||
if (typeof stream.read !== 'function') {
|
||||
// v1 stream
|
||||
const src = stream;
|
||||
@@ -1076,14 +1086,20 @@ Readable.prototype[SymbolAsyncIterator] = function() {
|
||||
}).wrap(src);
|
||||
}
|
||||
|
||||
const iter = createAsyncIterator(stream);
|
||||
const iter = createAsyncIterator(stream, options);
|
||||
iter.stream = stream;
|
||||
return iter;
|
||||
};
|
||||
}
|
||||
|
||||
async function* createAsyncIterator(stream) {
|
||||
async function* createAsyncIterator(stream, options) {
|
||||
let callback = nop;
|
||||
|
||||
const opts = {
|
||||
destroyOnReturn: true,
|
||||
destroyOnError: true,
|
||||
...options,
|
||||
};
|
||||
|
||||
function next(resolve) {
|
||||
if (this === stream) {
|
||||
callback();
|
||||
@@ -1116,6 +1132,7 @@ async function* createAsyncIterator(stream) {
|
||||
next.call(this);
|
||||
});
|
||||
|
||||
let errorThrown = false;
|
||||
try {
|
||||
while (true) {
|
||||
const chunk = stream.destroyed ? null : stream.read();
|
||||
@@ -1132,12 +1149,17 @@ async function* createAsyncIterator(stream) {
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
destroyImpl.destroyer(stream, err);
|
||||
if (opts.destroyOnError) {
|
||||
destroyImpl.destroyer(stream, err);
|
||||
}
|
||||
errorThrown = true;
|
||||
throw err;
|
||||
} finally {
|
||||
if (state.autoDestroy || !endEmitted) {
|
||||
// TODO(ronag): ERR_PREMATURE_CLOSE?
|
||||
destroyImpl.destroyer(stream, null);
|
||||
if (!errorThrown && opts.destroyOnReturn) {
|
||||
if (state.autoDestroy || !endEmitted) {
|
||||
// TODO(ronag): ERR_PREMATURE_CLOSE?
|
||||
destroyImpl.destroyer(stream, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -693,6 +693,122 @@ async function tests() {
|
||||
});
|
||||
}
|
||||
|
||||
// AsyncIterator non-destroying iterator
|
||||
{
|
||||
function createReadable() {
|
||||
return Readable.from((async function* () {
|
||||
await Promise.resolve();
|
||||
yield 5;
|
||||
await Promise.resolve();
|
||||
yield 7;
|
||||
await Promise.resolve();
|
||||
})());
|
||||
}
|
||||
|
||||
function createErrorReadable() {
|
||||
const opts = { read() { throw new Error('inner'); } };
|
||||
return new Readable(opts);
|
||||
}
|
||||
|
||||
// Check default destroys on return
|
||||
(async function() {
|
||||
const readable = createReadable();
|
||||
for await (const chunk of readable.iterator()) {
|
||||
assert.strictEqual(chunk, 5);
|
||||
break;
|
||||
}
|
||||
|
||||
assert.ok(readable.destroyed);
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Check explicit destroying on return
|
||||
(async function() {
|
||||
const readable = createReadable();
|
||||
for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
|
||||
assert.strictEqual(chunk, 5);
|
||||
break;
|
||||
}
|
||||
|
||||
assert.ok(readable.destroyed);
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Check default destroys on error
|
||||
(async function() {
|
||||
const readable = createErrorReadable();
|
||||
try {
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
for await (const chunk of readable) { }
|
||||
assert.fail('should have thrown');
|
||||
} catch (err) {
|
||||
assert.strictEqual(err.message, 'inner');
|
||||
}
|
||||
|
||||
assert.ok(readable.destroyed);
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Check explicit destroys on error
|
||||
(async function() {
|
||||
const readable = createErrorReadable();
|
||||
const opts = { destroyOnError: true, destroyOnReturn: false };
|
||||
try {
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
for await (const chunk of readable.iterator(opts)) { }
|
||||
assert.fail('should have thrown');
|
||||
} catch (err) {
|
||||
assert.strictEqual(err.message, 'inner');
|
||||
}
|
||||
|
||||
assert.ok(readable.destroyed);
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Check explicit non-destroy with return true
|
||||
(async function() {
|
||||
const readable = createErrorReadable();
|
||||
const opts = { destroyOnError: false, destroyOnReturn: true };
|
||||
try {
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
for await (const chunk of readable.iterator(opts)) { }
|
||||
assert.fail('should have thrown');
|
||||
} catch (err) {
|
||||
assert.strictEqual(err.message, 'inner');
|
||||
}
|
||||
|
||||
assert.ok(!readable.destroyed);
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Check explicit non-destroy with return true
|
||||
(async function() {
|
||||
const readable = createReadable();
|
||||
const opts = { destroyOnReturn: false };
|
||||
for await (const chunk of readable.iterator(opts)) {
|
||||
assert.strictEqual(chunk, 5);
|
||||
break;
|
||||
}
|
||||
|
||||
assert.ok(!readable.destroyed);
|
||||
|
||||
for await (const chunk of readable.iterator(opts)) {
|
||||
assert.strictEqual(chunk, 7);
|
||||
}
|
||||
|
||||
assert.ok(readable.destroyed);
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Check non-object options.
|
||||
{
|
||||
const readable = createReadable();
|
||||
assert.throws(
|
||||
() => readable.iterator(42),
|
||||
{
|
||||
code: 'ERR_INVALID_ARG_TYPE',
|
||||
name: 'TypeError',
|
||||
message: 'The "options" argument must be of type object. Received ' +
|
||||
'type number (42)',
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let _req;
|
||||
const server = http.createServer((request, response) => {
|
||||
|
||||
Reference in New Issue
Block a user