diff --git a/e2e/helpers/webrtc-helpers.ts b/e2e/helpers/webrtc-helpers.ts index 7df6b43..75dc5be 100644 --- a/e2e/helpers/webrtc-helpers.ts +++ b/e2e/helpers/webrtc-helpers.ts @@ -11,6 +11,7 @@ import { type Page } from '@playwright/test'; export async function installWebRTCTracking(page: Page): Promise { await page.addInitScript(() => { const connections: RTCPeerConnection[] = []; + const dataChannels: RTCDataChannel[] = []; const syntheticMediaResources: { audioCtx: AudioContext; source?: AudioScheduledSourceNode; @@ -18,20 +19,40 @@ export async function installWebRTCTracking(page: Page): Promise { }[] = []; (window as any).__rtcConnections = connections; + (window as any).__rtcDataChannels = dataChannels; (window as any).__rtcRemoteTracks = [] as { kind: string; id: string; readyState: string }[]; (window as any).__rtcSyntheticMediaResources = syntheticMediaResources; const OriginalRTCPeerConnection = window.RTCPeerConnection; + const trackDataChannel = (channel: RTCDataChannel) => { + if (dataChannels.includes(channel)) { + return; + } + + dataChannels.push(channel); + }; (window as any).RTCPeerConnection = function(this: RTCPeerConnection, ...args: any[]) { const pc: RTCPeerConnection = new OriginalRTCPeerConnection(...args); + const originalCreateDataChannel = pc.createDataChannel.bind(pc); connections.push(pc); + pc.createDataChannel = ((label: string, options?: RTCDataChannelInit) => { + const channel = originalCreateDataChannel(label, options); + + trackDataChannel(channel); + return channel; + }) as RTCPeerConnection['createDataChannel']; + pc.addEventListener('connectionstatechange', () => { (window as any).__lastRtcState = pc.connectionState; }); + pc.addEventListener('datachannel', (event: RTCDataChannelEvent) => { + trackDataChannel(event.channel); + }); + pc.addEventListener('track', (event: RTCTrackEvent) => { (window as any).__rtcRemoteTracks.push({ kind: event.track.kind, @@ -211,6 +232,66 @@ export async function waitForConnectedPeerCount(page: Page, expectedCount: numbe ); } +/** Returns the number of tracked RTCDataChannels in the open state. */ +export async function getOpenDataChannelCount(page: Page): Promise { + return page.evaluate( + () => ((window as any).__rtcDataChannels as RTCDataChannel[] | undefined)?.filter( + (channel) => channel.readyState === 'open' + ).length ?? 0 + ); +} + +/** Wait until the expected number of tracked RTCDataChannels are open. */ +export async function waitForOpenDataChannelCount(page: Page, expectedCount: number, timeout = 45_000): Promise { + await page.waitForFunction( + (count) => ((window as any).__rtcDataChannels as RTCDataChannel[] | undefined)?.filter( + (channel) => channel.readyState === 'open' + ).length === count, + expectedCount, + { timeout } + ); +} + +/** Close every currently-open RTCDataChannel and return how many were closed. */ +export async function closeOpenDataChannels(page: Page): Promise { + return page.evaluate(() => { + const channels = ((window as any).__rtcDataChannels as RTCDataChannel[] | undefined) ?? []; + + let closed = 0; + + for (const channel of channels) { + if (channel.readyState !== 'open') { + continue; + } + + channel.close(); + closed++; + } + + return closed; + }); +} + +/** Dispatch a synthetic data-channel error event on each open channel. */ +export async function dispatchDataChannelErrors(page: Page): Promise { + return page.evaluate(() => { + const channels = ((window as any).__rtcDataChannels as RTCDataChannel[] | undefined) ?? []; + + let dispatched = 0; + + for (const channel of channels) { + if (channel.readyState !== 'open') { + continue; + } + + channel.dispatchEvent(new Event('error')); + dispatched++; + } + + return dispatched; + }); +} + /** * Resume all suspended AudioContext instances created by the synthetic * media patch. Uses CDP `Runtime.evaluate` with `userGesture: true` so diff --git a/e2e/tests/voice/data-channel-recovery.spec.ts b/e2e/tests/voice/data-channel-recovery.spec.ts new file mode 100644 index 0000000..c310a43 --- /dev/null +++ b/e2e/tests/voice/data-channel-recovery.spec.ts @@ -0,0 +1,181 @@ +import { expect, type Page } from '@playwright/test'; +import { test, type Client } from '../../fixtures/multi-client'; +import { + closeOpenDataChannels, + dispatchDataChannelErrors, + dumpRtcDiagnostics, + getOpenDataChannelCount, + installAutoResumeAudioContext, + installWebRTCTracking, + waitForAllPeerAudioFlow, + waitForAudioStatsPresent, + waitForConnectedPeerCount, + waitForOpenDataChannelCount +} from '../../helpers/webrtc-helpers'; +import { RegisterPage } from '../../pages/register.page'; +import { ServerSearchPage } from '../../pages/server-search.page'; +import { ChatRoomPage } from '../../pages/chat-room.page'; + +interface VoiceClient extends Client { + displayName: string; + username: string; +} + +const USER_PASSWORD = 'TestPass123!'; +const VOICE_CHANNEL = 'General'; + +test.describe('Voice data-channel recovery', () => { + test('keeps two users hearing each other after a data-channel error and close', async ({ createClient }) => { + test.setTimeout(240_000); + + const clients = await createVoiceScenario(createClient, 2, `DC Recovery Duo ${Date.now()}`); + const [alice, bob] = clients; + + await assertMeshAudio(clients, 1, 'initial two-user voice'); + + await test.step('A non-fatal data-channel error does not interrupt audio', async () => { + const dispatched = await dispatchDataChannelErrors(alice.page); + + expect(dispatched).toBeGreaterThan(0); + await waitForOpenDataChannelCount(alice.page, 1, 15_000); + await waitForOpenDataChannelCount(bob.page, 1, 15_000); + await assertMeshAudio(clients, 1, 'after synthetic data-channel error'); + }); + + await test.step('A closed data channel is rebuilt and audio resumes both ways', async () => { + const closed = await closeOpenDataChannels(alice.page); + + expect(closed).toBeGreaterThan(0); + await waitForConnectedPeerCount(alice.page, 1, 60_000); + await waitForConnectedPeerCount(bob.page, 1, 60_000); + await waitForOpenDataChannelCount(alice.page, 1, 60_000); + await waitForOpenDataChannelCount(bob.page, 1, 60_000); + await assertMeshAudio(clients, 1, 'after data-channel close recovery'); + }); + }); + + test('heals a three-user voice mesh when one client loses every data channel', async ({ createClient }) => { + test.setTimeout(300_000); + + const clients = await createVoiceScenario(createClient, 3, `DC Recovery Trio ${Date.now()}`); + const bob = clients[1]; + + await assertMeshAudio(clients, 2, 'initial three-user mesh'); + + await test.step('Bob loses all control channels and the full mesh recovers', async () => { + const closed = await closeOpenDataChannels(bob.page); + + expect(closed).toBe(2); + + for (const client of clients) { + await waitForConnectedPeerCount(client.page, 2, 90_000); + await waitForOpenDataChannelCount(client.page, 2, 90_000); + } + + await assertMeshAudio(clients, 2, 'after full control-channel recovery'); + }); + }); +}); + +async function createVoiceScenario( + createClient: () => Promise, + userCount: number, + serverName: string +): Promise { + const clients: VoiceClient[] = []; + + for (let index = 0; index < userCount; index++) { + const client = await createClient(); + const displayName = `DC Voice ${index + 1}`; + + await installDeterministicVoiceSettings(client.page); + await installWebRTCTracking(client.page); + await installAutoResumeAudioContext(client.page); + + clients.push({ + ...client, + displayName, + username: `dc_voice_${Date.now()}_${index + 1}` + }); + } + + await test.step('Register clients', async () => { + for (const client of clients) { + const registerPage = new RegisterPage(client.page); + + await registerPage.goto(); + await registerPage.register(client.username, client.displayName, USER_PASSWORD); + await expect(client.page).toHaveURL(/\/search/, { timeout: 20_000 }); + } + }); + + await test.step('Create and join server', async () => { + const hostSearch = new ServerSearchPage(clients[0].page); + + await hostSearch.createServer(serverName, { description: 'Data-channel recovery voice test' }); + await expect(clients[0].page).toHaveURL(/\/room\//, { timeout: 20_000 }); + + for (const client of clients.slice(1)) { + const searchPage = new ServerSearchPage(client.page); + + await searchPage.joinServerFromSearch(serverName); + await expect(client.page).toHaveURL(/\/room\//, { timeout: 20_000 }); + } + }); + + await test.step('Join everyone to voice', async () => { + const hostRoom = new ChatRoomPage(clients[0].page); + + await hostRoom.ensureVoiceChannelExists(VOICE_CHANNEL); + + for (const client of clients) { + const room = new ChatRoomPage(client.page); + + await room.joinVoiceChannel(VOICE_CHANNEL); + await expect(room.voiceControls).toBeVisible({ timeout: 20_000 }); + } + + const expectedRemotePeers = clients.length - 1; + + for (const client of clients) { + await waitForConnectedPeerCount(client.page, expectedRemotePeers, 90_000); + await waitForOpenDataChannelCount(client.page, expectedRemotePeers, 90_000); + await waitForAudioStatsPresent(client.page, 30_000); + } + }); + + return clients; +} + +async function installDeterministicVoiceSettings(page: Page): Promise { + await page.addInitScript(() => { + localStorage.setItem('metoyou_voice_settings', JSON.stringify({ + inputVolume: 100, + outputVolume: 100, + audioBitrate: 96, + latencyProfile: 'balanced', + includeSystemAudio: false, + noiseReduction: false, + screenShareQuality: 'balanced', + askScreenShareQuality: false + })); + }); +} + +async function assertMeshAudio( + clients: readonly VoiceClient[], + expectedRemotePeers: number, + label: string +): Promise { + for (const client of clients) { + try { + await waitForAllPeerAudioFlow(client.page, expectedRemotePeers, 60_000); + } catch (error) { + const dataChannelCount = await getOpenDataChannelCount(client.page); + + console.log(`[${client.displayName} ${label} data channels] ${dataChannelCount}`); + console.log(`[${client.displayName} ${label} RTC]\n${await dumpRtcDiagnostics(client.page)}`); + throw error; + } + } +} diff --git a/e2e/tests/voice/direct-call.spec.ts b/e2e/tests/voice/direct-call.spec.ts index a1bef83..e981c3d 100644 --- a/e2e/tests/voice/direct-call.spec.ts +++ b/e2e/tests/voice/direct-call.spec.ts @@ -1,6 +1,7 @@ import { expect, type Page } from '@playwright/test'; import { test, type Client } from '../../fixtures/multi-client'; import { + closeOpenDataChannels, dumpRtcDiagnostics, installAutoResumeAudioContext, installWebRTCTracking, @@ -8,6 +9,7 @@ import { waitForAudioFlow, waitForAudioStatsPresent, waitForConnectedPeerCount, + waitForOpenDataChannelCount, waitForInboundVideoFlow, waitForOutboundVideoFlow, waitForPeerConnected @@ -371,6 +373,35 @@ test.describe('Direct private calls', () => { }); }); + test('keeps private-call audio flowing after the data channel closes', async ({ createClient }) => { + const scenario = await createDirectCallScenario(createClient); + + await test.step('Alice starts a private call and Bob joins', async () => { + await startCallFromSearch(scenario.alice.page, scenario.bobUserId, 'Bob'); + await scenario.bob.page.getByRole('button', { name: 'Open private call' }).click(); + await expect(scenario.bob.page).toHaveURL(/\/call\//, { timeout: 20_000 }); + await scenario.bob.page.getByRole('button', { name: 'Join call' }).click(); + await expect(scenario.bob.page.getByRole('button', { name: 'Leave call' })).toBeVisible({ timeout: 20_000 }); + + await waitForConnectedPeerCount(scenario.alice.page, 1, 45_000); + await waitForConnectedPeerCount(scenario.bob.page, 1, 45_000); + await waitForOpenDataChannelCount(scenario.alice.page, 1, 45_000); + await waitForOpenDataChannelCount(scenario.bob.page, 1, 45_000); + await waitForAllPeerAudioFlow(scenario.alice.page, 1, 45_000); + await waitForAllPeerAudioFlow(scenario.bob.page, 1, 45_000); + }); + + await test.step('Data-channel recovery keeps the call audible', async () => { + const closed = await closeOpenDataChannels(scenario.alice.page); + + expect(closed).toBeGreaterThan(0); + await waitForOpenDataChannelCount(scenario.alice.page, 1, 60_000); + await waitForOpenDataChannelCount(scenario.bob.page, 1, 60_000); + await waitForAllPeerAudioFlow(scenario.alice.page, 1, 60_000); + await waitForAllPeerAudioFlow(scenario.bob.page, 1, 60_000); + }); + }); + test('missing and ended private calls do not leave stale call controls behind', async ({ createClient }) => { const scenario = await createDirectCallScenario(createClient); diff --git a/toju-app/src/app/infrastructure/realtime/README.md b/toju-app/src/app/infrastructure/realtime/README.md index 0977fe8..1ffb925 100644 --- a/toju-app/src/app/infrastructure/realtime/README.md +++ b/toju-app/src/app/infrastructure/realtime/README.md @@ -172,6 +172,8 @@ Join and leave broadcasts are also identity-aware: `handleJoinServer` only broad Peer routing also has to stay scoped to the signaling server that reported the membership. A `user_left` from one signaling cluster must only subtract that cluster's shared servers; otherwise a leave on `signal.toju.app` can incorrectly tear down a peer that is still shared through `signal-sweden.toju.app` or a local signaling server. Route metadata is therefore kept across peer recreation and only cleared once the renderer no longer shares any servers with that peer. +When local voice is active, a transient `user_left` or stale presence snapshot must not immediately mute or tear down a peer whose P2P transport is still alive. The users store receives the current connected peer IDs with signaling presence updates and preserves live voice/camera/screen state while that transport is connected. The signaling handler also preserves an active voice peer route after a `user_left` blip so a later data-channel or peer reconnect can still target the correct signaling URL. Explicit voice leave still travels as a `voice-state` event with `isConnected: false`, and closed/failed peer connections still clean themselves up through the peer recovery path. + ## Peer connection lifecycle Peers connect to each other directly with `RTCPeerConnection`. The initiator is chosen deterministically from the identified logical peer IDs so only one side creates the offer and primary data channel for a given pair. The other side creates an answer. If identity or negotiation is still settling, the retry timer defers instead of comparing against the ephemeral local transport ID or reusing a half-open peer forever. @@ -246,6 +248,8 @@ Profile avatar sync follows attachment-style chunk transport plus server-icon-st Every 5 seconds a PING message is sent to each peer. The peer responds with PONG carrying the original timestamp, and the round-trip latency is stored in a signal. +Data-channel failures are treated as control-plane failures, not proof that RTP audio has stopped. When an open channel reports a non-fatal error, the client requests a fresh voice-state snapshot over that same channel. When the channel closes or cannot carry the resync request, the peer manager waits a short grace period so any still-flowing audio is not interrupted by a transient event. If the `RTCPeerConnection` is still connected after that grace period, the elected initiator replaces only the data channel in-place and preserves the media transport. Full peer recreation is reserved for cases where the media transport is no longer connected or the in-place control-channel repair fails. + ## Media pipeline ### Voice diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.spec.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.spec.ts new file mode 100644 index 0000000..598d4d0 --- /dev/null +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.spec.ts @@ -0,0 +1,131 @@ +import { Subject } from 'rxjs'; +import { + DATA_CHANNEL_STATE_OPEN, + P2P_TYPE_STATE_REQUEST, + P2P_TYPE_VOICE_STATE_REQUEST +} from '../../realtime.constants'; +import { setupDataChannel } from './data-channel'; +import { + createPeerConnectionManagerState, + DataChannelLifecycleHandlers, + PeerConnectionManagerContext +} from '../shared'; + +describe('data channel lifecycle', () => { + it('sends current state and requests peer state when the channel opens', () => { + const context = createContext(); + const handlers = createHandlers(); + const channel = createDataChannel(DATA_CHANNEL_STATE_OPEN); + + setupDataChannel(context, channel, 'peer-a', handlers); + channel.onopen?.(new Event('open')); + + expect(handlers.clearDataChannelRecoveryTimer).toHaveBeenCalledWith('peer-a'); + expect(channel.send).toHaveBeenCalledWith(expect.stringContaining('"type":"voice-state"')); + expect(channel.send).toHaveBeenCalledWith(JSON.stringify({ type: P2P_TYPE_STATE_REQUEST })); + expect(handlers.scheduleDataChannelRecovery).not.toHaveBeenCalled(); + }); + + it('requests a voice-state resync on a non-fatal data channel error', () => { + const context = createContext(); + const handlers = createHandlers(); + const channel = createDataChannel(DATA_CHANNEL_STATE_OPEN); + + setupDataChannel(context, channel, 'peer-a', handlers); + channel.onerror?.(new Event('error')); + + expect(channel.send).toHaveBeenCalledWith(JSON.stringify({ type: P2P_TYPE_VOICE_STATE_REQUEST })); + expect(handlers.scheduleDataChannelRecovery).not.toHaveBeenCalled(); + }); + + it('schedules peer recovery when an error leaves the channel closed', () => { + const context = createContext(); + const handlers = createHandlers(); + const channel = createDataChannel('closed'); + + setupDataChannel(context, channel, 'peer-a', handlers); + channel.onerror?.(new Event('error')); + + expect(channel.send).not.toHaveBeenCalled(); + expect(handlers.scheduleDataChannelRecovery).toHaveBeenCalledWith('peer-a', channel, 'error'); + }); + + it('schedules peer recovery when the data channel closes', () => { + const context = createContext(); + const handlers = createHandlers(); + const channel = createDataChannel('closed'); + + setupDataChannel(context, channel, 'peer-a', handlers); + channel.onclose?.(new Event('close')); + + expect(handlers.scheduleDataChannelRecovery).toHaveBeenCalledWith('peer-a', channel, 'close'); + }); +}); + +function createContext(): PeerConnectionManagerContext { + return { + logger: { + error: vi.fn(), + info: vi.fn(), + logStream: vi.fn(), + traffic: vi.fn(), + warn: vi.fn() + } as unknown as PeerConnectionManagerContext['logger'], + callbacks: { + getIceServers: vi.fn(() => []), + getIdentifyCredentials: vi.fn(() => ({ oderId: 'local-user', displayName: 'Local User' })), + getLocalMediaStream: vi.fn(() => null), + getLocalPeerId: vi.fn(() => 'local-peer'), + getVoiceStateSnapshot: vi.fn(() => ({ + isConnected: true, + isMuted: false, + isDeafened: false, + isScreenSharing: false, + roomId: 'voice-room', + serverId: 'server-1' + })), + isCameraEnabled: vi.fn(() => false), + isScreenSharingActive: vi.fn(() => false), + isSignalingConnected: vi.fn(() => true), + sendRawMessage: vi.fn() + }, + state: { + ...createPeerConnectionManagerState(), + peerConnected$: new Subject() + } + }; +} + +function createHandlers(): DataChannelLifecycleHandlers { + return { + clearDataChannelRecoveryTimer: vi.fn(), + scheduleDataChannelRecovery: vi.fn() + }; +} + +function createDataChannel(readyState: RTCDataChannelState): RTCDataChannel & { send: ReturnType } { + return { + addEventListener: vi.fn(), + binaryType: 'arraybuffer', + bufferedAmount: 0, + bufferedAmountLowThreshold: 0, + close: vi.fn(), + dispatchEvent: vi.fn(), + id: 1, + label: 'chat', + maxPacketLifeTime: null, + maxRetransmits: null, + negotiated: false, + onbufferedamountlow: null, + onclose: null, + onclosing: null, + onerror: null, + onmessage: null, + onopen: null, + ordered: true, + protocol: '', + readyState, + removeEventListener: vi.fn(), + send: vi.fn() + } as unknown as RTCDataChannel & { send: ReturnType }; +} diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts index 04dc76b..f007f17 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts @@ -13,7 +13,7 @@ import { P2P_TYPE_VOICE_STATE_REQUEST } from '../../realtime.constants'; import { recordDebugNetworkDataChannelPayload, recordDebugNetworkPing } from '../../logging/debug-network-metrics'; -import { PeerConnectionManagerContext } from '../shared'; +import { DataChannelLifecycleHandlers, PeerConnectionManagerContext } from '../shared'; import { startPingInterval } from './ping'; type PeerMessage = Record & { @@ -27,11 +27,14 @@ type PeerMessage = Record & { export function setupDataChannel( context: PeerConnectionManagerContext, channel: RTCDataChannel, - remotePeerId: string + remotePeerId: string, + handlers: DataChannelLifecycleHandlers ): void { const { logger, state } = context; channel.onopen = () => { + handlers.clearDataChannelRecoveryTimer(remotePeerId); + logger.info('[data-channel] Data channel open', { channelLabel: channel.label, negotiated: channel.negotiated, @@ -43,22 +46,7 @@ export function setupDataChannel( state.peerConnected$.next(remotePeerId); sendCurrentStatesToChannel(context, channel, remotePeerId); - - try { - const stateRequest = { type: P2P_TYPE_STATE_REQUEST }; - const rawPayload = JSON.stringify(stateRequest); - - channel.send(rawPayload); - logDataChannelTraffic(context, channel, remotePeerId, 'outbound', rawPayload, stateRequest); - } catch (error) { - logger.error('[data-channel] Failed to request peer state on open', error, { - bufferedAmount: channel.bufferedAmount, - channelLabel: channel.label, - peerId: remotePeerId, - readyState: channel.readyState, - type: P2P_TYPE_STATE_REQUEST - }); - } + sendStateRequestToChannel(context, channel, remotePeerId, P2P_TYPE_STATE_REQUEST, 'open'); startPingInterval(context.state, logger, remotePeerId); }; @@ -70,6 +58,8 @@ export function setupDataChannel( peerId: remotePeerId, readyState: channel.readyState }); + + handlers.scheduleDataChannelRecovery(remotePeerId, channel, 'close'); }; channel.onerror = (error) => { @@ -79,6 +69,18 @@ export function setupDataChannel( peerId: remotePeerId, readyState: channel.readyState }); + + const didRequestState = sendStateRequestToChannel( + context, + channel, + remotePeerId, + P2P_TYPE_VOICE_STATE_REQUEST, + 'error' + ); + + if (!didRequestState) { + handlers.scheduleDataChannelRecovery(remotePeerId, channel, 'error'); + } }; channel.onmessage = (event) => { @@ -103,6 +105,48 @@ export function setupDataChannel( }; } +function sendStateRequestToChannel( + context: PeerConnectionManagerContext, + channel: RTCDataChannel, + remotePeerId: string, + type: typeof P2P_TYPE_STATE_REQUEST | typeof P2P_TYPE_VOICE_STATE_REQUEST, + reason: string +): boolean { + const { logger } = context; + + if (channel.readyState !== DATA_CHANNEL_STATE_OPEN) { + logger.warn('[data-channel] Cannot request peer state - channel not open', { + channelLabel: channel.label, + peerId: remotePeerId, + readyState: channel.readyState, + reason, + type + }); + + return false; + } + + try { + const stateRequest = { type }; + const rawPayload = JSON.stringify(stateRequest); + + channel.send(rawPayload); + logDataChannelTraffic(context, channel, remotePeerId, 'outbound', rawPayload, stateRequest); + return true; + } catch (error) { + logger.error('[data-channel] Failed to request peer state', error, { + bufferedAmount: channel.bufferedAmount, + channelLabel: channel.label, + peerId: remotePeerId, + readyState: channel.readyState, + reason, + type + }); + + return false; + } +} + /** * Route an incoming peer-to-peer message. */ diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/peer-connection.manager.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/peer-connection.manager.ts index e546ce2..7629878 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/peer-connection.manager.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/peer-connection.manager.ts @@ -1,5 +1,6 @@ /* eslint-disable @typescript-eslint/member-ordering */ import { ChatEvent } from '../../../shared-kernel'; +import { DATA_CHANNEL_LABEL } from '../realtime.constants'; import { recordDebugNetworkDownloadRates } from '../logging/debug-network-metrics'; import { WebRTCLogger } from '../logging/webrtc-logger'; import { PeerData } from '../realtime.types'; @@ -22,6 +23,7 @@ import { } from './messaging/data-channel'; import { addToConnectedPeers, + clearDataChannelRecoveryTimer, clearAllPeerReconnectTimers, clearPeerDisconnectGraceTimer, clearPeerReconnectTimer, @@ -30,6 +32,7 @@ import { removePeer as removeManagedPeer, requestVoiceStateFromPeer, resetConnectedPeers, + scheduleDataChannelRecovery, schedulePeerDisconnectRecovery, schedulePeerReconnect, trackDisconnectedPeer @@ -38,6 +41,7 @@ import { clearRemoteScreenShareStream as clearManagedRemoteScreenShareStream, ha import { ConnectionLifecycleHandlers, createPeerConnectionManagerState, + DataChannelLifecycleHandlers, NegotiationHandlers, PeerConnectionCallbacks, PeerConnectionManagerContext, @@ -285,6 +289,8 @@ export class PeerConnectionManager { private get recoveryHandlers(): RecoveryHandlers { return { removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options), + replaceDataChannel: (peerId: string, expectedChannel: RTCDataChannel) => + this.replaceDataChannel(peerId, expectedChannel), createPeerConnection: (peerId: string, isInitiator: boolean) => this.createPeerConnection(peerId, isInitiator), createAndSendOffer: (peerId: string) => this.createAndSendOffer(peerId) @@ -292,7 +298,15 @@ export class PeerConnectionManager { } private setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void { - setupDataChannel(this.context, channel, remotePeerId); + setupDataChannel(this.context, channel, remotePeerId, this.dataChannelHandlers); + } + + private get dataChannelHandlers(): DataChannelLifecycleHandlers { + return { + clearDataChannelRecoveryTimer: (peerId: string) => this.clearDataChannelRecoveryTimer(peerId), + scheduleDataChannelRecovery: (peerId: string, channel: RTCDataChannel, reason: string) => + this.scheduleDataChannelRecovery(peerId, channel, reason) + }; } private handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void { @@ -311,6 +325,36 @@ export class PeerConnectionManager { clearPeerDisconnectGraceTimer(this.state, peerId); } + private clearDataChannelRecoveryTimer(peerId: string): void { + clearDataChannelRecoveryTimer(this.state, peerId); + } + + private scheduleDataChannelRecovery(peerId: string, channel: RTCDataChannel, reason: string): void { + scheduleDataChannelRecovery(this.context, peerId, channel, reason, this.recoveryHandlers); + } + + private replaceDataChannel(peerId: string, expectedChannel: RTCDataChannel): boolean { + const peerData = this.state.activePeerConnections.get(peerId); + + if (!peerData || peerData.dataChannel !== expectedChannel) + return false; + + try { + const replacement = peerData.connection.createDataChannel(DATA_CHANNEL_LABEL, { ordered: true }); + + peerData.dataChannel = replacement; + this.setupDataChannel(replacement, peerId); + return true; + } catch (error) { + this.logger.warn('[data-channel] Failed to create replacement data channel', { + error: (error as Error)?.message ?? String(error), + peerId + }); + + return false; + } + } + private schedulePeerDisconnectRecovery(peerId: string): void { schedulePeerDisconnectRecovery(this.context, peerId, this.recoveryHandlers); } diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts new file mode 100644 index 0000000..23c5c9a --- /dev/null +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.spec.ts @@ -0,0 +1,216 @@ +import { DATA_CHANNEL_RECOVERY_GRACE_MS, DATA_CHANNEL_STATE_OPEN } from '../../realtime.constants'; +import type { PeerData } from '../../realtime.types'; +import { + createPeerConnectionManagerState, + PeerConnectionManagerContext, + RecoveryHandlers +} from '../shared'; +import { scheduleDataChannelRecovery } from './peer-recovery'; + +describe('peer recovery', () => { + afterEach(() => { + vi.clearAllTimers(); + vi.useRealTimers(); + }); + + it('waits a short grace period before replacing a closed data channel in place', () => { + vi.useFakeTimers(); + + const channel = createDataChannel('closed'); + const context = createContext('alice'); + const handlers = createRecoveryHandlers(context); + + context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected')); + + scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); + + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS - 1); + expect(handlers.removePeer).not.toHaveBeenCalled(); + expect(handlers.createPeerConnection).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(handlers.replaceDataChannel).toHaveBeenCalledWith('bob', channel); + expect(handlers.removePeer).not.toHaveBeenCalled(); + expect(handlers.createPeerConnection).not.toHaveBeenCalled(); + expect(handlers.createAndSendOffer).not.toHaveBeenCalled(); + }); + + it('falls back to full peer recreation when in-place data channel replacement fails', () => { + vi.useFakeTimers(); + + const channel = createDataChannel('closed'); + const context = createContext('alice'); + const handlers = createRecoveryHandlers(context); + + handlers.replaceDataChannel.mockReturnValueOnce(false); + context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected')); + + scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); + + expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true }); + expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true); + expect(handlers.createAndSendOffer).toHaveBeenCalledWith('bob'); + }); + + it('does not recreate a peer when a replacement data channel is adopted before the grace expires', () => { + vi.useFakeTimers(); + + const staleChannel = createDataChannel('closed'); + const replacementChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN); + const context = createContext('alice'); + const handlers = createRecoveryHandlers(context); + + context.state.activePeerConnections.set('bob', createPeerData(staleChannel, 'connected')); + + scheduleDataChannelRecovery(context, 'bob', staleChannel, 'close', handlers); + context.state.activePeerConnections.set('bob', createPeerData(replacementChannel, 'connected')); + + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); + + expect(handlers.removePeer).not.toHaveBeenCalled(); + expect(handlers.replaceDataChannel).not.toHaveBeenCalled(); + expect(handlers.createPeerConnection).not.toHaveBeenCalled(); + }); + + it('does not schedule recovery for an open data channel', () => { + vi.useFakeTimers(); + + const channel = createDataChannel(DATA_CHANNEL_STATE_OPEN); + const context = createContext('alice'); + const handlers = createRecoveryHandlers(context); + + context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected')); + + scheduleDataChannelRecovery(context, 'bob', channel, 'error', handlers); + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); + + expect(handlers.removePeer).not.toHaveBeenCalled(); + expect(handlers.replaceDataChannel).not.toHaveBeenCalled(); + expect(handlers.createPeerConnection).not.toHaveBeenCalled(); + }); + + it('preserves a connected non-initiator peer while waiting for the remote initiator to replace the channel', () => { + vi.useFakeTimers(); + + const channel = createDataChannel('closed'); + const context = createContext('zoe'); + const handlers = createRecoveryHandlers(context); + + context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected', false)); + scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); + + expect(handlers.removePeer).not.toHaveBeenCalled(); + expect(handlers.replaceDataChannel).not.toHaveBeenCalled(); + expect(handlers.createPeerConnection).not.toHaveBeenCalled(); + expect(handlers.createAndSendOffer).not.toHaveBeenCalled(); + }); + + it('waits for the remote initiator when a non-connected peer needs full reconnect', () => { + vi.useFakeTimers(); + + const channel = createDataChannel('closed'); + const context = createContext('zoe'); + const handlers = createRecoveryHandlers(context); + + context.state.activePeerConnections.set('bob', createPeerData(channel, 'disconnected')); + + scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers); + vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS); + + expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', false); + expect(handlers.createAndSendOffer).not.toHaveBeenCalled(); + }); +}); + +function createContext(localOderId: string): PeerConnectionManagerContext { + return { + logger: { + error: vi.fn(), + info: vi.fn(), + logStream: vi.fn(), + traffic: vi.fn(), + warn: vi.fn() + } as unknown as PeerConnectionManagerContext['logger'], + callbacks: { + getIceServers: vi.fn(() => []), + getIdentifyCredentials: vi.fn(() => ({ oderId: localOderId, displayName: localOderId })), + getLocalMediaStream: vi.fn(() => null), + getLocalPeerId: vi.fn(() => localOderId), + getVoiceStateSnapshot: vi.fn(() => ({ + isConnected: true, + isMuted: false, + isDeafened: false, + isScreenSharing: false, + roomId: 'voice-room', + serverId: 'server-1' + })), + isCameraEnabled: vi.fn(() => false), + isScreenSharingActive: vi.fn(() => false), + isSignalingConnected: vi.fn(() => true), + sendRawMessage: vi.fn() + }, + state: createPeerConnectionManagerState() + }; +} + +function createRecoveryHandlers(context: PeerConnectionManagerContext): RecoveryHandlers & { + createAndSendOffer: ReturnType; + createPeerConnection: ReturnType; + removePeer: ReturnType; + replaceDataChannel: ReturnType; +} { + return { + createAndSendOffer: vi.fn(async () => undefined), + createPeerConnection: vi.fn((peerId: string, isInitiator: boolean) => { + const peerData = createPeerData(createDataChannel(isInitiator ? 'connecting' : 'closed'), 'new', isInitiator); + + context.state.activePeerConnections.set(peerId, peerData); + return peerData; + }), + removePeer: vi.fn((peerId: string) => { + context.state.activePeerConnections.delete(peerId); + }), + replaceDataChannel: vi.fn((peerId: string, expectedChannel: RTCDataChannel) => { + const peerData = context.state.activePeerConnections.get(peerId); + + if (!peerData || peerData.dataChannel !== expectedChannel) { + return false; + } + + peerData.dataChannel = createDataChannel('connecting'); + return true; + }) + }; +} + +function createPeerData( + dataChannel: RTCDataChannel, + connectionState: RTCPeerConnectionState, + isInitiator = true +): PeerData { + return { + audioSender: undefined, + connection: { + close: vi.fn(), + connectionState + } as unknown as RTCPeerConnection, + createdAt: Date.now(), + dataChannel, + isInitiator, + pendingIceCandidates: [], + remoteCameraStreamIds: new Set(), + remoteScreenShareStreamIds: new Set(), + remoteVoiceStreamIds: new Set(), + videoSender: undefined + }; +} + +function createDataChannel(readyState: RTCDataChannelState): RTCDataChannel { + return { + bufferedAmount: 0, + label: 'chat', + readyState + } as unknown as RTCDataChannel; +} diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts index aa5458f..3502388 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts @@ -1,5 +1,6 @@ import { CONNECTION_STATE_CONNECTED, + DATA_CHANNEL_RECOVERY_GRACE_MS, DATA_CHANNEL_STATE_OPEN, P2P_TYPE_VOICE_STATE_REQUEST, PEER_DISCONNECT_GRACE_MS, @@ -27,6 +28,7 @@ export function removePeer( const preserveReconnectState = options?.preserveReconnectState === true; clearPeerDisconnectGraceTimer(state, peerId); + clearDataChannelRecoveryTimer(state, peerId); if (!preserveReconnectState) { clearPeerReconnectTimer(state, peerId); @@ -56,6 +58,7 @@ export function removePeer( export function closeAllPeers(state: PeerConnectionManagerState): void { clearAllPeerReconnectTimers(state); clearAllPeerDisconnectGraceTimers(state); + clearAllDataChannelRecoveryTimers(state); clearAllPingTimers(state); state.activePeerConnections.forEach((peerData) => { @@ -106,6 +109,18 @@ export function clearPeerDisconnectGraceTimer( } } +export function clearDataChannelRecoveryTimer( + state: PeerConnectionManagerState, + peerId: string +): void { + const timer = state.dataChannelRecoveryTimers.get(peerId); + + if (timer) { + clearTimeout(timer); + state.dataChannelRecoveryTimers.delete(peerId); + } +} + /** Cancel all pending peer reconnect timers and clear the tracker. */ export function clearAllPeerReconnectTimers(state: PeerConnectionManagerState): void { state.peerReconnectTimers.forEach((timer) => clearInterval(timer)); @@ -118,6 +133,85 @@ export function clearAllPeerDisconnectGraceTimers(state: PeerConnectionManagerSt state.peerDisconnectGraceTimers.clear(); } +export function clearAllDataChannelRecoveryTimers(state: PeerConnectionManagerState): void { + state.dataChannelRecoveryTimers.forEach((timer) => clearTimeout(timer)); + state.dataChannelRecoveryTimers.clear(); +} + +export function scheduleDataChannelRecovery( + context: PeerConnectionManagerContext, + peerId: string, + channel: RTCDataChannel, + reason: string, + handlers: RecoveryHandlers +): void { + const { logger, state } = context; + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData || peerData.dataChannel !== channel) + return; + + if (channel.readyState === DATA_CHANNEL_STATE_OPEN) + return; + + if (state.dataChannelRecoveryTimers.has(peerId)) + return; + + logger.warn('[data-channel] Control channel unavailable; waiting before reconnect', { + channelLabel: channel.label, + peerId, + readyState: channel.readyState, + reason + }); + + const timer = setTimeout(() => { + state.dataChannelRecoveryTimers.delete(peerId); + + const latestPeerData = state.activePeerConnections.get(peerId); + + if (!latestPeerData || latestPeerData.dataChannel !== channel) + return; + + if (latestPeerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) + return; + + logger.warn('[data-channel] Control channel did not recover; selecting repair path', { + channelLabel: channel.label, + connectionState: latestPeerData.connection.connectionState, + peerId, + readyState: latestPeerData.dataChannel?.readyState ?? null, + reason + }); + + if (latestPeerData.connection.connectionState === CONNECTION_STATE_CONNECTED) { + if (latestPeerData.isInitiator && handlers.replaceDataChannel(peerId, channel)) { + logger.info('[data-channel] Replaced control channel without recreating media transport', { + peerId, + reason + }); + + return; + } + + if (!latestPeerData.isInitiator) { + logger.info('[data-channel] Waiting for initiator to replace control channel; preserving media transport', { + peerId, + reason + }); + + return; + } + } + + trackDisconnectedPeer(state, peerId); + handlers.removePeer(peerId, { preserveReconnectState: true }); + attemptPeerReconnect(context, peerId, handlers); + schedulePeerReconnect(context, peerId, handlers); + }, DATA_CHANNEL_RECOVERY_GRACE_MS); + + state.dataChannelRecoveryTimers.set(peerId, timer); +} + export function schedulePeerDisconnectRecovery( context: PeerConnectionManagerContext, peerId: string, diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/shared.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/shared.ts index b4ba764..49d2115 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/shared.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/shared.ts @@ -42,6 +42,7 @@ export interface PeerConnectionManagerState { disconnectedPeerTracker: Map; peerReconnectTimers: Map>; peerDisconnectGraceTimers: Map>; + dataChannelRecoveryTimers: Map>; pendingPings: Map; peerPingTimers: Map>; peerLatencies: Map; @@ -78,12 +79,18 @@ export interface ConnectionLifecycleHandlers { setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void; } +export interface DataChannelLifecycleHandlers { + clearDataChannelRecoveryTimer(peerId: string): void; + scheduleDataChannelRecovery(peerId: string, channel: RTCDataChannel, reason: string): void; +} + export interface NegotiationHandlers { createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData; } export interface RecoveryHandlers { removePeer(peerId: string, options?: RemovePeerOptions): void; + replaceDataChannel(peerId: string, expectedChannel: RTCDataChannel): boolean; createPeerConnection(peerId: string, isInitiator: boolean): PeerData; createAndSendOffer(peerId: string): Promise; } @@ -98,6 +105,7 @@ export function createPeerConnectionManagerState(): PeerConnectionManagerState { disconnectedPeerTracker: new Map(), peerReconnectTimers: new Map>(), peerDisconnectGraceTimers: new Map>(), + dataChannelRecoveryTimers: new Map>(), pendingPings: new Map(), peerPingTimers: new Map>(), peerLatencies: new Map(), diff --git a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts index 2c0f466..3e6f3f5 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts @@ -181,6 +181,7 @@ export class WebRTCService implements OnDestroy { this.signalingMessageHandler = new IncomingSignalingMessageHandler({ getLocalOderId: () => this.signalingTransportHandler.getIdentifyCredentials()?.oderId ?? null, getEffectiveServerId: () => this.voiceSessionController.getEffectiveServerId(this.state.currentServerId), + isVoiceConnected: () => this.state.isVoiceConnectedActive(), peerManager: this.peerManager, setServerTime: (serverTime) => this.timeSync.setFromServerTime(serverTime), signalingCoordinator: this.signalingCoordinator, diff --git a/toju-app/src/app/infrastructure/realtime/realtime.constants.ts b/toju-app/src/app/infrastructure/realtime/realtime.constants.ts index bd71be8..56c0a1c 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime.constants.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime.constants.ts @@ -40,6 +40,8 @@ export const DATA_CHANNEL_LABEL = 'chat'; export const DATA_CHANNEL_HIGH_WATER_BYTES = 4 * 1024 * 1024; // 4 MB /** Low-water mark (bytes) - resume sending once buffered amount drops below this */ export const DATA_CHANNEL_LOW_WATER_BYTES = 1 * 1024 * 1024; // 1 MB +/** Grace period before recreating a peer whose data channel closed while media may still be flowing. */ +export const DATA_CHANNEL_RECOVERY_GRACE_MS = 2_500; export const SCREEN_SHARE_IDEAL_WIDTH = 1920; export const SCREEN_SHARE_IDEAL_HEIGHT = 1080; diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.spec.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.spec.ts new file mode 100644 index 0000000..ac232c7 --- /dev/null +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.spec.ts @@ -0,0 +1,105 @@ +import type { PeerData } from '../realtime.types'; +import { PeerConnectionManager } from '../peer-connection-manager/peer-connection.manager'; +import { IncomingSignalingMessageHandler } from './signaling-message-handler'; +import { ServerSignalingCoordinator } from './server-signaling-coordinator'; + +describe('IncomingSignalingMessageHandler user_left handling', () => { + it('preserves an active voice peer on a transient user_left signal', () => { + const context = createHandlerContext({ voiceConnected: true }); + + context.coordinator.addJoinedServer('ws://signal-a', 'server-1'); + context.coordinator.trackPeerInServer('peer-a', 'server-1', 'ws://signal-a'); + context.peerManager.activePeerConnections.set('peer-a', createPeerData('connected', 'open')); + + context.handler.handleMessage({ type: 'user_left', oderId: 'peer-a', serverId: 'server-1' }, 'ws://signal-a'); + + expect(context.peerManager.removePeer).not.toHaveBeenCalled(); + expect(context.coordinator.getPeerSignalUrl('peer-a')).toBe('ws://signal-a'); + expect(context.coordinator.hasTrackedPeerServers('peer-a')).toBe(true); + }); + + it('removes a peer on user_left when local voice is not active', () => { + const context = createHandlerContext({ voiceConnected: false }); + + context.coordinator.trackPeerInServer('peer-a', 'server-1', 'ws://signal-a'); + context.peerManager.activePeerConnections.set('peer-a', createPeerData('connected', 'open')); + + context.handler.handleMessage({ type: 'user_left', oderId: 'peer-a', serverId: 'server-1' }, 'ws://signal-a'); + + expect(context.peerManager.removePeer).toHaveBeenCalledWith('peer-a'); + expect(context.coordinator.getPeerSignalUrl('peer-a')).toBeUndefined(); + }); + + it('removes a stale voice peer when no active P2P transport remains', () => { + const context = createHandlerContext({ voiceConnected: true }); + + context.coordinator.trackPeerInServer('peer-a', 'server-1', 'ws://signal-a'); + context.peerManager.activePeerConnections.set('peer-a', createPeerData('failed', 'closed')); + + context.handler.handleMessage({ type: 'user_left', oderId: 'peer-a', serverId: 'server-1' }, 'ws://signal-a'); + + expect(context.peerManager.removePeer).toHaveBeenCalledWith('peer-a'); + expect(context.coordinator.getPeerSignalUrl('peer-a')).toBeUndefined(); + }); +}); + +interface HandlerContext { + coordinator: ServerSignalingCoordinator; + handler: IncomingSignalingMessageHandler; + peerManager: PeerConnectionManager & { + activePeerConnections: Map; + removePeer: ReturnType; + }; +} + +function createHandlerContext(options: { voiceConnected: boolean }): HandlerContext { + const coordinator = new ServerSignalingCoordinator({ + createManager: vi.fn(), + handleConnectionStatus: vi.fn(), + handleHeartbeatTick: vi.fn(), + handleMessage: vi.fn() + }); + const peerManager = { + activePeerConnections: new Map(), + removePeer: vi.fn() + } as unknown as HandlerContext['peerManager']; + const handler = new IncomingSignalingMessageHandler({ + getEffectiveServerId: () => 'server-1', + getLocalOderId: () => 'local-user', + isVoiceConnected: () => options.voiceConnected, + logger: { + error: vi.fn(), + info: vi.fn(), + logStream: vi.fn(), + traffic: vi.fn(), + warn: vi.fn() + } as unknown as ConstructorParameters[0]['logger'], + peerManager, + setServerTime: vi.fn(), + signalingCoordinator: coordinator + }); + + return { coordinator, handler, peerManager }; +} + +function createPeerData( + connectionState: RTCPeerConnectionState, + dataChannelState: RTCDataChannelState +): PeerData { + return { + audioSender: undefined, + connection: { + connectionState + } as RTCPeerConnection, + createdAt: Date.now(), + dataChannel: { + readyState: dataChannelState + } as RTCDataChannel, + isInitiator: true, + pendingIceCandidates: [], + remoteCameraStreamIds: new Set(), + remoteScreenShareStreamIds: new Set(), + remoteVoiceStreamIds: new Set(), + videoSender: undefined + }; +} diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts index 6e9482c..97e628c 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts @@ -45,6 +45,7 @@ interface IncomingSignalingMessageHandlerDependencies { logger: WebRTCLogger; getLocalOderId(): string | null; getEffectiveServerId(): string | null; + isVoiceConnected(): boolean; setServerTime(serverTime: number): void; } @@ -294,7 +295,7 @@ export class IncomingSignalingMessageHandler { if (message.oderId) { this.clearUserJoinedFallbackOffer(message.oderId); this.nonInitiatorWaitStart.delete(message.oderId); - + const existing = this.dependencies.peerManager.activePeerConnections.get(message.oderId); const hasRemainingSharedServers = Array.isArray(message.serverIds) ? this.dependencies.signalingCoordinator.replacePeerSharedServers(message.oderId, signalUrl, message.serverIds) : message.serverId @@ -302,6 +303,16 @@ export class IncomingSignalingMessageHandler { : false; if (!hasRemainingSharedServers) { + const serverIdsToPreserve = Array.isArray(message.serverIds) + ? message.serverIds + : message.serverId + ? [message.serverId] + : []; + + if (this.shouldPreserveActiveVoicePeerAfterLeave(message.oderId, existing, signalUrl, serverIdsToPreserve)) { + return; + } + this.dependencies.peerManager.removePeer(message.oderId); this.dependencies.signalingCoordinator.deletePeerTracking(message.oderId); } @@ -533,6 +544,41 @@ export class IncomingSignalingMessageHandler { return connectionState === 'connected' || peer.dataChannel?.readyState === 'open'; } + private shouldPreserveActiveVoicePeerAfterLeave( + peerId: string, + peer: PeerData | undefined, + signalUrl: string, + serverIds: readonly string[] + ): boolean { + if (!this.dependencies.isVoiceConnected() || !this.hasActivePeerConnection(peer)) { + return false; + } + + let restoredServerScope = false; + + for (const serverId of serverIds) { + if (!this.dependencies.signalingCoordinator.hasJoinedServer(serverId)) { + continue; + } + + this.dependencies.signalingCoordinator.trackPeerInServer(peerId, serverId, signalUrl); + restoredServerScope = true; + } + + if (!restoredServerScope) { + this.dependencies.signalingCoordinator.setPeerSignalUrl(peerId, signalUrl); + } + + this.dependencies.logger.warn('Preserving active voice peer after transient user_left', { + connectionState: peer?.connection.connectionState ?? 'unknown', + dataChannelState: peer?.dataChannel?.readyState ?? 'missing', + peerId, + signalUrl + }); + + return true; + } + private isPeerConnectionNegotiating(peer: PeerData | undefined): boolean { if (!peer || this.hasActivePeerConnection(peer)) return false; diff --git a/toju-app/src/app/store/rooms/room-state-sync.effects.ts b/toju-app/src/app/store/rooms/room-state-sync.effects.ts index 6d0e0e8..d020806 100644 --- a/toju-app/src/app/store/rooms/room-state-sync.effects.ts +++ b/toju-app/src/app/store/rooms/room-state-sync.effects.ts @@ -195,7 +195,8 @@ export class RoomStateSyncEffects { UsersActions.userLeft({ userId: signalingMessage.oderId, serverId: signalingMessage.serverId, - serverIds: remainingServerIds + serverIds: remainingServerIds, + connectedPeerIds: this.webrtc.getConnectedPeers() }) ]; diff --git a/toju-app/src/app/store/users/users-status.reducer.spec.ts b/toju-app/src/app/store/users/users-status.reducer.spec.ts index 14e22f6..6030d80 100644 --- a/toju-app/src/app/store/users/users-status.reducer.spec.ts +++ b/toju-app/src/app/store/users/users-status.reducer.spec.ts @@ -300,6 +300,98 @@ describe('users reducer - status', () => { expect(state.entities['u3']?.cameraState?.isEnabled).toBe(false); expect(state.entities['u3']?.screenShareState?.isSharing).toBe(false); }); + + it('preserves voice state on user_left while live peer transport still exists', () => { + const remoteUser = createUser({ + id: 'u4', + oderId: 'u4', + displayName: 'Transient Peer', + presenceServerIds: ['s1'], + status: 'online', + voiceState: { + isConnected: true, + isMuted: false, + isDeafened: false, + isSpeaking: true, + roomId: 'voice-1', + serverId: 's1' + }, + cameraState: { isEnabled: true } + }); + const withUser = usersReducer(baseState, UsersActions.userJoined({ user: remoteUser })); + const state = usersReducer(withUser, UsersActions.userLeft({ + userId: 'u4', + serverId: 's1', + connectedPeerIds: ['u4'] + })); + + expect(state.entities['u4']?.presenceServerIds).toEqual(['s1']); + expect(state.entities['u4']?.isOnline).toBe(true); + expect(state.entities['u4']?.status).toBe('online'); + expect(state.entities['u4']?.voiceState?.isConnected).toBe(true); + expect(state.entities['u4']?.voiceState?.roomId).toBe('voice-1'); + expect(state.entities['u4']?.cameraState?.isEnabled).toBe(true); + }); + + it('matches live user_left transports by oderId and peerId', () => { + const remoteUser = createUser({ + id: 'db-id-5', + oderId: 'oder-5', + peerId: 'peer-5', + displayName: 'Peer Id Match', + presenceServerIds: ['s1'], + voiceState: { + isConnected: true, + isMuted: false, + isDeafened: false, + isSpeaking: false, + roomId: 'voice-1', + serverId: 's1' + } + }); + const withUser = usersReducer(baseState, UsersActions.userJoined({ user: remoteUser })); + const byOderId = usersReducer(withUser, UsersActions.userLeft({ + userId: 'db-id-5', + serverId: 's1', + connectedPeerIds: ['oder-5'] + })); + const byPeerId = usersReducer(withUser, UsersActions.userLeft({ + userId: 'db-id-5', + serverId: 's1', + connectedPeerIds: ['peer-5'] + })); + + expect(byOderId.entities['db-id-5']?.voiceState?.isConnected).toBe(true); + expect(byPeerId.entities['db-id-5']?.voiceState?.isConnected).toBe(true); + }); + + it('clears voice state on user_left when the peer transport is gone', () => { + const remoteUser = createUser({ + id: 'u6', + oderId: 'u6', + displayName: 'Gone Peer', + presenceServerIds: ['s1'], + voiceState: { + isConnected: true, + isMuted: false, + isDeafened: false, + isSpeaking: true, + roomId: 'voice-1', + serverId: 's1' + } + }); + const withUser = usersReducer(baseState, UsersActions.userJoined({ user: remoteUser })); + const state = usersReducer(withUser, UsersActions.userLeft({ + userId: 'u6', + serverId: 's1', + connectedPeerIds: [] + })); + + expect(state.entities['u6']?.presenceServerIds).toBeUndefined(); + expect(state.entities['u6']?.isOnline).toBe(false); + expect(state.entities['u6']?.voiceState?.isConnected).toBe(false); + expect(state.entities['u6']?.voiceState?.roomId).toBeUndefined(); + }); }); describe('manual status overrides auto idle', () => { diff --git a/toju-app/src/app/store/users/users.actions.ts b/toju-app/src/app/store/users/users.actions.ts index 59d22ab..ec008bc 100644 --- a/toju-app/src/app/store/users/users.actions.ts +++ b/toju-app/src/app/store/users/users.actions.ts @@ -32,7 +32,7 @@ export const UsersActions = createActionGroup({ 'Load Room Users Failure': props<{ error: string }>(), 'User Joined': props<{ user: User }>(), - 'User Left': props<{ userId: string; serverId?: string; serverIds?: string[] }>(), + 'User Left': props<{ userId: string; serverId?: string; serverIds?: string[]; connectedPeerIds?: string[] }>(), 'Sync Server Presence': props<{ roomId: string; users: User[]; connectedPeerIds?: string[] }>(), 'Update User': props<{ userId: string; updates: Partial }>(), diff --git a/toju-app/src/app/store/users/users.reducer.ts b/toju-app/src/app/store/users/users.reducer.ts index 27ffcda..3b7e63b 100644 --- a/toju-app/src/app/store/users/users.reducer.ts +++ b/toju-app/src/app/store/users/users.reducer.ts @@ -227,7 +227,8 @@ function buildAvatarUser(existingUser: User | undefined, incomingUser: { function buildPresenceRemovalChanges( user: User, - update: { serverId?: string; serverIds?: readonly string[] } + update: { serverId?: string; serverIds?: readonly string[] }, + connectedPeerIds: ReadonlySet = new Set() ): Partial { const nextPresenceServerIds = update.serverIds !== undefined ? normalizePresenceServerIds(update.serverIds) @@ -235,6 +236,24 @@ function buildPresenceRemovalChanges( const isOnline = (nextPresenceServerIds?.length ?? 0) > 0; const shouldClearLiveState = !isOnline || (!!user.voiceState?.serverId && !nextPresenceServerIds?.includes(user.voiceState.serverId)); + const hasLiveState = user.voiceState?.isConnected === true + || user.screenShareState?.isSharing === true + || user.cameraState?.isEnabled === true; + + if (shouldClearLiveState && hasLiveState && hasLivePeerTransport(user, connectedPeerIds)) { + const preservedPresenceServerIds = user.presenceServerIds + ?? (user.voiceState?.serverId ? [user.voiceState.serverId] : undefined); + + return { + presenceServerIds: preservedPresenceServerIds, + isOnline: true, + status: user.status && user.status !== 'offline' && user.status !== 'disconnected' ? user.status : 'online', + voiceState: user.voiceState, + screenShareState: user.screenShareState, + cameraState: user.cameraState, + gameActivity: user.gameActivity + }; + } return { presenceServerIds: nextPresenceServerIds, @@ -390,7 +409,7 @@ export const usersReducer = createReducer( ? usersAdapter.updateMany(stalePresenceUpdates, nextState) : nextState; }), - on(UsersActions.userLeft, (state, { userId, serverId, serverIds }) => { + on(UsersActions.userLeft, (state, { userId, serverId, serverIds, connectedPeerIds }) => { const existingUser = state.entities[userId]; if (!existingUser) { @@ -409,7 +428,7 @@ export const usersReducer = createReducer( changes: buildPresenceRemovalChanges(existingUser, { serverId, serverIds - }) + }, new Set(connectedPeerIds ?? [])) }, state );