diff --git a/electron/migrations/1000000000003-NormalizeArrayColumns.ts b/electron/migrations/1000000000003-NormalizeArrayColumns.ts index 4b93fef..519cb54 100644 --- a/electron/migrations/1000000000003-NormalizeArrayColumns.ts +++ b/electron/migrations/1000000000003-NormalizeArrayColumns.ts @@ -393,4 +393,4 @@ export class NormalizeArrayColumns1000000000003 implements MigrationInterface { await queryRunner.query(`DROP TABLE IF EXISTS "room_members"`); await queryRunner.query(`DROP TABLE IF EXISTS "room_channels"`); } -} \ No newline at end of file +} diff --git a/server/src/websocket/broadcast.ts b/server/src/websocket/broadcast.ts index b4e3add..3bc0adc 100644 --- a/server/src/websocket/broadcast.ts +++ b/server/src/websocket/broadcast.ts @@ -1,4 +1,6 @@ +import { WebSocket } from 'ws'; import { connectedUsers } from './state'; +import { ConnectedUser } from './types'; interface WsMessage { [key: string]: unknown; @@ -24,6 +26,43 @@ export function notifyServerOwner(ownerId: string, message: WsMessage): void { } } +export function getUniqueUsersInServer(serverId: string, excludeOderId?: string): ConnectedUser[] { + const usersByOderId = new Map(); + + connectedUsers.forEach((user) => { + if (user.oderId === excludeOderId || !user.serverIds.has(serverId) || user.ws.readyState !== WebSocket.OPEN) { + return; + } + + usersByOderId.set(user.oderId, user); + }); + + return Array.from(usersByOderId.values()); +} + +export function isOderIdConnectedToServer(oderId: string, serverId: string, excludeConnectionId?: string): boolean { + return Array.from(connectedUsers.entries()).some(([connectionId, user]) => + connectionId !== excludeConnectionId + && user.oderId === oderId + && user.serverIds.has(serverId) + && user.ws.readyState === WebSocket.OPEN + ); +} + +export function getServerIdsForOderId(oderId: string, excludeConnectionId?: string): string[] { + const serverIds = new Set(); + + connectedUsers.forEach((user, connectionId) => { + if (connectionId === excludeConnectionId || user.oderId !== oderId || user.ws.readyState !== WebSocket.OPEN) { + return; + } + + user.serverIds.forEach((serverId) => serverIds.add(serverId)); + }); + + return Array.from(serverIds); +} + export function notifyUser(oderId: string, message: WsMessage): void { const user = findUserByOderId(oderId); @@ -33,5 +72,13 @@ export function notifyUser(oderId: string, message: WsMessage): void { } export function findUserByOderId(oderId: string) { - return Array.from(connectedUsers.values()).find(user => user.oderId === oderId); + let match: ConnectedUser | undefined; + + connectedUsers.forEach((user) => { + if (user.oderId === oderId && user.ws.readyState === WebSocket.OPEN) { + match = user; + } + }); + + return match; } diff --git a/server/src/websocket/handler.ts b/server/src/websocket/handler.ts index 69b6cb7..192e2ea 100644 --- a/server/src/websocket/handler.ts +++ b/server/src/websocket/handler.ts @@ -1,6 +1,12 @@ import { connectedUsers } from './state'; import { ConnectedUser } from './types'; -import { broadcastToServer, findUserByOderId } from './broadcast'; +import { + broadcastToServer, + findUserByOderId, + getServerIdsForOderId, + getUniqueUsersInServer, + isOderIdConnectedToServer +} from './broadcast'; import { authorizeWebSocketJoin } from '../services/server-access.service'; interface WsMessage { @@ -14,24 +20,53 @@ function normalizeDisplayName(value: unknown, fallback = 'User'): string { return normalized || fallback; } +function readMessageId(value: unknown): string | undefined { + if (typeof value !== 'string') { + return undefined; + } + + const normalized = value.trim(); + + if (!normalized || normalized === 'undefined' || normalized === 'null') { + return undefined; + } + + return normalized; +} + /** Sends the current user list for a given server to a single connected user. */ function sendServerUsers(user: ConnectedUser, serverId: string): void { - const users = Array.from(connectedUsers.values()) - .filter(cu => cu.serverIds.has(serverId) && cu.oderId !== user.oderId) + const users = getUniqueUsersInServer(serverId, user.oderId) .map(cu => ({ oderId: cu.oderId, displayName: normalizeDisplayName(cu.displayName) })); user.ws.send(JSON.stringify({ type: 'server_users', serverId, users })); } function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: string): void { - user.oderId = String(message['oderId'] || connectionId); + const newOderId = readMessageId(message['oderId']) ?? connectionId; + + // Close stale connections from the same identity so offer routing + // always targets the freshest socket (e.g. after page refresh). + connectedUsers.forEach((existing, existingId) => { + if (existingId !== connectionId && existing.oderId === newOderId) { + console.log(`Closing stale connection for ${newOderId} (old=${existingId}, new=${connectionId})`); + + try { + existing.ws.close(); + } catch { /* already closing */ } + + connectedUsers.delete(existingId); + } + }); + + user.oderId = newOderId; user.displayName = normalizeDisplayName(message['displayName'], normalizeDisplayName(user.displayName)); connectedUsers.set(connectionId, user); console.log(`User identified: ${user.displayName} (${user.oderId})`); } async function handleJoinServer(user: ConnectedUser, message: WsMessage, connectionId: string): Promise { - const sid = String(message['serverId']); + const sid = readMessageId(message['serverId']); if (!sid) return; @@ -48,16 +83,20 @@ async function handleJoinServer(user: ConnectedUser, message: WsMessage, connect return; } - const isNew = !user.serverIds.has(sid); + const isNewConnectionMembership = !user.serverIds.has(sid); + const isNewIdentityMembership = isNewConnectionMembership && !isOderIdConnectedToServer(user.oderId, sid, connectionId); user.serverIds.add(sid); user.viewedServerId = sid; connectedUsers.set(connectionId, user); - console.log(`User ${normalizeDisplayName(user.displayName)} (${user.oderId}) joined server ${sid} (new=${isNew})`); + console.log( + `User ${normalizeDisplayName(user.displayName)} (${user.oderId}) joined server ${sid} ` + + `(newConnection=${isNewConnectionMembership}, newIdentity=${isNewIdentityMembership})` + ); sendServerUsers(user, sid); - if (isNew) { + if (isNewIdentityMembership) { broadcastToServer(sid, { type: 'user_joined', oderId: user.oderId, @@ -68,7 +107,10 @@ async function handleJoinServer(user: ConnectedUser, message: WsMessage, connect } function handleViewServer(user: ConnectedUser, message: WsMessage, connectionId: string): void { - const viewSid = String(message['serverId']); + const viewSid = readMessageId(message['serverId']); + + if (!viewSid) + return; user.viewedServerId = viewSid; connectedUsers.set(connectionId, user); @@ -78,7 +120,7 @@ function handleViewServer(user: ConnectedUser, message: WsMessage, connectionId: } function handleLeaveServer(user: ConnectedUser, message: WsMessage, connectionId: string): void { - const leaveSid = (message['serverId'] as string | undefined) ?? user.viewedServerId; + const leaveSid = readMessageId(message['serverId']) ?? user.viewedServerId; if (!leaveSid) return; @@ -90,17 +132,23 @@ function handleLeaveServer(user: ConnectedUser, message: WsMessage, connectionId connectedUsers.set(connectionId, user); + const remainingServerIds = getServerIdsForOderId(user.oderId, connectionId); + + if (remainingServerIds.includes(leaveSid)) { + return; + } + broadcastToServer(leaveSid, { type: 'user_left', oderId: user.oderId, displayName: normalizeDisplayName(user.displayName), serverId: leaveSid, - serverIds: Array.from(user.serverIds) + serverIds: remainingServerIds }, user.oderId); } function forwardRtcMessage(user: ConnectedUser, message: WsMessage): void { - const targetUserId = String(message['targetUserId'] || ''); + const targetUserId = readMessageId(message['targetUserId']) ?? ''; console.log(`Forwarding ${message.type} from ${user.oderId} to ${targetUserId}`); diff --git a/server/src/websocket/index.ts b/server/src/websocket/index.ts index 9169080..b390b8c 100644 --- a/server/src/websocket/index.ts +++ b/server/src/websocket/index.ts @@ -6,7 +6,11 @@ import { import { WebSocketServer, WebSocket } from 'ws'; import { v4 as uuidv4 } from 'uuid'; import { connectedUsers } from './state'; -import { broadcastToServer } from './broadcast'; +import { + broadcastToServer, + getServerIdsForOderId, + isOderIdConnectedToServer +} from './broadcast'; import { handleWebSocketMessage } from './handler'; /** How often to ping all connected clients (ms). */ @@ -20,13 +24,19 @@ function removeDeadConnection(connectionId: string): void { if (user) { console.log(`Removing dead connection: ${user.displayName ?? 'Unknown'} (${user.oderId})`); + const remainingServerIds = getServerIdsForOderId(user.oderId, connectionId); + user.serverIds.forEach((sid) => { + if (isOderIdConnectedToServer(user.oderId, sid, connectionId)) { + return; + } + broadcastToServer(sid, { type: 'user_left', oderId: user.oderId, displayName: user.displayName, serverId: sid, - serverIds: [] + serverIds: remainingServerIds }, user.oderId); }); diff --git a/toju-app/src/app/core/constants.ts b/toju-app/src/app/core/constants.ts index ef87c85..fc2ab57 100644 --- a/toju-app/src/app/core/constants.ts +++ b/toju-app/src/app/core/constants.ts @@ -10,7 +10,7 @@ export const STORAGE_KEY_THEME_DRAFT = 'metoyou_theme_draft'; export const STORAGE_KEY_USER_VOLUMES = 'metoyou_user_volumes'; export const ROOM_URL_PATTERN = /\/room\/([^/]+)/; export const STORE_DEVTOOLS_MAX_AGE = 25; -export const DEBUG_LOG_MAX_ENTRIES = 500; +export const DEBUG_LOG_MAX_ENTRIES = 5000; export const DEFAULT_MAX_USERS = 50; export const DEFAULT_AUDIO_BITRATE_KBPS = 96; export const DEFAULT_VOLUME = 100; diff --git a/toju-app/src/app/core/services/debugging/debugging-network-snapshot.builder.ts b/toju-app/src/app/core/services/debugging/debugging-network-snapshot.builder.ts index 308cdc1..23725c5 100644 --- a/toju-app/src/app/core/services/debugging/debugging-network-snapshot.builder.ts +++ b/toju-app/src/app/core/services/debugging/debugging-network-snapshot.builder.ts @@ -302,7 +302,9 @@ class DebugNetworkSnapshotBuilder { case 'offer': case 'answer': case 'ice_candidate': { - const peerId = this.getPayloadString(payload, 'targetPeerId') ?? this.getPayloadString(payload, 'fromUserId'); + const peerId = direction === 'outbound' + ? (this.getPayloadString(payload, 'targetPeerId') ?? this.getPayloadString(payload, 'fromUserId')) + : (this.getPayloadString(payload, 'fromUserId') ?? this.getPayloadString(payload, 'targetPeerId')); const displayName = this.getPayloadString(payload, 'displayName'); if (!peerId) @@ -1295,7 +1297,7 @@ class DebugNetworkSnapshotBuilder { private getPayloadString(payload: Record | null, key: string): string | null { const value = this.getPayloadField(payload, key); - return typeof value === 'string' ? value : null; + return this.normalizeStringValue(value); } private getPayloadNumber(payload: Record | null, key: string): number | null { @@ -1323,7 +1325,7 @@ class DebugNetworkSnapshotBuilder { private getStringProperty(record: Record | null, key: string): string | null { const value = record?.[key]; - return typeof value === 'string' ? value : null; + return this.normalizeStringValue(value); } private getBooleanProperty(record: Record | null, key: string): boolean | null { @@ -1344,4 +1346,16 @@ class DebugNetworkSnapshotBuilder { return value as Record; } + + private normalizeStringValue(value: unknown): string | null { + if (typeof value !== 'string') + return null; + + const normalized = value.trim(); + + if (!normalized || normalized === 'undefined' || normalized === 'null') + return null; + + return normalized; + } } diff --git a/toju-app/src/app/features/room/rooms-side-panel/rooms-side-panel.component.ts b/toju-app/src/app/features/room/rooms-side-panel/rooms-side-panel.component.ts index 794bc27..7926069 100644 --- a/toju-app/src/app/features/room/rooms-side-panel/rooms-side-panel.component.ts +++ b/toju-app/src/app/features/room/rooms-side-panel/rooms-side-panel.component.ts @@ -125,8 +125,13 @@ export class RoomsSidePanelComponent { }); onlineRoomUsers = computed(() => { const memberIdentifiers = this.roomMemberIdentifiers(); + const roomId = this.currentRoom()?.id; - return this.onlineUsers().filter((user) => !this.isCurrentUserIdentity(user) && this.matchesIdentifiers(memberIdentifiers, user)); + return this.onlineUsers().filter((user) => + !this.isCurrentUserIdentity(user) + && this.matchesIdentifiers(memberIdentifiers, user) + && this.isUserPresentInRoom(user, roomId) + ); }); offlineRoomMembers = computed(() => { const onlineIdentifiers = new Set(); @@ -200,6 +205,14 @@ export class RoomsSidePanelComponent { return !!((entity.id && identifiers.has(entity.id)) || (entity.oderId && identifiers.has(entity.oderId))); } + private isUserPresentInRoom(entity: { presenceServerIds?: string[] }, roomId: string | undefined): boolean { + if (!roomId || !Array.isArray(entity.presenceServerIds) || entity.presenceServerIds.length === 0) { + return true; + } + + return entity.presenceServerIds.includes(roomId); + } + private isCurrentUserIdentity(entity: { id?: string; oderId?: string }): boolean { const current = this.currentUser(); diff --git a/toju-app/src/app/infrastructure/realtime/README.md b/toju-app/src/app/infrastructure/realtime/README.md index bb6fd66..b69d827 100644 --- a/toju-app/src/app/infrastructure/realtime/README.md +++ b/toju-app/src/app/infrastructure/realtime/README.md @@ -144,13 +144,25 @@ sequenceDiagram When the WebSocket drops, `SignalingManager` schedules reconnection with exponential backoff (1s, 2s, 4s, ... up to 30s). On reconnect it replays the cached `identify` and `join_server` messages so presence is restored without the UI doing anything. +### Server-side connection hygiene + +Browsers do not reliably fire WebSocket close events during page refresh or navigation (especially Chromium). The server's `handleIdentify` now closes any existing connection that shares the same `oderId` but a different `connectionId`. This guarantees `findUserByOderId` always routes offers and presence events to the freshest socket, eliminating a class of bugs where signaling messages landed on a dead tab's socket and were silently lost. + +Join and leave broadcasts are also identity-aware: `handleJoinServer` only broadcasts `user_joined` when the identity is genuinely new to that server (not just a second WebSocket connection for the same user), and `handleLeaveServer` / dead-connection cleanup only broadcast `user_left` when no other open connection for that identity remains in the server. The `user_left` payload includes `serverIds` listing the rooms the identity still belongs to, so the client can subtract correctly without over-removing. + +### Multi-room presence + +`server_users`, `user_joined`, and `user_left` are room-scoped presence messages, but the renderer must treat them as updates into a global multi-room presence view. The users store tracks `presenceServerIds` per user instead of clearing the whole slice when a new `server_users` snapshot arrives, so startup/search background rooms keep their server-rail voice badges and active voice peers do not disappear when the user views a different server. + +Peer routing also has to stay scoped to the signaling server that reported the membership. A `user_left` from one signaling cluster must only subtract that cluster's shared servers; otherwise a leave on `signal.toju.app` can incorrectly tear down a peer that is still shared through `signal-sweden.toju.app` or a local signaling server. Route metadata is therefore kept across peer recreation and only cleared once the renderer no longer shares any servers with that peer. + ## Peer connection lifecycle -Peers connect to each other directly with `RTCPeerConnection`. The "initiator" (whoever was already in the room) creates the data channel and audio/video transceivers, then sends an offer. The other side creates an answer. +Peers connect to each other directly with `RTCPeerConnection`. The initiator is chosen deterministically from the identified logical peer IDs so only one side creates the offer and primary data channel for a given pair. The other side creates an answer. If identity or negotiation is still settling, the retry timer defers instead of comparing against the ephemeral local transport ID or reusing a half-open peer forever. ```mermaid sequenceDiagram - participant A as Peer A (initiator) + participant A as Peer A (elected initiator) participant Sig as Signaling Server participant B as Peer B @@ -180,6 +192,16 @@ sequenceDiagram Both peers might send offers at the same time ("glare"). The negotiation module implements the "polite peer" pattern: one side is designated polite (the non-initiator) and will roll back its local offer if it detects a collision, then accept the remote offer instead. The impolite side ignores the incoming offer. +Existing members also schedule a short `user_joined` fallback offer, and the `server_users` path now re-arms the same retry when an initial attempt stalls. The joiner still tries first via its `server_users` snapshot, but the fallback heals late-join races or half-open peers where that initial offer never arrives or never finishes. The retry uses the same deterministic initiator election as the main `server_users` path so the pair cannot regress into dual initiators. + +### Non-initiator takeover + +If the elected initiator's offer never arrives (stale socket, network issue, page still loading), the non-initiator does not wait forever. It tracks the start of each waiting period in `nonInitiatorWaitStart`. For the first `NON_INITIATOR_GIVE_UP_MS` (5 s) it reschedules and logs. Once that window expires it takes over: removes any stale peer, creates a fresh `RTCPeerConnection` as initiator, and sends its own offer. This ensures every peer pair eventually establishes a connection regardless of which side was originally elected. + +### Stale peer replacement + +Offers or ICE candidates can arrive while the existing `RTCPeerConnection` for that peer is in `failed` or `closed` state (the browser's `connectionstatechange` event hasn't fired yet to clean it up). `replaceUnusablePeer()` in `negotiation.ts` detects this, closes the dead connection, removes it from the active map, and lets the caller proceed with a fresh peer. The `connectionstatechange` handler in `create-peer-connection.ts` also guards against stale events: if the connection object no longer matches the current map entry for that peer, the event is ignored so it cannot accidentally remove a replacement peer. + ### Disconnect recovery ```mermaid @@ -196,7 +218,7 @@ stateDiagram-v2 Closed --> [*] ``` -When a peer connection enters `disconnected`, a 10-second grace period starts. If it recovers on its own (network blip), nothing happens. If it reaches `failed`, the connection is torn down and a reconnect loop starts: a fresh `RTCPeerConnection` is created and a new offer is sent every 5 seconds, up to 12 attempts. +When a peer connection enters `disconnected`, a 10-second grace period starts. If it recovers on its own (network blip), nothing happens. If it reaches `failed`, the connection is torn down and a reconnect loop starts. A fresh `RTCPeerConnection` is created every 5 seconds, up to 12 attempts; only the deterministically elected initiator sends a reconnect offer, while the other side waits for that offer. ## Data channel diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/create-peer-connection.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/create-peer-connection.ts index 85dba12..d817d3e 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/create-peer-connection.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/create-peer-connection.ts @@ -31,6 +31,35 @@ export function createPeerConnection( const connection = new RTCPeerConnection({ iceServers: ICE_SERVERS }); let dataChannel: RTCDataChannel | null = null; + let peerData: PeerData | null = null; + + const adoptDataChannel = (channel: RTCDataChannel): void => { + const primaryChannel = dataChannel; + const shouldAdoptAsPrimary = !primaryChannel || primaryChannel.readyState === 'closed'; + + if (shouldAdoptAsPrimary) { + dataChannel = channel; + + if (peerData) { + peerData.dataChannel = channel; + } + + const existing = state.activePeerConnections.get(remotePeerId); + + if (existing) { + existing.dataChannel = channel; + } + } else if (primaryChannel !== channel) { + logger.info('Received secondary data channel while primary channel is still active', { + channelLabel: channel.label, + primaryChannelLabel: primaryChannel.label, + primaryReadyState: primaryChannel.readyState, + remotePeerId + }); + } + + handlers.setupDataChannel(channel, remotePeerId); + }; connection.onicecandidate = (event) => { if (event.candidate) { @@ -53,6 +82,19 @@ export function createPeerConnection( state: connection.connectionState }); + // Ignore events from a connection that was already replaced in the Map + // (e.g. handleOffer recreated the peer while this handler was still queued). + const currentPeer = state.activePeerConnections.get(remotePeerId); + + if (currentPeer && currentPeer.connection !== connection) { + logger.info('Ignoring stale connectionstatechange', { + remotePeerId, + state: connection.connectionState + }); + + return; + } + recordDebugNetworkConnectionState(remotePeerId, connection.connectionState); switch (connection.connectionState) { @@ -103,27 +145,20 @@ export function createPeerConnection( handlers.handleRemoteTrack(event, remotePeerId); }; + connection.ondatachannel = (event) => { + logger.info('Received data channel', { remotePeerId }); + adoptDataChannel(event.channel); + }; + if (isInitiator) { dataChannel = connection.createDataChannel(DATA_CHANNEL_LABEL, { ordered: true }); handlers.setupDataChannel(dataChannel, remotePeerId); - } else { - connection.ondatachannel = (event) => { - logger.info('Received data channel', { remotePeerId }); - dataChannel = event.channel; - - const existing = state.activePeerConnections.get(remotePeerId); - - if (existing) { - existing.dataChannel = dataChannel; - } - - handlers.setupDataChannel(dataChannel, remotePeerId); - }; } - const peerData: PeerData = { + peerData = { connection, dataChannel, + createdAt: Date.now(), isInitiator, pendingIceCandidates: [], audioSender: undefined, diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/negotiation.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/negotiation.ts index 6aedf6b..86274e4 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/negotiation.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/connection/negotiation.ts @@ -60,6 +60,41 @@ export async function doCreateAndSendOffer( } } +/** + * Replace a peer whose underlying connection is `failed` or `closed`. + * Returns the existing peer if still usable, or `undefined` after cleanup. + */ +function replaceUnusablePeer( + context: PeerConnectionManagerContext, + peerId: string, + reason: string +): void { + const { logger, state } = context; + const peerData = state.activePeerConnections.get(peerId); + + if (!peerData) + return; + + const cs = peerData.connection.connectionState; + + if (cs !== 'failed' && cs !== 'closed') + return; + + logger.info('Replacing unusable peer', { + connectionState: cs, + peerId, + reason, + signalingState: peerData.connection.signalingState + }); + + try { + peerData.connection.close(); + } catch { /* already closing */ } + + state.activePeerConnections.delete(peerId); + state.peerNegotiationQueue.delete(peerId); +} + export async function doHandleOffer( context: PeerConnectionManagerContext, fromUserId: string, @@ -70,6 +105,8 @@ export async function doHandleOffer( logger.info('Handling offer', { fromUserId }); + replaceUnusablePeer(context, fromUserId, 'incoming offer'); + let peerData = state.activePeerConnections.get(fromUserId); if (!peerData) { @@ -82,16 +119,15 @@ export async function doHandleOffer( signalingState === 'have-local-offer' || signalingState === 'have-local-pranswer'; if (hasCollision) { - const localId = - callbacks.getIdentifyCredentials()?.oderId || callbacks.getLocalPeerId(); - const isPolite = localId > fromUserId; + const localOderId = callbacks.getIdentifyCredentials()?.oderId ?? null; + const isPolite = !localOderId || localOderId > fromUserId; if (!isPolite) { - logger.info('Ignoring colliding offer (impolite side)', { fromUserId, localId }); + logger.info('Ignoring colliding offer (impolite side)', { fromUserId, localOderId }); return; } - logger.info('Rolling back local offer (polite side)', { fromUserId, localId }); + logger.info('Rolling back local offer (polite side)', { fromUserId, localOderId }); await peerData.connection.setLocalDescription({ type: 'rollback' @@ -211,6 +247,8 @@ export async function doHandleIceCandidate( ): Promise { const { logger, state } = context; + replaceUnusablePeer(context, fromUserId, 'early ICE'); + let peerData = state.activePeerConnections.get(fromUserId); if (!peerData) { diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts index 5978fbd..56aece0 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/messaging/data-channel.ts @@ -484,15 +484,23 @@ function summarizePeerMessage(payload: PeerMessage, base?: Record = { isConnected: voiceState['isConnected'] === true, isMuted: voiceState['isMuted'] === true, isDeafened: voiceState['isDeafened'] === true, - isSpeaking: voiceState['isSpeaking'] === true, - roomId: typeof voiceState['roomId'] === 'string' ? voiceState['roomId'] : undefined, - serverId: typeof voiceState['serverId'] === 'string' ? voiceState['serverId'] : undefined, - volume: typeof voiceState['volume'] === 'number' ? voiceState['volume'] : undefined + isSpeaking: voiceState['isSpeaking'] === true }; + + if (typeof voiceState['roomId'] === 'string') + voiceStateSummary['roomId'] = voiceState['roomId']; + + if (typeof voiceState['serverId'] === 'string') + voiceStateSummary['serverId'] = voiceState['serverId']; + + if (typeof voiceState['volume'] === 'number') + voiceStateSummary['volume'] = voiceState['volume']; + + summary['voiceState'] = voiceStateSummary; } return summary; diff --git a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts index 6d1894b..aa5458f 100644 --- a/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts +++ b/toju-app/src/app/infrastructure/realtime/peer-connection-manager/recovery/peer-recovery.ts @@ -200,23 +200,44 @@ export function schedulePeerReconnect( return; } - attemptPeerReconnect(state, peerId, handlers); + attemptPeerReconnect(context, peerId, handlers); }, PEER_RECONNECT_INTERVAL_MS); state.peerReconnectTimers.set(peerId, timer); } export function attemptPeerReconnect( - state: PeerConnectionManagerState, + context: PeerConnectionManagerContext, peerId: string, handlers: RecoveryHandlers ): void { + const { callbacks, logger, state } = context; + if (state.activePeerConnections.has(peerId)) { handlers.removePeer(peerId, { preserveReconnectState: true }); } - handlers.createPeerConnection(peerId, true); - void handlers.createAndSendOffer(peerId); + const localOderId = callbacks.getIdentifyCredentials()?.oderId ?? null; + + if (!localOderId) { + logger.info('Skipping reconnect offer until logical identity is ready', { peerId }); + handlers.createPeerConnection(peerId, false); + return; + } + + const shouldInitiate = peerId !== localOderId && localOderId < peerId; + + handlers.createPeerConnection(peerId, shouldInitiate); + + if (shouldInitiate) { + void handlers.createAndSendOffer(peerId); + return; + } + + logger.info('Waiting for remote reconnect offer based on deterministic initiator selection', { + localOderId, + peerId + }); } export function requestVoiceStateFromPeer( diff --git a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts index 2a66bdd..87ea97d 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts @@ -176,6 +176,7 @@ export class WebRTCService implements OnDestroy { }); this.signalingMessageHandler = new IncomingSignalingMessageHandler({ + getLocalOderId: () => this.signalingTransportHandler.getIdentifyCredentials()?.oderId ?? null, getEffectiveServerId: () => this.voiceSessionController.getEffectiveServerId(this.state.currentServerId), peerManager: this.peerManager, setServerTime: (serverTime) => this.timeSync.setFromServerTime(serverTime), @@ -229,7 +230,6 @@ export class WebRTCService implements OnDestroy { this.peerManager.peerDisconnected$.subscribe((peerId) => { this.remoteScreenShareRequestController.handlePeerDisconnected(peerId); - this.signalingCoordinator.deletePeerTracking(peerId); }); // Media manager → voice connected signal diff --git a/toju-app/src/app/infrastructure/realtime/realtime.types.ts b/toju-app/src/app/infrastructure/realtime/realtime.types.ts index 5b1e7c0..17ceb67 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime.types.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime.types.ts @@ -8,6 +8,8 @@ export interface PeerData { connection: RTCPeerConnection; /** The negotiated data channel, or `null` before the channel is established. */ dataChannel: RTCDataChannel | null; + /** Timestamp (ms since epoch) when this peer attempt was created. */ + createdAt: number; /** `true` when this side created the offer (and data channel). */ isInitiator: boolean; /** ICE candidates received before the remote description was set. */ diff --git a/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts b/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts index a77df59..674cf91 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts @@ -23,9 +23,10 @@ export class ServerSignalingCoordinator { private readonly memberServerIdsBySignalUrl = new Map>(); private readonly serverSignalingUrlMap = new Map(); private readonly peerSignalingUrlMap = new Map(); + private readonly peerKnownSignalUrls = new Map>(); private readonly signalingManagers = new Map(); private readonly signalingSubscriptions = new Map(); - private readonly peerServerMap = new Map>(); + private readonly peerServerMap = new Map>>(); constructor( private readonly callbacks: ServerSignalingCoordinatorCallbacks @@ -126,15 +127,28 @@ export class ServerSignalingCoordinator { } setPeerSignalUrl(peerId: string, signalUrl: string): void { + const knownSignalUrls = this.peerKnownSignalUrls.get(peerId) ?? new Set(); + + knownSignalUrls.add(signalUrl); + this.peerKnownSignalUrls.set(peerId, knownSignalUrls); this.peerSignalingUrlMap.set(peerId, signalUrl); } getPeerSignalUrl(peerId: string): string | undefined { - return this.peerSignalingUrlMap.get(peerId); + const preferredSignalUrl = this.peerSignalingUrlMap.get(peerId); + + if (preferredSignalUrl) { + return preferredSignalUrl; + } + + const knownSignalUrls = this.peerKnownSignalUrls.get(peerId); + + return knownSignalUrls?.values().next().value; } deletePeerSignalUrl(peerId: string): void { this.peerSignalingUrlMap.delete(peerId); + this.peerKnownSignalUrls.delete(peerId); } addJoinedServer(signalUrl: string, serverId: string): void { @@ -197,64 +211,86 @@ export class ServerSignalingCoordinator { return joinedServerIds; } - trackPeerInServer(peerId: string, serverId: string): void { - if (!peerId || !serverId) + trackPeerInServer(peerId: string, serverId: string, signalUrl: string): void { + if (!peerId || !serverId || !signalUrl) return; - const trackedServers = this.peerServerMap.get(peerId) ?? new Set(); + const trackedSignalUrls = this.peerServerMap.get(peerId) ?? new Map>(); + const trackedServers = trackedSignalUrls.get(signalUrl) ?? new Set(); trackedServers.add(serverId); - this.peerServerMap.set(peerId, trackedServers); + trackedSignalUrls.set(signalUrl, trackedServers); + this.peerServerMap.set(peerId, trackedSignalUrls); + this.setPeerSignalUrl(peerId, signalUrl); } hasTrackedPeerServers(peerId: string): boolean { - return this.peerServerMap.has(peerId); + return this.getTrackedServerIds(peerId).size > 0; } - replacePeerSharedServers(peerId: string, serverIds: string[]): boolean { + replacePeerSharedServers(peerId: string, signalUrl: string, serverIds: string[]): boolean { const sharedServerIds = serverIds.filter((serverId) => this.hasJoinedServer(serverId)); if (sharedServerIds.length === 0) { - this.peerServerMap.delete(peerId); - return false; + this.removePeerSignalScope(peerId, signalUrl); + return this.hasTrackedPeerServers(peerId); } - this.peerServerMap.set(peerId, new Set(sharedServerIds)); + const trackedSignalUrls = this.peerServerMap.get(peerId) ?? new Map>(); + + trackedSignalUrls.set(signalUrl, new Set(sharedServerIds)); + this.peerServerMap.set(peerId, trackedSignalUrls); + this.setPeerSignalUrl(peerId, signalUrl); + return true; } - untrackPeerFromServer(peerId: string, serverId: string): boolean { - const trackedServers = this.peerServerMap.get(peerId); + untrackPeerFromServer(peerId: string, signalUrl: string, serverId: string): boolean { + const trackedSignalUrls = this.peerServerMap.get(peerId); + + if (!trackedSignalUrls) + return false; + + const trackedServers = trackedSignalUrls.get(signalUrl); if (!trackedServers) - return false; + return this.hasTrackedPeerServers(peerId); trackedServers.delete(serverId); if (trackedServers.size === 0) { + trackedSignalUrls.delete(signalUrl); + this.untrackPeerSignalUrl(peerId, signalUrl); + } else { + trackedSignalUrls.set(signalUrl, trackedServers); + } + + if (trackedSignalUrls.size === 0) { this.peerServerMap.delete(peerId); return false; } - this.peerServerMap.set(peerId, trackedServers); + this.peerServerMap.set(peerId, trackedSignalUrls); return true; } deletePeerTracking(peerId: string): void { this.peerServerMap.delete(peerId); this.peerSignalingUrlMap.delete(peerId); + this.peerKnownSignalUrls.delete(peerId); } clearPeerTracking(): void { this.peerServerMap.clear(); this.peerSignalingUrlMap.clear(); + this.peerKnownSignalUrls.clear(); } getPeersOutsideServer(serverId: string): string[] { const peersToClose: string[] = []; - this.peerServerMap.forEach((peerServerIds, peerId) => { - if (!peerServerIds.has(serverId)) { + this.peerServerMap.forEach((_peerServerIdsBySignalUrl, peerId) => { + if (!this.getTrackedServerIds(peerId).has(serverId)) { peersToClose.push(peerId); } }); @@ -292,4 +328,64 @@ export class ServerSignalingCoordinator { this.memberServerIdsBySignalUrl.set(signalUrl, createdSet); return createdSet; } + + private getTrackedServerIds(peerId: string): Set { + const trackedServerIds = new Set(); + const trackedSignalUrls = this.peerServerMap.get(peerId); + + if (!trackedSignalUrls) { + return trackedServerIds; + } + + trackedSignalUrls.forEach((serverIds) => { + serverIds.forEach((serverId) => trackedServerIds.add(serverId)); + }); + + return trackedServerIds; + } + + private removePeerSignalScope(peerId: string, signalUrl: string): void { + const trackedSignalUrls = this.peerServerMap.get(peerId); + + if (!trackedSignalUrls) { + this.untrackPeerSignalUrl(peerId, signalUrl); + return; + } + + trackedSignalUrls.delete(signalUrl); + + if (trackedSignalUrls.size === 0) { + this.peerServerMap.delete(peerId); + } else { + this.peerServerMap.set(peerId, trackedSignalUrls); + } + + this.untrackPeerSignalUrl(peerId, signalUrl); + } + + private untrackPeerSignalUrl(peerId: string, signalUrl: string): void { + const knownSignalUrls = this.peerKnownSignalUrls.get(peerId); + + if (!knownSignalUrls) { + if (this.peerSignalingUrlMap.get(peerId) === signalUrl) { + this.peerSignalingUrlMap.delete(peerId); + } + + return; + } + + knownSignalUrls.delete(signalUrl); + + if (knownSignalUrls.size === 0) { + this.peerKnownSignalUrls.delete(peerId); + this.peerSignalingUrlMap.delete(peerId); + return; + } + + this.peerKnownSignalUrls.set(peerId, knownSignalUrls); + + if (this.peerSignalingUrlMap.get(peerId) === signalUrl) { + this.peerSignalingUrlMap.set(peerId, knownSignalUrls.values().next().value as string); + } + } } diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts index aa1f801..12634b8 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts @@ -39,11 +39,26 @@ interface IncomingSignalingMessageHandlerDependencies { peerManager: PeerConnectionManager; signalingCoordinator: ServerSignalingCoordinator; logger: WebRTCLogger; + getLocalOderId(): string | null; getEffectiveServerId(): string | null; setServerTime(serverTime: number): void; } +const USER_JOINED_FALLBACK_OFFER_DELAY_MS = 1_000; +const PEER_NEGOTIATION_GRACE_MS = 3_000; +// Once a local offer has been sent, the peer is actively in negotiation - wait much +// longer before treating it as stale, so a slow answer path doesn't cause an +// unnecessary teardown/re-offer cycle. +const PEER_NEGOTIATION_OFFER_SENT_GRACE_MS = 20_000; +// How long the non-initiator waits for the elected initiator's offer before +// giving up and creating the connection itself. +const NON_INITIATOR_GIVE_UP_MS = 5_000; + export class IncomingSignalingMessageHandler { + private readonly userJoinedFallbackTimers = new Map>(); + /** Tracks when we first started waiting for a remote-initiated offer from each peer. */ + private readonly nonInitiatorWaitStart = new Map(); + constructor( private readonly dependencies: IncomingSignalingMessageHandlerDependencies ) {} @@ -105,6 +120,7 @@ export class IncomingSignalingMessageHandler { private handleServerUsersSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void { const users = Array.isArray(message.users) ? message.users : []; + const localOderId = this.dependencies.getLocalOderId(); this.dependencies.logger.info('Server users', { count: users.length, @@ -120,15 +136,22 @@ export class IncomingSignalingMessageHandler { if (!user.oderId) continue; + if (localOderId && user.oderId === localOderId) + continue; + + this.clearUserJoinedFallbackOffer(user.oderId); + this.dependencies.signalingCoordinator.setPeerSignalUrl(user.oderId, signalUrl); if (message.serverId) { - this.dependencies.signalingCoordinator.trackPeerInServer(user.oderId, message.serverId); + this.dependencies.signalingCoordinator.trackPeerInServer(user.oderId, message.serverId, signalUrl); } const existing = this.dependencies.peerManager.activePeerConnections.get(user.oderId); - if (this.canReusePeerConnection(existing)) { + if (this.hasActivePeerConnection(existing)) { + // Peer is already up - move on (timer already cleared above). + this.nonInitiatorWaitStart.delete(user.oderId); this.dependencies.logger.info('Reusing active peer connection', { connectionState: existing?.connection.connectionState ?? 'unknown', dataChannelState: existing?.dataChannel?.readyState ?? 'missing', @@ -140,6 +163,56 @@ export class IncomingSignalingMessageHandler { continue; } + this.scheduleUserJoinedFallbackOffer(user.oderId, signalUrl, message.serverId); + + if (this.isPeerConnectionNegotiating(existing)) { + this.dependencies.logger.info('Awaiting existing peer negotiation from server_users snapshot', { + ageMs: existing ? Date.now() - existing.createdAt : undefined, + connectionState: existing?.connection.connectionState ?? 'unknown', + dataChannelState: existing?.dataChannel?.readyState ?? 'missing', + oderId: user.oderId, + serverId: message.serverId, + signalUrl + }); + + continue; + } + + if (!localOderId) { + this.dependencies.logger.info('Deferring server_users peer initiation until logical identity is ready', { + oderId: user.oderId, + serverId: message.serverId, + signalUrl + }); + + continue; + } + + const shouldInitiate = this.shouldInitiatePeer(user.oderId, localOderId); + + if (!shouldInitiate) { + if (existing) { + this.dependencies.logger.info('Removing stale peer while waiting for remote offer', { + connectionState: existing.connection.connectionState, + dataChannelState: existing.dataChannel?.readyState ?? 'missing', + oderId: user.oderId, + serverId: message.serverId, + signalUrl + }); + + this.dependencies.peerManager.removePeer(user.oderId); + } + + this.dependencies.logger.info('Waiting for remote offer based on deterministic initiator selection', { + localOderId, + oderId: user.oderId, + serverId: message.serverId, + signalUrl + }); + + continue; + } + if (existing) { this.dependencies.logger.info('Removing failed peer before recreate', { connectionState: existing.connection.connectionState, @@ -164,6 +237,10 @@ export class IncomingSignalingMessageHandler { } private handleUserJoinedSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void { + if (message.oderId && message.oderId === this.dependencies.getLocalOderId()) { + return; + } + this.dependencies.logger.info('User joined', { displayName: message.displayName, oderId: message.oderId, @@ -179,11 +256,27 @@ export class IncomingSignalingMessageHandler { } if (message.oderId && message.serverId) { - this.dependencies.signalingCoordinator.trackPeerInServer(message.oderId, message.serverId); + this.dependencies.signalingCoordinator.trackPeerInServer(message.oderId, message.serverId, signalUrl); + } + + if (message.oderId) { + const existing = this.dependencies.peerManager.activePeerConnections.get(message.oderId); + + if (this.hasActivePeerConnection(existing)) { + // Already connected - cancel any stale timer and move on. + this.clearUserJoinedFallbackOffer(message.oderId); + this.nonInitiatorWaitStart.delete(message.oderId); + } else { + this.scheduleUserJoinedFallbackOffer(message.oderId, signalUrl, message.serverId); + } } } private handleUserLeftSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void { + if (message.oderId && message.oderId === this.dependencies.getLocalOderId()) { + return; + } + this.dependencies.logger.info('User left', { displayName: message.displayName, oderId: message.oderId, @@ -192,10 +285,13 @@ export class IncomingSignalingMessageHandler { }); if (message.oderId) { + this.clearUserJoinedFallbackOffer(message.oderId); + this.nonInitiatorWaitStart.delete(message.oderId); + const hasRemainingSharedServers = Array.isArray(message.serverIds) - ? this.dependencies.signalingCoordinator.replacePeerSharedServers(message.oderId, message.serverIds) + ? this.dependencies.signalingCoordinator.replacePeerSharedServers(message.oderId, signalUrl, message.serverIds) : (message.serverId - ? this.dependencies.signalingCoordinator.untrackPeerFromServer(message.oderId, message.serverId) + ? this.dependencies.signalingCoordinator.untrackPeerFromServer(message.oderId, signalUrl, message.serverId) : false); if (!hasRemainingSharedServers) { @@ -212,12 +308,18 @@ export class IncomingSignalingMessageHandler { if (!fromUserId || !sdp) return; + if (fromUserId === this.dependencies.getLocalOderId()) + return; + + this.clearUserJoinedFallbackOffer(fromUserId); + this.nonInitiatorWaitStart.delete(fromUserId); + this.dependencies.signalingCoordinator.setPeerSignalUrl(fromUserId, signalUrl); const effectiveServerId = this.dependencies.getEffectiveServerId(); if (effectiveServerId && !this.dependencies.signalingCoordinator.hasTrackedPeerServers(fromUserId)) { - this.dependencies.signalingCoordinator.trackPeerInServer(fromUserId, effectiveServerId); + this.dependencies.signalingCoordinator.trackPeerInServer(fromUserId, effectiveServerId, signalUrl); } this.dependencies.peerManager.handleOffer(fromUserId, sdp); @@ -230,6 +332,11 @@ export class IncomingSignalingMessageHandler { if (!fromUserId || !sdp) return; + if (fromUserId === this.dependencies.getLocalOderId()) + return; + + this.clearUserJoinedFallbackOffer(fromUserId); + this.dependencies.signalingCoordinator.setPeerSignalUrl(fromUserId, signalUrl); this.dependencies.peerManager.handleAnswer(fromUserId, sdp); } @@ -241,16 +348,197 @@ export class IncomingSignalingMessageHandler { if (!fromUserId || !candidate) return; + if (fromUserId === this.dependencies.getLocalOderId()) + return; + + this.clearUserJoinedFallbackOffer(fromUserId); + this.dependencies.signalingCoordinator.setPeerSignalUrl(fromUserId, signalUrl); this.dependencies.peerManager.handleIceCandidate(fromUserId, candidate); } - private canReusePeerConnection(peer: PeerData | undefined): boolean { + private scheduleUserJoinedFallbackOffer(peerId: string, signalUrl: string, serverId?: string): void { + this.clearUserJoinedFallbackOffer(peerId); + + const timer = setTimeout(() => { + this.userJoinedFallbackTimers.delete(peerId); + const localOderId = this.dependencies.getLocalOderId(); + const existing = this.dependencies.peerManager.activePeerConnections.get(peerId); + + if (this.hasActivePeerConnection(existing)) { + this.nonInitiatorWaitStart.delete(peerId); + this.dependencies.logger.info('Skip user_joined fallback offer - peer already active', { + connectionState: existing?.connection.connectionState ?? 'unknown', + dataChannelState: existing?.dataChannel?.readyState ?? 'missing', + oderId: peerId, + serverId, + signalUrl + }); + + return; + } + + if (!localOderId) { + this.dependencies.logger.info('Retrying peer initiation once logical identity is ready', { + oderId: peerId, + serverId, + signalUrl + }); + + this.scheduleUserJoinedFallbackOffer(peerId, signalUrl, serverId); + return; + } + + if (this.isPeerConnectionNegotiating(existing)) { + this.dependencies.logger.info('Delaying fallback offer while peer negotiation is still in progress', { + ageMs: existing ? Date.now() - existing.createdAt : undefined, + connectionState: existing?.connection.connectionState ?? 'unknown', + dataChannelState: existing?.dataChannel?.readyState ?? 'missing', + localOderId, + oderId: peerId, + serverId, + signalUrl + }); + + this.scheduleUserJoinedFallbackOffer(peerId, signalUrl, serverId); + return; + } + + const shouldInitiate = this.shouldInitiatePeer(peerId, localOderId); + + if (!shouldInitiate) { + // Track how long we've been waiting for the remote initiator's offer. + if (!this.nonInitiatorWaitStart.has(peerId)) { + this.nonInitiatorWaitStart.set(peerId, Date.now()); + } + + const waitStart = this.nonInitiatorWaitStart.get(peerId) ?? Date.now(); + const waitMs = Date.now() - waitStart; + + if (waitMs < NON_INITIATOR_GIVE_UP_MS) { + this.dependencies.logger.info('Waiting for remote initiator offer', { + localOderId, + oderId: peerId, + serverId, + signalUrl, + waitMs + }); + + this.scheduleUserJoinedFallbackOffer(peerId, signalUrl, serverId); + return; + } + + // The elected initiator never sent an offer - take over. + this.nonInitiatorWaitStart.delete(peerId); + + if (existing) { + this.dependencies.logger.info('Removing stale peer before non-initiator takeover offer', { + connectionState: existing.connection.connectionState, + dataChannelState: existing.dataChannel?.readyState ?? 'missing', + localOderId, + oderId: peerId, + serverId, + signalUrl, + waitMs + }); + + this.dependencies.peerManager.removePeer(peerId); + } + + this.dependencies.logger.info('Non-initiator takeover - creating peer connection after remote initiator timeout', { + localOderId, + oderId: peerId, + serverId, + signalUrl, + waitMs + }); + + this.dependencies.peerManager.createPeerConnection(peerId, true); + void this.dependencies.peerManager.createAndSendOffer(peerId); + this.scheduleUserJoinedFallbackOffer(peerId, signalUrl, serverId); + return; + } + + if (existing) { + this.dependencies.logger.info('Removing stale peer before user_joined fallback offer', { + connectionState: existing.connection.connectionState, + dataChannelState: existing.dataChannel?.readyState ?? 'missing', + oderId: peerId, + serverId, + signalUrl + }); + + this.dependencies.peerManager.removePeer(peerId); + } + + this.dependencies.logger.info('Create peer connection from user_joined fallback offer', { + oderId: peerId, + serverId, + signalUrl + }); + + this.dependencies.peerManager.createPeerConnection(peerId, true); + void this.dependencies.peerManager.createAndSendOffer(peerId); + + this.scheduleUserJoinedFallbackOffer(peerId, signalUrl, serverId); + }, USER_JOINED_FALLBACK_OFFER_DELAY_MS); + + this.userJoinedFallbackTimers.set(peerId, timer); + } + + private clearUserJoinedFallbackOffer(peerId: string): void { + const timer = this.userJoinedFallbackTimers.get(peerId); + + if (!timer) { + return; + } + + clearTimeout(timer); + this.userJoinedFallbackTimers.delete(peerId); + } + + private shouldInitiatePeer(peerId: string, localOderId: string | null = this.dependencies.getLocalOderId()): boolean { + if (!localOderId) + return false; + + if (peerId === localOderId) + return false; + + return localOderId < peerId; + } + + private hasActivePeerConnection(peer: PeerData | undefined): boolean { if (!peer) return false; const connectionState = peer.connection?.connectionState; - return connectionState !== 'closed' && connectionState !== 'failed'; + return connectionState === 'connected' || peer.dataChannel?.readyState === 'open'; + } + + private isPeerConnectionNegotiating(peer: PeerData | undefined): boolean { + if (!peer || this.hasActivePeerConnection(peer)) + return false; + + const connectionState = peer.connection?.connectionState; + + if (connectionState === 'closed' || connectionState === 'failed') + return false; + + const signalingState = peer.connection?.signalingState; + const ageMs = Date.now() - peer.createdAt; + + // If a local offer (or pranswer) has already been sent, the peer is actively + // negotiating with the remote side. Use a much longer grace period so that + // a slow signaling round-trip does not trigger a premature teardown. + if (signalingState === 'have-local-offer' || signalingState === 'have-local-pranswer') + return ageMs < PEER_NEGOTIATION_OFFER_SENT_GRACE_MS; + + // ICE negotiation in progress (offer/answer exchange already complete, candidates being checked). + // TURN relay can take 5-15 s on high-latency networks, so use the same extended grace. + if (connectionState === 'connecting') + return ageMs < PEER_NEGOTIATION_OFFER_SENT_GRACE_MS; + + return ageMs < PEER_NEGOTIATION_GRACE_MS; } } diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts index 14c3d11..855015e 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts @@ -95,6 +95,7 @@ export class SignalingTransportHandler { sendRawMessage(message: Record): void { const targetPeerId = typeof message['targetUserId'] === 'string' ? message['targetUserId'] : null; + const messageType = typeof message['type'] === 'string' ? message['type'] : 'unknown'; if (targetPeerId) { const targetSignalUrl = this.dependencies.signalingCoordinator.getPeerSignalUrl(targetPeerId); @@ -102,6 +103,11 @@ export class SignalingTransportHandler { if (targetSignalUrl && this.sendRawMessageToSignalUrl(targetSignalUrl, message)) { return; } + + this.dependencies.logger.warn('[signaling] Missing peer signal route for outbound raw message', { + targetPeerId, + type: messageType + }); } const serverId = typeof message['serverId'] === 'string' ? message['serverId'] : null; @@ -118,12 +124,19 @@ export class SignalingTransportHandler { if (connectedManagers.length === 0) { this.dependencies.logger.error('[signaling] No active signaling connection for outbound message', new Error('No signaling manager available'), { - type: typeof message['type'] === 'string' ? message['type'] : 'unknown' + type: messageType }); return; } + this.dependencies.logger.warn('[signaling] Broadcasting raw message to all signaling managers due to unresolved route', { + connectedSignalUrls: connectedManagers.map(({ signalUrl }) => signalUrl), + serverId, + targetPeerId, + type: messageType + }); + for (const { manager } of connectedManagers) { manager.sendRawMessage(message); } diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts index 26cd6e1..e7de4a7 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts @@ -3,7 +3,11 @@ * Manages the WebSocket connection to the signaling server, * including automatic reconnection and heartbeats. */ -import { Observable, Subject } from 'rxjs'; +import { + Observable, + Subject, + of +} from 'rxjs'; import type { SignalingMessage } from '../../../shared-kernel'; import { recordDebugNetworkSignalingPayload } from '../logging/debug-network-metrics'; import { IdentifyCredentials, JoinedServerInfo } from '../realtime.types'; @@ -54,19 +58,42 @@ export class SignalingManager { /** Open (or re-open) a WebSocket to the signaling server. */ connect(serverUrl: string): Observable { + if (this.lastSignalingUrl === serverUrl) { + if (this.isSocketOpen()) { + return of(true); + } + + if (this.isSocketConnecting()) { + return this.waitForOpen(); + } + } + this.lastSignalingUrl = serverUrl; return new Observable((observer) => { try { this.logger.info('[signaling] Connecting to signaling server', { serverUrl }); - if (this.signalingWebSocket) { - this.signalingWebSocket.close(); - } + const previousSocket = this.signalingWebSocket; this.lastSignalingUrl = serverUrl; - this.signalingWebSocket = new WebSocket(serverUrl); + const socket = new WebSocket(serverUrl); + + this.signalingWebSocket = socket; + + if (previousSocket && previousSocket !== socket) { + try { + previousSocket.close(); + } catch { + this.logger.warn('[signaling] Failed to close previous signaling socket', { + url: serverUrl + }); + } + } + + socket.onopen = () => { + if (socket !== this.signalingWebSocket) + return; - this.signalingWebSocket.onopen = () => { this.logger.info('[signaling] Connected to signaling server', { serverUrl, readyState: this.getSocketReadyStateLabel() @@ -77,9 +104,13 @@ export class SignalingManager { this.connectionStatus$.next({ connected: true }); this.reIdentifyAndRejoin(); observer.next(true); + observer.complete(); }; - this.signalingWebSocket.onmessage = (event) => { + socket.onmessage = (event) => { + if (socket !== this.signalingWebSocket) + return; + const rawPayload = this.stringifySocketPayload(event.data); const payloadBytes = rawPayload ? this.measurePayloadBytes(rawPayload) : null; @@ -109,7 +140,10 @@ export class SignalingManager { } }; - this.signalingWebSocket.onerror = (error) => { + socket.onerror = (error) => { + if (socket !== this.signalingWebSocket) + return; + this.logger.error('[signaling] Signaling socket error', error, { readyState: this.getSocketReadyStateLabel(), url: serverUrl @@ -121,7 +155,10 @@ export class SignalingManager { observer.error(error); }; - this.signalingWebSocket.onclose = (event) => { + socket.onclose = (event) => { + if (socket !== this.signalingWebSocket) + return; + this.logger.warn('[signaling] Disconnected from signaling server', { attempts: this.signalingReconnectAttempts, code: event.code, @@ -216,9 +253,12 @@ export class SignalingManager { this.stopHeartbeat(); this.clearReconnect(); - if (this.signalingWebSocket) { - this.signalingWebSocket.close(); - this.signalingWebSocket = null; + const socket = this.signalingWebSocket; + + this.signalingWebSocket = null; + + if (socket) { + socket.close(); } } @@ -227,6 +267,10 @@ export class SignalingManager { return this.signalingWebSocket !== null && this.signalingWebSocket.readyState === WebSocket.OPEN; } + isSocketConnecting(): boolean { + return this.signalingWebSocket !== null && this.signalingWebSocket.readyState === WebSocket.CONNECTING; + } + /** The URL last used to connect (needed for reconnection). */ getLastUrl(): string | null { return this.lastSignalingUrl; @@ -273,7 +317,7 @@ export class SignalingManager { * No-ops if a timer is already pending or no URL is stored. */ private scheduleReconnect(): void { - if (this.signalingReconnectTimer || !this.lastSignalingUrl) + if (this.signalingReconnectTimer || !this.lastSignalingUrl || this.isSocketOpen() || this.isSocketConnecting()) return; const delay = Math.min( @@ -283,6 +327,11 @@ export class SignalingManager { this.signalingReconnectTimer = setTimeout(() => { this.signalingReconnectTimer = null; + + if (this.isSocketOpen() || this.isSocketConnecting()) { + return; + } + this.signalingReconnectAttempts++; this.logger.info('[signaling] Attempting reconnect', { attempt: this.signalingReconnectAttempts, @@ -297,6 +346,44 @@ export class SignalingManager { }, delay); } + private waitForOpen(timeoutMs: number = SIGNALING_CONNECT_TIMEOUT_MS): Observable { + if (this.isSocketOpen()) { + return of(true); + } + + return new Observable((observer) => { + let settled = false; + + const subscription = this.connectionStatus$.subscribe(({ connected }) => { + if (!connected || settled) { + return; + } + + settled = true; + clearTimeout(timeout); + subscription.unsubscribe(); + observer.next(true); + observer.complete(); + }); + const timeout = setTimeout(() => { + if (settled) { + return; + } + + settled = true; + subscription.unsubscribe(); + observer.next(this.isSocketOpen()); + observer.complete(); + }, timeoutMs); + + return () => { + settled = true; + clearTimeout(timeout); + subscription.unsubscribe(); + }; + }); + } + /** Cancel any pending reconnect timer and reset the attempt counter. */ private clearReconnect(): void { if (this.signalingReconnectTimer) { @@ -415,21 +502,23 @@ export class SignalingManager { const record = payload as Record; const voiceState = this.summarizeVoiceState(record['voiceState']); const users = this.summarizeUsers(record['users']); - - return { - displayName: typeof record['displayName'] === 'string' ? record['displayName'] : undefined, - fromUserId: typeof record['fromUserId'] === 'string' ? record['fromUserId'] : undefined, - isScreenSharing: typeof record['isScreenSharing'] === 'boolean' ? record['isScreenSharing'] : undefined, + const preview: Record = { keys: Object.keys(record).slice(0, 10), - oderId: typeof record['oderId'] === 'string' ? record['oderId'] : undefined, - roomId: typeof record['serverId'] === 'string' ? record['serverId'] : undefined, - serverId: typeof record['serverId'] === 'string' ? record['serverId'] : undefined, - targetPeerId: typeof record['targetUserId'] === 'string' ? record['targetUserId'] : undefined, - type: typeof record['type'] === 'string' ? record['type'] : 'unknown', - userCount: Array.isArray(record['users']) ? record['users'].length : undefined, - users, - voiceState + type: typeof record['type'] === 'string' ? record['type'] : 'unknown' }; + + this.assignPreviewValue(preview, 'displayName', typeof record['displayName'] === 'string' ? record['displayName'] : undefined); + this.assignPreviewValue(preview, 'fromUserId', typeof record['fromUserId'] === 'string' ? record['fromUserId'] : undefined); + this.assignPreviewValue(preview, 'isScreenSharing', typeof record['isScreenSharing'] === 'boolean' ? record['isScreenSharing'] : undefined); + this.assignPreviewValue(preview, 'oderId', typeof record['oderId'] === 'string' ? record['oderId'] : undefined); + this.assignPreviewValue(preview, 'roomId', typeof record['roomId'] === 'string' ? record['roomId'] : undefined); + this.assignPreviewValue(preview, 'serverId', typeof record['serverId'] === 'string' ? record['serverId'] : undefined); + this.assignPreviewValue(preview, 'targetPeerId', typeof record['targetUserId'] === 'string' ? record['targetUserId'] : undefined); + this.assignPreviewValue(preview, 'userCount', Array.isArray(record['users']) ? record['users'].length : undefined); + this.assignPreviewValue(preview, 'users', users); + this.assignPreviewValue(preview, 'voiceState', voiceState); + + return preview; } private summarizeVoiceState(value: unknown): Record | undefined { @@ -438,15 +527,18 @@ export class SignalingManager { if (!voiceState) return undefined; - return { + const summary: Record = { isConnected: voiceState['isConnected'] === true, isMuted: voiceState['isMuted'] === true, isDeafened: voiceState['isDeafened'] === true, - isSpeaking: voiceState['isSpeaking'] === true, - roomId: typeof voiceState['roomId'] === 'string' ? voiceState['roomId'] : undefined, - serverId: typeof voiceState['serverId'] === 'string' ? voiceState['serverId'] : undefined, - volume: typeof voiceState['volume'] === 'number' ? voiceState['volume'] : undefined + isSpeaking: voiceState['isSpeaking'] === true }; + + this.assignPreviewValue(summary, 'roomId', typeof voiceState['roomId'] === 'string' ? voiceState['roomId'] : undefined); + this.assignPreviewValue(summary, 'serverId', typeof voiceState['serverId'] === 'string' ? voiceState['serverId'] : undefined); + this.assignPreviewValue(summary, 'volume', typeof voiceState['volume'] === 'number' ? voiceState['volume'] : undefined); + + return summary; } private summarizeUsers(value: unknown): Record[] | undefined { @@ -461,15 +553,22 @@ export class SignalingManager { if (!user) continue; - users.push({ - displayName: typeof user['displayName'] === 'string' ? user['displayName'] : undefined, - oderId: typeof user['oderId'] === 'string' ? user['oderId'] : undefined - }); + const summary: Record = {}; + + this.assignPreviewValue(summary, 'displayName', typeof user['displayName'] === 'string' ? user['displayName'] : undefined); + this.assignPreviewValue(summary, 'oderId', typeof user['oderId'] === 'string' ? user['oderId'] : undefined); + + users.push(summary); } return users; } + private assignPreviewValue(target: Record, key: string, value: unknown): void { + if (value !== undefined) + target[key] = value; + } + private asRecord(value: unknown): Record | null { if (!value || typeof value !== 'object' || Array.isArray(value)) return null; diff --git a/toju-app/src/app/shared-kernel/user.models.ts b/toju-app/src/app/shared-kernel/user.models.ts index 53396b5..de3195b 100644 --- a/toju-app/src/app/shared-kernel/user.models.ts +++ b/toju-app/src/app/shared-kernel/user.models.ts @@ -21,6 +21,7 @@ export interface User { isOnline?: boolean; isAdmin?: boolean; isRoomOwner?: boolean; + presenceServerIds?: string[]; voiceState?: VoiceState; screenShareState?: ScreenShareState; cameraState?: CameraState; diff --git a/toju-app/src/app/store/rooms/room-members-sync.effects.ts b/toju-app/src/app/store/rooms/room-members-sync.effects.ts index 1eaa04c..9a19b74 100644 --- a/toju-app/src/app/store/rooms/room-members-sync.effects.ts +++ b/toju-app/src/app/store/rooms/room-members-sync.effects.ts @@ -416,7 +416,10 @@ export class RoomMembersSyncEffects { if (currentRoom?.id === room.id && departedUserId) { actions.push( - UsersActions.userLeft({ userId: departedUserId }) + UsersActions.userLeft({ + userId: departedUserId, + serverId: room.id + }) ); } diff --git a/toju-app/src/app/store/rooms/rooms.effects.ts b/toju-app/src/app/store/rooms/rooms.effects.ts index 960f6ef..06bb1c1 100644 --- a/toju-app/src/app/store/rooms/rooms.effects.ts +++ b/toju-app/src/app/store/rooms/rooms.effects.ts @@ -163,6 +163,7 @@ interface RoomPresenceSignalingMessage { type: string; reason?: string; serverId?: string; + serverIds?: string[]; users?: { oderId: string; displayName: string }[]; oderId?: string; displayName?: string; @@ -185,8 +186,8 @@ export class RoomsEffects { /** * Tracks user IDs we already know are in voice. Lives outside the - * NgRx store so it survives `clearUsers()` dispatched on server switches - * and prevents false join/leave sounds during state re-syncs. + * NgRx store so it survives room switches and presence re-syncs, + * preventing false join/leave sounds during state refreshes. */ private knownVoiceUsers = new Set(); private roomNavigationRequestVersion = 0; @@ -696,15 +697,11 @@ export class RoomsEffects { ) ); - /** Reloads messages and users when the viewed server changes. */ + /** Reloads messages and bans when the viewed server changes. */ onViewServerSuccess$ = createEffect(() => this.actions$.pipe( ofType(RoomsActions.viewServerSuccess), - mergeMap(({ room }) => [ - UsersActions.clearUsers(), - MessagesActions.loadMessages({ roomId: room.id }), - UsersActions.loadBans() - ]) + mergeMap(({ room }) => [MessagesActions.loadMessages({ roomId: room.id }), UsersActions.loadBans()]) ) ); @@ -1199,52 +1196,63 @@ export class RoomsEffects { ) ); - /** Clears messages and users from the store when leaving a room. */ + /** Clears viewed messages when leaving a room. */ onLeaveRoom$ = createEffect(() => this.actions$.pipe( ofType(RoomsActions.leaveRoomSuccess), - mergeMap(() => { - this.knownVoiceUsers.clear(); - return [MessagesActions.clearMessages(), UsersActions.clearUsers()]; - }) + mergeMap(() => [MessagesActions.clearMessages()]) ) ); /** Handles WebRTC signaling events for user presence (join, leave, server_users). */ signalingMessages$ = createEffect(() => this.webrtc.onSignalingMessage.pipe( - withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom)), + withLatestFrom( + this.store.select(selectCurrentUser), + this.store.select(selectCurrentRoom), + this.store.select(selectSavedRooms) + ), mergeMap(([ message, currentUser, - currentRoom + currentRoom, + savedRooms ]) => { const signalingMessage: RoomPresenceSignalingMessage = message; const myId = currentUser?.oderId || currentUser?.id; const viewedServerId = currentRoom?.id; + const room = this.resolveRoom(signalingMessage.serverId, currentRoom, savedRooms); + const shouldClearReconnectFlag = !isWrongServer(signalingMessage.serverId, viewedServerId); switch (signalingMessage.type) { case 'server_users': { - if (!signalingMessage.users || isWrongServer(signalingMessage.serverId, viewedServerId)) + if (!Array.isArray(signalingMessage.users) || !signalingMessage.serverId) return EMPTY; - const joinActions = signalingMessage.users + const syncedUsers = signalingMessage.users .filter((u) => u.oderId !== myId) .map((u) => - UsersActions.userJoined({ - user: buildSignalingUser(u, buildKnownUserExtras(currentRoom, u.oderId)) + buildSignalingUser(u, { + ...buildKnownUserExtras(room, u.oderId), + presenceServerIds: [signalingMessage.serverId] }) ); - - return [ - RoomsActions.setSignalServerReconnecting({ isReconnecting: false }), - UsersActions.clearUsers(), - ...joinActions + const actions: Action[] = [ + UsersActions.syncServerPresence({ + roomId: signalingMessage.serverId, + users: syncedUsers + }) ]; + + if (shouldClearReconnectFlag) { + actions.unshift(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); + } + + return actions; } case 'user_joined': { - if (isWrongServer(signalingMessage.serverId, viewedServerId) || signalingMessage.oderId === myId) + if (!signalingMessage.serverId || signalingMessage.oderId === myId) return EMPTY; if (!signalingMessage.oderId) @@ -1254,24 +1262,47 @@ export class RoomsEffects { oderId: signalingMessage.oderId, displayName: signalingMessage.displayName }; - - return [ - RoomsActions.setSignalServerReconnecting({ isReconnecting: false }), + const actions: Action[] = [ UsersActions.userJoined({ - user: buildSignalingUser(joinedUser, buildKnownUserExtras(currentRoom, joinedUser.oderId)) + user: buildSignalingUser(joinedUser, { + ...buildKnownUserExtras(room, joinedUser.oderId), + presenceServerIds: [signalingMessage.serverId] + }) }) ]; + + if (shouldClearReconnectFlag) { + actions.unshift(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); + } + + return actions; } case 'user_left': { - if (isWrongServer(signalingMessage.serverId, viewedServerId)) - return EMPTY; - if (!signalingMessage.oderId) return EMPTY; - this.knownVoiceUsers.delete(signalingMessage.oderId); - return [RoomsActions.setSignalServerReconnecting({ isReconnecting: false }), UsersActions.userLeft({ userId: signalingMessage.oderId })]; + const remainingServerIds = Array.isArray(signalingMessage.serverIds) + ? signalingMessage.serverIds + : undefined; + + if (!remainingServerIds || remainingServerIds.length === 0) { + this.knownVoiceUsers.delete(signalingMessage.oderId); + } + + const actions: Action[] = [ + UsersActions.userLeft({ + userId: signalingMessage.oderId, + serverId: signalingMessage.serverId, + serverIds: remainingServerIds + }) + ]; + + if (shouldClearReconnectFlag) { + actions.unshift(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); + } + + return actions; } case 'access_denied': { @@ -1354,13 +1385,13 @@ export class RoomsEffects { ]) => { switch (event.type) { case 'voice-state': - return currentRoom ? this.handleVoiceOrScreenState(event, allUsers, currentUser ?? null, 'voice') : EMPTY; + return this.handleVoiceOrScreenState(event, allUsers, currentUser ?? null, 'voice'); case 'voice-channel-move': return this.handleVoiceChannelMove(event, currentRoom, savedRooms, currentUser ?? null); case 'screen-state': - return currentRoom ? this.handleVoiceOrScreenState(event, allUsers, currentUser ?? null, 'screen') : EMPTY; + return this.handleVoiceOrScreenState(event, allUsers, currentUser ?? null, 'screen'); case 'camera-state': - return currentRoom ? this.handleVoiceOrScreenState(event, allUsers, currentUser ?? null, 'camera') : EMPTY; + return this.handleVoiceOrScreenState(event, allUsers, currentUser ?? null, 'camera'); case 'server-state-request': return this.handleServerStateRequest(event, currentRoom, savedRooms); case 'server-state-full': @@ -1405,9 +1436,18 @@ export class RoomsEffects { if (!vs) return EMPTY; + const presenceRefreshAction = vs.serverId && !existingUser?.presenceServerIds?.includes(vs.serverId) + ? UsersActions.userJoined({ + user: buildSignalingUser( + { oderId: userId, + displayName: event.displayName || existingUser?.displayName || 'User' }, + { presenceServerIds: [vs.serverId] } + ) + }) + : null; // Detect voice-connection transitions to play join/leave sounds. - // Use the local knownVoiceUsers set (not the store) so that - // clearUsers() from server-switching doesn't create false transitions. + // Use the local knownVoiceUsers set (not the store) so presence + // re-syncs and room switches do not create false transitions. const weAreInVoice = this.webrtc.isVoiceConnected(); const nowConnected = vs.isConnected ?? false; const wasKnown = this.knownVoiceUsers.has(userId); @@ -1436,6 +1476,7 @@ export class RoomsEffects { { oderId: userId, displayName: event.displayName || 'User' }, { + presenceServerIds: vs.serverId ? [vs.serverId] : undefined, voiceState: { isConnected: vs.isConnected ?? false, isMuted: vs.isMuted ?? false, @@ -1452,8 +1493,16 @@ export class RoomsEffects { ); } - return of(UsersActions.updateVoiceState({ userId, + const actions: Action[] = []; + + if (presenceRefreshAction) { + actions.push(presenceRefreshAction); + } + + actions.push(UsersActions.updateVoiceState({ userId, voiceState: vs })); + + return actions; } if (kind === 'screen') { diff --git a/toju-app/src/app/store/users/users.actions.ts b/toju-app/src/app/store/users/users.actions.ts index 140e15b..ec40192 100644 --- a/toju-app/src/app/store/users/users.actions.ts +++ b/toju-app/src/app/store/users/users.actions.ts @@ -29,7 +29,8 @@ export const UsersActions = createActionGroup({ 'Load Room Users Failure': props<{ error: string }>(), 'User Joined': props<{ user: User }>(), - 'User Left': props<{ userId: string }>(), + 'User Left': props<{ userId: string; serverId?: string; serverIds?: string[] }>(), + 'Sync Server Presence': props<{ roomId: string; users: User[] }>(), 'Update User': props<{ userId: string; updates: Partial }>(), 'Update User Role': props<{ userId: string; role: User['role'] }>(), diff --git a/toju-app/src/app/store/users/users.reducer.ts b/toju-app/src/app/store/users/users.reducer.ts index 5defc27..397258b 100644 --- a/toju-app/src/app/store/users/users.reducer.ts +++ b/toju-app/src/app/store/users/users.reducer.ts @@ -7,6 +7,105 @@ import { import { User, BanEntry } from '../../shared-kernel'; import { UsersActions } from './users.actions'; +function normalizePresenceServerIds(serverIds: readonly string[] | undefined): string[] | undefined { + if (!Array.isArray(serverIds)) { + return undefined; + } + + const normalized = Array.from(new Set( + serverIds.filter((serverId): serverId is string => typeof serverId === 'string' && serverId.trim().length > 0) + )); + + return normalized.length > 0 ? normalized : undefined; +} + +function mergePresenceServerIds( + existingServerIds: readonly string[] | undefined, + incomingServerIds: readonly string[] | undefined +): string[] | undefined { + return normalizePresenceServerIds([...(existingServerIds ?? []), ...(incomingServerIds ?? [])]); +} + +function buildDisconnectedVoiceState(user: User): User['voiceState'] { + if (!user.voiceState) { + return undefined; + } + + return { + ...user.voiceState, + isConnected: false, + isMuted: false, + isDeafened: false, + isSpeaking: false, + roomId: undefined, + serverId: undefined + }; +} + +function buildInactiveScreenShareState(user: User): User['screenShareState'] { + if (!user.screenShareState) { + return undefined; + } + + return { + ...user.screenShareState, + isSharing: false, + streamId: undefined, + sourceId: undefined, + sourceName: undefined + }; +} + +function buildInactiveCameraState(user: User): User['cameraState'] { + if (!user.cameraState) { + return undefined; + } + + return { + ...user.cameraState, + isEnabled: false + }; +} + +function buildPresenceAwareUser(existingUser: User | undefined, incomingUser: User): User { + const presenceServerIds = mergePresenceServerIds(existingUser?.presenceServerIds, incomingUser.presenceServerIds); + const isOnline = (presenceServerIds?.length ?? 0) > 0 || incomingUser.isOnline === true; + const status = isOnline + ? (incomingUser.status !== 'offline' + ? incomingUser.status + : (existingUser?.status && existingUser.status !== 'offline' ? existingUser.status : 'online')) + : 'offline'; + + return { + ...existingUser, + ...incomingUser, + presenceServerIds, + isOnline, + status + }; +} + +function buildPresenceRemovalChanges( + user: User, + update: { serverId?: string; serverIds?: readonly string[] } +): Partial { + const nextPresenceServerIds = update.serverIds !== undefined + ? normalizePresenceServerIds(update.serverIds) + : normalizePresenceServerIds((user.presenceServerIds ?? []).filter((serverId) => serverId !== update.serverId)); + const isOnline = (nextPresenceServerIds?.length ?? 0) > 0; + const shouldClearLiveState = !isOnline + || (!!user.voiceState?.serverId && !nextPresenceServerIds?.includes(user.voiceState.serverId)); + + return { + presenceServerIds: nextPresenceServerIds, + isOnline, + status: isOnline ? (user.status !== 'offline' ? user.status : 'online') : 'offline', + voiceState: shouldClearLiveState ? buildDisconnectedVoiceState(user) : user.voiceState, + screenShareState: shouldClearLiveState ? buildInactiveScreenShareState(user) : user.screenShareState, + cameraState: shouldClearLiveState ? buildInactiveCameraState(user) : user.cameraState + }; +} + export interface UsersState extends EntityState { currentUserId: string | null; hostId: string | null; @@ -86,11 +185,61 @@ export const usersReducer = createReducer( error })), on(UsersActions.userJoined, (state, { user }) => - usersAdapter.upsertOne(user, state) - ), - on(UsersActions.userLeft, (state, { userId }) => - usersAdapter.removeOne(userId, state) + usersAdapter.upsertOne(buildPresenceAwareUser(state.entities[user.id], user), state) ), + on(UsersActions.syncServerPresence, (state, { roomId, users }) => { + let nextState = state; + + const seenUserIds = new Set(); + + for (const user of users) { + seenUserIds.add(user.id); + nextState = usersAdapter.upsertOne( + buildPresenceAwareUser(nextState.entities[user.id], user), + nextState + ); + } + + const stalePresenceUpdates = Object.values(nextState.entities) + .filter((user): user is User => + !!user + && user.id !== nextState.currentUserId + && user.presenceServerIds?.includes(roomId) === true + && !seenUserIds.has(user.id) + ) + .map((user) => ({ + id: user.id, + changes: buildPresenceRemovalChanges(user, { serverId: roomId }) + })); + + return stalePresenceUpdates.length > 0 + ? usersAdapter.updateMany(stalePresenceUpdates, nextState) + : nextState; + }), + on(UsersActions.userLeft, (state, { userId, serverId, serverIds }) => { + const existingUser = state.entities[userId]; + + if (!existingUser) { + return (!serverId && !serverIds) + ? usersAdapter.removeOne(userId, state) + : state; + } + + if (!serverId && !serverIds) { + return usersAdapter.removeOne(userId, state); + } + + return usersAdapter.updateOne( + { + id: userId, + changes: buildPresenceRemovalChanges(existingUser, { + serverId, + serverIds + }) + }, + state + ); + }), on(UsersActions.updateUser, (state, { userId, updates }) => usersAdapter.updateOne( { @@ -171,6 +320,8 @@ export const usersReducer = createReducer( isDeafened: false, isSpeaking: false }; + const hasRoomId = Object.prototype.hasOwnProperty.call(voiceState, 'roomId'); + const hasServerId = Object.prototype.hasOwnProperty.call(voiceState, 'serverId'); return usersAdapter.updateOne( { @@ -183,9 +334,8 @@ export const usersReducer = createReducer( isSpeaking: voiceState.isSpeaking ?? prev.isSpeaking, isMutedByAdmin: voiceState.isMutedByAdmin ?? prev.isMutedByAdmin, volume: voiceState.volume ?? prev.volume, - // Use explicit undefined check - if undefined is passed, clear the value - roomId: voiceState.roomId !== undefined ? voiceState.roomId : prev.roomId, - serverId: voiceState.serverId !== undefined ? voiceState.serverId : prev.serverId + roomId: hasRoomId ? voiceState.roomId : prev.roomId, + serverId: hasServerId ? voiceState.serverId : prev.serverId } } }, diff --git a/toju-app/src/app/store/users/users.selectors.ts b/toju-app/src/app/store/users/users.selectors.ts index fffd62e..390f438 100644 --- a/toju-app/src/app/store/users/users.selectors.ts +++ b/toju-app/src/app/store/users/users.selectors.ts @@ -82,7 +82,13 @@ export const selectIsCurrentUserAdmin = createSelector( /** Selects users who are currently online (not offline). */ export const selectOnlineUsers = createSelector( selectAllUsers, - (users) => users.filter((user) => user.status !== 'offline' || user.isOnline === true) + (users) => users.filter((user) => { + if (Array.isArray(user.presenceServerIds)) { + return user.presenceServerIds.length > 0 || user.isOnline === true || user.status !== 'offline'; + } + + return user.status !== 'offline' || user.isOnline === true; + }) ); /** Creates a selector that returns users with a specific role. */