Files
Toju/src/app/infrastructure/realtime/peer-connection-manager/peer-connection.manager.ts
2026-03-23 00:42:08 +01:00

464 lines
15 KiB
TypeScript

/* eslint-disable @typescript-eslint/member-ordering */
import { ChatEvent } from '../../../shared-kernel';
import { recordDebugNetworkDownloadRates } from '../logging/debug-network-metrics';
import { WebRTCLogger } from '../logging/webrtc-logger';
import { PeerData } from '../realtime.types';
import { createPeerConnection as createManagedPeerConnection } from './connection/create-peer-connection';
import {
doCreateAndSendOffer,
doHandleAnswer,
doHandleIceCandidate,
doHandleOffer,
doRenegotiate,
enqueueNegotiation
} from './connection/negotiation';
import {
broadcastCurrentStates,
broadcastMessage,
sendCurrentStatesToPeer,
sendToPeer,
sendToPeerBuffered,
setupDataChannel
} from './messaging/data-channel';
import {
addToConnectedPeers,
clearAllPeerReconnectTimers,
clearPeerDisconnectGraceTimer,
clearPeerReconnectTimer,
closeAllPeers as closeManagedPeers,
getConnectedPeerIds,
removePeer as removeManagedPeer,
requestVoiceStateFromPeer,
resetConnectedPeers,
schedulePeerDisconnectRecovery,
schedulePeerReconnect,
trackDisconnectedPeer
} from './recovery/peer-recovery';
import { clearRemoteScreenShareStream as clearManagedRemoteScreenShareStream, handleRemoteTrack } from './streams/remote-streams';
import {
ConnectionLifecycleHandlers,
createPeerConnectionManagerState,
NegotiationHandlers,
PeerConnectionCallbacks,
PeerConnectionManagerContext,
RecoveryHandlers,
RemovePeerOptions
} from './shared';
/** Delay, in milliseconds, between two passes of the transport-stats poll timer. */
const PEER_STATS_POLL_INTERVAL_MS = 2 * 1_000;
/**
 * Minimum time, in milliseconds, that must separate two byte-counter samples of
 * the same peer before a download rate is derived from them.
 */
const PEER_STATS_SAMPLE_MIN_INTERVAL_MS = 1_000 / 2;
/**
 * Cumulative inbound byte counters sampled for a single peer, kept so the next
 * sample can be turned into a rate.
 */
interface PeerInboundByteSnapshot {
  /** Time the sample was taken, as returned by `Date.now()`. */
  collectedAt: number;
  /** Total `bytesReceived` summed over the peer's inbound audio RTP reports. */
  audioBytesReceived: number;
  /** Total `bytesReceived` summed over the peer's inbound video RTP reports. */
  videoBytesReceived: number;
}
/**
 * Creates and manages RTCPeerConnections, data channels,
 * offer/answer negotiation, ICE candidates, and P2P reconnection.
 *
 * Most methods are thin wrappers that forward to the helper functions imported
 * from the sibling modules, handing them the shared state (or a context made of
 * logger, callbacks and state). On top of that the class owns a periodic poll
 * that samples inbound RTP byte counters per peer and reports download rates.
 */
export class PeerConnectionManager {
  private readonly state = createPeerConnectionManagerState();
  /** Previous byte-counter sample per peer; the next sample is diffed against it. */
  private readonly lastInboundByteSnapshots = new Map<string, PeerInboundByteSnapshot>();
  /** Handle of the stats poll interval, or null while polling is stopped. */
  private statsPollTimer: ReturnType<typeof setInterval> | null = null;
  /** True while a poll pass is still running, so that passes never overlap. */
  private transportStatsPollInFlight = false;

  /** Active peer connections keyed by remote peer ID. */
  readonly activePeerConnections = this.state.activePeerConnections;
  /** Remote composite streams keyed by remote peer ID. */
  readonly remotePeerStreams = this.state.remotePeerStreams;
  /** Remote voice-only streams keyed by remote peer ID. */
  readonly remotePeerVoiceStreams = this.state.remotePeerVoiceStreams;
  /** Remote screen-share streams keyed by remote peer ID. */
  readonly remotePeerScreenShareStreams = this.state.remotePeerScreenShareStreams;
  /** Last measured latency (ms) per peer. */
  readonly peerLatencies = this.state.peerLatencies;
  /** Emitted whenever a peer latency value changes. */
  readonly peerLatencyChanged$ = this.state.peerLatencyChanged$;
  readonly peerConnected$ = this.state.peerConnected$;
  readonly peerDisconnected$ = this.state.peerDisconnected$;
  readonly remoteStream$ = this.state.remoteStream$;
  readonly messageReceived$ = this.state.messageReceived$;
  /** Emitted whenever the connected peer list changes. */
  readonly connectedPeersChanged$ = this.state.connectedPeersChanged$;

  /**
   * Stores the collaborators and immediately starts the transport-stats poll timer.
   *
   * @param logger Logger used for the stats output and handed to the helper modules.
   * @param callbacks Callback set handed to the helper modules; replaceable via setCallbacks().
   */
  constructor(
    private readonly logger: WebRTCLogger,
    private callbacks: PeerConnectionCallbacks
  ) {
    this.startTransportStatsPolling();
  }

  /**
   * Replace the callback set at runtime.
   * Needed because of circular initialisation between managers.
   */
  setCallbacks(callbacks: PeerConnectionCallbacks): void {
    this.callbacks = callbacks;
  }

  /**
   * Create a new RTCPeerConnection to a remote peer.
   */
  createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData {
    return createManagedPeerConnection(
      this.context,
      remotePeerId,
      isInitiator,
      this.connectionHandlers
    );
  }

  /**
   * Create an SDP offer and send it to the remote peer via the signaling server.
   *
   * The work is queued through enqueueNegotiation for that peer. The returned
   * promise resolves once the queued task finished and rejects if it failed.
   */
  async createAndSendOffer(remotePeerId: string): Promise<void> {
    return this.runQueuedNegotiation(remotePeerId, () =>
      doCreateAndSendOffer(this.context, remotePeerId)
    );
  }

  /**
   * Handle an incoming SDP offer from a remote peer.
   */
  handleOffer(fromUserId: string, sdp: RTCSessionDescriptionInit): void {
    enqueueNegotiation(this.state, fromUserId, () =>
      doHandleOffer(this.context, fromUserId, sdp, this.negotiationHandlers)
    );
  }

  /**
   * Handle an incoming SDP answer from a remote peer.
   */
  handleAnswer(fromUserId: string, sdp: RTCSessionDescriptionInit): void {
    enqueueNegotiation(this.state, fromUserId, () =>
      doHandleAnswer(this.context, fromUserId, sdp)
    );
  }

  /**
   * Process an incoming ICE candidate from a remote peer.
   */
  handleIceCandidate(fromUserId: string, candidate: RTCIceCandidateInit): void {
    enqueueNegotiation(this.state, fromUserId, () =>
      doHandleIceCandidate(this.context, fromUserId, candidate, this.negotiationHandlers)
    );
  }

  /**
   * Re-negotiate (create offer) to push track changes to remote.
   *
   * Queued like createAndSendOffer(); the returned promise mirrors the outcome
   * of the queued task.
   */
  async renegotiate(peerId: string): Promise<void> {
    return this.runQueuedNegotiation(peerId, () => doRenegotiate(this.context, peerId));
  }

  /** Broadcast a ChatEvent to every peer with an open data channel. */
  broadcastMessage(event: ChatEvent): void {
    broadcastMessage(this.context, event);
  }

  /**
   * Send a ChatEvent to a specific peer's data channel.
   */
  sendToPeer(peerId: string, event: ChatEvent): void {
    sendToPeer(this.context, peerId, event);
  }

  /**
   * Send a ChatEvent with back-pressure awareness.
   */
  async sendToPeerBuffered(peerId: string, event: ChatEvent): Promise<void> {
    await sendToPeerBuffered(this.context, peerId, event);
  }

  /**
   * Send the current voice and screen-share states to a single peer.
   */
  sendCurrentStatesToPeer(peerId: string): void {
    sendCurrentStatesToPeer(this.context, peerId);
  }

  /** Broadcast the current voice and screen-share states to all connected peers. */
  broadcastCurrentStates(): void {
    broadcastCurrentStates(this.context);
  }

  /**
   * Close and remove a peer connection, data channel, and emit a disconnect event.
   * Also drops the byte-counter snapshot kept for that peer.
   */
  removePeer(peerId: string, options?: RemovePeerOptions): void {
    removeManagedPeer(this.context, peerId, options);
    this.clearPeerTransportStats(peerId);
  }

  /** Close every active peer connection and clear internal state. */
  closeAllPeers(): void {
    closeManagedPeers(this.state);
    this.lastInboundByteSnapshots.clear();
  }

  /** Cancel all pending peer reconnect timers and clear the tracker. */
  clearAllPeerReconnectTimers(): void {
    clearAllPeerReconnectTimers(this.state);
  }

  /** Return a snapshot copy of the currently-connected peer IDs. */
  getConnectedPeerIds(): string[] {
    return getConnectedPeerIds(this.state);
  }

  /** Remove any cached remote screen-share tracks for a peer. */
  clearRemoteScreenShareStream(peerId: string): void {
    clearManagedRemoteScreenShareStream(this.context, peerId);
  }

  /** Reset the connected peers list to empty and notify subscribers. */
  resetConnectedPeers(): void {
    resetConnectedPeers(this.state);
  }

  /** Clean up all resources: stop polling, close all peers, complete every subject. */
  destroy(): void {
    this.stopTransportStatsPolling();
    // closeAllPeers() also empties lastInboundByteSnapshots, so no separate clear is needed.
    this.closeAllPeers();
    this.peerConnected$.complete();
    this.peerDisconnected$.complete();
    this.remoteStream$.complete();
    this.messageReceived$.complete();
    this.connectedPeersChanged$.complete();
    this.peerLatencyChanged$.complete();
  }

  /**
   * Context object passed to the helper modules. Built on every access so that
   * it always carries the callback set most recently given to setCallbacks().
   */
  private get context(): PeerConnectionManagerContext {
    return {
      logger: this.logger,
      callbacks: this.callbacks,
      state: this.state
    };
  }

  /** Handlers handed to createManagedPeerConnection; each one forwards to a method of this class. */
  private get connectionHandlers(): ConnectionLifecycleHandlers {
    return {
      clearPeerDisconnectGraceTimer: (peerId: string) => this.clearPeerDisconnectGraceTimer(peerId),
      addToConnectedPeers: (peerId: string) => this.addToConnectedPeers(peerId),
      clearPeerReconnectTimer: (peerId: string) => this.clearPeerReconnectTimer(peerId),
      requestVoiceStateFromPeer: (peerId: string) => this.requestVoiceStateFromPeer(peerId),
      schedulePeerDisconnectRecovery: (peerId: string) =>
        this.schedulePeerDisconnectRecovery(peerId),
      trackDisconnectedPeer: (peerId: string) => this.trackDisconnectedPeer(peerId),
      removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options),
      schedulePeerReconnect: (peerId: string) => this.schedulePeerReconnect(peerId),
      handleRemoteTrack: (event: RTCTrackEvent, peerId: string) =>
        this.handleRemoteTrack(event, peerId),
      setupDataChannel: (channel: RTCDataChannel, peerId: string) =>
        this.setupDataChannel(channel, peerId)
    };
  }

  /** Handlers handed to the offer and ICE-candidate helpers. */
  private get negotiationHandlers(): NegotiationHandlers {
    return {
      createPeerConnection: (remotePeerId: string, isInitiator: boolean) =>
        this.createPeerConnection(remotePeerId, isInitiator)
    };
  }

  /** Handlers handed to the disconnect-recovery and reconnect schedulers. */
  private get recoveryHandlers(): RecoveryHandlers {
    return {
      removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options),
      createPeerConnection: (peerId: string, isInitiator: boolean) =>
        this.createPeerConnection(peerId, isInitiator),
      createAndSendOffer: (peerId: string) => this.createAndSendOffer(peerId)
    };
  }

  /**
   * Queue `task` for `peerId` through enqueueNegotiation and mirror its outcome
   * on the returned promise.
   *
   * A failing task rejects the returned promise instead of leaving it pending
   * forever, and the error is re-thrown so the queue still observes the failure.
   */
  private async runQueuedNegotiation(peerId: string, task: () => Promise<unknown>): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      enqueueNegotiation(this.state, peerId, async () => {
        try {
          await task();
          resolve();
        } catch (error) {
          reject(error);
          throw error;
        }
      });
    });
  }

  private setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void {
    setupDataChannel(this.context, channel, remotePeerId);
  }

  private handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void {
    handleRemoteTrack(this.context, event, remotePeerId);
  }

  private trackDisconnectedPeer(peerId: string): void {
    trackDisconnectedPeer(this.state, peerId);
  }

  private clearPeerReconnectTimer(peerId: string): void {
    clearPeerReconnectTimer(this.state, peerId);
  }

  private clearPeerDisconnectGraceTimer(peerId: string): void {
    clearPeerDisconnectGraceTimer(this.state, peerId);
  }

  private schedulePeerDisconnectRecovery(peerId: string): void {
    schedulePeerDisconnectRecovery(this.context, peerId, this.recoveryHandlers);
  }

  private schedulePeerReconnect(peerId: string): void {
    schedulePeerReconnect(this.context, peerId, this.recoveryHandlers);
  }

  private requestVoiceStateFromPeer(peerId: string): void {
    requestVoiceStateFromPeer(this.state, this.logger, peerId);
  }

  private addToConnectedPeers(peerId: string): void {
    addToConnectedPeers(this.state, peerId);
  }

  /** Start the poll interval unless one is already running. */
  private startTransportStatsPolling(): void {
    if (this.statsPollTimer)
      return;

    this.statsPollTimer = setInterval(() => {
      void this.pollTransportStats();
    }, PEER_STATS_POLL_INTERVAL_MS);
  }

  /** Stop the poll interval if one is running. */
  private stopTransportStatsPolling(): void {
    if (!this.statsPollTimer)
      return;

    clearInterval(this.statsPollTimer);
    this.statsPollTimer = null;
  }

  /** Forget the byte-counter snapshot kept for a peer. */
  private clearPeerTransportStats(peerId: string): void {
    this.lastInboundByteSnapshots.delete(peerId);
  }

  /** True when `peerData` is still the entry registered for `peerId`. */
  private isCurrentPeerConnection(peerId: string, peerData: PeerData): boolean {
    return this.state.activePeerConnections.get(peerId) === peerData;
  }

  /**
   * Run one poll pass over all active peer connections, one peer after another.
   * Does nothing when a previous pass is still running or there are no peers.
   */
  private async pollTransportStats(): Promise<void> {
    if (this.transportStatsPollInFlight || this.state.activePeerConnections.size === 0)
      return;

    this.transportStatsPollInFlight = true;

    try {
      // Iterate over a copy: peers may be added or removed while a getStats() call is awaited.
      for (const [peerId, peerData] of Array.from(this.state.activePeerConnections.entries())) {
        // Skip entries that were removed or replaced while earlier peers were being polled.
        if (!this.isCurrentPeerConnection(peerId, peerData))
          continue;

        await this.pollPeerTransportStats(peerId, peerData);
      }
    } finally {
      this.transportStatsPollInFlight = false;
    }
  }

  /**
   * Sample the inbound audio/video byte counters of one peer and, when a usable
   * previous sample exists, record and log the resulting download rates.
   *
   * Failures of getStats() are logged as warnings and never propagate.
   */
  private async pollPeerTransportStats(peerId: string, peerData: PeerData): Promise<void> {
    const connectionState = peerData.connection.connectionState;

    if (connectionState === 'closed' || connectionState === 'failed') {
      this.clearPeerTransportStats(peerId);
      return;
    }

    try {
      const stats = await peerData.connection.getStats();

      // The peer may have been removed or replaced while getStats() was pending.
      // Writing a snapshot now would resurrect an entry that removePeer() already
      // cleared, so drop the sample instead.
      if (!this.isCurrentPeerConnection(peerId, peerData)) {
        this.clearPeerTransportStats(peerId);
        return;
      }

      let audioBytesReceived = 0;
      let videoBytesReceived = 0;

      stats.forEach((report) => {
        const summary = this.getInboundRtpSummary(report);

        if (!summary)
          return;

        if (summary.kind === 'audio')
          audioBytesReceived += summary.bytesReceived;

        if (summary.kind === 'video')
          videoBytesReceived += summary.bytesReceived;
      });

      const collectedAt = Date.now();
      const previous = this.lastInboundByteSnapshots.get(peerId);

      this.lastInboundByteSnapshots.set(peerId, {
        audioBytesReceived,
        collectedAt,
        videoBytesReceived
      });

      // The first sample for a peer only establishes the baseline.
      if (!previous)
        return;

      const elapsedMs = collectedAt - previous.collectedAt;

      if (elapsedMs < PEER_STATS_SAMPLE_MIN_INTERVAL_MS)
        return;

      const audioDownloadMbps = this.calculateMbps(audioBytesReceived - previous.audioBytesReceived, elapsedMs);
      const videoDownloadMbps = this.calculateMbps(videoBytesReceived - previous.videoBytesReceived, elapsedMs);
      const roundedAudioMbps = this.roundMetric(audioDownloadMbps);
      const roundedVideoMbps = this.roundMetric(videoDownloadMbps);

      recordDebugNetworkDownloadRates(peerId, {
        audioMbps: roundedAudioMbps,
        videoMbps: roundedVideoMbps
      }, collectedAt);

      this.logger.info('Peer transport stats', {
        audioDownloadMbps: roundedAudioMbps,
        connectionState,
        remotePeerId: peerId,
        totalDownloadMbps: this.roundMetric(audioDownloadMbps + videoDownloadMbps),
        videoDownloadMbps: roundedVideoMbps
      });
    } catch (error) {
      this.logger.warn('Failed to collect peer transport stats', {
        connectionState,
        error: (error as Error)?.message ?? String(error),
        peerId
      });
    }
  }

  /**
   * Extract the media kind and byte counter from a stats report.
   *
   * @returns The summary for a non-remote `inbound-rtp` report that carries a numeric
   *   `bytesReceived` and an audio/video kind (read from `kind`, else `mediaType`);
   *   null for every other report.
   */
  private getInboundRtpSummary(report: RTCStats): { bytesReceived: number; kind: 'audio' | 'video' } | null {
    const summary = report as unknown as Record<string, unknown>;

    if (summary['type'] !== 'inbound-rtp' || summary['isRemote'] === true)
      return null;

    const bytesReceived = typeof summary['bytesReceived'] === 'number'
      ? summary['bytesReceived']
      : null;
    const mediaKind = typeof summary['kind'] === 'string'
      ? summary['kind']
      : (typeof summary['mediaType'] === 'string' ? summary['mediaType'] : null);

    if (bytesReceived === null || (mediaKind !== 'audio' && mediaKind !== 'video'))
      return null;

    return {
      bytesReceived,
      kind: mediaKind
    };
  }

  /**
   * Convert a byte delta over `elapsedMs` milliseconds into megabits per second.
   * Negative deltas are treated as zero, and a non-positive interval yields zero.
   */
  private calculateMbps(deltaBytes: number, elapsedMs: number): number {
    if (elapsedMs <= 0)
      return 0;

    // bytes * 8 = bits; bits per millisecond = kilobits per second; / 1000 = megabits per second.
    return Math.max(0, deltaBytes) * 8 / elapsedMs / 1000;
  }

  /** Round a metric to three decimal places. */
  private roundMetric(value: number): number {
    return Math.round(value * 1000) / 1000;
  }
}