Files
Toju/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts

573 lines
18 KiB
TypeScript

/* eslint-disable @typescript-eslint/member-ordering, @typescript-eslint/no-non-null-assertion, max-statements-per-line */
/**
* Manages the WebSocket connection to the signaling server,
* including automatic reconnection and heartbeats.
*/
import {
Observable,
Subject,
of
} from 'rxjs';
import type { SignalingMessage } from '../../../shared-kernel';
import { recordDebugNetworkSignalingPayload } from '../logging/debug-network-metrics';
import { IdentifyCredentials, JoinedServerInfo } from '../realtime.types';
import { WebRTCLogger } from '../logging/webrtc-logger';
import {
SIGNALING_RECONNECT_BASE_DELAY_MS,
SIGNALING_RECONNECT_MAX_DELAY_MS,
SIGNALING_CONNECT_TIMEOUT_MS,
STATE_HEARTBEAT_INTERVAL_MS,
SIGNALING_TYPE_IDENTIFY,
SIGNALING_TYPE_JOIN_SERVER,
SIGNALING_TYPE_VIEW_SERVER
} from '../realtime.constants';
/**
 * WebRTC negotiation data that a parsed signaling message may carry in its
 * `payload` field. Both members are optional.
 */
interface ParsedSignalingPayload {
  /** ICE candidate description, when the message carries one. */
  candidate?: RTCIceCandidateInit;

  /** Session description (offer/answer), when the message carries one. */
  sdp?: RTCSessionDescriptionInit;
}
/**
 * Shape assigned to a message after it has been JSON-parsed from the socket.
 *
 * - Every `SignalingMessage` field other than `type` and `payload` is optional.
 * - Arbitrary additional keys are permitted (typed as `unknown`).
 * - `type` is a required string; `payload`, when present, is narrowed to
 *   {@link ParsedSignalingPayload}.
 */
type ParsedSignalingMessage =
  Record<string, unknown> &
  Omit<Partial<SignalingMessage>, 'type' | 'payload'> & {
    payload?: ParsedSignalingPayload;
    type: string;
  };
/**
 * Owns the WebSocket connection to the signaling server.
 *
 * Responsibilities visible in this class:
 * - opening / replacing the socket and reporting status through
 *   {@link connectionStatus$};
 * - re-sending identify / join / view messages every time a socket opens;
 * - reconnecting with exponential backoff after an unexpected close;
 * - emitting a periodic {@link heartbeatTick$} while the socket is open;
 * - serializing outbound messages and parsing inbound ones, with traffic logging.
 */
export class SignalingManager {
  private signalingWebSocket: WebSocket | null = null;
  private lastSignalingUrl: string | null = null;
  private signalingReconnectAttempts = 0;
  private signalingReconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private stateHeartbeatTimer: ReturnType<typeof setInterval> | null = null;

  /** Fires every heartbeat tick - the main service hooks this to broadcast state. */
  readonly heartbeatTick$ = new Subject<void>();

  /** Fires whenever a parsed signaling message (a JSON object) arrives from the server. */
  readonly messageReceived$ = new Subject<ParsedSignalingMessage>();

  /** Fires when connection status changes (true = open, false = closed/error). */
  readonly connectionStatus$ = new Subject<{ connected: boolean; errorMessage?: string }>();

  /**
   * @param logger Sink for info / warn / error / traffic log entries.
   * @param getLastIdentify Returns the credentials to re-send when a socket opens, or `null`.
   * @param getLastJoinedServer Returns the server to re-send a "view" message for, or `null`.
   * @param getMemberServerIds Returns the ids of servers to re-join when a socket opens.
   */
  constructor(
    private readonly logger: WebRTCLogger,
    private readonly getLastIdentify: () => IdentifyCredentials | null,
    private readonly getLastJoinedServer: () => JoinedServerInfo | null,
    private readonly getMemberServerIds: () => ReadonlySet<string>
  ) {}

  /**
   * Open (or re-open) a WebSocket to the signaling server.
   *
   * If a socket for the same URL is already open, the result emits `true`
   * immediately; if one is still connecting, the result waits for it instead
   * of opening a second socket. Otherwise the returned observable is cold:
   * each subscription creates a new socket and closes the one it replaces.
   *
   * @param serverUrl WebSocket URL of the signaling server.
   * @returns An observable that emits once and completes when the socket
   *          opens, or errors when the socket reports an error or cannot be
   *          created. When waiting on an in-flight connection it may emit
   *          `false` if that connection did not open in time.
   */
  connect(serverUrl: string): Observable<boolean> {
    const isSameUrl = this.lastSignalingUrl === serverUrl;

    if (isSameUrl && this.isSocketOpen()) {
      return of(true);
    }

    if (isSameUrl && this.isSocketConnecting()) {
      return this.waitForOpen();
    }

    this.lastSignalingUrl = serverUrl;

    return new Observable<boolean>((observer) => {
      try {
        this.logger.info('[signaling] Connecting to signaling server', { serverUrl });

        const previousSocket = this.signalingWebSocket;

        // Set again at subscription time: another connect() call may have
        // changed the stored URL between creating and subscribing this observable.
        this.lastSignalingUrl = serverUrl;

        const socket = new WebSocket(serverUrl);

        // Deliver binary frames as ArrayBuffer so they can be decoded
        // synchronously; the default ('blob') cannot be.
        socket.binaryType = 'arraybuffer';
        this.signalingWebSocket = socket;

        if (previousSocket && previousSocket !== socket) {
          try {
            previousSocket.close();
          } catch {
            this.logger.warn('[signaling] Failed to close previous signaling socket', {
              url: serverUrl
            });
          }
        }

        // Every handler ignores events from a socket that has since been
        // replaced or detached by close().
        const isStale = (): boolean => socket !== this.signalingWebSocket;

        socket.onopen = () => {
          if (isStale()) {
            return;
          }

          this.logger.info('[signaling] Connected to signaling server', {
            serverUrl,
            readyState: this.getSocketReadyStateLabel()
          });

          this.clearReconnect();
          this.startHeartbeat();
          this.connectionStatus$.next({ connected: true });
          this.reIdentifyAndRejoin();
          observer.next(true);
          observer.complete();
        };

        socket.onmessage = (event) => {
          if (isStale()) {
            return;
          }

          this.handleInboundMessage(event.data, serverUrl);
        };

        socket.onerror = (error) => {
          if (isStale()) {
            return;
          }

          this.logger.error('[signaling] Signaling socket error', error, {
            readyState: this.getSocketReadyStateLabel(),
            url: serverUrl
          });

          this.connectionStatus$.next({
            connected: false,
            errorMessage: 'Connection to signaling server failed'
          });

          observer.error(error);
        };

        socket.onclose = (event) => {
          if (isStale()) {
            return;
          }

          this.logger.warn('[signaling] Disconnected from signaling server', {
            attempts: this.signalingReconnectAttempts,
            code: event.code,
            reason: event.reason || null,
            url: serverUrl,
            wasClean: event.wasClean
          });

          this.stopHeartbeat();
          this.connectionStatus$.next({
            connected: false,
            errorMessage: 'Disconnected from signaling server'
          });

          this.scheduleReconnect();
        };
      } catch (error) {
        this.logger.error('[signaling] Failed to initialize signaling socket', error, {
          readyState: this.getSocketReadyStateLabel(),
          url: serverUrl
        });

        observer.error(error);
      }
    });
  }

  /**
   * Ensure signaling is connected; try reconnecting to the last URL if not.
   *
   * @param timeoutMs Maximum time to wait for the connection attempt.
   * @returns `true` when the socket is open; `false` when no URL is known,
   *          the attempt failed, reported not-open, or timed out. Never rejects.
   */
  async ensureConnected(timeoutMs: number = SIGNALING_CONNECT_TIMEOUT_MS): Promise<boolean> {
    if (this.isSocketOpen()) {
      return true;
    }

    const url = this.lastSignalingUrl;

    if (!url) {
      return false;
    }

    return new Promise<boolean>((resolve) => {
      let settled = false;
      let subscription: { unsubscribe(): void } | null = null;

      const settle = (connected: boolean): void => {
        if (settled) {
          return;
        }

        settled = true;
        clearTimeout(timeout);
        subscription?.unsubscribe();
        resolve(connected);
      };
      const timeout = setTimeout(() => settle(false), timeoutMs);

      subscription = this.connect(url).subscribe({
        // connect() may emit `false` when it waited on an in-flight socket
        // that never opened, so the emitted value must be honored.
        next: (connected) => settle(connected),
        error: () => settle(false)
      });

      // The source may have emitted synchronously, before `subscription`
      // was assigned; release it here in that case.
      if (settled) {
        subscription.unsubscribe();
      }
    });
  }

  /**
   * Send a signaling message (with `from` / `timestamp` populated).
   *
   * Logs an error and returns without sending when the socket is not open.
   *
   * @param message Message without `from` / `timestamp`.
   * @param localPeerId Value written to the outgoing message's `from` field.
   * @throws Re-throws serialization or socket send failures after logging them.
   */
  sendSignalingMessage(message: Omit<SignalingMessage, 'from' | 'timestamp'>, localPeerId: string): void {
    if (!this.isSocketOpen()) {
      this.reportSocketNotOpen(message.type);
      return;
    }

    const fullMessage: SignalingMessage = {
      ...message,
      from: localPeerId,
      timestamp: Date.now()
    };

    this.sendSerializedPayload(fullMessage, {
      targetPeerId: message.to,
      type: message.type,
      url: this.lastSignalingUrl
    });
  }

  /**
   * Send a raw JSON payload (for identify, join_server, etc.).
   *
   * Logs an error and returns without sending when the socket is not open.
   *
   * @param message Object that is serialized and sent as-is.
   * @throws Re-throws serialization or socket send failures after logging them.
   */
  sendRawMessage(message: Record<string, unknown>): void {
    const rawType = message['type'];
    const type = typeof rawType === 'string' ? rawType : 'unknown';

    if (!this.isSocketOpen()) {
      this.reportSocketNotOpen(type);
      return;
    }

    const target = message['targetUserId'];

    this.sendSerializedPayload(message, {
      targetPeerId: typeof target === 'string' ? target : undefined,
      type,
      url: this.lastSignalingUrl
    });
  }

  /**
   * Close the WebSocket and cancel the heartbeat and any pending reconnect.
   *
   * The socket is detached before it is closed, so its close handler treats
   * the event as stale: no status emission and no reconnect follow.
   */
  close(): void {
    this.stopHeartbeat();
    this.clearReconnect();

    const socket = this.signalingWebSocket;

    this.signalingWebSocket = null;
    socket?.close();
  }

  /** Whether the underlying WebSocket is currently open. */
  isSocketOpen(): boolean {
    const socket = this.signalingWebSocket;

    return socket !== null && socket.readyState === WebSocket.OPEN;
  }

  /** Whether the underlying WebSocket exists and is still connecting. */
  isSocketConnecting(): boolean {
    const socket = this.signalingWebSocket;

    return socket !== null && socket.readyState === WebSocket.CONNECTING;
  }

  /** The URL last used to connect (needed for reconnection). */
  getLastUrl(): string | null {
    return this.lastSignalingUrl;
  }

  /** Stop all activity and complete every public subject. */
  destroy(): void {
    this.close();
    this.heartbeatTick$.complete();
    this.messageReceived$.complete();
    this.connectionStatus$.complete();
  }

  /**
   * Decode, parse, log and publish one inbound socket frame.
   *
   * Frames that are not valid JSON, or whose JSON value is not a plain object
   * (primitives, `null`, arrays), are logged as parse failures and dropped.
   */
  private handleInboundMessage(data: unknown, serverUrl: string): void {
    const rawPayload = this.stringifySocketPayload(data);
    const payloadBytes = rawPayload ? this.measurePayloadBytes(rawPayload) : null;

    try {
      const parsed: unknown = JSON.parse(rawPayload);

      if (!this.asRecord(parsed)) {
        throw new TypeError('Signaling message is not a JSON object');
      }

      const message = parsed as ParsedSignalingMessage;
      const payloadPreview = this.buildPayloadPreview(message);

      recordDebugNetworkSignalingPayload(message, 'inbound');

      this.logger.traffic('signaling', 'inbound', {
        ...payloadPreview,
        bytes: payloadBytes ?? undefined,
        payloadPreview,
        readyState: this.getSocketReadyStateLabel(),
        type: typeof message.type === 'string' ? message.type : 'unknown',
        url: serverUrl
      });

      this.messageReceived$.next(message);
    } catch (error) {
      this.logger.error('[signaling] Failed to parse signaling message', error, {
        bytes: payloadBytes ?? undefined,
        rawPreview: this.getPayloadPreview(rawPayload),
        readyState: this.getSocketReadyStateLabel(),
        url: serverUrl
      });
    }
  }

  /** Log the shared "socket not open" error for an outbound message of the given type. */
  private reportSocketNotOpen(type: string): void {
    this.logger.error('[signaling] Signaling socket not connected', new Error('Socket not open'), {
      readyState: this.getSocketReadyStateLabel(),
      type,
      url: this.lastSignalingUrl
    });
  }

  /**
   * Re-send identify, join and view messages; called each time a socket opens.
   *
   * Order: identify (if credentials exist), one join per member server, then
   * a view message for the last joined server if it is still a member.
   */
  private reIdentifyAndRejoin(): void {
    const credentials = this.getLastIdentify();

    if (credentials) {
      this.sendRawMessage({
        type: SIGNALING_TYPE_IDENTIFY,
        oderId: credentials.oderId,
        displayName: credentials.displayName,
        connectionScope: this.lastSignalingUrl ?? undefined
      });
    }

    const memberIds = this.getMemberServerIds();

    if (memberIds.size === 0) {
      return;
    }

    for (const serverId of memberIds) {
      this.sendRawMessage({
        type: SIGNALING_TYPE_JOIN_SERVER,
        serverId
      });
    }

    const lastJoined = this.getLastJoinedServer();

    if (lastJoined && memberIds.has(lastJoined.serverId)) {
      this.sendRawMessage({
        type: SIGNALING_TYPE_VIEW_SERVER,
        serverId: lastJoined.serverId
      });
    }
  }

  /**
   * Schedule a reconnect attempt using exponential backoff.
   *
   * The delay doubles with each attempt up to {@link SIGNALING_RECONNECT_MAX_DELAY_MS}.
   * No-ops if a timer is already pending, no URL is stored, or a socket is
   * already open or connecting.
   */
  private scheduleReconnect(): void {
    const alreadyHandled =
      this.signalingReconnectTimer !== null ||
      !this.lastSignalingUrl ||
      this.isSocketOpen() ||
      this.isSocketConnecting();

    if (alreadyHandled) {
      return;
    }

    const delay = Math.min(
      SIGNALING_RECONNECT_MAX_DELAY_MS,
      SIGNALING_RECONNECT_BASE_DELAY_MS * 2 ** this.signalingReconnectAttempts
    );

    this.signalingReconnectTimer = setTimeout(() => {
      this.signalingReconnectTimer = null;

      // Read the URL when the timer fires: it may have changed since scheduling.
      const url = this.lastSignalingUrl;

      if (!url || this.isSocketOpen() || this.isSocketConnecting()) {
        return;
      }

      this.signalingReconnectAttempts++;
      this.logger.info('[signaling] Attempting reconnect', {
        attempt: this.signalingReconnectAttempts,
        delay,
        url
      });

      this.connect(url).subscribe({
        next: () => {
          this.signalingReconnectAttempts = 0;
        },
        error: () => {
          this.scheduleReconnect();
        }
      });
    }, delay);
  }

  /**
   * Wait for an in-flight socket to open.
   *
   * Emits `true` on the first `connected: true` status. If none arrives within
   * `timeoutMs`, emits whether the socket is open at that moment. Intermediate
   * `connected: false` statuses are ignored. Completes after its single emission.
   */
  private waitForOpen(timeoutMs: number = SIGNALING_CONNECT_TIMEOUT_MS): Observable<boolean> {
    if (this.isSocketOpen()) {
      return of(true);
    }

    return new Observable<boolean>((observer) => {
      let settled = false;

      const statusSubscription = this.connectionStatus$.subscribe(({ connected }) => {
        if (settled || !connected) {
          return;
        }

        settled = true;
        clearTimeout(timer);
        statusSubscription.unsubscribe();
        observer.next(true);
        observer.complete();
      });
      const timer = setTimeout(() => {
        if (settled) {
          return;
        }

        settled = true;
        statusSubscription.unsubscribe();
        observer.next(this.isSocketOpen());
        observer.complete();
      }, timeoutMs);

      // Teardown: runs on unsubscribe and after completion.
      return () => {
        settled = true;
        clearTimeout(timer);
        statusSubscription.unsubscribe();
      };
    });
  }

  /** Cancel any pending reconnect timer and reset the attempt counter. */
  private clearReconnect(): void {
    if (this.signalingReconnectTimer) {
      clearTimeout(this.signalingReconnectTimer);
      this.signalingReconnectTimer = null;
    }

    this.signalingReconnectAttempts = 0;
  }

  /** (Re)start the interval that emits {@link heartbeatTick$}. */
  private startHeartbeat(): void {
    this.stopHeartbeat();
    this.stateHeartbeatTimer = setInterval(() => this.heartbeatTick$.next(), STATE_HEARTBEAT_INTERVAL_MS);
  }

  /** Stop the heartbeat interval. */
  private stopHeartbeat(): void {
    if (this.stateHeartbeatTimer) {
      clearInterval(this.stateHeartbeatTimer);
      this.stateHeartbeatTimer = null;
    }
  }

  /**
   * Serialize `message`, send it on the current socket and log the traffic.
   *
   * Callers are expected to have checked {@link isSocketOpen} first.
   *
   * @throws Re-throws the serialization or send error after logging it.
   */
  private sendSerializedPayload(
    message: SignalingMessage | Record<string, unknown>,
    details: { targetPeerId?: string; type?: string; url?: string | null }
  ): void {
    const payloadPreview = this.buildPayloadPreview(message);

    recordDebugNetworkSignalingPayload(message, 'outbound');

    let rawPayload = '';

    try {
      rawPayload = JSON.stringify(message);
    } catch (error) {
      this.logger.error('[signaling] Failed to serialize signaling payload', error, {
        payloadPreview,
        type: details.type,
        url: details.url
      });

      throw error;
    }

    try {
      this.signalingWebSocket!.send(rawPayload);

      this.logger.traffic('signaling', 'outbound', {
        ...payloadPreview,
        bytes: this.measurePayloadBytes(rawPayload),
        payloadPreview,
        readyState: this.getSocketReadyStateLabel(),
        targetPeerId: details.targetPeerId,
        type: details.type,
        url: details.url
      });
    } catch (error) {
      this.logger.error('[signaling] Failed to send signaling payload', error, {
        bytes: this.measurePayloadBytes(rawPayload),
        payloadPreview,
        readyState: this.getSocketReadyStateLabel(),
        targetPeerId: details.targetPeerId,
        type: details.type,
        url: details.url
      });

      throw error;
    }
  }

  /** Human-readable label for the current socket state; `'unavailable'` when there is no socket. */
  private getSocketReadyStateLabel(): string {
    switch (this.signalingWebSocket?.readyState) {
      case WebSocket.CONNECTING:
        return 'connecting';
      case WebSocket.OPEN:
        return 'open';
      case WebSocket.CLOSING:
        return 'closing';
      case WebSocket.CLOSED:
        return 'closed';
      default:
        return 'unavailable';
    }
  }

  /**
   * Convert socket frame data to text.
   *
   * Strings pass through; `ArrayBuffer`s and buffer views (typed arrays,
   * `DataView`) are decoded with `TextDecoder`; `null` / `undefined` become
   * `''`; anything else goes through `String()`.
   */
  private stringifySocketPayload(payload: unknown): string {
    if (typeof payload === 'string') {
      return payload;
    }

    if (payload instanceof ArrayBuffer || ArrayBuffer.isView(payload)) {
      return new TextDecoder().decode(payload);
    }

    return String(payload ?? '');
  }

  /** Size of `payload` in bytes when encoded with `TextEncoder`. */
  private measurePayloadBytes(payload: string): number {
    return new TextEncoder().encode(payload).length;
  }

  /** Collapse whitespace runs to single spaces and keep the first 240 characters. */
  private getPayloadPreview(payload: string): string {
    return payload.replace(/\s+/g, ' ').slice(0, 240);
  }

  /**
   * Build a compact, log-friendly summary of a message.
   *
   * Always contains `keys` (first 10 own keys) and `type`; other fields are
   * copied only when present with the expected primitive type. The message's
   * `targetUserId` is reported under the key `targetPeerId`.
   */
  private buildPayloadPreview(payload: SignalingMessage | Record<string, unknown>): Record<string, unknown> {
    const record = payload as Record<string, unknown>;
    const stringField = (key: string): string | undefined => {
      const value = record[key];

      return typeof value === 'string' ? value : undefined;
    };
    const screenSharing = record['isScreenSharing'];
    const rawUsers = record['users'];
    const voiceState = this.summarizeVoiceState(record['voiceState']);
    const users = this.summarizeUsers(rawUsers);
    const preview: Record<string, unknown> = {
      keys: Object.keys(record).slice(0, 10),
      type: stringField('type') ?? 'unknown'
    };

    this.assignPreviewValue(preview, 'displayName', stringField('displayName'));
    this.assignPreviewValue(preview, 'fromUserId', stringField('fromUserId'));
    this.assignPreviewValue(preview, 'isScreenSharing', typeof screenSharing === 'boolean' ? screenSharing : undefined);
    this.assignPreviewValue(preview, 'oderId', stringField('oderId'));
    this.assignPreviewValue(preview, 'roomId', stringField('roomId'));
    this.assignPreviewValue(preview, 'serverId', stringField('serverId'));
    this.assignPreviewValue(preview, 'targetPeerId', stringField('targetUserId'));
    this.assignPreviewValue(preview, 'userCount', Array.isArray(rawUsers) ? rawUsers.length : undefined);
    this.assignPreviewValue(preview, 'users', users);
    this.assignPreviewValue(preview, 'voiceState', voiceState);

    return preview;
  }

  /**
   * Summarize a `voiceState` object for logging.
   *
   * The four boolean flags are always present (true only when the source value
   * is exactly `true`); `roomId`, `serverId` and `volume` are copied when
   * correctly typed. Returns `undefined` when `value` is not a plain object.
   */
  private summarizeVoiceState(value: unknown): Record<string, unknown> | undefined {
    const voiceState = this.asRecord(value);

    if (!voiceState) {
      return undefined;
    }

    const roomId = voiceState['roomId'];
    const serverId = voiceState['serverId'];
    const volume = voiceState['volume'];
    const summary: Record<string, unknown> = {
      isConnected: voiceState['isConnected'] === true,
      isMuted: voiceState['isMuted'] === true,
      isDeafened: voiceState['isDeafened'] === true,
      isSpeaking: voiceState['isSpeaking'] === true
    };

    this.assignPreviewValue(summary, 'roomId', typeof roomId === 'string' ? roomId : undefined);
    this.assignPreviewValue(summary, 'serverId', typeof serverId === 'string' ? serverId : undefined);
    this.assignPreviewValue(summary, 'volume', typeof volume === 'number' ? volume : undefined);

    return summary;
  }

  /**
   * Summarize up to the first 20 entries of a `users` array for logging.
   *
   * Non-object entries are skipped; each remaining entry contributes its
   * string `displayName` / `oderId` when present. Returns `undefined` when
   * `value` is not an array.
   */
  private summarizeUsers(value: unknown): Record<string, unknown>[] | undefined {
    if (!Array.isArray(value)) {
      return undefined;
    }

    const summaries: Record<string, unknown>[] = [];

    for (const entry of value.slice(0, 20)) {
      const user = this.asRecord(entry);

      if (!user) {
        continue;
      }

      const displayName = user['displayName'];
      const oderId = user['oderId'];
      const summary: Record<string, unknown> = {};

      this.assignPreviewValue(summary, 'displayName', typeof displayName === 'string' ? displayName : undefined);
      this.assignPreviewValue(summary, 'oderId', typeof oderId === 'string' ? oderId : undefined);
      summaries.push(summary);
    }

    return summaries;
  }

  /** Set `target[key]` only when `value` is not `undefined`. */
  private assignPreviewValue(target: Record<string, unknown>, key: string, value: unknown): void {
    if (value !== undefined) {
      target[key] = value;
    }
  }

  /** Return `value` as a record when it is a non-null, non-array object; otherwise `null`. */
  private asRecord(value: unknown): Record<string, unknown> | null {
    if (!value || typeof value !== 'object' || Array.isArray(value)) {
      return null;
    }

    return value as Record<string, unknown>;
  }
}