Repair connectivity correctly v1

This commit is contained in:
2026-05-17 15:15:14 +02:00
parent e769a6ee4a
commit 9d0a4478b2
18 changed files with 1125 additions and 25 deletions

View File

@@ -172,6 +172,8 @@ Join and leave broadcasts are also identity-aware: `handleJoinServer` only broad
Peer routing also has to stay scoped to the signaling server that reported the membership. A `user_left` from one signaling cluster must only subtract that cluster's shared servers; otherwise a leave on `signal.toju.app` can incorrectly tear down a peer that is still shared through `signal-sweden.toju.app` or a local signaling server. Route metadata is therefore kept across peer recreation and only cleared once the renderer no longer shares any servers with that peer.
When local voice is active, a transient `user_left` or stale presence snapshot must not immediately mute or tear down a peer whose P2P transport is still alive. The users store receives the current connected peer IDs with signaling presence updates and preserves live voice/camera/screen state while that transport is connected. The signaling handler also preserves an active voice peer route after a `user_left` blip so a later data-channel or peer reconnect can still target the correct signaling URL. Explicit voice leave still travels as a `voice-state` event with `isConnected: false`, and closed/failed peer connections still clean themselves up through the peer recovery path.
## Peer connection lifecycle
Peers connect to each other directly with `RTCPeerConnection`. The initiator is chosen deterministically from the identified logical peer IDs so only one side creates the offer and primary data channel for a given pair. The other side creates an answer. If identity or negotiation is still settling, the retry timer defers instead of comparing against the ephemeral local transport ID or reusing a half-open peer forever.
@@ -246,6 +248,8 @@ Profile avatar sync follows attachment-style chunk transport plus server-icon-st
Every 5 seconds a PING message is sent to each peer. The peer responds with PONG carrying the original timestamp, and the round-trip latency is stored in a signal.
Data-channel failures are treated as control-plane failures, not proof that RTP audio has stopped. When an open channel reports a non-fatal error, the client requests a fresh voice-state snapshot over that same channel. When the channel closes or cannot carry the resync request, the peer manager waits a short grace period (`DATA_CHANNEL_RECOVERY_GRACE_MS`, currently 2.5 seconds) so any still-flowing audio is not interrupted by a transient event. If the `RTCPeerConnection` is still connected after that grace period, the elected initiator replaces only the data channel in-place and preserves the media transport, while the non-initiator keeps its connection and waits for that replacement channel. Full peer recreation is reserved for cases where the media transport is no longer connected or the in-place control-channel repair fails.
## Media pipeline
### Voice

View File

@@ -0,0 +1,131 @@
import { Subject } from 'rxjs';
import {
DATA_CHANNEL_STATE_OPEN,
P2P_TYPE_STATE_REQUEST,
P2P_TYPE_VOICE_STATE_REQUEST
} from '../../realtime.constants';
import { setupDataChannel } from './data-channel';
import {
createPeerConnectionManagerState,
DataChannelLifecycleHandlers,
PeerConnectionManagerContext
} from '../shared';
/**
 * Verifies how `setupDataChannel` wires the `onopen`, `onerror` and `onclose`
 * handlers of a channel to the recovery callbacks it is given.
 */
describe('data channel lifecycle', () => {
  const REMOTE_PEER_ID = 'peer-a';

  /** Serialises a bare `{ type }` control request. */
  const requestPayload = (type: string): string => JSON.stringify({ type });

  it('sends current state and requests peer state when the channel opens', () => {
    const lifecycle = createHandlers();
    const openChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN);

    setupDataChannel(createContext(), openChannel, REMOTE_PEER_ID, lifecycle);
    openChannel.onopen?.(new Event('open'));

    expect(lifecycle.clearDataChannelRecoveryTimer).toHaveBeenCalledWith(REMOTE_PEER_ID);
    expect(openChannel.send).toHaveBeenCalledWith(expect.stringContaining('"type":"voice-state"'));
    expect(openChannel.send).toHaveBeenCalledWith(requestPayload(P2P_TYPE_STATE_REQUEST));
    expect(lifecycle.scheduleDataChannelRecovery).not.toHaveBeenCalled();
  });

  it('requests a voice-state resync on a non-fatal data channel error', () => {
    const lifecycle = createHandlers();
    const openChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN);

    setupDataChannel(createContext(), openChannel, REMOTE_PEER_ID, lifecycle);
    openChannel.onerror?.(new Event('error'));

    // The channel is still open, so the resync goes out and no recovery is scheduled.
    expect(openChannel.send).toHaveBeenCalledWith(requestPayload(P2P_TYPE_VOICE_STATE_REQUEST));
    expect(lifecycle.scheduleDataChannelRecovery).not.toHaveBeenCalled();
  });

  it('schedules peer recovery when an error leaves the channel closed', () => {
    const lifecycle = createHandlers();
    const closedChannel = createDataChannel('closed');

    setupDataChannel(createContext(), closedChannel, REMOTE_PEER_ID, lifecycle);
    closedChannel.onerror?.(new Event('error'));

    // Nothing can be sent over a closed channel; recovery is requested instead.
    expect(closedChannel.send).not.toHaveBeenCalled();
    expect(lifecycle.scheduleDataChannelRecovery).toHaveBeenCalledWith(REMOTE_PEER_ID, closedChannel, 'error');
  });

  it('schedules peer recovery when the data channel closes', () => {
    const lifecycle = createHandlers();
    const closedChannel = createDataChannel('closed');

    setupDataChannel(createContext(), closedChannel, REMOTE_PEER_ID, lifecycle);
    closedChannel.onclose?.(new Event('close'));

    expect(lifecycle.scheduleDataChannelRecovery).toHaveBeenCalledWith(REMOTE_PEER_ID, closedChannel, 'close');
  });
});
/**
 * Builds a manager context whose logger and callbacks are Vitest mocks and
 * whose state is a fresh manager state with a new `peerConnected$` subject.
 */
function createContext(): PeerConnectionManagerContext {
  const logger = {
    error: vi.fn(),
    info: vi.fn(),
    logStream: vi.fn(),
    traffic: vi.fn(),
    warn: vi.fn()
  } as unknown as PeerConnectionManagerContext['logger'];

  const callbacks: PeerConnectionManagerContext['callbacks'] = {
    getIceServers: vi.fn(() => []),
    getIdentifyCredentials: vi.fn(() => ({ oderId: 'local-user', displayName: 'Local User' })),
    getLocalMediaStream: vi.fn(() => null),
    getLocalPeerId: vi.fn(() => 'local-peer'),
    // Returns a new snapshot object on every call.
    getVoiceStateSnapshot: vi.fn(() => ({
      isConnected: true,
      isMuted: false,
      isDeafened: false,
      isScreenSharing: false,
      roomId: 'voice-room',
      serverId: 'server-1'
    })),
    isCameraEnabled: vi.fn(() => false),
    isScreenSharingActive: vi.fn(() => false),
    isSignalingConnected: vi.fn(() => true),
    sendRawMessage: vi.fn()
  };

  const state: PeerConnectionManagerContext['state'] = {
    ...createPeerConnectionManagerState(),
    peerConnected$: new Subject<string>()
  };

  return { logger, callbacks, state };
}
/** Creates data-channel lifecycle handlers backed by bare Vitest mocks. */
function createHandlers(): DataChannelLifecycleHandlers {
  const clearDataChannelRecoveryTimer = vi.fn();
  const scheduleDataChannelRecovery = vi.fn();

  return { clearDataChannelRecoveryTimer, scheduleDataChannelRecovery };
}
/**
 * Builds a plain-object stand-in for an `RTCDataChannel` in the given
 * `readyState`. Methods are Vitest mocks and every `on*` slot starts as `null`
 * so the code under test can assign its own handlers.
 */
function createDataChannel(readyState: RTCDataChannelState): RTCDataChannel & { send: ReturnType<typeof vi.fn> } {
  const attributes = {
    binaryType: 'arraybuffer',
    bufferedAmount: 0,
    bufferedAmountLowThreshold: 0,
    id: 1,
    label: 'chat',
    maxPacketLifeTime: null,
    maxRetransmits: null,
    negotiated: false,
    ordered: true,
    protocol: '',
    readyState
  };

  const eventSlots = {
    onbufferedamountlow: null,
    onclose: null,
    onclosing: null,
    onerror: null,
    onmessage: null,
    onopen: null
  };

  const methods = {
    addEventListener: vi.fn(),
    close: vi.fn(),
    dispatchEvent: vi.fn(),
    removeEventListener: vi.fn(),
    send: vi.fn()
  };

  const stub = { ...attributes, ...eventSlots, ...methods };

  return stub as unknown as RTCDataChannel & { send: ReturnType<typeof vi.fn> };
}

View File

@@ -13,7 +13,7 @@ import {
P2P_TYPE_VOICE_STATE_REQUEST
} from '../../realtime.constants';
import { recordDebugNetworkDataChannelPayload, recordDebugNetworkPing } from '../../logging/debug-network-metrics';
import { PeerConnectionManagerContext } from '../shared';
import { DataChannelLifecycleHandlers, PeerConnectionManagerContext } from '../shared';
import { startPingInterval } from './ping';
type PeerMessage = Record<string, unknown> & {
@@ -27,11 +27,14 @@ type PeerMessage = Record<string, unknown> & {
export function setupDataChannel(
context: PeerConnectionManagerContext,
channel: RTCDataChannel,
remotePeerId: string
remotePeerId: string,
handlers: DataChannelLifecycleHandlers
): void {
const { logger, state } = context;
channel.onopen = () => {
handlers.clearDataChannelRecoveryTimer(remotePeerId);
logger.info('[data-channel] Data channel open', {
channelLabel: channel.label,
negotiated: channel.negotiated,
@@ -43,22 +46,7 @@ export function setupDataChannel(
state.peerConnected$.next(remotePeerId);
sendCurrentStatesToChannel(context, channel, remotePeerId);
try {
const stateRequest = { type: P2P_TYPE_STATE_REQUEST };
const rawPayload = JSON.stringify(stateRequest);
channel.send(rawPayload);
logDataChannelTraffic(context, channel, remotePeerId, 'outbound', rawPayload, stateRequest);
} catch (error) {
logger.error('[data-channel] Failed to request peer state on open', error, {
bufferedAmount: channel.bufferedAmount,
channelLabel: channel.label,
peerId: remotePeerId,
readyState: channel.readyState,
type: P2P_TYPE_STATE_REQUEST
});
}
sendStateRequestToChannel(context, channel, remotePeerId, P2P_TYPE_STATE_REQUEST, 'open');
startPingInterval(context.state, logger, remotePeerId);
};
@@ -70,6 +58,8 @@ export function setupDataChannel(
peerId: remotePeerId,
readyState: channel.readyState
});
handlers.scheduleDataChannelRecovery(remotePeerId, channel, 'close');
};
channel.onerror = (error) => {
@@ -79,6 +69,18 @@ export function setupDataChannel(
peerId: remotePeerId,
readyState: channel.readyState
});
const didRequestState = sendStateRequestToChannel(
context,
channel,
remotePeerId,
P2P_TYPE_VOICE_STATE_REQUEST,
'error'
);
if (!didRequestState) {
handlers.scheduleDataChannelRecovery(remotePeerId, channel, 'error');
}
};
channel.onmessage = (event) => {
@@ -103,6 +105,48 @@ export function setupDataChannel(
};
}
/**
 * Send a `{ type }` state request to the remote peer over `channel`.
 *
 * @param context Manager context; only its logger is used directly here.
 * @param channel Channel the request is written to.
 * @param remotePeerId Peer ID recorded in log entries.
 * @param type Which request to send (full state or voice state).
 * @param reason Caller-supplied tag added to warning/error log entries.
 * @returns `true` when the payload was sent and traffic-logged without
 * throwing; `false` when the channel was not open or an exception was caught.
 */
function sendStateRequestToChannel(
  context: PeerConnectionManagerContext,
  channel: RTCDataChannel,
  remotePeerId: string,
  type: typeof P2P_TYPE_STATE_REQUEST | typeof P2P_TYPE_VOICE_STATE_REQUEST,
  reason: string
): boolean {
  const log = context.logger;
  const channelIsOpen = channel.readyState === DATA_CHANNEL_STATE_OPEN;

  if (!channelIsOpen) {
    log.warn('[data-channel] Cannot request peer state - channel not open', {
      channelLabel: channel.label,
      peerId: remotePeerId,
      readyState: channel.readyState,
      reason,
      type
    });

    return false;
  }

  let delivered = false;

  try {
    const request = { type };
    const serialized = JSON.stringify(request);

    channel.send(serialized);
    logDataChannelTraffic(context, channel, remotePeerId, 'outbound', serialized, request);
    delivered = true;
  } catch (failure) {
    // A throw from send (or from traffic logging) is reported to the caller as "not requested".
    log.error('[data-channel] Failed to request peer state', failure, {
      bufferedAmount: channel.bufferedAmount,
      channelLabel: channel.label,
      peerId: remotePeerId,
      readyState: channel.readyState,
      reason,
      type
    });
  }

  return delivered;
}
/**
* Route an incoming peer-to-peer message.
*/

View File

@@ -1,5 +1,6 @@
/* eslint-disable @typescript-eslint/member-ordering */
import { ChatEvent } from '../../../shared-kernel';
import { DATA_CHANNEL_LABEL } from '../realtime.constants';
import { recordDebugNetworkDownloadRates } from '../logging/debug-network-metrics';
import { WebRTCLogger } from '../logging/webrtc-logger';
import { PeerData } from '../realtime.types';
@@ -22,6 +23,7 @@ import {
} from './messaging/data-channel';
import {
addToConnectedPeers,
clearDataChannelRecoveryTimer,
clearAllPeerReconnectTimers,
clearPeerDisconnectGraceTimer,
clearPeerReconnectTimer,
@@ -30,6 +32,7 @@ import {
removePeer as removeManagedPeer,
requestVoiceStateFromPeer,
resetConnectedPeers,
scheduleDataChannelRecovery,
schedulePeerDisconnectRecovery,
schedulePeerReconnect,
trackDisconnectedPeer
@@ -38,6 +41,7 @@ import { clearRemoteScreenShareStream as clearManagedRemoteScreenShareStream, ha
import {
ConnectionLifecycleHandlers,
createPeerConnectionManagerState,
DataChannelLifecycleHandlers,
NegotiationHandlers,
PeerConnectionCallbacks,
PeerConnectionManagerContext,
@@ -285,6 +289,8 @@ export class PeerConnectionManager {
private get recoveryHandlers(): RecoveryHandlers {
return {
removePeer: (peerId: string, options?: RemovePeerOptions) => this.removePeer(peerId, options),
replaceDataChannel: (peerId: string, expectedChannel: RTCDataChannel) =>
this.replaceDataChannel(peerId, expectedChannel),
createPeerConnection: (peerId: string, isInitiator: boolean) =>
this.createPeerConnection(peerId, isInitiator),
createAndSendOffer: (peerId: string) => this.createAndSendOffer(peerId)
@@ -292,7 +298,15 @@ export class PeerConnectionManager {
}
private setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void {
setupDataChannel(this.context, channel, remotePeerId);
setupDataChannel(this.context, channel, remotePeerId, this.dataChannelHandlers);
}
/**
 * Lifecycle callbacks handed to `setupDataChannel`; each one forwards to the
 * private method of the same name on this manager.
 */
private get dataChannelHandlers(): DataChannelLifecycleHandlers {
  const clearTimer = (peerId: string): void => this.clearDataChannelRecoveryTimer(peerId);
  const scheduleRecovery = (peerId: string, channel: RTCDataChannel, reason: string): void =>
    this.scheduleDataChannelRecovery(peerId, channel, reason);

  return {
    clearDataChannelRecoveryTimer: clearTimer,
    scheduleDataChannelRecovery: scheduleRecovery
  };
}
private handleRemoteTrack(event: RTCTrackEvent, remotePeerId: string): void {
@@ -311,6 +325,36 @@ export class PeerConnectionManager {
clearPeerDisconnectGraceTimer(this.state, peerId);
}
/** Forwards to the imported `clearDataChannelRecoveryTimer` helper, passing this manager's state. */
private clearDataChannelRecoveryTimer(peerId: string): void {
clearDataChannelRecoveryTimer(this.state, peerId);
}
/**
 * Forwards to the imported `scheduleDataChannelRecovery` helper, passing this
 * manager's context and its `recoveryHandlers`.
 */
private scheduleDataChannelRecovery(peerId: string, channel: RTCDataChannel, reason: string): void {
scheduleDataChannelRecovery(this.context, peerId, channel, reason, this.recoveryHandlers);
}
/**
 * Swap the peer's data channel for a newly created one on the same
 * `RTCPeerConnection`.
 *
 * @param peerId Peer whose channel should be replaced.
 * @param expectedChannel Channel the caller believes is current; the swap is
 * refused if the peer now holds a different channel.
 * @returns `true` once the replacement is stored and wired up; `false` when
 * the peer is unknown, the channel is no longer current, or creation throws.
 */
private replaceDataChannel(peerId: string, expectedChannel: RTCDataChannel): boolean {
  const currentPeer = this.state.activePeerConnections.get(peerId);

  if (!currentPeer) {
    return false;
  }

  if (currentPeer.dataChannel !== expectedChannel) {
    return false;
  }

  try {
    const freshChannel = currentPeer.connection.createDataChannel(DATA_CHANNEL_LABEL, { ordered: true });

    currentPeer.dataChannel = freshChannel;
    this.setupDataChannel(freshChannel, peerId);

    return true;
  } catch (failure) {
    const message = (failure as Error)?.message ?? String(failure);

    this.logger.warn('[data-channel] Failed to create replacement data channel', {
      error: message,
      peerId
    });

    return false;
  }
}
/**
 * Forwards to the imported `schedulePeerDisconnectRecovery` helper, passing
 * this manager's context and its `recoveryHandlers`.
 */
private schedulePeerDisconnectRecovery(peerId: string): void {
schedulePeerDisconnectRecovery(this.context, peerId, this.recoveryHandlers);
}

View File

@@ -0,0 +1,216 @@
import { DATA_CHANNEL_RECOVERY_GRACE_MS, DATA_CHANNEL_STATE_OPEN } from '../../realtime.constants';
import type { PeerData } from '../../realtime.types';
import {
createPeerConnectionManagerState,
PeerConnectionManagerContext,
RecoveryHandlers
} from '../shared';
import { scheduleDataChannelRecovery } from './peer-recovery';
// Exercises `scheduleDataChannelRecovery` with fake timers. Each case installs
// fake timers first, schedules recovery, then advances the clock by the grace
// period and checks which recovery handlers were (not) invoked.
describe('peer recovery', () => {
// Drop any timers a case left pending and restore real timers for the next one.
afterEach(() => {
vi.clearAllTimers();
vi.useRealTimers();
});
it('waits a short grace period before replacing a closed data channel in place', () => {
vi.useFakeTimers();
const channel = createDataChannel('closed');
const context = createContext('alice');
const handlers = createRecoveryHandlers(context);
// `createPeerData` defaults `isInitiator` to true, so this side owns the repair.
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected'));
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
// One millisecond short of the grace period: nothing may have happened yet.
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS - 1);
expect(handlers.removePeer).not.toHaveBeenCalled();
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
// Crossing the grace boundary triggers the in-place replacement only.
vi.advanceTimersByTime(1);
expect(handlers.replaceDataChannel).toHaveBeenCalledWith('bob', channel);
expect(handlers.removePeer).not.toHaveBeenCalled();
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
expect(handlers.createAndSendOffer).not.toHaveBeenCalled();
});
it('falls back to full peer recreation when in-place data channel replacement fails', () => {
vi.useFakeTimers();
const channel = createDataChannel('closed');
const context = createContext('alice');
const handlers = createRecoveryHandlers(context);
// Force the in-place repair to report failure once.
handlers.replaceDataChannel.mockReturnValueOnce(false);
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected'));
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
expect(handlers.removePeer).toHaveBeenCalledWith('bob', { preserveReconnectState: true });
// NOTE(review): 'alice' ends up as initiator towards 'bob'; the election rule lives in the reconnect helper, not shown here.
expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', true);
expect(handlers.createAndSendOffer).toHaveBeenCalledWith('bob');
});
it('does not recreate a peer when a replacement data channel is adopted before the grace expires', () => {
vi.useFakeTimers();
const staleChannel = createDataChannel('closed');
const replacementChannel = createDataChannel(DATA_CHANNEL_STATE_OPEN);
const context = createContext('alice');
const handlers = createRecoveryHandlers(context);
context.state.activePeerConnections.set('bob', createPeerData(staleChannel, 'connected'));
scheduleDataChannelRecovery(context, 'bob', staleChannel, 'close', handlers);
// Swap in peer data holding a different channel while the timer is still pending.
context.state.activePeerConnections.set('bob', createPeerData(replacementChannel, 'connected'));
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
expect(handlers.removePeer).not.toHaveBeenCalled();
expect(handlers.replaceDataChannel).not.toHaveBeenCalled();
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
});
it('does not schedule recovery for an open data channel', () => {
vi.useFakeTimers();
const channel = createDataChannel(DATA_CHANNEL_STATE_OPEN);
const context = createContext('alice');
const handlers = createRecoveryHandlers(context);
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected'));
scheduleDataChannelRecovery(context, 'bob', channel, 'error', handlers);
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
expect(handlers.removePeer).not.toHaveBeenCalled();
expect(handlers.replaceDataChannel).not.toHaveBeenCalled();
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
});
it('preserves a connected non-initiator peer while waiting for the remote initiator to replace the channel', () => {
vi.useFakeTimers();
const channel = createDataChannel('closed');
const context = createContext('zoe');
const handlers = createRecoveryHandlers(context);
// Third argument `false` marks the local side as non-initiator for this peer.
context.state.activePeerConnections.set('bob', createPeerData(channel, 'connected', false));
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
expect(handlers.removePeer).not.toHaveBeenCalled();
expect(handlers.replaceDataChannel).not.toHaveBeenCalled();
expect(handlers.createPeerConnection).not.toHaveBeenCalled();
expect(handlers.createAndSendOffer).not.toHaveBeenCalled();
});
it('waits for the remote initiator when a non-connected peer needs full reconnect', () => {
vi.useFakeTimers();
const channel = createDataChannel('closed');
const context = createContext('zoe');
const handlers = createRecoveryHandlers(context);
// Transport is 'disconnected', so the in-place repair path is not eligible.
context.state.activePeerConnections.set('bob', createPeerData(channel, 'disconnected'));
scheduleDataChannelRecovery(context, 'bob', channel, 'close', handlers);
vi.advanceTimersByTime(DATA_CHANNEL_RECOVERY_GRACE_MS);
// The peer is recreated as non-initiator and no offer is sent from this side.
expect(handlers.createPeerConnection).toHaveBeenCalledWith('bob', false);
expect(handlers.createAndSendOffer).not.toHaveBeenCalled();
});
});
/**
 * Builds a manager context with mocked logger and callbacks.
 *
 * @param localOderId Value reported as the local `oderId`, display name and
 * local peer ID by the mocked callbacks.
 */
function createContext(localOderId: string): PeerConnectionManagerContext {
  const logger = {
    error: vi.fn(),
    info: vi.fn(),
    logStream: vi.fn(),
    traffic: vi.fn(),
    warn: vi.fn()
  } as unknown as PeerConnectionManagerContext['logger'];

  const callbacks: PeerConnectionManagerContext['callbacks'] = {
    getIceServers: vi.fn(() => []),
    getIdentifyCredentials: vi.fn(() => ({ oderId: localOderId, displayName: localOderId })),
    getLocalMediaStream: vi.fn(() => null),
    getLocalPeerId: vi.fn(() => localOderId),
    // Returns a new snapshot object on every call.
    getVoiceStateSnapshot: vi.fn(() => ({
      isConnected: true,
      isMuted: false,
      isDeafened: false,
      isScreenSharing: false,
      roomId: 'voice-room',
      serverId: 'server-1'
    })),
    isCameraEnabled: vi.fn(() => false),
    isScreenSharingActive: vi.fn(() => false),
    isSignalingConnected: vi.fn(() => true),
    sendRawMessage: vi.fn()
  };

  return {
    logger,
    callbacks,
    state: createPeerConnectionManagerState()
  };
}
/**
 * Builds mocked recovery handlers that mutate `context.state.activePeerConnections`
 * the way the assertions in this spec expect:
 * - `createPeerConnection` stores and returns new peer data,
 * - `removePeer` deletes the entry,
 * - `replaceDataChannel` swaps in a 'connecting' channel when the expected
 *   channel is still the current one.
 */
function createRecoveryHandlers(context: PeerConnectionManagerContext): RecoveryHandlers & {
  createAndSendOffer: ReturnType<typeof vi.fn>;
  createPeerConnection: ReturnType<typeof vi.fn>;
  removePeer: ReturnType<typeof vi.fn>;
  replaceDataChannel: ReturnType<typeof vi.fn>;
} {
  const peers = context.state.activePeerConnections;

  const createAndSendOffer = vi.fn(async () => undefined);

  const createPeerConnection = vi.fn((peerId: string, isInitiator: boolean) => {
    const initialChannelState = isInitiator ? 'connecting' : 'closed';
    const created = createPeerData(createDataChannel(initialChannelState), 'new', isInitiator);

    peers.set(peerId, created);

    return created;
  });

  const removePeer = vi.fn((peerId: string) => {
    peers.delete(peerId);
  });

  const replaceDataChannel = vi.fn((peerId: string, expectedChannel: RTCDataChannel) => {
    const current = peers.get(peerId);

    if (!current) {
      return false;
    }

    if (current.dataChannel !== expectedChannel) {
      return false;
    }

    current.dataChannel = createDataChannel('connecting');

    return true;
  });

  return { createAndSendOffer, createPeerConnection, removePeer, replaceDataChannel };
}
/**
 * Builds minimal `PeerData` around a stubbed `RTCPeerConnection`.
 *
 * @param dataChannel Channel stored as the peer's current data channel.
 * @param connectionState Value exposed as `connection.connectionState`.
 * @param isInitiator Whether the local side is marked as initiator (default `true`).
 */
function createPeerData(
  dataChannel: RTCDataChannel,
  connectionState: RTCPeerConnectionState,
  isInitiator = true
): PeerData {
  const connection = {
    close: vi.fn(),
    connectionState
  } as unknown as RTCPeerConnection;

  return {
    connection,
    dataChannel,
    isInitiator,
    createdAt: Date.now(),
    pendingIceCandidates: [],
    audioSender: undefined,
    videoSender: undefined,
    remoteCameraStreamIds: new Set<string>(),
    remoteScreenShareStreamIds: new Set<string>(),
    remoteVoiceStreamIds: new Set<string>()
  };
}
/** Builds a three-field `RTCDataChannel` stand-in (`bufferedAmount`, `label`, `readyState`). */
function createDataChannel(readyState: RTCDataChannelState): RTCDataChannel {
  const stub = {
    bufferedAmount: 0,
    label: 'chat',
    readyState
  };

  return stub as unknown as RTCDataChannel;
}

View File

@@ -1,5 +1,6 @@
import {
CONNECTION_STATE_CONNECTED,
DATA_CHANNEL_RECOVERY_GRACE_MS,
DATA_CHANNEL_STATE_OPEN,
P2P_TYPE_VOICE_STATE_REQUEST,
PEER_DISCONNECT_GRACE_MS,
@@ -27,6 +28,7 @@ export function removePeer(
const preserveReconnectState = options?.preserveReconnectState === true;
clearPeerDisconnectGraceTimer(state, peerId);
clearDataChannelRecoveryTimer(state, peerId);
if (!preserveReconnectState) {
clearPeerReconnectTimer(state, peerId);
@@ -56,6 +58,7 @@ export function removePeer(
export function closeAllPeers(state: PeerConnectionManagerState): void {
clearAllPeerReconnectTimers(state);
clearAllPeerDisconnectGraceTimers(state);
clearAllDataChannelRecoveryTimers(state);
clearAllPingTimers(state);
state.activePeerConnections.forEach((peerData) => {
@@ -106,6 +109,18 @@ export function clearPeerDisconnectGraceTimer(
}
}
/**
 * Cancel the pending data-channel recovery timer for `peerId`, if one is
 * tracked, and forget it. Does nothing when no timer is registered.
 */
export function clearDataChannelRecoveryTimer(
  state: PeerConnectionManagerState,
  peerId: string
): void {
  const { dataChannelRecoveryTimers } = state;
  const pendingTimer = dataChannelRecoveryTimers.get(peerId);

  if (!pendingTimer) {
    return;
  }

  clearTimeout(pendingTimer);
  dataChannelRecoveryTimers.delete(peerId);
}
/** Cancel all pending peer reconnect timers and clear the tracker. */
export function clearAllPeerReconnectTimers(state: PeerConnectionManagerState): void {
state.peerReconnectTimers.forEach((timer) => clearInterval(timer));
@@ -118,6 +133,85 @@ export function clearAllPeerDisconnectGraceTimers(state: PeerConnectionManagerSt
state.peerDisconnectGraceTimers.clear();
}
/** Cancel every pending data-channel recovery timer and empty the tracker. */
export function clearAllDataChannelRecoveryTimers(state: PeerConnectionManagerState): void {
  const { dataChannelRecoveryTimers } = state;

  for (const pendingTimer of dataChannelRecoveryTimers.values()) {
    clearTimeout(pendingTimer);
  }

  dataChannelRecoveryTimers.clear();
}
/**
 * Schedule a deferred repair for a peer whose control data channel is no
 * longer usable.
 *
 * Nothing is scheduled when `channel` is not the peer's current channel, when
 * it is open, or when a recovery timer for the peer is already pending.
 *
 * After `DATA_CHANNEL_RECOVERY_GRACE_MS` the situation is re-checked. If the
 * same channel is still current and still not open:
 * - with the peer connection still connected, a non-initiator only logs and
 *   waits, while an initiator asks `handlers.replaceDataChannel` to swap the
 *   channel in place;
 * - otherwise, or when the in-place swap returns `false`, the peer is tracked
 *   as disconnected, removed with `preserveReconnectState: true`, and the
 *   reconnect attempt and reconnect schedule are started.
 *
 * @param context Manager context providing the logger and shared state.
 * @param peerId Peer whose channel failed.
 * @param channel The channel that closed or errored.
 * @param reason Caller-supplied tag included in log entries.
 * @param handlers Callbacks used to repair or recreate the peer.
 */
export function scheduleDataChannelRecovery(
  context: PeerConnectionManagerContext,
  peerId: string,
  channel: RTCDataChannel,
  reason: string,
  handlers: RecoveryHandlers
): void {
  const { logger, state } = context;
  const trackedPeer = state.activePeerConnections.get(peerId);

  if (!trackedPeer || trackedPeer.dataChannel !== channel) {
    return;
  }

  if (channel.readyState === DATA_CHANNEL_STATE_OPEN || state.dataChannelRecoveryTimers.has(peerId)) {
    return;
  }

  const repairAfterGrace = (): void => {
    state.dataChannelRecoveryTimers.delete(peerId);

    // Re-read the peer: it may have been removed or given a new channel during the grace period.
    const current = state.activePeerConnections.get(peerId);

    if (!current || current.dataChannel !== channel) {
      return;
    }

    if (current.dataChannel?.readyState === DATA_CHANNEL_STATE_OPEN) {
      return;
    }

    logger.warn('[data-channel] Control channel did not recover; selecting repair path', {
      channelLabel: channel.label,
      connectionState: current.connection.connectionState,
      peerId,
      readyState: current.dataChannel?.readyState ?? null,
      reason
    });

    const transportConnected = current.connection.connectionState === CONNECTION_STATE_CONNECTED;

    if (transportConnected) {
      if (!current.isInitiator) {
        logger.info('[data-channel] Waiting for initiator to replace control channel; preserving media transport', {
          peerId,
          reason
        });

        return;
      }

      if (handlers.replaceDataChannel(peerId, channel)) {
        logger.info('[data-channel] Replaced control channel without recreating media transport', {
          peerId,
          reason
        });

        return;
      }
    }

    // Transport is not connected, or the in-place replacement failed: recreate the peer.
    trackDisconnectedPeer(state, peerId);
    handlers.removePeer(peerId, { preserveReconnectState: true });
    attemptPeerReconnect(context, peerId, handlers);
    schedulePeerReconnect(context, peerId, handlers);
  };

  logger.warn('[data-channel] Control channel unavailable; waiting before reconnect', {
    channelLabel: channel.label,
    peerId,
    readyState: channel.readyState,
    reason
  });

  state.dataChannelRecoveryTimers.set(peerId, setTimeout(repairAfterGrace, DATA_CHANNEL_RECOVERY_GRACE_MS));
}
export function schedulePeerDisconnectRecovery(
context: PeerConnectionManagerContext,
peerId: string,

View File

@@ -42,6 +42,7 @@ export interface PeerConnectionManagerState {
disconnectedPeerTracker: Map<string, DisconnectedPeerEntry>;
peerReconnectTimers: Map<string, ReturnType<typeof setInterval>>;
peerDisconnectGraceTimers: Map<string, ReturnType<typeof setTimeout>>;
dataChannelRecoveryTimers: Map<string, ReturnType<typeof setTimeout>>;
pendingPings: Map<string, number>;
peerPingTimers: Map<string, ReturnType<typeof setInterval>>;
peerLatencies: Map<string, number>;
@@ -78,12 +79,18 @@ export interface ConnectionLifecycleHandlers {
setupDataChannel(channel: RTCDataChannel, remotePeerId: string): void;
}
/** Callbacks that the data-channel setup code invokes in response to channel lifecycle events. */
export interface DataChannelLifecycleHandlers {
/** Invoked with the remote peer ID when that peer's data channel opens. */
clearDataChannelRecoveryTimer(peerId: string): void;
/**
 * Invoked when the channel closes, or when it errors and a state request could
 * not be sent over it. `reason` is a short tag; callers in this module pass
 * 'close' or 'error'.
 */
scheduleDataChannelRecovery(peerId: string, channel: RTCDataChannel, reason: string): void;
}
/**
 * Callbacks supplied to negotiation code.
 * NOTE(review): the consumers of this interface are not visible here; confirm usage before relying on this description.
 */
export interface NegotiationHandlers {
/** Returns the `PeerData` for `remotePeerId`; `isInitiator` states which side initiates. */
createPeerConnection(remotePeerId: string, isInitiator: boolean): PeerData;
}
/** Callbacks the peer recovery helpers use to repair, remove, or recreate a peer. */
export interface RecoveryHandlers {
/** Removes the peer; data-channel recovery calls this with `{ preserveReconnectState: true }`. */
removePeer(peerId: string, options?: RemovePeerOptions): void;
/**
 * Replaces the peer's data channel in place, provided `expectedChannel` is
 * still the peer's current channel. Data-channel recovery treats `true` as
 * "repaired" and falls back to full peer recreation on `false`.
 */
replaceDataChannel(peerId: string, expectedChannel: RTCDataChannel): boolean;
/** Creates peer data for `peerId`; `isInitiator` states which side initiates. */
createPeerConnection(peerId: string, isInitiator: boolean): PeerData;
/** Presumably creates an SDP offer for `peerId` and sends it via signaling — implementation not visible here. */
createAndSendOffer(peerId: string): Promise<void>;
}
@@ -98,6 +105,7 @@ export function createPeerConnectionManagerState(): PeerConnectionManagerState {
disconnectedPeerTracker: new Map<string, DisconnectedPeerEntry>(),
peerReconnectTimers: new Map<string, ReturnType<typeof setInterval>>(),
peerDisconnectGraceTimers: new Map<string, ReturnType<typeof setTimeout>>(),
dataChannelRecoveryTimers: new Map<string, ReturnType<typeof setTimeout>>(),
pendingPings: new Map<string, number>(),
peerPingTimers: new Map<string, ReturnType<typeof setInterval>>(),
peerLatencies: new Map<string, number>(),

View File

@@ -181,6 +181,7 @@ export class WebRTCService implements OnDestroy {
this.signalingMessageHandler = new IncomingSignalingMessageHandler({
getLocalOderId: () => this.signalingTransportHandler.getIdentifyCredentials()?.oderId ?? null,
getEffectiveServerId: () => this.voiceSessionController.getEffectiveServerId(this.state.currentServerId),
isVoiceConnected: () => this.state.isVoiceConnectedActive(),
peerManager: this.peerManager,
setServerTime: (serverTime) => this.timeSync.setFromServerTime(serverTime),
signalingCoordinator: this.signalingCoordinator,

View File

@@ -40,6 +40,8 @@ export const DATA_CHANNEL_LABEL = 'chat';
export const DATA_CHANNEL_HIGH_WATER_BYTES = 4 * 1024 * 1024; // 4 MB
/** Low-water mark (bytes) - resume sending once buffered amount drops below this */
export const DATA_CHANNEL_LOW_WATER_BYTES = 1 * 1024 * 1024; // 1 MB
/**
 * Grace period (ms) between a data channel becoming unusable and the repair
 * attempt, so media that may still be flowing is not interrupted by a
 * transient event. After it elapses the channel is either replaced in place
 * or the peer is recreated.
 */
export const DATA_CHANNEL_RECOVERY_GRACE_MS = 2_500;
export const SCREEN_SHARE_IDEAL_WIDTH = 1920;
export const SCREEN_SHARE_IDEAL_HEIGHT = 1080;

View File

@@ -0,0 +1,105 @@
import type { PeerData } from '../realtime.types';
import { PeerConnectionManager } from '../peer-connection-manager/peer-connection.manager';
import { IncomingSignalingMessageHandler } from './signaling-message-handler';
import { ServerSignalingCoordinator } from './server-signaling-coordinator';
/**
 * Covers the `user_left` branch of `IncomingSignalingMessageHandler`: a peer
 * is kept only while local voice is active and the peer still has a live
 * transport; otherwise it is removed and its routing is dropped.
 */
describe('IncomingSignalingMessageHandler user_left handling', () => {
  const PEER_ID = 'peer-a';
  const SERVER_ID = 'server-1';
  const SIGNAL_URL = 'ws://signal-a';

  it('preserves an active voice peer on a transient user_left signal', () => {
    const { coordinator, handler, peerManager } = createHandlerContext({ voiceConnected: true });

    coordinator.addJoinedServer(SIGNAL_URL, SERVER_ID);
    coordinator.trackPeerInServer(PEER_ID, SERVER_ID, SIGNAL_URL);
    peerManager.activePeerConnections.set(PEER_ID, createPeerData('connected', 'open'));

    handler.handleMessage({ type: 'user_left', oderId: PEER_ID, serverId: SERVER_ID }, SIGNAL_URL);

    expect(peerManager.removePeer).not.toHaveBeenCalled();
    expect(coordinator.getPeerSignalUrl(PEER_ID)).toBe(SIGNAL_URL);
    expect(coordinator.hasTrackedPeerServers(PEER_ID)).toBe(true);
  });

  it('removes a peer on user_left when local voice is not active', () => {
    const { coordinator, handler, peerManager } = createHandlerContext({ voiceConnected: false });

    coordinator.trackPeerInServer(PEER_ID, SERVER_ID, SIGNAL_URL);
    peerManager.activePeerConnections.set(PEER_ID, createPeerData('connected', 'open'));

    handler.handleMessage({ type: 'user_left', oderId: PEER_ID, serverId: SERVER_ID }, SIGNAL_URL);

    expect(peerManager.removePeer).toHaveBeenCalledWith(PEER_ID);
    expect(coordinator.getPeerSignalUrl(PEER_ID)).toBeUndefined();
  });

  it('removes a stale voice peer when no active P2P transport remains', () => {
    const { coordinator, handler, peerManager } = createHandlerContext({ voiceConnected: true });

    coordinator.trackPeerInServer(PEER_ID, SERVER_ID, SIGNAL_URL);
    // Failed connection plus closed channel: nothing left worth preserving.
    peerManager.activePeerConnections.set(PEER_ID, createPeerData('failed', 'closed'));

    handler.handleMessage({ type: 'user_left', oderId: PEER_ID, serverId: SERVER_ID }, SIGNAL_URL);

    expect(peerManager.removePeer).toHaveBeenCalledWith(PEER_ID);
    expect(coordinator.getPeerSignalUrl(PEER_ID)).toBeUndefined();
  });
});
/** Bundle of collaborators returned by `createHandlerContext` for use in assertions. */
interface HandlerContext {
/** Real signaling coordinator instance, constructed with mocked callbacks. */
coordinator: ServerSignalingCoordinator<unknown>;
/** Handler under test. */
handler: IncomingSignalingMessageHandler;
/** Peer manager stand-in exposing only the peer map and a mocked `removePeer`. */
peerManager: PeerConnectionManager & {
activePeerConnections: Map<string, PeerData>;
removePeer: ReturnType<typeof vi.fn>;
};
}
/**
 * Wires an `IncomingSignalingMessageHandler` to a real
 * `ServerSignalingCoordinator` (with mocked callbacks) and a stubbed peer manager.
 *
 * @param options.voiceConnected Value the handler's `isVoiceConnected` dependency reports.
 */
function createHandlerContext(options: { voiceConnected: boolean }): HandlerContext {
  const loggerStub = {
    error: vi.fn(),
    info: vi.fn(),
    logStream: vi.fn(),
    traffic: vi.fn(),
    warn: vi.fn()
  } as unknown as ConstructorParameters<typeof IncomingSignalingMessageHandler>[0]['logger'];

  const peerManager = {
    activePeerConnections: new Map<string, PeerData>(),
    removePeer: vi.fn()
  } as unknown as HandlerContext['peerManager'];

  const coordinator = new ServerSignalingCoordinator<unknown>({
    createManager: vi.fn(),
    handleConnectionStatus: vi.fn(),
    handleHeartbeatTick: vi.fn(),
    handleMessage: vi.fn()
  });

  const handler = new IncomingSignalingMessageHandler({
    signalingCoordinator: coordinator,
    peerManager,
    logger: loggerStub,
    getEffectiveServerId: () => 'server-1',
    getLocalOderId: () => 'local-user',
    isVoiceConnected: () => options.voiceConnected,
    setServerTime: vi.fn()
  });

  return { coordinator, handler, peerManager };
}
/**
 * Builds minimal `PeerData` whose connection and data channel expose only the
 * given states.
 *
 * @param connectionState Value exposed as `connection.connectionState`.
 * @param dataChannelState Value exposed as `dataChannel.readyState`.
 */
function createPeerData(
  connectionState: RTCPeerConnectionState,
  dataChannelState: RTCDataChannelState
): PeerData {
  const connection = { connectionState } as RTCPeerConnection;
  const dataChannel = { readyState: dataChannelState } as RTCDataChannel;

  return {
    connection,
    dataChannel,
    isInitiator: true,
    createdAt: Date.now(),
    pendingIceCandidates: [],
    audioSender: undefined,
    videoSender: undefined,
    remoteCameraStreamIds: new Set<string>(),
    remoteScreenShareStreamIds: new Set<string>(),
    remoteVoiceStreamIds: new Set<string>()
  };
}

View File

@@ -45,6 +45,7 @@ interface IncomingSignalingMessageHandlerDependencies {
logger: WebRTCLogger;
getLocalOderId(): string | null;
getEffectiveServerId(): string | null;
isVoiceConnected(): boolean;
setServerTime(serverTime: number): void;
}
@@ -294,7 +295,7 @@ export class IncomingSignalingMessageHandler {
if (message.oderId) {
this.clearUserJoinedFallbackOffer(message.oderId);
this.nonInitiatorWaitStart.delete(message.oderId);
const existing = this.dependencies.peerManager.activePeerConnections.get(message.oderId);
const hasRemainingSharedServers = Array.isArray(message.serverIds)
? this.dependencies.signalingCoordinator.replacePeerSharedServers(message.oderId, signalUrl, message.serverIds)
: message.serverId
@@ -302,6 +303,16 @@ export class IncomingSignalingMessageHandler {
: false;
if (!hasRemainingSharedServers) {
const serverIdsToPreserve = Array.isArray(message.serverIds)
? message.serverIds
: message.serverId
? [message.serverId]
: [];
if (this.shouldPreserveActiveVoicePeerAfterLeave(message.oderId, existing, signalUrl, serverIdsToPreserve)) {
return;
}
this.dependencies.peerManager.removePeer(message.oderId);
this.dependencies.signalingCoordinator.deletePeerTracking(message.oderId);
}
@@ -533,6 +544,41 @@ export class IncomingSignalingMessageHandler {
return connectionState === 'connected' || peer.dataChannel?.readyState === 'open';
}
/**
 * Decide whether a peer reported as having left should be kept, and restore
 * its routing metadata when it is.
 *
 * The peer is kept only while local voice is connected and the peer still has
 * an active connection. When kept, the peer is re-tracked in each of
 * `serverIds` the coordinator reports as joined; if none qualify, only its
 * signaling URL is recorded.
 *
 * @param peerId Peer named in the `user_left` message.
 * @param peer Peer data captured by the caller, if any.
 * @param signalUrl Signaling URL the message arrived on.
 * @param serverIds Server IDs carried by the message.
 * @returns `true` when the peer is preserved, `false` when the caller should remove it.
 */
private shouldPreserveActiveVoicePeerAfterLeave(
  peerId: string,
  peer: PeerData | undefined,
  signalUrl: string,
  serverIds: readonly string[]
): boolean {
  const shouldKeep = this.dependencies.isVoiceConnected() && this.hasActivePeerConnection(peer);

  if (!shouldKeep) {
    return false;
  }

  const coordinator = this.dependencies.signalingCoordinator;
  let restoredServerCount = 0;

  for (const candidateServerId of serverIds) {
    if (coordinator.hasJoinedServer(candidateServerId)) {
      coordinator.trackPeerInServer(peerId, candidateServerId, signalUrl);
      restoredServerCount += 1;
    }
  }

  // No joined server to re-attach the peer to: keep at least its signaling route.
  if (restoredServerCount === 0) {
    coordinator.setPeerSignalUrl(peerId, signalUrl);
  }

  this.dependencies.logger.warn('Preserving active voice peer after transient user_left', {
    connectionState: peer?.connection.connectionState ?? 'unknown',
    dataChannelState: peer?.dataChannel?.readyState ?? 'missing',
    peerId,
    signalUrl
  });

  return true;
}
private isPeerConnectionNegotiating(peer: PeerData | undefined): boolean {
if (!peer || this.hasActivePeerConnection(peer))
return false;