Possible fix for disconnects
This commit is contained in:
@@ -28,6 +28,7 @@ import {
|
||||
TRANSCEIVER_RECV_ONLY,
|
||||
PEER_RECONNECT_MAX_ATTEMPTS,
|
||||
PEER_RECONNECT_INTERVAL_MS,
|
||||
PEER_DISCONNECT_GRACE_MS,
|
||||
P2P_TYPE_STATE_REQUEST,
|
||||
P2P_TYPE_VOICE_STATE_REQUEST,
|
||||
P2P_TYPE_VOICE_STATE,
|
||||
@@ -72,6 +73,7 @@ export class PeerConnectionManager {
|
||||
/** Tracks disconnected peers for P2P reconnection scheduling. */
|
||||
private disconnectedPeerTracker = new Map<string, DisconnectedPeerEntry>();
|
||||
private peerReconnectTimers = new Map<string, ReturnType<typeof setInterval>>();
|
||||
private readonly peerDisconnectGraceTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
/** Pending ping timestamps keyed by peer ID. */
|
||||
private readonly pendingPings = new Map<string, number>();
|
||||
@@ -155,6 +157,7 @@ export class PeerConnectionManager {
|
||||
|
||||
switch (connection.connectionState) {
|
||||
case CONNECTION_STATE_CONNECTED:
|
||||
this.clearPeerDisconnectGraceTimer(remotePeerId);
|
||||
this.addToConnectedPeers(remotePeerId);
|
||||
this.peerConnected$.next(remotePeerId);
|
||||
this.clearPeerReconnectTimer(remotePeerId);
|
||||
@@ -163,9 +166,12 @@ export class PeerConnectionManager {
|
||||
break;
|
||||
|
||||
case CONNECTION_STATE_DISCONNECTED:
|
||||
this.schedulePeerDisconnectRecovery(remotePeerId);
|
||||
break;
|
||||
|
||||
case CONNECTION_STATE_FAILED:
|
||||
this.trackDisconnectedPeer(remotePeerId);
|
||||
this.removePeer(remotePeerId);
|
||||
this.removePeer(remotePeerId, { preserveReconnectState: true });
|
||||
this.schedulePeerReconnect(remotePeerId);
|
||||
break;
|
||||
|
||||
@@ -860,32 +866,72 @@ export class PeerConnectionManager {
|
||||
return;
|
||||
}
|
||||
|
||||
// Merge into composite stream per peer
|
||||
const compositeStream = this.remotePeerStreams.get(remotePeerId) || new MediaStream();
|
||||
const trackAlreadyAdded = compositeStream
|
||||
.getTracks()
|
||||
.some((existingTrack) => existingTrack.id === track.id);
|
||||
const compositeStream = this.buildCompositeRemoteStream(remotePeerId, track);
|
||||
|
||||
if (!trackAlreadyAdded) {
|
||||
try {
|
||||
compositeStream.addTrack(track);
|
||||
} catch (e) {
|
||||
this.logger.warn('Failed to add track to composite stream', e as any);
|
||||
}
|
||||
}
|
||||
track.addEventListener('ended', () => this.removeRemoteTrack(remotePeerId, track.id));
|
||||
|
||||
this.remotePeerStreams.set(remotePeerId, compositeStream);
|
||||
this.remoteStream$.next({ peerId: remotePeerId,
|
||||
stream: compositeStream });
|
||||
}
|
||||
|
||||
private buildCompositeRemoteStream(remotePeerId: string, incomingTrack: MediaStreamTrack): MediaStream {
|
||||
const existingStream = this.remotePeerStreams.get(remotePeerId);
|
||||
|
||||
let preservedTracks: MediaStreamTrack[] = [];
|
||||
|
||||
if (existingStream) {
|
||||
preservedTracks = existingStream.getTracks().filter(
|
||||
(existingTrack) =>
|
||||
existingTrack.kind !== incomingTrack.kind && existingTrack.readyState === 'live'
|
||||
);
|
||||
}
|
||||
|
||||
return new MediaStream([...preservedTracks, incomingTrack]);
|
||||
}
|
||||
|
||||
private removeRemoteTrack(remotePeerId: string, trackId: string): void {
|
||||
const currentStream = this.remotePeerStreams.get(remotePeerId);
|
||||
|
||||
if (!currentStream)
|
||||
return;
|
||||
|
||||
const remainingTracks = currentStream
|
||||
.getTracks()
|
||||
.filter((existingTrack) => existingTrack.id !== trackId && existingTrack.readyState === 'live');
|
||||
|
||||
if (remainingTracks.length === currentStream.getTracks().length)
|
||||
return;
|
||||
|
||||
if (remainingTracks.length === 0) {
|
||||
this.remotePeerStreams.delete(remotePeerId);
|
||||
return;
|
||||
}
|
||||
|
||||
const nextStream = new MediaStream(remainingTracks);
|
||||
|
||||
this.remotePeerStreams.set(remotePeerId, nextStream);
|
||||
this.remoteStream$.next({ peerId: remotePeerId,
|
||||
stream: nextStream });
|
||||
}
|
||||
|
||||
/**
|
||||
* Close and remove a peer connection, data channel, and emit a disconnect event.
|
||||
*
|
||||
* @param peerId - The peer to remove.
|
||||
*/
|
||||
removePeer(peerId: string): void {
|
||||
removePeer(peerId: string, options?: { preserveReconnectState?: boolean }): void {
|
||||
const peerData = this.activePeerConnections.get(peerId);
|
||||
const preserveReconnectState = options?.preserveReconnectState === true;
|
||||
|
||||
this.clearPeerDisconnectGraceTimer(peerId);
|
||||
|
||||
if (!preserveReconnectState) {
|
||||
this.clearPeerReconnectTimer(peerId);
|
||||
this.disconnectedPeerTracker.delete(peerId);
|
||||
}
|
||||
|
||||
this.remotePeerStreams.delete(peerId);
|
||||
|
||||
if (peerData) {
|
||||
if (peerData.dataChannel)
|
||||
@@ -905,6 +951,7 @@ export class PeerConnectionManager {
|
||||
/** Close every active peer connection and clear internal state. */
|
||||
closeAllPeers(): void {
|
||||
this.clearAllPeerReconnectTimers();
|
||||
this.clearAllPeerDisconnectGraceTimers();
|
||||
this.clearAllPingTimers();
|
||||
this.activePeerConnections.forEach((peerData) => {
|
||||
if (peerData.dataChannel)
|
||||
@@ -914,6 +961,7 @@ export class PeerConnectionManager {
|
||||
});
|
||||
|
||||
this.activePeerConnections.clear();
|
||||
this.remotePeerStreams.clear();
|
||||
this.peerNegotiationQueue.clear();
|
||||
this.peerLatencies.clear();
|
||||
this.pendingPings.clear();
|
||||
@@ -936,6 +984,15 @@ export class PeerConnectionManager {
|
||||
}
|
||||
}
|
||||
|
||||
private clearPeerDisconnectGraceTimer(peerId: string): void {
|
||||
const timer = this.peerDisconnectGraceTimers.get(peerId);
|
||||
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
this.peerDisconnectGraceTimers.delete(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel all pending peer reconnect timers and clear the tracker. */
|
||||
clearAllPeerReconnectTimers(): void {
|
||||
this.peerReconnectTimers.forEach((timer) => clearInterval(timer));
|
||||
@@ -943,6 +1000,49 @@ export class PeerConnectionManager {
|
||||
this.disconnectedPeerTracker.clear();
|
||||
}
|
||||
|
||||
private clearAllPeerDisconnectGraceTimers(): void {
|
||||
this.peerDisconnectGraceTimers.forEach((timer) => clearTimeout(timer));
|
||||
this.peerDisconnectGraceTimers.clear();
|
||||
}
|
||||
|
||||
private schedulePeerDisconnectRecovery(peerId: string): void {
|
||||
if (this.peerDisconnectGraceTimers.has(peerId))
|
||||
return;
|
||||
|
||||
this.logger.warn('Peer temporarily disconnected; waiting before reconnect', { peerId });
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
this.peerDisconnectGraceTimers.delete(peerId);
|
||||
|
||||
const peerData = this.activePeerConnections.get(peerId);
|
||||
|
||||
if (!peerData)
|
||||
return;
|
||||
|
||||
const state = peerData.connection.connectionState;
|
||||
|
||||
if (state === CONNECTION_STATE_CONNECTED || state === 'connecting') {
|
||||
this.logger.info('Peer recovered before disconnect grace expired', {
|
||||
peerId,
|
||||
state
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.warn('Peer still disconnected after grace period; recreating connection', {
|
||||
peerId,
|
||||
state
|
||||
});
|
||||
|
||||
this.trackDisconnectedPeer(peerId);
|
||||
this.removePeer(peerId, { preserveReconnectState: true });
|
||||
this.schedulePeerReconnect(peerId);
|
||||
}, PEER_DISCONNECT_GRACE_MS);
|
||||
|
||||
this.peerDisconnectGraceTimers.set(peerId, timer);
|
||||
}
|
||||
|
||||
private schedulePeerReconnect(peerId: string): void {
|
||||
if (this.peerReconnectTimers.has(peerId))
|
||||
return;
|
||||
@@ -980,20 +1080,12 @@ export class PeerConnectionManager {
|
||||
}
|
||||
|
||||
private attemptPeerReconnect(peerId: string): void {
|
||||
const existing = this.activePeerConnections.get(peerId);
|
||||
|
||||
if (existing) {
|
||||
try {
|
||||
existing.connection.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
this.activePeerConnections.delete(peerId);
|
||||
if (this.activePeerConnections.has(peerId)) {
|
||||
this.removePeer(peerId, { preserveReconnectState: true });
|
||||
}
|
||||
|
||||
this.createPeerConnection(peerId, true);
|
||||
this.createAndSendOffer(peerId);
|
||||
void this.createAndSendOffer(peerId);
|
||||
}
|
||||
|
||||
private requestVoiceStateFromPeer(peerId: string): void {
|
||||
|
||||
@@ -22,6 +22,8 @@ export const SIGNALING_CONNECT_TIMEOUT_MS = 5_000;
|
||||
export const PEER_RECONNECT_MAX_ATTEMPTS = 12;
|
||||
/** Interval (ms) between P2P reconnect attempts */
|
||||
export const PEER_RECONNECT_INTERVAL_MS = 5_000;
|
||||
/** How long to wait before treating a transient disconnect as fatal */
|
||||
export const PEER_DISCONNECT_GRACE_MS = 10_000;
|
||||
|
||||
/** Interval (ms) for broadcasting state heartbeats */
|
||||
export const STATE_HEARTBEAT_INTERVAL_MS = 5_000;
|
||||
|
||||
@@ -47,6 +47,8 @@ export class VoicePlaybackService {
|
||||
}
|
||||
|
||||
if (!this.hasAudio(stream)) {
|
||||
this.rawRemoteStreams.delete(peerId);
|
||||
this.removePipeline(peerId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user