import { ChatEvent } from '../../../../shared-kernel';
import {
  DATA_CHANNEL_HIGH_WATER_BYTES,
  DATA_CHANNEL_LOW_WATER_BYTES,
  DATA_CHANNEL_STATE_OPEN,
  DEFAULT_DISPLAY_NAME,
  P2P_TYPE_CAMERA_STATE,
  P2P_TYPE_PING,
  P2P_TYPE_PONG,
  P2P_TYPE_SCREEN_STATE,
  P2P_TYPE_STATE_REQUEST,
  P2P_TYPE_VOICE_STATE,
  P2P_TYPE_VOICE_STATE_REQUEST
} from '../../realtime.constants';
import { recordDebugNetworkDataChannelPayload, recordDebugNetworkPing } from '../../logging/debug-network-metrics';
import { PeerConnectionManagerContext } from '../shared';
import { startPingInterval } from './ping';

/**
 * JSON object exchanged over a peer data channel. Only `type` and `ts` are
 * inspected by the routing code in this file; every other field is opaque.
 */
type PeerMessage = Record<string, unknown> & {
  type?: string;
  ts?: number;
};

/** Shared encoder used to measure payload sizes without allocating one per call. */
const payloadEncoder = new TextEncoder();

/** Top-level string fields copied verbatim into a log summary. */
const SUMMARY_STRING_KEYS = ['oderId', 'displayName', 'roomId', 'serverId', 'messageId'] as const;

/** Top-level boolean fields copied verbatim into a log summary. */
const SUMMARY_BOOLEAN_KEYS = ['isScreenSharing', 'isCameraEnabled'] as const;

/** Top-level array fields whose length is reported under the paired summary key. */
const SUMMARY_ARRAY_COUNT_KEYS: ReadonlyArray<readonly [string, string]> = [
  ['ids', 'idsCount'],
  ['items', 'itemsCount'],
  ['messages', 'messagesCount']
];

/**
 * Wire open/close/error/message handlers onto a data channel.
 *
 * On open the local states are pushed to the remote peer, a state request is
 * sent, and the ping interval is started. Incoming messages are parsed as JSON
 * and routed through {@link handlePeerMessage}.
 *
 * @param context - Peer connection manager context (logger, state, callbacks).
 * @param channel - The data channel to attach handlers to.
 * @param remotePeerId - Identifier of the peer on the other end of the channel.
 */
export function setupDataChannel(
  context: PeerConnectionManagerContext,
  channel: RTCDataChannel,
  remotePeerId: string
): void {
  const { logger } = context;

  channel.onopen = () => {
    logger.info('[data-channel] Data channel open', {
      channelLabel: channel.label,
      negotiated: channel.negotiated,
      ordered: channel.ordered,
      peerId: remotePeerId,
      protocol: channel.protocol || null
    });

    sendCurrentStatesToChannel(context, channel, remotePeerId);

    try {
      const stateRequest = { type: P2P_TYPE_STATE_REQUEST };
      const rawPayload = JSON.stringify(stateRequest);

      channel.send(rawPayload);
      logDataChannelTraffic(context, channel, remotePeerId, 'outbound', rawPayload, stateRequest);
    } catch (error) {
      logger.error('[data-channel] Failed to request peer state on open', error, {
        bufferedAmount: channel.bufferedAmount,
        channelLabel: channel.label,
        peerId: remotePeerId,
        readyState: channel.readyState,
        type: P2P_TYPE_STATE_REQUEST
      });
    }

    startPingInterval(context.state, logger, remotePeerId);
  };

  channel.onclose = () => {
    logger.info('[data-channel] Data channel closed', {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      peerId: remotePeerId,
      readyState: channel.readyState
    });
  };

  channel.onerror = (error) => {
    logger.error('[data-channel] Data channel error', error, {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      peerId: remotePeerId,
      readyState: channel.readyState
    });
  };

  channel.onmessage = (event) => {
    const rawPayload = typeof event.data === 'string' ? event.data : String(event.data ?? '');

    let message: PeerMessage;

    try {
      // Remote input is untrusted: valid JSON that is not a plain object
      // (null, number, string, array) must not be routed as a message.
      const parsedObject = asObject(JSON.parse(rawPayload) as unknown);

      if (!parsedObject) {
        throw new TypeError('Peer message is not a JSON object');
      }

      message = parsedObject as PeerMessage;
    } catch (error) {
      logger.error('[data-channel] Failed to parse peer message', error, {
        bytes: measurePayloadBytes(rawPayload),
        channelLabel: channel.label,
        peerId: remotePeerId,
        rawPreview: getRawPreview(rawPayload)
      });
      return;
    }

    // Handling is guarded separately so a failure in routing or in a
    // subscriber is not misreported as a parse failure.
    try {
      logDataChannelTraffic(context, channel, remotePeerId, 'inbound', rawPayload, message);
      handlePeerMessage(context, remotePeerId, message);
    } catch (error) {
      logger.error('[data-channel] Failed to handle peer message', error, {
        bytes: measurePayloadBytes(rawPayload),
        channelLabel: channel.label,
        payloadPreview: summarizePeerMessage(message),
        peerId: remotePeerId
      });
    }
  };
}

/**
 * Route an incoming peer-to-peer message.
 *
 * State requests are answered with the current local states, pings are
 * answered with a pong echoing `ts`, and pongs update the peer latency. Any
 * other message is tagged with `fromPeerId` and published on
 * `state.messageReceived$`.
 *
 * @param context - Peer connection manager context.
 * @param peerId - Identifier of the peer that sent the message.
 * @param message - The parsed message object.
 */
export function handlePeerMessage(
  context: PeerConnectionManagerContext,
  peerId: string,
  message: PeerMessage
): void {
  const { logger, state } = context;

  logger.info('[data-channel] Received P2P message', summarizePeerMessage(message, { peerId }));
  recordDebugNetworkDataChannelPayload(peerId, message, 'inbound');

  if (message.type === P2P_TYPE_STATE_REQUEST || message.type === P2P_TYPE_VOICE_STATE_REQUEST) {
    sendCurrentStatesToPeer(context, peerId);
    return;
  }

  if (message.type === P2P_TYPE_PING) {
    sendToPeer(context, peerId, { type: P2P_TYPE_PONG, ts: message.ts });
    return;
  }

  if (message.type === P2P_TYPE_PONG) {
    const sentAt = state.pendingPings.get(peerId);

    // Only a pong echoing the pending ping's timestamp counts.
    // NOTE(review): assumes pendingPings stores performance.now() values — confirm in ./ping.
    if (sentAt && typeof message.ts === 'number' && message.ts === sentAt) {
      const latencyMs = Math.round(performance.now() - sentAt);

      state.peerLatencies.set(peerId, latencyMs);
      state.peerLatencyChanged$.next({ peerId, latencyMs });
      recordDebugNetworkPing(peerId, latencyMs);
      logger.info('[data-channel] Peer latency updated', { latencyMs, peerId });
    }

    // The pending entry is cleared on every pong, matching or not.
    state.pendingPings.delete(peerId);
    return;
  }

  const enrichedMessage = { ...message, fromPeerId: peerId } as ChatEvent;

  state.messageReceived$.next(enrichedMessage);
}

/**
 * Broadcast a ChatEvent to every peer with an open data channel.
 *
 * The event is serialized once; a serialization failure is logged and nothing
 * is sent. A send failure for one peer is logged and does not stop delivery to
 * the remaining peers.
 *
 * @param context - Peer connection manager context.
 * @param event - The JSON-serializable event to broadcast.
 */
export function broadcastMessage(
  context: PeerConnectionManagerContext,
  event: object
): void {
  const { logger, state } = context;
  const payload = event as PeerMessage;
  let data = '';

  try {
    data = JSON.stringify(event);
  } catch (error) {
    logger.error('[data-channel] Failed to serialize broadcast payload', error, {
      payloadPreview: summarizePeerMessage(payload)
    });
    return;
  }

  state.activePeerConnections.forEach((peerData, peerId) => {
    try {
      if (peerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) {
        peerData.dataChannel.send(data);
        recordDebugNetworkDataChannelPayload(peerId, payload, 'outbound');
        logDataChannelTraffic(context, peerData.dataChannel, peerId, 'outbound', data, payload);
      }
    } catch (error) {
      logger.error('[data-channel] Failed to broadcast message to peer', error, {
        bufferedAmount: peerData.dataChannel?.bufferedAmount,
        channelLabel: peerData.dataChannel?.label,
        payloadPreview: summarizePeerMessage(payload),
        peerId,
        readyState: peerData.dataChannel?.readyState ?? null
      });
    }
  });
}

/**
 * Send a ChatEvent to a specific peer's data channel.
 *
 * Logs a warning and does nothing when the peer has no open data channel.
 * Serialization and send failures are logged, not thrown.
 *
 * @param context - Peer connection manager context.
 * @param peerId - Identifier of the target peer.
 * @param event - The JSON-serializable event to send.
 */
export function sendToPeer(
  context: PeerConnectionManagerContext,
  peerId: string,
  event: object
): void {
  const { logger, state } = context;
  const peerData = state.activePeerConnections.get(peerId);

  if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) {
    logger.warn('Peer not connected - cannot send', { peerId });
    return;
  }

  const payload = event as PeerMessage;

  try {
    const rawPayload = JSON.stringify(event);

    peerData.dataChannel.send(rawPayload);
    recordDebugNetworkDataChannelPayload(peerId, payload, 'outbound');
    logDataChannelTraffic(context, peerData.dataChannel, peerId, 'outbound', rawPayload, payload);
  } catch (error) {
    logger.error('[data-channel] Failed to send message to peer', error, {
      bufferedAmount: peerData.dataChannel.bufferedAmount,
      channelLabel: peerData.dataChannel.label,
      payloadPreview: summarizePeerMessage(payload),
      peerId,
      readyState: peerData.dataChannel.readyState
    });
  }
}

/**
 * Send a ChatEvent with back-pressure awareness.
 *
 * When the channel's `bufferedAmount` exceeds the high-water mark, the send is
 * deferred until the buffer drains to the low-water mark or the channel closes.
 * If the channel is no longer open after waiting, the message is dropped with a
 * warning. Serialization and send failures are logged, not thrown.
 *
 * @param context - Peer connection manager context.
 * @param peerId - Identifier of the target peer.
 * @param event - The JSON-serializable event to send.
 * @returns A promise that resolves once the message was sent or dropped.
 */
export async function sendToPeerBuffered(
  context: PeerConnectionManagerContext,
  peerId: string,
  event: object
): Promise<void> {
  const { logger, state } = context;
  const peerData = state.activePeerConnections.get(peerId);

  if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) {
    logger.warn('Peer not connected - cannot send buffered', { peerId });
    return;
  }

  const channel = peerData.dataChannel;
  const payload = event as PeerMessage;
  let data = '';

  // Serialize up front so a bad payload is reported the same way the other
  // send helpers report it, instead of rejecting the returned promise.
  try {
    data = JSON.stringify(event);
  } catch (error) {
    logger.error('[data-channel] Failed to send buffered message', error, {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      payloadPreview: summarizePeerMessage(payload),
      peerId,
      readyState: channel.readyState
    });
    return;
  }

  if (typeof channel.bufferedAmountLowThreshold === 'number') {
    channel.bufferedAmountLowThreshold = DATA_CHANNEL_LOW_WATER_BYTES;
  }

  if (channel.bufferedAmount > DATA_CHANNEL_HIGH_WATER_BYTES) {
    logger.warn('[data-channel] Waiting for buffered amount to drain', {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      highWaterMark: DATA_CHANNEL_HIGH_WATER_BYTES,
      lowWaterMark: DATA_CHANNEL_LOW_WATER_BYTES,
      peerId
    });

    await waitForBufferDrain(channel);

    if (channel.readyState !== DATA_CHANNEL_STATE_OPEN) {
      logger.warn('[data-channel] Channel closed while waiting for buffered amount to drain', {
        channelLabel: channel.label,
        peerId,
        readyState: channel.readyState
      });
      return;
    }
  }

  try {
    channel.send(data);
    recordDebugNetworkDataChannelPayload(peerId, payload, 'outbound');
    logDataChannelTraffic(context, channel, peerId, 'outbound', data, payload);
  } catch (error) {
    logger.error('[data-channel] Failed to send buffered message', error, {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      payloadPreview: summarizePeerMessage(payload),
      peerId,
      readyState: channel.readyState
    });
  }
}

/**
 * Resolve once the channel's buffer has drained to the low-water mark, or the
 * channel has closed (in which case no further drain event will arrive).
 */
function waitForBufferDrain(channel: RTCDataChannel): Promise<void> {
  return new Promise<void>((resolve) => {
    const settle = (): void => {
      channel.removeEventListener('bufferedamountlow', handleBufferedAmountLow);
      channel.removeEventListener('close', settle);
      resolve();
    };

    // The listener stays registered until the buffer is really at or below
    // the mark; a one-shot listener would be lost if the event fired while
    // other senders had already refilled the buffer.
    const handleBufferedAmountLow = (): void => {
      if (channel.bufferedAmount <= DATA_CHANNEL_LOW_WATER_BYTES) {
        settle();
      }
    };

    channel.addEventListener('bufferedamountlow', handleBufferedAmountLow);
    channel.addEventListener('close', settle);
  });
}

/**
 * Resolve the identity fields attached to every outgoing state message:
 * the identify credentials when present, otherwise the local peer id and the
 * default display name.
 */
function resolveLocalIdentity(context: PeerConnectionManagerContext) {
  const { callbacks } = context;
  const credentials = callbacks.getIdentifyCredentials();

  return {
    oderId: credentials?.oderId || callbacks.getLocalPeerId(),
    displayName: credentials?.displayName || DEFAULT_DISPLAY_NAME
  };
}

/**
 * Send the current voice, camera, and screen-share states to a single peer.
 *
 * @param context - Peer connection manager context.
 * @param peerId - Identifier of the target peer.
 */
export function sendCurrentStatesToPeer(
  context: PeerConnectionManagerContext,
  peerId: string
): void {
  const { callbacks } = context;
  const { oderId, displayName } = resolveLocalIdentity(context);
  const voiceState = callbacks.getVoiceStateSnapshot();

  sendToPeer(context, peerId, { type: P2P_TYPE_VOICE_STATE, oderId, displayName, voiceState });
  sendToPeer(context, peerId, {
    type: P2P_TYPE_SCREEN_STATE,
    oderId,
    displayName,
    isScreenSharing: callbacks.isScreenSharingActive()
  });
  sendToPeer(context, peerId, {
    type: P2P_TYPE_CAMERA_STATE,
    oderId,
    displayName,
    isCameraEnabled: callbacks.isCameraEnabled()
  });
}

/**
 * Send the current voice, screen-share, and camera states directly over the
 * given channel, without looking the peer up in the connection map.
 *
 * Logs a warning and does nothing when the channel is not open. All three
 * payloads are serialized before the first one is sent; failures are logged.
 *
 * @param context - Peer connection manager context.
 * @param channel - The data channel to write to.
 * @param remotePeerId - Identifier of the peer on the other end of the channel.
 */
export function sendCurrentStatesToChannel(
  context: PeerConnectionManagerContext,
  channel: RTCDataChannel,
  remotePeerId: string
): void {
  const { callbacks, logger } = context;

  if (channel.readyState !== DATA_CHANNEL_STATE_OPEN) {
    logger.warn('Cannot send states - channel not open', { remotePeerId, state: channel.readyState });
    return;
  }

  const { oderId, displayName } = resolveLocalIdentity(context);
  const voiceState = callbacks.getVoiceStateSnapshot();

  try {
    const voiceStatePayload = { type: P2P_TYPE_VOICE_STATE, oderId, displayName, voiceState };
    const screenStatePayload = {
      type: P2P_TYPE_SCREEN_STATE,
      oderId,
      displayName,
      isScreenSharing: callbacks.isScreenSharingActive()
    };
    const cameraStatePayload = {
      type: P2P_TYPE_CAMERA_STATE,
      oderId,
      displayName,
      isCameraEnabled: callbacks.isCameraEnabled()
    };

    const voiceStateRaw = JSON.stringify(voiceStatePayload);
    const screenStateRaw = JSON.stringify(screenStatePayload);
    const cameraStateRaw = JSON.stringify(cameraStatePayload);

    channel.send(voiceStateRaw);
    logDataChannelTraffic(context, channel, remotePeerId, 'outbound', voiceStateRaw, voiceStatePayload);
    channel.send(screenStateRaw);
    logDataChannelTraffic(context, channel, remotePeerId, 'outbound', screenStateRaw, screenStatePayload);
    channel.send(cameraStateRaw);
    logDataChannelTraffic(context, channel, remotePeerId, 'outbound', cameraStateRaw, cameraStatePayload);

    logger.info('[data-channel] Sent initial states to channel', { remotePeerId, voiceState });
  } catch (error) {
    logger.error('[data-channel] Failed to send initial states to channel', error, {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      peerId: remotePeerId,
      readyState: channel.readyState,
      voiceState
    });
  }
}

/**
 * Broadcast the current voice, camera, and screen-share states to all
 * connected peers.
 *
 * @param context - Peer connection manager context.
 */
export function broadcastCurrentStates(context: PeerConnectionManagerContext): void {
  const { callbacks } = context;
  const { oderId, displayName } = resolveLocalIdentity(context);
  const voiceState = callbacks.getVoiceStateSnapshot();

  broadcastMessage(context, { type: P2P_TYPE_VOICE_STATE, oderId, displayName, voiceState });
  broadcastMessage(context, {
    type: P2P_TYPE_SCREEN_STATE,
    oderId,
    displayName,
    isScreenSharing: callbacks.isScreenSharingActive()
  });
  broadcastMessage(context, {
    type: P2P_TYPE_CAMERA_STATE,
    oderId,
    displayName,
    isCameraEnabled: callbacks.isCameraEnabled()
  });
}

/**
 * Emit a traffic log entry for one data-channel payload, combining the message
 * summary with the channel's buffer, label, state, and the payload byte size.
 */
function logDataChannelTraffic(
  context: PeerConnectionManagerContext,
  channel: RTCDataChannel,
  peerId: string,
  direction: 'inbound' | 'outbound',
  rawPayload: string,
  payload: PeerMessage
): void {
  context.logger.traffic('data-channel', direction, {
    ...summarizePeerMessage(payload, { peerId }),
    bufferedAmount: channel.bufferedAmount,
    bytes: measurePayloadBytes(rawPayload),
    channelLabel: channel.label,
    readyState: channel.readyState
  });
}

/**
 * Build a compact, log-safe summary of a peer message: its type, up to ten key
 * names, selected identifier/flag fields, and lengths instead of contents.
 *
 * @param payload - The message to summarize.
 * @param base - Optional fields placed first in the summary.
 * @returns A new summary object; `payload` is not modified.
 */
function summarizePeerMessage(payload: PeerMessage, base?: Record<string, unknown>): Record<string, unknown> {
  const summary: Record<string, unknown> = {
    ...base,
    keys: Object.keys(payload).slice(0, 10),
    type: typeof payload.type === 'string' ? payload.type : 'unknown'
  };

  for (const key of SUMMARY_STRING_KEYS) {
    const value = payload[key];

    if (typeof value === 'string') summary[key] = value;
  }

  for (const key of SUMMARY_BOOLEAN_KEYS) {
    const value = payload[key];

    if (typeof value === 'boolean') summary[key] = value;
  }

  const content = payload['content'];

  if (typeof content === 'string') summary['contentLength'] = content.length;

  for (const [key, countKey] of SUMMARY_ARRAY_COUNT_KEYS) {
    const value = payload[key];

    if (Array.isArray(value)) summary[countKey] = value.length;
  }

  // Fields of a nested `message` object take precedence over the top-level
  // fields of the same meaning.
  const payloadMessage = asObject(payload['message']);

  if (payloadMessage) {
    const nestedId = payloadMessage['id'];
    const nestedRoomId = payloadMessage['roomId'];
    const nestedContent = payloadMessage['content'];

    if (typeof nestedId === 'string') summary['messageId'] = nestedId;
    if (typeof nestedRoomId === 'string') summary['roomId'] = nestedRoomId;
    if (typeof nestedContent === 'string') summary['contentLength'] = nestedContent.length;
  }

  const voiceState = asObject(payload['voiceState']);

  if (voiceState) {
    // Flags are coerced so anything other than literal `true` is reported as false.
    const voiceStateSummary: Record<string, unknown> = {
      isConnected: voiceState['isConnected'] === true,
      isMuted: voiceState['isMuted'] === true,
      isDeafened: voiceState['isDeafened'] === true,
      isSpeaking: voiceState['isSpeaking'] === true
    };
    const voiceRoomId = voiceState['roomId'];
    const voiceServerId = voiceState['serverId'];
    const voiceVolume = voiceState['volume'];

    if (typeof voiceRoomId === 'string') voiceStateSummary['roomId'] = voiceRoomId;
    if (typeof voiceServerId === 'string') voiceStateSummary['serverId'] = voiceServerId;
    if (typeof voiceVolume === 'number') voiceStateSummary['volume'] = voiceVolume;

    summary['voiceState'] = voiceStateSummary;
  }

  return summary;
}

/** Return `value` as a string-keyed record when it is a non-null, non-array object; otherwise `null`. */
function asObject(value: unknown): Record<string, unknown> | null {
  if (!value || typeof value !== 'object' || Array.isArray(value)) return null;

  return value as Record<string, unknown>;
}

/** Size of `payload` in bytes when encoded as UTF-8. */
function measurePayloadBytes(payload: string): number {
  return payloadEncoder.encode(payload).length;
}

/** First 240 characters of `payload` with each whitespace run collapsed to one space. */
function getRawPreview(payload: string): string {
  return payload.replace(/\s+/g, ' ').slice(0, 240);
}