stream: add CompressionStream and DecompressionStream

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: https://github.com/nodejs/node/pull/39348
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell
2021-07-10 20:56:56 -07:00
parent 25e2f177cb
commit 09ad64d66d
4 changed files with 281 additions and 0 deletions

View File

@@ -1217,5 +1217,56 @@ added: REPLACEME
* Type: {WritableStream}
### Class: `CompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new CompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `compressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `compressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
### Class: `DecompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new DecompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `decompressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `deccompressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/

View File

@@ -0,0 +1,164 @@
'use strict';
const {
ObjectDefineProperties,
Symbol,
} = primordials;
const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
},
} = require('internal/errors');
const {
newReadableWritablePairFromDuplex,
} = require('internal/webstreams/adapters');
const {
customInspect,
kEnumerableProperty,
} = require('internal/webstreams/util');
const {
customInspectSymbol: kInspect,
} = require('internal/util');
let zlib;
function lazyZlib() {
zlib ??= require('zlib');
return zlib;
}
const kHandle = Symbol('kHandle');
const kTransform = Symbol('kTransform');
const kType = Symbol('kType');
/**
* @typedef {import('./readablestream').ReadableStream} ReadableStream
* @typedef {import('./writablestream').WritableStream} WritableStream
*/
function isCompressionStream(value) {
return typeof value?.[kHandle] === 'object' &&
value?.[kType] === 'CompressionStream';
}
function isDecompressionStream(value) {
return typeof value?.[kHandle] === 'object' &&
value?.[kType] === 'DecompressionStream';
}
class CompressionStream {
/**
* @param {'deflate'|'gzip'} format
*/
constructor(format) {
this[kType] = 'CompressionStream';
switch (format) {
case 'deflate':
this[kHandle] = lazyZlib().createDeflate();
break;
case 'gzip':
this[kHandle] = lazyZlib().createGzip();
break;
default:
throw new ERR_INVALID_ARG_VALUE('format', format);
}
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
}
/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
return this[kTransform].readable;
}
/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
return this[kTransform].writable;
}
[kInspect](depth, options) {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
customInspect(depth, options, 'CompressionStream', {
readable: this[kTransform].readable,
writable: this[kTransform].writable,
});
}
}
class DecompressionStream {
/**
* @param {'deflate'|'gzip'} format
*/
constructor(format) {
this[kType] = 'DecompressionStream';
switch (format) {
case 'deflate':
this[kHandle] = lazyZlib().createInflate();
break;
case 'gzip':
this[kHandle] = lazyZlib().createGunzip();
break;
default:
throw new ERR_INVALID_ARG_VALUE('format', format);
}
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
}
/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
return this[kTransform].readable;
}
/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
return this[kTransform].writable;
}
[kInspect](depth, options) {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
customInspect(depth, options, 'DecompressionStream', {
readable: this[kTransform].readable,
writable: this[kTransform].writable,
});
}
}
ObjectDefineProperties(CompressionStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
});
ObjectDefineProperties(DecompressionStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
});
module.exports = {
CompressionStream,
DecompressionStream,
};

View File

@@ -36,6 +36,11 @@ const {
TextDecoderStream,
} = require('internal/webstreams/encoding');
const {
CompressionStream,
DecompressionStream,
} = require('internal/webstreams/compression');
module.exports = {
ReadableStream,
ReadableStreamDefaultReader,
@@ -52,4 +57,6 @@ module.exports = {
CountQueuingStrategy,
TextEncoderStream,
TextDecoderStream,
CompressionStream,
DecompressionStream,
};

View File

@@ -0,0 +1,59 @@
// Flags: --no-warnings
'use strict';
const common = require('../common');
const {
CompressionStream,
DecompressionStream,
} = require('stream/web');
const assert = require('assert');
const dec = new TextDecoder();
async function test(format) {
const gzip = new CompressionStream(format);
const gunzip = new DecompressionStream(format);
gzip.readable.pipeTo(gunzip.writable).then(common.mustCall());
const reader = gunzip.readable.getReader();
const writer = gzip.writable.getWriter();
await Promise.all([
reader.read().then(({ value, done }) => {
assert.strictEqual(dec.decode(value), 'hello');
}),
reader.read().then(({ done }) => assert(done)),
writer.write('hello'),
writer.close(),
]);
}
Promise.all(['gzip', 'deflate'].map((i) => test(i))).then(common.mustCall());
[1, 'hello', false, {}].forEach((i) => {
assert.throws(() => new CompressionStream(i), {
code: 'ERR_INVALID_ARG_VALUE',
});
assert.throws(() => new DecompressionStream(i), {
code: 'ERR_INVALID_ARG_VALUE',
});
});
assert.throws(
() => Reflect.get(CompressionStream.prototype, 'readable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(CompressionStream.prototype, 'writable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(DecompressionStream.prototype, 'readable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(DecompressionStream.prototype, 'writable', {}), {
code: 'ERR_INVALID_THIS',
});