feat: signal server tag

This commit is contained in:
2026-06-05 06:16:02 +02:00
parent 6865147e8f
commit bf4e6891d1
69 changed files with 2808 additions and 1269 deletions

View File

@@ -18,11 +18,16 @@ import {
SIGNALING_CONNECT_TIMEOUT_MS,
STATE_HEARTBEAT_INTERVAL_MS,
SIGNALING_KEEPALIVE_INTERVAL_MS,
SIGNALING_HEALTH_PROBE_INTERVAL_MS,
SIGNALING_KEEPALIVE_ACK_TIMEOUT_MS,
SIGNALING_TYPE_IDENTIFY,
SIGNALING_TYPE_JOIN_SERVER,
SIGNALING_TYPE_KEEPALIVE,
SIGNALING_TYPE_VIEW_SERVER
SIGNALING_TYPE_KEEPALIVE_ACK,
SIGNALING_TYPE_VIEW_SERVER,
isTransientSignalingOutboundType
} from '../realtime.constants';
import { probeSignalingEndpointHealth } from './signaling-endpoint-health.rules';
interface ParsedSignalingPayload {
sdp?: RTCSessionDescriptionInit;
@@ -42,6 +47,13 @@ export class SignalingManager {
private signalingReconnectTimer: ReturnType<typeof setTimeout> | null = null;
private stateHeartbeatTimer: ReturnType<typeof setInterval> | null = null;
private lastKeepaliveSentAt = 0;
private lastKeepaliveAckAt = 0;
private serverSupportsKeepaliveAck = false;
private lastEndpointHealthOk: boolean | null = null;
private lastKnownServerInstanceId: string | null = null;
private lastEndpointHealthProbeAt = 0;
private endpointHealthProbeInFlight = false;
private connectAttemptStartedAt = 0;
/** Fires every heartbeat tick - the main service hooks this to broadcast state. */
readonly heartbeatTick$ = new Subject<void>();
@@ -67,23 +79,43 @@ export class SignalingManager {
}
if (this.isSocketConnecting()) {
return this.waitForOpen();
const connectAge = Date.now() - this.connectAttemptStartedAt;
if (connectAge < SIGNALING_CONNECT_TIMEOUT_MS) {
return this.waitForOpen();
}
this.discardCurrentSocket();
}
}
this.lastSignalingUrl = serverUrl;
return new Observable<boolean>((observer) => {
let connectTimeout: ReturnType<typeof setTimeout> | null = null;
const clearConnectTimeout = (): void => {
if (!connectTimeout) {
return;
}
clearTimeout(connectTimeout);
connectTimeout = null;
};
try {
this.logger.info('[signaling] Connecting to signaling server', { serverUrl });
const previousSocket = this.signalingWebSocket;
this.lastSignalingUrl = serverUrl;
this.connectAttemptStartedAt = Date.now();
const socket = new WebSocket(serverUrl);
this.signalingWebSocket = socket;
if (previousSocket && previousSocket !== socket) {
this.detachSocketHandlers(previousSocket);
try {
previousSocket.close();
} catch {
@@ -93,16 +125,44 @@ export class SignalingManager {
}
}
connectTimeout = setTimeout(() => {
if (socket !== this.signalingWebSocket || !this.isSocketConnecting()) {
return;
}
this.logger.warn('[signaling] Signaling connect attempt timed out', {
timeoutMs: SIGNALING_CONNECT_TIMEOUT_MS,
url: serverUrl
});
clearConnectTimeout();
this.discardCurrentSocket();
this.connectionStatus$.next({
connected: false,
errorMessage: 'Timed out connecting to signaling server'
});
this.scheduleReconnect();
observer.next(false);
observer.complete();
}, SIGNALING_CONNECT_TIMEOUT_MS);
socket.onopen = () => {
if (socket !== this.signalingWebSocket)
return;
clearConnectTimeout();
this.logger.info('[signaling] Connected to signaling server', {
serverUrl,
readyState: this.getSocketReadyStateLabel()
});
this.clearReconnect();
this.lastKeepaliveAckAt = Date.now();
this.lastEndpointHealthOk = null;
this.lastKnownServerInstanceId = null;
this.lastEndpointHealthProbeAt = 0;
this.startHeartbeat();
this.connectionStatus$.next({ connected: true });
this.reIdentifyAndRejoin();
@@ -132,7 +192,12 @@ export class SignalingManager {
url: serverUrl
});
this.messageReceived$.next(message);
if (message.type === SIGNALING_TYPE_KEEPALIVE_ACK) {
this.serverSupportsKeepaliveAck = true;
this.lastKeepaliveAckAt = Date.now();
} else {
this.messageReceived$.next(message);
}
} catch (error) {
this.logger.error('[signaling] Failed to parse signaling message', error, {
bytes: payloadBytes ?? undefined,
@@ -147,6 +212,8 @@ export class SignalingManager {
if (socket !== this.signalingWebSocket)
return;
clearConnectTimeout();
this.logger.error('[signaling] Signaling socket error', error, {
readyState: this.getSocketReadyStateLabel(),
url: serverUrl
@@ -162,6 +229,8 @@ export class SignalingManager {
if (socket !== this.signalingWebSocket)
return;
clearConnectTimeout();
this.logger.warn('[signaling] Disconnected from signaling server', {
attempts: this.signalingReconnectAttempts,
code: event.code,
@@ -203,7 +272,13 @@ export class SignalingManager {
}, timeoutMs);
this.connect(this.lastSignalingUrl!).subscribe({
next: () => { if (!settled) { settled = true; clearTimeout(timeout); resolve(true); } },
next: (connected) => {
if (!settled) {
settled = true;
clearTimeout(timeout);
resolve(connected);
}
},
error: () => { if (!settled) { settled = true; clearTimeout(timeout); resolve(false); } }
});
});
@@ -212,7 +287,11 @@ export class SignalingManager {
/** Send a signaling message (with `from` / `timestamp` populated). */
sendSignalingMessage(message: Omit<SignalingMessage, 'from' | 'timestamp'>, localPeerId: string): void {
if (!this.isSocketOpen()) {
this.logger.error('[signaling] Signaling socket not connected', new Error('Socket not open'), {
if (isTransientSignalingOutboundType(message.type)) {
return;
}
this.logger.warn('[signaling] Signaling socket not connected', {
readyState: this.getSocketReadyStateLabel(),
type: message.type,
url: this.lastSignalingUrl
@@ -233,22 +312,30 @@ export class SignalingManager {
}
/** Send a raw JSON payload (for identify, join_server, etc.). */
sendRawMessage(message: Record<string, unknown>): void {
sendRawMessage(message: Record<string, unknown>): boolean {
const messageType = typeof message['type'] === 'string' ? message['type'] : 'unknown';
if (!this.isSocketOpen()) {
this.logger.error('[signaling] Signaling socket not connected', new Error('Socket not open'), {
if (isTransientSignalingOutboundType(messageType)) {
return false;
}
this.logger.warn('[signaling] Signaling socket not connected', {
readyState: this.getSocketReadyStateLabel(),
type: typeof message['type'] === 'string' ? message['type'] : 'unknown',
type: messageType,
url: this.lastSignalingUrl
});
return;
return false;
}
this.sendSerializedPayload(message, {
targetPeerId: typeof message['targetUserId'] === 'string' ? message['targetUserId'] : undefined,
type: typeof message['type'] === 'string' ? message['type'] : 'unknown',
type: messageType,
url: this.lastSignalingUrl
});
return true;
}
/** Gracefully close the WebSocket. */
@@ -284,10 +371,15 @@ export class SignalingManager {
const credentials = this.getLastIdentify();
if (credentials) {
this.sendRawMessage({ type: SIGNALING_TYPE_IDENTIFY,
this.sendRawMessage({
type: SIGNALING_TYPE_IDENTIFY,
oderId: credentials.oderId,
displayName: credentials.displayName,
connectionScope: this.lastSignalingUrl ?? undefined });
description: credentials.description,
profileUpdatedAt: credentials.profileUpdatedAt,
homeSignalServerUrl: credentials.homeSignalServerUrl,
connectionScope: this.lastSignalingUrl ?? undefined
});
}
const memberIds = this.getMemberServerIds();
@@ -337,7 +429,14 @@ export class SignalingManager {
});
this.connect(this.lastSignalingUrl!).subscribe({
next: () => { this.signalingReconnectAttempts = 0; },
next: (connected) => {
if (connected) {
this.signalingReconnectAttempts = 0;
return;
}
this.scheduleReconnect();
},
error: () => { this.scheduleReconnect(); }
});
}, delay);
@@ -381,6 +480,56 @@ export class SignalingManager {
});
}
private handleSocketTransportFailure(reason: string, error?: unknown): void {
if (!this.signalingWebSocket) {
return;
}
this.logger.warn('[signaling] Signaling transport failed; forcing reconnect', {
error,
reason,
readyState: this.getSocketReadyStateLabel(),
url: this.lastSignalingUrl
});
this.stopHeartbeat();
this.discardCurrentSocket();
this.connectionStatus$.next({
connected: false,
errorMessage: reason
});
this.scheduleReconnect();
}
private discardCurrentSocket(): void {
const socket = this.signalingWebSocket;
this.signalingWebSocket = null;
this.connectAttemptStartedAt = 0;
if (!socket) {
return;
}
this.detachSocketHandlers(socket);
try {
socket.close();
} catch {
this.logger.warn('[signaling] Failed to discard signaling socket', {
url: this.lastSignalingUrl
});
}
}
private detachSocketHandlers(socket: WebSocket): void {
socket.onopen = null;
socket.onclose = null;
socket.onerror = null;
socket.onmessage = null;
}
/** Cancel any pending reconnect timer and reset the attempt counter. */
private clearReconnect(): void {
if (this.signalingReconnectTimer) {
@@ -394,11 +543,22 @@ export class SignalingManager {
/** Start the heartbeat interval that drives periodic state broadcasts. */
private startHeartbeat(): void {
this.stopHeartbeat();
this.lastKeepaliveSentAt = Date.now();
// Prime timers so the first heartbeat tick can send keepalives and health probes immediately.
const now = Date.now();
this.lastKeepaliveSentAt = now - SIGNALING_KEEPALIVE_INTERVAL_MS;
this.lastEndpointHealthProbeAt = now - SIGNALING_HEALTH_PROBE_INTERVAL_MS;
this.stateHeartbeatTimer = setInterval(() => {
this.heartbeatTick$.next();
this.sendKeepaliveIfDue();
this.runHeartbeatChecks();
}, STATE_HEARTBEAT_INTERVAL_MS);
void this.runHeartbeatChecks();
}
private runHeartbeatChecks(): void {
this.heartbeatTick$.next();
this.verifyKeepaliveAckIfDue();
this.sendKeepaliveIfDue();
void this.probeEndpointHealthIfDue();
}
/** Stop the heartbeat interval. */
@@ -409,6 +569,75 @@ export class SignalingManager {
}
this.lastKeepaliveSentAt = 0;
this.lastKeepaliveAckAt = 0;
this.serverSupportsKeepaliveAck = false;
this.lastEndpointHealthOk = null;
this.lastKnownServerInstanceId = null;
this.lastEndpointHealthProbeAt = 0;
this.endpointHealthProbeInFlight = false;
}
private async probeEndpointHealthIfDue(): Promise<void> {
if (!this.lastSignalingUrl || this.endpointHealthProbeInFlight || this.isSocketConnecting()) {
return;
}
if (!this.isSocketOpen()) {
return;
}
const now = Date.now();
if (now - this.lastEndpointHealthProbeAt < SIGNALING_HEALTH_PROBE_INTERVAL_MS) {
return;
}
this.lastEndpointHealthProbeAt = now;
this.endpointHealthProbeInFlight = true;
try {
const snapshot = await probeSignalingEndpointHealth(this.lastSignalingUrl);
const wasHealthy = this.lastEndpointHealthOk;
const previousServerInstanceId = this.lastKnownServerInstanceId;
this.lastEndpointHealthOk = snapshot.ok;
if (!snapshot.ok) {
this.handleSocketTransportFailure('Signaling server health check failed');
return;
}
if (snapshot.serverInstanceId) {
if (previousServerInstanceId && snapshot.serverInstanceId !== previousServerInstanceId) {
this.handleSocketTransportFailure('Signaling server instance changed; refreshing websocket');
return;
}
this.lastKnownServerInstanceId = snapshot.serverInstanceId;
}
if (wasHealthy === false) {
this.handleSocketTransportFailure('Signaling server recovered; refreshing websocket');
}
} finally {
this.endpointHealthProbeInFlight = false;
}
}
private verifyKeepaliveAckIfDue(): void {
if (!this.serverSupportsKeepaliveAck || !this.isSocketOpen() || this.lastKeepaliveSentAt === 0) {
return;
}
if (this.lastKeepaliveAckAt >= this.lastKeepaliveSentAt) {
return;
}
if (Date.now() - this.lastKeepaliveSentAt < SIGNALING_KEEPALIVE_ACK_TIMEOUT_MS) {
return;
}
this.handleSocketTransportFailure('Signaling keepalive acknowledgement timed out');
}
private sendKeepaliveIfDue(): void {
@@ -421,13 +650,13 @@ export class SignalingManager {
this.lastKeepaliveSentAt = now;
try {
this.sendRawMessage({ type: SIGNALING_TYPE_KEEPALIVE });
const sent = this.sendRawMessage({ type: SIGNALING_TYPE_KEEPALIVE });
if (sent && !this.serverSupportsKeepaliveAck) {
this.lastKeepaliveAckAt = this.lastKeepaliveSentAt;
}
} catch (error) {
this.logger.warn('[signaling] Failed to send signaling keepalive', {
error,
readyState: this.getSocketReadyStateLabel(),
url: this.lastSignalingUrl
});
this.handleSocketTransportFailure('Failed to send signaling keepalive', error);
}
}
@@ -482,6 +711,8 @@ export class SignalingManager {
url: details.url
});
this.handleSocketTransportFailure('Failed to send signaling payload', error);
throw error;
}
}