836 lines
26 KiB
TypeScript
836 lines
26 KiB
TypeScript
/* eslint-disable @typescript-eslint/member-ordering, @typescript-eslint/no-non-null-assertion, max-statements-per-line */
|
|
/**
|
|
* Manages the WebSocket connection to the signaling server,
|
|
* including automatic reconnection and heartbeats.
|
|
*/
|
|
import {
|
|
Observable,
|
|
Subject,
|
|
of
|
|
} from 'rxjs';
|
|
import type { SignalingMessage } from '../../../shared-kernel';
|
|
import { recordDebugNetworkSignalingPayload } from '../logging/debug-network-metrics';
|
|
import { IdentifyCredentials, JoinedServerInfo } from '../realtime.types';
|
|
import { WebRTCLogger } from '../logging/webrtc-logger';
|
|
import {
|
|
SIGNALING_RECONNECT_BASE_DELAY_MS,
|
|
SIGNALING_RECONNECT_MAX_DELAY_MS,
|
|
SIGNALING_CONNECT_TIMEOUT_MS,
|
|
STATE_HEARTBEAT_INTERVAL_MS,
|
|
SIGNALING_KEEPALIVE_INTERVAL_MS,
|
|
SIGNALING_HEALTH_PROBE_INTERVAL_MS,
|
|
SIGNALING_KEEPALIVE_ACK_TIMEOUT_MS,
|
|
SIGNALING_TYPE_IDENTIFY,
|
|
SIGNALING_TYPE_JOIN_SERVER,
|
|
SIGNALING_TYPE_KEEPALIVE,
|
|
SIGNALING_TYPE_KEEPALIVE_ACK,
|
|
SIGNALING_TYPE_VIEW_SERVER,
|
|
isTransientSignalingOutboundType
|
|
} from '../realtime.constants';
|
|
import { probeSignalingEndpointHealth } from './signaling-endpoint-health.rules';
|
|
|
|
/** Optional WebRTC negotiation fields that the `payload` of a parsed signaling message may carry. */
interface ParsedSignalingPayload {
|
|
/** Session description, when present. */
sdp?: RTCSessionDescriptionInit;
|
|
/** ICE candidate, when present. */
candidate?: RTCIceCandidateInit;
|
|
}
|
|
|
|
/**
 * Shape assumed for a message parsed from the socket: every `SignalingMessage`
 * field is optional except `type`, arbitrary extra keys are allowed, and
 * `payload` is narrowed to {@link ParsedSignalingPayload}.
 */
type ParsedSignalingMessage = Omit<Partial<SignalingMessage>, 'type' | 'payload'> &
|
|
Record<string, unknown> & {
|
|
type: string;
|
|
payload?: ParsedSignalingPayload;
|
|
};
|
|
|
|
export class SignalingManager {
|
|
/** The socket currently owned by this manager, or `null` after `close()` / a discard. */
private signalingWebSocket: WebSocket | null = null;
|
|
/** URL most recently passed to `connect()`; reused for reconnects and health probes. */
private lastSignalingUrl: string | null = null;
|
|
/** Reconnect attempts since the last reset; used as the exponent of the backoff delay. */
private signalingReconnectAttempts = 0;
|
|
/** Handle of the pending reconnect timeout, or `null` when none is scheduled. */
private signalingReconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
|
/** Handle of the heartbeat interval, or `null` while the heartbeat is stopped. */
private stateHeartbeatTimer: ReturnType<typeof setInterval> | null = null;
|
|
/** `Date.now()` of the last keepalive send attempt; `0` while the heartbeat is stopped. */
private lastKeepaliveSentAt = 0;
|
|
/** `Date.now()` of the last keepalive ack (mirrors the send time until the server has acked once). */
private lastKeepaliveAckAt = 0;
|
|
/** Set once a keepalive ack message has been received; cleared when the heartbeat stops. */
private serverSupportsKeepaliveAck = false;
|
|
/** `ok` flag of the last health probe, or `null` when no result is recorded. */
private lastEndpointHealthOk: boolean | null = null;
|
|
/** `serverInstanceId` reported by the last successful health probe, if any. */
private lastKnownServerInstanceId: string | null = null;
|
|
/** `Date.now()` at which the last health probe was started. */
private lastEndpointHealthProbeAt = 0;
|
|
/** True while a health probe is awaiting its result; prevents starting a second one. */
private endpointHealthProbeInFlight = false;
|
|
/** `Date.now()` at which the current connect attempt created its socket; `0` after a discard. */
private connectAttemptStartedAt = 0;
|
|
|
|
/** Fires every heartbeat tick - the main service hooks this to broadcast state. */
|
|
readonly heartbeatTick$ = new Subject<void>();
|
|
|
|
/** Fires whenever a raw signaling message arrives from the server. */
|
|
readonly messageReceived$ = new Subject<ParsedSignalingMessage>();
|
|
|
|
/** Fires when connection status changes (true = open, false = closed/error). */
|
|
readonly connectionStatus$ = new Subject<{ connected: boolean; errorMessage?: string }>();
|
|
|
|
/**
 * @param logger Logger used for signaling diagnostics and traffic records.
 * @param getLastIdentify Returns the credentials to re-send as `identify` after a socket opens, or `null`.
 * @param getLastJoinedServer Returns the server to re-send `view_server` for after a socket opens, or `null`.
 * @param getMemberServerIds Returns the ids of servers to re-join after a socket opens.
 */
constructor(
|
|
private readonly logger: WebRTCLogger,
|
|
private readonly getLastIdentify: () => IdentifyCredentials | null,
|
|
private readonly getLastJoinedServer: () => JoinedServerInfo | null,
|
|
private readonly getMemberServerIds: () => ReadonlySet<string>
|
|
) {}
|
|
|
|
/**
 * Open (or re-open) a WebSocket to the signaling server.
 *
 * When called again for the URL already in use, an open socket yields
 * `of(true)` and a connect attempt younger than
 * `SIGNALING_CONNECT_TIMEOUT_MS` is awaited through `waitForOpen()` instead of
 * being restarted. An older, still-connecting socket is discarded and replaced.
 *
 * `lastSignalingUrl` is updated immediately, but the socket itself is only
 * created when the returned observable is subscribed. That observable emits
 * `true` and completes once the socket opens, emits `false` and completes when
 * the attempt times out, and errors when the socket reports an error or
 * cannot be constructed.
 *
 * @param serverUrl WebSocket URL of the signaling server.
 * @returns Observable reporting whether the connection was established.
 */
connect(serverUrl: string): Observable<boolean> {
  if (this.lastSignalingUrl === serverUrl) {
    if (this.isSocketOpen()) {
      return of(true);
    }

    if (this.isSocketConnecting()) {
      const connectAge = Date.now() - this.connectAttemptStartedAt;

      if (connectAge < SIGNALING_CONNECT_TIMEOUT_MS) {
        return this.waitForOpen();
      }

      // The in-flight attempt outlived the connect timeout; drop it and start over.
      this.discardCurrentSocket();
    }
  }

  this.lastSignalingUrl = serverUrl;

  return new Observable<boolean>((observer) => {
    let connectTimeout: ReturnType<typeof setTimeout> | null = null;

    const clearConnectTimeout = (): void => {
      if (!connectTimeout) {
        return;
      }

      clearTimeout(connectTimeout);
      connectTimeout = null;
    };

    try {
      this.logger.info('[signaling] Connecting to signaling server', { serverUrl });

      const previousSocket = this.signalingWebSocket;

      this.lastSignalingUrl = serverUrl;
      this.connectAttemptStartedAt = Date.now();
      const socket = new WebSocket(serverUrl);

      this.signalingWebSocket = socket;

      if (previousSocket) {
        // The replaced socket's handlers are detached, so its onclose will never
        // stop the heartbeat for us. Stop it here so no keepalives or health
        // probes tick while nothing is open; onopen restarts it.
        this.stopHeartbeat();
        this.detachSocketHandlers(previousSocket);

        try {
          previousSocket.close();
        } catch {
          this.logger.warn('[signaling] Failed to close previous signaling socket', {
            url: serverUrl
          });
        }
      }

      connectTimeout = setTimeout(() => {
        // Ignore the timer if this attempt was superseded or already left CONNECTING.
        if (socket !== this.signalingWebSocket || !this.isSocketConnecting()) {
          return;
        }

        this.logger.warn('[signaling] Signaling connect attempt timed out', {
          timeoutMs: SIGNALING_CONNECT_TIMEOUT_MS,
          url: serverUrl
        });

        clearConnectTimeout();
        this.discardCurrentSocket();
        this.connectionStatus$.next({
          connected: false,
          errorMessage: 'network.signaling.connectTimeout'
        });

        this.scheduleReconnect();
        observer.next(false);
        observer.complete();
      }, SIGNALING_CONNECT_TIMEOUT_MS);

      // Every handler first checks that `socket` is still the manager's current
      // socket, so events from a superseded socket are ignored.
      socket.onopen = () => {
        if (socket !== this.signalingWebSocket)
          return;

        clearConnectTimeout();

        this.logger.info('[signaling] Connected to signaling server', {
          serverUrl,
          readyState: this.getSocketReadyStateLabel()
        });

        this.clearReconnect();
        this.lastKeepaliveAckAt = Date.now();
        this.lastEndpointHealthOk = null;
        this.lastKnownServerInstanceId = null;
        this.lastEndpointHealthProbeAt = 0;
        this.startHeartbeat();
        this.connectionStatus$.next({ connected: true });
        this.reIdentifyAndRejoin();
        observer.next(true);
        observer.complete();
      };

      socket.onmessage = (event) => {
        if (socket !== this.signalingWebSocket)
          return;

        this.handleInboundSocketData(event.data, serverUrl);
      };

      socket.onerror = (error) => {
        if (socket !== this.signalingWebSocket)
          return;

        clearConnectTimeout();

        this.logger.error('[signaling] Signaling socket error', error, {
          readyState: this.getSocketReadyStateLabel(),
          url: serverUrl
        });

        this.connectionStatus$.next({
          connected: false,
          errorMessage: 'network.signaling.connectionFailed'
        });

        observer.error(error);
      };

      socket.onclose = (event) => {
        if (socket !== this.signalingWebSocket)
          return;

        clearConnectTimeout();

        this.logger.warn('[signaling] Disconnected from signaling server', {
          attempts: this.signalingReconnectAttempts,
          code: event.code,
          reason: event.reason || null,
          url: serverUrl,
          wasClean: event.wasClean
        });

        this.stopHeartbeat();
        this.connectionStatus$.next({
          connected: false,
          errorMessage: 'network.signaling.disconnected'
        });

        this.scheduleReconnect();
      };
    } catch (error) {
      this.logger.error('[signaling] Failed to initialize signaling socket', error, {
        readyState: this.getSocketReadyStateLabel(),
        url: serverUrl
      });

      observer.error(error);
    }
  });
}

/**
 * Decode, log and dispatch one inbound WebSocket frame.
 *
 * Keepalive acks are consumed here (they mark the server as ack-capable and
 * refresh the ack timestamp); every other message is forwarded on
 * `messageReceived$`. Frames that are not valid JSON, or whose JSON value is
 * not a plain object, are logged as parse failures and dropped.
 *
 * @param data Raw `MessageEvent.data` received from the socket.
 * @param serverUrl URL of the socket the frame arrived on (for log context).
 */
private handleInboundSocketData(data: unknown, serverUrl: string): void {
  const rawPayload = this.stringifySocketPayload(data);
  const payloadBytes = rawPayload ? this.measurePayloadBytes(rawPayload) : null;

  try {
    const message = this.asRecord(JSON.parse(rawPayload)) as ParsedSignalingMessage | null;

    // JSON such as `123`, `"text"`, `null` or `[]` parses fine but is not a message.
    if (!message) {
      throw new TypeError('Signaling message is not a JSON object');
    }

    const payloadPreview = this.buildPayloadPreview(message);

    recordDebugNetworkSignalingPayload(message, 'inbound');

    this.logger.traffic('signaling', 'inbound', {
      ...payloadPreview,
      bytes: payloadBytes ?? undefined,
      payloadPreview,
      readyState: this.getSocketReadyStateLabel(),
      type: typeof message.type === 'string' ? message.type : 'unknown',
      url: serverUrl
    });

    if (message.type === SIGNALING_TYPE_KEEPALIVE_ACK) {
      this.serverSupportsKeepaliveAck = true;
      this.lastKeepaliveAckAt = Date.now();
    } else {
      this.messageReceived$.next(message);
    }
  } catch (error) {
    this.logger.error('[signaling] Failed to parse signaling message', error, {
      bytes: payloadBytes ?? undefined,
      rawPreview: this.getPayloadPreview(rawPayload),
      readyState: this.getSocketReadyStateLabel(),
      url: serverUrl
    });
  }
}
|
|
|
|
/**
 * Ensure signaling is connected; try reconnecting if not.
 *
 * Resolves `true` immediately when the socket is already open and `false`
 * when no URL has been used yet. Otherwise it calls `connect()` with the last
 * URL and resolves with the first value that emits, or `false` on error or
 * once `timeoutMs` elapses. The promise never rejects.
 *
 * @param timeoutMs Maximum time to wait for the connection attempt.
 * @returns Whether the socket is (or became) connected.
 */
async ensureConnected(timeoutMs: number = SIGNALING_CONNECT_TIMEOUT_MS): Promise<boolean> {
  if (this.isSocketOpen())
    return true;

  const url = this.lastSignalingUrl;

  if (!url)
    return false;

  return new Promise<boolean>((resolve) => {
    let settled = false;
    let subscription: { unsubscribe(): void } | null = null;

    const settle = (connected: boolean): void => {
      if (settled)
        return;

      settled = true;
      clearTimeout(timeout);
      // Release the connect subscription so a timed-out wait leaves nothing behind
      // (e.g. the timer and status subscription created by waitForOpen()).
      subscription?.unsubscribe();
      resolve(connected);
    };

    const timeout = setTimeout(() => settle(false), timeoutMs);

    subscription = this.connect(url).subscribe({
      next: settle,
      error: () => settle(false)
    });

    // connect() may emit synchronously, i.e. before `subscription` was assigned.
    if (settled)
      subscription.unsubscribe();
  });
}
|
|
|
|
/**
 * Send a signaling message, stamping it with `from` and `timestamp`.
 *
 * While the socket is not open nothing is sent: message types for which
 * `isTransientSignalingOutboundType` returns true are dropped silently, any
 * other type is dropped with a warning.
 *
 * @param message Message without the `from` / `timestamp` fields.
 * @param localPeerId Value written to the outgoing message's `from` field.
 */
sendSignalingMessage(message: Omit<SignalingMessage, 'from' | 'timestamp'>, localPeerId: string): void {
  if (this.isSocketOpen()) {
    const envelope: SignalingMessage = {
      ...message,
      from: localPeerId,
      timestamp: Date.now()
    };

    this.sendSerializedPayload(envelope, {
      targetPeerId: message.to,
      type: message.type,
      url: this.lastSignalingUrl
    });

    return;
  }

  if (!isTransientSignalingOutboundType(message.type)) {
    this.logger.warn('[signaling] Signaling socket not connected', {
      readyState: this.getSocketReadyStateLabel(),
      type: message.type,
      url: this.lastSignalingUrl
    });
  }
}
|
|
|
|
/**
 * Send a raw JSON payload (for identify, join_server, etc.).
 *
 * @param message Object to serialize and send as-is.
 * @returns `true` when the payload was handed to the socket, `false` when the
 *   socket is not open (a warning is logged unless the message type is one
 *   `isTransientSignalingOutboundType` accepts).
 */
sendRawMessage(message: Record<string, unknown>): boolean {
  const declaredType = message['type'];
  const messageType = typeof declaredType === 'string' ? declaredType : 'unknown';

  if (this.isSocketOpen()) {
    const recipient = message['targetUserId'];

    this.sendSerializedPayload(message, {
      targetPeerId: typeof recipient === 'string' ? recipient : undefined,
      type: messageType,
      url: this.lastSignalingUrl
    });

    return true;
  }

  if (!isTransientSignalingOutboundType(messageType)) {
    this.logger.warn('[signaling] Signaling socket not connected', {
      readyState: this.getSocketReadyStateLabel(),
      type: messageType,
      url: this.lastSignalingUrl
    });
  }

  return false;
}
|
|
|
|
/**
 * Gracefully close the WebSocket.
 *
 * Stops the heartbeat, cancels any pending reconnect and releases the socket
 * reference before closing it, so the socket's own handlers treat it as
 * superseded and ignore the resulting close event.
 */
close(): void {
  this.stopHeartbeat();
  this.clearReconnect();

  const closingSocket = this.signalingWebSocket;

  this.signalingWebSocket = null;
  closingSocket?.close();
}
|
|
|
|
/** Whether a socket exists and its ready state is `OPEN`. */
isSocketOpen(): boolean {
  return this.signalingWebSocket?.readyState === WebSocket.OPEN;
}
|
|
|
|
/** Whether a socket exists and its ready state is still `CONNECTING`. */
isSocketConnecting(): boolean {
  return this.signalingWebSocket?.readyState === WebSocket.CONNECTING;
}
|
|
|
|
/**
 * The URL last passed to `connect()` (needed for reconnection).
 *
 * @returns The stored URL, or `null` when `connect()` has not been called yet.
 */
getLastUrl(): string | null {
  const { lastSignalingUrl } = this;

  return lastSignalingUrl;
}
|
|
|
|
/**
 * Re-identify and rejoin servers after a (re)connect.
 *
 * Sends, in order: an `identify` message when credentials are available, one
 * `join_server` per member server id, and a `view_server` for the last joined
 * server if it is still among the member ids.
 */
private reIdentifyAndRejoin(): void {
  const identity = this.getLastIdentify();

  if (identity) {
    const {
      token,
      oderId,
      displayName,
      description,
      profileUpdatedAt,
      homeSignalServerUrl,
      clientInstanceId
    } = identity;

    this.sendRawMessage({
      type: SIGNALING_TYPE_IDENTIFY,
      token,
      oderId,
      displayName,
      description,
      profileUpdatedAt,
      homeSignalServerUrl,
      connectionScope: this.lastSignalingUrl ?? undefined,
      clientInstanceId
    });
  }

  const joinedServerIds = this.getMemberServerIds();

  if (joinedServerIds.size === 0) {
    return;
  }

  for (const serverId of joinedServerIds) {
    this.sendRawMessage({ type: SIGNALING_TYPE_JOIN_SERVER, serverId });
  }

  const viewedServer = this.getLastJoinedServer();

  if (viewedServer && joinedServerIds.has(viewedServer.serverId)) {
    this.sendRawMessage({ type: SIGNALING_TYPE_VIEW_SERVER, serverId: viewedServer.serverId });
  }
}
|
|
|
|
/**
 * Schedule a reconnect attempt using exponential backoff.
 *
 * The delay is the base delay doubled once per previous attempt, capped at
 * {@link SIGNALING_RECONNECT_MAX_DELAY_MS}. Does nothing when a timer is
 * already pending, no URL is stored, or the socket is open or connecting.
 * A failed attempt schedules the next one.
 */
private scheduleReconnect(): void {
  const nothingToDo =
    this.signalingReconnectTimer ||
    !this.lastSignalingUrl ||
    this.isSocketOpen() ||
    this.isSocketConnecting();

  if (nothingToDo)
    return;

  const backoffMs = Math.min(
    SIGNALING_RECONNECT_MAX_DELAY_MS,
    SIGNALING_RECONNECT_BASE_DELAY_MS * 2 ** this.signalingReconnectAttempts
  );

  const retryLater = (): void => {
    this.scheduleReconnect();
  };

  this.signalingReconnectTimer = setTimeout(() => {
    this.signalingReconnectTimer = null;

    // Someone else may have (re)connected while the timer was pending.
    if (this.isSocketOpen() || this.isSocketConnecting()) {
      return;
    }

    this.signalingReconnectAttempts += 1;
    this.logger.info('[signaling] Attempting reconnect', {
      attempt: this.signalingReconnectAttempts,
      delay: backoffMs,
      url: this.lastSignalingUrl
    });

    this.connect(this.lastSignalingUrl!).subscribe({
      next: (connected) => {
        if (connected) {
          this.signalingReconnectAttempts = 0;
        } else {
          retryLater();
        }
      },
      error: retryLater
    });
  }, backoffMs);
}
|
|
|
|
/**
 * Wait for a connect attempt that is already in flight.
 *
 * Emits `true` as soon as `connectionStatus$` reports a connection. If that
 * does not happen within `timeoutMs`, emits whether the socket is open at
 * that moment. Completes after its single emission; "disconnected" status
 * events are ignored while waiting.
 *
 * @param timeoutMs Maximum time to wait before reporting the current state.
 */
private waitForOpen(timeoutMs: number = SIGNALING_CONNECT_TIMEOUT_MS): Observable<boolean> {
  if (this.isSocketOpen()) {
    return of(true);
  }

  return new Observable<boolean>((observer) => {
    let finished = false;

    const release = (): void => {
      finished = true;
      clearTimeout(deadline);
      statusSubscription.unsubscribe();
    };

    const statusSubscription = this.connectionStatus$.subscribe((status) => {
      if (status.connected && !finished) {
        release();
        observer.next(true);
        observer.complete();
      }
    });

    const deadline = setTimeout(() => {
      if (finished) {
        return;
      }

      release();
      observer.next(this.isSocketOpen());
      observer.complete();
    }, timeoutMs);

    // Teardown: runs when the subscriber unsubscribes or the observable completes.
    return release;
  });
}
|
|
|
|
/**
 * Tear down the current socket after a transport-level failure and schedule a reconnect.
 *
 * Does nothing when there is no current socket. Otherwise stops the
 * heartbeat, discards the socket, publishes a disconnected status carrying
 * `reason` as its error message, and schedules a reconnect.
 *
 * @param reason Message key published as `errorMessage` and logged.
 * @param error Optional underlying error, included in the log entry only.
 */
private handleSocketTransportFailure(reason: string, error?: unknown): void {
  if (this.signalingWebSocket === null) {
    return;
  }

  const logContext = {
    error,
    reason,
    readyState: this.getSocketReadyStateLabel(),
    url: this.lastSignalingUrl
  };

  this.logger.warn('[signaling] Signaling transport failed; forcing reconnect', logContext);

  this.stopHeartbeat();
  this.discardCurrentSocket();
  this.connectionStatus$.next({ connected: false, errorMessage: reason });
  this.scheduleReconnect();
}
|
|
|
|
/**
 * Drop the current socket without running any of its handlers.
 *
 * Clears the socket reference and the connect-attempt timestamp, then detaches
 * the handlers and closes the socket. A failure to close is logged, not thrown.
 */
private discardCurrentSocket(): void {
  const discarded = this.signalingWebSocket;

  this.signalingWebSocket = null;
  this.connectAttemptStartedAt = 0;

  if (discarded) {
    this.detachSocketHandlers(discarded);

    try {
      discarded.close();
    } catch {
      this.logger.warn('[signaling] Failed to discard signaling socket', {
        url: this.lastSignalingUrl
      });
    }
  }
}
|
|
|
|
/**
 * Remove all four event handlers from a socket so it can no longer call back into this manager.
 *
 * @param socket Socket whose `onopen` / `onclose` / `onerror` / `onmessage` are cleared.
 */
private detachSocketHandlers(socket: WebSocket): void {
  socket.onopen = socket.onclose = socket.onerror = socket.onmessage = null;
}
|
|
|
|
/** Cancel any pending reconnect timer and reset the attempt counter to zero. */
private clearReconnect(): void {
  const pendingTimer = this.signalingReconnectTimer;

  if (pendingTimer) {
    clearTimeout(pendingTimer);
    this.signalingReconnectTimer = null;
  }

  this.signalingReconnectAttempts = 0;
}
|
|
|
|
/**
 * Start the heartbeat interval that drives periodic state broadcasts.
 *
 * Any running heartbeat is stopped first (which also resets keepalive and
 * health-probe state). The "last sent" / "last probed" timestamps are then
 * back-dated by one full interval so the first run of the checks, which
 * happens synchronously at the end of this method, is already due.
 */
private startHeartbeat(): void {
  this.stopHeartbeat();

  const startedAt = Date.now();

  this.lastKeepaliveSentAt = startedAt - SIGNALING_KEEPALIVE_INTERVAL_MS;
  this.lastEndpointHealthProbeAt = startedAt - SIGNALING_HEALTH_PROBE_INTERVAL_MS;
  this.stateHeartbeatTimer = setInterval(() => this.runHeartbeatChecks(), STATE_HEARTBEAT_INTERVAL_MS);

  this.runHeartbeatChecks();
}
|
|
|
|
/**
 * Run one heartbeat tick.
 *
 * Order matters: subscribers are notified first, an overdue keepalive ack is
 * checked before the next keepalive is sent (sending refreshes the "sent"
 * timestamp), and the health probe is started last without being awaited.
 */
private runHeartbeatChecks(): void {
  this.heartbeatTick$.next();

  this.verifyKeepaliveAckIfDue();
  this.sendKeepaliveIfDue();

  void this.probeEndpointHealthIfDue();
}
|
|
|
|
/** Stop the heartbeat interval and reset all keepalive and health-probe bookkeeping. */
private stopHeartbeat(): void {
  const runningTimer = this.stateHeartbeatTimer;

  if (runningTimer) {
    clearInterval(runningTimer);
    this.stateHeartbeatTimer = null;
  }

  // Timestamps back to "never".
  this.lastKeepaliveSentAt = this.lastKeepaliveAckAt = this.lastEndpointHealthProbeAt = 0;
  // Nothing known about the endpoint any more.
  this.lastEndpointHealthOk = this.lastKnownServerInstanceId = null;
  // Flags back to their initial values.
  this.serverSupportsKeepaliveAck = this.endpointHealthProbeInFlight = false;
}
|
|
|
|
/**
 * Probe the signaling endpoint's health when a probe is due.
 *
 * A probe runs only while the socket is open, no other probe is in flight and
 * at least `SIGNALING_HEALTH_PROBE_INTERVAL_MS` passed since the last one.
 * The current socket is torn down (forcing a reconnect) when the probe reports
 * the endpoint as not ok, when the reported server instance id differs from
 * the previously recorded one, or when the endpoint is ok again after a
 * recorded failure.
 *
 * The result is ignored if the socket or URL changed while the probe was
 * pending, so a slow probe for an old connection cannot tear down a new one.
 * A rejected probe is logged and otherwise ignored.
 */
private async probeEndpointHealthIfDue(): Promise<void> {
  const probedUrl = this.lastSignalingUrl;
  const probedSocket = this.signalingWebSocket;

  // `!isSocketOpen()` also covers the still-connecting case.
  if (!probedUrl || this.endpointHealthProbeInFlight || !this.isSocketOpen()) {
    return;
  }

  const now = Date.now();

  if (now - this.lastEndpointHealthProbeAt < SIGNALING_HEALTH_PROBE_INTERVAL_MS) {
    return;
  }

  this.lastEndpointHealthProbeAt = now;
  this.endpointHealthProbeInFlight = true;

  try {
    const snapshot = await probeSignalingEndpointHealth(probedUrl);

    // The connection may have been replaced while we were waiting; a result for
    // the old socket/URL says nothing about the current one.
    if (this.signalingWebSocket !== probedSocket || this.lastSignalingUrl !== probedUrl) {
      return;
    }

    const wasHealthy = this.lastEndpointHealthOk;
    const previousServerInstanceId = this.lastKnownServerInstanceId;

    this.lastEndpointHealthOk = snapshot.ok;

    if (!snapshot.ok) {
      this.handleSocketTransportFailure('network.signaling.healthCheckFailed');
      return;
    }

    if (snapshot.serverInstanceId) {
      if (previousServerInstanceId && snapshot.serverInstanceId !== previousServerInstanceId) {
        this.handleSocketTransportFailure('network.signaling.instanceChanged');
        return;
      }

      this.lastKnownServerInstanceId = snapshot.serverInstanceId;
    }

    if (wasHealthy === false) {
      this.handleSocketTransportFailure('network.signaling.recovered');
    }
  } catch (error) {
    // This method is invoked fire-and-forget, so a rejection would otherwise
    // surface as an unhandled promise rejection.
    this.logger.warn('[signaling] Signaling endpoint health check errored', {
      error,
      url: probedUrl
    });
  } finally {
    this.endpointHealthProbeInFlight = false;
  }
}
|
|
|
|
/**
 * Force a reconnect when the last keepalive has gone unacknowledged for too long.
 *
 * Only applies once the server has acked at least once, the socket is open
 * and a keepalive has been sent. The failure triggers when the latest ack is
 * older than the latest keepalive and `SIGNALING_KEEPALIVE_ACK_TIMEOUT_MS`
 * has elapsed since that keepalive.
 */
private verifyKeepaliveAckIfDue(): void {
  const canVerify =
    this.serverSupportsKeepaliveAck &&
    this.isSocketOpen() &&
    this.lastKeepaliveSentAt !== 0;

  if (!canVerify) {
    return;
  }

  const ackOutstanding = this.lastKeepaliveAckAt < this.lastKeepaliveSentAt;
  const waitedMs = Date.now() - this.lastKeepaliveSentAt;

  if (ackOutstanding && waitedMs >= SIGNALING_KEEPALIVE_ACK_TIMEOUT_MS) {
    this.handleSocketTransportFailure('network.signaling.keepaliveTimeout');
  }
}
|
|
|
|
/**
 * Send a keepalive when `SIGNALING_KEEPALIVE_INTERVAL_MS` has passed since the last one.
 *
 * The "sent" timestamp is updated before the send is attempted. Until the
 * server has acked a keepalive, a successful send is recorded as acknowledged
 * so the ack timeout cannot trigger. A throwing send is treated as a
 * transport failure.
 */
private sendKeepaliveIfDue(): void {
  const sendTime = Date.now();
  const sinceLastSend = sendTime - this.lastKeepaliveSentAt;

  if (sinceLastSend < SIGNALING_KEEPALIVE_INTERVAL_MS) {
    return;
  }

  this.lastKeepaliveSentAt = sendTime;

  try {
    const delivered = this.sendRawMessage({ type: SIGNALING_TYPE_KEEPALIVE });
    const assumeAcked = delivered && !this.serverSupportsKeepaliveAck;

    if (assumeAcked) {
      this.lastKeepaliveAckAt = this.lastKeepaliveSentAt;
    }
  } catch (sendError) {
    this.handleSocketTransportFailure('network.signaling.keepaliveSendFailed', sendError);
  }
}
|
|
|
|
/**
 * Clean up all resources: close the socket (stopping heartbeat and reconnect
 * timers) and complete the three public subjects.
 */
destroy(): void {
  this.close();

  const subjects = [
    this.heartbeatTick$,
    this.messageReceived$,
    this.connectionStatus$
  ];

  for (const subject of subjects) {
    subject.complete();
  }
}
|
|
|
|
/**
 * Serialize a message to JSON, send it on the current socket and log the traffic.
 *
 * Callers are expected to have checked that the socket is open. The payload
 * is recorded in the debug network metrics before serialization is attempted.
 *
 * @param message Message object to serialize.
 * @param details Log context: target peer, message type and socket URL.
 * @throws Rethrows the serialization error, or the send error after treating
 *   the send failure as a transport failure (socket discarded, reconnect scheduled).
 */
private sendSerializedPayload(
  message: SignalingMessage | Record<string, unknown>,
  details: { targetPeerId?: string; type?: string; url?: string | null }
): void {
  const { targetPeerId, type, url } = details;
  const payloadPreview = this.buildPayloadPreview(message);

  recordDebugNetworkSignalingPayload(message, 'outbound');

  let serialized = '';

  try {
    serialized = JSON.stringify(message);
  } catch (serializeError) {
    this.logger.error('[signaling] Failed to serialize signaling payload', serializeError, {
      payloadPreview,
      type,
      url
    });

    throw serializeError;
  }

  // Built lazily so `readyState` reflects the socket at the time of logging.
  const describeSend = (): Record<string, unknown> => ({
    bytes: this.measurePayloadBytes(serialized),
    payloadPreview,
    readyState: this.getSocketReadyStateLabel(),
    targetPeerId,
    type,
    url
  });

  try {
    this.signalingWebSocket!.send(serialized);
    this.logger.traffic('signaling', 'outbound', { ...payloadPreview, ...describeSend() });
  } catch (sendError) {
    this.logger.error('[signaling] Failed to send signaling payload', sendError, describeSend());
    this.handleSocketTransportFailure('network.signaling.payloadSendFailed', sendError);

    throw sendError;
  }
}
|
|
|
|
/**
 * Human-readable label for the current socket's ready state, for log output.
 *
 * @returns `'connecting'`, `'open'`, `'closing'` or `'closed'`; `'unavailable'`
 *   when there is no socket or the state is not one of the four known values.
 */
private getSocketReadyStateLabel(): string {
  const state = this.signalingWebSocket?.readyState;

  if (state === WebSocket.CONNECTING) {
    return 'connecting';
  }

  if (state === WebSocket.OPEN) {
    return 'open';
  }

  if (state === WebSocket.CLOSING) {
    return 'closing';
  }

  if (state === WebSocket.CLOSED) {
    return 'closed';
  }

  return 'unavailable';
}
|
|
|
|
/**
 * Convert raw socket frame data into text.
 *
 * Strings are returned unchanged. `ArrayBuffer`s and buffer views (typed
 * arrays, `DataView`) are decoded with `TextDecoder`'s default encoding;
 * without the view case a typed array would be stringified as a
 * comma-separated list of byte values. Anything else goes through `String()`,
 * with `null` / `undefined` becoming the empty string.
 *
 * @param payload Value of `MessageEvent.data`.
 * @returns Text representation of the payload.
 */
private stringifySocketPayload(payload: unknown): string {
  if (typeof payload === 'string')
    return payload;

  if (payload instanceof ArrayBuffer || ArrayBuffer.isView(payload))
    return new TextDecoder().decode(payload);

  return String(payload ?? '');
}
|
|
|
|
/**
 * Size of a string in bytes once encoded by `TextEncoder`.
 *
 * @param payload Text to measure.
 * @returns Number of encoded bytes.
 */
private measurePayloadBytes(payload: string): number {
  const encoded = new TextEncoder().encode(payload);

  return encoded.byteLength;
}
|
|
|
|
/**
 * Short single-line preview of a raw payload for log output.
 *
 * @param payload Raw payload text.
 * @returns The payload with each whitespace run collapsed to one space, cut to 240 characters.
 */
private getPayloadPreview(payload: string): string {
  const singleLine = payload.replace(/\s+/g, ' ');

  return singleLine.slice(0, 240);
}
|
|
|
|
/**
 * Build a compact, log-friendly summary of a signaling payload.
 *
 * The summary always contains the first ten keys of the payload and its
 * `type` (`'unknown'` when not a string). Selected well-known fields are
 * copied only when present with the expected primitive type; `targetUserId`
 * is exposed as `targetPeerId`, and `users` / `voiceState` are summarized.
 *
 * @param payload Inbound or outbound message object.
 * @returns The preview record.
 */
private buildPayloadPreview(payload: SignalingMessage | Record<string, unknown>): Record<string, unknown> {
  const source = payload as Record<string, unknown>;

  // Returns the field only when it has the expected primitive type.
  const pick = (key: string, expected: 'string' | 'boolean'): unknown =>
    typeof source[key] === expected ? source[key] : undefined;

  const userList = source['users'];
  const voiceSummary = this.summarizeVoiceState(source['voiceState']);
  const userSummaries = this.summarizeUsers(userList);

  const preview: Record<string, unknown> = {
    keys: Object.keys(source).slice(0, 10),
    type: pick('type', 'string') ?? 'unknown'
  };

  // [preview key, value]; undefined values are skipped. Order fixes key order in the preview.
  const optionalEntries: [string, unknown][] = [
    ['displayName', pick('displayName', 'string')],
    ['fromUserId', pick('fromUserId', 'string')],
    ['isScreenSharing', pick('isScreenSharing', 'boolean')],
    ['oderId', pick('oderId', 'string')],
    ['roomId', pick('roomId', 'string')],
    ['serverId', pick('serverId', 'string')],
    ['targetPeerId', pick('targetUserId', 'string')],
    ['userCount', Array.isArray(userList) ? userList.length : undefined],
    ['users', userSummaries],
    ['voiceState', voiceSummary]
  ];

  for (const [key, value] of optionalEntries) {
    this.assignPreviewValue(preview, key, value);
  }

  return preview;
}
|
|
|
|
/**
 * Summarize a `voiceState` value for log output.
 *
 * @param value Candidate voice-state object.
 * @returns `undefined` when `value` is not a plain object; otherwise the four
 *   boolean flags (each `true` only when the source field is exactly `true`)
 *   plus `roomId`, `serverId` and `volume` when they have the expected type.
 */
private summarizeVoiceState(value: unknown): Record<string, unknown> | undefined {
  const state = this.asRecord(value);

  if (state === null) {
    return undefined;
  }

  const summary: Record<string, unknown> = {};

  for (const flag of ['isConnected', 'isMuted', 'isDeafened', 'isSpeaking']) {
    summary[flag] = state[flag] === true;
  }

  for (const key of ['roomId', 'serverId']) {
    this.assignPreviewValue(summary, key, typeof state[key] === 'string' ? state[key] : undefined);
  }

  this.assignPreviewValue(summary, 'volume', typeof state['volume'] === 'number' ? state['volume'] : undefined);

  return summary;
}
|
|
|
|
/**
 * Summarize a `users` array for log output.
 *
 * @param value Candidate user list.
 * @returns `undefined` when `value` is not an array; otherwise one summary per
 *   plain-object entry among the first twenty, holding `displayName` and
 *   `oderId` when they are strings (an entry with neither yields `{}`).
 */
private summarizeUsers(value: unknown): Record<string, unknown>[] | undefined {
  if (!Array.isArray(value)) {
    return undefined;
  }

  const firstEntries: unknown[] = value.slice(0, 20);

  return firstEntries.reduce<Record<string, unknown>[]>((summaries, entry) => {
    const user = this.asRecord(entry);

    if (user) {
      const summary: Record<string, unknown> = {};

      for (const key of ['displayName', 'oderId']) {
        this.assignPreviewValue(summary, key, typeof user[key] === 'string' ? user[key] : undefined);
      }

      summaries.push(summary);
    }

    return summaries;
  }, []);
}
|
|
|
|
/**
 * Set `target[key]` unless the value is `undefined`.
 *
 * @param target Record to mutate.
 * @param key Property name to set.
 * @param value Value to store; `undefined` leaves `target` untouched.
 */
private assignPreviewValue(target: Record<string, unknown>, key: string, value: unknown): void {
  if (value === undefined) {
    return;
  }

  target[key] = value;
}
|
|
|
|
/**
 * View a value as a string-keyed record when it is a non-null, non-array object.
 *
 * @param value Any value.
 * @returns The same value typed as a record, or `null` for primitives, `null`, `undefined` and arrays.
 */
private asRecord(value: unknown): Record<string, unknown> | null {
  const isObjectLike = typeof value === 'object' && value !== null && !Array.isArray(value);

  return isObjectLike ? (value as Record<string, unknown>) : null;
}
|
|
}
|