Split connection manager into multiple files

This commit is contained in:
2026-03-07 17:03:08 +01:00
parent e6892e9297
commit 5013db5e16
10 changed files with 1535 additions and 1195 deletions

View File

@@ -0,0 +1,177 @@
import {
CONNECTION_STATE_CLOSED,
CONNECTION_STATE_CONNECTED,
CONNECTION_STATE_DISCONNECTED,
CONNECTION_STATE_FAILED,
DATA_CHANNEL_LABEL,
ICE_SERVERS,
SIGNALING_TYPE_ICE_CANDIDATE,
TRACK_KIND_AUDIO,
TRACK_KIND_VIDEO,
TRANSCEIVER_RECV_ONLY,
TRANSCEIVER_SEND_RECV
} from '../../webrtc.constants';
import { PeerData } from '../../webrtc.types';
import { ConnectionLifecycleHandlers, PeerConnectionManagerContext } from '../shared';
/**
* Create and configure a new RTCPeerConnection for a remote peer.
*/
export function createPeerConnection(
context: PeerConnectionManagerContext,
remotePeerId: string,
isInitiator: boolean,
handlers: ConnectionLifecycleHandlers
): PeerData {
const { callbacks, logger, state } = context;
logger.info('Creating peer connection', { remotePeerId, isInitiator });
const connection = new RTCPeerConnection({ iceServers: ICE_SERVERS });
let dataChannel: RTCDataChannel | null = null;
connection.onicecandidate = (event) => {
if (event.candidate) {
logger.info('ICE candidate gathered', {
remotePeerId,
candidateType: (event.candidate as RTCIceCandidate & { type?: string }).type
});
callbacks.sendRawMessage({
type: SIGNALING_TYPE_ICE_CANDIDATE,
targetUserId: remotePeerId,
payload: { candidate: event.candidate }
});
}
};
connection.onconnectionstatechange = () => {
logger.info('connectionstatechange', {
remotePeerId,
state: connection.connectionState
});
switch (connection.connectionState) {
case CONNECTION_STATE_CONNECTED:
handlers.clearPeerDisconnectGraceTimer(remotePeerId);
handlers.addToConnectedPeers(remotePeerId);
state.peerConnected$.next(remotePeerId);
handlers.clearPeerReconnectTimer(remotePeerId);
state.disconnectedPeerTracker.delete(remotePeerId);
handlers.requestVoiceStateFromPeer(remotePeerId);
break;
case CONNECTION_STATE_DISCONNECTED:
handlers.schedulePeerDisconnectRecovery(remotePeerId);
break;
case CONNECTION_STATE_FAILED:
handlers.trackDisconnectedPeer(remotePeerId);
handlers.removePeer(remotePeerId, { preserveReconnectState: true });
handlers.schedulePeerReconnect(remotePeerId);
break;
case CONNECTION_STATE_CLOSED:
handlers.removePeer(remotePeerId);
break;
}
};
connection.oniceconnectionstatechange = () => {
logger.info('iceconnectionstatechange', {
remotePeerId,
state: connection.iceConnectionState
});
};
connection.onsignalingstatechange = () => {
logger.info('signalingstatechange', {
remotePeerId,
state: connection.signalingState
});
};
connection.onnegotiationneeded = () => {
logger.info('negotiationneeded', { remotePeerId });
};
connection.ontrack = (event) => {
handlers.handleRemoteTrack(event, remotePeerId);
};
if (isInitiator) {
dataChannel = connection.createDataChannel(DATA_CHANNEL_LABEL, { ordered: true });
handlers.setupDataChannel(dataChannel, remotePeerId);
} else {
connection.ondatachannel = (event) => {
logger.info('Received data channel', { remotePeerId });
dataChannel = event.channel;
const existing = state.activePeerConnections.get(remotePeerId);
if (existing) {
existing.dataChannel = dataChannel;
}
handlers.setupDataChannel(dataChannel, remotePeerId);
};
}
const peerData: PeerData = {
connection,
dataChannel,
isInitiator,
pendingIceCandidates: [],
audioSender: undefined,
videoSender: undefined
};
if (isInitiator) {
const audioTransceiver = connection.addTransceiver(TRACK_KIND_AUDIO, {
direction: TRANSCEIVER_SEND_RECV
});
const videoTransceiver = connection.addTransceiver(TRACK_KIND_VIDEO, {
direction: TRANSCEIVER_RECV_ONLY
});
peerData.audioSender = audioTransceiver.sender;
peerData.videoSender = videoTransceiver.sender;
}
state.activePeerConnections.set(remotePeerId, peerData);
const localStream = callbacks.getLocalMediaStream();
if (localStream && isInitiator) {
logger.logStream(`localStream->${remotePeerId}`, localStream);
localStream.getTracks().forEach((track) => {
if (track.kind === TRACK_KIND_AUDIO && peerData.audioSender) {
peerData.audioSender
.replaceTrack(track)
.then(() => logger.info('audio replaceTrack (init) ok', { remotePeerId }))
.catch((error) =>
logger.error('audio replaceTrack failed at createPeerConnection', error)
);
} else if (track.kind === TRACK_KIND_VIDEO && peerData.videoSender) {
peerData.videoSender
.replaceTrack(track)
.then(() => logger.info('video replaceTrack (init) ok', { remotePeerId }))
.catch((error) =>
logger.error('video replaceTrack failed at createPeerConnection', error)
);
} else {
const sender = connection.addTrack(track, localStream);
if (track.kind === TRACK_KIND_AUDIO)
peerData.audioSender = sender;
if (track.kind === TRACK_KIND_VIDEO)
peerData.videoSender = sender;
}
});
}
return peerData;
}

View File

@@ -0,0 +1,247 @@
/* eslint-disable complexity */
import {
SIGNALING_TYPE_ANSWER,
SIGNALING_TYPE_OFFER,
TRACK_KIND_AUDIO,
TRACK_KIND_VIDEO,
TRANSCEIVER_SEND_RECV
} from '../../webrtc.constants';
import {
NegotiationHandlers,
PeerConnectionManagerContext,
PeerConnectionManagerState
} from '../shared';
/**
* Queue a negotiation task so SDP operations for a single peer never overlap.
*/
export function enqueueNegotiation(
state: PeerConnectionManagerState,
peerId: string,
task: () => Promise<void>
): void {
const previousTask = state.peerNegotiationQueue.get(peerId) ?? Promise.resolve();
const nextTask = previousTask.then(task, task);
state.peerNegotiationQueue.set(peerId, nextTask);
}
export async function doCreateAndSendOffer(
context: PeerConnectionManagerContext,
remotePeerId: string
): Promise<void> {
const { callbacks, logger, state } = context;
const peerData = state.activePeerConnections.get(remotePeerId);
if (!peerData)
return;
try {
const offer = await peerData.connection.createOffer();
await peerData.connection.setLocalDescription(offer);
logger.info('Sending offer', {
remotePeerId,
type: offer.type,
sdpLength: offer.sdp?.length
});
callbacks.sendRawMessage({
type: SIGNALING_TYPE_OFFER,
targetUserId: remotePeerId,
payload: { sdp: offer }
});
} catch (error) {
logger.error('Failed to create offer', error);
}
}
export async function doHandleOffer(
context: PeerConnectionManagerContext,
fromUserId: string,
sdp: RTCSessionDescriptionInit,
handlers: NegotiationHandlers
): Promise<void> {
const { callbacks, logger, state } = context;
logger.info('Handling offer', { fromUserId });
let peerData = state.activePeerConnections.get(fromUserId);
if (!peerData) {
peerData = handlers.createPeerConnection(fromUserId, false);
}
try {
const signalingState = peerData.connection.signalingState;
const hasCollision =
signalingState === 'have-local-offer' || signalingState === 'have-local-pranswer';
if (hasCollision) {
const localId =
callbacks.getIdentifyCredentials()?.oderId || callbacks.getLocalPeerId();
const isPolite = localId > fromUserId;
if (!isPolite) {
logger.info('Ignoring colliding offer (impolite side)', { fromUserId, localId });
return;
}
logger.info('Rolling back local offer (polite side)', { fromUserId, localId });
await peerData.connection.setLocalDescription({
type: 'rollback'
} as RTCSessionDescriptionInit);
}
await peerData.connection.setRemoteDescription(new RTCSessionDescription(sdp));
const transceivers = peerData.connection.getTransceivers();
for (const transceiver of transceivers) {
const receiverKind = transceiver.receiver.track?.kind;
if (receiverKind === TRACK_KIND_AUDIO) {
if (!peerData.audioSender) {
peerData.audioSender = transceiver.sender;
}
transceiver.direction = TRANSCEIVER_SEND_RECV;
} else if (receiverKind === TRACK_KIND_VIDEO && !peerData.videoSender) {
peerData.videoSender = transceiver.sender;
}
}
const localStream = callbacks.getLocalMediaStream();
if (localStream) {
logger.logStream(`localStream->${fromUserId} (answerer)`, localStream);
for (const track of localStream.getTracks()) {
if (track.kind === TRACK_KIND_AUDIO && peerData.audioSender) {
await peerData.audioSender.replaceTrack(track);
logger.info('audio replaceTrack (answerer) ok', { fromUserId });
} else if (track.kind === TRACK_KIND_VIDEO && peerData.videoSender) {
await peerData.videoSender.replaceTrack(track);
logger.info('video replaceTrack (answerer) ok', { fromUserId });
}
}
}
for (const candidate of peerData.pendingIceCandidates) {
await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate));
}
peerData.pendingIceCandidates = [];
const answer = await peerData.connection.createAnswer();
await peerData.connection.setLocalDescription(answer);
logger.info('Sending answer', {
to: fromUserId,
type: answer.type,
sdpLength: answer.sdp?.length
});
callbacks.sendRawMessage({
type: SIGNALING_TYPE_ANSWER,
targetUserId: fromUserId,
payload: { sdp: answer }
});
} catch (error) {
logger.error('Failed to handle offer', error);
}
}
export async function doHandleAnswer(
context: PeerConnectionManagerContext,
fromUserId: string,
sdp: RTCSessionDescriptionInit
): Promise<void> {
const { logger, state } = context;
logger.info('Handling answer', { fromUserId });
const peerData = state.activePeerConnections.get(fromUserId);
if (!peerData) {
logger.error('No peer for answer', new Error('Missing peer'), { fromUserId });
return;
}
try {
if (peerData.connection.signalingState === 'have-local-offer') {
await peerData.connection.setRemoteDescription(new RTCSessionDescription(sdp));
for (const candidate of peerData.pendingIceCandidates) {
await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate));
}
peerData.pendingIceCandidates = [];
} else {
logger.warn('Ignoring answer - wrong signaling state', {
state: peerData.connection.signalingState
});
}
} catch (error) {
logger.error('Failed to handle answer', error);
}
}
export async function doHandleIceCandidate(
context: PeerConnectionManagerContext,
fromUserId: string,
candidate: RTCIceCandidateInit,
handlers: NegotiationHandlers
): Promise<void> {
const { logger, state } = context;
let peerData = state.activePeerConnections.get(fromUserId);
if (!peerData) {
logger.info('Creating peer for early ICE', { fromUserId });
peerData = handlers.createPeerConnection(fromUserId, false);
}
try {
if (peerData.connection.remoteDescription) {
await peerData.connection.addIceCandidate(new RTCIceCandidate(candidate));
} else {
logger.info('Queuing ICE candidate', { fromUserId });
peerData.pendingIceCandidates.push(candidate);
}
} catch (error) {
logger.error('Failed to add ICE candidate', error);
}
}
export async function doRenegotiate(
context: PeerConnectionManagerContext,
peerId: string
): Promise<void> {
const { callbacks, logger, state } = context;
const peerData = state.activePeerConnections.get(peerId);
if (!peerData)
return;
try {
const offer = await peerData.connection.createOffer();
await peerData.connection.setLocalDescription(offer);
logger.info('Renegotiate offer', {
peerId,
type: offer.type,
sdpLength: offer.sdp?.length
});
callbacks.sendRawMessage({
type: SIGNALING_TYPE_OFFER,
targetUserId: peerId,
payload: { sdp: offer }
});
} catch (error) {
logger.error('Failed to renegotiate', error);
}
}

View File

@@ -0,0 +1,2 @@
export * from './peer-connection.manager';
export * from './shared';

View File

@@ -0,0 +1,296 @@
import { ChatEvent } from '../../../../models';
import {
DATA_CHANNEL_HIGH_WATER_BYTES,
DATA_CHANNEL_LOW_WATER_BYTES,
DATA_CHANNEL_STATE_OPEN,
DEFAULT_DISPLAY_NAME,
P2P_TYPE_PING,
P2P_TYPE_PONG,
P2P_TYPE_SCREEN_STATE,
P2P_TYPE_STATE_REQUEST,
P2P_TYPE_VOICE_STATE,
P2P_TYPE_VOICE_STATE_REQUEST
} from '../../webrtc.constants';
import { PeerConnectionManagerContext } from '../shared';
import { startPingInterval } from './ping';
type PeerMessage = Record<string, unknown> & {
type?: string;
ts?: number;
};
/**
* Wire open/close/error/message handlers onto a data channel.
*/
export function setupDataChannel(
context: PeerConnectionManagerContext,
channel: RTCDataChannel,
remotePeerId: string
): void {
const { logger } = context;
channel.onopen = () => {
logger.info('Data channel open', { remotePeerId });
sendCurrentStatesToChannel(context, channel, remotePeerId);
try {
channel.send(JSON.stringify({ type: P2P_TYPE_STATE_REQUEST }));
} catch {
/* ignore */
}
startPingInterval(context.state, remotePeerId);
};
channel.onclose = () => {
logger.info('Data channel closed', { remotePeerId });
};
channel.onerror = (error) => {
logger.error('Data channel error', error, { remotePeerId });
};
channel.onmessage = (event) => {
try {
const message = JSON.parse(event.data) as PeerMessage;
handlePeerMessage(context, remotePeerId, message);
} catch (error) {
logger.error('Failed to parse peer message', error);
}
};
}
/**
* Route an incoming peer-to-peer message.
*/
export function handlePeerMessage(
context: PeerConnectionManagerContext,
peerId: string,
message: PeerMessage
): void {
const { logger, state } = context;
logger.info('Received P2P message', {
peerId,
type: message.type
});
if (message.type === P2P_TYPE_STATE_REQUEST || message.type === P2P_TYPE_VOICE_STATE_REQUEST) {
sendCurrentStatesToPeer(context, peerId);
return;
}
if (message.type === P2P_TYPE_PING) {
sendToPeer(context, peerId, {
type: P2P_TYPE_PONG,
ts: message.ts
});
return;
}
if (message.type === P2P_TYPE_PONG) {
const sentAt = state.pendingPings.get(peerId);
if (sentAt && typeof message.ts === 'number' && message.ts === sentAt) {
const latencyMs = Math.round(performance.now() - sentAt);
state.peerLatencies.set(peerId, latencyMs);
state.peerLatencyChanged$.next({ peerId, latencyMs });
}
state.pendingPings.delete(peerId);
return;
}
const enrichedMessage = {
...message,
fromPeerId: peerId
} as ChatEvent;
state.messageReceived$.next(enrichedMessage);
}
/** Broadcast a ChatEvent to every peer with an open data channel. */
export function broadcastMessage(
context: PeerConnectionManagerContext,
event: object
): void {
const { logger, state } = context;
const data = JSON.stringify(event);
state.activePeerConnections.forEach((peerData, peerId) => {
try {
if (peerData.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) {
peerData.dataChannel.send(data);
logger.info('Sent message via P2P', { peerId });
}
} catch (error) {
logger.error('Failed to send to peer', error, { peerId });
}
});
}
/**
* Send a ChatEvent to a specific peer's data channel.
*/
export function sendToPeer(
context: PeerConnectionManagerContext,
peerId: string,
event: object
): void {
const { logger, state } = context;
const peerData = state.activePeerConnections.get(peerId);
if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) {
logger.warn('Peer not connected - cannot send', { peerId });
return;
}
try {
peerData.dataChannel.send(JSON.stringify(event));
} catch (error) {
logger.error('Failed to send to peer', error, { peerId });
}
}
/**
* Send a ChatEvent with back-pressure awareness.
*/
export async function sendToPeerBuffered(
context: PeerConnectionManagerContext,
peerId: string,
event: object
): Promise<void> {
const { logger, state } = context;
const peerData = state.activePeerConnections.get(peerId);
if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN) {
logger.warn('Peer not connected - cannot send buffered', { peerId });
return;
}
const channel = peerData.dataChannel;
const data = JSON.stringify(event);
if (typeof channel.bufferedAmountLowThreshold === 'number') {
channel.bufferedAmountLowThreshold = DATA_CHANNEL_LOW_WATER_BYTES;
}
if (channel.bufferedAmount > DATA_CHANNEL_HIGH_WATER_BYTES) {
await new Promise<void>((resolve) => {
const handleBufferedAmountLow = () => {
if (channel.bufferedAmount <= DATA_CHANNEL_LOW_WATER_BYTES) {
channel.removeEventListener('bufferedamountlow', handleBufferedAmountLow);
resolve();
}
};
channel.addEventListener('bufferedamountlow', handleBufferedAmountLow, { once: true });
});
}
try {
channel.send(data);
} catch (error) {
logger.error('Failed to send buffered message', error, { peerId });
}
}
/**
* Send the current voice and screen-share states to a single peer.
*/
export function sendCurrentStatesToPeer(
context: PeerConnectionManagerContext,
peerId: string
): void {
const { callbacks } = context;
const credentials = callbacks.getIdentifyCredentials();
const oderId = credentials?.oderId || callbacks.getLocalPeerId();
const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME;
const voiceState = callbacks.getVoiceStateSnapshot();
sendToPeer(context, peerId, {
type: P2P_TYPE_VOICE_STATE,
oderId,
displayName,
voiceState
});
sendToPeer(context, peerId, {
type: P2P_TYPE_SCREEN_STATE,
oderId,
displayName,
isScreenSharing: callbacks.isScreenSharingActive()
});
}
export function sendCurrentStatesToChannel(
context: PeerConnectionManagerContext,
channel: RTCDataChannel,
remotePeerId: string
): void {
const { callbacks, logger } = context;
if (channel.readyState !== DATA_CHANNEL_STATE_OPEN) {
logger.warn('Cannot send states - channel not open', {
remotePeerId,
state: channel.readyState
});
return;
}
const credentials = callbacks.getIdentifyCredentials();
const oderId = credentials?.oderId || callbacks.getLocalPeerId();
const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME;
const voiceState = callbacks.getVoiceStateSnapshot();
try {
channel.send(
JSON.stringify({
type: P2P_TYPE_VOICE_STATE,
oderId,
displayName,
voiceState
})
);
channel.send(
JSON.stringify({
type: P2P_TYPE_SCREEN_STATE,
oderId,
displayName,
isScreenSharing: callbacks.isScreenSharingActive()
})
);
logger.info('Sent initial states to channel', { remotePeerId, voiceState });
} catch (error) {
logger.error('Failed to send initial states to channel', error);
}
}
/** Broadcast the current voice and screen-share states to all connected peers. */
export function broadcastCurrentStates(context: PeerConnectionManagerContext): void {
const { callbacks } = context;
const credentials = callbacks.getIdentifyCredentials();
const oderId = credentials?.oderId || callbacks.getLocalPeerId();
const displayName = credentials?.displayName || DEFAULT_DISPLAY_NAME;
const voiceState = callbacks.getVoiceStateSnapshot();
broadcastMessage(context, {
type: P2P_TYPE_VOICE_STATE,
oderId,
displayName,
voiceState
});
broadcastMessage(context, {
type: P2P_TYPE_SCREEN_STATE,
oderId,
displayName,
isScreenSharing: callbacks.isScreenSharingActive()
});
}

View File

@@ -0,0 +1,55 @@
import {
DATA_CHANNEL_STATE_OPEN,
P2P_TYPE_PING,
PEER_PING_INTERVAL_MS
} from '../../webrtc.constants';
import { PeerConnectionManagerState } from '../shared';
/** Start periodic pings to a peer to measure round-trip latency. */
export function startPingInterval(state: PeerConnectionManagerState, peerId: string): void {
stopPingInterval(state, peerId);
sendPing(state, peerId);
const timer = setInterval(() => sendPing(state, peerId), PEER_PING_INTERVAL_MS);
state.peerPingTimers.set(peerId, timer);
}
/** Stop the periodic ping for a specific peer. */
export function stopPingInterval(state: PeerConnectionManagerState, peerId: string): void {
const timer = state.peerPingTimers.get(peerId);
if (timer) {
clearInterval(timer);
state.peerPingTimers.delete(peerId);
}
}
/** Cancel all active ping timers. */
export function clearAllPingTimers(state: PeerConnectionManagerState): void {
state.peerPingTimers.forEach((timer) => clearInterval(timer));
state.peerPingTimers.clear();
}
/** Send a single ping to a peer. */
export function sendPing(state: PeerConnectionManagerState, peerId: string): void {
const peerData = state.activePeerConnections.get(peerId);
if (!peerData?.dataChannel || peerData.dataChannel.readyState !== DATA_CHANNEL_STATE_OPEN)
return;
const timestamp = performance.now();
state.pendingPings.set(peerId, timestamp);
try {
peerData.dataChannel.send(
JSON.stringify({
type: P2P_TYPE_PING,
ts: timestamp
})
);
} catch {
/* ignore */
}
}

View File

@@ -0,0 +1,296 @@
/* eslint-disable @typescript-eslint/member-ordering */
import { ChatEvent } from '../../../models';
import { WebRTCLogger } from '../webrtc-logger';
import { PeerData } from '../webrtc.types';
import { createPeerConnection as createManagedPeerConnection } from './connection/create-peer-connection';
import {
doCreateAndSendOffer,
doHandleAnswer,
doHandleIceCandidate,
doHandleOffer,
doRenegotiate,
enqueueNegotiation
} from './connection/negotiation';
import {
broadcastCurrentStates,
broadcastMessage,
sendCurrentStatesToPeer,
sendToPeer,
sendToPeerBuffered,
setupDataChannel
} from './messaging/data-channel';
import {
addToConnectedPeers,
clearAllPeerReconnectTimers,
clearPeerDisconnectGraceTimer,
clearPeerReconnectTimer,
closeAllPeers as closeManagedPeers,
getConnectedPeerIds,
removePeer as removeManagedPeer,
requestVoiceStateFromPeer,
resetConnectedPeers,
schedulePeerDisconnectRecovery,
schedulePeerReconnect,
trackDisconnectedPeer
} from './recovery/peer-recovery';
import { handleRemoteTrack } from './streams/remote-streams';
import {
ConnectionLifecycleHandlers,
createPeerConnectionManagerState,
NegotiationHandlers,
PeerConnectionCallbacks,
PeerConnectionManagerContext,
RecoveryHandlers,
RemovePeerOptions
} from './shared';
/**
* Creates and manages RTCPeerConnections, data channels,
* offer/answer negotiation, ICE candidates, and P2P reconnection.
*/
export class PeerConnectionManager {
private readonly state = createPeerConnectionManagerState();
/** Active peer connections keyed by remote peer ID. */
readonly activePeerConnections = this.state.activePeerConnections;
/** Remote composite streams keyed by remote peer ID. */
readonly remotePeerStreams = this.state.remotePeerStreams;
/** Last measured latency (ms) per peer. */
readonly peerLatencies = this.state.peerLatencies;
/** Emitted whenever a peer latency value changes. */
readonly peerLatencyChanged$ = this.state.peerLatencyChanged$;
readonly peerConnected$ = this.state.peerConnected$;
readonly peerDisconnected$ = this.state.peerDisconnected$;
readonly remoteStream$ = this.state.remoteStream$;
readonly messageReceived$ = this.state.messageReceived$;
/** Emitted whenever the connected peer list changes. */
readonly connectedPeersChanged$ = this.state.connectedPeersChanged$;
constructor(
private readonly logger: WebRTCLogger,
private callbacks: PeerConnectionCallbacks
) {}
/**
* Replace the callback set at runtime.
* Needed because of circular initialisation between managers.
*/
setCallbacks(callbacks: PeerConnectionCallbacks): void {
this.callbacks = callbacks;
}
/**
* Create a new RTCPeerConnection to a remote peer.
*/
createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData {
return createManagedPeerConnection(
this.context,
remotePeerId,
isInitiator,
this.connectionHandlers
);
}
/**
* Create an SDP offer and send it to the remote peer via the signaling server.
*/
async createAndSendOffer(remotePeerId: string): Promise<void> {
return new Promise<void>((resolve) => {
enqueueNegotiation(this.state, remotePeerId, async () => {
await doCreateAndSendOffer(this.context, remotePeerId);
resolve();
});
});
}
/**
* Handle an incoming SDP offer from a remote peer.
*/
handleOffer(fromUserId: string, sdp: RTCSessionDescriptionInit): void {
enqueueNegotiation(this.state, fromUserId, () =>
doHandleOffer(this.context, fromUserId, sdp, this.negotiationHandlers)
);
}
/**
* Handle an incoming SDP answer from a remote peer.
*/
handleAnswer(fromUserId: string, sdp: RTCSessionDescriptionInit): void {
enqueueNegotiation(this.state, fromUserId, () =>
doHandleAnswer(this.context, fromUserId, sdp)
);
}
/**
* Process an incoming ICE candidate from a remote peer.
*/
handleIceCandidate(fromUserId: string, candidate: RTCIceCandidateInit): void {
enqueueNegotiation(this.state, fromUserId, () =>
doHandleIceCandidate(this.context, fromUserId, candidate, this.negotiationHandlers)
);
}
/**
* Re-negotiate (create offer) to push track changes to remote.
*/
async renegotiate(peerId: string): Promise<void> {
return new Promise<void>((resolve) => {
enqueueNegotiation(this.state, peerId, async () => {
await doRenegotiate(this.context, peerId);
resolve();
});
});
}
/** Broadcast a ChatEvent to every peer with an open data channel. */
broadcastMessage(event: ChatEvent): void {
broadcastMessage(this.context, event);
}
/**
* Send a ChatEvent to a specific peer's data channel.
*/
sendToPeer(peerId: string, event: ChatEvent): void {
sendToPeer(this.context, peerId, event);
}
/**
* Send a ChatEvent with back-pressure awareness.
*/
async sendToPeerBuffered(peerId: string, event: ChatEvent): Promise<void> {
await sendToPeerBuffered(this.context, peerId, event);
}
/**
* Send the current voice and screen-share states to a single peer.
*/
sendCurrentStatesToPeer(peerId: string): void {
sendCurrentStatesToPeer(this.context, peerId);
}
/** Broadcast the current voice and screen-share states to all connected peers. */
broadcastCurrentStates(): void {
broadcastCurrentStates(this.context);
}
/**
* Close and remove a peer connection, data channel, and emit a disconnect event.
*/
removePeer(peerId: string, options?: RemovePeerOptions): void {
removeManagedPeer(this.context, peerId, options);
}
/** Close every active peer connection and clear internal state. */
closeAllPeers(): void {
closeManagedPeers(this.state);
}
/** Cancel all pending peer reconnect timers and clear the tracker. */
clearAllPeerReconnectTimers(): void {
clearAllPeerReconnectTimers(this.state);
}
/** Return a snapshot copy of the currently-connected peer IDs. */
getConnectedPeerIds(): string[] {
return getConnectedPeerIds(this.state);
}
/** Reset the connected peers list to empty and notify subscribers. */
resetConnectedPeers(): void {
resetConnectedPeers(this.state);
}
/** Clean up all resources. */
destroy(): void {
this.closeAllPeers();
this.peerConnected$.complete();
this.peerDisconnected$.complete();
this.remoteStream$.complete();
this.messageReceived$.complete();
this.connectedPeersChanged$.complete();
this.peerLatencyChanged$.complete();
}
private get context(): PeerConnectionManagerContext {
return {
logger: this.logger,
callbacks: this.callbacks,
state: this.state
};
}
private get connectionHandlers(): ConnectionLifecycleHandlers {
return {
clearPeerDisconnectGraceTimer: (peerId: string) => this.clearPeerDisconnectGraceTimer(peerId),
addToConnectedPeers: (peerId: string) => this.addToConnectedPeers(peerId),
clearPeerReconnectTimer: (peerId: string) => this.clearPeerReconnectTimer(peerId),
requestVoiceStateFromPeer: (peerId: string) => this.requestVoiceStateFromPeer(peerId),
schedulePeerDisconnectRecovery: (peerId: string) =>
this.schedulePeerDisconnectRecovery(peerId),
trackDisconnectedPeer: (peerId: string) => this.trackDisconnectedPeer(peerId),
removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options),
schedulePeerReconnect: (peerId: string) => this.schedulePeerReconnect(peerId),
handleRemoteTrack: (event: RTCTrackEvent, peerId: string) =>
this.handleRemoteTrack(event, peerId),
setupDataChannel: (channel: RTCDataChannel, peerId: string) =>
this.setupDataChannel(channel, peerId)
};
}
private get negotiationHandlers(): NegotiationHandlers {
return {
createPeerConnection: (remotePeerId: string, isInitiator: boolean) =>
this.createPeerConnection(remotePeerId, isInitiator)
};
}
private get recoveryHandlers(): RecoveryHandlers {
return {
removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options),
createPeerConnection: (peerId: string, isInitiator: boolean) =>
this.createPeerConnection(peerId, isInitiator),
createAndSendOffer: (peerId: string) => this.createAndSendOffer(peerId)
};
}
private setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void {
setupDataChannel(this.context, channel, remotePeerId);
}
private handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void {
handleRemoteTrack(this.context, event, remotePeerId);
}
private trackDisconnectedPeer(peerId: string): void {
trackDisconnectedPeer(this.state, peerId);
}
private clearPeerReconnectTimer(peerId: string): void {
clearPeerReconnectTimer(this.state, peerId);
}
private clearPeerDisconnectGraceTimer(peerId: string): void {
clearPeerDisconnectGraceTimer(this.state, peerId);
}
private schedulePeerDisconnectRecovery(peerId: string): void {
schedulePeerDisconnectRecovery(this.context, peerId, this.recoveryHandlers);
}
private schedulePeerReconnect(peerId: string): void {
schedulePeerReconnect(this.context, peerId, this.recoveryHandlers);
}
private requestVoiceStateFromPeer(peerId: string): void {
requestVoiceStateFromPeer(this.state, this.logger, peerId);
}
private addToConnectedPeers(peerId: string): void {
addToConnectedPeers(this.state, peerId);
}
}

View File

@@ -0,0 +1,264 @@
import {
CONNECTION_STATE_CONNECTED,
DATA_CHANNEL_STATE_OPEN,
P2P_TYPE_VOICE_STATE_REQUEST,
PEER_DISCONNECT_GRACE_MS,
PEER_RECONNECT_INTERVAL_MS,
PEER_RECONNECT_MAX_ATTEMPTS
} from '../../webrtc.constants';
import {
PeerConnectionManagerContext,
PeerConnectionManagerState,
RecoveryHandlers,
RemovePeerOptions
} from '../shared';
import { clearAllPingTimers, stopPingInterval } from '../messaging/ping';
/**
* Close and remove a peer connection, data channel, and emit a disconnect event.
*/
export function removePeer(
context: PeerConnectionManagerContext,
peerId: string,
options?: RemovePeerOptions
): void {
const { state } = context;
const peerData = state.activePeerConnections.get(peerId);
const preserveReconnectState = options?.preserveReconnectState === true;
clearPeerDisconnectGraceTimer(state, peerId);
if (!preserveReconnectState) {
clearPeerReconnectTimer(state, peerId);
state.disconnectedPeerTracker.delete(peerId);
}
state.remotePeerStreams.delete(peerId);
if (peerData) {
if (peerData.dataChannel)
peerData.dataChannel.close();
peerData.connection.close();
state.activePeerConnections.delete(peerId);
state.peerNegotiationQueue.delete(peerId);
removeFromConnectedPeers(state, peerId);
stopPingInterval(state, peerId);
state.peerLatencies.delete(peerId);
state.pendingPings.delete(peerId);
state.peerDisconnected$.next(peerId);
}
}
/** Close every active peer connection and clear internal state. */
export function closeAllPeers(state: PeerConnectionManagerState): void {
clearAllPeerReconnectTimers(state);
clearAllPeerDisconnectGraceTimers(state);
clearAllPingTimers(state);
state.activePeerConnections.forEach((peerData) => {
if (peerData.dataChannel)
peerData.dataChannel.close();
peerData.connection.close();
});
state.activePeerConnections.clear();
state.remotePeerStreams.clear();
state.peerNegotiationQueue.clear();
state.peerLatencies.clear();
state.pendingPings.clear();
state.connectedPeersChanged$.next([]);
}
export function trackDisconnectedPeer(state: PeerConnectionManagerState, peerId: string): void {
state.disconnectedPeerTracker.set(peerId, {
lastSeenTimestamp: Date.now(),
reconnectAttempts: 0
});
}
export function clearPeerReconnectTimer(
state: PeerConnectionManagerState,
peerId: string
): void {
const timer = state.peerReconnectTimers.get(peerId);
if (timer) {
clearInterval(timer);
state.peerReconnectTimers.delete(peerId);
}
}
export function clearPeerDisconnectGraceTimer(
state: PeerConnectionManagerState,
peerId: string
): void {
const timer = state.peerDisconnectGraceTimers.get(peerId);
if (timer) {
clearTimeout(timer);
state.peerDisconnectGraceTimers.delete(peerId);
}
}
/** Cancel all pending peer reconnect timers and clear the tracker. */
export function clearAllPeerReconnectTimers(state: PeerConnectionManagerState): void {
state.peerReconnectTimers.forEach((timer) => clearInterval(timer));
state.peerReconnectTimers.clear();
state.disconnectedPeerTracker.clear();
}
export function clearAllPeerDisconnectGraceTimers(state: PeerConnectionManagerState): void {
state.peerDisconnectGraceTimers.forEach((timer) => clearTimeout(timer));
state.peerDisconnectGraceTimers.clear();
}
export function schedulePeerDisconnectRecovery(
context: PeerConnectionManagerContext,
peerId: string,
handlers: RecoveryHandlers
): void {
const { logger, state } = context;
if (state.peerDisconnectGraceTimers.has(peerId))
return;
logger.warn('Peer temporarily disconnected; waiting before reconnect', { peerId });
const timer = setTimeout(() => {
state.peerDisconnectGraceTimers.delete(peerId);
const peerData = state.activePeerConnections.get(peerId);
if (!peerData)
return;
const connectionState = peerData.connection.connectionState;
if (connectionState === CONNECTION_STATE_CONNECTED || connectionState === 'connecting') {
logger.info('Peer recovered before disconnect grace expired', {
peerId,
state: connectionState
});
return;
}
logger.warn('Peer still disconnected after grace period; recreating connection', {
peerId,
state: connectionState
});
trackDisconnectedPeer(state, peerId);
handlers.removePeer(peerId, { preserveReconnectState: true });
schedulePeerReconnect(context, peerId, handlers);
}, PEER_DISCONNECT_GRACE_MS);
state.peerDisconnectGraceTimers.set(peerId, timer);
}
export function schedulePeerReconnect(
context: PeerConnectionManagerContext,
peerId: string,
handlers: RecoveryHandlers
): void {
const { callbacks, logger, state } = context;
if (state.peerReconnectTimers.has(peerId))
return;
logger.info('Scheduling P2P reconnect', { peerId });
const timer = setInterval(() => {
const info = state.disconnectedPeerTracker.get(peerId);
if (!info) {
clearPeerReconnectTimer(state, peerId);
return;
}
info.reconnectAttempts++;
logger.info('P2P reconnect attempt', {
peerId,
attempt: info.reconnectAttempts
});
if (info.reconnectAttempts >= PEER_RECONNECT_MAX_ATTEMPTS) {
logger.info('P2P reconnect max attempts reached', { peerId });
clearPeerReconnectTimer(state, peerId);
state.disconnectedPeerTracker.delete(peerId);
return;
}
if (!callbacks.isSignalingConnected()) {
logger.info('Skipping P2P reconnect - no signaling connection', { peerId });
return;
}
attemptPeerReconnect(state, peerId, handlers);
}, PEER_RECONNECT_INTERVAL_MS);
state.peerReconnectTimers.set(peerId, timer);
}
export function attemptPeerReconnect(
state: PeerConnectionManagerState,
peerId: string,
handlers: RecoveryHandlers
): void {
if (state.activePeerConnections.has(peerId)) {
handlers.removePeer(peerId, { preserveReconnectState: true });
}
handlers.createPeerConnection(peerId, true);
void handlers.createAndSendOffer(peerId);
}
export function requestVoiceStateFromPeer(
state: PeerConnectionManagerState,
logger: PeerConnectionManagerContext['logger'],
peerId: string
): void {
const peerData = state.activePeerConnections.get(peerId);
if (peerData?.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) {
try {
peerData.dataChannel.send(JSON.stringify({ type: P2P_TYPE_VOICE_STATE_REQUEST }));
} catch (error) {
logger.warn('Failed to request voice state', error);
}
}
}
/** Return a snapshot copy of the currently-connected peer IDs. */
export function getConnectedPeerIds(state: PeerConnectionManagerState): string[] {
return [...state.connectedPeersList];
}
export function addToConnectedPeers(state: PeerConnectionManagerState, peerId: string): void {
if (!state.connectedPeersList.includes(peerId)) {
state.connectedPeersList = [...state.connectedPeersList, peerId];
state.connectedPeersChanged$.next(state.connectedPeersList);
}
}
/**
* Remove a peer from the connected list and notify subscribers.
*/
export function removeFromConnectedPeers(
state: PeerConnectionManagerState,
peerId: string
): void {
state.connectedPeersList = state.connectedPeersList.filter(
(connectedId) => connectedId !== peerId
);
state.connectedPeersChanged$.next(state.connectedPeersList);
}
/** Reset the connected peers list to empty and notify subscribers. */
export function resetConnectedPeers(state: PeerConnectionManagerState): void {
state.connectedPeersList = [];
state.connectedPeersChanged$.next([]);
}

View File

@@ -0,0 +1,103 @@
import { Subject } from 'rxjs';
import { ChatEvent } from '../../../models';
import { WebRTCLogger } from '../webrtc-logger';
import {
DisconnectedPeerEntry,
IdentifyCredentials,
PeerData,
VoiceStateSnapshot
} from '../webrtc.types';
/**
* Callbacks the PeerConnectionManager needs from the owning service.
* This keeps the manager decoupled from Angular DI / signals.
*/
export interface PeerConnectionCallbacks {
/** Send a raw JSON message via the signaling server. */
sendRawMessage(msg: Record<string, unknown>): void;
/** Get the current local media stream (mic audio). */
getLocalMediaStream(): MediaStream | null;
/** Whether signaling is currently connected. */
isSignalingConnected(): boolean;
/** Returns the current voice/screen state snapshot for broadcasting. */
getVoiceStateSnapshot(): VoiceStateSnapshot;
/** Returns the identify credentials (oderId + displayName). */
getIdentifyCredentials(): IdentifyCredentials | null;
/** Returns the local peer ID. */
getLocalPeerId(): string;
/** Whether screen sharing is active. */
isScreenSharingActive(): boolean;
}
export interface PeerConnectionManagerState {
activePeerConnections: Map<string, PeerData>;
remotePeerStreams: Map<string, MediaStream>;
disconnectedPeerTracker: Map<string, DisconnectedPeerEntry>;
peerReconnectTimers: Map<string, ReturnType<typeof setInterval>>;
peerDisconnectGraceTimers: Map<string, ReturnType<typeof setTimeout>>;
pendingPings: Map<string, number>;
peerPingTimers: Map<string, ReturnType<typeof setInterval>>;
peerLatencies: Map<string, number>;
peerLatencyChanged$: Subject<{ peerId: string; latencyMs: number }>;
peerNegotiationQueue: Map<string, Promise<void>>;
peerConnected$: Subject<string>;
peerDisconnected$: Subject<string>;
remoteStream$: Subject<{ peerId: string; stream: MediaStream }>;
messageReceived$: Subject<ChatEvent>;
connectedPeersChanged$: Subject<string[]>;
connectedPeersList: string[];
}
export interface PeerConnectionManagerContext {
readonly logger: WebRTCLogger;
readonly callbacks: PeerConnectionCallbacks;
readonly state: PeerConnectionManagerState;
}
export interface RemovePeerOptions {
preserveReconnectState?: boolean;
}
export interface ConnectionLifecycleHandlers {
clearPeerDisconnectGraceTimer(peerId: string): void;
addToConnectedPeers(peerId: string): void;
clearPeerReconnectTimer(peerId: string): void;
requestVoiceStateFromPeer(peerId: string): void;
schedulePeerDisconnectRecovery(peerId: string): void;
trackDisconnectedPeer(peerId: string): void;
removePeer(peerId: string, options?: RemovePeerOptions): void;
schedulePeerReconnect(peerId: string): void;
handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void;
setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void;
}
export interface NegotiationHandlers {
createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData;
}
export interface RecoveryHandlers {
removePeer(peerId: string, options?: RemovePeerOptions): void;
createPeerConnection(peerId: string, isInitiator: boolean): PeerData;
createAndSendOffer(peerId: string): Promise<void>;
}
export function createPeerConnectionManagerState(): PeerConnectionManagerState {
return {
activePeerConnections: new Map<string, PeerData>(),
remotePeerStreams: new Map<string, MediaStream>(),
disconnectedPeerTracker: new Map<string, DisconnectedPeerEntry>(),
peerReconnectTimers: new Map<string, ReturnType<typeof setInterval>>(),
peerDisconnectGraceTimers: new Map<string, ReturnType<typeof setTimeout>>(),
pendingPings: new Map<string, number>(),
peerPingTimers: new Map<string, ReturnType<typeof setInterval>>(),
peerLatencies: new Map<string, number>(),
peerLatencyChanged$: new Subject<{ peerId: string; latencyMs: number }>(),
peerNegotiationQueue: new Map<string, Promise<void>>(),
peerConnected$: new Subject<string>(),
peerDisconnected$: new Subject<string>(),
remoteStream$: new Subject<{ peerId: string; stream: MediaStream }>(),
messageReceived$: new Subject<ChatEvent>(),
connectedPeersChanged$: new Subject<string[]>(),
connectedPeersList: []
};
}

View File

@@ -0,0 +1,94 @@
import { TRACK_KIND_VIDEO } from '../../webrtc.constants';
import { PeerConnectionManagerContext } from '../shared';
export function handleRemoteTrack(
context: PeerConnectionManagerContext,
event: RTCTrackEvent,
remotePeerId: string
): void {
const { logger, state } = context;
const track = event.track;
const settings =
typeof track.getSettings === 'function' ? track.getSettings() : ({} as MediaTrackSettings);
logger.info('Remote track', {
remotePeerId,
kind: track.kind,
id: track.id,
enabled: track.enabled,
readyState: track.readyState,
settings
});
logger.attachTrackDiagnostics(track, `remote:${remotePeerId}:${track.kind}`);
if (track.kind === TRACK_KIND_VIDEO && (!track.enabled || track.readyState !== 'live')) {
logger.info('Skipping inactive video track', {
remotePeerId,
enabled: track.enabled,
readyState: track.readyState
});
return;
}
const compositeStream = buildCompositeRemoteStream(state, remotePeerId, track);
track.addEventListener('ended', () => removeRemoteTrack(state, remotePeerId, track.id));
state.remotePeerStreams.set(remotePeerId, compositeStream);
state.remoteStream$.next({
peerId: remotePeerId,
stream: compositeStream
});
}
function buildCompositeRemoteStream(
state: PeerConnectionManagerContext['state'],
remotePeerId: string,
incomingTrack: MediaStreamTrack
): MediaStream {
const existingStream = state.remotePeerStreams.get(remotePeerId);
let preservedTracks: MediaStreamTrack[] = [];
if (existingStream) {
preservedTracks = existingStream.getTracks().filter(
(existingTrack) =>
existingTrack.kind !== incomingTrack.kind && existingTrack.readyState === 'live'
);
}
return new MediaStream([...preservedTracks, incomingTrack]);
}
function removeRemoteTrack(
state: PeerConnectionManagerContext['state'],
remotePeerId: string,
trackId: string
): void {
const currentStream = state.remotePeerStreams.get(remotePeerId);
if (!currentStream)
return;
const remainingTracks = currentStream
.getTracks()
.filter((existingTrack) => existingTrack.id !== trackId && existingTrack.readyState === 'live');
if (remainingTracks.length === currentStream.getTracks().length)
return;
if (remainingTracks.length === 0) {
state.remotePeerStreams.delete(remotePeerId);
return;
}
const nextStream = new MediaStream(remainingTracks);
state.remotePeerStreams.set(remotePeerId, nextStream);
state.remoteStream$.next({
peerId: remotePeerId,
stream: nextStream
});
}

File diff suppressed because it is too large Load Diff