Files
Toju/src/app/core/services/webrtc.service.ts
Myx cb2c0495b9
All checks were successful
Queue Release Build / prepare (push) Successful in 16s
Deploy Web Apps / deploy (push) Successful in 10m15s
Queue Release Build / build-linux (push) Successful in 26m14s
Queue Release Build / build-windows (push) Successful in 25m41s
Queue Release Build / finalize (push) Successful in 1m51s
hotfix handshake issue
2026-03-19 03:34:26 +01:00

1379 lines
44 KiB
TypeScript

/**
* WebRTCService - thin Angular service that composes specialised managers.
*
* Each concern lives in its own file under `./webrtc/`:
* • SignalingManager - WebSocket lifecycle & reconnection
* • PeerConnectionManager - RTCPeerConnection, offers/answers, ICE, data channels
* • MediaManager - mic voice, mute, deafen, bitrate
* • ScreenShareManager - screen capture & mixed audio
* • WebRTCLogger - debug / diagnostic logging
*
* This file wires them together and exposes a public API that is
* identical to the old monolithic service so consumers don't change.
*/
/* eslint-disable @typescript-eslint/member-ordering, @typescript-eslint/no-non-null-assertion, @typescript-eslint/no-unused-vars */
import {
Injectable,
signal,
computed,
inject,
OnDestroy
} from '@angular/core';
import {
Observable,
of,
Subject,
Subscription
} from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { SignalingMessage, ChatEvent } from '../models/index';
import { TimeSyncService } from './time-sync.service';
import { DebuggingService } from './debugging.service';
import { ScreenShareSourcePickerService } from './screen-share-source-picker.service';
import {
SignalingManager,
PeerConnectionManager,
MediaManager,
ScreenShareManager,
WebRTCLogger,
IdentifyCredentials,
JoinedServerInfo,
VoiceStateSnapshot,
LatencyProfile,
ScreenShareStartOptions,
SIGNALING_TYPE_IDENTIFY,
SIGNALING_TYPE_JOIN_SERVER,
SIGNALING_TYPE_VIEW_SERVER,
SIGNALING_TYPE_LEAVE_SERVER,
SIGNALING_TYPE_OFFER,
SIGNALING_TYPE_ANSWER,
SIGNALING_TYPE_ICE_CANDIDATE,
SIGNALING_TYPE_CONNECTED,
SIGNALING_TYPE_SERVER_USERS,
SIGNALING_TYPE_USER_JOINED,
SIGNALING_TYPE_USER_LEFT,
DEFAULT_DISPLAY_NAME,
P2P_TYPE_SCREEN_SHARE_REQUEST,
P2P_TYPE_SCREEN_SHARE_STOP,
P2P_TYPE_VOICE_STATE,
P2P_TYPE_SCREEN_STATE
} from './webrtc';
/**
 * Shape of one entry in the `users` array of an incoming signaling message.
 */
interface SignalingUserSummary {
/** Identifier of the listed user; this file uses it as the remote peer ID. */
oderId: string;
/** Display name reported for the listed user. */
displayName: string;
}
/**
 * Optional payload of an incoming signaling message, as read by the
 * offer / answer / ICE-candidate handlers in this file.
 */
interface IncomingSignalingPayload {
/** Session description consumed by the offer and answer handlers. */
sdp?: RTCSessionDescriptionInit;
/** ICE candidate consumed by the ICE-candidate handler. */
candidate?: RTCIceCandidateInit;
}
/**
 * Loosely-typed signaling message as received from a signaling manager.
 *
 * Starts from a partial {@link SignalingMessage}, replaces `type` with a plain
 * string and `payload` with {@link IncomingSignalingPayload}, and adds the
 * optional fields that the handlers in this file read.
 */
type IncomingSignalingMessage = Omit<Partial<SignalingMessage>, 'type' | 'payload'> & {
/** Message discriminator; compared against the `SIGNALING_TYPE_*` constants. */
type: string;
/** SDP / ICE payload for offer, answer and ICE-candidate messages. */
payload?: IncomingSignalingPayload;
/** User the message refers to (read by the connected / user-joined / user-left handlers). */
oderId?: string;
/** Server clock value; forwarded to the time-sync service when it is a number. */
serverTime?: number;
/** Server the message relates to. */
serverId?: string;
/** Server ID list; when present on a user-left message it replaces the peer's shared-server set. */
serverIds?: string[];
/** User list carried by server-users messages. */
users?: SignalingUserSummary[];
/** Display name of the user the message refers to (only logged in this file). */
displayName?: string;
/** Sender of an offer / answer / ICE-candidate message. */
fromUserId?: string;
};
@Injectable({
providedIn: 'root'
})
export class WebRTCService implements OnDestroy {
/** Receives the server clock carried by "connected" signaling messages. */
private readonly timeSync = inject(TimeSyncService);
/** Provides the `enabled()` flag consulted by the logger. */
private readonly debugging = inject(DebuggingService);
/** Opened when the screen-share manager asks for a desktop source to be selected. */
private readonly screenShareSourcePicker = inject(ScreenShareSourcePickerService);
/** Logger handed to every manager; the callback re-reads the debugging flag on each call. */
private readonly logger = new WebRTCLogger(() => this.debugging.enabled());
/** Credentials from the most recent `identify()` call, or `null` before the first one. */
private lastIdentifyCredentials: IdentifyCredentials | null = null;
/** Most recent server passed to `joinRoom()` / `switchServer()`, keyed by signaling URL. */
private readonly lastJoinedServerBySignalUrl = new Map<string, JoinedServerInfo>();
/** Server IDs the local client is a member of, grouped by signaling URL. */
private readonly memberServerIdsBySignalUrl = new Map<string, Set<string>>();
/** Maps a server ID to the signaling URL it is reached through. */
private readonly serverSignalingUrlMap = new Map<string, string>();
/** Maps a remote peer ID to the signaling URL its messages were last seen on. */
private readonly peerSignalingUrlMap = new Map<string, string>();
/** One signaling manager per signaling URL, created lazily by `ensureSignalingManager()`. */
private readonly signalingManagers = new Map<string, SignalingManager>();
/** Subscriptions created for each signaling manager, keyed by signaling URL. */
private readonly signalingSubscriptions = new Map<string, Subscription[]>();
/** Last reported connected flag per signaling URL (written by the connection-status handler). */
private readonly signalingConnectionStates = new Map<string, boolean>();
/** Server set via `setCurrentServer()`, or `null` when none has been set. */
private activeServerId: string | null = null;
/** The server ID where voice is currently active, or `null` when not in voice. */
private voiceServerId: string | null = null;
/** Maps each remote peer ID to the shared servers they currently belong to. */
private readonly peerServerMap = new Map<string, Set<string>>();
// NOTE(review): not referenced in the visible part of this file; presumably completed on destroy - confirm.
private readonly serviceDestroyed$ = new Subject<void>();
/** Whether remote screen shares should currently be requested from desired peers. */
private remoteScreenShareRequestsEnabled = false;
/** Peers whose screen share the local client wants to receive. */
private readonly desiredRemoteScreenSharePeers = new Set<string>();
/** Peers tracked as having an active remote screen-share request; entries are dropped on peer disconnect. */
private readonly activeRemoteScreenSharePeers = new Set<string>();
// Writable signals backing the public read-only computed signals of the same name.
private readonly _localPeerId = signal<string>(uuidv4());
private readonly _isSignalingConnected = signal(false);
private readonly _isVoiceConnected = signal(false);
private readonly _connectedPeers = signal<string[]>([]);
private readonly _isMuted = signal(false);
private readonly _isDeafened = signal(false);
private readonly _isScreenSharing = signal(false);
private readonly _isNoiseReductionEnabled = signal(false);
private readonly _screenStreamSignal = signal<MediaStream | null>(null);
private readonly _isScreenShareRemotePlaybackSuppressed = signal(false);
private readonly _forceDefaultRemotePlaybackOutput = signal(false);
private readonly _hasConnectionError = signal(false);
private readonly _connectionErrorMessage = signal<string | null>(null);
private readonly _hasEverConnected = signal(false);
/**
* Reactive snapshot of per-peer latencies (ms).
* Updated whenever a ping/pong round-trip completes.
* Keyed by remote peer (oderId).
*/
private readonly _peerLatencies = signal<ReadonlyMap<string, number>>(new Map());
// Public computed signals (unchanged external API).
// Each one is a read-only view over the private writable signal of the same name.
/** Locally generated peer ID (a UUID created when the service is constructed). */
readonly peerId = computed(() => this._localPeerId());
/** `true` while at least one signaling socket is reported open. */
readonly isConnected = computed(() => this._isSignalingConnected());
/** `true` once any signaling connection has reported connected. */
readonly hasEverConnected = computed(() => this._hasEverConnected());
/** `true` between the media manager's voice-connected event and `disableVoice()`. */
readonly isVoiceConnected = computed(() => this._isVoiceConnected());
/** Peer IDs most recently published by the peer manager. */
readonly connectedPeers = computed(() => this._connectedPeers());
/** Local microphone mute state. */
readonly isMuted = computed(() => this._isMuted());
/** Local self-deafen state. */
readonly isDeafened = computed(() => this._isDeafened());
/** Whether the local screen share is active, as reported by the screen-share manager. */
readonly isScreenSharing = computed(() => this._isScreenSharing());
/** Whether noise reduction is enabled, as reported by the media manager. */
readonly isNoiseReductionEnabled = computed(() => this._isNoiseReductionEnabled());
/** Current local screen-share stream, or `null`. */
readonly screenStream = computed(() => this._screenStreamSignal());
/** Mirrors the `suppressRemotePlayback` flag of the latest local screen-share state. */
readonly isScreenShareRemotePlaybackSuppressed = computed(() => this._isScreenShareRemotePlaybackSuppressed());
/** Mirrors the `forceDefaultRemotePlaybackOutput` flag of the latest local screen-share state. */
readonly forceDefaultRemotePlaybackOutput = computed(() => this._forceDefaultRemotePlaybackOutput());
/** `true` while no signaling socket is open (set by the connection-status handler). */
readonly hasConnectionError = computed(() => this._hasConnectionError());
/** Message describing the current connection error, or `null` when connected. */
readonly connectionErrorMessage = computed(() => this._connectionErrorMessage());
/**
 * Whether the connection error should be surfaced.
 *
 * True only when a connection error is flagged and the client is not in an
 * active voice session with at least one connected peer.
 */
readonly shouldShowConnectionError = computed(() =>
  this._hasConnectionError()
  && !(this._isVoiceConnected() && this._connectedPeers().length > 0)
);
/** Per-peer latency map (ms). Read via `peerLatencies()`. */
readonly peerLatencies = computed(() => this._peerLatencies());
/** Receives every incoming signaling message before it is dispatched to a handler. */
private readonly signalingMessage$ = new Subject<IncomingSignalingMessage>();
/** Observable view of all incoming signaling messages, regardless of type. */
readonly onSignalingMessage = this.signalingMessage$.asObservable();
// Delegates to managers
/** Observable view of the peer manager's `messageReceived$` stream of chat events. */
get onMessageReceived(): Observable<ChatEvent> {
return this.peerManager.messageReceived$.asObservable();
}
/** Observable view of the peer manager's `peerConnected$` stream; emits the connected peer's ID. */
get onPeerConnected(): Observable<string> {
return this.peerManager.peerConnected$.asObservable();
}
/** Observable view of the peer manager's `peerDisconnected$` stream; emits the disconnected peer's ID. */
get onPeerDisconnected(): Observable<string> {
return this.peerManager.peerDisconnected$.asObservable();
}
/** Observable view of the peer manager's `remoteStream$` stream; emits a peer ID together with its media stream. */
get onRemoteStream(): Observable<{ peerId: string; stream: MediaStream }> {
return this.peerManager.remoteStream$.asObservable();
}
/** Observable view of the media manager's `voiceConnected$` stream. */
get onVoiceConnected(): Observable<void> {
return this.mediaManager.voiceConnected$.asObservable();
}
/** Manager that owns peer connections and peer messaging; assigned in the constructor. */
private readonly peerManager: PeerConnectionManager;
/** Manager for local voice capture, mute / deafen and noise reduction; assigned in the constructor. */
private readonly mediaManager: MediaManager;
/** Manager for local screen sharing; assigned in the constructor. */
private readonly screenShareManager: ScreenShareManager;
/**
 * Builds the three managers, injects their callbacks, and subscribes to
 * their events.
 *
 * The managers need callbacks that reference each other, so they are first
 * constructed with placeholder (`null!`) callbacks and wired afterwards via
 * `setCallbacks()`.
 */
constructor() {
// Create managers with null callbacks first to break circular initialization
this.peerManager = new PeerConnectionManager(this.logger, null!);
this.mediaManager = new MediaManager(this.logger, null!);
this.screenShareManager = new ScreenShareManager(this.logger, null!);
// Now wire up cross-references (all managers are instantiated)
this.peerManager.setCallbacks({
sendRawMessage: (msg: Record<string, unknown>) => this.sendRawMessage(msg),
getLocalMediaStream: (): MediaStream | null => this.mediaManager.getLocalStream(),
isSignalingConnected: (): boolean => this._isSignalingConnected(),
getVoiceStateSnapshot: (): VoiceStateSnapshot => this.getCurrentVoiceState(),
getIdentifyCredentials: (): IdentifyCredentials | null => this.lastIdentifyCredentials,
getLocalPeerId: (): string => this._localPeerId(),
isScreenSharingActive: (): boolean => this._isScreenSharing()
});
this.mediaManager.setCallbacks({
getActivePeers: (): Map<string, import('./webrtc').PeerData> =>
this.peerManager.activePeerConnections,
renegotiate: (peerId: string): Promise<void> => this.peerManager.renegotiate(peerId),
broadcastMessage: (event: ChatEvent): void => this.peerManager.broadcastMessage(event),
// Falls back to the locally generated peer ID when no identify credentials are cached.
getIdentifyOderId: (): string => this.lastIdentifyCredentials?.oderId || this._localPeerId(),
getIdentifyDisplayName: (): string =>
this.lastIdentifyCredentials?.displayName || DEFAULT_DISPLAY_NAME
});
this.screenShareManager.setCallbacks({
getActivePeers: (): Map<string, import('./webrtc').PeerData> =>
this.peerManager.activePeerConnections,
getLocalMediaStream: (): MediaStream | null => this.mediaManager.getLocalStream(),
renegotiate: (peerId: string): Promise<void> => this.peerManager.renegotiate(peerId),
broadcastCurrentStates: (): void => this.peerManager.broadcastCurrentStates(),
// Source selection is delegated to the injected picker service.
selectDesktopSource: async (sources, options) => await this.screenShareSourcePicker.open(
sources,
options.includeSystemAudio
),
// Mirror the manager's screen-share state into the service's signals.
updateLocalScreenShareState: (state): void => {
this._isScreenSharing.set(state.active);
this._screenStreamSignal.set(state.stream);
this._isScreenShareRemotePlaybackSuppressed.set(state.suppressRemotePlayback);
this._forceDefaultRemotePlaybackOutput.set(state.forceDefaultRemotePlaybackOutput);
}
});
this.wireManagerEvents();
}
/**
 * Subscribe to manager event streams and mirror them into the service's
 * signals and bookkeeping maps.
 */
private wireManagerEvents(): void {
  const { peerManager, mediaManager, screenShareManager } = this;

  // Internal control-plane messages for on-demand screen-share delivery.
  peerManager.messageReceived$.subscribe((event) => this.handlePeerControlMessage(event));

  // Connected-peer list -> signal.
  peerManager.connectedPeersChanged$.subscribe((peers: string[]) => {
    this._connectedPeers.set(peers);
  });

  // When a peer connection finishes while we are sharing, push the current
  // screen-share tracks to that peer. Independently of that, (re-)request the
  // peer's screen share if it is one we want to receive.
  peerManager.peerConnected$.subscribe((peerId) => {
    if (screenShareManager.getIsScreenActive()) {
      screenShareManager.syncScreenShareToPeer(peerId);
    }

    const wantsRemoteShare = this.remoteScreenShareRequestsEnabled
      && this.desiredRemoteScreenSharePeers.has(peerId);

    if (wantsRemoteShare) {
      this.requestRemoteScreenShares([peerId]);
    }
  });

  // Drop every piece of per-peer bookkeeping once a peer goes away.
  peerManager.peerDisconnected$.subscribe((peerId) => {
    this.activeRemoteScreenSharePeers.delete(peerId);
    this.peerServerMap.delete(peerId);
    this.peerSignalingUrlMap.delete(peerId);
    screenShareManager.clearScreenShareRequest(peerId);
  });

  // Voice connected -> signal.
  mediaManager.voiceConnected$.subscribe(() => {
    this._isVoiceConnected.set(true);
  });

  // Latency update -> publish a fresh copy of the manager's latency map so
  // the signal sees a new reference. The event payload itself is not needed.
  peerManager.peerLatencyChanged$.subscribe(() => {
    this._peerLatencies.set(new Map(peerManager.peerLatencies));
  });
}
/**
 * Return the signaling manager for `signalUrl`, creating and wiring one on
 * first use.
 *
 * @param signalUrl - Signaling URL the manager is keyed by.
 * @returns The existing or newly created manager.
 */
private ensureSignalingManager(signalUrl: string): SignalingManager {
  const cached = this.signalingManagers.get(signalUrl);

  if (cached) {
    return cached;
  }

  const created = new SignalingManager(
    this.logger,
    () => this.lastIdentifyCredentials,
    () => this.lastJoinedServerBySignalUrl.get(signalUrl) ?? null,
    () => this.getMemberServerIdsForSignalUrl(signalUrl)
  );

  // Subscribe before registering the manager, in the same order as before.
  const statusSubscription = created.connectionStatus$.subscribe(({ connected, errorMessage }) =>
    this.handleSignalingConnectionStatus(signalUrl, connected, errorMessage)
  );
  const messageSubscription = created.messageReceived$.subscribe((message) =>
    this.handleSignalingMessage(message, signalUrl)
  );
  const heartbeatSubscription = created.heartbeatTick$.subscribe(() =>
    this.peerManager.broadcastCurrentStates()
  );

  this.signalingManagers.set(signalUrl, created);
  this.signalingSubscriptions.set(signalUrl, [
    statusSubscription,
    messageSubscription,
    heartbeatSubscription
  ]);

  return created;
}
/**
 * Record a connection-status change for one signaling URL and refresh the
 * aggregate connection / error signals.
 *
 * @param signalUrl - Signaling URL whose status changed.
 * @param connected - Reported connection state for that URL.
 * @param errorMessage - Optional error text used when no socket is open.
 */
private handleSignalingConnectionStatus(
  signalUrl: string,
  connected: boolean,
  errorMessage?: string
): void {
  this.signalingConnectionStates.set(signalUrl, connected);

  if (connected) {
    this._hasEverConnected.set(true);
  }

  // The aggregate state is derived from the sockets themselves, not from the flag above.
  const hasOpenSocket = this.isAnySignalingConnected();
  let nextErrorMessage: string | null = null;

  if (!hasOpenSocket) {
    nextErrorMessage = errorMessage ?? 'Disconnected from signaling server';
  }

  this._isSignalingConnected.set(hasOpenSocket);
  this._hasConnectionError.set(!hasOpenSocket);
  this._connectionErrorMessage.set(nextErrorMessage);
}
/** Returns `true` when at least one signaling manager reports an open socket. */
private isAnySignalingConnected(): boolean {
  return Array.from(this.signalingManagers.values())
    .some((manager) => manager.isSocketOpen());
}
/**
 * List every signaling manager whose socket is currently open, paired with
 * its signaling URL, in map insertion order.
 */
private getConnectedSignalingManagers(): { signalUrl: string; manager: SignalingManager }[] {
  return Array.from(this.signalingManagers.entries())
    .filter(([, manager]) => manager.isSocketOpen())
    .map(([signalUrl, manager]) => ({ signalUrl, manager }));
}
/**
 * Return the mutable set of joined server IDs for `signalUrl`, registering
 * an empty set on first access.
 */
private getOrCreateMemberServerSet(signalUrl: string): Set<string> {
  let memberServerIds = this.memberServerIdsBySignalUrl.get(signalUrl);

  if (!memberServerIds) {
    memberServerIds = new Set<string>();
    this.memberServerIdsBySignalUrl.set(signalUrl, memberServerIds);
  }

  return memberServerIds;
}
/**
 * Read-only view of the joined server IDs for `signalUrl`.
 * Returns a fresh, unregistered empty set when nothing is tracked for that URL.
 */
private getMemberServerIdsForSignalUrl(signalUrl: string): ReadonlySet<string> {
  const tracked = this.memberServerIdsBySignalUrl.get(signalUrl);

  return tracked ?? new Set<string>();
}
/** Returns `true` when `serverId` is a joined server under any signaling URL. */
private isJoinedServer(serverId: string): boolean {
  return Array.from(this.memberServerIdsBySignalUrl.values())
    .some((memberServerIds) => memberServerIds.has(serverId));
}
/** Total number of joined server IDs, summed across all signaling URLs. */
private getJoinedServerCount(): number {
  let total = 0;

  this.memberServerIdsBySignalUrl.forEach((memberServerIds) => {
    total += memberServerIds.size;
  });

  return total;
}
/**
 * Entry point for every message received from a signaling manager.
 *
 * The message is first re-published on `signalingMessage$` and logged, then
 * routed to the handler matching its `type`. Unknown types are ignored.
 *
 * @param message - The incoming signaling message.
 * @param signalUrl - Signaling URL the message arrived on.
 */
private handleSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  this.signalingMessage$.next(message);
  this.logger.info('Signaling message', {
    signalUrl,
    type: message.type
  });

  const messageType = message.type;

  if (messageType === SIGNALING_TYPE_CONNECTED) {
    this.handleConnectedSignalingMessage(message, signalUrl);
  } else if (messageType === SIGNALING_TYPE_SERVER_USERS) {
    this.handleServerUsersSignalingMessage(message, signalUrl);
  } else if (messageType === SIGNALING_TYPE_USER_JOINED) {
    this.handleUserJoinedSignalingMessage(message, signalUrl);
  } else if (messageType === SIGNALING_TYPE_USER_LEFT) {
    this.handleUserLeftSignalingMessage(message, signalUrl);
  } else if (messageType === SIGNALING_TYPE_OFFER) {
    this.handleOfferSignalingMessage(message, signalUrl);
  } else if (messageType === SIGNALING_TYPE_ANSWER) {
    this.handleAnswerSignalingMessage(message, signalUrl);
  } else if (messageType === SIGNALING_TYPE_ICE_CANDIDATE) {
    this.handleIceCandidateSignalingMessage(message, signalUrl);
  }
}
/**
 * Handle a "connected" message: remember which signaling URL serves the
 * reported server and forward the server clock to the time-sync service.
 */
private handleConnectedSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const { oderId, serverId, serverTime } = message;

  this.logger.info('Server connected', {
    oderId,
    signalUrl
  });

  if (serverId) {
    this.serverSignalingUrlMap.set(serverId, signalUrl);
  }

  // Only a numeric server time is usable for clock sync.
  if (typeof serverTime === 'number') {
    this.timeSync.setFromServerTime(serverTime);
  }
}
/**
 * Handle a "server users" message: record routing / membership for every
 * listed user and make sure a peer connection exists for each of them.
 *
 * A reusable connection is kept as-is; a non-reusable one is removed first.
 * In every other case a new connection is created and an offer is sent.
 */
private handleServerUsersSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const { serverId } = message;
  const listedUsers = Array.isArray(message.users) ? message.users : [];

  this.logger.info('Server users', {
    count: listedUsers.length,
    signalUrl,
    serverId
  });

  if (serverId) {
    this.serverSignalingUrlMap.set(serverId, signalUrl);
  }

  for (const listedUser of listedUsers) {
    const peerId = listedUser.oderId;

    if (!peerId) {
      continue;
    }

    this.peerSignalingUrlMap.set(peerId, signalUrl);

    if (serverId) {
      this.trackPeerInServer(peerId, serverId);
    }

    const currentPeer = this.peerManager.activePeerConnections.get(peerId);

    if (this.canReusePeerConnection(currentPeer)) {
      this.logger.info('Reusing active peer connection', {
        connectionState: currentPeer?.connection.connectionState ?? 'unknown',
        dataChannelState: currentPeer?.dataChannel?.readyState ?? 'missing',
        oderId: peerId,
        serverId,
        signalUrl
      });
      continue;
    }

    // A connection exists but cannot be reused: tear it down before recreating.
    if (currentPeer) {
      this.logger.info('Removing failed peer before recreate', {
        connectionState: currentPeer.connection.connectionState,
        dataChannelState: currentPeer.dataChannel?.readyState ?? 'missing',
        oderId: peerId,
        serverId,
        signalUrl
      });
      this.peerManager.removePeer(peerId);
    }

    this.logger.info('Create peer connection to existing user', {
      oderId: peerId,
      serverId,
      signalUrl
    });
    this.peerManager.createPeerConnection(peerId, true);
    void this.peerManager.createAndSendOffer(peerId);
  }
}
/**
 * Handle a "user joined" message: record which signaling URL serves the
 * server and the user, and track the user as a member of that server.
 * No peer connection is created here.
 */
private handleUserJoinedSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const { displayName, oderId, serverId } = message;

  this.logger.info('User joined', {
    displayName,
    oderId,
    signalUrl
  });

  if (serverId) {
    this.serverSignalingUrlMap.set(serverId, signalUrl);
  }

  if (!oderId) {
    return;
  }

  this.peerSignalingUrlMap.set(oderId, signalUrl);

  if (serverId) {
    this.trackPeerInServer(oderId, serverId);
  }
}
/**
 * Handle a "user left" message.
 *
 * Updates the set of servers shared with the user; when no shared server
 * remains, the peer connection and its bookkeeping entries are removed.
 */
private handleUserLeftSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const { displayName, oderId, serverId, serverIds } = message;

  this.logger.info('User left', {
    displayName,
    oderId,
    signalUrl,
    serverId
  });

  if (!oderId) {
    return;
  }

  // An explicit server list takes precedence over a single server ID.
  let stillSharesServer = false;

  if (Array.isArray(serverIds)) {
    stillSharesServer = this.replacePeerSharedServers(oderId, serverIds);
  } else if (serverId) {
    stillSharesServer = this.untrackPeerFromServer(oderId, serverId);
  }

  if (stillSharesServer) {
    return;
  }

  this.peerManager.removePeer(oderId);
  this.peerServerMap.delete(oderId);
  this.peerSignalingUrlMap.delete(oderId);
}
/**
 * Handle an incoming offer: record the sender's signaling URL, attribute an
 * untracked sender to the voice server (or, failing that, the active
 * server), and pass the SDP to the peer manager.
 *
 * Messages without a sender or SDP are ignored.
 */
private handleOfferSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const senderId = message.fromUserId;
  const offerSdp = message.payload?.sdp;

  if (!(senderId && offerSdp)) {
    return;
  }

  this.peerSignalingUrlMap.set(senderId, signalUrl);

  const fallbackServerId = this.voiceServerId || this.activeServerId;
  const senderIsUntracked = !this.peerServerMap.has(senderId);

  if (fallbackServerId && senderIsUntracked) {
    this.trackPeerInServer(senderId, fallbackServerId);
  }

  this.peerManager.handleOffer(senderId, offerSdp);
}
/**
 * Handle an incoming answer: record the sender's signaling URL and pass the
 * SDP to the peer manager. Messages without a sender or SDP are ignored.
 */
private handleAnswerSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const senderId = message.fromUserId;
  const answerSdp = message.payload?.sdp;

  if (senderId && answerSdp) {
    this.peerSignalingUrlMap.set(senderId, signalUrl);
    this.peerManager.handleAnswer(senderId, answerSdp);
  }
}
/**
 * Handle an incoming ICE candidate: record the sender's signaling URL and
 * pass the candidate to the peer manager. Messages without a sender or
 * candidate are ignored.
 */
private handleIceCandidateSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
  const senderId = message.fromUserId;
  const iceCandidate = message.payload?.candidate;

  if (senderId && iceCandidate) {
    this.peerSignalingUrlMap.set(senderId, signalUrl);
    this.peerManager.handleIceCandidate(senderId, iceCandidate);
  }
}
/**
 * Remove every tracked peer whose shared-server set does not contain
 * `serverId`, together with its entries in {@link peerServerMap} and the
 * peer-to-signaling-URL map.
 *
 * This keeps audio (and data channels) scoped to the voice-active
 * (or currently viewed) server.
 *
 * @param serverId - The server whose peers should be kept.
 */
private closePeersNotInServer(serverId: string): void {
  // Collect first, remove afterwards, so the map is not mutated while it is scanned.
  const outsidePeerIds = Array.from(this.peerServerMap.entries())
    .filter(([, sharedServerIds]) => !sharedServerIds.has(serverId))
    .map(([peerId]) => peerId);

  outsidePeerIds.forEach((peerId) => {
    this.logger.info('Closing peer from different server', {
      peerId,
      currentServer: serverId
    });
    this.peerManager.removePeer(peerId);
    this.peerServerMap.delete(peerId);
    this.peerSignalingUrlMap.delete(peerId);
  });
}
/**
 * Build a snapshot of the local voice state from the service's signals and
 * the media manager's current voice room / server.
 */
private getCurrentVoiceState(): VoiceStateSnapshot {
  const { mediaManager } = this;
  const snapshot: VoiceStateSnapshot = {
    isConnected: this._isVoiceConnected(),
    isMuted: this._isMuted(),
    isDeafened: this._isDeafened(),
    isScreenSharing: this._isScreenSharing(),
    roomId: mediaManager.getCurrentVoiceRoomId(),
    serverId: mediaManager.getCurrentVoiceServerId()
  };

  return snapshot;
}
// PUBLIC API - matches the old monolithic service's interface
/**
 * Connect to a signaling server via WebSocket.
 *
 * A manager for `serverUrl` is created on first use. When its socket is
 * already open the returned observable emits `true` immediately; otherwise
 * the manager's own connect observable is returned.
 *
 * @param serverUrl - The WebSocket URL of the signaling server.
 * @returns An observable that emits `true` once connected.
 */
connectToSignalingServer(serverUrl: string): Observable<boolean> {
  const signalingManager = this.ensureSignalingManager(serverUrl);

  return signalingManager.isSocketOpen()
    ? of(true)
    : signalingManager.connect(serverUrl);
}
/**
 * Returns true when the signaling socket for a given URL is currently open.
 * Unknown URLs (no manager created yet) report `false`.
 */
isSignalingConnectedTo(serverUrl: string): boolean {
  const signalingManager = this.signalingManagers.get(serverUrl);

  if (!signalingManager) {
    return false;
  }

  return signalingManager.isSocketOpen();
}
/**
 * Record that `peerId` shares `serverId` with the local client.
 * Empty IDs are ignored.
 */
private trackPeerInServer(peerId: string, serverId: string): void {
  if (!(peerId && serverId)) {
    return;
  }

  let sharedServerIds = this.peerServerMap.get(peerId);

  if (!sharedServerIds) {
    sharedServerIds = new Set<string>();
    this.peerServerMap.set(peerId, sharedServerIds);
  }

  sharedServerIds.add(serverId);
}
/**
 * Replace the tracked shared-server set of `peerId` with those entries of
 * `serverIds` that the local client has joined.
 *
 * @returns `true` when at least one shared server remains; `false` when the
 * peer's entry was removed because none is left.
 */
private replacePeerSharedServers(peerId: string, serverIds: string[]): boolean {
  const stillShared = new Set(
    serverIds.filter((candidateId) => this.isJoinedServer(candidateId))
  );
  const hasShared = stillShared.size > 0 || serverIds.some((candidateId) => this.isJoinedServer(candidateId));

  if (hasShared) {
    this.peerServerMap.set(peerId, stillShared);
  } else {
    this.peerServerMap.delete(peerId);
  }

  return hasShared;
}
/**
 * Remove `serverId` from the shared-server set of `peerId`.
 *
 * @returns `true` when the peer still shares at least one server afterwards;
 * `false` when the peer was not tracked or its last shared server was removed
 * (in which case its entry is dropped).
 */
private untrackPeerFromServer(peerId: string, serverId: string): boolean {
  const sharedServerIds = this.peerServerMap.get(peerId);

  if (!sharedServerIds) {
    return false;
  }

  sharedServerIds.delete(serverId);

  const stillShared = sharedServerIds.size > 0;

  if (!stillShared) {
    this.peerServerMap.delete(peerId);
  }

  return stillShared;
}
/**
 * Ensure at least one signaling WebSocket is connected, reconnecting if needed.
 *
 * Returns immediately when any socket is already open. Otherwise the known
 * managers are asked to connect one after another, and the first success wins.
 *
 * @param timeoutMs - Maximum time (ms) passed to each manager's connect attempt.
 * @returns `true` if some manager is (or becomes) connected, otherwise `false`.
 */
async ensureSignalingConnected(timeoutMs?: number): Promise<boolean> {
  if (this.isAnySignalingConnected()) {
    return true;
  }

  // Deliberately sequential: stop at the first manager that connects.
  for (const signalingManager of this.signalingManagers.values()) {
    const connected = await signalingManager.ensureConnected(timeoutMs);

    if (connected) {
      return true;
    }
  }

  return false;
}
/**
 * Send a signaling-level message (with `from` and `timestamp` auto-populated).
 *
 * Routing:
 *  1. When the message has a `to` peer and a signaling manager already exists
 *     for that peer's known signaling URL, the message goes to that manager only.
 *  2. Otherwise it is sent through every manager whose socket is open.
 *  3. When no socket is open, an error is logged and the message is dropped.
 *
 * The targeted path only looks up existing managers. It never creates a new,
 * unconnected manager as a side effect of sending, which matches how
 * `sendRawMessage` routes.
 *
 * @param message - The signaling message payload (excluding `from` / `timestamp`).
 */
sendSignalingMessage(message: Omit<SignalingMessage, 'from' | 'timestamp'>): void {
  const localPeerId = this._localPeerId();
  const targetPeerId = message.to;

  if (targetPeerId) {
    const targetSignalUrl = this.peerSignalingUrlMap.get(targetPeerId);
    const targetManager = targetSignalUrl
      ? this.signalingManagers.get(targetSignalUrl)
      : undefined;

    if (targetManager) {
      targetManager.sendSignalingMessage(message, localPeerId);
      return;
    }
  }

  const connectedManagers = this.getConnectedSignalingManagers();

  if (connectedManagers.length === 0) {
    this.logger.error('[signaling] No active signaling connection for outbound message', new Error('No signaling manager available'), {
      type: message.type
    });
    return;
  }

  for (const { manager } of connectedManagers) {
    manager.sendSignalingMessage(message, localPeerId);
  }
}
/**
 * Send a raw JSON payload through the signaling WebSocket.
 *
 * Routing preference: the signaling URL known for the message's
 * `targetUserId`, then the one known for its `serverId`, and finally every
 * manager with an open socket. When none is open, an error is logged and the
 * message is dropped.
 *
 * @param message - Arbitrary JSON message.
 */
sendRawMessage(message: Record<string, unknown>): void {
  // Reads a field only when it is a string; anything else counts as absent.
  const readString = (key: string): string | null => {
    const value = message[key];

    return typeof value === 'string' ? value : null;
  };

  const targetPeerId = readString('targetUserId');
  const peerSignalUrl = targetPeerId ? this.peerSignalingUrlMap.get(targetPeerId) : undefined;

  if (peerSignalUrl && this.sendRawMessageToSignalUrl(peerSignalUrl, message)) {
    return;
  }

  const serverId = readString('serverId');
  const serverSignalUrl = serverId ? this.serverSignalingUrlMap.get(serverId) : undefined;

  if (serverSignalUrl && this.sendRawMessageToSignalUrl(serverSignalUrl, message)) {
    return;
  }

  const openManagers = this.getConnectedSignalingManagers();

  if (openManagers.length === 0) {
    this.logger.error('[signaling] No active signaling connection for outbound message', new Error('No signaling manager available'), {
      type: readString('type') ?? 'unknown'
    });
    return;
  }

  openManagers.forEach(({ manager }) => {
    manager.sendRawMessage(message);
  });
}
/**
 * Hand `message` to the manager registered for `signalUrl`.
 *
 * @returns `false` when no manager exists for that URL, `true` once the
 * message was handed over (whether or not the socket is open is not checked).
 */
private sendRawMessageToSignalUrl(signalUrl: string, message: Record<string, unknown>): boolean {
  const signalingManager = this.signalingManagers.get(signalUrl);

  if (signalingManager) {
    signalingManager.sendRawMessage(message);
    return true;
  }

  return false;
}
/**
* Track the currently-active server ID (for server-scoped operations).
*
* Only the stored ID changes; no signaling message is sent from here.
*
* @param serverId - The server to mark as active.
*/
setCurrentServer(serverId: string): void {
this.activeServerId = serverId;
}
/** The server ID most recently passed to `setCurrentServer()`, or `null` if none was set. */
get currentServerId(): string | null {
return this.activeServerId;
}
/**
 * Resolve the signaling URL to use by default.
 *
 * @returns The URL mapped to the active server when one is known; otherwise
 * the URL of the first manager with an open socket, or `null` if none is open.
 */
getCurrentSignalingUrl(): string | null {
  const activeServerUrl = this.activeServerId
    ? this.serverSignalingUrlMap.get(this.activeServerId)
    : undefined;

  if (activeServerUrl) {
    return activeServerUrl;
  }

  const [firstConnected] = this.getConnectedSignalingManagers();

  return firstConnected?.signalUrl ?? null;
}
/**
 * Send an identify message to the signaling server.
 *
 * The credentials are cached so they can be replayed after a reconnect.
 *
 * @param oderId - The user's unique ID.
 * @param displayName - The user's display name.
 * @param signalUrl - Optional signaling URL to identify on; when omitted the
 * message goes through the default raw-message routing.
 */
identify(oderId: string, displayName: string, signalUrl?: string): void {
  this.lastIdentifyCredentials = {
    oderId,
    displayName
  };

  const identifyMessage = {
    type: SIGNALING_TYPE_IDENTIFY,
    oderId,
    displayName
  };

  if (signalUrl) {
    this.sendRawMessageToSignalUrl(signalUrl, identifyMessage);
  } else {
    this.sendRawMessage(identifyMessage);
  }
}
/**
 * Join a server (room) on the signaling server.
 *
 * The signaling URL is taken from `signalUrl`, else from the URL already
 * mapped to the room, else from {@link getCurrentSignalingUrl}. Without a
 * URL the call only logs a warning.
 *
 * @param roomId - The server / room ID to join.
 * @param userId - The local user ID.
 * @param signalUrl - Optional signaling URL to join through.
 */
joinRoom(roomId: string, userId: string, signalUrl?: string): void {
  const targetSignalUrl = signalUrl
    ?? this.serverSignalingUrlMap.get(roomId)
    ?? this.getCurrentSignalingUrl();

  if (!targetSignalUrl) {
    this.logger.warn('[signaling] Cannot join room without a signaling URL', { roomId });
    return;
  }

  // Record routing and membership before the join message goes out.
  const joinedServer: JoinedServerInfo = {
    serverId: roomId,
    userId
  };

  this.serverSignalingUrlMap.set(roomId, targetSignalUrl);
  this.lastJoinedServerBySignalUrl.set(targetSignalUrl, joinedServer);
  this.getOrCreateMemberServerSet(targetSignalUrl).add(roomId);

  this.sendRawMessageToSignalUrl(targetSignalUrl, {
    type: SIGNALING_TYPE_JOIN_SERVER,
    serverId: roomId
  });
}
/**
 * Switch to a different server. If already a member, sends a view event;
 * otherwise records the membership and joins the server.
 *
 * The signaling URL is resolved the same way as in `joinRoom`; without one
 * the call only logs a warning.
 *
 * @param serverId - The target server ID.
 * @param userId - The local user ID.
 * @param signalUrl - Optional signaling URL to use.
 */
switchServer(serverId: string, userId: string, signalUrl?: string): void {
  const targetSignalUrl = signalUrl
    ?? this.serverSignalingUrlMap.get(serverId)
    ?? this.getCurrentSignalingUrl();

  if (!targetSignalUrl) {
    this.logger.warn('[signaling] Cannot switch server without a signaling URL', { serverId });
    return;
  }

  this.serverSignalingUrlMap.set(serverId, targetSignalUrl);
  this.lastJoinedServerBySignalUrl.set(targetSignalUrl, {
    serverId,
    userId
  });

  const joinedServerIds = this.getOrCreateMemberServerSet(targetSignalUrl);
  const alreadyMember = joinedServerIds.has(serverId);

  if (!alreadyMember) {
    joinedServerIds.add(serverId);
  }

  this.sendRawMessageToSignalUrl(targetSignalUrl, {
    type: alreadyMember ? SIGNALING_TYPE_VIEW_SERVER : SIGNALING_TYPE_JOIN_SERVER,
    serverId
  });

  const logLine = alreadyMember
    ? 'Viewed server (already joined)'
    : 'Joined new server via switch';

  this.logger.info(logLine, {
    serverId,
    signalUrl: targetSignalUrl,
    userId,
    voiceConnected: this._isVoiceConnected()
  });
}
/**
 * Leave one or all servers.
 *
 * If `serverId` is provided, leaves only that server and performs a full
 * cleanup when it was the last joined one. Otherwise leaves every joined
 * server and always performs a full cleanup.
 *
 * @param serverId - Optional server to leave; omit to leave all.
 */
leaveRoom(serverId?: string): void {
  if (!serverId) {
    // Leave everything: notify each signaling server about each joined server.
    this.memberServerIdsBySignalUrl.forEach((joinedServerIds, signalUrl) => {
      joinedServerIds.forEach((joinedServerId) => {
        this.sendRawMessageToSignalUrl(signalUrl, {
          type: SIGNALING_TYPE_LEAVE_SERVER,
          serverId: joinedServerId
        });
      });
    });
    this.memberServerIdsBySignalUrl.clear();
    this.serverSignalingUrlMap.clear();
    this.fullCleanup();
    return;
  }

  const leaveMessage = {
    type: SIGNALING_TYPE_LEAVE_SERVER,
    serverId
  };
  const knownSignalUrl = this.serverSignalingUrlMap.get(serverId);

  if (knownSignalUrl) {
    this.getOrCreateMemberServerSet(knownSignalUrl).delete(serverId);
    this.sendRawMessageToSignalUrl(knownSignalUrl, leaveMessage);
  } else {
    // Unknown routing: use the default routing and drop the ID from every membership set.
    this.sendRawMessage(leaveMessage);
    this.memberServerIdsBySignalUrl.forEach((joinedServerIds) => {
      joinedServerIds.delete(serverId);
    });
  }

  // NOTE(review): `lastJoinedServerBySignalUrl` is not cleared here and may still
  // point at the server just left - confirm whether `fullCleanup()` or the
  // signaling manager's reconnect logic accounts for that.
  this.serverSignalingUrlMap.delete(serverId);
  this.logger.info('Left server', { serverId });

  if (this.getJoinedServerCount() === 0) {
    this.fullCleanup();
  }
}
/**
* Check whether the local client has joined a given server.
*
* Public wrapper around the private `isJoinedServer()` lookup, which checks
* the membership sets of every signaling URL.
*
* @param serverId - The server to check.
* @returns `true` when the server is in any membership set.
*/
hasJoinedServer(serverId: string): boolean {
return this.isJoinedServer(serverId);
}
/**
 * Returns a read-only set of all currently-joined server IDs.
 *
 * The result is a fresh set merged from every signaling URL's membership
 * set, so later joins / leaves are not reflected in it.
 */
getJoinedServerIds(): ReadonlySet<string> {
  const merged = new Set<string>();

  for (const joinedServerIds of this.memberServerIdsBySignalUrl.values()) {
    for (const joinedServerId of joinedServerIds) {
      merged.add(joinedServerId);
    }
  }

  return merged;
}
/**
* Broadcast a {@link ChatEvent} to every connected peer.
*
* Delegates directly to the peer manager's `broadcastMessage()`.
*
* @param event - The chat event to send.
*/
broadcastMessage(event: ChatEvent): void {
this.peerManager.broadcastMessage(event);
}
/**
* Send a {@link ChatEvent} to a specific peer.
*
* Delegates directly to the peer manager's `sendToPeer()`.
*
* @param peerId - The target peer ID.
* @param event - The chat event to send.
*/
sendToPeer(peerId: string, event: ChatEvent): void {
this.peerManager.sendToPeer(peerId, event);
}
/**
 * Reconcile which peers' screen shares the local client wants to receive.
 *
 * When `enabled` is `false`, the desired set is cleared and every active
 * remote screen share is stopped. When `true`, active shares from peers no
 * longer wanted are stopped, the desired set is replaced, and shares are
 * requested from all wanted peers.
 *
 * @param peerIds - Peers whose screen share is wanted; empty entries are ignored.
 * @param enabled - Whether remote screen-share requests are enabled at all.
 */
syncRemoteScreenShareRequests(peerIds: string[], enabled: boolean): void {
  const wantedPeers = new Set(
    peerIds.filter((peerId): peerId is string => !!peerId)
  );

  this.remoteScreenShareRequestsEnabled = enabled;

  if (!enabled) {
    this.desiredRemoteScreenSharePeers.clear();
    this.stopRemoteScreenShares([...this.activeRemoteScreenSharePeers]);
    return;
  }

  // Work on a snapshot: stopping a share may change the active set.
  [...this.activeRemoteScreenSharePeers]
    .filter((activePeerId) => !wantedPeers.has(activePeerId))
    .forEach((unwantedPeerId) => {
      this.stopRemoteScreenShares([unwantedPeerId]);
    });

  this.desiredRemoteScreenSharePeers.clear();

  for (const wantedPeerId of wantedPeers) {
    this.desiredRemoteScreenSharePeers.add(wantedPeerId);
  }

  this.requestRemoteScreenShares([...wantedPeers]);
}
/**
* Send a {@link ChatEvent} to a peer with back-pressure awareness.
*
* Delegates to the peer manager's `sendToPeerBuffered()` and resolves when
* that call resolves.
*
* @param peerId - The target peer ID.
* @param event - The chat event to send.
*/
async sendToPeerBuffered(peerId: string, event: ChatEvent): Promise<void> {
return this.peerManager.sendToPeerBuffered(peerId, event);
}
/**
* Returns an array of currently-connected peer IDs.
*
* Queries the peer manager directly via `getConnectedPeerIds()` rather than
* reading the `connectedPeers` signal.
*/
getConnectedPeers(): string[] {
return this.peerManager.getConnectedPeerIds();
}
/**
 * Look up the composite remote {@link MediaStream} of a peer.
 *
 * @param peerId - The remote peer whose stream to retrieve.
 * @returns The stream, or `null` if none is registered for the peer.
 */
getRemoteStream(peerId: string): MediaStream | null {
  const remoteStream = this.peerManager.remotePeerStreams.get(peerId);

  return remoteStream ?? null;
}
/**
 * Look up the voice-only remote stream of a peer.
 *
 * @param peerId - The remote peer whose voice stream to retrieve.
 * @returns The stream, or `null` if none is registered for the peer.
 */
getRemoteVoiceStream(peerId: string): MediaStream | null {
  const voiceStream = this.peerManager.remotePeerVoiceStreams.get(peerId);

  return voiceStream ?? null;
}
/**
 * Look up the screen-share remote stream of a peer.
 *
 * This is the stream carrying the screen video track and any audio that
 * belongs to the screen share itself, as opposed to the peer's normal
 * voice-chat audio.
 *
 * @param peerId - The remote peer whose screen-share stream to retrieve.
 * @returns The stream, or `null` if none is registered for the peer.
 */
getRemoteScreenShareStream(peerId: string): MediaStream | null {
  const screenShareStream = this.peerManager.remotePeerScreenShareStreams.get(peerId);

  return screenShareStream ?? null;
}
/**
* Get the current local media stream (microphone audio).
*
* Delegates to the media manager's `getLocalStream()`.
*
* @returns The local {@link MediaStream}, or `null` if voice is not active.
*/
getLocalStream(): MediaStream | null {
return this.mediaManager.getLocalStream();
}
/**
* Get the raw local microphone stream before gain / RNNoise processing.
*
* Delegates to the media manager's `getRawMicStream()`.
*
* @returns The raw microphone {@link MediaStream}, or `null` if voice is not active.
*/
getRawMicStream(): MediaStream | null {
return this.mediaManager.getRawMicStream();
}
/**
* Request microphone access and start sending audio to all peers.
*
* After the media manager has enabled voice, the service's media signals
* are refreshed via `syncMediaSignals()`.
*
* @returns The captured local {@link MediaStream}.
*/
async enableVoice(): Promise<MediaStream> {
const stream = await this.mediaManager.enableVoice();
// Refresh the service's signals only after the manager has finished enabling voice.
this.syncMediaSignals();
return stream;
}
/**
 * Disable local voice via the media manager and reset the related service state.
 *
 * Clears the tracked voice server ID, asks the media manager to disable
 * voice, and forces the voice-connected signal to `false`. Unlike
 * {@link enableVoice}, the mute / deafen signals are not re-synced here.
 */
disableVoice(): void {
// Forget which server voice was bound to before capture is torn down.
this.voiceServerId = null;
this.mediaManager.disableVoice();
// Set explicitly rather than read back from the media manager.
this._isVoiceConnected.set(false);
}
/**
 * Inject an externally-obtained media stream as the local voice source.
 *
 * The stream is handed to the media manager; once that call completes, the
 * voice / mute / deafen signals are refreshed from the manager's state.
 *
 * @param stream - The media stream to use.
 * @returns A promise that settles after the media manager has taken the
 *          stream and the signals have been synced.
 */
async setLocalStream(stream: MediaStream): Promise<void> {
await this.mediaManager.setLocalStream(stream);
// Only reached when the media manager call did not reject.
this.syncMediaSignals();
}
/**
 * Change the local microphone mute state and mirror the result into the
 * `_isMuted` signal.
 *
 * @param muted - Explicit state; if omitted, the current state is toggled.
 */
toggleMute(muted?: boolean): void {
  this.mediaManager.toggleMute(muted);

  // Read the state back from the media manager instead of trusting the argument.
  const micMuted = this.mediaManager.getIsMicMuted();

  this._isMuted.set(micMuted);
}
/**
 * Change the self-deafen state (suppressing incoming audio playback) and
 * mirror the result into the `_isDeafened` signal.
 *
 * @param deafened - Explicit state; if omitted, the current state is toggled.
 */
toggleDeafen(deafened?: boolean): void {
  this.mediaManager.toggleDeafen(deafened);

  // Read the state back from the media manager instead of trusting the argument.
  const selfDeafened = this.mediaManager.getIsSelfDeafened();

  this._isDeafened.set(selfDeafened);
}
/**
 * Switch RNNoise noise reduction for the local microphone on or off.
 *
 * The actual audio routing is owned by the media manager; this method waits
 * for it to finish and then mirrors the resulting state into the
 * `_isNoiseReductionEnabled` signal.
 *
 * @param enabled - Explicit state; if omitted, the current state is toggled.
 * @returns A promise that settles once the media manager has applied the
 *          change and the signal has been updated.
 */
async toggleNoiseReduction(enabled?: boolean): Promise<void> {
  await this.mediaManager.toggleNoiseReduction(enabled);

  const noiseReductionOn = this.mediaManager.getIsNoiseReductionEnabled();

  this._isNoiseReductionEnabled.set(noiseReductionOn);
}
/**
 * Set the output volume for remote audio playback.
 *
 * Pure delegation: the value is forwarded to the media manager unchanged;
 * no clamping or validation happens in this method.
 *
 * @param volume - Normalised volume (documented range 0-1; not enforced here).
 */
setOutputVolume(volume: number): void {
this.mediaManager.setOutputVolume(volume);
}
/**
 * Set the input (microphone) volume.
 *
 * Pure delegation: the value is forwarded to the media manager unchanged;
 * no clamping or validation happens in this method.
 *
 * NOTE(review): the real-time gain adjustment (Web Audio GainNode, no
 * renegotiation) is implemented by the media manager, not here - confirm
 * there if that behaviour matters to the caller.
 *
 * @param volume - Normalised volume (documented range 0-1; not enforced here).
 */
setInputVolume(volume: number): void {
this.mediaManager.setInputVolume(volume);
}
/**
 * Forward a new maximum audio bitrate to the media manager.
 *
 * @param kbps - Target bitrate in kilobits per second.
 * @returns A promise that settles when the media manager's `setAudioBitrate()` does.
 */
async setAudioBitrate(kbps: number): Promise<void> {
  return await this.mediaManager.setAudioBitrate(kbps);
}
/**
 * Forward a predefined latency profile to the media manager, which maps it
 * to concrete audio settings.
 *
 * @param profile - The {@link LatencyProfile} to apply.
 * @returns A promise that settles when the media manager's `setLatencyProfile()` does.
 */
async setLatencyProfile(profile: LatencyProfile): Promise<void> {
  return await this.mediaManager.setLatencyProfile(profile);
}
/**
 * Start the voice-presence heartbeat through the media manager.
 *
 * When a non-empty `serverId` is supplied it is first recorded as the active
 * voice server; otherwise the previously recorded value is kept.
 *
 * NOTE(review): earlier documentation stated that peer connections belonging
 * to other servers are closed here. This method itself only records the
 * server ID and delegates - confirm where (or whether) that isolation happens.
 *
 * @param roomId - The voice channel room ID.
 * @param serverId - The voice channel server ID.
 */
startVoiceHeartbeat(roomId?: string, serverId?: string): void {
  if (serverId)
    this.voiceServerId = serverId;

  this.mediaManager.startVoiceHeartbeat(roomId, serverId);
}
/**
 * Stop the voice-presence heartbeat by delegating to the media manager.
 *
 * The recorded voice server ID is left untouched by this method.
 */
stopVoiceHeartbeat(): void {
this.mediaManager.stopVoiceHeartbeat();
}
/**
 * Begin a screen (or window) share through the screen-share manager.
 *
 * @param options - Screen-share capture options.
 * @returns The {@link MediaStream} resolved by the screen-share manager.
 */
async startScreenShare(options: ScreenShareStartOptions): Promise<MediaStream> {
  const captureStream = await this.screenShareManager.startScreenShare(options);

  return captureStream;
}
/**
 * Stop screen sharing by delegating to the screen-share manager.
 *
 * This method does not touch the screen-sharing signals itself.
 *
 * NOTE(review): restoring microphone audio on peers and updating the
 * screen-share signals are presumably handled by the manager or its
 * callbacks - confirm.
 */
stopScreenShare(): void {
this.screenShareManager.stopScreenShare();
}
/**
 * Disconnect from signaling and reset the service's connection state.
 *
 * In order: leave the current room, clear the voice-server / peer / server
 * bookkeeping maps, stop the voice heartbeat, destroy every signaling
 * manager, reset the connection-status signals, and finally emit on
 * `serviceDestroyed$`.
 *
 * NOTE(review): `serviceDestroyed$` emits on every disconnect, not only
 * from `ngOnDestroy` - confirm subscribers are meant to react to a plain
 * disconnect.
 */
disconnect(): void {
this.leaveRoom();
// Bookkeeping that maps peers and servers to signaling endpoints.
this.voiceServerId = null;
this.peerServerMap.clear();
this.peerSignalingUrlMap.clear();
this.lastJoinedServerBySignalUrl.clear();
this.memberServerIdsBySignalUrl.clear();
this.serverSignalingUrlMap.clear();
this.mediaManager.stopVoiceHeartbeat();
this.destroyAllSignalingManagers();
// Return the public connection-status signals to their initial "never connected, no error" values.
this._isSignalingConnected.set(false);
this._hasEverConnected.set(false);
this._hasConnectionError.set(false);
this._connectionErrorMessage.set(null);
// Emitted last, after all state above has been reset.
this.serviceDestroyed$.next();
}
/**
 * Alias for {@link disconnect}: forwards the call and does no additional work.
 */
disconnectAll(): void {
this.disconnect();
}
/**
 * Reset peer, voice and screen-share state in one pass.
 *
 * Clears the voice-server and peer bookkeeping, drops all remote
 * screen-share request tracking, closes every peer, disables voice, stops
 * the local screen share, and resets the associated signals. Signaling
 * managers and connection-status signals are not touched here.
 */
private fullCleanup(): void {
this.voiceServerId = null;
this.peerServerMap.clear();
this.peerSignalingUrlMap.clear();
// Remote screen-share request tracking.
this.remoteScreenShareRequestsEnabled = false;
this.desiredRemoteScreenSharePeers.clear();
this.activeRemoteScreenSharePeers.clear();
// Peers are closed before voice and screen share are torn down.
this.peerManager.closeAllPeers();
this._connectedPeers.set([]);
this.mediaManager.disableVoice();
this._isVoiceConnected.set(false);
this.screenShareManager.stopScreenShare();
// Screen-share signals are reset explicitly rather than read back from the manager.
this._isScreenSharing.set(false);
this._screenStreamSignal.set(null);
this._isScreenShareRemotePlaybackSuppressed.set(false);
this._forceDefaultRemotePlaybackOutput.set(false);
}
/**
 * Copy the media manager's voice-active, mic-muted and self-deafened flags
 * into the corresponding signals.
 *
 * The noise-reduction signal is not part of this sync.
 */
private syncMediaSignals(): void {
  const { mediaManager } = this;

  this._isVoiceConnected.set(mediaManager.getIsVoiceActive());
  this._isMuted.set(mediaManager.getIsMicMuted());
  this._isDeafened.set(mediaManager.getIsSelfDeafened());
}
/**
 * Decide whether an existing peer entry can still be used to finish negotiating.
 *
 * A missing peer is never reusable. Otherwise the peer is reusable unless its
 * connection state is `'closed'` or `'failed'`; note that an absent
 * `connection` or an undefined state therefore counts as reusable.
 *
 * @param peer - The peer entry to inspect, if any.
 * @returns `true` when the peer exists and its connection is neither closed nor failed.
 */
private canReusePeerConnection(peer: import('./webrtc').PeerData | undefined): boolean {
  if (!peer) {
    return false;
  }

  const state = peer.connection?.connectionState;
  const isTerminated = state === 'closed' || state === 'failed';

  return !isTerminated;
}
/**
 * React to screen-share control events received from a peer.
 *
 * Events without a `fromPeerId` are ignored. Handled cases:
 *  - screen-state with `isScreenSharing === false`: drop the sender's remote screen-share stream;
 *  - screen-share request: ask the screen-share manager to share with the sender;
 *  - screen-share stop: ask the screen-share manager to stop sharing with the sender.
 * Any other event is left untouched.
 *
 * @param event - The incoming peer event.
 */
private handlePeerControlMessage(event: ChatEvent): void {
  const senderId = event.fromPeerId;

  if (!senderId) {
    return;
  }

  switch (event.type) {
    case P2P_TYPE_SCREEN_STATE:
      // Only an explicit "stopped sharing" clears the stream; other screen-state updates are ignored here.
      if (event.isScreenSharing === false) {
        this.peerManager.clearRemoteScreenShareStream(senderId);
      }
      break;
    case P2P_TYPE_SCREEN_SHARE_REQUEST:
      this.screenShareManager.requestScreenShareForPeer(senderId);
      break;
    case P2P_TYPE_SCREEN_SHARE_STOP:
      this.screenShareManager.stopScreenShareForPeer(senderId);
      break;
    default:
      break;
  }
}
/**
 * Send a screen-share request to each listed peer that is connected and not
 * already marked as an active remote screen-share peer, then mark it active.
 *
 * The active check runs per iteration, so a peer ID repeated in `peerIds`
 * is only requested once.
 *
 * @param peerIds - Candidate peer IDs to request screen shares from.
 */
private requestRemoteScreenShares(peerIds: string[]): void {
  const reachablePeers = new Set(this.peerManager.getConnectedPeerIds());

  for (const targetId of peerIds) {
    const shouldRequest = reachablePeers.has(targetId) && !this.activeRemoteScreenSharePeers.has(targetId);

    if (shouldRequest) {
      this.peerManager.sendToPeer(targetId, { type: P2P_TYPE_SCREEN_SHARE_REQUEST });
      this.activeRemoteScreenSharePeers.add(targetId);
    }
  }
}
/**
 * Stop receiving screen shares from the listed peers.
 *
 * A stop message is sent only to peers that are both marked active and still
 * connected. Regardless of that, every listed peer is removed from the
 * active set and its remote screen-share stream is cleared.
 *
 * @param peerIds - Peer IDs whose screen shares should be stopped.
 */
private stopRemoteScreenShares(peerIds: string[]): void {
  const reachablePeers = new Set(this.peerManager.getConnectedPeerIds());

  for (const targetId of peerIds) {
    const shouldNotify = this.activeRemoteScreenSharePeers.has(targetId) && reachablePeers.has(targetId);

    if (shouldNotify) {
      this.peerManager.sendToPeer(targetId, { type: P2P_TYPE_SCREEN_SHARE_STOP });
    }

    // Local bookkeeping is cleaned up even when no message could be sent.
    this.activeRemoteScreenSharePeers.delete(targetId);
    this.peerManager.clearRemoteScreenShareStream(targetId);
  }
}
/**
 * Tear down every signaling manager together with its subscriptions.
 *
 * All subscriptions are unsubscribed first, then every manager is destroyed,
 * and finally the subscription, manager and connection-state collections are
 * emptied.
 */
private destroyAllSignalingManagers(): void {
  this.signalingSubscriptions.forEach((subscriptionGroup) => {
    for (const activeSubscription of subscriptionGroup) {
      activeSubscription.unsubscribe();
    }
  });

  this.signalingManagers.forEach((signalingManager) => {
    signalingManager.destroy();
  });

  this.signalingSubscriptions.clear();
  this.signalingManagers.clear();
  this.signalingConnectionStates.clear();
}
/**
 * Angular `ngOnDestroy` lifecycle hook: final teardown of the service.
 *
 * Runs a full {@link disconnect} (which emits once more on
 * `serviceDestroyed$`), completes that subject, and then destroys the peer,
 * media and screen-share managers.
 */
ngOnDestroy(): void {
this.disconnect();
// Completed only after disconnect() has emitted its final value.
this.serviceDestroyed$.complete();
this.peerManager.destroy();
this.mediaManager.destroy();
this.screenShareManager.destroy();
}
}