From ef1182d46f0101f1e3b60469f76870a98728e970 Mon Sep 17 00:00:00 2001 From: Myx Date: Sat, 11 Apr 2026 12:32:22 +0200 Subject: [PATCH] fix: Broken voice states and connectivity drops --- .agents/skills/caveman/SKILL.md | 63 +++ server/src/routes/health.ts | 6 + server/src/websocket/broadcast.ts | 8 +- server/src/websocket/handler.ts | 15 +- server/src/websocket/types.ts | 7 + skills-lock.json | 10 + .../app/domains/server-directory/README.md | 21 +- .../application/server-directory.facade.ts | 92 +++- .../server-endpoint-state.service.ts | 61 +++ .../domain/room-signal-source.spec.ts | 46 ++ .../domain/room-signal-source.ts | 95 ++++ .../domain/server-directory.models.ts | 3 + .../feature/invite/invite.component.ts | 14 +- .../server-search/server-search.component.ts | 13 +- .../src/app/domains/server-directory/index.ts | 1 + .../server-directory-api.service.ts | 97 +++- .../server-endpoint-health.service.ts | 5 + .../application/voice-activity.service.ts | 22 + .../servers/servers-rail.component.ts | 173 +++++-- .../src/app/infrastructure/realtime/README.md | 6 + .../realtime/realtime.constants.ts | 1 + .../server-membership-signaling-handler.ts | 39 +- .../signaling/server-signaling-coordinator.ts | 91 +++- .../signaling/signaling-message-handler.ts | 15 + .../signaling/signaling-transport-handler.ts | 30 +- .../realtime/signaling/signaling.manager.ts | 12 +- toju-app/src/app/store/rooms/rooms.actions.ts | 6 +- toju-app/src/app/store/rooms/rooms.effects.ts | 454 ++++++++++++++---- 28 files changed, 1244 insertions(+), 162 deletions(-) create mode 100644 .agents/skills/caveman/SKILL.md create mode 100644 skills-lock.json create mode 100644 toju-app/src/app/domains/server-directory/domain/room-signal-source.spec.ts create mode 100644 toju-app/src/app/domains/server-directory/domain/room-signal-source.ts diff --git a/.agents/skills/caveman/SKILL.md b/.agents/skills/caveman/SKILL.md new file mode 100644 index 0000000..29b154f --- /dev/null +++ b/.agents/skills/caveman/SKILL.md @@ -0,0 +1,63 @@ +--- +name: caveman +description: > + Ultra-compressed communication mode. Cuts token usage ~75% by speaking like caveman + while keeping full technical accuracy. Supports intensity levels: lite, full (default), ultra, + wenyan-lite, wenyan-full, wenyan-ultra. + Use when user says "caveman mode", "talk like caveman", "use caveman", "less tokens", + "be brief", or invokes /caveman. Also auto-triggers when token efficiency is requested. +--- + +Respond terse like smart caveman. All technical substance stay. Only fluff die. + +Default: **full**. Switch: `/caveman lite|full|ultra`. + +## Rules + +Drop: articles (a/an/the), filler (just/really/basically/actually/simply), pleasantries (sure/certainly/of course/happy to), hedging. Fragments OK. Short synonyms (big not extensive, fix not "implement a solution for"). Technical terms exact. Code blocks unchanged. Errors quoted exact. + +Pattern: `[thing] [action] [reason]. [next step].` + +Not: "Sure! I'd be happy to help you with that. The issue you're experiencing is likely caused by..." +Yes: "Bug in auth middleware. Token expiry check use `<` not `<=`. Fix:" + +## Intensity + +| Level | What change | +|-------|------------| +| **lite** | No filler/hedging. Keep articles + full sentences. Professional but tight | +| **full** | Drop articles, fragments OK, short synonyms. Classic caveman | +| **ultra** | Abbreviate (DB/auth/config/req/res/fn/impl), strip conjunctions, arrows for causality (X → Y), one word when one word enough | +| **wenyan-lite** | Semi-classical. Drop filler/hedging but keep grammar structure, classical register | +| **wenyan-full** | Maximum classical terseness. Fully 文言文. 80-90% character reduction. Classical sentence patterns, verbs precede objects, subjects often omitted, classical particles (之/乃/為/其) | +| **wenyan-ultra** | Extreme abbreviation while keeping classical Chinese feel. Maximum compression, ultra terse | + +Example — "Why React component re-render?" +- lite: "Your component re-renders because you create a new object reference each render. Wrap it in `useMemo`." +- full: "New object ref each render. Inline object prop = new ref = re-render. Wrap in `useMemo`." +- ultra: "Inline obj prop → new ref → re-render. `useMemo`." +- wenyan-lite: "組件頻重繪,以每繪新生對象參照故。以 useMemo 包之。" +- wenyan-full: "物出新參照,致重繪。useMemo .Wrap之。" +- wenyan-ultra: "新參照→重繪。useMemo Wrap。" + +Example — "Explain database connection pooling." +- lite: "Connection pooling reuses open connections instead of creating new ones per request. Avoids repeated handshake overhead." +- full: "Pool reuse open DB connections. No new connection per request. Skip handshake overhead." +- ultra: "Pool = reuse DB conn. Skip handshake → fast under load." +- wenyan-full: "池reuse open connection。不每req新開。skip handshake overhead。" +- wenyan-ultra: "池reuse conn。skip handshake → fast。" + +## Auto-Clarity + +Drop caveman for: security warnings, irreversible action confirmations, multi-step sequences where fragment order risks misread, user confused. Resume caveman after clear part done. + +Example — destructive op: +> **Warning:** This will permanently delete all rows in the `users` table and cannot be undone. +> ```sql +> DROP TABLE users; +> ``` +> Caveman resume. Verify backup exist first. + +## Boundaries + +Code/commits/PRs: write normal. "stop caveman" or "normal mode": revert. Level persist until changed or session end. \ No newline at end of file diff --git a/server/src/routes/health.ts b/server/src/routes/health.ts index a647d74..17f95a8 100644 --- a/server/src/routes/health.ts +++ b/server/src/routes/health.ts @@ -1,10 +1,15 @@ import { Router } from 'express'; +import { randomUUID } from 'crypto'; import { getAllPublicServers } from '../cqrs'; import { getReleaseManifestUrl } from '../config/variables'; import { SERVER_BUILD_VERSION } from '../generated/build-version'; import { connectedUsers } from '../websocket/state'; const router = Router(); +const SERVER_INSTANCE_ID = typeof process.env.METOYOU_SERVER_INSTANCE_ID === 'string' + && process.env.METOYOU_SERVER_INSTANCE_ID.trim().length > 0 + ? process.env.METOYOU_SERVER_INSTANCE_ID.trim() + : randomUUID(); function getServerProjectVersion(): string { return typeof process.env.METOYOU_SERVER_VERSION === 'string' && process.env.METOYOU_SERVER_VERSION.trim().length > 0 @@ -20,6 +25,7 @@ router.get('/health', async (_req, res) => { timestamp: Date.now(), serverCount: servers.length, connectedUsers: connectedUsers.size, + serverInstanceId: SERVER_INSTANCE_ID, serverVersion: getServerProjectVersion(), releaseManifestUrl: getReleaseManifestUrl() }); diff --git a/server/src/websocket/broadcast.ts b/server/src/websocket/broadcast.ts index 3bc0adc..3aa7f29 100644 --- a/server/src/websocket/broadcast.ts +++ b/server/src/websocket/broadcast.ts @@ -10,8 +10,14 @@ interface WsMessage { export function broadcastToServer(serverId: string, message: WsMessage, excludeOderId?: string): void { console.log(`Broadcasting to server ${serverId}, excluding ${excludeOderId}:`, message.type); + // Deduplicate by oderId so users with multiple connections (e.g. from + // different signal URLs routing to the same server) receive the + // broadcast only once. + const sentToOderIds = new Set(); + connectedUsers.forEach((user) => { - if (user.serverIds.has(serverId) && user.oderId !== excludeOderId) { + if (user.serverIds.has(serverId) && user.oderId !== excludeOderId && !sentToOderIds.has(user.oderId)) { + sentToOderIds.add(user.oderId); console.log(` -> Sending to ${user.displayName} (${user.oderId})`); user.ws.send(JSON.stringify(message)); } diff --git a/server/src/websocket/handler.ts b/server/src/websocket/handler.ts index 192e2ea..37b6779 100644 --- a/server/src/websocket/handler.ts +++ b/server/src/websocket/handler.ts @@ -44,12 +44,18 @@ function sendServerUsers(user: ConnectedUser, serverId: string): void { function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: string): void { const newOderId = readMessageId(message['oderId']) ?? connectionId; + const newScope = typeof message['connectionScope'] === 'string' ? message['connectionScope'] : undefined; - // Close stale connections from the same identity so offer routing - // always targets the freshest socket (e.g. after page refresh). + // Close stale connections from the same identity AND the same connection + // scope so offer routing always targets the freshest socket (e.g. after + // page refresh). Connections with a *different* scope (= a different + // signal URL that happens to route to this server) are left untouched so + // multi-signal-URL setups don't trigger an eviction loop. connectedUsers.forEach((existing, existingId) => { - if (existingId !== connectionId && existing.oderId === newOderId) { - console.log(`Closing stale connection for ${newOderId} (old=${existingId}, new=${connectionId})`); + if (existingId !== connectionId + && existing.oderId === newOderId + && existing.connectionScope === newScope) { + console.log(`Closing stale connection for ${newOderId} (old=${existingId}, new=${connectionId}, scope=${newScope ?? 'none'})`); try { existing.ws.close(); @@ -61,6 +67,7 @@ function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: s user.oderId = newOderId; user.displayName = normalizeDisplayName(message['displayName'], normalizeDisplayName(user.displayName)); + user.connectionScope = newScope; connectedUsers.set(connectionId, user); console.log(`User identified: ${user.displayName} (${user.oderId})`); } diff --git a/server/src/websocket/types.ts b/server/src/websocket/types.ts index 4ca0a68..2c62ff1 100644 --- a/server/src/websocket/types.ts +++ b/server/src/websocket/types.ts @@ -6,6 +6,13 @@ export interface ConnectedUser { serverIds: Set; viewedServerId?: string; displayName?: string; + /** + * Opaque scope string sent by the client (typically the signal URL it + * connected through). Stale-connection eviction only targets connections + * that share the same (oderId, connectionScope) pair, so multiple signal + * URLs routing to the same server coexist without an eviction loop. + */ + connectionScope?: string; /** Timestamp of the last pong received (used to detect dead connections). */ lastPong: number; } diff --git a/skills-lock.json b/skills-lock.json new file mode 100644 index 0000000..a0a4cee --- /dev/null +++ b/skills-lock.json @@ -0,0 +1,10 @@ +{ + "version": 1, + "skills": { + "caveman": { + "source": "JuliusBrussee/caveman", + "sourceType": "github", + "computedHash": "4d486dd6f9fbb27ce1c51c972c9a5eb25a53236ae05eabf4d076ac1e293f4b7a" + } + } +} diff --git a/toju-app/src/app/domains/server-directory/README.md b/toju-app/src/app/domains/server-directory/README.md index d023ec0..86a8b1d 100644 --- a/toju-app/src/app/domains/server-directory/README.md +++ b/toju-app/src/app/domains/server-directory/README.md @@ -90,10 +90,15 @@ stateDiagram-v2 The facade exposes `testServer(endpointId)` and `testAllServers()`. Both delegate to `ServerEndpointHealthService.probeEndpoint()`, which: 1. Sends `GET /api/health` with a 5-second timeout -2. On success, checks the response's `serverVersion` against the client version via `ServerEndpointCompatibilityService` -3. If versions are incompatible, the endpoint is marked `incompatible` and deactivated -4. If `/api/health` fails, falls back to `GET /api/servers` as a basic liveness check -5. Updates the endpoint's status, latency, and version info in the state service +2. Reads the response's `serverVersion` and stable `serverInstanceId` +3. Checks the reported version against the client version via `ServerEndpointCompatibilityService` +4. If versions are incompatible, the endpoint is marked `incompatible` and deactivated +5. If `/api/health` fails, falls back to `GET /api/servers` as a basic liveness check +6. Updates the endpoint's status, latency, and version info in the state service + +`serverInstanceId` lets the client detect when multiple configured URLs point at the same backend. `ServerEndpointStateService.resolveCanonicalEndpoint()` prefers one canonical endpoint per backend instance so REST calls, WebSocket routing, and room fallback logic do not treat same-instance aliases as different signaling clusters. + +Room signaling now waits for that initial health sweep before the first saved-room reconnect attempt. That avoids a cold-start race where alias endpoints could open separate WebSocket managers before `serverInstanceId` had been learned. ```mermaid sequenceDiagram @@ -106,7 +111,7 @@ sequenceDiagram Health->>API: GET /api/health (5s timeout) alt 200 OK - API-->>Health: { serverVersion } + API-->>Health: { serverVersion, serverInstanceId } Health->>Compat: evaluateServerVersion(serverVersion, clientVersion) Compat-->>Health: { isCompatible, serverVersion } Health-->>Facade: online / incompatible + latency + versions @@ -132,6 +137,10 @@ The facade's `searchServers(query)` method supports two modes controlled by a `s The API service normalises every `ServerInfo` response, filling in `sourceId`, `sourceName`, and `sourceUrl` so the UI knows which endpoint each server came from. +That search fan-out is discovery only. Once a room is created or joined, the room keeps an authoritative signal-server affinity via its `sourceId` / `sourceUrl`. The join response can repair stale saved metadata, and reconnect logic now retries that authoritative endpoint first before probing any other configured endpoints. + +Fallback stays temporary. If the authoritative endpoint is unavailable, the client can probe other active compatible endpoints as a last resort for the current session, but it does not rewrite the room's saved affinity to that fallback endpoint. + ## Server-owned room metadata `ServerInfo` also carries the server-owned `channels` list for each room. Register and update calls persist this channel metadata on the server, and search or hydration responses return the normalised channel list so text and voice channel topology survives reloads, reconnects, and fresh joins. @@ -147,6 +156,8 @@ Default servers are configured in the environment file. The state service builds - `restoreDefaultServers()` re-adds any removed defaults and clears the removal tracking - The primary default URL is used as a fallback when no endpoint is resolved +Saved rooms can also self-heal their endpoint metadata. If a room has missing or stale source information, the client now searches the configured endpoints for that room, restores the correct source mapping, and persists the repair locally. + URL sanitisation strips trailing slashes and `/api` suffixes. Protocol-less URLs get `http` or `https` based on the current page protocol. ## Server administration diff --git a/toju-app/src/app/domains/server-directory/application/server-directory.facade.ts b/toju-app/src/app/domains/server-directory/application/server-directory.facade.ts index fe802bb..b17037e 100644 --- a/toju-app/src/app/domains/server-directory/application/server-directory.facade.ts +++ b/toju-app/src/app/domains/server-directory/application/server-directory.facade.ts @@ -21,6 +21,12 @@ import type { ServerSourceSelector, UnbanServerMemberRequest } from '../domain/server-directory.models'; +import { + buildRoomSignalSelector, + buildRoomSignalSource, + type RoomSignalSource, + type RoomSignalSourceInput +} from '../domain/room-signal-source'; import { ServerEndpointCompatibilityService } from '../infrastructure/server-endpoint-compatibility.service'; import { ServerEndpointHealthService } from '../infrastructure/server-endpoint-health.service'; import { ServerEndpointStateService } from './server-endpoint-state.service'; @@ -38,6 +44,7 @@ export class ServerDirectoryFacade { private readonly endpointCompatibility = inject(ServerEndpointCompatibilityService); private readonly endpointHealth = inject(ServerEndpointHealthService); private readonly api = inject(ServerDirectoryApiService); + private readonly initialServerHealthCheck: Promise; private shouldSearchAllServers = true; constructor() { @@ -47,7 +54,11 @@ export class ServerDirectoryFacade { this.activeServer = this.endpointState.activeServer; this.loadConnectionSettings(); - void this.testAllServers(); + this.initialServerHealthCheck = this.testAllServers().catch(() => undefined); + } + + async awaitInitialServerHealthCheck(): Promise { + await this.initialServerHealthCheck; } addServer(server: { name: string; url: string }): ServerEndpoint { @@ -110,6 +121,81 @@ export class ServerDirectoryFacade { return !!refreshedEndpoint && refreshedEndpoint.status !== 'incompatible'; } + resolveRoomEndpoint( + source?: RoomSignalSourceInput, + options?: { ensureEndpoint?: boolean; setActive?: boolean } + ): ServerEndpoint | null { + const normalizedSource = buildRoomSignalSource(source); + + if (normalizedSource.sourceId) { + const endpointById = this.endpointState.resolveCanonicalEndpoint( + this.servers().find((endpoint) => endpoint.id === normalizedSource.sourceId) ?? null + ); + + if (endpointById) { + return endpointById; + } + } + + if (!normalizedSource.sourceUrl) { + return null; + } + + const endpointByUrl = this.endpointState.resolveCanonicalEndpoint( + this.findServerByUrl(normalizedSource.sourceUrl) ?? null + ); + + if (endpointByUrl) { + return endpointByUrl; + } + + if (!options?.ensureEndpoint) { + return null; + } + + return this.ensureServerEndpoint({ + name: normalizedSource.sourceName ?? 'Signal Server', + url: normalizedSource.sourceUrl + }, { + setActive: options.setActive + }); + } + + normaliseRoomSignalSource( + source?: RoomSignalSourceInput, + options?: { ensureEndpoint?: boolean; setActive?: boolean } + ): RoomSignalSource { + const endpoint = this.resolveRoomEndpoint(source, options); + + return buildRoomSignalSource(source, endpoint); + } + + buildRoomSignalSelector( + source?: RoomSignalSourceInput, + options?: { ensureEndpoint?: boolean; setActive?: boolean } + ): ServerSourceSelector | undefined { + return buildRoomSignalSelector(this.normaliseRoomSignalSource(source, options)); + } + + getFallbackRoomEndpoints(source?: RoomSignalSourceInput): ServerEndpoint[] { + const primaryEndpoint = this.resolveRoomEndpoint(source); + const primarySource = this.normaliseRoomSignalSource(source); + const primaryUrl = primarySource.sourceUrl ? this.endpointState.sanitiseUrl(primarySource.sourceUrl) : null; + const primaryInstanceId = primaryEndpoint?.instanceId ?? null; + + return this.activeServers().filter((endpoint) => { + if (endpoint.status === 'incompatible') { + return false; + } + + if (primaryInstanceId && endpoint.instanceId === primaryInstanceId) { + return false; + } + + return !primaryUrl || this.endpointState.sanitiseUrl(endpoint.url) !== primaryUrl; + }); + } + setSearchAllServers(enabled: boolean): void { this.shouldSearchAllServers = enabled; } @@ -159,6 +245,10 @@ export class ServerDirectoryFacade { return this.api.getServer(serverId, selector); } + findServerAcrossActiveEndpoints(serverId: string, source?: RoomSignalSourceInput): Observable { + return this.api.findServerAcrossActiveEndpoints(serverId, source); + } + registerServer( server: Omit & { id?: string; password?: string | null }, selector?: ServerSourceSelector diff --git a/toju-app/src/app/domains/server-directory/application/server-endpoint-state.service.ts b/toju-app/src/app/domains/server-directory/application/server-endpoint-state.service.ts index 9573215..3e0214c 100644 --- a/toju-app/src/app/domains/server-directory/application/server-endpoint-state.service.ts +++ b/toju-app/src/app/domains/server-directory/application/server-endpoint-state.service.ts @@ -121,6 +121,20 @@ export class ServerEndpointStateService { return this._servers().find((endpoint) => this.sanitiseUrl(endpoint.url) === sanitisedUrl); } + resolveCanonicalEndpoint(endpoint: ServerEndpoint | null | undefined): ServerEndpoint | null { + if (!endpoint?.instanceId) { + return endpoint ?? null; + } + + const equivalentEndpoints = this._servers().filter((candidate) => candidate.instanceId === endpoint.instanceId); + + if (equivalentEndpoints.length <= 1) { + return endpoint; + } + + return [...equivalentEndpoints].sort((left, right) => this.compareEndpointPreference(left, right))[0] ?? endpoint; + } + removeServer(endpointId: string): void { const endpoints = this._servers(); const target = endpoints.find((endpoint) => endpoint.id === endpointId); @@ -208,6 +222,7 @@ export class ServerEndpointStateService { return { ...endpoint, + instanceId: versions?.serverInstanceId ?? endpoint.instanceId, status, latency, isActive: status === 'incompatible' ? false : endpoint.isActive, @@ -312,4 +327,50 @@ export class ServerEndpointStateService { private saveEndpoints(): void { this.storage.saveEndpoints(this._servers()); } + + private compareEndpointPreference(left: ServerEndpoint, right: ServerEndpoint): number { + const scoreDifference = this.endpointPreferenceScore(right) - this.endpointPreferenceScore(left); + + if (scoreDifference !== 0) { + return scoreDifference; + } + + return left.url.localeCompare(right.url); + } + + private endpointPreferenceScore(endpoint: ServerEndpoint): number { + let score = 0; + + if (endpoint.isDefault) { + score += 4; + } + + if (this.usesSecureProtocol(endpoint.url)) { + score += 2; + } + + if (!this.isIpHost(endpoint.url)) { + score += 1; + } + + return score; + } + + private usesSecureProtocol(url: string): boolean { + try { + return new URL(url).protocol === 'https:'; + } catch { + return false; + } + } + + private isIpHost(url: string): boolean { + try { + const hostname = new URL(url).hostname; + + return /^\d{1,3}(?:\.\d{1,3}){3}$/.test(hostname) || hostname.includes(':'); + } catch { + return false; + } + } } diff --git a/toju-app/src/app/domains/server-directory/domain/room-signal-source.spec.ts b/toju-app/src/app/domains/server-directory/domain/room-signal-source.spec.ts new file mode 100644 index 0000000..f1029dc --- /dev/null +++ b/toju-app/src/app/domains/server-directory/domain/room-signal-source.spec.ts @@ -0,0 +1,46 @@ +import { + areRoomSignalSourcesEqual, + buildRoomSignalSelector, + buildRoomSignalSource, + getSourceUrlFromSignalingUrl +} from './room-signal-source'; + +describe('room-signal-source helpers', () => { + it('converts signaling urls back to normalized source urls', () => { + expect(getSourceUrlFromSignalingUrl('wss://signal.toju.app')).toBe('https://signal.toju.app'); + expect(getSourceUrlFromSignalingUrl('ws://46.59.68.77:3001')).toBe('http://46.59.68.77:3001'); + }); + + it('prefers the resolved endpoint when normalizing a room source', () => { + expect(buildRoomSignalSource({ + sourceId: 'stale-id', + sourceName: 'Stale Source', + sourceUrl: 'https://old.example.com', + signalingUrl: 'wss://signal.toju.app' + }, { + id: 'primary-id', + name: 'Primary Signal', + url: 'https://signal.toju.app/' + })).toEqual({ + sourceId: 'primary-id', + sourceName: 'Primary Signal', + sourceUrl: 'https://signal.toju.app' + }); + }); + + it('builds selectors from signaling urls when no source url is persisted yet', () => { + expect(buildRoomSignalSelector({ + signalingUrl: 'wss://signal-sweden.toju.app', + fallbackName: 'Toju Signal Sweden' + })).toEqual({ + sourceUrl: 'https://signal-sweden.toju.app' + }); + }); + + it('treats equivalent persisted and signaling-derived sources as equal', () => { + expect(areRoomSignalSourcesEqual( + { sourceUrl: 'https://signal.toju.app/' }, + { signalingUrl: 'wss://signal.toju.app' } + )).toBeTrue(); + }); +}); diff --git a/toju-app/src/app/domains/server-directory/domain/room-signal-source.ts b/toju-app/src/app/domains/server-directory/domain/room-signal-source.ts new file mode 100644 index 0000000..451277f --- /dev/null +++ b/toju-app/src/app/domains/server-directory/domain/room-signal-source.ts @@ -0,0 +1,95 @@ +import type { ServerEndpoint, ServerSourceSelector } from './server-directory.models'; +import { normaliseConfiguredServerUrl, sanitiseServerBaseUrl } from './server-endpoint-defaults'; + +export interface RoomSignalSource { + sourceId?: string; + sourceName?: string; + sourceUrl?: string; +} + +export interface RoomSignalSourceInput extends RoomSignalSource { + fallbackName?: string; + signalingUrl?: string; +} + +const DEFAULT_SIGNAL_SOURCE_NAME = 'Signal Server'; + +export function getSourceUrlFromSignalingUrl(signalingUrl?: string): string | undefined { + const normalizedUrl = normalizeString(signalingUrl); + + if (!normalizedUrl) { + return undefined; + } + + const resolvedUrl = normaliseConfiguredServerUrl(normalizedUrl, 'https'); + + return resolvedUrl || undefined; +} + +export function buildRoomSignalSource( + source?: RoomSignalSourceInput | null, + endpoint?: Pick | null +): RoomSignalSource { + const sourceId = normalizeString(endpoint?.id) ?? normalizeString(source?.sourceId); + const sourceUrl = endpoint + ? sanitiseServerBaseUrl(endpoint.url) + : (normalizeUrl(source?.sourceUrl) ?? getSourceUrlFromSignalingUrl(source?.signalingUrl)); + const sourceName = normalizeString(endpoint?.name) + ?? normalizeString(source?.sourceName) + ?? normalizeString(source?.fallbackName) + ?? (sourceUrl ? DEFAULT_SIGNAL_SOURCE_NAME : undefined); + + return { + sourceId, + sourceName, + sourceUrl + }; +} + +export function buildRoomSignalSelector( + source?: RoomSignalSourceInput | null +): ServerSourceSelector | undefined { + const normalizedSource = buildRoomSignalSource(source); + + if (normalizedSource.sourceId) { + return { sourceId: normalizedSource.sourceId }; + } + + if (normalizedSource.sourceUrl) { + return { sourceUrl: normalizedSource.sourceUrl }; + } + + return undefined; +} + +export function hasRoomSignalSource(source?: RoomSignalSourceInput | null): boolean { + return !!buildRoomSignalSelector(source); +} + +export function areRoomSignalSourcesEqual( + left?: RoomSignalSourceInput | null, + right?: RoomSignalSourceInput | null +): boolean { + const normalizedLeft = buildRoomSignalSource(left); + const normalizedRight = buildRoomSignalSource(right); + + return normalizedLeft.sourceId === normalizedRight.sourceId + && normalizedLeft.sourceName === normalizedRight.sourceName + && normalizedLeft.sourceUrl === normalizedRight.sourceUrl; +} + +function normalizeString(value: string | undefined | null): string | undefined { + if (typeof value !== 'string') { + return undefined; + } + + const trimmed = value.trim(); + + return trimmed || undefined; +} + +function normalizeUrl(value: string | undefined | null): string | undefined { + const normalizedValue = normalizeString(value); + + return normalizedValue ? sanitiseServerBaseUrl(normalizedValue) : undefined; +} diff --git a/toju-app/src/app/domains/server-directory/domain/server-directory.models.ts b/toju-app/src/app/domains/server-directory/domain/server-directory.models.ts index c86f6fd..bd60c84 100644 --- a/toju-app/src/app/domains/server-directory/domain/server-directory.models.ts +++ b/toju-app/src/app/domains/server-directory/domain/server-directory.models.ts @@ -45,12 +45,14 @@ export interface DefaultServerDefinition { } export interface ServerEndpointVersions { + serverInstanceId?: string | null; serverVersion?: string | null; clientVersion?: string | null; } export interface ServerEndpoint { id: string; + instanceId?: string; name: string; url: string; isActive: boolean; @@ -135,6 +137,7 @@ export interface ServerVersionCompatibilityResult { } export interface ServerHealthCheckPayload { + serverInstanceId?: unknown; serverVersion?: unknown; } diff --git a/toju-app/src/app/domains/server-directory/feature/invite/invite.component.ts b/toju-app/src/app/domains/server-directory/feature/invite/invite.component.ts index 6c7aab7..98bf947 100644 --- a/toju-app/src/app/domains/server-directory/feature/invite/invite.component.ts +++ b/toju-app/src/app/domains/server-directory/feature/invite/invite.component.ts @@ -134,6 +134,15 @@ export class InviteComponent implements OnInit { sourceId: context.endpoint.id, sourceUrl: context.sourceUrl })); + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: joinResponse.server.sourceId ?? context.endpoint.id, + sourceName: joinResponse.server.sourceName ?? context.endpoint.name, + sourceUrl: joinResponse.server.sourceUrl ?? context.sourceUrl, + signalingUrl: joinResponse.signalingUrl, + fallbackName: joinResponse.server.sourceName ?? context.endpoint.name ?? invite.server.name + }, { + ensureEndpoint: true + }); this.store.dispatch( RoomsActions.joinRoom({ @@ -145,9 +154,8 @@ export class InviteComponent implements OnInit { Array.isArray(joinResponse.server.channels) && joinResponse.server.channels.length > 0 ? joinResponse.server.channels : invite.server.channels, - sourceId: context.endpoint.id, - sourceName: context.endpoint.name, - sourceUrl: context.sourceUrl + ...resolvedSource, + signalingUrl: joinResponse.signalingUrl } }) ); diff --git a/toju-app/src/app/domains/server-directory/feature/server-search/server-search.component.ts b/toju-app/src/app/domains/server-directory/feature/server-search/server-search.component.ts index 38a4f89..8391d10 100644 --- a/toju-app/src/app/domains/server-directory/feature/server-search/server-search.component.ts +++ b/toju-app/src/app/domains/server-directory/feature/server-search/server-search.component.ts @@ -273,13 +273,24 @@ export class ServerSearchComponent implements OnInit { sourceId: server.sourceId, sourceUrl: server.sourceUrl })); + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: response.server.sourceId ?? server.sourceId, + sourceName: response.server.sourceName ?? server.sourceName, + sourceUrl: response.server.sourceUrl ?? server.sourceUrl, + signalingUrl: response.signalingUrl, + fallbackName: response.server.sourceName ?? server.sourceName ?? server.name + }, { + ensureEndpoint: true + }); const resolvedServer = { ...server, ...response.server, channels: Array.isArray(response.server.channels) && response.server.channels.length > 0 ? response.server.channels - : server.channels + : server.channels, + ...resolvedSource, + signalingUrl: response.signalingUrl }; this.closePasswordDialog(); diff --git a/toju-app/src/app/domains/server-directory/index.ts b/toju-app/src/app/domains/server-directory/index.ts index 6af42b5..69c8b2c 100644 --- a/toju-app/src/app/domains/server-directory/index.ts +++ b/toju-app/src/app/domains/server-directory/index.ts @@ -1,3 +1,4 @@ export * from './application/server-directory.facade'; export * from './domain/server-directory.constants'; export * from './domain/server-directory.models'; +export * from './domain/room-signal-source'; diff --git a/toju-app/src/app/domains/server-directory/infrastructure/server-directory-api.service.ts b/toju-app/src/app/domains/server-directory/infrastructure/server-directory-api.service.ts index 98f471f..4325b3e 100644 --- a/toju-app/src/app/domains/server-directory/infrastructure/server-directory-api.service.ts +++ b/toju-app/src/app/domains/server-directory/infrastructure/server-directory-api.service.ts @@ -29,6 +29,7 @@ import type { ServerSourceSelector, UnbanServerMemberRequest } from '../domain/server-directory.models'; +import type { RoomSignalSourceInput } from '../domain/room-signal-source'; @Injectable({ providedIn: 'root' }) export class ServerDirectoryApiService { @@ -45,14 +46,16 @@ export class ServerDirectoryApiService { resolveEndpoint(selector?: ServerSourceSelector): ServerEndpoint | null { if (selector?.sourceId) { - return this.endpointState.servers().find((endpoint) => endpoint.id === selector.sourceId) ?? null; + const endpoint = this.endpointState.servers().find((candidate) => candidate.id === selector.sourceId) ?? null; + + return this.endpointState.resolveCanonicalEndpoint(endpoint); } if (selector?.sourceUrl) { - return this.endpointState.findServerByUrl(selector.sourceUrl) ?? null; + return this.endpointState.resolveCanonicalEndpoint(this.endpointState.findServerByUrl(selector.sourceUrl) ?? null); } - return ( + return this.endpointState.resolveCanonicalEndpoint( this.endpointState.activeServer() ?? this.endpointState.servers().find((endpoint) => endpoint.status !== 'incompatible') ?? this.endpointState.servers()[0] ?? @@ -92,6 +95,45 @@ export class ServerDirectoryApiService { ); } + findServerAcrossActiveEndpoints(serverId: string, source?: RoomSignalSourceInput): Observable { + const candidateEndpoints = this.getSearchableEndpoints(); + const preferredSourceUrl = source?.sourceUrl ? this.endpointState.sanitiseUrl(source.sourceUrl) : undefined; + const prioritizedEndpoints = [...candidateEndpoints].sort((left, right) => { + if (preferredSourceUrl) { + const leftMatches = this.endpointState.sanitiseUrl(left.url) === preferredSourceUrl; + const rightMatches = this.endpointState.sanitiseUrl(right.url) === preferredSourceUrl; + + if (leftMatches !== rightMatches) { + return leftMatches ? -1 : 1; + } + } + + if (source?.sourceId) { + const leftMatches = left.id === source.sourceId; + const rightMatches = right.id === source.sourceId; + + if (leftMatches !== rightMatches) { + return leftMatches ? -1 : 1; + } + } + + return 0; + }); + + if (prioritizedEndpoints.length === 0) { + return this.getServer(serverId, this.resolveSelector(source)); + } + + return forkJoin( + prioritizedEndpoints.map((endpoint) => this.getServer(serverId, { + sourceId: endpoint.id, + sourceUrl: endpoint.url + })) + ).pipe( + map((servers) => servers.find((server): server is ServerInfo => !!server) ?? null) + ); + } + registerServer( server: Omit & { id?: string; password?: string | null }, selector?: ServerSourceSelector @@ -221,11 +263,17 @@ export class ServerDirectoryApiService { } private resolveBaseServerUrl(selector?: ServerSourceSelector): string { + const resolvedEndpoint = this.resolveEndpoint(selector); + + if (resolvedEndpoint) { + return resolvedEndpoint.url; + } + if (selector?.sourceUrl) { return this.endpointState.sanitiseUrl(selector.sourceUrl); } - return this.resolveEndpoint(selector)?.url ?? this.endpointState.getPrimaryDefaultServerUrl(); + return this.endpointState.getPrimaryDefaultServerUrl(); } private unwrapServersResponse(response: { servers: ServerInfo[]; total: number } | ServerInfo[]): ServerInfo[] { @@ -245,7 +293,7 @@ export class ServerDirectoryApiService { } private searchAllEndpoints(query: string): Observable { - const onlineEndpoints = this.endpointState.activeServers().filter((endpoint) => endpoint.status !== 'offline'); + const onlineEndpoints = this.getSearchableEndpoints(); if (onlineEndpoints.length === 0) { return this.searchSingleEndpoint(query, this.getApiBaseUrl(), this.endpointState.activeServer()); @@ -258,7 +306,7 @@ export class ServerDirectoryApiService { } private getAllServersFromAllEndpoints(): Observable { - const onlineEndpoints = this.endpointState.activeServers().filter((endpoint) => endpoint.status !== 'offline'); + const onlineEndpoints = this.getSearchableEndpoints(); if (onlineEndpoints.length === 0) { return this.http.get<{ servers: ServerInfo[]; total: number }>(`${this.getApiBaseUrl()}/servers`).pipe( @@ -277,6 +325,30 @@ export class ServerDirectoryApiService { ).pipe(map((resultArrays) => resultArrays.flat())); } + private getSearchableEndpoints(): ServerEndpoint[] { + const activeEndpoints = this.endpointState.activeServers().filter((endpoint) => endpoint.status !== 'offline'); + + if (activeEndpoints.length > 0) { + return activeEndpoints; + } + + const activeServer = this.endpointState.activeServer(); + + return activeServer ? [activeServer] : []; + } + + private resolveSelector(source?: RoomSignalSourceInput): ServerSourceSelector | undefined { + if (source?.sourceId) { + return { sourceId: source.sourceId }; + } + + if (source?.sourceUrl) { + return { sourceUrl: source.sourceUrl }; + } + + return undefined; + } + private deduplicateById(items: T[]): T[] { const seen = new Set(); @@ -298,13 +370,14 @@ export class ServerDirectoryApiService { const candidate = server as Record; const sourceName = this.getStringValue(candidate['sourceName']); const sourceUrl = this.getStringValue(candidate['sourceUrl']); + const resolvedSource = this.endpointState.resolveCanonicalEndpoint(source); return { id: this.getStringValue(candidate['id']) ?? '', name: this.getStringValue(candidate['name']) ?? 'Unnamed server', description: this.getStringValue(candidate['description']), topic: this.getStringValue(candidate['topic']), - hostName: this.getStringValue(candidate['hostName']) ?? sourceName ?? source?.name ?? 'Unknown API', + hostName: this.getStringValue(candidate['hostName']) ?? sourceName ?? resolvedSource?.name ?? 'Unknown API', ownerId: this.getStringValue(candidate['ownerId']), ownerName: this.getStringValue(candidate['ownerName']), ownerPublicKey: this.getStringValue(candidate['ownerPublicKey']), @@ -319,9 +392,13 @@ export class ServerDirectoryApiService { roleAssignments: this.getRoleAssignmentsValue(candidate['roleAssignments']), channelPermissions: this.getChannelPermissionOverridesValue(candidate['channelPermissions']), createdAt: this.getNumberValue(candidate['createdAt'], Date.now()), - sourceId: this.getStringValue(candidate['sourceId']) ?? source?.id, - sourceName: sourceName ?? source?.name, - sourceUrl: sourceUrl ? this.endpointState.sanitiseUrl(sourceUrl) : source ? this.endpointState.sanitiseUrl(source.url) : undefined + sourceId: this.getStringValue(candidate['sourceId']) ?? resolvedSource?.id, + sourceName: sourceName ?? resolvedSource?.name, + sourceUrl: sourceUrl + ? this.endpointState.sanitiseUrl(sourceUrl) + : resolvedSource + ? this.endpointState.sanitiseUrl(resolvedSource.url) + : undefined }; } diff --git a/toju-app/src/app/domains/server-directory/infrastructure/server-endpoint-health.service.ts b/toju-app/src/app/domains/server-directory/infrastructure/server-endpoint-health.service.ts index c6f97c0..e8b7f84 100644 --- a/toju-app/src/app/domains/server-directory/infrastructure/server-endpoint-health.service.ts +++ b/toju-app/src/app/domains/server-directory/infrastructure/server-endpoint-health.service.ts @@ -30,12 +30,16 @@ export class ServerEndpointHealthService { payload.serverVersion, clientVersion ); + const serverInstanceId = typeof payload.serverInstanceId === 'string' && payload.serverInstanceId.trim().length > 0 + ? payload.serverInstanceId.trim() + : undefined; if (!versionCompatibility.isCompatible) { return { status: 'incompatible', latency, versions: { + serverInstanceId, serverVersion: versionCompatibility.serverVersion, clientVersion } @@ -46,6 +50,7 @@ export class ServerEndpointHealthService { status: 'online', latency, versions: { + serverInstanceId, serverVersion: versionCompatibility.serverVersion, clientVersion } diff --git a/toju-app/src/app/domains/voice-connection/application/voice-activity.service.ts b/toju-app/src/app/domains/voice-connection/application/voice-activity.service.ts index f3c0f0b..c2574fb 100644 --- a/toju-app/src/app/domains/voice-connection/application/voice-activity.service.ts +++ b/toju-app/src/app/domains/voice-connection/application/voice-activity.service.ts @@ -75,6 +75,12 @@ export class VoiceActivityService implements OnDestroy { this.untrackStream(peerId); }) ); + + this.subs.push( + this.voiceConnection.onVoiceConnected.subscribe(() => { + this.ensureAllRemoteStreamsTracked(); + }) + ); } trackLocalMic(userId: string, stream: MediaStream): void { @@ -124,6 +130,10 @@ export class VoiceActivityService implements OnDestroy { } const ctx = new AudioContext(); + + if (ctx.state === 'suspended') + void ctx.resume(); + const analyser = ctx.createAnalyser(); const sources = audioTracks.map((track) => ctx.createMediaStreamSource(new MediaStream([track]))); @@ -166,6 +176,18 @@ export class VoiceActivityService implements OnDestroy { this.stopPolling(); } + private ensureAllRemoteStreamsTracked(): void { + const peers = this.voiceConnection.getConnectedPeers(); + + for (const peerId of peers) { + const stream = this.voiceConnection.getRemoteVoiceStream(peerId); + + if (stream) { + this.trackStream(peerId, stream); + } + } + } + private ensurePolling(): void { if (this.animFrameId !== null) return; diff --git a/toju-app/src/app/features/servers/servers-rail.component.ts b/toju-app/src/app/features/servers/servers-rail.component.ts index 8638d26..a884861 100644 --- a/toju-app/src/app/features/servers/servers-rail.component.ts +++ b/toju-app/src/app/features/servers/servers-rail.component.ts @@ -18,6 +18,8 @@ import { EMPTY, Subject, catchError, + firstValueFrom, + from, switchMap, tap } from 'rxjs'; @@ -337,35 +339,44 @@ export class ServersRailComponent { this.joinPasswordError.set(null); - return this.serverDirectory.requestJoin({ - roomId: room.id, - userId: currentUserId, - userPublicKey: currentUser?.oderId || currentUserId, - displayName: currentUser?.displayName || 'Anonymous', - password: password?.trim() || undefined - }, { - sourceId: room.sourceId, - sourceUrl: room.sourceUrl - }) - .pipe( - tap((response) => { - this.closePasswordDialog(); - this.store.dispatch( - RoomsActions.updateRoom({ - roomId: room.id, - changes: this.toRoomRefreshChanges(room, response.server) + return from(this.resolveRoomJoinTarget(room)).pipe( + switchMap((joinTarget) => { + if (!joinTarget.selector) { + if (this.currentRoom()?.id === room.id) { + this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true })); + } + + return EMPTY; + } + + return this.serverDirectory.requestJoin({ + roomId: room.id, + userId: currentUserId, + userPublicKey: currentUser?.oderId || currentUserId, + displayName: currentUser?.displayName || 'Anonymous', + password: password?.trim() || undefined + }, joinTarget.selector) + .pipe( + tap((response) => { + this.closePasswordDialog(); + this.store.dispatch( + RoomsActions.updateRoom({ + roomId: room.id, + changes: this.toRoomRefreshChanges(joinTarget.room, response.server, response.signalingUrl) + }) + ); + + if (this.currentRoom()?.id === room.id) { + this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); + } }) ); - - if (this.currentRoom()?.id === room.id) { - this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); - } - }), - catchError((error: unknown) => { - this.handleBackgroundJoinError(room, error); - return EMPTY; - }) - ); + }), + catchError((error: unknown) => { + this.handleBackgroundJoinError(room, error); + return EMPTY; + }) + ); } private handleBackgroundJoinError(room: Room, error: unknown): void { @@ -414,7 +425,17 @@ export class ServersRailComponent { || (typeof status === 'number' && status >= 500); } - private toRoomRefreshChanges(room: Room, server: ServerInfo): Partial { + private toRoomRefreshChanges(room: Room, server: ServerInfo, signalingUrl?: string): Partial { + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: server.sourceId ?? room.sourceId, + sourceName: server.sourceName ?? room.sourceName, + sourceUrl: server.sourceUrl ?? room.sourceUrl, + signalingUrl, + fallbackName: server.sourceName ?? room.sourceName ?? room.name + }, { + ensureEndpoint: true + }); + return { name: server.name, description: server.description, @@ -432,9 +453,99 @@ export class ServersRailComponent { Array.isArray(server.channels) && server.channels.length > 0 ? server.channels : room.channels, - sourceId: server.sourceId ?? room.sourceId, - sourceName: server.sourceName ?? room.sourceName, - sourceUrl: server.sourceUrl ?? room.sourceUrl + ...resolvedSource }; } + + private async resolveRoomJoinTarget(room: Room): Promise<{ + room: Room; + selector: ReturnType; + }> { + let resolvedRoom = this.applyResolvedRoomSource(room, this.serverDirectory.normaliseRoomSignalSource({ + sourceId: room.sourceId, + sourceName: room.sourceName, + sourceUrl: room.sourceUrl, + fallbackName: room.sourceName ?? room.name + }, { + ensureEndpoint: !!room.sourceUrl + })); + let selector = this.serverDirectory.buildRoomSignalSelector({ + sourceId: resolvedRoom.sourceId, + sourceName: resolvedRoom.sourceName, + sourceUrl: resolvedRoom.sourceUrl, + fallbackName: resolvedRoom.sourceName ?? resolvedRoom.name + }, { + ensureEndpoint: !!resolvedRoom.sourceUrl + }); + + const authoritativeServer = ( + selector + ? await firstValueFrom(this.serverDirectory.getServer(room.id, selector)) + : null + ) ?? await firstValueFrom(this.serverDirectory.findServerAcrossActiveEndpoints(room.id, { + sourceId: resolvedRoom.sourceId, + sourceName: resolvedRoom.sourceName, + sourceUrl: resolvedRoom.sourceUrl, + fallbackName: resolvedRoom.sourceName ?? resolvedRoom.name + })); + + if (!authoritativeServer) { + return { + room: resolvedRoom, + selector + }; + } + + const authoritativeSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: authoritativeServer.sourceId ?? resolvedRoom.sourceId, + sourceName: authoritativeServer.sourceName ?? resolvedRoom.sourceName, + sourceUrl: authoritativeServer.sourceUrl ?? resolvedRoom.sourceUrl, + fallbackName: authoritativeServer.sourceName ?? resolvedRoom.sourceName ?? resolvedRoom.name + }, { + ensureEndpoint: !!(authoritativeServer.sourceUrl ?? resolvedRoom.sourceUrl) + }); + + resolvedRoom = this.applyResolvedRoomSource(resolvedRoom, authoritativeSource); + selector = this.serverDirectory.buildRoomSignalSelector({ + sourceId: resolvedRoom.sourceId, + sourceName: resolvedRoom.sourceName, + sourceUrl: resolvedRoom.sourceUrl, + fallbackName: resolvedRoom.sourceName ?? resolvedRoom.name + }, { + ensureEndpoint: !!resolvedRoom.sourceUrl + }); + + return { + room: resolvedRoom, + selector + }; + } + + private applyResolvedRoomSource(room: Room, source: Pick): Room { + const nextRoom: Room = { + ...room, + sourceId: source.sourceId, + sourceName: source.sourceName, + sourceUrl: source.sourceUrl + }; + + if ( + room.sourceId === nextRoom.sourceId + && room.sourceName === nextRoom.sourceName + && room.sourceUrl === nextRoom.sourceUrl + ) { + return room; + } + + this.store.dispatch(RoomsActions.updateRoom({ + roomId: room.id, + changes: { + sourceId: nextRoom.sourceId, + sourceName: nextRoom.sourceName, + sourceUrl: nextRoom.sourceUrl + } + })); + + return nextRoom; + } } diff --git a/toju-app/src/app/infrastructure/realtime/README.md b/toju-app/src/app/infrastructure/realtime/README.md index b69d827..482d386 100644 --- a/toju-app/src/app/infrastructure/realtime/README.md +++ b/toju-app/src/app/infrastructure/realtime/README.md @@ -117,6 +117,12 @@ The signaling layer's only job is getting two peers to exchange SDP offers/answe Each signaling URL gets its own `SignalingManager` (one WebSocket each). `SignalingTransportHandler` picks the right socket based on which server the message is for. `ServerSignalingCoordinator` tracks which peers belong to which servers and which signaling URLs, so we know when it is safe to tear down a peer connection after leaving a server. +Room affinity is authoritative at this layer as well. The renderer repairs each room's saved `sourceId` / `sourceUrl` from server-directory responses and routes `join_server`, `view_server`, and room-scoped signaling traffic to that room's signaling URL first. If that route fails, alternate endpoints can be tried temporarily, but server-scoped raw messages are no longer broadcast to every connected signaling manager when the route is unknown. + +Cold-start routing now waits for the initial server-directory health probes so same-backend aliases can collapse to one canonical signaling endpoint before any saved rooms reconnect. When a room is reconnected on a chosen socket, its background rooms are re-joined on that same socket as well so stale per-signal memberships do not keep orphan managers alive, and reconnect replay only sends `view_server` for rooms that manager still has joined. + +This is still a non-federated model. Different signaling servers do not share peer registries or relay WebRTC offers for each other, so users in the same room must converge on the same signaling endpoint to discover one another reliably. + ```mermaid sequenceDiagram participant UI as App diff --git a/toju-app/src/app/infrastructure/realtime/realtime.constants.ts b/toju-app/src/app/infrastructure/realtime/realtime.constants.ts index 2efba53..452a5c0 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime.constants.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime.constants.ts @@ -84,6 +84,7 @@ export const SIGNALING_TYPE_CONNECTED = 'connected'; export const SIGNALING_TYPE_SERVER_USERS = 'server_users'; export const SIGNALING_TYPE_USER_JOINED = 'user_joined'; export const SIGNALING_TYPE_USER_LEFT = 'user_left'; +export const SIGNALING_TYPE_ACCESS_DENIED = 'access_denied'; export const P2P_TYPE_STATE_REQUEST = 'state-request'; export const P2P_TYPE_VOICE_STATE_REQUEST = 'voice-state-request'; diff --git a/toju-app/src/app/infrastructure/realtime/signaling/server-membership-signaling-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/server-membership-signaling-handler.ts index 08ed5c0..8ce8543 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/server-membership-signaling-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/server-membership-signaling-handler.ts @@ -33,6 +33,8 @@ export class ServerMembershipSignalingHandler { return; } + this.migrateServerSignalUrl(roomId, resolvedSignalUrl); + this.dependencies.signalingCoordinator.setServerSignalUrl(roomId, resolvedSignalUrl); this.dependencies.signalingCoordinator.setLastJoinedServer(resolvedSignalUrl, { serverId: roomId, @@ -55,6 +57,8 @@ export class ServerMembershipSignalingHandler { return; } + this.migrateServerSignalUrl(serverId, resolvedSignalUrl); + this.dependencies.signalingCoordinator.setServerSignalUrl(serverId, resolvedSignalUrl); this.dependencies.signalingCoordinator.setLastJoinedServer(resolvedSignalUrl, { serverId, @@ -99,7 +103,9 @@ export class ServerMembershipSignalingHandler { return; } - for (const { signalUrl, serverIds } of this.dependencies.signalingCoordinator.getJoinedServerEntries()) { + const joinedEntries = this.dependencies.signalingCoordinator.getJoinedServerEntries(); + + for (const { signalUrl, serverIds } of joinedEntries) { for (const joinedServerId of serverIds) { this.dependencies.signalingTransport.sendRawMessageToSignalUrl(signalUrl, { type: SIGNALING_TYPE_LEAVE_SERVER, @@ -109,6 +115,11 @@ export class ServerMembershipSignalingHandler { } this.dependencies.signalingCoordinator.clearJoinedServers(); + + for (const { signalUrl } of joinedEntries) { + this.dependencies.signalingCoordinator.pruneUnusedSignalUrl(signalUrl); + } + this.dependencies.runFullCleanup(); } @@ -116,11 +127,13 @@ export class ServerMembershipSignalingHandler { const resolvedSignalUrl = this.dependencies.signalingCoordinator.getServerSignalUrl(serverId); if (resolvedSignalUrl) { - this.dependencies.signalingCoordinator.removeJoinedServer(resolvedSignalUrl, serverId); this.dependencies.signalingTransport.sendRawMessageToSignalUrl(resolvedSignalUrl, { type: SIGNALING_TYPE_LEAVE_SERVER, serverId }); + + this.dependencies.signalingCoordinator.removeJoinedServer(resolvedSignalUrl, serverId); + this.dependencies.signalingCoordinator.pruneUnusedSignalUrl(resolvedSignalUrl); } else { this.dependencies.signalingTransport.sendRawMessage({ type: SIGNALING_TYPE_LEAVE_SERVER, @@ -143,4 +156,26 @@ export class ServerMembershipSignalingHandler { ?? this.dependencies.signalingCoordinator.getServerSignalUrl(serverId) ?? this.getCurrentSignalingUrl(); } + + private migrateServerSignalUrl(serverId: string, nextSignalUrl: string): void { + const previousSignalUrl = this.dependencies.signalingCoordinator.getServerSignalUrl(serverId); + + if (!previousSignalUrl || previousSignalUrl === nextSignalUrl) { + return; + } + + this.dependencies.signalingTransport.sendRawMessageToSignalUrl(previousSignalUrl, { + type: SIGNALING_TYPE_LEAVE_SERVER, + serverId + }); + + this.dependencies.signalingCoordinator.removeJoinedServer(previousSignalUrl, serverId); + this.dependencies.signalingCoordinator.pruneUnusedSignalUrl(previousSignalUrl); + + this.dependencies.logger.info('Migrated server to a new signaling route', { + previousSignalUrl, + serverId, + signalUrl: nextSignalUrl + }); + } } diff --git a/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts b/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts index 674cf91..8430672 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/server-signaling-coordinator.ts @@ -110,6 +110,10 @@ export class ServerSignalingCoordinator { this.lastJoinedServerBySignalUrl.set(signalUrl, joinedServer); } + getLastJoinedServer(signalUrl: string): JoinedServerInfo | null { + return this.lastJoinedServerBySignalUrl.get(signalUrl) ?? null; + } + clearLastJoinedServers(): void { this.lastJoinedServerBySignalUrl.clear(); } @@ -156,15 +160,57 @@ export class ServerSignalingCoordinator { } removeJoinedServer(signalUrl: string, serverId: string): void { - this.getOrCreateMemberServerSet(signalUrl).delete(serverId); + const memberServerIds = this.memberServerIdsBySignalUrl.get(signalUrl); + + if (!memberServerIds) { + this.repairLastJoinedServer(signalUrl, serverId); + return; + } + + memberServerIds.delete(serverId); + + if (memberServerIds.size === 0) { + this.memberServerIdsBySignalUrl.delete(signalUrl); + } + + this.repairLastJoinedServer(signalUrl, serverId); } removeJoinedServerEverywhere(serverId: string): void { - for (const memberServerIds of this.memberServerIdsBySignalUrl.values()) { - memberServerIds.delete(serverId); + for (const signalUrl of Array.from(this.memberServerIdsBySignalUrl.keys())) { + this.removeJoinedServer(signalUrl, serverId); + this.pruneUnusedSignalUrl(signalUrl); } } + pruneUnusedSignalUrl(signalUrl: string): void { + if (this.getMemberServerIdsForSignalUrl(signalUrl).size > 0) { + return; + } + + this.memberServerIdsBySignalUrl.delete(signalUrl); + this.lastJoinedServerBySignalUrl.delete(signalUrl); + + const subscriptions = this.signalingSubscriptions.get(signalUrl); + + if (subscriptions) { + for (const subscription of subscriptions) { + subscription.unsubscribe(); + } + + this.signalingSubscriptions.delete(signalUrl); + } + + const manager = this.signalingManagers.get(signalUrl); + + if (manager) { + manager.destroy(); + this.signalingManagers.delete(signalUrl); + } + + this.removeSignalUrlFromPeerTracking(signalUrl); + } + getMemberServerIdsForSignalUrl(signalUrl: string): ReadonlySet { return this.memberServerIdsBySignalUrl.get(signalUrl) ?? new Set(); } @@ -344,6 +390,45 @@ export class ServerSignalingCoordinator { return trackedServerIds; } + private repairLastJoinedServer(signalUrl: string, removedServerId: string): void { + const lastJoined = this.lastJoinedServerBySignalUrl.get(signalUrl); + + if (!lastJoined) { + return; + } + + const memberServerIds = this.memberServerIdsBySignalUrl.get(signalUrl); + + if (!memberServerIds || memberServerIds.size === 0) { + this.lastJoinedServerBySignalUrl.delete(signalUrl); + return; + } + + if (lastJoined.serverId !== removedServerId && memberServerIds.has(lastJoined.serverId)) { + return; + } + + const nextServerId = memberServerIds.values().next().value as string | undefined; + + if (!nextServerId) { + this.lastJoinedServerBySignalUrl.delete(signalUrl); + return; + } + + this.lastJoinedServerBySignalUrl.set(signalUrl, { + ...lastJoined, + serverId: nextServerId + }); + } + + private removeSignalUrlFromPeerTracking(signalUrl: string): void { + const peerIds = new Set([...this.peerKnownSignalUrls.keys(), ...this.peerServerMap.keys()]); + + for (const peerId of peerIds) { + this.removePeerSignalScope(peerId, signalUrl); + } + } + private removePeerSignalScope(peerId: string, signalUrl: string): void { const trackedSignalUrls = this.peerServerMap.get(peerId); diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts index 12634b8..72963b9 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-message-handler.ts @@ -1,6 +1,7 @@ import type { SignalingMessage } from '../../../shared-kernel'; import { PeerData } from '../realtime.types'; import { + SIGNALING_TYPE_ACCESS_DENIED, SIGNALING_TYPE_ANSWER, SIGNALING_TYPE_CONNECTED, SIGNALING_TYPE_ICE_CANDIDATE, @@ -98,6 +99,10 @@ export class IncomingSignalingMessageHandler { this.handleIceCandidateSignalingMessage(message, signalUrl); return; + case SIGNALING_TYPE_ACCESS_DENIED: + this.handleAccessDeniedSignalingMessage(message, signalUrl); + return; + default: return; } @@ -357,6 +362,16 @@ export class IncomingSignalingMessageHandler { this.dependencies.peerManager.handleIceCandidate(fromUserId, candidate); } + private handleAccessDeniedSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void { + if (!message.serverId) { + return; + } + + // Remove the server from the coordinator for this signal URL so it won't + // be re-joined on the next reconnect cycle. + this.dependencies.signalingCoordinator.removeJoinedServer(signalUrl, message.serverId); + } + private scheduleUserJoinedFallbackOffer(peerId: string, signalUrl: string, serverId?: string): void { this.clearUserJoinedFallbackOffer(peerId); diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts index 855015e..d79fd4c 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling-transport-handler.ts @@ -118,6 +118,13 @@ export class SignalingTransportHandler { if (serverSignalUrl && this.sendRawMessageToSignalUrl(serverSignalUrl, message)) { return; } + + this.dependencies.logger.warn('[signaling] Missing server signal route for outbound raw message', { + serverId, + type: messageType + }); + + return; } const connectedManagers = this.getConnectedSignalingManagers(); @@ -161,14 +168,14 @@ export class SignalingTransportHandler { displayName: normalizedDisplayName }; - const identifyMessage = { - type: SIGNALING_TYPE_IDENTIFY, - oderId, - displayName: normalizedDisplayName - }; - if (signalUrl) { - this.sendRawMessageToSignalUrl(signalUrl, identifyMessage); + this.sendRawMessageToSignalUrl(signalUrl, { + type: SIGNALING_TYPE_IDENTIFY, + oderId, + displayName: normalizedDisplayName, + connectionScope: signalUrl + }); + return; } @@ -178,8 +185,13 @@ export class SignalingTransportHandler { return; } - for (const { manager } of connectedManagers) { - manager.sendRawMessage(identifyMessage); + for (const { signalUrl: managerSignalUrl, manager } of connectedManagers) { + manager.sendRawMessage({ + type: SIGNALING_TYPE_IDENTIFY, + oderId, + displayName: normalizedDisplayName, + connectionScope: managerSignalUrl + }); } } } diff --git a/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts b/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts index e7de4a7..ab3d62a 100644 --- a/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts +++ b/toju-app/src/app/infrastructure/realtime/signaling/signaling.manager.ts @@ -283,7 +283,8 @@ export class SignalingManager { if (credentials) { this.sendRawMessage({ type: SIGNALING_TYPE_IDENTIFY, oderId: credentials.oderId, - displayName: credentials.displayName }); + displayName: credentials.displayName, + connectionScope: this.lastSignalingUrl ?? undefined }); } const memberIds = this.getMemberServerIds(); @@ -296,17 +297,10 @@ export class SignalingManager { const lastJoined = this.getLastJoinedServer(); - if (lastJoined) { + if (lastJoined && memberIds.has(lastJoined.serverId)) { this.sendRawMessage({ type: SIGNALING_TYPE_VIEW_SERVER, serverId: lastJoined.serverId }); } - } else { - const lastJoined = this.getLastJoinedServer(); - - if (lastJoined) { - this.sendRawMessage({ type: SIGNALING_TYPE_JOIN_SERVER, - serverId: lastJoined.serverId }); - } } } diff --git a/toju-app/src/app/store/rooms/rooms.actions.ts b/toju-app/src/app/store/rooms/rooms.actions.ts index 234e313..de7ee1a 100644 --- a/toju-app/src/app/store/rooms/rooms.actions.ts +++ b/toju-app/src/app/store/rooms/rooms.actions.ts @@ -37,7 +37,11 @@ export const RoomsActions = createActionGroup({ 'Create Room Success': props<{ room: Room }>(), 'Create Room Failure': props<{ error: string }>(), - 'Join Room': props<{ roomId: string; password?: string; serverInfo?: Partial & { name: string } }>(), + 'Join Room': props<{ + roomId: string; + password?: string; + serverInfo?: Partial & { name: string; signalingUrl?: string }; + }>(), 'Join Room Success': props<{ room: Room }>(), 'Join Room Failure': props<{ error: string }>(), diff --git a/toju-app/src/app/store/rooms/rooms.effects.ts b/toju-app/src/app/store/rooms/rooms.effects.ts index 048b98c..33282cc 100644 --- a/toju-app/src/app/store/rooms/rooms.effects.ts +++ b/toju-app/src/app/store/rooms/rooms.effects.ts @@ -13,6 +13,7 @@ import { of, from, EMPTY, + firstValueFrom, merge } from 'rxjs'; import { @@ -43,7 +44,10 @@ import { saveLastViewedChatToStorage } from '../../infrastructure/persistence'; import { + areRoomSignalSourcesEqual, CLIENT_UPDATE_REQUIRED_MESSAGE, + type RoomSignalSource, + type ServerInfo, type ServerSourceSelector, ServerDirectoryFacade } from '../../domains/server-directory'; @@ -169,6 +173,12 @@ interface RoomPresenceSignalingMessage { displayName?: string; } +interface RoomSignalConnectionPlan { + fallbackSources: RoomSignalSource[]; + primarySource: RoomSignalSource | null; + room: Room; +} + type BlockedRoomAccessAction = | ReturnType | ReturnType; @@ -196,6 +206,7 @@ export class RoomsEffects { * join/leave sound within {@link RECONNECT_SOUND_GRACE_MS}. */ private recentlyLeftVoiceTimestamps = new Map(); + private readonly roomSignalFallbackSources = new Map(); private roomNavigationRequestVersion = 0; private latestNavigatedRoomId: string | null = null; @@ -388,13 +399,27 @@ export class RoomsEffects { return from(this.db.getRoom(roomId)).pipe( switchMap((room) => { const sourceSelector = serverInfo - ? { + ? this.serverDirectory.buildRoomSignalSelector({ sourceId: serverInfo.sourceId, - sourceUrl: serverInfo.sourceUrl - } + sourceName: serverInfo.sourceName, + sourceUrl: serverInfo.sourceUrl, + signalingUrl: serverInfo.signalingUrl, + fallbackName: serverInfo.sourceName ?? serverInfo.name + }, { + ensureEndpoint: !!(serverInfo.sourceUrl ?? serverInfo.signalingUrl) + }) : undefined; if (room) { + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: serverInfo?.sourceId ?? room.sourceId, + sourceName: serverInfo?.sourceName ?? room.sourceName, + sourceUrl: serverInfo?.sourceUrl ?? room.sourceUrl, + signalingUrl: serverInfo?.signalingUrl, + fallbackName: serverInfo?.sourceName ?? room.sourceName ?? serverInfo?.name ?? room.name + }, { + ensureEndpoint: !!(serverInfo?.sourceUrl ?? room.sourceUrl ?? serverInfo?.signalingUrl) + }); const resolvedRoom: Room = { ...room, isPrivate: typeof serverInfo?.isPrivate === 'boolean' ? serverInfo.isPrivate : room.isPrivate, @@ -403,9 +428,7 @@ export class RoomsEffects { roles: serverInfo?.roles ?? room.roles, roleAssignments: serverInfo?.roleAssignments ?? room.roleAssignments, channelPermissions: serverInfo?.channelPermissions ?? room.channelPermissions, - sourceId: serverInfo?.sourceId ?? room.sourceId, - sourceName: serverInfo?.sourceName ?? room.sourceName, - sourceUrl: serverInfo?.sourceUrl ?? room.sourceUrl, + ...resolvedSource, hasPassword: typeof serverInfo?.hasPassword === 'boolean' ? serverInfo.hasPassword @@ -430,6 +453,15 @@ export class RoomsEffects { // If not in local DB but we have server info from search, create a room entry if (serverInfo) { + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: serverInfo.sourceId, + sourceName: serverInfo.sourceName, + sourceUrl: serverInfo.sourceUrl, + signalingUrl: serverInfo.signalingUrl, + fallbackName: serverInfo.sourceName ?? serverInfo.name + }, { + ensureEndpoint: !!(serverInfo.sourceUrl ?? serverInfo.signalingUrl) + }); const newRoom: Room = { id: roomId, name: serverInfo.name, @@ -445,9 +477,7 @@ export class RoomsEffects { roles: serverInfo.roles, roleAssignments: serverInfo.roleAssignments, channelPermissions: serverInfo.channelPermissions, - sourceId: serverInfo.sourceId, - sourceName: serverInfo.sourceName, - sourceUrl: serverInfo.sourceUrl + ...resolvedSource }; // Save to local DB for future reference @@ -459,6 +489,14 @@ export class RoomsEffects { return this.serverDirectory.getServer(roomId, sourceSelector).pipe( switchMap((serverData) => { if (serverData) { + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: serverData.sourceId, + sourceName: serverData.sourceName, + sourceUrl: serverData.sourceUrl, + fallbackName: serverData.sourceName ?? serverData.name + }, { + ensureEndpoint: !!serverData.sourceUrl + }); const newRoom: Room = { id: serverData.id, name: serverData.name, @@ -474,9 +512,7 @@ export class RoomsEffects { roles: serverData.roles, roleAssignments: serverData.roleAssignments, channelPermissions: serverData.channelPermissions, - sourceId: serverData.sourceId, - sourceName: serverData.sourceName, - sourceUrl: serverData.sourceUrl + ...resolvedSource }; this.db.saveRoom(newRoom); @@ -621,16 +657,32 @@ export class RoomsEffects { refreshServerOwnedRoomMetadata$ = createEffect(() => this.actions$.pipe( ofType(RoomsActions.joinRoomSuccess, RoomsActions.viewServerSuccess), - switchMap(({ room }) => - this.serverDirectory.getServer(room.id, { - sourceId: room.sourceId, - sourceUrl: room.sourceUrl - }).pipe( + switchMap(({ room }) => { + const source = this.resolveRoomSignalSource(room); + const selector = this.resolveRoomSignalSelector(source, room.name); + const roomRequest$ = selector + ? this.serverDirectory.getServer(room.id, selector).pipe( + switchMap((serverData) => serverData + ? of(serverData) + : this.serverDirectory.findServerAcrossActiveEndpoints(room.id, source)) + ) + : this.serverDirectory.findServerAcrossActiveEndpoints(room.id, source); + + return roomRequest$.pipe( map((serverData) => { if (!serverData) { return null; } + const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: serverData.sourceId ?? room.sourceId, + sourceName: serverData.sourceName ?? room.sourceName, + sourceUrl: serverData.sourceUrl ?? room.sourceUrl, + fallbackName: serverData.sourceName ?? room.sourceName ?? room.name + }, { + ensureEndpoint: !!(serverData.sourceUrl ?? room.sourceUrl) + }); + return RoomsActions.updateRoom({ roomId: room.id, changes: { @@ -645,16 +697,14 @@ export class RoomsEffects { roles: serverData.roles ?? room.roles, roleAssignments: serverData.roleAssignments ?? room.roleAssignments, channelPermissions: serverData.channelPermissions ?? room.channelPermissions, - sourceId: serverData.sourceId ?? room.sourceId, - sourceName: serverData.sourceName ?? room.sourceName, - sourceUrl: serverData.sourceUrl ?? room.sourceUrl + ...resolvedSource } }); }), filter((action): action is ReturnType => !!action), catchError(() => EMPTY) - ) - ) + ); + }) ) ); @@ -732,6 +782,7 @@ export class RoomsEffects { ), switchMap(([{ roomId }]) => { this.db.deleteRoom(roomId); + this.roomSignalFallbackSources.delete(roomId); this.webrtc.broadcastMessage({ type: 'room-deleted', roomId }); @@ -824,6 +875,7 @@ export class RoomsEffects { // Delete from local DB this.db.deleteRoom(roomId); + this.roomSignalFallbackSources.delete(roomId); // Leave this specific server (doesn't affect other servers) this.webrtc.leaveRoom(roomId); @@ -1322,6 +1374,12 @@ export class RoomsEffects { if (signalingMessage.reason !== 'SERVER_NOT_FOUND') return EMPTY; + // When multiple signal URLs are configured, the room may already + // be successfully joined on a different signal server. Only show + // the reconnect notice when the room is not reachable at all. + if (signalingMessage.serverId && this.webrtc.hasJoinedServer(signalingMessage.serverId)) + return EMPTY; + return [RoomsActions.setSignalServerReconnecting({ isReconnecting: true })]; } @@ -1997,23 +2055,127 @@ export class RoomsEffects { ): Promise { const shouldShowCompatibilityError = options.showCompatibilityError ?? false; const navigationRequestVersion = options.navigationRequestVersion; - const compatibilitySelector = this.resolveCompatibilitySelector(room); - const isCompatible = compatibilitySelector === null - ? true - : await this.serverDirectory.ensureEndpointVersionCompatibility(compatibilitySelector); + const isViewedRoom = () => room.id === this.latestNavigatedRoomId; + + await this.serverDirectory.awaitInitialServerHealthCheck(); if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { return; } - if (!isCompatible) { - if (shouldShowCompatibilityError) { - this.store.dispatch( - RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE }) - ); + const connectionPlan = await this.resolveRoomSignalConnectionPlan(room); + + if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { + return; + } + + const sessionFallbackSource = this.roomSignalFallbackSources.get(room.id); + const connectionCandidates: { + isExistingFallback?: boolean; + isFallback?: boolean; + isPrimary?: boolean; + source: RoomSignalSource; + }[] = []; + const pushConnectionCandidate = ( + source: RoomSignalSource | null | undefined, + flags: { isExistingFallback?: boolean; isFallback?: boolean; isPrimary?: boolean } = {} + ) => { + if (!source || !this.resolveRoomSignalSelector(source, room.name)) { + return; + } + + if (connectionCandidates.some((candidate) => areRoomSignalSourcesEqual(candidate.source, source))) { + return; + } + + connectionCandidates.push({ + ...flags, + source + }); + }; + + if (sessionFallbackSource && this.webrtc.hasJoinedServer(room.id)) { + pushConnectionCandidate(sessionFallbackSource, { isExistingFallback: true, isFallback: true }); + } + + pushConnectionCandidate(connectionPlan.primarySource, { isPrimary: true }); + + for (const fallbackSource of connectionPlan.fallbackSources) { + pushConnectionCandidate(fallbackSource, { isFallback: true }); + } + + let attemptedFallback = false; + + for (const candidate of connectionCandidates) { + const selector = this.resolveRoomSignalSelector(candidate.source, connectionPlan.room.name); + + if (!selector) { + continue; + } + + const isCompatible = await this.serverDirectory.ensureEndpointVersionCompatibility(selector); + + if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { + return; + } + + if (!isCompatible) { + if (candidate.isPrimary) { + if (shouldShowCompatibilityError) { + this.store.dispatch( + RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE }) + ); + } + + if (isViewedRoom()) { + this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); + } + + return; + } + + continue; + } + + if (candidate.isFallback && !candidate.isExistingFallback && !attemptedFallback) { + attemptedFallback = true; + + if (isViewedRoom()) { + this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true })); + } + } + + const connected = await this.connectRoomToSignalSource( + connectionPlan.room, + candidate.source, + user, + resolvedOderId, + savedRooms, + navigationRequestVersion + ); + + if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { + return; + } + + if (!connected) { + continue; + } + + if (candidate.isFallback) { + this.roomSignalFallbackSources.set(room.id, candidate.source); + } else { + this.roomSignalFallbackSources.delete(room.id); + } + + if (shouldShowCompatibilityError) { + this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null })); + } + + if (isViewedRoom()) { + this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); } - this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); return; } @@ -2021,49 +2183,9 @@ export class RoomsEffects { this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null })); } - const wsUrl = this.serverDirectory.getWebSocketUrl({ - sourceId: room.sourceId, - sourceUrl: room.sourceUrl - }); - const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId(); - const displayName = resolveUserDisplayName(user); - const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl); - const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id); - const joinCurrentEndpointRooms = () => { - if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { - return; - } - - this.webrtc.setCurrentServer(room.id); - this.webrtc.identify(oderId, displayName, wsUrl); - - for (const backgroundRoom of backgroundRooms) { - if (!this.webrtc.hasJoinedServer(backgroundRoom.id)) { - this.webrtc.joinRoom(backgroundRoom.id, oderId, wsUrl); - } - } - - if (this.webrtc.hasJoinedServer(room.id)) { - this.webrtc.switchServer(room.id, oderId, wsUrl); - } else { - this.webrtc.joinRoom(room.id, oderId, wsUrl); - } - }; - - if (this.webrtc.isSignalingConnectedTo(wsUrl)) { - joinCurrentEndpointRooms(); - return; + if (isViewedRoom()) { + this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true })); } - - this.webrtc.connectToSignalingServer(wsUrl).subscribe({ - next: (connected) => { - if (!connected || !this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) - return; - - joinCurrentEndpointRooms(); - }, - error: () => {} - }); } private syncSavedRoomConnections(user: User | null, currentRoom: Room | null, savedRooms: Room[]): void { @@ -2076,10 +2198,12 @@ export class RoomsEffects { const roomsBySignalingUrl = new Map(); for (const room of roomsToSync) { - const wsUrl = this.serverDirectory.getWebSocketUrl({ - sourceId: room.sourceId, - sourceUrl: room.sourceUrl - }); + const wsUrl = this.resolveRoomSignalingUrl(room); + + if (!wsUrl) { + continue; + } + const groupedRooms = roomsBySignalingUrl.get(wsUrl) ?? []; if (!groupedRooms.some((groupedRoom) => groupedRoom.id === room.id)) { @@ -2149,10 +2273,7 @@ export class RoomsEffects { continue; } - if (this.serverDirectory.getWebSocketUrl({ - sourceId: room.sourceId, - sourceUrl: room.sourceUrl - }) !== wsUrl) { + if (this.resolveRoomSignalingUrl(room) !== wsUrl) { continue; } @@ -2202,6 +2323,175 @@ export class RoomsEffects { return localStorage.getItem('metoyou_currentUserId'); } + private async resolveRoomSignalConnectionPlan(room: Room): Promise { + let resolvedRoom = this.repairRoomSignalSource(room, this.resolveRoomSignalSource(room)); + let primarySource = this.resolveRoomSignalSource(resolvedRoom); + + if (!this.webrtc.hasJoinedServer(room.id)) { + const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name); + const authoritativeServer = ( + selector + ? await firstValueFrom(this.serverDirectory.getServer(room.id, selector)) + : null + ) ?? await firstValueFrom(this.serverDirectory.findServerAcrossActiveEndpoints(room.id, primarySource)); + + if (authoritativeServer) { + const authoritativeSource = this.serverDirectory.normaliseRoomSignalSource({ + sourceId: authoritativeServer.sourceId ?? primarySource.sourceId, + sourceName: authoritativeServer.sourceName ?? primarySource.sourceName, + sourceUrl: authoritativeServer.sourceUrl ?? primarySource.sourceUrl, + fallbackName: authoritativeServer.sourceName ?? primarySource.sourceName ?? resolvedRoom.name + }, { + ensureEndpoint: !!(authoritativeServer.sourceUrl ?? primarySource.sourceUrl) + }); + + resolvedRoom = this.repairRoomSignalSource(resolvedRoom, authoritativeSource); + primarySource = authoritativeSource; + } + } + + const fallbackSources = this.serverDirectory.getFallbackRoomEndpoints(primarySource) + .map((endpoint) => this.serverDirectory.normaliseRoomSignalSource({ + sourceId: endpoint.id, + sourceName: endpoint.name, + sourceUrl: endpoint.url, + fallbackName: endpoint.name + })) + .filter((source, index, sources) => + sources.findIndex((candidate) => areRoomSignalSourcesEqual(candidate, source)) === index + ); + + return { + fallbackSources, + primarySource: this.resolveRoomSignalSelector(primarySource, resolvedRoom.name) ? primarySource : null, + room: resolvedRoom + }; + } + + private async connectRoomToSignalSource( + room: Room, + source: RoomSignalSource, + user: User | null, + resolvedOderId: string | undefined, + savedRooms: Room[], + navigationRequestVersion?: number + ): Promise { + const selector = this.resolveRoomSignalSelector(source, room.name); + + if (!selector) { + return false; + } + + const wsUrl = this.serverDirectory.getWebSocketUrl(selector); + const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId(); + const displayName = resolveUserDisplayName(user); + const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl); + const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id); + const joinCurrentEndpointRooms = () => { + if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { + return; + } + + this.webrtc.setCurrentServer(room.id); + this.webrtc.identify(oderId, displayName, wsUrl); + + for (const backgroundRoom of backgroundRooms) { + this.webrtc.joinRoom(backgroundRoom.id, oderId, wsUrl); + } + + if (this.webrtc.hasJoinedServer(room.id)) { + this.webrtc.switchServer(room.id, oderId, wsUrl); + } else { + this.webrtc.joinRoom(room.id, oderId, wsUrl); + } + }; + + if (this.webrtc.isSignalingConnectedTo(wsUrl)) { + joinCurrentEndpointRooms(); + return true; + } + + try { + const connected = await firstValueFrom(this.webrtc.connectToSignalingServer(wsUrl)); + + if (!connected || !this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { + return false; + } + + joinCurrentEndpointRooms(); + return true; + } catch { + return false; + } + } + + private resolveRoomSignalSource( + room: Pick + ): RoomSignalSource { + return this.serverDirectory.normaliseRoomSignalSource({ + sourceId: room.sourceId, + sourceName: room.sourceName, + sourceUrl: room.sourceUrl, + fallbackName: room.sourceName ?? room.name + }, { + ensureEndpoint: !!room.sourceUrl + }); + } + + private repairRoomSignalSource(room: Room, source: RoomSignalSource | null): Room { + if (!source || areRoomSignalSourcesEqual(room, source)) { + return room; + } + + const changes: Partial = { + sourceId: source.sourceId, + sourceName: source.sourceName, + sourceUrl: source.sourceUrl + }; + + this.store.dispatch(RoomsActions.updateRoom({ + roomId: room.id, + changes + })); + + return { + ...room, + ...changes + }; + } + + private resolveRoomSignalSelector( + source: RoomSignalSource | null | undefined, + fallbackName: string + ): ServerSourceSelector | undefined { + if (!source) { + return undefined; + } + + return this.serverDirectory.buildRoomSignalSelector({ + ...source, + fallbackName: source.sourceName ?? fallbackName + }, { + ensureEndpoint: !!source.sourceUrl + }); + } + + private getPreferredRoomSignalSource(room: Room): RoomSignalSource { + const fallbackSource = this.roomSignalFallbackSources.get(room.id); + + if (fallbackSource && this.webrtc.hasJoinedServer(room.id)) { + return fallbackSource; + } + + return this.resolveRoomSignalSource(room); + } + + private resolveRoomSignalingUrl(room: Room): string { + const selector = this.resolveRoomSignalSelector(this.getPreferredRoomSignalSource(room), room.name); + + return selector ? this.serverDirectory.getWebSocketUrl(selector) : ''; + } + private async getBlockedRoomAccessActions( roomId: string, currentUser: { id: string; oderId: string } | null