From a49e18b9f0580fbf6254d4af99541f569332f311 Mon Sep 17 00:00:00 2001 From: Myx Date: Thu, 30 Apr 2026 04:04:34 +0200 Subject: [PATCH] fix: recurriing network issue --- .../developer/llm-plugin-builder-guide.md | 6 +- e2e/tests/chat/dm-flow.spec.ts | 21 +++ server/src/websocket/handler.ts | 27 +++ toju-app/src/app/domains/chat/README.md | 2 +- .../src/app/domains/direct-message/README.md | 9 +- .../services/peer-delivery.service.spec.ts | 131 ++++++++++++++ .../services/peer-delivery.service.ts | 58 ++++++- .../app/domains/server-directory/README.md | 4 + .../services/server-directory-api.service.ts | 4 + .../src/app/infrastructure/realtime/README.md | 6 +- .../realtime/realtime-session.service.ts | 27 +++ .../signaling/signaling-transport-handler.ts | 8 + .../app/store/messages/messages.effects.ts | 43 +++++ .../rooms/room-signaling-connection.spec.ts | 161 ++++++++++++++++++ .../store/rooms/room-signaling-connection.ts | 12 +- .../store/rooms/room-state-sync.effects.ts | 20 ++- 16 files changed, 522 insertions(+), 17 deletions(-) create mode 100644 toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.spec.ts create mode 100644 toju-app/src/app/store/rooms/room-signaling-connection.spec.ts diff --git a/docs-site/docs/developer/llm-plugin-builder-guide.md b/docs-site/docs/developer/llm-plugin-builder-guide.md index c971b31..cd0cb25 100644 --- a/docs-site/docs/developer/llm-plugin-builder-guide.md +++ b/docs-site/docs/developer/llm-plugin-builder-guide.md @@ -54,12 +54,12 @@ There are three communication boundaries a plugin author must understand: 1. Signaling plane Angular renderer <-> WebSocket signaling server Used for identity, joining servers, presence, typing, plugin requirements, - server-relayed plugin events, WebRTC offers, answers, and ICE candidates. + server-relayed plugin events, WebRTC offers, answers, and ICE candidates. 2. Peer plane Angular renderer <-> WebRTC peer connections <-> other clients - Used for media and data-channel events: chat messages, message sync, - attachments, voice state, screen/camera state, and plugin message bus data. + Used for media and data-channel events: chat messages, message sync, + attachments, voice state, screen/camera state, and plugin message bus data. 3. Desktop/local plane Angular renderer <-> Electron preload bridge <-> Electron main process diff --git a/e2e/tests/chat/dm-flow.spec.ts b/e2e/tests/chat/dm-flow.spec.ts index 6105c8d..4c16fb6 100644 --- a/e2e/tests/chat/dm-flow.spec.ts +++ b/e2e/tests/chat/dm-flow.spec.ts @@ -35,6 +35,18 @@ test.describe('Direct message flow', () => { }); }); + test('delivers a live DM to the recipient conversation', async ({ createClient }) => { + const scenario = await createDmScenario(createClient); + const liveMessage = `Live DM ${uniqueName('msg')}`; + + await openDmFromRoomUserCard(scenario.alice.page, 'Bob'); + await scenario.alice.page.getByTestId('dm-input').fill(liveMessage); + await scenario.alice.page.getByTestId('dm-input').press('Enter'); + + await openDmFromRoomUserCard(scenario.bob.page, 'Alice'); + await expect(scenario.bob.page.locator('app-dm-chat').getByText(liveMessage)).toBeVisible({ timeout: 20_000 }); + }); + test('shows friend and message actions on the search people list', async ({ createClient }) => { const scenario = await createDmScenario(createClient); @@ -110,6 +122,15 @@ async function registerUser(page: Page, username: string, displayName: string): await expect(page).toHaveURL(/\/search/, { timeout: 15_000 }); } +async function openDmFromRoomUserCard(page: Page, displayName: string): Promise { + const userCard = page.locator('[data-testid^="room-user-card-"]', { hasText: displayName }).first(); + + await expect(userCard).toBeVisible({ timeout: 20_000 }); + await userCard.getByRole('button', { name: `Message ${displayName}` }).click(); + await expect(page).toHaveURL(/\/dm\//, { timeout: 15_000 }); + await expect(page.getByRole('heading', { name: displayName })).toBeVisible({ timeout: 10_000 }); +} + function uniqueName(prefix: string): string { return `${prefix}-${Date.now()}-${Math.random().toString(36) .slice(2, 8)}`; diff --git a/server/src/websocket/handler.ts b/server/src/websocket/handler.ts index 65a89cd..23b6461 100644 --- a/server/src/websocket/handler.ts +++ b/server/src/websocket/handler.ts @@ -286,6 +286,26 @@ function handleChatMessage(user: ConnectedUser, message: WsMessage): void { } } +function handleVoiceState(user: ConnectedUser, message: WsMessage): void { + const serverId = readMessageId(message['serverId']) ?? user.viewedServerId; + + if (!serverId || !user.serverIds.has(serverId)) { + return; + } + + broadcastToServer( + serverId, + { + ...message, + type: 'voice_state', + serverId, + oderId: user.oderId, + displayName: normalizeDisplayName(user.displayName) + }, + user.oderId + ); +} + function handleTyping(user: ConnectedUser, message: WsMessage): void { const typingSid = (message['serverId'] as string | undefined) ?? user.viewedServerId; const channelId = typeof message['channelId'] === 'string' && message['channelId'].trim() ? message['channelId'].trim() : 'general'; @@ -461,6 +481,9 @@ export async function handleWebSocketMessage(connectionId: string, message: WsMe case 'offer': case 'answer': case 'ice_candidate': + case 'direct-message': + case 'direct-message-status': + case 'direct-message-mutation': case 'server_icon_peer_request': case 'server_icon_peer_data': forwardRtcMessage(user, message); @@ -470,6 +493,10 @@ export async function handleWebSocketMessage(connectionId: string, message: WsMe handleChatMessage(user, message); break; + case 'voice_state': + handleVoiceState(user, message); + break; + case 'typing': handleTyping(user, message); break; diff --git a/toju-app/src/app/domains/chat/README.md b/toju-app/src/app/domains/chat/README.md index 639c166..0a0462f 100644 --- a/toju-app/src/app/domains/chat/README.md +++ b/toju-app/src/app/domains/chat/README.md @@ -70,7 +70,7 @@ graph TD ## Message lifecycle -Messages are created in the composer, broadcast to peers over the data channel, and rendered in the list. Editing and deletion are sender-only operations. +Messages are created in the composer, broadcast to peers over the data channel, and rendered in the list. Live room chat also emits a narrow `chat_message` signaling fallback so peers can receive text while the data channel is unavailable. Editing and deletion are sender-only operations. ```mermaid sequenceDiagram diff --git a/toju-app/src/app/domains/direct-message/README.md b/toju-app/src/app/domains/direct-message/README.md index 09d77b8..6cf6d6d 100644 --- a/toju-app/src/app/domains/direct-message/README.md +++ b/toju-app/src/app/domains/direct-message/README.md @@ -1,6 +1,6 @@ # Direct Message Domain -Direct messages provide local, offline-safe one-to-one messaging over the existing WebRTC data channel. +Direct messages provide local, offline-safe one-to-one messaging over the existing WebRTC data channel, with a signaling relay fallback when no peer data channel is available but a route to the recipient is known. ## Structure @@ -16,9 +16,10 @@ direct-message/ 1. `DirectMessageService.sendMessage()` stores the message locally with `QUEUED`. 2. `PeerDeliveryService` tries to send a `direct-message` P2P event to the recipient's current peer id. -3. If the peer is connected, the sender advances to `SENT`; otherwise the message id remains in `OfflineMessageQueueService`. -4. The recipient persists the message as `DELIVERED` and sends a `direct-message-status` event back. -5. Opening the conversation marks incoming messages as `ACKNOWLEDGED` and emits a status event. +3. If no data channel is connected, `PeerDeliveryService` tries the recipient's known signaling route before leaving the message queued. +4. If either transport sends, the sender advances to `SENT`; otherwise the message id remains in `OfflineMessageQueueService`. +5. The recipient persists the message as `DELIVERED` and sends a `direct-message-status` event back. +6. Opening the conversation marks incoming messages as `ACKNOWLEDGED` and emits a status event. Status transitions are monotonic, so a stale `SENT` event cannot overwrite `DELIVERED` or `ACKNOWLEDGED`. diff --git a/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.spec.ts b/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.spec.ts new file mode 100644 index 0000000..4da2341 --- /dev/null +++ b/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.spec.ts @@ -0,0 +1,131 @@ +import { + Injector, + runInInjectionContext, + signal +} from '@angular/core'; +import { Store } from '@ngrx/store'; +import { Subject } from 'rxjs'; +import { RealtimeSessionFacade } from '../../../../core/realtime'; +import { selectAllUsers } from '../../../../store/users/users.selectors'; +import type { ChatEvent, User } from '../../../../shared-kernel'; +import { PeerDeliveryService } from './peer-delivery.service'; + +describe('PeerDeliveryService', () => { + it('relays direct messages through signaling when no data channel is connected', () => { + const context = createServiceContext({ connectedPeers: [], routedPeers: ['bob'] }); + const event: ChatEvent = { + type: 'direct-message', + directMessage: { + message: { + id: 'message-1', + conversationId: 'dm-alice-bob', + senderId: 'alice', + recipientId: 'bob', + content: 'hello', + timestamp: 1, + status: 'QUEUED' + }, + sender: { + userId: 'alice', + username: 'alice', + displayName: 'Alice' + } + } + }; + + expect(context.service.sendViaWebRTC('bob', event)).toBe(true); + expect(context.realtime.sendToPeer).not.toHaveBeenCalled(); + expect(context.realtime.sendRawMessage).toHaveBeenCalledWith({ + ...event, + targetUserId: 'bob' + }); + }); + + it('keeps messages queued when neither P2P nor signaling can reach the recipient', () => { + const context = createServiceContext({ connectedPeers: [], routedPeers: [] }); + + expect(context.service.sendViaWebRTC('bob', { type: 'direct-message' })).toBe(false); + expect(context.realtime.sendRawMessage).not.toHaveBeenCalled(); + }); + + it('emits direct messages received over signaling', () => { + const context = createServiceContext({ connectedPeers: [] }); + const received: ChatEvent[] = []; + + context.service.directMessageEvents$.subscribe((event) => received.push(event)); + context.signalingMessages.next({ type: 'direct-message' } as ChatEvent); + + expect(received).toEqual([{ type: 'direct-message' }]); + }); +}); + +interface ServiceContextOptions { + connectedPeers: string[]; + routedPeers?: string[]; +} + +interface ServiceContext { + service: PeerDeliveryService; + signalingMessages: Subject; + realtime: { + getConnectedPeers: ReturnType; + hasSignalingRouteForPeer: ReturnType; + sendRawMessage: ReturnType; + sendToPeer: ReturnType; + }; +} + +function createServiceContext(options: ServiceContextOptions): ServiceContext { + const users = signal([createUser('alice', 'Alice'), createUser('bob', 'Bob')]); + const incomingMessages = new Subject(); + const signalingMessages = new Subject(); + const peerConnected = new Subject(); + const realtime = { + onMessageReceived: incomingMessages.asObservable(), + onSignalingMessage: signalingMessages.asObservable(), + onPeerConnected: peerConnected.asObservable(), + getConnectedPeers: vi.fn(() => options.connectedPeers), + hasSignalingRouteForPeer: vi.fn((peerId: string) => (options.routedPeers ?? []).includes(peerId)), + sendRawMessage: vi.fn(), + sendToPeer: vi.fn() + }; + const store = { + selectSignal: vi.fn((selector: unknown) => { + if (selector === selectAllUsers) { + return users; + } + + throw new Error('Unexpected selector requested by PeerDeliveryService test.'); + }) + }; + const injector = Injector.create({ + providers: [ + { + provide: RealtimeSessionFacade, + useValue: realtime + }, + { + provide: Store, + useValue: store + } + ] + }); + + return { + service: runInInjectionContext(injector, () => new PeerDeliveryService()), + signalingMessages, + realtime + }; +} + +function createUser(id: string, displayName: string): User { + return { + id, + oderId: id, + username: displayName.toLowerCase(), + displayName, + status: 'online', + role: 'member', + joinedAt: 1 + }; +} diff --git a/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.ts b/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.ts index 11ee13b..ea23f6c 100644 --- a/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.ts +++ b/toju-app/src/app/domains/direct-message/application/services/peer-delivery.service.ts @@ -4,6 +4,7 @@ import { Store } from '@ngrx/store'; import { Subject, filter, + merge, type Observable } from 'rxjs'; import { RealtimeSessionFacade } from '../../../../core/realtime'; @@ -17,7 +18,10 @@ export class PeerDeliveryService { private readonly users = this.store.selectSignal(selectAllUsers); private readonly networkRestoredSubject = new Subject(); - readonly directMessageEvents$: Observable = this.webrtc.onMessageReceived.pipe( + readonly directMessageEvents$: Observable = merge( + this.webrtc.onMessageReceived, + this.webrtc.onSignalingMessage as Observable + ).pipe( filter((event) => event.type === 'direct-message' || event.type === 'direct-message-status' || event.type === 'direct-message-mutation') ); @@ -35,12 +39,14 @@ export class PeerDeliveryService { const peerId = this.resolvePeerId(recipientId); - if (!peerId) { - return false; + let sent = false; + + if (peerId) { + this.webrtc.sendToPeer(peerId, event); + sent = true; } - this.webrtc.sendToPeer(peerId, event); - return true; + return this.sendViaSignaling(recipientId, event) || sent; } handleAck(recipientId: string, event: ChatEvent): boolean { @@ -77,6 +83,48 @@ export class PeerDeliveryService { return candidates.find((candidate) => connectedPeerIds.has(candidate)) ?? null; } + private sendViaSignaling(recipientId: string, event: ChatEvent): boolean { + if (event.type !== 'direct-message' && event.type !== 'direct-message-status' && event.type !== 'direct-message-mutation') { + return false; + } + + const targetPeerId = this.resolveSignalingPeerId(recipientId); + + if (!targetPeerId) { + return false; + } + + try { + this.webrtc.sendRawMessage({ + ...event, + targetUserId: targetPeerId + }); + + return true; + } catch { + return false; + } + } + + private resolveSignalingPeerId(recipientId: string): string | null { + return this.resolveCandidateIds(recipientId).find((candidate) => this.webrtc.hasSignalingRouteForPeer(candidate)) ?? null; + } + + private resolveCandidateIds(recipientId: string): string[] { + const user = this.users().find((candidate: User) => + candidate.id === recipientId || candidate.oderId === recipientId || candidate.peerId === recipientId + ); + + return [ + recipientId, + user?.oderId, + user?.peerId, + user?.id + ].filter((candidate, index, candidates): candidate is string => + !!candidate && candidates.indexOf(candidate) === index + ); + } + private isOfflineOverrideEnabled(): boolean { return typeof window !== 'undefined' && !!(window as Window & { metoyouDmNetworkOffline?: boolean }).metoyouDmNetworkOffline; diff --git a/toju-app/src/app/domains/server-directory/README.md b/toju-app/src/app/domains/server-directory/README.md index ab89f3f..b7499aa 100644 --- a/toju-app/src/app/domains/server-directory/README.md +++ b/toju-app/src/app/domains/server-directory/README.md @@ -157,6 +157,10 @@ The `/search` My Servers row and the server rail both read from the active user' Fallback stays temporary. If the authoritative endpoint is unavailable, the client can probe other active compatible endpoints as a last resort for the current session, but it does not rewrite the room's saved affinity to that fallback endpoint. +Be careful around endpoint failure semantics. `ensureEndpointVersionCompatibility()` returns `false` for both incompatible versions and unreachable/offline endpoints. Only an endpoint with `status === 'incompatible'` should stop the fallback cascade with the update-required message. Cloudflare `521`/`522`, network timeouts, and WebSocket `1006` failures must continue to the next active compatible endpoint. + +`ServerDirectoryApiService.getServer()` also returns `null` for both authoritative `SERVER_NOT_FOUND` and retryable endpoint failures. Callers that need recovery must search active endpoints before treating `null` as proof that a saved room is gone or its source is stale. + ## Server-owned room metadata `ServerInfo` also carries the server-owned `channels` list for each room. Register and update calls persist this channel metadata on the server, and search or hydration responses return the normalised channel list so text and voice channel topology survives reloads, reconnects, and fresh joins. diff --git a/toju-app/src/app/domains/server-directory/infrastructure/services/server-directory-api.service.ts b/toju-app/src/app/domains/server-directory/infrastructure/services/server-directory-api.service.ts index bc3acfa..66bb173 100644 --- a/toju-app/src/app/domains/server-directory/infrastructure/services/server-directory-api.service.ts +++ b/toju-app/src/app/domains/server-directory/infrastructure/services/server-directory-api.service.ts @@ -107,6 +107,10 @@ export class ServerDirectoryApiService { return this.http.get(`${this.getApiBaseUrl(selector)}/servers/${serverId}`).pipe( map((server) => this.normalizeServerInfo(server, this.resolveEndpoint(selector))), catchError((error) => { + // Warning: this API deliberately returns null for both authoritative + // SERVER_NOT_FOUND and retryable endpoint failures. Callers that need + // resilience must try other active endpoints before treating null as + // proof that a room no longer exists. if (isServerNotFoundError(error)) { return of(null); } diff --git a/toju-app/src/app/infrastructure/realtime/README.md b/toju-app/src/app/infrastructure/realtime/README.md index cfd242f..0977fe8 100644 --- a/toju-app/src/app/infrastructure/realtime/README.md +++ b/toju-app/src/app/infrastructure/realtime/README.md @@ -115,18 +115,22 @@ graph TD ## Signaling (WebSocket) -The signaling layer's only job is getting two peers to exchange SDP offers/answers and ICE candidates so they can establish a direct WebRTC connection. Once the peer connection is up, signaling is only used for presence (user joined/left) and reconnection. +The signaling layer gets peers to exchange SDP offers/answers and ICE candidates so they can establish direct WebRTC connections. It also carries identity, room membership, presence, typing, and selected server-relayed fallback events when the peer data channel is unavailable. Each signaling URL gets its own `SignalingManager` (one WebSocket each). `SignalingTransportHandler` picks the right socket based on which server the message is for. `ServerSignalingCoordinator` tracks which peers belong to which servers and which signaling URLs, so we know when it is safe to tear down a peer connection after leaving a server. Room affinity is authoritative at this layer as well. The renderer repairs each room's saved `sourceId` / `sourceUrl` from server-directory responses and routes `join_server`, `view_server`, and room-scoped signaling traffic to that room's signaling URL first. If that route fails, alternate endpoints can be tried temporarily, but server-scoped raw messages are no longer broadcast to every connected signaling manager when the route is unknown. +Server-relayed fallbacks are intentionally narrow. Room chat (`chat_message`), direct-message events (`direct-message`, `direct-message-status`, `direct-message-mutation`), and voice presence (`voice_state`) may flow over signaling so users can still see written chat and voice roster state while P2P data channels are down. Media, attachments, message inventory sync, screen/camera state, and plugin data-channel traffic remain peer-plane responsibilities. + In UI/debug conversations, a **chat-server** means one of the saved rooms navigated from the server rail. Each chat-server has its own assigned signal server via `sourceId` / `sourceUrl`, and room-scoped feature/config checks must prefer that signal server before considering any global active endpoint. For example, KLIPY GIF picker visibility is resolved against the currently viewed chat-server's signal server so an unrelated offline chat-server does not hide the button everywhere. Cold-start routing now waits for the initial server-directory health probes so same-backend aliases can collapse to one canonical signaling endpoint before any saved rooms reconnect. When a room is reconnected on a chosen socket, its background rooms are re-joined on that same socket as well so stale per-signal memberships do not keep orphan managers alive, and reconnect replay only sends `view_server` for rooms that manager still has joined. This is still a non-federated model. Different signaling servers do not share peer registries or relay WebRTC offers for each other, so users in the same room must converge on the same signaling endpoint to discover one another reliably. +The fallback path is fragile by design: it only helps when a usable signaling socket exists. If a production origin returns Cloudflare `521`/`522` or the WebSocket closes with `1006`, room reconnect must continue to other active compatible endpoints instead of treating the room as missing or the client as incompatible. + ```mermaid sequenceDiagram participant UI as App diff --git a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts index 0a4eea1..2c0f466 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts @@ -399,6 +399,7 @@ export class WebRTCService implements OnDestroy { */ broadcastMessage(event: ChatEvent): void { this.peerMediaFacade.broadcastMessage(event); + this.relayBroadcastEvent(event); } /** @@ -430,6 +431,12 @@ export class WebRTCService implements OnDestroy { return this.peerMediaFacade.getConnectedPeerIds(); } + hasSignalingRouteForPeer(peerId: string): boolean { + const signalUrl = this.signalingCoordinator.getPeerSignalUrl(peerId); + + return !!signalUrl && this.signalingCoordinator.isSignalingConnectedTo(signalUrl); + } + /** * Get the composite remote {@link MediaStream} for a connected peer. * @@ -658,6 +665,26 @@ export class WebRTCService implements OnDestroy { this.peerMediaFacade.stopScreenShare(); } + private relayBroadcastEvent(event: ChatEvent): void { + if (event.type === 'chat-message' && event.message?.roomId) { + this.signalingTransportHandler.sendRawMessage({ + type: 'chat_message', + serverId: event.message.roomId, + message: event.message + }); + + return; + } + + if (event.type === 'voice-state' && event.voiceState?.serverId) { + this.signalingTransportHandler.sendRawMessage({ + ...event, + type: 'voice_state', + serverId: event.voiceState.serverId + }); + } + } + /** Disconnect from the signaling server and clean up all state. */ disconnect(): void { this.leaveRoom(); diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts index d04c003..d30a848 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts @@ -134,6 +134,14 @@ export class SignalingTransportHandler { const connectedManagers = this.getConnectedSignalingManagers(); if (connectedManagers.length === 0) { + if (messageType === 'status_update') { + this.dependencies.logger.warn('[signaling] Skipping status update without an active signaling connection', { + type: messageType + }); + + return; + } + this.dependencies.logger.error('[signaling] No active signaling connection for outbound message', new Error('No signaling manager available'), { type: messageType }); diff --git a/toju-app/src/app/store/messages/messages.effects.ts b/toju-app/src/app/store/messages/messages.effects.ts index dcfb6a6..abcbc9f 100644 --- a/toju-app/src/app/store/messages/messages.effects.ts +++ b/toju-app/src/app/store/messages/messages.effects.ts @@ -498,6 +498,49 @@ export class MessagesEffects { ) ); + incomingSignalingMessages$ = createEffect(() => + this.webrtc.onSignalingMessage.pipe( + withLatestFrom( + this.store.select(selectCurrentUser), + this.store.select(selectCurrentRoom) + ), + mergeMap(([ + event, + currentUser, + currentRoom + ]) => { + if (event.type !== 'chat_message') { + return EMPTY; + } + + const ctx: IncomingMessageContext = { + db: this.db, + webrtc: this.webrtc, + attachments: this.attachments, + debugging: this.debugging, + currentUser: currentUser ?? null, + currentRoom + }; + + return dispatchIncomingMessage({ + ...event, + type: 'chat-message', + fromPeerId: event.fromUserId + }, ctx).pipe( + catchError((error) => { + reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming signaling chat message', { + eventType: event.type, + fromPeerId: event.fromUserId ?? null, + roomId: event.serverId ?? null + }, error); + + return EMPTY; + }) + ); + }) + ) + ); + private trackBackgroundOperation(task: Promise | unknown, message: string, payload: Record): void { trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload); } diff --git a/toju-app/src/app/store/rooms/room-signaling-connection.spec.ts b/toju-app/src/app/store/rooms/room-signaling-connection.spec.ts new file mode 100644 index 0000000..5bde2b5 --- /dev/null +++ b/toju-app/src/app/store/rooms/room-signaling-connection.spec.ts @@ -0,0 +1,161 @@ +import { of } from 'rxjs'; +import { + beforeEach, + describe, + expect, + it, + vi +} from 'vitest'; +import type { Store } from '@ngrx/store'; +import type { RealtimeSessionFacade } from '../../core/realtime'; +import type { ServerDirectoryFacade } from '../../domains/server-directory'; +import type { Room, User } from '../../shared-kernel'; +import { RoomSignalingConnection } from './room-signaling-connection'; + +interface EndpointFixture { + id: string; + isActive: boolean; + name: string; + status: 'offline' | 'online'; + url: string; +} + +interface SignalSourceFixture { + sourceId: string; + sourceName?: string; + sourceUrl: string; +} + +interface SignalSelectorFixture { + sourceId: string; + sourceUrl: string; +} + +interface FakeRealtimeSessionFacade { + connectToSignalingServer: ReturnType; + hasJoinedServer: ReturnType; + identify: ReturnType; + isSignalingConnectedTo: ReturnType; + joinRoom: ReturnType; + peerId: ReturnType; + setCurrentServer: ReturnType; + switchServer: ReturnType; +} + +interface FakeServerDirectoryFacade { + awaitInitialServerHealthCheck: ReturnType; + buildRoomSignalSelector: ReturnType; + ensureEndpointVersionCompatibility: ReturnType; + findServerAcrossActiveEndpoints: ReturnType; + getFallbackRoomEndpoints: ReturnType; + getServer: ReturnType; + getWebSocketUrl: ReturnType; + normaliseRoomSignalSource: ReturnType; + resolveRoomEndpoint: ReturnType; +} + +interface FakeStore { + dispatch: ReturnType; +} + +describe('RoomSignalingConnection', () => { + const room: Room = { + id: 'room-1', + name: 'Room One', + description: '', + hostId: 'user-1', + createdAt: 1, + userCount: 1, + sourceId: 'primary', + sourceName: 'Primary', + sourceUrl: 'https://signal.toju.app' + }; + const user: User = { + id: 'user-1', + oderId: 'peer-a', + username: 'maomao', + displayName: 'maomao', + status: 'online', + createdAt: 1 + }; + + let webrtc: FakeRealtimeSessionFacade; + let serverDirectory: FakeServerDirectoryFacade; + let store: FakeStore; + + beforeEach(() => { + const endpoints = new Map([ + [ + 'primary', + { + id: 'primary', + name: 'Primary', + url: 'https://signal.toju.app', + isActive: true, + status: 'offline' + } + ], + [ + 'fallback', + { + id: 'fallback', + name: 'Sweden', + url: 'https://signal-sweden.toju.app', + isActive: true, + status: 'online' + } + ] + ]); + + webrtc = { + connectToSignalingServer: vi.fn((url: string) => of(url === 'wss://signal-sweden.toju.app')), + hasJoinedServer: vi.fn(() => false), + identify: vi.fn(), + isSignalingConnectedTo: vi.fn(() => false), + joinRoom: vi.fn(), + peerId: vi.fn(() => 'peer-a'), + setCurrentServer: vi.fn(), + switchServer: vi.fn() + }; + + serverDirectory = { + awaitInitialServerHealthCheck: vi.fn(() => Promise.resolve()), + buildRoomSignalSelector: vi.fn((source: SignalSourceFixture) => ({ + sourceId: source.sourceId, + sourceUrl: source.sourceUrl + })), + ensureEndpointVersionCompatibility: vi.fn((selector: SignalSelectorFixture) => + Promise.resolve(selector.sourceId === 'fallback') + ), + findServerAcrossActiveEndpoints: vi.fn(() => of(null)), + getFallbackRoomEndpoints: vi.fn(() => [endpoints.get('fallback')]), + getServer: vi.fn(() => of(null)), + getWebSocketUrl: vi.fn((selector: SignalSelectorFixture) => selector.sourceUrl.replace(/^http/, 'ws')), + normaliseRoomSignalSource: vi.fn((source: SignalSourceFixture) => ({ + sourceId: source.sourceId, + sourceName: source.sourceName, + sourceUrl: source.sourceUrl + })), + resolveRoomEndpoint: vi.fn((source: SignalSourceFixture) => endpoints.get(source.sourceId) ?? null) + }; + + store = { + dispatch: vi.fn() + }; + }); + + it('tries fallback endpoints when the primary endpoint is offline', async () => { + const connection = new RoomSignalingConnection( + webrtc as unknown as RealtimeSessionFacade, + serverDirectory as unknown as ServerDirectoryFacade, + store as unknown as Store + ); + + connection.beginRoomNavigation(room.id); + await connection.connectToRoomSignaling(room, user, user.oderId, [room]); + + expect(serverDirectory.ensureEndpointVersionCompatibility).toHaveBeenCalledTimes(2); + expect(webrtc.connectToSignalingServer).toHaveBeenCalledWith('wss://signal-sweden.toju.app'); + expect(webrtc.joinRoom).toHaveBeenCalledWith(room.id, user.oderId, 'wss://signal-sweden.toju.app'); + }); +}); diff --git a/toju-app/src/app/store/rooms/room-signaling-connection.ts b/toju-app/src/app/store/rooms/room-signaling-connection.ts index d39f9bc..113cf5b 100644 --- a/toju-app/src/app/store/rooms/room-signaling-connection.ts +++ b/toju-app/src/app/store/rooms/room-signaling-connection.ts @@ -135,7 +135,13 @@ export class RoomSignalingConnection { } if (!isCompatible) { - if (candidate.isPrimary) { + // Warning: offline/unreachable endpoints also fail this check. Only + // version-incompatible primary endpoints should stop fallback; transient + // 521/522/network failures must continue to the next active endpoint. + const endpoint = this.serverDirectory.resolveRoomEndpoint(candidate.source); + const isEndpointIncompatible = endpoint?.status === 'incompatible'; + + if (candidate.isPrimary && isEndpointIncompatible) { if (shouldShowCompatibilityError) { this.store.dispatch( RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE }) @@ -297,6 +303,10 @@ export class RoomSignalingConnection { if (!this.webrtc.hasJoinedServer(room.id)) { const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name); + // Warning: getServer returns null for both SERVER_NOT_FOUND and transient + // endpoint failures. Always search active endpoints before deciding the + // saved room source is stale, otherwise a Cloudflare/origin outage pins + // reconnects to the dead endpoint. const authoritativeServer = ( selector ? await firstValueFrom(this.serverDirectory.getServer(room.id, selector)) diff --git a/toju-app/src/app/store/rooms/room-state-sync.effects.ts b/toju-app/src/app/store/rooms/room-state-sync.effects.ts index a14aa28..6d0e0e8 100644 --- a/toju-app/src/app/store/rooms/room-state-sync.effects.ts +++ b/toju-app/src/app/store/rooms/room-state-sync.effects.ts @@ -95,12 +95,18 @@ export class RoomStateSyncEffects { /** Handles WebRTC signaling events for user presence (join, leave, server_users). */ signalingMessages$ = createEffect(() => this.webrtc.onSignalingMessage.pipe( - withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom), this.store.select(selectSavedRooms)), + withLatestFrom( + this.store.select(selectCurrentUser), + this.store.select(selectCurrentRoom), + this.store.select(selectSavedRooms), + this.store.select(selectAllUsers) + ), mergeMap(([ message, currentUser, currentRoom, - savedRooms + savedRooms, + allUsers ]) => { const signalingMessage: RoomPresenceSignalingMessage = message; const myId = currentUser?.oderId || currentUser?.id; @@ -226,6 +232,16 @@ export class RoomStateSyncEffects { ]; } + case 'voice_state': { + const voiceEvent = { + ...signalingMessage, + type: 'voice-state', + fromPeerId: signalingMessage.oderId ?? signalingMessage.fromUserId + } as ChatEvent; + + return this.handleVoiceOrScreenState(voiceEvent, allUsers, currentUser ?? null, 'voice'); + } + case 'access_denied': { if (isWrongServer(signalingMessage.serverId, viewedServerId)) return EMPTY;