[Fizz/Flight] Pass in Destination lazily to startFlowing instead of in createRequest (#22449)

* Pass in Destination lazily in startFlowing instead of createRequest

* Delay fatal errors until we have a destination to forward them to

* Flow can now be inferred by whether there's a destination set

We can drop the destination when we're not flowing since there's nothing to
write to.

Fatal errors now close once flowing starts back up again.

* Defer fatal errors in Flight too
This commit is contained in:
Sebastian Markbåge
2021-09-28 18:32:09 -04:00
committed by GitHub
parent d9fb383d6f
commit 7843b142ac
14 changed files with 110 additions and 88 deletions

View File

@@ -154,7 +154,7 @@ describe('ReactDOMFizzServer', () => {
it('should error the stream when an error is thrown at the root', async () => {
const reportedErrors = [];
const {writable, output, completed} = getTestWritable();
ReactDOMFizzServer.pipeToNodeWritable(
const {startWriting} = ReactDOMFizzServer.pipeToNodeWritable(
<div>
<Throw />
</div>,
@@ -166,7 +166,8 @@ describe('ReactDOMFizzServer', () => {
},
);
// The stream is errored even if we haven't started writing.
// The stream is errored once we start writing.
startWriting();
await completed;

View File

@@ -37,7 +37,15 @@ function renderToReadableStream(
children: ReactNodeList,
options?: Options,
): ReadableStream {
let request;
const request = createRequest(
children,
createResponseState(options ? options.identifierPrefix : undefined),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
options ? options.onError : undefined,
options ? options.onCompleteAll : undefined,
options ? options.onCompleteShell : undefined,
);
if (options && options.signal) {
const signal = options.signal;
const listener = () => {
@@ -48,16 +56,6 @@ function renderToReadableStream(
}
const stream = new ReadableStream({
start(controller) {
request = createRequest(
children,
controller,
createResponseState(options ? options.identifierPrefix : undefined),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
options ? options.onError : undefined,
options ? options.onCompleteAll : undefined,
options ? options.onCompleteShell : undefined,
);
startWork(request);
},
pull(controller) {
@@ -66,7 +64,7 @@ function renderToReadableStream(
// is actually used by something so we can give it the best result possible
// at that point.
if (stream.locked) {
startFlowing(request);
startFlowing(request, controller);
}
},
cancel(reason) {},

View File

@@ -25,7 +25,7 @@ import {
} from './ReactDOMServerFormatConfig';
function createDrainHandler(destination, request) {
return () => startFlowing(request);
return () => startFlowing(request, destination);
}
type Options = {|
@@ -44,14 +44,9 @@ type Controls = {|
startWriting(): void,
|};
function createRequestImpl(
children: ReactNodeList,
destination: Writable,
options: void | Options,
) {
function createRequestImpl(children: ReactNodeList, options: void | Options) {
return createRequest(
children,
destination,
createResponseState(options ? options.identifierPrefix : undefined),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
@@ -66,7 +61,7 @@ function pipeToNodeWritable(
destination: Writable,
options?: Options,
): Controls {
const request = createRequestImpl(children, destination, options);
const request = createRequestImpl(children, options);
let hasStartedFlowing = false;
startWork(request);
return {
@@ -75,7 +70,7 @@ function pipeToNodeWritable(
return;
}
hasStartedFlowing = true;
startFlowing(request);
startFlowing(request, destination);
destination.on('drain', createDrainHandler(destination, request));
},
abort() {

View File

@@ -59,7 +59,6 @@ function renderToStringImpl(
}
const request = createRequest(
children,
destination,
createResponseState(
generateStaticMarkup,
options ? options.identifierPrefix : undefined,
@@ -74,7 +73,7 @@ function renderToStringImpl(
// If anything suspended and is still pending, we'll abort it before writing.
// That way we write only client-rendered boundaries from the start.
abort(request);
startFlowing(request);
startFlowing(request, destination);
if (didFatal) {
throw fatalError;
}

View File

@@ -54,7 +54,7 @@ class ReactMarkupReadableStream extends Readable {
_read(size) {
if (this.startedFlowing) {
startFlowing(this.request);
startFlowing(this.request, this);
}
}
}
@@ -72,12 +72,11 @@ function renderToNodeStreamImpl(
// We wait until everything has loaded before starting to write.
// That way we only end up with fully resolved HTML even if we suspend.
destination.startedFlowing = true;
startFlowing(request);
startFlowing(request, destination);
}
const destination = new ReactMarkupReadableStream();
const request = createRequest(
children,
destination,
createResponseState(false, options ? options.identifierPrefix : undefined),
createRootFormatContext(),
Infinity,

View File

@@ -63,12 +63,11 @@ function render(model: ReactModel, options?: Options): Destination {
const bundlerConfig = undefined;
const request = ReactNoopFlightServer.createRequest(
model,
destination,
bundlerConfig,
options ? options.onError : undefined,
);
ReactNoopFlightServer.startWork(request);
ReactNoopFlightServer.startFlowing(request);
ReactNoopFlightServer.startFlowing(request, destination);
return destination;
}

View File

@@ -259,7 +259,6 @@ function render(children: React$Element<any>, options?: Options): Destination {
};
const request = ReactNoopServer.createRequest(
children,
destination,
null,
null,
options ? options.progressiveChunkSize : undefined,
@@ -268,7 +267,7 @@ function render(children: React$Element<any>, options?: Options): Destination {
options ? options.onCompleteShell : undefined,
);
ReactNoopServer.startWork(request);
ReactNoopServer.startFlowing(request);
ReactNoopServer.startFlowing(request, destination);
return destination;
}

View File

@@ -46,7 +46,6 @@ function renderToStream(children: ReactNodeList, options: Options): Stream {
};
const request = createRequest(
children,
destination,
createResponseState(options ? options.identifierPrefix : undefined),
createRootFormatContext(undefined),
options ? options.progressiveChunkSize : undefined,
@@ -71,7 +70,7 @@ function abortStream(stream: Stream): void {
function renderNextChunk(stream: Stream): string {
const {request, destination} = stream;
performWork(request);
startFlowing(request);
startFlowing(request, destination);
if (destination.fatal) {
throw destination.error;
}

View File

@@ -31,12 +31,11 @@ function render(
): void {
const request = createRequest(
model,
destination,
config,
options ? options.onError : undefined,
);
startWork(request);
startFlowing(request);
startFlowing(request, destination);
}
export {render};

View File

@@ -25,15 +25,13 @@ function renderToReadableStream(
webpackMap: BundlerConfig,
options?: Options,
): ReadableStream {
let request;
const request = createRequest(
model,
webpackMap,
options ? options.onError : undefined,
);
const stream = new ReadableStream({
start(controller) {
request = createRequest(
model,
controller,
webpackMap,
options ? options.onError : undefined,
);
startWork(request);
},
pull(controller) {
@@ -42,7 +40,7 @@ function renderToReadableStream(
// is actually used by something so we can give it the best result possible
// at that point.
if (stream.locked) {
startFlowing(request);
startFlowing(request, controller);
}
},
cancel(reason) {},

View File

@@ -18,7 +18,7 @@ import {
} from 'react-server/src/ReactFlightServer';
function createDrainHandler(destination, request) {
return () => startFlowing(request);
return () => startFlowing(request, destination);
}
type Options = {
@@ -33,12 +33,11 @@ function pipeToNodeWritable(
): void {
const request = createRequest(
model,
destination,
webpackMap,
options ? options.onError : undefined,
);
startWork(request);
startFlowing(request);
startFlowing(request, destination);
destination.on('drain', createDrainHandler(destination, request));
}

View File

@@ -24,9 +24,9 @@ function render(
destination: Destination,
config: BundlerConfig,
): void {
const request = createRequest(model, destination, config);
const request = createRequest(model, config);
startWork(request);
startFlowing(request);
startFlowing(request, destination);
}
export {render};

View File

@@ -166,15 +166,16 @@ type Segment = {
+boundary: null | SuspenseBoundary,
};
const BUFFERING = 0;
const FLOWING = 1;
const OPEN = 0;
const CLOSING = 1;
const CLOSED = 2;
export opaque type Request = {
+destination: Destination,
destination: null | Destination,
+responseState: ResponseState,
+progressiveChunkSize: number,
status: 0 | 1 | 2,
fatalError: mixed,
nextSegmentId: number,
allPendingTasks: number, // when it reaches zero, we can close the connection.
pendingRootTasks: number, // when this reaches zero, we've finished at least the root boundary.
@@ -221,7 +222,6 @@ function noop(): void {}
export function createRequest(
children: ReactNodeList,
destination: Destination,
responseState: ResponseState,
rootFormatContext: FormatContext,
progressiveChunkSize: void | number,
@@ -232,13 +232,14 @@ export function createRequest(
const pingedTasks = [];
const abortSet: Set<Task> = new Set();
const request = {
destination,
destination: null,
responseState,
progressiveChunkSize:
progressiveChunkSize === undefined
? DEFAULT_PROGRESSIVE_CHUNK_SIZE
: progressiveChunkSize,
status: BUFFERING,
status: OPEN,
fatalError: null,
nextSegmentId: 0,
allPendingTasks: 0,
pendingRootTasks: 0,
@@ -404,8 +405,13 @@ function fatalError(request: Request, error: mixed): void {
// This is called outside error handling code such as if the root errors outside
// a suspense boundary or if the root suspense boundary's fallback errors.
// It's also called if React itself or its host configs errors.
request.status = CLOSED;
closeWithError(request.destination, error);
if (request.destination !== null) {
request.status = CLOSED;
closeWithError(request.destination, error);
} else {
request.status = CLOSING;
request.fatalError = error;
}
}
function renderSuspenseBoundary(
@@ -1330,7 +1336,9 @@ function abortTask(task: Task): void {
// the request;
if (request.status !== CLOSED) {
request.status = CLOSED;
close(request.destination);
if (request.destination !== null) {
close(request.destination);
}
}
} else {
boundary.pendingTasks--;
@@ -1490,8 +1498,8 @@ export function performWork(request: Request): void {
retryTask(request, task);
}
pingedTasks.splice(0, i);
if (request.status === FLOWING) {
flushCompletedQueues(request);
if (request.destination !== null) {
flushCompletedQueues(request, request.destination);
}
} catch (error) {
reportError(request, error);
@@ -1748,8 +1756,10 @@ function flushPartiallyCompletedSegment(
}
}
function flushCompletedQueues(request: Request): void {
const destination = request.destination;
function flushCompletedQueues(
request: Request,
destination: Destination,
): void {
beginWriting(destination);
try {
// The structure of this is to go through each queue one by one and write
@@ -1775,7 +1785,7 @@ function flushCompletedQueues(request: Request): void {
for (i = 0; i < clientRenderedBoundaries.length; i++) {
const boundary = clientRenderedBoundaries[i];
if (!flushClientRenderedBoundary(request, destination, boundary)) {
request.status = BUFFERING;
request.destination = null;
i++;
clientRenderedBoundaries.splice(0, i);
return;
@@ -1790,7 +1800,7 @@ function flushCompletedQueues(request: Request): void {
for (i = 0; i < completedBoundaries.length; i++) {
const boundary = completedBoundaries[i];
if (!flushCompletedBoundary(request, destination, boundary)) {
request.status = BUFFERING;
request.destination = null;
i++;
completedBoundaries.splice(0, i);
return;
@@ -1811,7 +1821,7 @@ function flushCompletedQueues(request: Request): void {
for (i = 0; i < partialBoundaries.length; i++) {
const boundary = partialBoundaries[i];
if (!flushPartialBoundary(request, destination, boundary)) {
request.status = BUFFERING;
request.destination = null;
i++;
partialBoundaries.splice(0, i);
return;
@@ -1826,7 +1836,7 @@ function flushCompletedQueues(request: Request): void {
for (i = 0; i < largeBoundaries.length; i++) {
const boundary = largeBoundaries[i];
if (!flushCompletedBoundary(request, destination, boundary)) {
request.status = BUFFERING;
request.destination = null;
i++;
largeBoundaries.splice(0, i);
return;
@@ -1861,13 +1871,18 @@ export function startWork(request: Request): void {
scheduleWork(() => performWork(request));
}
export function startFlowing(request: Request): void {
export function startFlowing(request: Request, destination: Destination): void {
if (request.status === CLOSING) {
request.status = CLOSED;
closeWithError(destination, request.fatalError);
return;
}
if (request.status === CLOSED) {
return;
}
request.status = FLOWING;
request.destination = destination;
try {
flushCompletedQueues(request);
flushCompletedQueues(request, destination);
} catch (error) {
reportError(request, error);
fatalError(request, error);
@@ -1880,8 +1895,8 @@ export function abort(request: Request): void {
const abortableTasks = request.abortableTasks;
abortableTasks.forEach(abortTask, request);
abortableTasks.clear();
if (request.status === FLOWING) {
flushCompletedQueues(request);
if (request.destination !== null) {
flushCompletedQueues(request, request.destination);
}
} catch (error) {
reportError(request, error);

View File

@@ -72,7 +72,9 @@ type Segment = {
};
export type Request = {
destination: Destination,
status: 0 | 1 | 2,
fatalError: mixed,
destination: null | Destination,
bundlerConfig: BundlerConfig,
cache: Map<Function, mixed>,
nextChunkId: number,
@@ -84,25 +86,30 @@ export type Request = {
writtenSymbols: Map<Symbol, number>,
writtenModules: Map<ModuleKey, number>,
onError: (error: mixed) => void,
flowing: boolean,
toJSON: (key: string, value: ReactModel) => ReactJSONValue,
};
const ReactCurrentDispatcher = ReactSharedInternals.ReactCurrentDispatcher;
function defaultErrorHandler(error: mixed) {
console['error'](error); // Don't transform to our wrapper
console['error'](error);
// Don't transform to our wrapper
}
const OPEN = 0;
const CLOSING = 1;
const CLOSED = 2;
export function createRequest(
model: ReactModel,
destination: Destination,
bundlerConfig: BundlerConfig,
onError: void | ((error: mixed) => void),
): Request {
const pingedSegments = [];
const request = {
destination,
status: OPEN,
fatalError: null,
destination: null,
bundlerConfig,
cache: new Map(),
nextChunkId: 0,
@@ -114,7 +121,6 @@ export function createRequest(
writtenSymbols: new Map(),
writtenModules: new Map(),
onError: onError === undefined ? defaultErrorHandler : onError,
flowing: false,
toJSON: function(key: string, value: ReactModel): ReactJSONValue {
return resolveModelToJSON(request, this, key, value);
},
@@ -604,7 +610,13 @@ function reportError(request: Request, error: mixed): void {
function fatalError(request: Request, error: mixed): void {
// This is called outside error handling code such as if an error happens in React internals.
closeWithError(request.destination, error);
if (request.destination !== null) {
request.status = CLOSED;
closeWithError(request.destination, error);
} else {
request.status = CLOSING;
request.fatalError = error;
}
}
function emitErrorChunk(request: Request, id: number, error: mixed): void {
@@ -694,8 +706,8 @@ function performWork(request: Request): void {
const segment = pingedSegments[i];
retrySegment(request, segment);
}
if (request.flowing) {
flushCompletedChunks(request);
if (request.destination !== null) {
flushCompletedChunks(request, request.destination);
}
} catch (error) {
reportError(request, error);
@@ -706,8 +718,10 @@ function performWork(request: Request): void {
}
}
function flushCompletedChunks(request: Request): void {
const destination = request.destination;
function flushCompletedChunks(
request: Request,
destination: Destination,
): void {
beginWriting(destination);
try {
// We emit module chunks first in the stream so that
@@ -718,7 +732,7 @@ function flushCompletedChunks(request: Request): void {
request.pendingChunks--;
const chunk = moduleChunks[i];
if (!writeChunk(destination, chunk)) {
request.flowing = false;
request.destination = null;
i++;
break;
}
@@ -731,7 +745,7 @@ function flushCompletedChunks(request: Request): void {
request.pendingChunks--;
const chunk = jsonChunks[i];
if (!writeChunk(destination, chunk)) {
request.flowing = false;
request.destination = null;
i++;
break;
}
@@ -746,7 +760,7 @@ function flushCompletedChunks(request: Request): void {
request.pendingChunks--;
const chunk = errorChunks[i];
if (!writeChunk(destination, chunk)) {
request.flowing = false;
request.destination = null;
i++;
break;
}
@@ -766,10 +780,18 @@ export function startWork(request: Request): void {
scheduleWork(() => performWork(request));
}
export function startFlowing(request: Request): void {
request.flowing = true;
export function startFlowing(request: Request, destination: Destination): void {
if (request.status === CLOSING) {
request.status = CLOSED;
closeWithError(destination, request.fatalError);
return;
}
if (request.status === CLOSED) {
return;
}
request.destination = destination;
try {
flushCompletedChunks(request);
flushCompletedChunks(request, destination);
} catch (error) {
reportError(request, error);
fatalError(request, error);