diff --git a/electron/db/database.ts b/electron/db/database.ts index 89e2b47..acee116 100644 --- a/electron/db/database.ts +++ b/electron/db/database.ts @@ -41,7 +41,14 @@ const RETRYABLE_SAVE_ERROR_CODES = new Set([ 'EBUSY' ]); -let saveQueue: Promise = Promise.resolve(); +interface PendingSaveWaiter { + reject: (error: unknown) => void; + resolve: () => void; +} + +let pendingSaveSnapshot: Buffer | null = null; +let pendingSaveWaiters: PendingSaveWaiter[] = []; +let saveInProgress = false; function wait(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); @@ -146,16 +153,51 @@ async function writeDatabaseSnapshot(snapshot: Buffer): Promise { } } +function settleSaveWaiters(waiters: PendingSaveWaiter[], error?: unknown): void { + for (const waiter of waiters) { + if (error === undefined) { + waiter.resolve(); + } else { + waiter.reject(error); + } + } +} + +async function drainDatabaseSaveQueue(): Promise { + if (saveInProgress) { + return; + } + + saveInProgress = true; + + try { + while (pendingSaveSnapshot) { + const snapshot = pendingSaveSnapshot; + const waiters = pendingSaveWaiters; + + pendingSaveSnapshot = null; + pendingSaveWaiters = []; + + try { + await writeDatabaseSnapshot(snapshot); + settleSaveWaiters(waiters); + } catch (error) { + settleSaveWaiters(waiters, error); + } + } + } finally { + saveInProgress = false; + } +} + async function atomicSave(data: Uint8Array): Promise { const snapshot = Buffer.from(data); - const saveTask = saveQueue.then( - () => writeDatabaseSnapshot(snapshot), - () => writeDatabaseSnapshot(snapshot) - ); - saveQueue = saveTask.catch(() => {}); - - return saveTask; + return new Promise((resolve, reject) => { + pendingSaveSnapshot = snapshot; + pendingSaveWaiters.push({ resolve, reject }); + void drainDatabaseSaveQueue(); + }); } export async function initializeDatabase(): Promise { diff --git a/electron/ipc/system.ts b/electron/ipc/system.ts index 9c85c45..c6001dc 100644 --- 
a/electron/ipc/system.ts +++ b/electron/ipc/system.ts @@ -62,6 +62,9 @@ import { listRunningProcessNames } from '../process-list'; import { detectActiveGame } from '../game-detection'; const DEFAULT_MIME_TYPE = 'application/octet-stream'; +const MAX_ACTIVE_DESKTOP_NOTIFICATIONS = 20; +const activeDesktopNotifications = new Set(); +const desktopNotificationCleanups = new Map void>(); const FILE_CLIPBOARD_FORMATS = [ 'x-special/gnome-copied-files', 'text/uri-list', @@ -399,9 +402,16 @@ export function setupSystemHandlers(): void { icon: getWindowIconPath(), silent: true }); - - notification.on('click', () => { + const cleanup = () => { + notification.removeListener('click', handleClick); + notification.removeListener('close', cleanup); + notification.removeListener('failed', cleanup); + activeDesktopNotifications.delete(notification); + desktopNotificationCleanups.delete(notification); + }; + const handleClick = () => { if (!mainWindow) { + cleanup(); return; } @@ -414,7 +424,26 @@ export function setupSystemHandlers(): void { } mainWindow.focus(); - }); + cleanup(); + notification.close(); + }; + + notification.on('click', handleClick); + notification.once('close', cleanup); + notification.once('failed', cleanup); + activeDesktopNotifications.add(notification); + desktopNotificationCleanups.set(notification, cleanup); + + while (activeDesktopNotifications.size > MAX_ACTIVE_DESKTOP_NOTIFICATIONS) { + const oldestNotification = activeDesktopNotifications.values().next().value; + + if (!oldestNotification) { + break; + } + + desktopNotificationCleanups.get(oldestNotification)?.(); + oldestNotification.close(); + } notification.show(); } catch { diff --git a/toju-app/src/app/infrastructure/realtime/README.md b/toju-app/src/app/infrastructure/realtime/README.md index fa3bfb5..a41032e 100644 --- a/toju-app/src/app/infrastructure/realtime/README.md +++ b/toju-app/src/app/infrastructure/realtime/README.md @@ -250,7 +250,7 @@ Profile avatar sync follows attachment-style 
chunk transport plus server-icon-st Every 5 seconds a PING message is sent to each peer. The peer responds with PONG carrying the original timestamp, and the round-trip latency is stored in a signal. -Data-channel failures are treated as control-plane failures, not proof that RTP audio has stopped. When an open channel reports a non-fatal error, the client requests a fresh voice-state snapshot over that same channel. When the channel closes or cannot carry the resync request, the peer manager waits a short grace period so any still-flowing audio is not interrupted by a transient event. If the `RTCPeerConnection` is still connected after that grace period, the elected initiator replaces only the data channel in-place and preserves the media transport. Full peer recreation is reserved for cases where the media transport is no longer connected or the in-place control-channel repair fails. +Data-channel failures are treated as control-plane failures. When an open channel reports a non-fatal error, the client requests a fresh voice-state snapshot over that same channel. When the channel is already closed there is no recovery on it, so the peer manager acts immediately: it removes the peer (preserving its reconnect state) and recreates the `RTCPeerConnection` through the normal deterministic reconnect path, where the deterministic initiator sends a new offer and the non-initiator waits for that offer. A closing-but-not-yet-closed channel still waits a short grace period in case the underlying transport flips back or a replacement channel is adopted; if the channel is still unavailable when the grace period expires, the same rebuild runs. Either way, the rebuild heals chat, state sync, voice, camera, and screen-share transport together instead of preserving a media connection whose control channel can no longer coordinate peer state. 
## Media pipeline diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts index 23c5c9a..404c751 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts @@ -13,7 +13,7 @@ describe('peer recovery', () => { vi.useRealTimers(); }); - it('waits a short grace period before replacing a closed data channel in place', () => { + it('recreates a peer immediately when the data channel is already closed', () => { vi.useFakeTimers(); const channel = createDataChannel('closed'); @@ -24,29 +24,28 @@ describe('peer recovery', () => { scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); + expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true }); + expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true); + expect(handlers.createAndSendOffer).toHaveBeenCalledWith('bob'); + expect(context.state.dataChannelRecoveryTimers.has('bob')).toBe(false); + }); + + it('waits a short grace period before recreating a peer with a closing data channel', () => { + vi.useFakeTimers(); + + const channel = createDataChannel('closing'); + const context = createContext('alice'); + const handlers = createRecoveryHandlers(context); + + context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected')); + + scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS - 1); expect(handlers.removePeer).not.toHaveBeenCalled(); expect(handlers.createPeerConnection).not.toHaveBeenCalled(); vi.advanceTimersByTime(1); - expect(handlers.replaceDataChannel).toHaveBeenCalledWith('bob', channel); - expect(handlers.removePeer).not.toHaveBeenCalled(); - 
expect(handlers.createPeerConnection).not.toHaveBeenCalled(); - expect(handlers.createAndSendOffer).not.toHaveBeenCalled(); - }); - - it('falls back to full peer recreation when in-place data channel replacement fails', () => { - vi.useFakeTimers(); - - const channel = createDataChannel('closed'); - const context = createContext('alice'); - const handlers = createRecoveryHandlers(context); - - handlers.replaceDataChannel.mockReturnValueOnce(false); - context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected')); - - scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); - vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true }); expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true); @@ -56,7 +55,7 @@ describe('peer recovery', () => { it('does not recreate a peer when a replacement data channel is adopted before the grace expires', () => { vi.useFakeTimers(); - const staleChannel = createDataChannel('closed'); + const staleChannel = createDataChannel('closing'); const replacementChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN); const context = createContext('alice'); const handlers = createRecoveryHandlers(context); @@ -90,7 +89,7 @@ describe('peer recovery', () => { expect(handlers.createPeerConnection).not.toHaveBeenCalled(); }); - it('preserves a connected non-initiator peer while waiting for the remote initiator to replace the channel', () => { + it('recreates a connected non-initiator peer and waits for the remote initiator offer', () => { vi.useFakeTimers(); const channel = createDataChannel('closed'); @@ -99,11 +98,10 @@ describe('peer recovery', () => { context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected', false)); scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); - vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); - 
expect(handlers.removePeer).not.toHaveBeenCalled(); + expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true }); expect(handlers.replaceDataChannel).not.toHaveBeenCalled(); - expect(handlers.createPeerConnection).not.toHaveBeenCalled(); + expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', false); expect(handlers.createAndSendOffer).not.toHaveBeenCalled(); }); diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts index 3502388..cc44e0f 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts @@ -154,6 +154,18 @@ export function scheduleDataChannelRecovery( if (channel.readyState === DATA_CHANNEL_STATE_OPEN) return; + if (channel.readyState === 'closed') { + logger.warn('[data-channel] Control channel closed; reconnecting peer immediately', { + channelLabel: channel.label, + connectionState: peerData.connection.connectionState, + peerId, + reason + }); + + repairUnavailableDataChannel(context, peerId, channel, reason, handlers); + return; + } + if (state.dataChannelRecoveryTimers.has(peerId)) return; @@ -183,35 +195,42 @@ export function scheduleDataChannelRecovery( reason }); - if (latestPeerData.connection.connectionState === CONNECTION_STATE_CONNECTED) { - if (latestPeerData.isInitiator && handlers.replaceDataChannel(peerId, channel)) { - logger.info('[data-channel] Replaced control channel without recreating media transport', { - peerId, - reason - }); - - return; - } - - if (!latestPeerData.isInitiator) { - logger.info('[data-channel] Waiting for initiator to replace control channel; preserving media transport', { - peerId, - reason - }); - - return; - } - } - - trackDisconnectedPeer(state, peerId); - handlers.removePeer(peerId, { 
preserveReconnectState: true }); - attemptPeerReconnect(context, peerId, handlers); - schedulePeerReconnect(context, peerId, handlers); + repairUnavailableDataChannel(context, peerId, channel, reason, handlers); }, DATA_CHANNEL_RECOVERY_GRACE_MS); state.dataChannelRecoveryTimers.set(peerId, timer); } +function repairUnavailableDataChannel( + context: PeerConnectionManagerContext, + peerId: string, + channel: RTCDataChannel, + reason: string, + handlers: RecoveryHandlers +): void { + const { logger, state } = context; + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData || peerData.dataChannel !== channel) + return; + + if (peerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) + return; + + logger.warn('[data-channel] Recreating peer transport after control channel failure', { + channelLabel: channel.label, + connectionState: peerData.connection.connectionState, + peerId, + readyState: peerData.dataChannel?.readyState ?? null, + reason + }); + + trackDisconnectedPeer(state, peerId); + handlers.removePeer(peerId, { preserveReconnectState: true }); + attemptPeerReconnect(context, peerId, handlers); + schedulePeerReconnect(context, peerId, handlers); +} + export function schedulePeerDisconnectRecovery( context: PeerConnectionManagerContext, peerId: string,