stream: call helper function from push and unshift

PR-URL: https://github.com/nodejs/node/pull/50173
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
Raz Luvaton
2023-10-15 21:31:08 +03:00
committed by GitHub
parent 00de2faf78
commit 3907bd18f8

View File

@@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, false);
debug('push', chunk);
const state = this._readableState;
return (state[kState] & kObjectMode) === 0 ?
readableAddChunkPushByteMode(this, state, chunk, encoding) :
readableAddChunkPushObjectMode(this, state, chunk, encoding);
};
// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, true);
debug('unshift', chunk);
const state = this._readableState;
return (state[kState] & kObjectMode) === 0 ?
readableAddChunkUnshiftByteMode(this, state, chunk, encoding) :
readableAddChunkUnshiftObjectMode(this, state, chunk);
};
function readableAddChunk(stream, chunk, encoding, addToFront) {
debug('readableAddChunk', chunk);
const state = stream._readableState;
let err;
if ((state[kState] & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
if (addToFront && state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
}
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk != null) {
err = new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
}
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
} else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed || state.errored) {
return false;
} else {
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}
}
} else if (!addToFront) {
state[kState] &= ~kReading;
maybeReadMore(stream, state);
return false;
}
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
if (state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
}
}
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
} else if (chunk !== undefined && !(chunk instanceof Buffer)) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
return false;
}
if (!(chunk && chunk.length > 0)) {
return canPushMore(state);
}
return readableAddChunkUnshiftValue(stream, state, chunk);
}
function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
return false;
}
return readableAddChunkUnshiftValue(stream, state, chunk);
}
function readableAddChunkUnshiftValue(stream, state, chunk) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
else
addChunk(stream, state, chunk, true);
return canPushMore(state);
}
function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
return false;
}
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk !== undefined) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
return false;
}
if (!chunk || chunk.length <= 0) {
state[kState] &= ~kReading;
maybeReadMore(stream, state);
return canPushMore(state);
}
if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
return false;
}
if (state.destroyed || state.errored) {
return false;
}
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (chunk.length === 0) {
maybeReadMore(stream, state);
return canPushMore(state);
}
}
addChunk(stream, state, chunk, false);
return canPushMore(state);
}
function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
return false;
}
if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
return false;
}
if (state.destroyed || state.errored) {
return false;
}
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
}
addChunk(stream, state, chunk, false);
return canPushMore(state);
}
function canPushMore(state) {
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.