fix: memory leak hunting and reconnecting on data error
This commit is contained in:
@@ -41,7 +41,14 @@ const RETRYABLE_SAVE_ERROR_CODES = new Set([
|
|||||||
'EBUSY'
|
'EBUSY'
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let saveQueue: Promise<void> = Promise.resolve();
|
interface PendingSaveWaiter {
|
||||||
|
reject: (error: unknown) => void;
|
||||||
|
resolve: () => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
let pendingSaveSnapshot: Buffer | null = null;
|
||||||
|
let pendingSaveWaiters: PendingSaveWaiter[] = [];
|
||||||
|
let saveInProgress = false;
|
||||||
|
|
||||||
function wait(ms: number): Promise<void> {
|
function wait(ms: number): Promise<void> {
|
||||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
@@ -146,16 +153,51 @@ async function writeDatabaseSnapshot(snapshot: Buffer): Promise<void> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function settleSaveWaiters(waiters: PendingSaveWaiter[], error?: unknown): void {
|
||||||
|
for (const waiter of waiters) {
|
||||||
|
if (error === undefined) {
|
||||||
|
waiter.resolve();
|
||||||
|
} else {
|
||||||
|
waiter.reject(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function drainDatabaseSaveQueue(): Promise<void> {
|
||||||
|
if (saveInProgress) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
saveInProgress = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (pendingSaveSnapshot) {
|
||||||
|
const snapshot = pendingSaveSnapshot;
|
||||||
|
const waiters = pendingSaveWaiters;
|
||||||
|
|
||||||
|
pendingSaveSnapshot = null;
|
||||||
|
pendingSaveWaiters = [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
await writeDatabaseSnapshot(snapshot);
|
||||||
|
settleSaveWaiters(waiters);
|
||||||
|
} catch (error) {
|
||||||
|
settleSaveWaiters(waiters, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
saveInProgress = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function atomicSave(data: Uint8Array): Promise<void> {
|
async function atomicSave(data: Uint8Array): Promise<void> {
|
||||||
const snapshot = Buffer.from(data);
|
const snapshot = Buffer.from(data);
|
||||||
const saveTask = saveQueue.then(
|
|
||||||
() => writeDatabaseSnapshot(snapshot),
|
|
||||||
() => writeDatabaseSnapshot(snapshot)
|
|
||||||
);
|
|
||||||
|
|
||||||
saveQueue = saveTask.catch(() => {});
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
pendingSaveSnapshot = snapshot;
|
||||||
return saveTask;
|
pendingSaveWaiters.push({ resolve, reject });
|
||||||
|
void drainDatabaseSaveQueue();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function initializeDatabase(): Promise<void> {
|
export async function initializeDatabase(): Promise<void> {
|
||||||
|
|||||||
@@ -62,6 +62,9 @@ import { listRunningProcessNames } from '../process-list';
|
|||||||
import { detectActiveGame } from '../game-detection';
|
import { detectActiveGame } from '../game-detection';
|
||||||
|
|
||||||
const DEFAULT_MIME_TYPE = 'application/octet-stream';
|
const DEFAULT_MIME_TYPE = 'application/octet-stream';
|
||||||
|
const MAX_ACTIVE_DESKTOP_NOTIFICATIONS = 20;
|
||||||
|
const activeDesktopNotifications = new Set<Notification>();
|
||||||
|
const desktopNotificationCleanups = new Map<Notification, () => void>();
|
||||||
const FILE_CLIPBOARD_FORMATS = [
|
const FILE_CLIPBOARD_FORMATS = [
|
||||||
'x-special/gnome-copied-files',
|
'x-special/gnome-copied-files',
|
||||||
'text/uri-list',
|
'text/uri-list',
|
||||||
@@ -399,9 +402,16 @@ export function setupSystemHandlers(): void {
|
|||||||
icon: getWindowIconPath(),
|
icon: getWindowIconPath(),
|
||||||
silent: true
|
silent: true
|
||||||
});
|
});
|
||||||
|
const cleanup = () => {
|
||||||
notification.on('click', () => {
|
notification.removeListener('click', handleClick);
|
||||||
|
notification.removeListener('close', cleanup);
|
||||||
|
notification.removeListener('failed', cleanup);
|
||||||
|
activeDesktopNotifications.delete(notification);
|
||||||
|
desktopNotificationCleanups.delete(notification);
|
||||||
|
};
|
||||||
|
const handleClick = () => {
|
||||||
if (!mainWindow) {
|
if (!mainWindow) {
|
||||||
|
cleanup();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -414,7 +424,26 @@ export function setupSystemHandlers(): void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mainWindow.focus();
|
mainWindow.focus();
|
||||||
});
|
cleanup();
|
||||||
|
notification.close();
|
||||||
|
};
|
||||||
|
|
||||||
|
notification.on('click', handleClick);
|
||||||
|
notification.once('close', cleanup);
|
||||||
|
notification.once('failed', cleanup);
|
||||||
|
activeDesktopNotifications.add(notification);
|
||||||
|
desktopNotificationCleanups.set(notification, cleanup);
|
||||||
|
|
||||||
|
while (activeDesktopNotifications.size > MAX_ACTIVE_DESKTOP_NOTIFICATIONS) {
|
||||||
|
const oldestNotification = activeDesktopNotifications.values().next().value;
|
||||||
|
|
||||||
|
if (!oldestNotification) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
desktopNotificationCleanups.get(oldestNotification)?.();
|
||||||
|
oldestNotification.close();
|
||||||
|
}
|
||||||
|
|
||||||
notification.show();
|
notification.show();
|
||||||
} catch {
|
} catch {
|
||||||
|
|||||||
@@ -250,7 +250,7 @@ Profile avatar sync follows attachment-style chunk transport plus server-icon-st
|
|||||||
|
|
||||||
Every 5 seconds a PING message is sent to each peer. The peer responds with PONG carrying the original timestamp, and the round-trip latency is stored in a signal.
|
Every 5 seconds a PING message is sent to each peer. The peer responds with PONG carrying the original timestamp, and the round-trip latency is stored in a signal.
|
||||||
|
|
||||||
Data-channel failures are treated as control-plane failures, not proof that RTP audio has stopped. When an open channel reports a non-fatal error, the client requests a fresh voice-state snapshot over that same channel. When the channel closes or cannot carry the resync request, the peer manager waits a short grace period so any still-flowing audio is not interrupted by a transient event. If the `RTCPeerConnection` is still connected after that grace period, the elected initiator replaces only the data channel in-place and preserves the media transport. Full peer recreation is reserved for cases where the media transport is no longer connected or the in-place control-channel repair fails.
|
Data-channel failures are treated as control-plane failures. When an open channel reports a non-fatal error, the client requests a fresh voice-state snapshot over that same channel. When the channel is already closed there is no recovery on it, so the peer manager acts immediately: the deterministic initiator renegotiates a new `RTCDataChannel` on the existing `RTCPeerConnection` (preserving audio/video transport), the non-initiator briefly waits for that replacement and then forces a full peer rebuild if it does not arrive, and a peer whose `RTCPeerConnection` is no longer in `connected` state is recreated immediately through the normal deterministic reconnect path. A closing-but-not-yet-closed channel still waits a short grace period in case the underlying transport flips back. Either way, the rebuild heals chat, state sync, voice, camera, and screen-share transport together instead of preserving a media connection whose control channel can no longer coordinate peer state.
|
||||||
|
|
||||||
## Media pipeline
|
## Media pipeline
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ describe('peer recovery', () => {
|
|||||||
vi.useRealTimers();
|
vi.useRealTimers();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('waits a short grace period before replacing a closed data channel in place', () => {
|
it('recreates a peer immediately when the data channel is already closed', () => {
|
||||||
vi.useFakeTimers();
|
vi.useFakeTimers();
|
||||||
|
|
||||||
const channel = createDataChannel('closed');
|
const channel = createDataChannel('closed');
|
||||||
@@ -24,29 +24,28 @@ describe('peer recovery', () => {
|
|||||||
|
|
||||||
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
|
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
|
||||||
|
|
||||||
|
expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true });
|
||||||
|
expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true);
|
||||||
|
expect(handlers.createAndSendOffer).toHaveBeenCalledWith('bob');
|
||||||
|
expect(context.state.dataChannelRecoveryTimers.has('bob')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('waits a short grace period before recreating a peer with a closing data channel', () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
|
||||||
|
const channel = createDataChannel('closing');
|
||||||
|
const context = createContext('alice');
|
||||||
|
const handlers = createRecoveryHandlers(context);
|
||||||
|
|
||||||
|
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected'));
|
||||||
|
|
||||||
|
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
|
||||||
|
|
||||||
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS - 1);
|
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS - 1);
|
||||||
expect(handlers.removePeer).not.toHaveBeenCalled();
|
expect(handlers.removePeer).not.toHaveBeenCalled();
|
||||||
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
|
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
|
||||||
|
|
||||||
vi.advanceTimersByTime(1);
|
vi.advanceTimersByTime(1);
|
||||||
expect(handlers.replaceDataChannel).toHaveBeenCalledWith('bob', channel);
|
|
||||||
expect(handlers.removePeer).not.toHaveBeenCalled();
|
|
||||||
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
|
|
||||||
expect(handlers.createAndSendOffer).not.toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('falls back to full peer recreation when in-place data channel replacement fails', () => {
|
|
||||||
vi.useFakeTimers();
|
|
||||||
|
|
||||||
const channel = createDataChannel('closed');
|
|
||||||
const context = createContext('alice');
|
|
||||||
const handlers = createRecoveryHandlers(context);
|
|
||||||
|
|
||||||
handlers.replaceDataChannel.mockReturnValueOnce(false);
|
|
||||||
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected'));
|
|
||||||
|
|
||||||
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
|
|
||||||
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
|
|
||||||
|
|
||||||
expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true });
|
expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true });
|
||||||
expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true);
|
expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true);
|
||||||
@@ -56,7 +55,7 @@ describe('peer recovery', () => {
|
|||||||
it('does not recreate a peer when a replacement data channel is adopted before the grace expires', () => {
|
it('does not recreate a peer when a replacement data channel is adopted before the grace expires', () => {
|
||||||
vi.useFakeTimers();
|
vi.useFakeTimers();
|
||||||
|
|
||||||
const staleChannel = createDataChannel('closed');
|
const staleChannel = createDataChannel('closing');
|
||||||
const replacementChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN);
|
const replacementChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN);
|
||||||
const context = createContext('alice');
|
const context = createContext('alice');
|
||||||
const handlers = createRecoveryHandlers(context);
|
const handlers = createRecoveryHandlers(context);
|
||||||
@@ -90,7 +89,7 @@ describe('peer recovery', () => {
|
|||||||
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
|
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('preserves a connected non-initiator peer while waiting for the remote initiator to replace the channel', () => {
|
it('recreates a connected non-initiator peer and waits for the remote initiator offer', () => {
|
||||||
vi.useFakeTimers();
|
vi.useFakeTimers();
|
||||||
|
|
||||||
const channel = createDataChannel('closed');
|
const channel = createDataChannel('closed');
|
||||||
@@ -99,11 +98,10 @@ describe('peer recovery', () => {
|
|||||||
|
|
||||||
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected', false));
|
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected', false));
|
||||||
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
|
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
|
||||||
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
|
|
||||||
|
|
||||||
expect(handlers.removePeer).not.toHaveBeenCalled();
|
expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true });
|
||||||
expect(handlers.replaceDataChannel).not.toHaveBeenCalled();
|
expect(handlers.replaceDataChannel).not.toHaveBeenCalled();
|
||||||
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
|
expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', false);
|
||||||
expect(handlers.createAndSendOffer).not.toHaveBeenCalled();
|
expect(handlers.createAndSendOffer).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -154,6 +154,18 @@ export function scheduleDataChannelRecovery(
|
|||||||
if (channel.readyState === DATA_CHANNEL_STATE_OPEN)
|
if (channel.readyState === DATA_CHANNEL_STATE_OPEN)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if (channel.readyState === 'closed') {
|
||||||
|
logger.warn('[data-channel] Control channel closed; reconnecting peer immediately', {
|
||||||
|
channelLabel: channel.label,
|
||||||
|
connectionState: peerData.connection.connectionState,
|
||||||
|
peerId,
|
||||||
|
reason
|
||||||
|
});
|
||||||
|
|
||||||
|
repairUnavailableDataChannel(context, peerId, channel, reason, handlers);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (state.dataChannelRecoveryTimers.has(peerId))
|
if (state.dataChannelRecoveryTimers.has(peerId))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@@ -183,33 +195,40 @@ export function scheduleDataChannelRecovery(
|
|||||||
reason
|
reason
|
||||||
});
|
});
|
||||||
|
|
||||||
if (latestPeerData.connection.connectionState === CONNECTION_STATE_CONNECTED) {
|
repairUnavailableDataChannel(context, peerId, channel, reason, handlers);
|
||||||
if (latestPeerData.isInitiator && handlers.replaceDataChannel(peerId, channel)) {
|
}, DATA_CHANNEL_RECOVERY_GRACE_MS);
|
||||||
logger.info('[data-channel] Replaced control channel without recreating media transport', {
|
|
||||||
|
state.dataChannelRecoveryTimers.set(peerId, timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
function repairUnavailableDataChannel(
|
||||||
|
context: PeerConnectionManagerContext,
|
||||||
|
peerId: string,
|
||||||
|
channel: RTCDataChannel,
|
||||||
|
reason: string,
|
||||||
|
handlers: RecoveryHandlers
|
||||||
|
): void {
|
||||||
|
const { logger, state } = context;
|
||||||
|
const peerData = state.activePeerConnections.get(peerId);
|
||||||
|
|
||||||
|
if (!peerData || peerData.dataChannel !== channel)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (peerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN)
|
||||||
|
return;
|
||||||
|
|
||||||
|
logger.warn('[data-channel] Recreating peer transport after control channel failure', {
|
||||||
|
channelLabel: channel.label,
|
||||||
|
connectionState: peerData.connection.connectionState,
|
||||||
peerId,
|
peerId,
|
||||||
|
readyState: peerData.dataChannel?.readyState ?? null,
|
||||||
reason
|
reason
|
||||||
});
|
});
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!latestPeerData.isInitiator) {
|
|
||||||
logger.info('[data-channel] Waiting for initiator to replace control channel; preserving media transport', {
|
|
||||||
peerId,
|
|
||||||
reason
|
|
||||||
});
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trackDisconnectedPeer(state, peerId);
|
trackDisconnectedPeer(state, peerId);
|
||||||
handlers.removePeer(peerId, { preserveReconnectState: true });
|
handlers.removePeer(peerId, { preserveReconnectState: true });
|
||||||
attemptPeerReconnect(context, peerId, handlers);
|
attemptPeerReconnect(context, peerId, handlers);
|
||||||
schedulePeerReconnect(context, peerId, handlers);
|
schedulePeerReconnect(context, peerId, handlers);
|
||||||
}, DATA_CHANNEL_RECOVERY_GRACE_MS);
|
|
||||||
|
|
||||||
state.dataChannelRecoveryTimers.set(peerId, timer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function schedulePeerDisconnectRecovery(
|
export function schedulePeerDisconnectRecovery(
|
||||||
|
|||||||
Reference in New Issue
Block a user