[Flight] Close Debug Channel when All Lazy References Have Been GC:ed (#33718)

When we have a debug channel open that can ask for more objects. That
doesn't close until all lazy objects have been explicitly asked for. If
you GC an object before the lazy references inside of it before asking
for or releasing the objects, then it'll never close.

This ensures that if there are no more PendingChunk and no more
ResolvedModelChunk then we can close the connection.

There's two sources of retaining the Response object. On one side we
have a handle to it from the stream coming from the server. On the other
side we have a handle to it from ResolvedModelChunk to ask for more data
when we lazily parse a model.

This PR makes a weak handle from the stream to the Response. However, it
keeps a strong reference alive whenever we're waiting on a pending chunk
because then the stream might be the root if the only listeners are the
callbacks passed to the promise and no references to the promise itself.

The pending chunks count can end up being zero even if we might get more
data because the references might be inside lazy chunks. In this case
the lazy chunks keeps the Response alive. When the lazy chunk gets
parsed it can find more chunks that then end up pending to keep the
response strongly alive until they resolve.
This commit is contained in:
Sebastian Markbåge
2025-07-07 11:28:15 -04:00
committed by GitHub
parent 0378b46e7e
commit 7cafeff340
3 changed files with 209 additions and 52 deletions

View File

@@ -16,18 +16,49 @@ function findSourceMapURL(fileName) {
let updateRoot;
async function callServer(id, args) {
const response = fetch('/', {
method: 'POST',
headers: {
Accept: 'text/x-component',
'rsc-action': id,
},
body: await encodeReply(args),
});
const {returnValue, root} = await createFromFetch(response, {
callServer,
findSourceMapURL,
});
let response;
if (
process.env.NODE_ENV === 'development' &&
typeof WebSocketStream === 'function'
) {
const requestId = crypto.randomUUID();
const wss = new WebSocketStream(
'ws://localhost:3001/debug-channel?' + requestId
);
const debugChannel = await wss.opened;
response = createFromFetch(
fetch('/', {
method: 'POST',
headers: {
Accept: 'text/x-component',
'rsc-action': id,
'rsc-request-id': requestId,
},
body: await encodeReply(args),
}),
{
callServer,
debugChannel,
findSourceMapURL,
}
);
} else {
response = createFromFetch(
fetch('/', {
method: 'POST',
headers: {
Accept: 'text/x-component',
'rsc-action': id,
},
body: await encodeReply(args),
}),
{
callServer,
findSourceMapURL,
}
);
}
const {returnValue, root} = await response;
// Refresh the tree with the new RSC payload.
startTransition(() => {
updateRoot(root);

View File

@@ -332,7 +332,7 @@ export type FindSourceMapURLCallback = (
export type DebugChannelCallback = (message: string) => void;
export type Response = {
type Response = {
_bundlerConfig: ServerConsumerModuleMap,
_serverReferenceConfig: null | ServerManifest,
_moduleLoading: ModuleLoading,
@@ -351,6 +351,8 @@ export type Response = {
_closedReason: mixed,
_tempRefs: void | TemporaryReferenceSet, // the set temporary references can be resolved from
_timeOrigin: number, // Profiling-only
_pendingChunks: number, // DEV-only
_weakResponse: WeakResponse, // DEV-only
_debugRootOwner?: null | ReactComponentInfo, // DEV-only
_debugRootStack?: null | Error, // DEV-only
_debugRootTask?: null | ConsoleTask, // DEV-only
@@ -360,6 +362,54 @@ export type Response = {
_rootEnvironmentName: string, // DEV-only, the requested environment name.
};
// This indirection exists only to clean up DebugChannel when all Lazy References are GC:ed.
// Therefore we only use the indirection in DEV.
type WeakResponse = {
weak: WeakRef<Response>,
response: null | Response, // This is null when there are no pending chunks.
};
export type {WeakResponse as Response};
function hasGCedResponse(weakResponse: WeakResponse): boolean {
return __DEV__ && weakResponse.weak.deref() === undefined;
}
function unwrapWeakResponse(weakResponse: WeakResponse): Response {
if (__DEV__) {
const response = weakResponse.weak.deref();
if (response === undefined) {
// eslint-disable-next-line react-internal/prod-error-codes
throw new Error(
'We did not expect to receive new data after GC:ing the response.',
);
}
return response;
} else {
return (weakResponse: any); // In prod we just use the real Response directly.
}
}
function getWeakResponse(response: Response): WeakResponse {
if (__DEV__) {
return response._weakResponse;
} else {
return (response: any); // In prod we just use the real Response directly.
}
}
function cleanupDebugChannel(debugChannel: DebugChannelCallback): void {
// When a Response gets GC:ed because nobody is referring to any of the objects that lazily
// loads from the Response anymore, then we can close the debug channel.
debugChannel('');
}
// If FinalizationRegistry doesn't exist, we cannot use the debugChannel.
const debugChannelRegistry =
__DEV__ && typeof FinalizationRegistry === 'function'
? new FinalizationRegistry(cleanupDebugChannel)
: null;
function readChunk<T>(chunk: SomeChunk<T>): T {
// If we have resolved content, we try to initialize it first which
// might put us back into one of the other states.
@@ -385,16 +435,32 @@ function readChunk<T>(chunk: SomeChunk<T>): T {
}
}
export function getRoot<T>(response: Response): Thenable<T> {
export function getRoot<T>(weakResponse: WeakResponse): Thenable<T> {
const response = unwrapWeakResponse(weakResponse);
const chunk = getChunk(response, 0);
return (chunk: any);
}
function createPendingChunk<T>(response: Response): PendingChunk<T> {
if (__DEV__) {
// Retain a strong reference to the Response while we wait for the result.
response._pendingChunks++;
response._weakResponse.response = response;
}
// $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors
return new ReactPromise(PENDING, null, null);
}
function releasePendingChunk(response: Response, chunk: SomeChunk<any>): void {
if (__DEV__ && chunk.status === PENDING) {
if (--response._pendingChunks === 0) {
// We're no longer waiting for any more chunks. We can release the strong reference
// to the response. We'll regain it if we ask for any more data later on.
response._weakResponse.response = null;
}
}
}
function createBlockedChunk<T>(response: Response): BlockedChunk<T> {
// $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors
return new ReactPromise(BLOCKED, null, null);
@@ -525,7 +591,11 @@ function wakeChunkIfInitialized<T>(
}
}
function triggerErrorOnChunk<T>(chunk: SomeChunk<T>, error: mixed): void {
function triggerErrorOnChunk<T>(
response: Response,
chunk: SomeChunk<T>,
error: mixed,
): void {
if (chunk.status !== PENDING && chunk.status !== BLOCKED) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
@@ -535,6 +605,7 @@ function triggerErrorOnChunk<T>(chunk: SomeChunk<T>, error: mixed): void {
controller.error(error);
return;
}
releasePendingChunk(response, chunk);
const listeners = chunk.reason;
const erroredChunk: ErroredChunk<T> = (chunk: any);
erroredChunk.status = ERRORED;
@@ -635,6 +706,7 @@ function resolveModelChunk<T>(
controller.enqueueModel(value);
return;
}
releasePendingChunk(response, chunk);
const resolveListeners = chunk.value;
const rejectListeners = chunk.reason;
const resolvedChunk: ResolvedModelChunk<T> = (chunk: any);
@@ -652,6 +724,7 @@ function resolveModelChunk<T>(
}
function resolveModuleChunk<T>(
response: Response,
chunk: SomeChunk<T>,
value: ClientReference<T>,
): void {
@@ -659,6 +732,7 @@ function resolveModuleChunk<T>(
// We already resolved. We didn't expect to see this.
return;
}
releasePendingChunk(response, chunk);
const resolveListeners = chunk.value;
const rejectListeners = chunk.reason;
const resolvedChunk: ResolvedModuleChunk<T> = (chunk: any);
@@ -766,7 +840,15 @@ function initializeModuleChunk<T>(chunk: ResolvedModuleChunk<T>): void {
// Report that any missing chunks in the model is now going to throw this
// error upon read. Also notify any pending promises.
export function reportGlobalError(response: Response, error: Error): void {
export function reportGlobalError(
weakResponse: WeakResponse,
error: Error,
): void {
if (hasGCedResponse(weakResponse)) {
// Ignore close signal if we are not awaiting any more pending chunks.
return;
}
const response = unwrapWeakResponse(weakResponse);
response._closed = true;
response._closedReason = error;
response._chunks.forEach(chunk => {
@@ -774,7 +856,7 @@ export function reportGlobalError(response: Response, error: Error): void {
// trigger an error but if it wasn't then we need to
// because we won't be getting any new data to resolve it.
if (chunk.status === PENDING) {
triggerErrorOnChunk(chunk, error);
triggerErrorOnChunk(response, chunk, error);
}
});
if (__DEV__) {
@@ -1218,7 +1300,7 @@ function rejectReference(
reference: InitializationReference,
error: mixed,
): void {
const {handler} = reference;
const {handler, response} = reference;
if (handler.errored) {
// We've already errored. We could instead build up an AggregateError
@@ -1263,7 +1345,7 @@ function rejectReference(
}
}
triggerErrorOnChunk(chunk, error);
triggerErrorOnChunk(response, chunk, error);
}
function waitForReference<T>(
@@ -1482,7 +1564,7 @@ function loadServerReference<A: Iterable<any>, T>(
}
}
triggerErrorOnChunk(chunk, error);
triggerErrorOnChunk(response, chunk, error);
}
promise.then(fulfill, reject);
@@ -2025,6 +2107,11 @@ function ResponseInstance(
this._timeOrigin = 0;
}
if (__DEV__) {
this._pendingChunks = 0;
this._weakResponse = {
weak: new WeakRef(this),
response: this,
};
// TODO: The Flight Client can be used in a Client Environment too and we should really support
// getting the owner there as well, but currently the owner of ReactComponentInfo is typed as only
// supporting other ReactComponentInfo as owners (and not Fiber or Fizz's ComponentStackNode).
@@ -2059,6 +2146,15 @@ function ResponseInstance(
this._debugChannel = debugChannel;
this._replayConsole = replayConsole;
this._rootEnvironmentName = rootEnv;
if (debugChannel) {
if (debugChannelRegistry === null) {
// We can't safely clean things up later, so we immediately close the debug channel.
debugChannel('');
this._debugChannel = undefined;
} else {
debugChannelRegistry.register(this, debugChannel);
}
}
}
if (enableProfilerTimer && enableComponentPerformanceTrack) {
// Since we don't know when recording of profiles will start and stop, we have to
@@ -2084,20 +2180,22 @@ export function createResponse(
replayConsole: boolean, // DEV-only
environmentName: void | string, // DEV-only
debugChannel: void | DebugChannelCallback, // DEV-only
): Response {
// $FlowFixMe[invalid-constructor]: the shapes are exact here but Flow doesn't like constructors
return new ResponseInstance(
bundlerConfig,
serverReferenceConfig,
moduleLoading,
callServer,
encodeFormAction,
nonce,
temporaryReferences,
findSourceMapURL,
replayConsole,
environmentName,
debugChannel,
): WeakResponse {
return getWeakResponse(
// $FlowFixMe[invalid-constructor]: the shapes are exact here but Flow doesn't like constructors
new ResponseInstance(
bundlerConfig,
serverReferenceConfig,
moduleLoading,
callServer,
encodeFormAction,
nonce,
temporaryReferences,
findSourceMapURL,
replayConsole,
environmentName,
debugChannel,
),
);
}
@@ -2111,6 +2209,7 @@ function resolveDebugHalt(response: Response, id: number): void {
if (chunk.status !== PENDING && chunk.status !== BLOCKED) {
return;
}
releasePendingChunk(response, chunk);
const haltedChunk: HaltedChunk<any> = (chunk: any);
haltedChunk.status = HALTED;
haltedChunk.value = null;
@@ -2142,6 +2241,9 @@ function resolveText(response: Response, id: number, text: string): void {
controller.enqueueValue(text);
return;
}
if (chunk) {
releasePendingChunk(response, chunk);
}
chunks.set(id, createInitializedTextChunk(response, text));
}
@@ -2160,6 +2262,9 @@ function resolveBuffer(
controller.enqueueValue(buffer);
return;
}
if (chunk) {
releasePendingChunk(response, chunk);
}
chunks.set(id, createInitializedBufferChunk(response, buffer));
}
@@ -2197,14 +2302,15 @@ function resolveModule(
blockedChunk = createBlockedChunk(response);
chunks.set(id, blockedChunk);
} else {
releasePendingChunk(response, chunk);
// This can't actually happen because we don't have any forward
// references to modules.
blockedChunk = (chunk: any);
blockedChunk.status = BLOCKED;
}
promise.then(
() => resolveModuleChunk(blockedChunk, clientReference),
error => triggerErrorOnChunk(blockedChunk, error),
() => resolveModuleChunk(response, blockedChunk, clientReference),
error => triggerErrorOnChunk(response, blockedChunk, error),
);
} else {
if (!chunk) {
@@ -2212,7 +2318,7 @@ function resolveModule(
} else {
// This can't actually happen because we don't have any forward
// references to modules.
resolveModuleChunk(chunk, clientReference);
resolveModuleChunk(response, chunk, clientReference);
}
}
}
@@ -2233,6 +2339,7 @@ function resolveStream<T: ReadableStream | $AsyncIterable<any, any, void>>(
// We already resolved. We didn't expect to see this.
return;
}
releasePendingChunk(response, chunk);
const resolveListeners = chunk.value;
const resolvedChunk: InitializedStreamChunk<T> = (chunk: any);
resolvedChunk.status = INITIALIZED;
@@ -2432,7 +2539,7 @@ function startAsyncIterable<T>(
createPendingChunk<IteratorResult<T, T>>(response);
}
while (nextWriteIndex < buffer.length) {
triggerErrorOnChunk(buffer[nextWriteIndex++], error);
triggerErrorOnChunk(response, buffer[nextWriteIndex++], error);
}
},
};
@@ -2569,7 +2676,7 @@ function resolvePostponeProd(response: Response, id: number): void {
if (!chunk) {
chunks.set(id, createErrorChunk(response, postponeInstance));
} else {
triggerErrorOnChunk(chunk, postponeInstance);
triggerErrorOnChunk(response, chunk, postponeInstance);
}
}
@@ -2608,7 +2715,7 @@ function resolvePostponeDev(
if (!chunk) {
chunks.set(id, createErrorChunk(response, postponeInstance));
} else {
triggerErrorOnChunk(chunk, postponeInstance);
triggerErrorOnChunk(response, chunk, postponeInstance);
}
}
@@ -3690,7 +3797,7 @@ function processFullStringRow(
if (!chunk) {
chunks.set(id, createErrorChunk(response, errorWithDigest));
} else {
triggerErrorOnChunk(chunk, errorWithDigest);
triggerErrorOnChunk(response, chunk, errorWithDigest);
}
return;
}
@@ -3819,9 +3926,14 @@ function processFullStringRow(
}
export function processBinaryChunk(
response: Response,
weakResponse: WeakResponse,
chunk: Uint8Array,
): void {
if (hasGCedResponse(weakResponse)) {
// Ignore more chunks if we've already GC:ed all listeners.
return;
}
const response = unwrapWeakResponse(weakResponse);
let i = 0;
let rowState = response._rowState;
let rowID = response._rowID;
@@ -3938,7 +4050,15 @@ export function processBinaryChunk(
response._rowLength = rowLength;
}
export function processStringChunk(response: Response, chunk: string): void {
export function processStringChunk(
weakResponse: WeakResponse,
chunk: string,
): void {
if (hasGCedResponse(weakResponse)) {
// Ignore more chunks if we've already GC:ed all listeners.
return;
}
const response = unwrapWeakResponse(weakResponse);
// This is a fork of processBinaryChunk that takes a string as input.
// This can't be just any binary chunk coverted to a string. It needs to be
// in the same offsets given from the Flight Server. E.g. if it's shifted by
@@ -4100,12 +4220,12 @@ function createFromJSONCallback(response: Response) {
};
}
export function close(response: Response): void {
export function close(weakResponse: WeakResponse): void {
// In case there are any remaining unresolved chunks, they won't
// be resolved now. So we need to issue an error to those.
// Ideally we should be able to early bail out if we kept a
// ref count of pending chunks.
reportGlobalError(response, new Error('Connection closed.'));
reportGlobalError(weakResponse, new Error('Connection closed.'));
}
function getCurrentOwnerInDEV(): null | ReactComponentInfo {

View File

@@ -92,21 +92,27 @@ function getDebugInfo(obj) {
return debugInfo;
}
const heldValues = [];
let finalizationCallback;
const finalizationRegistries = [];
function FinalizationRegistryMock(callback) {
finalizationCallback = callback;
this._heldValues = [];
this._callback = callback;
finalizationRegistries.push(this);
}
FinalizationRegistryMock.prototype.register = function (target, heldValue) {
heldValues.push(heldValue);
this._heldValues.push(heldValue);
};
global.FinalizationRegistry = FinalizationRegistryMock;
function gc() {
for (let i = 0; i < heldValues.length; i++) {
finalizationCallback(heldValues[i]);
for (let i = 0; i < finalizationRegistries.length; i++) {
const registry = finalizationRegistries[i];
const callback = registry._callback;
const heldValues = registry._heldValues;
for (let j = 0; j < heldValues.length; j++) {
callback(heldValues[j]);
}
heldValues.length = 0;
}
heldValues.length = 0;
}
let act;