diff --git a/src/app/core/services/webrtc/peer-connection-manager/connection/create-peer-connection.ts b/src/app/core/services/webrtc/peer-connection-manager/connection/create-peer-connection.ts new file mode 100644 index 0000000..32d73e1 --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/connection/create-peer-connection.ts @@ -0,0 +1,177 @@ +import { + CONNECTION_STATE_CLOSED, + CONNECTION_STATE_CONNECTED, + CONNECTION_STATE_DISCONNECTED, + CONNECTION_STATE_FAILED, + DATA_CHANNEL_LABEL, + ICE_SERVERS, + SIGNALING_TYPE_ICE_CANDIDATE, + TRACK_KIND_AUDIO, + TRACK_KIND_VIDEO, + TRANSCEIVER_RECV_ONLY, + TRANSCEIVER_SEND_RECV +} from '../../webrtc.constants'; +import { PeerData } from '../../webrtc.types'; +import { ConnectionLifecycleHandlers, PeerConnectionManagerContext } from '../shared'; + +/** + * Create and configure a new RTCPeerConnection for a remote peer. + */ +export function createPeerConnection( + context: PeerConnectionManagerContext, + remotePeerId: string, + isInitiator: boolean, + handlers: ConnectionLifecycleHandlers +): PeerData { + const { callbacks, logger, state } = context; + + logger.info('Creating peer connection', { remotePeerId, isInitiator }); + + const connection = new RTCPeerConnection({ iceServers: ICE_SERVERS }); + + let dataChannel: RTCDataChannel | null = null; + + connection.onicecandidate = (event) => { + if (event.candidate) { + logger.info('ICE candidate gathered', { + remotePeerId, + candidateType: (event.candidate as RTCIceCandidate & { type?: string }).type + }); + + callbacks.sendRawMessage({ + type: SIGNALING_TYPE_ICE_CANDIDATE, + targetUserId: remotePeerId, + payload: { candidate: event.candidate } + }); + } + }; + + connection.onconnectionstatechange = () => { + logger.info('connectionstatechange', { + remotePeerId, + state: connection.connectionState + }); + + switch (connection.connectionState) { + case CONNECTION_STATE_CONNECTED: + handlers.clearPeerDisconnectGraceTimer(remotePeerId); + handlers.addToConnectedPeers(remotePeerId); + state.peerConnected$.next(remotePeerId); + handlers.clearPeerReconnectTimer(remotePeerId); + state.disconnectedPeerTracker.delete(remotePeerId); + handlers.requestVoiceStateFromPeer(remotePeerId); + break; + + case CONNECTION_STATE_DISCONNECTED: + handlers.schedulePeerDisconnectRecovery(remotePeerId); + break; + + case CONNECTION_STATE_FAILED: + handlers.trackDisconnectedPeer(remotePeerId); + handlers.removePeer(remotePeerId, { preserveReconnectState: true }); + handlers.schedulePeerReconnect(remotePeerId); + break; + + case CONNECTION_STATE_CLOSED: + handlers.removePeer(remotePeerId); + break; + } + }; + + connection.oniceconnectionstatechange = () => { + logger.info('iceconnectionstatechange', { + remotePeerId, + state: connection.iceConnectionState + }); + }; + + connection.onsignalingstatechange = () => { + logger.info('signalingstatechange', { + remotePeerId, + state: connection.signalingState + }); + }; + + connection.onnegotiationneeded = () => { + logger.info('negotiationneeded', { remotePeerId }); + }; + + connection.ontrack = (event) => { + handlers.handleRemoteTrack(event, remotePeerId); + }; + + if (isInitiator) { + dataChannel = connection.createDataChannel(DATA_CHANNEL_LABEL, { ordered: true }); + handlers.setupDataChannel(dataChannel, remotePeerId); + } else { + connection.ondatachannel = (event) => { + logger.info('Received data channel', { remotePeerId }); + dataChannel = event.channel; + + const existing = state.activePeerConnections.get(remotePeerId); + + if (existing) { + existing.dataChannel = dataChannel; + } + + handlers.setupDataChannel(dataChannel, remotePeerId); + }; + } + + const peerData: PeerData = { + connection, + dataChannel, + isInitiator, + pendingIceCandidates: [], + audioSender: undefined, + videoSender: undefined + }; + + if (isInitiator) { + const audioTransceiver = connection.addTransceiver(TRACK_KIND_AUDIO, { + direction: TRANSCEIVER_SEND_RECV + }); + const videoTransceiver = connection.addTransceiver(TRACK_KIND_VIDEO, { + direction: TRANSCEIVER_RECV_ONLY + }); + + peerData.audioSender = audioTransceiver.sender; + peerData.videoSender = videoTransceiver.sender; + } + + state.activePeerConnections.set(remotePeerId, peerData); + + const localStream = callbacks.getLocalMediaStream(); + + if (localStream && isInitiator) { + logger.logStream(`localStream->${remotePeerId}`, localStream); + + localStream.getTracks().forEach((track) => { + if (track.kind === TRACK_KIND_AUDIO && peerData.audioSender) { + peerData.audioSender + .replaceTrack(track) + .then(() => logger.info('audio replaceTrack (init) ok', { remotePeerId })) + .catch((error) => + logger.error('audio replaceTrack failed at createPeerConnection', error) + ); + } else if (track.kind === TRACK_KIND_VIDEO && peerData.videoSender) { + peerData.videoSender + .replaceTrack(track) + .then(() => logger.info('video replaceTrack (init) ok', { remotePeerId })) + .catch((error) => + logger.error('video replaceTrack failed at createPeerConnection', error) + ); + } else { + const sender = connection.addTrack(track, localStream); + + if (track.kind === TRACK_KIND_AUDIO) + peerData.audioSender = sender; + + if (track.kind === TRACK_KIND_VIDEO) + peerData.videoSender = sender; + } + }); + } + + return peerData; +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/connection/negotiation.ts b/src/app/core/services/webrtc/peer-connection-manager/connection/negotiation.ts new file mode 100644 index 0000000..b0de18f --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/connection/negotiation.ts @@ -0,0 +1,247 @@ +/* eslint-disable complexity */ +import { + SIGNALING_TYPE_ANSWER, + SIGNALING_TYPE_OFFER, + TRACK_KIND_AUDIO, + TRACK_KIND_VIDEO, + TRANSCEIVER_SEND_RECV +} from '../../webrtc.constants'; +import { + NegotiationHandlers, + PeerConnectionManagerContext, + PeerConnectionManagerState +} from '../shared'; + +/** + * Queue a negotiation task so SDP operations for a single peer never overlap. + */ +export function enqueueNegotiation( + state: PeerConnectionManagerState, + peerId: string, + task: () => Promise +): void { + const previousTask = state.peerNegotiationQueue.get(peerId) ?? Promise.resolve(); + const nextTask = previousTask.then(task, task); + + state.peerNegotiationQueue.set(peerId, nextTask); +} + +export async function doCreateAndSendOffer( + context: PeerConnectionManagerContext, + remotePeerId: string +): Promise { + const { callbacks, logger, state } = context; + const peerData = state.activePeerConnections.get(remotePeerId); + + if (!peerData) + return; + + try { + const offer = await peerData.connection.createOffer(); + + await peerData.connection.setLocalDescription(offer); + logger.info('Sending offer', { + remotePeerId, + type: offer.type, + sdpLength: offer.sdp?.length + }); + + callbacks.sendRawMessage({ + type: SIGNALING_TYPE_OFFER, + targetUserId: remotePeerId, + payload: { sdp: offer } + }); + } catch (error) { + logger.error('Failed to create offer', error); + } +} + +export async function doHandleOffer( + context: PeerConnectionManagerContext, + fromUserId: string, + sdp: RTCSessionDescriptionInit, + handlers: NegotiationHandlers +): Promise { + const { callbacks, logger, state } = context; + + logger.info('Handling offer', { fromUserId }); + + let peerData = state.activePeerConnections.get(fromUserId); + + if (!peerData) { + peerData = handlers.createPeerConnection(fromUserId, false); + } + + try { + const signalingState = peerData.connection.signalingState; + const hasCollision = + signalingState === 'have-local-offer' || signalingState === 'have-local-pranswer'; + + if (hasCollision) { + const localId = + callbacks.getIdentifyCredentials()?.oderId || callbacks.getLocalPeerId(); + const isPolite = localId > fromUserId; + + if (!isPolite) { + logger.info('Ignoring colliding offer (impolite side)', { fromUserId, localId }); + return; + } + + logger.info('Rolling back local offer (polite side)', { fromUserId, localId }); + + await peerData.connection.setLocalDescription({ + type: 'rollback' + } as RTCSessionDescriptionInit); + } + + await peerData.connection.setRemoteDescription(new RTCSessionDescription(sdp)); + + const transceivers = peerData.connection.getTransceivers(); + + for (const transceiver of transceivers) { + const receiverKind = transceiver.receiver.track?.kind; + + if (receiverKind === TRACK_KIND_AUDIO) { + if (!peerData.audioSender) { + peerData.audioSender = transceiver.sender; + } + + transceiver.direction = TRANSCEIVER_SEND_RECV; + } else if (receiverKind === TRACK_KIND_VIDEO && !peerData.videoSender) { + peerData.videoSender = transceiver.sender; + } + } + + const localStream = callbacks.getLocalMediaStream(); + + if (localStream) { + logger.logStream(`localStream->${fromUserId} (answerer)`, localStream); + + for (const track of localStream.getTracks()) { + if (track.kind === TRACK_KIND_AUDIO && peerData.audioSender) { + await peerData.audioSender.replaceTrack(track); + logger.info('audio replaceTrack (answerer) ok', { fromUserId }); + } else if (track.kind === TRACK_KIND_VIDEO && peerData.videoSender) { + await peerData.videoSender.replaceTrack(track); + logger.info('video replaceTrack (answerer) ok', { fromUserId }); + } + } + } + + for (const candidate of peerData.pendingIceCandidates) { + await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate)); + } + + peerData.pendingIceCandidates = []; + + const answer = await peerData.connection.createAnswer(); + + await peerData.connection.setLocalDescription(answer); + + logger.info('Sending answer', { + to: fromUserId, + type: answer.type, + sdpLength: answer.sdp?.length + }); + + callbacks.sendRawMessage({ + type: SIGNALING_TYPE_ANSWER, + targetUserId: fromUserId, + payload: { sdp: answer } + }); + } catch (error) { + logger.error('Failed to handle offer', error); + } +} + +export async function doHandleAnswer( + context: PeerConnectionManagerContext, + fromUserId: string, + sdp: RTCSessionDescriptionInit +): Promise { + const { logger, state } = context; + + logger.info('Handling answer', { fromUserId }); + + const peerData = state.activePeerConnections.get(fromUserId); + + if (!peerData) { + logger.error('No peer for answer', new Error('Missing peer'), { fromUserId }); + return; + } + + try { + if (peerData.connection.signalingState === 'have-local-offer') { + await peerData.connection.setRemoteDescription(new RTCSessionDescription(sdp)); + + for (const candidate of peerData.pendingIceCandidates) { + await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate)); + } + + peerData.pendingIceCandidates = []; + } else { + logger.warn('Ignoring answer - wrong signaling state', { + state: peerData.connection.signalingState + }); + } + } catch (error) { + logger.error('Failed to handle answer', error); + } +} + +export async function doHandleIceCandidate( + context: PeerConnectionManagerContext, + fromUserId: string, + candidate: RTCIceCandidateInit, + handlers: NegotiationHandlers +): Promise { + const { logger, state } = context; + + let peerData = state.activePeerConnections.get(fromUserId); + + if (!peerData) { + logger.info('Creating peer for early ICE', { fromUserId }); + peerData = handlers.createPeerConnection(fromUserId, false); + } + + try { + if (peerData.connection.remoteDescription) { + await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate)); + } else { + logger.info('Queuing ICE candidate', { fromUserId }); + peerData.pendingIceCandidates.push(candidate); + } + } catch (error) { + logger.error('Failed to add ICE candidate', error); + } +} + +export async function doRenegotiate( + context: PeerConnectionManagerContext, + peerId: string +): Promise { + const { callbacks, logger, state } = context; + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData) + return; + + try { + const offer = await peerData.connection.createOffer(); + + await peerData.connection.setLocalDescription(offer); + logger.info('Renegotiate offer', { + peerId, + type: offer.type, + sdpLength: offer.sdp?.length + }); + + callbacks.sendRawMessage({ + type: SIGNALING_TYPE_OFFER, + targetUserId: peerId, + payload: { sdp: offer } + }); + } catch (error) { + logger.error('Failed to renegotiate', error); + } +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/index.ts b/src/app/core/services/webrtc/peer-connection-manager/index.ts new file mode 100644 index 0000000..2ebe1c4 --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/index.ts @@ -0,0 +1,2 @@ +export * from './peer-connection.manager'; +export * from './shared'; diff --git a/src/app/core/services/webrtc/peer-connection-manager/messaging/data-channel.ts b/src/app/core/services/webrtc/peer-connection-manager/messaging/data-channel.ts new file mode 100644 index 0000000..7c1a4d5 --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/messaging/data-channel.ts @@ -0,0 +1,296 @@ +import { ChatEvent } from '../../../../models'; +import { + DATA_CHANNEL_HIGH_WATER_BYTES, + DATA_CHANNEL_LOW_WATER_BYTES, + DATA_CHANNEL_STATE_OPEN, + DEFAULT_DISPLAY_NAME, + P2P_TYPE_PING, + P2P_TYPE_PONG, + P2P_TYPE_SCREEN_STATE, + P2P_TYPE_STATE_REQUEST, + P2P_TYPE_VOICE_STATE, + P2P_TYPE_VOICE_STATE_REQUEST +} from '../../webrtc.constants'; +import { PeerConnectionManagerContext } from '../shared'; +import { startPingInterval } from './ping'; + +type PeerMessage = Record & { + type?: string; + ts?: number; +}; + +/** + * Wire open/close/error/message handlers onto a data channel. + */ +export function setupDataChannel( + context: PeerConnectionManagerContext, + channel: RTCDataChannel, + remotePeerId: string +): void { + const { logger } = context; + + channel.onopen = () => { + logger.info('Data channel open', { remotePeerId }); + sendCurrentStatesToChannel(context, channel, remotePeerId); + + try { + channel.send(JSON.stringify({ type: P2P_TYPE_STATE_REQUEST })); + } catch { + /* ignore */ + } + + startPingInterval(context.state, remotePeerId); + }; + + channel.onclose = () => { + logger.info('Data channel closed', { remotePeerId }); + }; + + channel.onerror = (error) => { + logger.error('Data channel error', error, { remotePeerId }); + }; + + channel.onmessage = (event) => { + try { + const message = JSON.parse(event.data) as PeerMessage; + + handlePeerMessage(context, remotePeerId, message); + } catch (error) { + logger.error('Failed to parse peer message', error); + } + }; +} + +/** + * Route an incoming peer-to-peer message. + */ +export function handlePeerMessage( + context: PeerConnectionManagerContext, + peerId: string, + message: PeerMessage +): void { + const { logger, state } = context; + + logger.info('Received P2P message', { + peerId, + type: message.type + }); + + if (message.type === P2P_TYPE_STATE_REQUEST || message.type === P2P_TYPE_VOICE_STATE_REQUEST) { + sendCurrentStatesToPeer(context, peerId); + return; + } + + if (message.type === P2P_TYPE_PING) { + sendToPeer(context, peerId, { + type: P2P_TYPE_PONG, + ts: message.ts + }); + + return; + } + + if (message.type === P2P_TYPE_PONG) { + const sentAt = state.pendingPings.get(peerId); + + if (sentAt && typeof message.ts === 'number' && message.ts === sentAt) { + const latencyMs = Math.round(performance.now() - sentAt); + + state.peerLatencies.set(peerId, latencyMs); + state.peerLatencyChanged$.next({ peerId, latencyMs }); + } + + state.pendingPings.delete(peerId); + return; + } + + const enrichedMessage = { + ...message, + fromPeerId: peerId + } as ChatEvent; + + state.messageReceived$.next(enrichedMessage); +} + +/** Broadcast a ChatEvent to every peer with an open data channel. */ +export function broadcastMessage( + context: PeerConnectionManagerContext, + event: object +): void { + const { logger, state } = context; + const data = JSON.stringify(event); + + state.activePeerConnections.forEach((peerData, peerId) => { + try { + if (peerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) { + peerData.dataChannel.send(data); + logger.info('Sent message via P2P', { peerId }); + } + } catch (error) { + logger.error('Failed to send to peer', error, { peerId }); + } + }); +} + +/** + * Send a ChatEvent to a specific peer's data channel. + */ +export function sendToPeer( + context: PeerConnectionManagerContext, + peerId: string, + event: object +): void { + const { logger, state } = context; + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) { + logger.warn('Peer not connected - cannot send', { peerId }); + return; + } + + try { + peerData.dataChannel.send(JSON.stringify(event)); + } catch (error) { + logger.error('Failed to send to peer', error, { peerId }); + } +} + +/** + * Send a ChatEvent with back-pressure awareness. + */ +export async function sendToPeerBuffered( + context: PeerConnectionManagerContext, + peerId: string, + event: object +): Promise { + const { logger, state } = context; + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) { + logger.warn('Peer not connected - cannot send buffered', { peerId }); + return; + } + + const channel = peerData.dataChannel; + const data = JSON.stringify(event); + + if (typeof channel.bufferedAmountLowThreshold === 'number') { + channel.bufferedAmountLowThreshold = DATA_CHANNEL_LOW_WATER_BYTES; + } + + if (channel.bufferedAmount > DATA_CHANNEL_HIGH_WATER_BYTES) { + await new Promise((resolve) => { + const handleBufferedAmountLow = () => { + if (channel.bufferedAmount <= DATA_CHANNEL_LOW_WATER_BYTES) { + channel.removeEventListener('bufferedamountlow', handleBufferedAmountLow); + resolve(); + } + }; + + channel.addEventListener('bufferedamountlow', handleBufferedAmountLow, { once: true }); + }); + } + + try { + channel.send(data); + } catch (error) { + logger.error('Failed to send buffered message', error, { peerId }); + } +} + +/** + * Send the current voice and screen-share states to a single peer. + */ +export function sendCurrentStatesToPeer( + context: PeerConnectionManagerContext, + peerId: string +): void { + const { callbacks } = context; + const credentials = callbacks.getIdentifyCredentials(); + const oderId = credentials?.oderId || callbacks.getLocalPeerId(); + const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME; + const voiceState = callbacks.getVoiceStateSnapshot(); + + sendToPeer(context, peerId, { + type: P2P_TYPE_VOICE_STATE, + oderId, + displayName, + voiceState + }); + + sendToPeer(context, peerId, { + type: P2P_TYPE_SCREEN_STATE, + oderId, + displayName, + isScreenSharing: callbacks.isScreenSharingActive() + }); +} + +export function sendCurrentStatesToChannel( + context: PeerConnectionManagerContext, + channel: RTCDataChannel, + remotePeerId: string +): void { + const { callbacks, logger } = context; + + if (channel.readyState !== DATA_CHANNEL_STATE_OPEN) { + logger.warn('Cannot send states - channel not open', { + remotePeerId, + state: channel.readyState + }); + + return; + } + + const credentials = callbacks.getIdentifyCredentials(); + const oderId = credentials?.oderId || callbacks.getLocalPeerId(); + const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME; + const voiceState = callbacks.getVoiceStateSnapshot(); + + try { + channel.send( + JSON.stringify({ + type: P2P_TYPE_VOICE_STATE, + oderId, + displayName, + voiceState + }) + ); + + channel.send( + JSON.stringify({ + type: P2P_TYPE_SCREEN_STATE, + oderId, + displayName, + isScreenSharing: callbacks.isScreenSharingActive() + }) + ); + + logger.info('Sent initial states to channel', { remotePeerId, voiceState }); + } catch (error) { + logger.error('Failed to send initial states to channel', error); + } +} + +/** Broadcast the current voice and screen-share states to all connected peers. */ +export function broadcastCurrentStates(context: PeerConnectionManagerContext): void { + const { callbacks } = context; + const credentials = callbacks.getIdentifyCredentials(); + const oderId = credentials?.oderId || callbacks.getLocalPeerId(); + const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME; + const voiceState = callbacks.getVoiceStateSnapshot(); + + broadcastMessage(context, { + type: P2P_TYPE_VOICE_STATE, + oderId, + displayName, + voiceState + }); + + broadcastMessage(context, { + type: P2P_TYPE_SCREEN_STATE, + oderId, + displayName, + isScreenSharing: callbacks.isScreenSharingActive() + }); +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/messaging/ping.ts b/src/app/core/services/webrtc/peer-connection-manager/messaging/ping.ts new file mode 100644 index 0000000..c4153f1 --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/messaging/ping.ts @@ -0,0 +1,55 @@ +import { + DATA_CHANNEL_STATE_OPEN, + P2P_TYPE_PING, + PEER_PING_INTERVAL_MS +} from '../../webrtc.constants'; +import { PeerConnectionManagerState } from '../shared'; + +/** Start periodic pings to a peer to measure round-trip latency. */ +export function startPingInterval(state: PeerConnectionManagerState, peerId: string): void { + stopPingInterval(state, peerId); + sendPing(state, peerId); + + const timer = setInterval(() => sendPing(state, peerId), PEER_PING_INTERVAL_MS); + + state.peerPingTimers.set(peerId, timer); +} + +/** Stop the periodic ping for a specific peer. */ +export function stopPingInterval(state: PeerConnectionManagerState, peerId: string): void { + const timer = state.peerPingTimers.get(peerId); + + if (timer) { + clearInterval(timer); + state.peerPingTimers.delete(peerId); + } +} + +/** Cancel all active ping timers. */ +export function clearAllPingTimers(state: PeerConnectionManagerState): void { + state.peerPingTimers.forEach((timer) => clearInterval(timer)); + state.peerPingTimers.clear(); +} + +/** Send a single ping to a peer. */ +export function sendPing(state: PeerConnectionManagerState, peerId: string): void { + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) + return; + + const timestamp = performance.now(); + + state.pendingPings.set(peerId, timestamp); + + try { + peerData.dataChannel.send( + JSON.stringify({ + type: P2P_TYPE_PING, + ts: timestamp + }) + ); + } catch { + /* ignore */ + } +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/peer-connection.manager.ts b/src/app/core/services/webrtc/peer-connection-manager/peer-connection.manager.ts new file mode 100644 index 0000000..67a2335 --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/peer-connection.manager.ts @@ -0,0 +1,296 @@ +/* eslint-disable @typescript-eslint/member-ordering */ +import { ChatEvent } from '../../../models'; +import { WebRTCLogger } from '../webrtc-logger'; +import { PeerData } from '../webrtc.types'; +import { createPeerConnection as createManagedPeerConnection } from './connection/create-peer-connection'; +import { + doCreateAndSendOffer, + doHandleAnswer, + doHandleIceCandidate, + doHandleOffer, + doRenegotiate, + enqueueNegotiation +} from './connection/negotiation'; +import { + broadcastCurrentStates, + broadcastMessage, + sendCurrentStatesToPeer, + sendToPeer, + sendToPeerBuffered, + setupDataChannel +} from './messaging/data-channel'; +import { + addToConnectedPeers, + clearAllPeerReconnectTimers, + clearPeerDisconnectGraceTimer, + clearPeerReconnectTimer, + closeAllPeers as closeManagedPeers, + getConnectedPeerIds, + removePeer as removeManagedPeer, + requestVoiceStateFromPeer, + resetConnectedPeers, + schedulePeerDisconnectRecovery, + schedulePeerReconnect, + trackDisconnectedPeer +} from './recovery/peer-recovery'; +import { handleRemoteTrack } from './streams/remote-streams'; +import { + ConnectionLifecycleHandlers, + createPeerConnectionManagerState, + NegotiationHandlers, + PeerConnectionCallbacks, + PeerConnectionManagerContext, + RecoveryHandlers, + RemovePeerOptions +} from './shared'; + +/** + * Creates and manages RTCPeerConnections, data channels, + * offer/answer negotiation, ICE candidates, and P2P reconnection. + */ +export class PeerConnectionManager { + private readonly state = createPeerConnectionManagerState(); + + /** Active peer connections keyed by remote peer ID. */ + readonly activePeerConnections = this.state.activePeerConnections; + + /** Remote composite streams keyed by remote peer ID. */ + readonly remotePeerStreams = this.state.remotePeerStreams; + + /** Last measured latency (ms) per peer. */ + readonly peerLatencies = this.state.peerLatencies; + + /** Emitted whenever a peer latency value changes. */ + readonly peerLatencyChanged$ = this.state.peerLatencyChanged$; + + readonly peerConnected$ = this.state.peerConnected$; + readonly peerDisconnected$ = this.state.peerDisconnected$; + readonly remoteStream$ = this.state.remoteStream$; + readonly messageReceived$ = this.state.messageReceived$; + + /** Emitted whenever the connected peer list changes. */ + readonly connectedPeersChanged$ = this.state.connectedPeersChanged$; + + constructor( + private readonly logger: WebRTCLogger, + private callbacks: PeerConnectionCallbacks + ) {} + + /** + * Replace the callback set at runtime. + * Needed because of circular initialisation between managers. + */ + setCallbacks(callbacks: PeerConnectionCallbacks): void { + this.callbacks = callbacks; + } + + /** + * Create a new RTCPeerConnection to a remote peer. + */ + createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData { + return createManagedPeerConnection( + this.context, + remotePeerId, + isInitiator, + this.connectionHandlers + ); + } + + /** + * Create an SDP offer and send it to the remote peer via the signaling server. + */ + async createAndSendOffer(remotePeerId: string): Promise { + return new Promise((resolve) => { + enqueueNegotiation(this.state, remotePeerId, async () => { + await doCreateAndSendOffer(this.context, remotePeerId); + resolve(); + }); + }); + } + + /** + * Handle an incoming SDP offer from a remote peer. + */ + handleOffer(fromUserId: string, sdp: RTCSessionDescriptionInit): void { + enqueueNegotiation(this.state, fromUserId, () => + doHandleOffer(this.context, fromUserId, sdp, this.negotiationHandlers) + ); + } + + /** + * Handle an incoming SDP answer from a remote peer. + */ + handleAnswer(fromUserId: string, sdp: RTCSessionDescriptionInit): void { + enqueueNegotiation(this.state, fromUserId, () => + doHandleAnswer(this.context, fromUserId, sdp) + ); + } + + /** + * Process an incoming ICE candidate from a remote peer. + */ + handleIceCandidate(fromUserId: string, candidate: RTCIceCandidateInit): void { + enqueueNegotiation(this.state, fromUserId, () => + doHandleIceCandidate(this.context, fromUserId, candidate, this.negotiationHandlers) + ); + } + + /** + * Re-negotiate (create offer) to push track changes to remote. + */ + async renegotiate(peerId: string): Promise { + return new Promise((resolve) => { + enqueueNegotiation(this.state, peerId, async () => { + await doRenegotiate(this.context, peerId); + resolve(); + }); + }); + } + + /** Broadcast a ChatEvent to every peer with an open data channel. */ + broadcastMessage(event: ChatEvent): void { + broadcastMessage(this.context, event); + } + + /** + * Send a ChatEvent to a specific peer's data channel. + */ + sendToPeer(peerId: string, event: ChatEvent): void { + sendToPeer(this.context, peerId, event); + } + + /** + * Send a ChatEvent with back-pressure awareness. + */ + async sendToPeerBuffered(peerId: string, event: ChatEvent): Promise { + await sendToPeerBuffered(this.context, peerId, event); + } + + /** + * Send the current voice and screen-share states to a single peer. + */ + sendCurrentStatesToPeer(peerId: string): void { + sendCurrentStatesToPeer(this.context, peerId); + } + + /** Broadcast the current voice and screen-share states to all connected peers. */ + broadcastCurrentStates(): void { + broadcastCurrentStates(this.context); + } + + /** + * Close and remove a peer connection, data channel, and emit a disconnect event. + */ + removePeer(peerId: string, options?: RemovePeerOptions): void { + removeManagedPeer(this.context, peerId, options); + } + + /** Close every active peer connection and clear internal state. */ + closeAllPeers(): void { + closeManagedPeers(this.state); + } + + /** Cancel all pending peer reconnect timers and clear the tracker. */ + clearAllPeerReconnectTimers(): void { + clearAllPeerReconnectTimers(this.state); + } + + /** Return a snapshot copy of the currently-connected peer IDs. */ + getConnectedPeerIds(): string[] { + return getConnectedPeerIds(this.state); + } + + /** Reset the connected peers list to empty and notify subscribers. */ + resetConnectedPeers(): void { + resetConnectedPeers(this.state); + } + + /** Clean up all resources. */ + destroy(): void { + this.closeAllPeers(); + this.peerConnected$.complete(); + this.peerDisconnected$.complete(); + this.remoteStream$.complete(); + this.messageReceived$.complete(); + this.connectedPeersChanged$.complete(); + this.peerLatencyChanged$.complete(); + } + + private get context(): PeerConnectionManagerContext { + return { + logger: this.logger, + callbacks: this.callbacks, + state: this.state + }; + } + + private get connectionHandlers(): ConnectionLifecycleHandlers { + return { + clearPeerDisconnectGraceTimer: (peerId: string) => this.clearPeerDisconnectGraceTimer(peerId), + addToConnectedPeers: (peerId: string) => this.addToConnectedPeers(peerId), + clearPeerReconnectTimer: (peerId: string) => this.clearPeerReconnectTimer(peerId), + requestVoiceStateFromPeer: (peerId: string) => this.requestVoiceStateFromPeer(peerId), + schedulePeerDisconnectRecovery: (peerId: string) => + this.schedulePeerDisconnectRecovery(peerId), + trackDisconnectedPeer: (peerId: string) => this.trackDisconnectedPeer(peerId), + removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options), + schedulePeerReconnect: (peerId: string) => this.schedulePeerReconnect(peerId), + handleRemoteTrack: (event: RTCTrackEvent, peerId: string) => + this.handleRemoteTrack(event, peerId), + setupDataChannel: (channel: RTCDataChannel, peerId: string) => + this.setupDataChannel(channel, peerId) + }; + } + + private get negotiationHandlers(): NegotiationHandlers { + return { + createPeerConnection: (remotePeerId: string, isInitiator: boolean) => + this.createPeerConnection(remotePeerId, isInitiator) + }; + } + + private get recoveryHandlers(): RecoveryHandlers { + return { + removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options), + createPeerConnection: (peerId: string, isInitiator: boolean) => + this.createPeerConnection(peerId, isInitiator), + createAndSendOffer: (peerId: string) => this.createAndSendOffer(peerId) + }; + } + + private setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void { + setupDataChannel(this.context, channel, remotePeerId); + } + + private handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void { + handleRemoteTrack(this.context, event, remotePeerId); + } + + private trackDisconnectedPeer(peerId: string): void { + trackDisconnectedPeer(this.state, peerId); + } + + private clearPeerReconnectTimer(peerId: string): void { + clearPeerReconnectTimer(this.state, peerId); + } + + private clearPeerDisconnectGraceTimer(peerId: string): void { + clearPeerDisconnectGraceTimer(this.state, peerId); + } + + private schedulePeerDisconnectRecovery(peerId: string): void { + schedulePeerDisconnectRecovery(this.context, peerId, this.recoveryHandlers); + } + + private schedulePeerReconnect(peerId: string): void { + schedulePeerReconnect(this.context, peerId, this.recoveryHandlers); + } + + private requestVoiceStateFromPeer(peerId: string): void { + requestVoiceStateFromPeer(this.state, this.logger, peerId); + } + + private addToConnectedPeers(peerId: string): void { + addToConnectedPeers(this.state, peerId); + } +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/recovery/peer-recovery.ts b/src/app/core/services/webrtc/peer-connection-manager/recovery/peer-recovery.ts new file mode 100644 index 0000000..b95eba8 --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/recovery/peer-recovery.ts @@ -0,0 +1,264 @@ +import { + CONNECTION_STATE_CONNECTED, + DATA_CHANNEL_STATE_OPEN, + P2P_TYPE_VOICE_STATE_REQUEST, + PEER_DISCONNECT_GRACE_MS, + PEER_RECONNECT_INTERVAL_MS, + PEER_RECONNECT_MAX_ATTEMPTS +} from '../../webrtc.constants'; +import { + PeerConnectionManagerContext, + PeerConnectionManagerState, + RecoveryHandlers, + RemovePeerOptions +} from '../shared'; +import { clearAllPingTimers, stopPingInterval } from '../messaging/ping'; + +/** + * Close and remove a peer connection, data channel, and emit a disconnect event. + */ +export function removePeer( + context: PeerConnectionManagerContext, + peerId: string, + options?: RemovePeerOptions +): void { + const { state } = context; + const peerData = state.activePeerConnections.get(peerId); + const preserveReconnectState = options?.preserveReconnectState === true; + + clearPeerDisconnectGraceTimer(state, peerId); + + if (!preserveReconnectState) { + clearPeerReconnectTimer(state, peerId); + state.disconnectedPeerTracker.delete(peerId); + } + + state.remotePeerStreams.delete(peerId); + + if (peerData) { + if (peerData.dataChannel) + peerData.dataChannel.close(); + + peerData.connection.close(); + state.activePeerConnections.delete(peerId); + state.peerNegotiationQueue.delete(peerId); + removeFromConnectedPeers(state, peerId); + stopPingInterval(state, peerId); + state.peerLatencies.delete(peerId); + state.pendingPings.delete(peerId); + state.peerDisconnected$.next(peerId); + } +} + +/** Close every active peer connection and clear internal state. */ +export function closeAllPeers(state: PeerConnectionManagerState): void { + clearAllPeerReconnectTimers(state); + clearAllPeerDisconnectGraceTimers(state); + clearAllPingTimers(state); + + state.activePeerConnections.forEach((peerData) => { + if (peerData.dataChannel) + peerData.dataChannel.close(); + + peerData.connection.close(); + }); + + state.activePeerConnections.clear(); + state.remotePeerStreams.clear(); + state.peerNegotiationQueue.clear(); + state.peerLatencies.clear(); + state.pendingPings.clear(); + state.connectedPeersChanged$.next([]); +} + +export function trackDisconnectedPeer(state: PeerConnectionManagerState, peerId: string): void { + state.disconnectedPeerTracker.set(peerId, { + lastSeenTimestamp: Date.now(), + reconnectAttempts: 0 + }); +} + +export function clearPeerReconnectTimer( + state: PeerConnectionManagerState, + peerId: string +): void { + const timer = state.peerReconnectTimers.get(peerId); + + if (timer) { + clearInterval(timer); + state.peerReconnectTimers.delete(peerId); + } +} + +export function clearPeerDisconnectGraceTimer( + state: PeerConnectionManagerState, + peerId: string +): void { + const timer = state.peerDisconnectGraceTimers.get(peerId); + + if (timer) { + clearTimeout(timer); + state.peerDisconnectGraceTimers.delete(peerId); + } +} + +/** Cancel all pending peer reconnect timers and clear the tracker. */ +export function clearAllPeerReconnectTimers(state: PeerConnectionManagerState): void { + state.peerReconnectTimers.forEach((timer) => clearInterval(timer)); + state.peerReconnectTimers.clear(); + state.disconnectedPeerTracker.clear(); +} + +export function clearAllPeerDisconnectGraceTimers(state: PeerConnectionManagerState): void { + state.peerDisconnectGraceTimers.forEach((timer) => clearTimeout(timer)); + state.peerDisconnectGraceTimers.clear(); +} + +export function schedulePeerDisconnectRecovery( + context: PeerConnectionManagerContext, + peerId: string, + handlers: RecoveryHandlers +): void { + const { logger, state } = context; + + if (state.peerDisconnectGraceTimers.has(peerId)) + return; + + logger.warn('Peer temporarily disconnected; waiting before reconnect', { peerId }); + + const timer = setTimeout(() => { + state.peerDisconnectGraceTimers.delete(peerId); + + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData) + return; + + const connectionState = peerData.connection.connectionState; + + if (connectionState === CONNECTION_STATE_CONNECTED || connectionState === 'connecting') { + logger.info('Peer recovered before disconnect grace expired', { + peerId, + state: connectionState + }); + + return; + } + + logger.warn('Peer still disconnected after grace period; recreating connection', { + peerId, + state: connectionState + }); + + trackDisconnectedPeer(state, peerId); + handlers.removePeer(peerId, { preserveReconnectState: true }); + schedulePeerReconnect(context, peerId, handlers); + }, PEER_DISCONNECT_GRACE_MS); + + state.peerDisconnectGraceTimers.set(peerId, timer); +} + +export function schedulePeerReconnect( + context: PeerConnectionManagerContext, + peerId: string, + handlers: RecoveryHandlers +): void { + const { callbacks, logger, state } = context; + + if (state.peerReconnectTimers.has(peerId)) + return; + + logger.info('Scheduling P2P reconnect', { peerId }); + + const timer = setInterval(() => { + const info = state.disconnectedPeerTracker.get(peerId); + + if (!info) { + clearPeerReconnectTimer(state, peerId); + return; + } + + info.reconnectAttempts++; + logger.info('P2P reconnect attempt', { + peerId, + attempt: info.reconnectAttempts + }); + + if (info.reconnectAttempts >= PEER_RECONNECT_MAX_ATTEMPTS) { + logger.info('P2P reconnect max attempts reached', { peerId }); + clearPeerReconnectTimer(state, peerId); + state.disconnectedPeerTracker.delete(peerId); + return; + } + + if (!callbacks.isSignalingConnected()) { + logger.info('Skipping P2P reconnect - no signaling connection', { peerId }); + return; + } + + attemptPeerReconnect(state, peerId, handlers); + }, PEER_RECONNECT_INTERVAL_MS); + + state.peerReconnectTimers.set(peerId, timer); +} + +export function attemptPeerReconnect( + state: PeerConnectionManagerState, + peerId: string, + handlers: RecoveryHandlers +): void { + if (state.activePeerConnections.has(peerId)) { + handlers.removePeer(peerId, { preserveReconnectState: true }); + } + + handlers.createPeerConnection(peerId, true); + void handlers.createAndSendOffer(peerId); +} + +export function requestVoiceStateFromPeer( + state: PeerConnectionManagerState, + logger: PeerConnectionManagerContext['logger'], + peerId: string +): void { + const peerData = state.activePeerConnections.get(peerId); + + if (peerData?.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) { + try { + peerData.dataChannel.send(JSON.stringify({ type: P2P_TYPE_VOICE_STATE_REQUEST })); + } catch (error) { + logger.warn('Failed to request voice state', error); + } + } +} + +/** Return a snapshot copy of the currently-connected peer IDs. */ +export function getConnectedPeerIds(state: PeerConnectionManagerState): string[] { + return [...state.connectedPeersList]; +} + +export function addToConnectedPeers(state: PeerConnectionManagerState, peerId: string): void { + if (!state.connectedPeersList.includes(peerId)) { + state.connectedPeersList = [...state.connectedPeersList, peerId]; + state.connectedPeersChanged$.next(state.connectedPeersList); + } +} + +/** + * Remove a peer from the connected list and notify subscribers. + */ +export function removeFromConnectedPeers( + state: PeerConnectionManagerState, + peerId: string +): void { + state.connectedPeersList = state.connectedPeersList.filter( + (connectedId) => connectedId !== peerId + ); + + state.connectedPeersChanged$.next(state.connectedPeersList); +} + +/** Reset the connected peers list to empty and notify subscribers. */ +export function resetConnectedPeers(state: PeerConnectionManagerState): void { + state.connectedPeersList = []; + state.connectedPeersChanged$.next([]); +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/shared.ts b/src/app/core/services/webrtc/peer-connection-manager/shared.ts new file mode 100644 index 0000000..6aeaa1c --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/shared.ts @@ -0,0 +1,103 @@ +import { Subject } from 'rxjs'; +import { ChatEvent } from '../../../models'; +import { WebRTCLogger } from '../webrtc-logger'; +import { + DisconnectedPeerEntry, + IdentifyCredentials, + PeerData, + VoiceStateSnapshot +} from '../webrtc.types'; + +/** + * Callbacks the PeerConnectionManager needs from the owning service. + * This keeps the manager decoupled from Angular DI / signals. + */ +export interface PeerConnectionCallbacks { + /** Send a raw JSON message via the signaling server. */ + sendRawMessage(msg: Record): void; + /** Get the current local media stream (mic audio). */ + getLocalMediaStream(): MediaStream | null; + /** Whether signaling is currently connected. */ + isSignalingConnected(): boolean; + /** Returns the current voice/screen state snapshot for broadcasting. */ + getVoiceStateSnapshot(): VoiceStateSnapshot; + /** Returns the identify credentials (oderId + displayName). */ + getIdentifyCredentials(): IdentifyCredentials | null; + /** Returns the local peer ID. */ + getLocalPeerId(): string; + /** Whether screen sharing is active. */ + isScreenSharingActive(): boolean; +} + +export interface PeerConnectionManagerState { + activePeerConnections: Map; + remotePeerStreams: Map; + disconnectedPeerTracker: Map; + peerReconnectTimers: Map>; + peerDisconnectGraceTimers: Map>; + pendingPings: Map; + peerPingTimers: Map>; + peerLatencies: Map; + peerLatencyChanged$: Subject<{ peerId: string; latencyMs: number }>; + peerNegotiationQueue: Map>; + peerConnected$: Subject; + peerDisconnected$: Subject; + remoteStream$: Subject<{ peerId: string; stream: MediaStream }>; + messageReceived$: Subject; + connectedPeersChanged$: Subject; + connectedPeersList: string[]; +} + +export interface PeerConnectionManagerContext { + readonly logger: WebRTCLogger; + readonly callbacks: PeerConnectionCallbacks; + readonly state: PeerConnectionManagerState; +} + +export interface RemovePeerOptions { + preserveReconnectState?: boolean; +} + +export interface ConnectionLifecycleHandlers { + clearPeerDisconnectGraceTimer(peerId: string): void; + addToConnectedPeers(peerId: string): void; + clearPeerReconnectTimer(peerId: string): void; + requestVoiceStateFromPeer(peerId: string): void; + schedulePeerDisconnectRecovery(peerId: string): void; + trackDisconnectedPeer(peerId: string): void; + removePeer(peerId: string, options?: RemovePeerOptions): void; + schedulePeerReconnect(peerId: string): void; + handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void; + setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void; +} + +export interface NegotiationHandlers { + createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData; +} + +export interface RecoveryHandlers { + removePeer(peerId: string, options?: RemovePeerOptions): void; + createPeerConnection(peerId: string, isInitiator: boolean): PeerData; + createAndSendOffer(peerId: string): Promise; +} + +export function createPeerConnectionManagerState(): PeerConnectionManagerState { + return { + activePeerConnections: new Map(), + remotePeerStreams: new Map(), + disconnectedPeerTracker: new Map(), + peerReconnectTimers: new Map>(), + peerDisconnectGraceTimers: new Map>(), + pendingPings: new Map(), + peerPingTimers: new Map>(), + peerLatencies: new Map(), + peerLatencyChanged$: new Subject<{ peerId: string; latencyMs: number }>(), + peerNegotiationQueue: new Map>(), + peerConnected$: new Subject(), + peerDisconnected$: new Subject(), + remoteStream$: new Subject<{ peerId: string; stream: MediaStream }>(), + messageReceived$: new Subject(), + connectedPeersChanged$: new Subject(), + connectedPeersList: [] + }; +} diff --git a/src/app/core/services/webrtc/peer-connection-manager/streams/remote-streams.ts b/src/app/core/services/webrtc/peer-connection-manager/streams/remote-streams.ts new file mode 100644 index 0000000..20a6eeb --- /dev/null +++ b/src/app/core/services/webrtc/peer-connection-manager/streams/remote-streams.ts @@ -0,0 +1,94 @@ +import { TRACK_KIND_VIDEO } from '../../webrtc.constants'; +import { PeerConnectionManagerContext } from '../shared'; + +export function handleRemoteTrack( + context: PeerConnectionManagerContext, + event: RTCTrackEvent, + remotePeerId: string +): void { + const { logger, state } = context; + const track = event.track; + const settings = + typeof track.getSettings === 'function' ? track.getSettings() : ({} as MediaTrackSettings); + + logger.info('Remote track', { + remotePeerId, + kind: track.kind, + id: track.id, + enabled: track.enabled, + readyState: track.readyState, + settings + }); + + logger.attachTrackDiagnostics(track, `remote:${remotePeerId}:${track.kind}`); + + if (track.kind === TRACK_KIND_VIDEO && (!track.enabled || track.readyState !== 'live')) { + logger.info('Skipping inactive video track', { + remotePeerId, + enabled: track.enabled, + readyState: track.readyState + }); + + return; + } + + const compositeStream = buildCompositeRemoteStream(state, remotePeerId, track); + + track.addEventListener('ended', () => removeRemoteTrack(state, remotePeerId, track.id)); + + state.remotePeerStreams.set(remotePeerId, compositeStream); + state.remoteStream$.next({ + peerId: remotePeerId, + stream: compositeStream + }); +} + +function buildCompositeRemoteStream( + state: PeerConnectionManagerContext['state'], + remotePeerId: string, + incomingTrack: MediaStreamTrack +): MediaStream { + const existingStream = state.remotePeerStreams.get(remotePeerId); + + let preservedTracks: MediaStreamTrack[] = []; + + if (existingStream) { + preservedTracks = existingStream.getTracks().filter( + (existingTrack) => + existingTrack.kind !== incomingTrack.kind && existingTrack.readyState === 'live' + ); + } + + return new MediaStream([...preservedTracks, incomingTrack]); +} + +function removeRemoteTrack( + state: PeerConnectionManagerContext['state'], + remotePeerId: string, + trackId: string +): void { + const currentStream = state.remotePeerStreams.get(remotePeerId); + + if (!currentStream) + return; + + const remainingTracks = currentStream + .getTracks() + .filter((existingTrack) => existingTrack.id !== trackId && existingTrack.readyState === 'live'); + + if (remainingTracks.length === currentStream.getTracks().length) + return; + + if (remainingTracks.length === 0) { + state.remotePeerStreams.delete(remotePeerId); + return; + } + + const nextStream = new MediaStream(remainingTracks); + + state.remotePeerStreams.set(remotePeerId, nextStream); + state.remoteStream$.next({ + peerId: remotePeerId, + stream: nextStream + }); +} diff --git a/src/app/core/services/webrtc/peer-connection.manager.ts b/src/app/core/services/webrtc/peer-connection.manager.ts index df9bcbe..2b2be85 100644 --- a/src/app/core/services/webrtc/peer-connection.manager.ts +++ b/src/app/core/services/webrtc/peer-connection.manager.ts @@ -1,1195 +1 @@ -/* eslint-disable @typescript-eslint/member-ordering, @typescript-eslint/no-explicit-any, id-length */ -/** - * Creates and manages RTCPeerConnections, data channels, - * offer/answer negotiation, ICE candidates, and P2P reconnection. - */ -import { Subject } from 'rxjs'; -import { ChatEvent } from '../../models'; -import { WebRTCLogger } from './webrtc-logger'; -import { - PeerData, - DisconnectedPeerEntry, - VoiceStateSnapshot, - IdentifyCredentials -} from './webrtc.types'; -import { - ICE_SERVERS, - DATA_CHANNEL_LABEL, - DATA_CHANNEL_HIGH_WATER_BYTES, - DATA_CHANNEL_LOW_WATER_BYTES, - DATA_CHANNEL_STATE_OPEN, - CONNECTION_STATE_CONNECTED, - CONNECTION_STATE_DISCONNECTED, - CONNECTION_STATE_FAILED, - CONNECTION_STATE_CLOSED, - TRACK_KIND_AUDIO, - TRACK_KIND_VIDEO, - TRANSCEIVER_SEND_RECV, - TRANSCEIVER_RECV_ONLY, - PEER_RECONNECT_MAX_ATTEMPTS, - PEER_RECONNECT_INTERVAL_MS, - PEER_DISCONNECT_GRACE_MS, - P2P_TYPE_STATE_REQUEST, - P2P_TYPE_VOICE_STATE_REQUEST, - P2P_TYPE_VOICE_STATE, - P2P_TYPE_SCREEN_STATE, - P2P_TYPE_PING, - P2P_TYPE_PONG, - PEER_PING_INTERVAL_MS, - SIGNALING_TYPE_OFFER, - SIGNALING_TYPE_ANSWER, - SIGNALING_TYPE_ICE_CANDIDATE, - DEFAULT_DISPLAY_NAME -} from './webrtc.constants'; - -/** - * Callbacks the PeerConnectionManager needs from the owning service. - * This keeps the manager decoupled from Angular DI / signals. - */ -export interface PeerConnectionCallbacks { - /** Send a raw JSON message via the signaling server. */ - sendRawMessage(msg: Record): void; - /** Get the current local media stream (mic audio). */ - getLocalMediaStream(): MediaStream | null; - /** Whether signaling is currently connected. */ - isSignalingConnected(): boolean; - /** Returns the current voice/screen state snapshot for broadcasting. */ - getVoiceStateSnapshot(): VoiceStateSnapshot; - /** Returns the identify credentials (oderId + displayName). */ - getIdentifyCredentials(): IdentifyCredentials | null; - /** Returns the local peer ID. */ - getLocalPeerId(): string; - /** Whether screen sharing is active. */ - isScreenSharingActive(): boolean; -} - -export class PeerConnectionManager { - /** Active peer connections keyed by remote peer ID. */ - readonly activePeerConnections = new Map(); - - /** Remote composite streams keyed by remote peer ID. */ - readonly remotePeerStreams = new Map(); - - /** Tracks disconnected peers for P2P reconnection scheduling. */ - private disconnectedPeerTracker = new Map(); - private peerReconnectTimers = new Map>(); - private readonly peerDisconnectGraceTimers = new Map>(); - - /** Pending ping timestamps keyed by peer ID. */ - private readonly pendingPings = new Map(); - /** Per-peer ping interval timers. */ - private readonly peerPingTimers = new Map>(); - /** Last measured latency (ms) per peer. */ - readonly peerLatencies = new Map(); - /** Emitted whenever a peer latency value changes. */ - readonly peerLatencyChanged$ = new Subject<{ peerId: string; latencyMs: number }>(); - - /** - * Per-peer promise chain that serialises all SDP operations - * (handleOffer, handleAnswer, renegotiate) so they never run - * concurrently on the same RTCPeerConnection. - */ - private readonly peerNegotiationQueue = new Map>(); - - readonly peerConnected$ = new Subject(); - readonly peerDisconnected$ = new Subject(); - readonly remoteStream$ = new Subject<{ peerId: string; stream: MediaStream }>(); - readonly messageReceived$ = new Subject(); - /** Emitted whenever the connected peer list changes. */ - readonly connectedPeersChanged$ = new Subject(); - - constructor( - private readonly logger: WebRTCLogger, - private callbacks: PeerConnectionCallbacks - ) {} - - /** - * Replace the callback set at runtime. - * Needed because of circular initialisation between managers. - * - * @param cb - The new callback interface to wire into this manager. - */ - setCallbacks(cb: PeerConnectionCallbacks): void { - this.callbacks = cb; - } - - /** - * Create a new RTCPeerConnection to a remote peer. - * - * Sets up ICE candidate forwarding, connection-state monitoring, - * data-channel creation (initiator) or listening (answerer), - * transceiver pre-creation, and local-track attachment. - * - * @param remotePeerId - Unique identifier of the remote peer. - * @param isInitiator - `true` if this side should create the data channel and send the offer. - * @returns The newly-created {@link PeerData} record. - */ - createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData { - this.logger.info('Creating peer connection', { remotePeerId, - isInitiator }); - - const connection = new RTCPeerConnection({ iceServers: ICE_SERVERS }); - - let dataChannel: RTCDataChannel | null = null; - - // ICE candidates → signaling - connection.onicecandidate = (event) => { - if (event.candidate) { - this.logger.info('ICE candidate gathered', { - remotePeerId, - candidateType: (event.candidate as any)?.type - }); - - this.callbacks.sendRawMessage({ - type: SIGNALING_TYPE_ICE_CANDIDATE, - targetUserId: remotePeerId, - payload: { candidate: event.candidate } - }); - } - }; - - // Connection state - connection.onconnectionstatechange = () => { - this.logger.info('connectionstatechange', { - remotePeerId, - state: connection.connectionState - }); - - switch (connection.connectionState) { - case CONNECTION_STATE_CONNECTED: - this.clearPeerDisconnectGraceTimer(remotePeerId); - this.addToConnectedPeers(remotePeerId); - this.peerConnected$.next(remotePeerId); - this.clearPeerReconnectTimer(remotePeerId); - this.disconnectedPeerTracker.delete(remotePeerId); - this.requestVoiceStateFromPeer(remotePeerId); - break; - - case CONNECTION_STATE_DISCONNECTED: - this.schedulePeerDisconnectRecovery(remotePeerId); - break; - - case CONNECTION_STATE_FAILED: - this.trackDisconnectedPeer(remotePeerId); - this.removePeer(remotePeerId, { preserveReconnectState: true }); - this.schedulePeerReconnect(remotePeerId); - break; - - case CONNECTION_STATE_CLOSED: - this.removePeer(remotePeerId); - break; - } - }; - - // Additional state logs - connection.oniceconnectionstatechange = () => { - this.logger.info('iceconnectionstatechange', { - remotePeerId, - state: connection.iceConnectionState - }); - }; - - connection.onsignalingstatechange = () => { - this.logger.info('signalingstatechange', { remotePeerId, - state: connection.signalingState }); - }; - - connection.onnegotiationneeded = () => { - this.logger.info('negotiationneeded', { remotePeerId }); - }; - - // Incoming remote tracks - connection.ontrack = (event) => { - this.handleRemoteTrack(event, remotePeerId); - }; - - // Data channel - if (isInitiator) { - dataChannel = connection.createDataChannel(DATA_CHANNEL_LABEL, { ordered: true }); - this.setupDataChannel(dataChannel, remotePeerId); - } else { - connection.ondatachannel = (event) => { - this.logger.info('Received data channel', { remotePeerId }); - dataChannel = event.channel; - const existing = this.activePeerConnections.get(remotePeerId); - - if (existing) { - existing.dataChannel = dataChannel; - } - - this.setupDataChannel(dataChannel, remotePeerId); - }; - } - - const peerData: PeerData = { - connection, - dataChannel, - isInitiator, - pendingIceCandidates: [], - audioSender: undefined, - videoSender: undefined - }; - - // Pre-create transceivers only for the initiator (offerer). - if (isInitiator) { - const audioTransceiver = connection.addTransceiver(TRACK_KIND_AUDIO, { - direction: TRANSCEIVER_SEND_RECV - }); - const videoTransceiver = connection.addTransceiver(TRACK_KIND_VIDEO, { - direction: TRANSCEIVER_RECV_ONLY - }); - - peerData.audioSender = audioTransceiver.sender; - peerData.videoSender = videoTransceiver.sender; - } - - this.activePeerConnections.set(remotePeerId, peerData); - - // Attach local stream to initiator - const localStream = this.callbacks.getLocalMediaStream(); - - if (localStream && isInitiator) { - this.logger.logStream(`localStream->${remotePeerId}`, localStream); - localStream.getTracks().forEach((track) => { - if (track.kind === TRACK_KIND_AUDIO && peerData.audioSender) { - peerData.audioSender - .replaceTrack(track) - .then(() => this.logger.info('audio replaceTrack (init) ok', { remotePeerId })) - .catch((e) => - this.logger.error('audio replaceTrack failed at createPeerConnection', e) - ); - } else if (track.kind === TRACK_KIND_VIDEO && peerData.videoSender) { - peerData.videoSender - .replaceTrack(track) - .then(() => this.logger.info('video replaceTrack (init) ok', { remotePeerId })) - .catch((e) => - this.logger.error('video replaceTrack failed at createPeerConnection', e) - ); - } else { - const sender = connection.addTrack(track, localStream); - - if (track.kind === TRACK_KIND_AUDIO) - peerData.audioSender = sender; - - if (track.kind === TRACK_KIND_VIDEO) - peerData.videoSender = sender; - } - }); - } - - return peerData; - } - - /** - * Create an SDP offer and send it to the remote peer via the signaling server. - * - * Serialised per-peer via the negotiation queue. - * - * @param remotePeerId - The peer to send the offer to. - */ - async createAndSendOffer(remotePeerId: string): Promise { - return new Promise((resolve) => { - this.enqueueNegotiation(remotePeerId, async () => { - await this.doCreateAndSendOffer(remotePeerId); - resolve(); - }); - }); - } - - private async doCreateAndSendOffer(remotePeerId: string): Promise { - const peerData = this.activePeerConnections.get(remotePeerId); - - if (!peerData) - return; - - try { - const offer = await peerData.connection.createOffer(); - - await peerData.connection.setLocalDescription(offer); - this.logger.info('Sending offer', { - remotePeerId, - type: offer.type, - sdpLength: offer.sdp?.length - }); - - this.callbacks.sendRawMessage({ - type: SIGNALING_TYPE_OFFER, - targetUserId: remotePeerId, - payload: { sdp: offer } - }); - } catch (error) { - this.logger.error('Failed to create offer', error); - } - } - - // ═══════════════════════════════════════════════════════════════════ - // Per-peer negotiation serialisation helpers - // ═══════════════════════════════════════════════════════════════════ - - /** - * Enqueue an async SDP operation for a given peer. - * - * All operations on the same peer run strictly in order, preventing - * concurrent `setLocalDescription` / `setRemoteDescription` calls - * that corrupt the RTCPeerConnection signalling state. - */ - private enqueueNegotiation(peerId: string, task: () => Promise): void { - const prev = this.peerNegotiationQueue.get(peerId) ?? Promise.resolve(); - const next = prev.then(task, task); // always chain, even after rejection - - this.peerNegotiationQueue.set(peerId, next); - } - - /** - * Handle an incoming SDP offer from a remote peer. - * - * Implements the "perfect negotiation" pattern to resolve offer - * collisions (glare). When both sides send offers simultaneously - * the **polite** peer (higher ID) rolls back its own offer and - * accepts the remote one; the **impolite** peer (lower ID) ignores - * the incoming offer and waits for an answer to its own. - * - * The actual SDP work is serialised per-peer to prevent races. - * - * @param fromUserId - The peer ID that sent the offer. - * @param sdp - The remote session description. - */ - handleOffer(fromUserId: string, sdp: RTCSessionDescriptionInit): void { - this.enqueueNegotiation(fromUserId, () => this.doHandleOffer(fromUserId, sdp)); - } - - private async doHandleOffer(fromUserId: string, sdp: RTCSessionDescriptionInit): Promise { - this.logger.info('Handling offer', { fromUserId }); - - let peerData = this.activePeerConnections.get(fromUserId); - - if (!peerData) { - peerData = this.createPeerConnection(fromUserId, false); - } - - try { - // ── Offer-collision (glare) detection ────────────────────────── - const signalingState = peerData.connection.signalingState; - const collision = - signalingState === 'have-local-offer' || signalingState === 'have-local-pranswer'; - - if (collision) { - const localId = - this.callbacks.getIdentifyCredentials()?.oderId || this.callbacks.getLocalPeerId(); - // The "polite" peer (lexicographically greater ID) yields its own offer. - const isPolite = localId > fromUserId; - - if (!isPolite) { - this.logger.info('Ignoring colliding offer (impolite side)', { fromUserId, - localId }); - - return; // Our offer takes priority - remote will answer it. - } - - this.logger.info('Rolling back local offer (polite side)', { fromUserId, - localId }); - - await peerData.connection.setLocalDescription({ - type: 'rollback' - } as RTCSessionDescriptionInit); - } - // ────────────────────────────────────────────────────────────── - - await peerData.connection.setRemoteDescription(new RTCSessionDescription(sdp)); - - // Discover transceivers the browser created on the answerer side - // and ensure audio transceivers are set to sendrecv for bidirectional voice. - // Without this, the answerer's SDP answer defaults to recvonly for audio, - // making the connection one-way (only the offerer's audio is heard). - const transceivers = peerData.connection.getTransceivers(); - - for (const transceiver of transceivers) { - const receiverKind = transceiver.receiver.track?.kind; - - if (receiverKind === TRACK_KIND_AUDIO) { - if (!peerData.audioSender) { - peerData.audioSender = transceiver.sender; - } - - // Promote to sendrecv so the SDP answer includes a send direction, - // enabling bidirectional audio regardless of who initiated the connection. - transceiver.direction = TRANSCEIVER_SEND_RECV; - } else if (receiverKind === TRACK_KIND_VIDEO && !peerData.videoSender) { - peerData.videoSender = transceiver.sender; - } - } - - // Attach local tracks (answerer side) - const localStream = this.callbacks.getLocalMediaStream(); - - if (localStream) { - this.logger.logStream(`localStream->${fromUserId} (answerer)`, localStream); - - for (const track of localStream.getTracks()) { - if (track.kind === TRACK_KIND_AUDIO && peerData.audioSender) { - await peerData.audioSender.replaceTrack(track); - this.logger.info('audio replaceTrack (answerer) ok', { fromUserId }); - } else if (track.kind === TRACK_KIND_VIDEO && peerData.videoSender) { - await peerData.videoSender.replaceTrack(track); - this.logger.info('video replaceTrack (answerer) ok', { fromUserId }); - } - } - } - - // Flush queued ICE candidates - for (const candidate of peerData.pendingIceCandidates) { - await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate)); - } - - peerData.pendingIceCandidates = []; - - const answer = await peerData.connection.createAnswer(); - - await peerData.connection.setLocalDescription(answer); - - this.logger.info('Sending answer', { - to: fromUserId, - type: answer.type, - sdpLength: answer.sdp?.length - }); - - this.callbacks.sendRawMessage({ - type: SIGNALING_TYPE_ANSWER, - targetUserId: fromUserId, - payload: { sdp: answer } - }); - } catch (error) { - this.logger.error('Failed to handle offer', error); - } - } - - /** - * Handle an incoming SDP answer from a remote peer. - * - * Sets the remote description and flushes any queued ICE candidates. - * Ignored if the connection is not in the `have-local-offer` state. - * - * Serialised per-peer via the negotiation queue. - * - * @param fromUserId - The peer ID that sent the answer. - * @param sdp - The remote session description. - */ - handleAnswer(fromUserId: string, sdp: RTCSessionDescriptionInit): void { - this.enqueueNegotiation(fromUserId, () => this.doHandleAnswer(fromUserId, sdp)); - } - - private async doHandleAnswer(fromUserId: string, sdp: RTCSessionDescriptionInit): Promise { - this.logger.info('Handling answer', { fromUserId }); - const peerData = this.activePeerConnections.get(fromUserId); - - if (!peerData) { - this.logger.error('No peer for answer', new Error('Missing peer'), { fromUserId }); - return; - } - - try { - if (peerData.connection.signalingState === 'have-local-offer') { - await peerData.connection.setRemoteDescription(new RTCSessionDescription(sdp)); - - for (const candidate of peerData.pendingIceCandidates) { - await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate)); - } - - peerData.pendingIceCandidates = []; - } else { - this.logger.warn('Ignoring answer - wrong signaling state', { - state: peerData.connection.signalingState - }); - } - } catch (error) { - this.logger.error('Failed to handle answer', error); - } - } - - /** - * Process an incoming ICE candidate from a remote peer. - * - * If the remote description has already been set the candidate is added - * immediately; otherwise it is queued until the description arrives. - * - * Serialised per-peer via the negotiation queue. - * - * @param fromUserId - The peer ID that sent the candidate. - * @param candidate - The ICE candidate to add. - */ - handleIceCandidate(fromUserId: string, candidate: RTCIceCandidateInit): void { - this.enqueueNegotiation(fromUserId, () => this.doHandleIceCandidate(fromUserId, candidate)); - } - - private async doHandleIceCandidate( - fromUserId: string, - candidate: RTCIceCandidateInit - ): Promise { - let peerData = this.activePeerConnections.get(fromUserId); - - if (!peerData) { - this.logger.info('Creating peer for early ICE', { fromUserId }); - peerData = this.createPeerConnection(fromUserId, false); - } - - try { - if (peerData.connection.remoteDescription) { - await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate)); - } else { - this.logger.info('Queuing ICE candidate', { fromUserId }); - peerData.pendingIceCandidates.push(candidate); - } - } catch (error) { - this.logger.error('Failed to add ICE candidate', error); - } - } - - /** - * Re-negotiate (create offer) to push track changes to remote. - * - * Serialised per-peer via the negotiation queue so it never races - * against an incoming offer or a previous renegotiate. - */ - async renegotiate(peerId: string): Promise { - return new Promise((resolve) => { - this.enqueueNegotiation(peerId, async () => { - await this.doRenegotiate(peerId); - resolve(); - }); - }); - } - - private async doRenegotiate(peerId: string): Promise { - const peerData = this.activePeerConnections.get(peerId); - - if (!peerData) - return; - - try { - const offer = await peerData.connection.createOffer(); - - await peerData.connection.setLocalDescription(offer); - this.logger.info('Renegotiate offer', { - peerId, - type: offer.type, - sdpLength: offer.sdp?.length - }); - - this.callbacks.sendRawMessage({ - type: SIGNALING_TYPE_OFFER, - targetUserId: peerId, - payload: { sdp: offer } - }); - } catch (error) { - this.logger.error('Failed to renegotiate', error); - } - } - - /** - * Wire open/close/error/message handlers onto a data channel. - * - * On open, current voice and screen states are sent to the remote peer - * and a state-request is emitted so the remote peer does the same. - * - * @param channel - The RTCDataChannel to configure. - * @param remotePeerId - The remote peer this channel belongs to. - */ - private setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void { - channel.onopen = () => { - this.logger.info('Data channel open', { remotePeerId }); - this.sendCurrentStatesToChannel(channel, remotePeerId); - - try { - channel.send(JSON.stringify({ type: P2P_TYPE_STATE_REQUEST })); - } catch { - /* ignore */ - } - - this.startPingInterval(remotePeerId); - }; - - channel.onclose = () => { - this.logger.info('Data channel closed', { remotePeerId }); - }; - - channel.onerror = (error) => { - this.logger.error('Data channel error', error, { remotePeerId }); - }; - - channel.onmessage = (event) => { - try { - const message = JSON.parse(event.data); - - this.handlePeerMessage(remotePeerId, message); - } catch (error) { - this.logger.error('Failed to parse peer message', error); - } - }; - } - - /** - * Route an incoming peer-to-peer message. - * - * State-request messages trigger an immediate state broadcast back. - * All other messages are enriched with `fromPeerId` and forwarded - * to the {@link messageReceived$} subject. - * - * @param peerId - The remote peer that sent the message. - * @param message - The parsed JSON payload. - */ - private handlePeerMessage(peerId: string, message: any): void { - this.logger.info('Received P2P message', { peerId, - type: message?.type }); - - if (message.type === P2P_TYPE_STATE_REQUEST || message.type === P2P_TYPE_VOICE_STATE_REQUEST) { - this.sendCurrentStatesToPeer(peerId); - return; - } - - // Ping/pong latency measurement - handled internally, not forwarded - if (message.type === P2P_TYPE_PING) { - this.sendToPeer(peerId, { type: P2P_TYPE_PONG, - ts: message.ts } as any); - - return; - } - - if (message.type === P2P_TYPE_PONG) { - const sent = this.pendingPings.get(peerId); - - if (sent && typeof message.ts === 'number' && message.ts === sent) { - const latencyMs = Math.round(performance.now() - sent); - - this.peerLatencies.set(peerId, latencyMs); - this.peerLatencyChanged$.next({ peerId, - latencyMs }); - } - - this.pendingPings.delete(peerId); - return; - } - - const enriched = { ...message, - fromPeerId: peerId }; - - this.messageReceived$.next(enriched); - } - - /** Broadcast a ChatEvent to every peer with an open data channel. */ - broadcastMessage(event: ChatEvent): void { - const data = JSON.stringify(event); - - this.activePeerConnections.forEach((peerData, peerId) => { - try { - if (peerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) { - peerData.dataChannel.send(data); - this.logger.info('Sent message via P2P', { peerId }); - } - } catch (error) { - this.logger.error('Failed to send to peer', error, { peerId }); - } - }); - } - - /** - * Send a {@link ChatEvent} to a specific peer's data channel. - * - * Silently returns if the peer is not connected or the channel is not open. - * - * @param peerId - The target peer. - * @param event - The chat event to send. - */ - sendToPeer(peerId: string, event: ChatEvent): void { - const peerData = this.activePeerConnections.get(peerId); - - if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) { - this.logger.warn('Peer not connected - cannot send', { peerId }); - return; - } - - try { - peerData.dataChannel.send(JSON.stringify(event)); - } catch (error) { - this.logger.error('Failed to send to peer', error, { peerId }); - } - } - - /** - * Send a {@link ChatEvent} with back-pressure awareness. - * - * If the data channel's buffer exceeds {@link DATA_CHANNEL_HIGH_WATER_BYTES} - * the call awaits until the buffer drains below {@link DATA_CHANNEL_LOW_WATER_BYTES}. - * - * @param peerId - The target peer. - * @param event - The chat event to send. - */ - async sendToPeerBuffered(peerId: string, event: ChatEvent): Promise { - const peerData = this.activePeerConnections.get(peerId); - - if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) { - this.logger.warn('Peer not connected - cannot send buffered', { peerId }); - return; - } - - const channel = peerData.dataChannel; - const data = JSON.stringify(event); - - if (typeof channel.bufferedAmountLowThreshold === 'number') { - channel.bufferedAmountLowThreshold = DATA_CHANNEL_LOW_WATER_BYTES; - } - - if (channel.bufferedAmount > DATA_CHANNEL_HIGH_WATER_BYTES) { - await new Promise((resolve) => { - const handler = () => { - if (channel.bufferedAmount <= DATA_CHANNEL_LOW_WATER_BYTES) { - channel.removeEventListener('bufferedamountlow', handler as any); - resolve(); - } - }; - - channel.addEventListener('bufferedamountlow', handler as any, { once: true } as any); - }); - } - - try { - channel.send(data); - } catch (error) { - this.logger.error('Failed to send buffered message', error, { peerId }); - } - } - - /** - * Send the current voice and screen-share states to a single peer. - * - * @param peerId - The peer to notify. - */ - sendCurrentStatesToPeer(peerId: string): void { - const credentials = this.callbacks.getIdentifyCredentials(); - const oderId = credentials?.oderId || this.callbacks.getLocalPeerId(); - const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME; - const voiceState = this.callbacks.getVoiceStateSnapshot(); - - this.sendToPeer(peerId, { type: P2P_TYPE_VOICE_STATE, - oderId, - displayName, - voiceState } as any); - - this.sendToPeer(peerId, { - type: P2P_TYPE_SCREEN_STATE, - oderId, - displayName, - isScreenSharing: this.callbacks.isScreenSharingActive() - } as any); - } - - private sendCurrentStatesToChannel(channel: RTCDataChannel, remotePeerId: string): void { - if (channel.readyState !== DATA_CHANNEL_STATE_OPEN) { - this.logger.warn('Cannot send states - channel not open', { - remotePeerId, - state: channel.readyState - }); - - return; - } - - const credentials = this.callbacks.getIdentifyCredentials(); - const oderId = credentials?.oderId || this.callbacks.getLocalPeerId(); - const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME; - const voiceState = this.callbacks.getVoiceStateSnapshot(); - - try { - channel.send(JSON.stringify({ type: P2P_TYPE_VOICE_STATE, - oderId, - displayName, - voiceState })); - - channel.send( - JSON.stringify({ - type: P2P_TYPE_SCREEN_STATE, - oderId, - displayName, - isScreenSharing: this.callbacks.isScreenSharingActive() - }) - ); - - this.logger.info('Sent initial states to channel', { remotePeerId, - voiceState }); - } catch (e) { - this.logger.error('Failed to send initial states to channel', e); - } - } - - /** Broadcast the current voice and screen-share states to all connected peers. */ - broadcastCurrentStates(): void { - const credentials = this.callbacks.getIdentifyCredentials(); - const oderId = credentials?.oderId || this.callbacks.getLocalPeerId(); - const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME; - const voiceState = this.callbacks.getVoiceStateSnapshot(); - - this.broadcastMessage({ type: P2P_TYPE_VOICE_STATE, - oderId, - displayName, - voiceState } as any); - - this.broadcastMessage({ - type: P2P_TYPE_SCREEN_STATE, - oderId, - displayName, - isScreenSharing: this.callbacks.isScreenSharingActive() - } as any); - } - - private handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void { - const track = event.track; - const settings = - typeof track.getSettings === 'function' ? track.getSettings() : ({} as MediaTrackSettings); - - this.logger.info('Remote track', { - remotePeerId, - kind: track.kind, - id: track.id, - enabled: track.enabled, - readyState: track.readyState, - settings - }); - - this.logger.attachTrackDiagnostics(track, `remote:${remotePeerId}:${track.kind}`); - - // Skip inactive video placeholder tracks - if (track.kind === TRACK_KIND_VIDEO && (!track.enabled || track.readyState !== 'live')) { - this.logger.info('Skipping inactive video track', { - remotePeerId, - enabled: track.enabled, - readyState: track.readyState - }); - - return; - } - - const compositeStream = this.buildCompositeRemoteStream(remotePeerId, track); - - track.addEventListener('ended', () => this.removeRemoteTrack(remotePeerId, track.id)); - - this.remotePeerStreams.set(remotePeerId, compositeStream); - this.remoteStream$.next({ peerId: remotePeerId, - stream: compositeStream }); - } - - private buildCompositeRemoteStream(remotePeerId: string, incomingTrack: MediaStreamTrack): MediaStream { - const existingStream = this.remotePeerStreams.get(remotePeerId); - - let preservedTracks: MediaStreamTrack[] = []; - - if (existingStream) { - preservedTracks = existingStream.getTracks().filter( - (existingTrack) => - existingTrack.kind !== incomingTrack.kind && existingTrack.readyState === 'live' - ); - } - - return new MediaStream([...preservedTracks, incomingTrack]); - } - - private removeRemoteTrack(remotePeerId: string, trackId: string): void { - const currentStream = this.remotePeerStreams.get(remotePeerId); - - if (!currentStream) - return; - - const remainingTracks = currentStream - .getTracks() - .filter((existingTrack) => existingTrack.id !== trackId && existingTrack.readyState === 'live'); - - if (remainingTracks.length === currentStream.getTracks().length) - return; - - if (remainingTracks.length === 0) { - this.remotePeerStreams.delete(remotePeerId); - return; - } - - const nextStream = new MediaStream(remainingTracks); - - this.remotePeerStreams.set(remotePeerId, nextStream); - this.remoteStream$.next({ peerId: remotePeerId, - stream: nextStream }); - } - - /** - * Close and remove a peer connection, data channel, and emit a disconnect event. - * - * @param peerId - The peer to remove. - */ - removePeer(peerId: string, options?: { preserveReconnectState?: boolean }): void { - const peerData = this.activePeerConnections.get(peerId); - const preserveReconnectState = options?.preserveReconnectState === true; - - this.clearPeerDisconnectGraceTimer(peerId); - - if (!preserveReconnectState) { - this.clearPeerReconnectTimer(peerId); - this.disconnectedPeerTracker.delete(peerId); - } - - this.remotePeerStreams.delete(peerId); - - if (peerData) { - if (peerData.dataChannel) - peerData.dataChannel.close(); - - peerData.connection.close(); - this.activePeerConnections.delete(peerId); - this.peerNegotiationQueue.delete(peerId); - this.removeFromConnectedPeers(peerId); - this.stopPingInterval(peerId); - this.peerLatencies.delete(peerId); - this.pendingPings.delete(peerId); - this.peerDisconnected$.next(peerId); - } - } - - /** Close every active peer connection and clear internal state. */ - closeAllPeers(): void { - this.clearAllPeerReconnectTimers(); - this.clearAllPeerDisconnectGraceTimers(); - this.clearAllPingTimers(); - this.activePeerConnections.forEach((peerData) => { - if (peerData.dataChannel) - peerData.dataChannel.close(); - - peerData.connection.close(); - }); - - this.activePeerConnections.clear(); - this.remotePeerStreams.clear(); - this.peerNegotiationQueue.clear(); - this.peerLatencies.clear(); - this.pendingPings.clear(); - this.connectedPeersChanged$.next([]); - } - - private trackDisconnectedPeer(peerId: string): void { - this.disconnectedPeerTracker.set(peerId, { - lastSeenTimestamp: Date.now(), - reconnectAttempts: 0 - }); - } - - private clearPeerReconnectTimer(peerId: string): void { - const timer = this.peerReconnectTimers.get(peerId); - - if (timer) { - clearInterval(timer); - this.peerReconnectTimers.delete(peerId); - } - } - - private clearPeerDisconnectGraceTimer(peerId: string): void { - const timer = this.peerDisconnectGraceTimers.get(peerId); - - if (timer) { - clearTimeout(timer); - this.peerDisconnectGraceTimers.delete(peerId); - } - } - - /** Cancel all pending peer reconnect timers and clear the tracker. */ - clearAllPeerReconnectTimers(): void { - this.peerReconnectTimers.forEach((timer) => clearInterval(timer)); - this.peerReconnectTimers.clear(); - this.disconnectedPeerTracker.clear(); - } - - private clearAllPeerDisconnectGraceTimers(): void { - this.peerDisconnectGraceTimers.forEach((timer) => clearTimeout(timer)); - this.peerDisconnectGraceTimers.clear(); - } - - private schedulePeerDisconnectRecovery(peerId: string): void { - if (this.peerDisconnectGraceTimers.has(peerId)) - return; - - this.logger.warn('Peer temporarily disconnected; waiting before reconnect', { peerId }); - - const timer = setTimeout(() => { - this.peerDisconnectGraceTimers.delete(peerId); - - const peerData = this.activePeerConnections.get(peerId); - - if (!peerData) - return; - - const state = peerData.connection.connectionState; - - if (state === CONNECTION_STATE_CONNECTED || state === 'connecting') { - this.logger.info('Peer recovered before disconnect grace expired', { - peerId, - state - }); - - return; - } - - this.logger.warn('Peer still disconnected after grace period; recreating connection', { - peerId, - state - }); - - this.trackDisconnectedPeer(peerId); - this.removePeer(peerId, { preserveReconnectState: true }); - this.schedulePeerReconnect(peerId); - }, PEER_DISCONNECT_GRACE_MS); - - this.peerDisconnectGraceTimers.set(peerId, timer); - } - - private schedulePeerReconnect(peerId: string): void { - if (this.peerReconnectTimers.has(peerId)) - return; - - this.logger.info('Scheduling P2P reconnect', { peerId }); - - const timer = setInterval(() => { - const info = this.disconnectedPeerTracker.get(peerId); - - if (!info) { - this.clearPeerReconnectTimer(peerId); - return; - } - - info.reconnectAttempts++; - this.logger.info('P2P reconnect attempt', { peerId, - attempt: info.reconnectAttempts }); - - if (info.reconnectAttempts >= PEER_RECONNECT_MAX_ATTEMPTS) { - this.logger.info('P2P reconnect max attempts reached', { peerId }); - this.clearPeerReconnectTimer(peerId); - this.disconnectedPeerTracker.delete(peerId); - return; - } - - if (!this.callbacks.isSignalingConnected()) { - this.logger.info('Skipping P2P reconnect - no signaling connection', { peerId }); - return; - } - - this.attemptPeerReconnect(peerId); - }, PEER_RECONNECT_INTERVAL_MS); - - this.peerReconnectTimers.set(peerId, timer); - } - - private attemptPeerReconnect(peerId: string): void { - if (this.activePeerConnections.has(peerId)) { - this.removePeer(peerId, { preserveReconnectState: true }); - } - - this.createPeerConnection(peerId, true); - void this.createAndSendOffer(peerId); - } - - private requestVoiceStateFromPeer(peerId: string): void { - const peerData = this.activePeerConnections.get(peerId); - - if (peerData?.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) { - try { - peerData.dataChannel.send(JSON.stringify({ type: P2P_TYPE_VOICE_STATE_REQUEST })); - } catch (e) { - this.logger.warn('Failed to request voice state', e as any); - } - } - } - - private connectedPeersList: string[] = []; - - /** Return a snapshot copy of the currently-connected peer IDs. */ - getConnectedPeerIds(): string[] { - return [...this.connectedPeersList]; - } - - private addToConnectedPeers(peerId: string): void { - if (!this.connectedPeersList.includes(peerId)) { - this.connectedPeersList = [...this.connectedPeersList, peerId]; - this.connectedPeersChanged$.next(this.connectedPeersList); - } - } - - /** - * Remove a peer from the connected list and notify subscribers. - * - * @param peerId - The peer to remove. - */ - private removeFromConnectedPeers(peerId: string): void { - this.connectedPeersList = this.connectedPeersList.filter( - (connectedId) => connectedId !== peerId - ); - - this.connectedPeersChanged$.next(this.connectedPeersList); - } - - /** Reset the connected peers list to empty and notify subscribers. */ - resetConnectedPeers(): void { - this.connectedPeersList = []; - this.connectedPeersChanged$.next([]); - } - - // ═══════════════════════════════════════════════════════════════════ - // Ping / Latency helpers - // ═══════════════════════════════════════════════════════════════════ - - /** Start periodic pings to a peer to measure round-trip latency. */ - private startPingInterval(peerId: string): void { - this.stopPingInterval(peerId); - // Send an immediate ping - this.sendPing(peerId); - const timer = setInterval(() => this.sendPing(peerId), PEER_PING_INTERVAL_MS); - - this.peerPingTimers.set(peerId, timer); - } - - /** Stop the periodic ping for a specific peer. */ - private stopPingInterval(peerId: string): void { - const timer = this.peerPingTimers.get(peerId); - - if (timer) { - clearInterval(timer); - this.peerPingTimers.delete(peerId); - } - } - - /** Cancel all active ping timers. */ - private clearAllPingTimers(): void { - this.peerPingTimers.forEach((timer) => clearInterval(timer)); - this.peerPingTimers.clear(); - } - - /** Send a single ping to a peer. */ - private sendPing(peerId: string): void { - const peerData = this.activePeerConnections.get(peerId); - - if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) - return; - - const ts = performance.now(); - - this.pendingPings.set(peerId, ts); - - try { - peerData.dataChannel.send(JSON.stringify({ type: P2P_TYPE_PING, - ts })); - } catch { - /* ignore */ - } - } - - /** Clean up all resources. */ - destroy(): void { - this.closeAllPeers(); - this.peerConnected$.complete(); - this.peerDisconnected$.complete(); - this.remoteStream$.complete(); - this.messageReceived$.complete(); - this.connectedPeersChanged$.complete(); - this.peerLatencyChanged$.complete(); - } -} +export * from './peer-connection-manager';