fix: Broken voice states and connectivity drops

This commit is contained in:
2026-04-11 12:32:22 +02:00
parent 0865c2fe33
commit ef1182d46f
28 changed files with 1244 additions and 162 deletions

View File

@@ -0,0 +1,63 @@
---
name: caveman
description: >
Ultra-compressed communication mode. Cuts token usage ~75% by speaking like caveman
while keeping full technical accuracy. Supports intensity levels: lite, full (default), ultra,
wenyan-lite, wenyan-full, wenyan-ultra.
Use when user says "caveman mode", "talk like caveman", "use caveman", "less tokens",
"be brief", or invokes /caveman. Also auto-triggers when token efficiency is requested.
---
Respond terse like smart caveman. All technical substance stay. Only fluff die.
Default: **full**. Switch: `/caveman lite|full|ultra`.
## Rules
Drop: articles (a/an/the), filler (just/really/basically/actually/simply), pleasantries (sure/certainly/of course/happy to), hedging. Fragments OK. Short synonyms (big not extensive, fix not "implement a solution for"). Technical terms exact. Code blocks unchanged. Errors quoted exact.
Pattern: `[thing] [action] [reason]. [next step].`
Not: "Sure! I'd be happy to help you with that. The issue you're experiencing is likely caused by..."
Yes: "Bug in auth middleware. Token expiry check use `<` not `<=`. Fix:"
## Intensity
| Level | What change |
|-------|------------|
| **lite** | No filler/hedging. Keep articles + full sentences. Professional but tight |
| **full** | Drop articles, fragments OK, short synonyms. Classic caveman |
| **ultra** | Abbreviate (DB/auth/config/req/res/fn/impl), strip conjunctions, arrows for causality (X → Y), one word when one word enough |
| **wenyan-lite** | Semi-classical. Drop filler/hedging but keep grammar structure, classical register |
| **wenyan-full** | Maximum classical terseness. Fully 文言文. 80-90% character reduction. Classical sentence patterns, verbs precede objects, subjects often omitted, classical particles (之/乃/為/其) |
| **wenyan-ultra** | Extreme abbreviation while keeping classical Chinese feel. Maximum compression, ultra terse |
Example — "Why React component re-render?"
- lite: "Your component re-renders because you create a new object reference each render. Wrap it in `useMemo`."
- full: "New object ref each render. Inline object prop = new ref = re-render. Wrap in `useMemo`."
- ultra: "Inline obj prop → new ref → re-render. `useMemo`."
- wenyan-lite: "組件頻重繪,以每繪新生對象參照故。以 useMemo 包之。"
- wenyan-full: "物出新參照致重繪。useMemo .Wrap之。"
- wenyan-ultra: "新參照→重繪。useMemo Wrap。"
Example — "Explain database connection pooling."
- lite: "Connection pooling reuses open connections instead of creating new ones per request. Avoids repeated handshake overhead."
- full: "Pool reuse open DB connections. No new connection per request. Skip handshake overhead."
- ultra: "Pool = reuse DB conn. Skip handshake → fast under load."
- wenyan-full: "池reuse open connection。不每req新開。skip handshake overhead。"
- wenyan-ultra: "池reuse conn。skip handshake → fast。"
## Auto-Clarity
Drop caveman for: security warnings, irreversible action confirmations, multi-step sequences where fragment order risks misread, user confused. Resume caveman after clear part done.
Example — destructive op:
> **Warning:** This will permanently delete all rows in the `users` table and cannot be undone.
> ```sql
> DROP TABLE users;
> ```
> Caveman resume. Verify backup exist first.
## Boundaries
Code/commits/PRs: write normal. "stop caveman" or "normal mode": revert. Level persist until changed or session end.

View File

@@ -1,10 +1,15 @@
import { Router } from 'express'; import { Router } from 'express';
import { randomUUID } from 'crypto';
import { getAllPublicServers } from '../cqrs'; import { getAllPublicServers } from '../cqrs';
import { getReleaseManifestUrl } from '../config/variables'; import { getReleaseManifestUrl } from '../config/variables';
import { SERVER_BUILD_VERSION } from '../generated/build-version'; import { SERVER_BUILD_VERSION } from '../generated/build-version';
import { connectedUsers } from '../websocket/state'; import { connectedUsers } from '../websocket/state';
const router = Router(); const router = Router();
const SERVER_INSTANCE_ID = typeof process.env.METOYOU_SERVER_INSTANCE_ID === 'string'
&& process.env.METOYOU_SERVER_INSTANCE_ID.trim().length > 0
? process.env.METOYOU_SERVER_INSTANCE_ID.trim()
: randomUUID();
function getServerProjectVersion(): string { function getServerProjectVersion(): string {
return typeof process.env.METOYOU_SERVER_VERSION === 'string' && process.env.METOYOU_SERVER_VERSION.trim().length > 0 return typeof process.env.METOYOU_SERVER_VERSION === 'string' && process.env.METOYOU_SERVER_VERSION.trim().length > 0
@@ -20,6 +25,7 @@ router.get('/health', async (_req, res) => {
timestamp: Date.now(), timestamp: Date.now(),
serverCount: servers.length, serverCount: servers.length,
connectedUsers: connectedUsers.size, connectedUsers: connectedUsers.size,
serverInstanceId: SERVER_INSTANCE_ID,
serverVersion: getServerProjectVersion(), serverVersion: getServerProjectVersion(),
releaseManifestUrl: getReleaseManifestUrl() releaseManifestUrl: getReleaseManifestUrl()
}); });

View File

@@ -10,8 +10,14 @@ interface WsMessage {
export function broadcastToServer(serverId: string, message: WsMessage, excludeOderId?: string): void { export function broadcastToServer(serverId: string, message: WsMessage, excludeOderId?: string): void {
console.log(`Broadcasting to server ${serverId}, excluding ${excludeOderId}:`, message.type); console.log(`Broadcasting to server ${serverId}, excluding ${excludeOderId}:`, message.type);
// Deduplicate by oderId so users with multiple connections (e.g. from
// different signal URLs routing to the same server) receive the
// broadcast only once.
const sentToOderIds = new Set<string>();
connectedUsers.forEach((user) => { connectedUsers.forEach((user) => {
if (user.serverIds.has(serverId) && user.oderId !== excludeOderId) { if (user.serverIds.has(serverId) && user.oderId !== excludeOderId && !sentToOderIds.has(user.oderId)) {
sentToOderIds.add(user.oderId);
console.log(` -> Sending to ${user.displayName} (${user.oderId})`); console.log(` -> Sending to ${user.displayName} (${user.oderId})`);
user.ws.send(JSON.stringify(message)); user.ws.send(JSON.stringify(message));
} }

View File

@@ -44,12 +44,18 @@ function sendServerUsers(user: ConnectedUser, serverId: string): void {
function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: string): void { function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: string): void {
const newOderId = readMessageId(message['oderId']) ?? connectionId; const newOderId = readMessageId(message['oderId']) ?? connectionId;
const newScope = typeof message['connectionScope'] === 'string' ? message['connectionScope'] : undefined;
// Close stale connections from the same identity so offer routing // Close stale connections from the same identity AND the same connection
// always targets the freshest socket (e.g. after page refresh). // scope so offer routing always targets the freshest socket (e.g. after
// page refresh). Connections with a *different* scope (= a different
// signal URL that happens to route to this server) are left untouched so
// multi-signal-URL setups don't trigger an eviction loop.
connectedUsers.forEach((existing, existingId) => { connectedUsers.forEach((existing, existingId) => {
if (existingId !== connectionId && existing.oderId === newOderId) { if (existingId !== connectionId
console.log(`Closing stale connection for ${newOderId} (old=${existingId}, new=${connectionId})`); && existing.oderId === newOderId
&& existing.connectionScope === newScope) {
console.log(`Closing stale connection for ${newOderId} (old=${existingId}, new=${connectionId}, scope=${newScope ?? 'none'})`);
try { try {
existing.ws.close(); existing.ws.close();
@@ -61,6 +67,7 @@ function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: s
user.oderId = newOderId; user.oderId = newOderId;
user.displayName = normalizeDisplayName(message['displayName'], normalizeDisplayName(user.displayName)); user.displayName = normalizeDisplayName(message['displayName'], normalizeDisplayName(user.displayName));
user.connectionScope = newScope;
connectedUsers.set(connectionId, user); connectedUsers.set(connectionId, user);
console.log(`User identified: ${user.displayName} (${user.oderId})`); console.log(`User identified: ${user.displayName} (${user.oderId})`);
} }

View File

@@ -6,6 +6,13 @@ export interface ConnectedUser {
serverIds: Set<string>; serverIds: Set<string>;
viewedServerId?: string; viewedServerId?: string;
displayName?: string; displayName?: string;
/**
* Opaque scope string sent by the client (typically the signal URL it
* connected through). Stale-connection eviction only targets connections
* that share the same (oderId, connectionScope) pair, so multiple signal
* URLs routing to the same server coexist without an eviction loop.
*/
connectionScope?: string;
/** Timestamp of the last pong received (used to detect dead connections). */ /** Timestamp of the last pong received (used to detect dead connections). */
lastPong: number; lastPong: number;
} }

10
skills-lock.json Normal file
View File

@@ -0,0 +1,10 @@
{
"version": 1,
"skills": {
"caveman": {
"source": "JuliusBrussee/caveman",
"sourceType": "github",
"computedHash": "4d486dd6f9fbb27ce1c51c972c9a5eb25a53236ae05eabf4d076ac1e293f4b7a"
}
}
}

View File

@@ -90,10 +90,15 @@ stateDiagram-v2
The facade exposes `testServer(endpointId)` and `testAllServers()`. Both delegate to `ServerEndpointHealthService.probeEndpoint()`, which: The facade exposes `testServer(endpointId)` and `testAllServers()`. Both delegate to `ServerEndpointHealthService.probeEndpoint()`, which:
1. Sends `GET /api/health` with a 5-second timeout 1. Sends `GET /api/health` with a 5-second timeout
2. On success, checks the response's `serverVersion` against the client version via `ServerEndpointCompatibilityService` 2. Reads the response's `serverVersion` and stable `serverInstanceId`
3. If versions are incompatible, the endpoint is marked `incompatible` and deactivated 3. Checks the reported version against the client version via `ServerEndpointCompatibilityService`
4. If `/api/health` fails, falls back to `GET /api/servers` as a basic liveness check 4. If versions are incompatible, the endpoint is marked `incompatible` and deactivated
5. Updates the endpoint's status, latency, and version info in the state service 5. If `/api/health` fails, falls back to `GET /api/servers` as a basic liveness check
6. Updates the endpoint's status, latency, and version info in the state service
`serverInstanceId` lets the client detect when multiple configured URLs point at the same backend. `ServerEndpointStateService.resolveCanonicalEndpoint()` prefers one canonical endpoint per backend instance so REST calls, WebSocket routing, and room fallback logic do not treat same-instance aliases as different signaling clusters.
Room signaling now waits for that initial health sweep before the first saved-room reconnect attempt. That avoids a cold-start race where alias endpoints could open separate WebSocket managers before `serverInstanceId` had been learned.
```mermaid ```mermaid
sequenceDiagram sequenceDiagram
@@ -106,7 +111,7 @@ sequenceDiagram
Health->>API: GET /api/health (5s timeout) Health->>API: GET /api/health (5s timeout)
alt 200 OK alt 200 OK
API-->>Health: { serverVersion } API-->>Health: { serverVersion, serverInstanceId }
Health->>Compat: evaluateServerVersion(serverVersion, clientVersion) Health->>Compat: evaluateServerVersion(serverVersion, clientVersion)
Compat-->>Health: { isCompatible, serverVersion } Compat-->>Health: { isCompatible, serverVersion }
Health-->>Facade: online / incompatible + latency + versions Health-->>Facade: online / incompatible + latency + versions
@@ -132,6 +137,10 @@ The facade's `searchServers(query)` method supports two modes controlled by a `s
The API service normalises every `ServerInfo` response, filling in `sourceId`, `sourceName`, and `sourceUrl` so the UI knows which endpoint each server came from. The API service normalises every `ServerInfo` response, filling in `sourceId`, `sourceName`, and `sourceUrl` so the UI knows which endpoint each server came from.
That search fan-out is discovery only. Once a room is created or joined, the room keeps an authoritative signal-server affinity via its `sourceId` / `sourceUrl`. The join response can repair stale saved metadata, and reconnect logic now retries that authoritative endpoint first before probing any other configured endpoints.
Fallback stays temporary. If the authoritative endpoint is unavailable, the client can probe other active compatible endpoints as a last resort for the current session, but it does not rewrite the room's saved affinity to that fallback endpoint.
## Server-owned room metadata ## Server-owned room metadata
`ServerInfo` also carries the server-owned `channels` list for each room. Register and update calls persist this channel metadata on the server, and search or hydration responses return the normalised channel list so text and voice channel topology survives reloads, reconnects, and fresh joins. `ServerInfo` also carries the server-owned `channels` list for each room. Register and update calls persist this channel metadata on the server, and search or hydration responses return the normalised channel list so text and voice channel topology survives reloads, reconnects, and fresh joins.
@@ -147,6 +156,8 @@ Default servers are configured in the environment file. The state service builds
- `restoreDefaultServers()` re-adds any removed defaults and clears the removal tracking - `restoreDefaultServers()` re-adds any removed defaults and clears the removal tracking
- The primary default URL is used as a fallback when no endpoint is resolved - The primary default URL is used as a fallback when no endpoint is resolved
Saved rooms can also self-heal their endpoint metadata. If a room has missing or stale source information, the client now searches the configured endpoints for that room, restores the correct source mapping, and persists the repair locally.
URL sanitisation strips trailing slashes and `/api` suffixes. Protocol-less URLs get `http` or `https` based on the current page protocol. URL sanitisation strips trailing slashes and `/api` suffixes. Protocol-less URLs get `http` or `https` based on the current page protocol.
## Server administration ## Server administration

View File

@@ -21,6 +21,12 @@ import type {
ServerSourceSelector, ServerSourceSelector,
UnbanServerMemberRequest UnbanServerMemberRequest
} from '../domain/server-directory.models'; } from '../domain/server-directory.models';
import {
buildRoomSignalSelector,
buildRoomSignalSource,
type RoomSignalSource,
type RoomSignalSourceInput
} from '../domain/room-signal-source';
import { ServerEndpointCompatibilityService } from '../infrastructure/server-endpoint-compatibility.service'; import { ServerEndpointCompatibilityService } from '../infrastructure/server-endpoint-compatibility.service';
import { ServerEndpointHealthService } from '../infrastructure/server-endpoint-health.service'; import { ServerEndpointHealthService } from '../infrastructure/server-endpoint-health.service';
import { ServerEndpointStateService } from './server-endpoint-state.service'; import { ServerEndpointStateService } from './server-endpoint-state.service';
@@ -38,6 +44,7 @@ export class ServerDirectoryFacade {
private readonly endpointCompatibility = inject(ServerEndpointCompatibilityService); private readonly endpointCompatibility = inject(ServerEndpointCompatibilityService);
private readonly endpointHealth = inject(ServerEndpointHealthService); private readonly endpointHealth = inject(ServerEndpointHealthService);
private readonly api = inject(ServerDirectoryApiService); private readonly api = inject(ServerDirectoryApiService);
private readonly initialServerHealthCheck: Promise<void>;
private shouldSearchAllServers = true; private shouldSearchAllServers = true;
constructor() { constructor() {
@@ -47,7 +54,11 @@ export class ServerDirectoryFacade {
this.activeServer = this.endpointState.activeServer; this.activeServer = this.endpointState.activeServer;
this.loadConnectionSettings(); this.loadConnectionSettings();
void this.testAllServers(); this.initialServerHealthCheck = this.testAllServers().catch(() => undefined);
}
async awaitInitialServerHealthCheck(): Promise<void> {
await this.initialServerHealthCheck;
} }
addServer(server: { name: string; url: string }): ServerEndpoint { addServer(server: { name: string; url: string }): ServerEndpoint {
@@ -110,6 +121,81 @@ export class ServerDirectoryFacade {
return !!refreshedEndpoint && refreshedEndpoint.status !== 'incompatible'; return !!refreshedEndpoint && refreshedEndpoint.status !== 'incompatible';
} }
resolveRoomEndpoint(
source?: RoomSignalSourceInput,
options?: { ensureEndpoint?: boolean; setActive?: boolean }
): ServerEndpoint | null {
const normalizedSource = buildRoomSignalSource(source);
if (normalizedSource.sourceId) {
const endpointById = this.endpointState.resolveCanonicalEndpoint(
this.servers().find((endpoint) => endpoint.id === normalizedSource.sourceId) ?? null
);
if (endpointById) {
return endpointById;
}
}
if (!normalizedSource.sourceUrl) {
return null;
}
const endpointByUrl = this.endpointState.resolveCanonicalEndpoint(
this.findServerByUrl(normalizedSource.sourceUrl) ?? null
);
if (endpointByUrl) {
return endpointByUrl;
}
if (!options?.ensureEndpoint) {
return null;
}
return this.ensureServerEndpoint({
name: normalizedSource.sourceName ?? 'Signal Server',
url: normalizedSource.sourceUrl
}, {
setActive: options.setActive
});
}
normaliseRoomSignalSource(
source?: RoomSignalSourceInput,
options?: { ensureEndpoint?: boolean; setActive?: boolean }
): RoomSignalSource {
const endpoint = this.resolveRoomEndpoint(source, options);
return buildRoomSignalSource(source, endpoint);
}
buildRoomSignalSelector(
source?: RoomSignalSourceInput,
options?: { ensureEndpoint?: boolean; setActive?: boolean }
): ServerSourceSelector | undefined {
return buildRoomSignalSelector(this.normaliseRoomSignalSource(source, options));
}
getFallbackRoomEndpoints(source?: RoomSignalSourceInput): ServerEndpoint[] {
const primaryEndpoint = this.resolveRoomEndpoint(source);
const primarySource = this.normaliseRoomSignalSource(source);
const primaryUrl = primarySource.sourceUrl ? this.endpointState.sanitiseUrl(primarySource.sourceUrl) : null;
const primaryInstanceId = primaryEndpoint?.instanceId ?? null;
return this.activeServers().filter((endpoint) => {
if (endpoint.status === 'incompatible') {
return false;
}
if (primaryInstanceId && endpoint.instanceId === primaryInstanceId) {
return false;
}
return !primaryUrl || this.endpointState.sanitiseUrl(endpoint.url) !== primaryUrl;
});
}
setSearchAllServers(enabled: boolean): void { setSearchAllServers(enabled: boolean): void {
this.shouldSearchAllServers = enabled; this.shouldSearchAllServers = enabled;
} }
@@ -159,6 +245,10 @@ export class ServerDirectoryFacade {
return this.api.getServer(serverId, selector); return this.api.getServer(serverId, selector);
} }
findServerAcrossActiveEndpoints(serverId: string, source?: RoomSignalSourceInput): Observable<ServerInfo | null> {
return this.api.findServerAcrossActiveEndpoints(serverId, source);
}
registerServer( registerServer(
server: Omit<ServerInfo, 'createdAt'> & { id?: string; password?: string | null }, server: Omit<ServerInfo, 'createdAt'> & { id?: string; password?: string | null },
selector?: ServerSourceSelector selector?: ServerSourceSelector

View File

@@ -121,6 +121,20 @@ export class ServerEndpointStateService {
return this._servers().find((endpoint) => this.sanitiseUrl(endpoint.url) === sanitisedUrl); return this._servers().find((endpoint) => this.sanitiseUrl(endpoint.url) === sanitisedUrl);
} }
resolveCanonicalEndpoint(endpoint: ServerEndpoint | null | undefined): ServerEndpoint | null {
if (!endpoint?.instanceId) {
return endpoint ?? null;
}
const equivalentEndpoints = this._servers().filter((candidate) => candidate.instanceId === endpoint.instanceId);
if (equivalentEndpoints.length <= 1) {
return endpoint;
}
return [...equivalentEndpoints].sort((left, right) => this.compareEndpointPreference(left, right))[0] ?? endpoint;
}
removeServer(endpointId: string): void { removeServer(endpointId: string): void {
const endpoints = this._servers(); const endpoints = this._servers();
const target = endpoints.find((endpoint) => endpoint.id === endpointId); const target = endpoints.find((endpoint) => endpoint.id === endpointId);
@@ -208,6 +222,7 @@ export class ServerEndpointStateService {
return { return {
...endpoint, ...endpoint,
instanceId: versions?.serverInstanceId ?? endpoint.instanceId,
status, status,
latency, latency,
isActive: status === 'incompatible' ? false : endpoint.isActive, isActive: status === 'incompatible' ? false : endpoint.isActive,
@@ -312,4 +327,50 @@ export class ServerEndpointStateService {
private saveEndpoints(): void { private saveEndpoints(): void {
this.storage.saveEndpoints(this._servers()); this.storage.saveEndpoints(this._servers());
} }
private compareEndpointPreference(left: ServerEndpoint, right: ServerEndpoint): number {
const scoreDifference = this.endpointPreferenceScore(right) - this.endpointPreferenceScore(left);
if (scoreDifference !== 0) {
return scoreDifference;
}
return left.url.localeCompare(right.url);
}
private endpointPreferenceScore(endpoint: ServerEndpoint): number {
let score = 0;
if (endpoint.isDefault) {
score += 4;
}
if (this.usesSecureProtocol(endpoint.url)) {
score += 2;
}
if (!this.isIpHost(endpoint.url)) {
score += 1;
}
return score;
}
private usesSecureProtocol(url: string): boolean {
try {
return new URL(url).protocol === 'https:';
} catch {
return false;
}
}
private isIpHost(url: string): boolean {
try {
const hostname = new URL(url).hostname;
return /^\d{1,3}(?:\.\d{1,3}){3}$/.test(hostname) || hostname.includes(':');
} catch {
return false;
}
}
} }

View File

@@ -0,0 +1,46 @@
import {
areRoomSignalSourcesEqual,
buildRoomSignalSelector,
buildRoomSignalSource,
getSourceUrlFromSignalingUrl
} from './room-signal-source';
describe('room-signal-source helpers', () => {
it('converts signaling urls back to normalized source urls', () => {
expect(getSourceUrlFromSignalingUrl('wss://signal.toju.app')).toBe('https://signal.toju.app');
expect(getSourceUrlFromSignalingUrl('ws://46.59.68.77:3001')).toBe('http://46.59.68.77:3001');
});
it('prefers the resolved endpoint when normalizing a room source', () => {
expect(buildRoomSignalSource({
sourceId: 'stale-id',
sourceName: 'Stale Source',
sourceUrl: 'https://old.example.com',
signalingUrl: 'wss://signal.toju.app'
}, {
id: 'primary-id',
name: 'Primary Signal',
url: 'https://signal.toju.app/'
})).toEqual({
sourceId: 'primary-id',
sourceName: 'Primary Signal',
sourceUrl: 'https://signal.toju.app'
});
});
it('builds selectors from signaling urls when no source url is persisted yet', () => {
expect(buildRoomSignalSelector({
signalingUrl: 'wss://signal-sweden.toju.app',
fallbackName: 'Toju Signal Sweden'
})).toEqual({
sourceUrl: 'https://signal-sweden.toju.app'
});
});
it('treats equivalent persisted and signaling-derived sources as equal', () => {
expect(areRoomSignalSourcesEqual(
{ sourceUrl: 'https://signal.toju.app/' },
{ signalingUrl: 'wss://signal.toju.app' }
)).toBeTrue();
});
});

View File

@@ -0,0 +1,95 @@
import type { ServerEndpoint, ServerSourceSelector } from './server-directory.models';
import { normaliseConfiguredServerUrl, sanitiseServerBaseUrl } from './server-endpoint-defaults';
export interface RoomSignalSource {
sourceId?: string;
sourceName?: string;
sourceUrl?: string;
}
export interface RoomSignalSourceInput extends RoomSignalSource {
fallbackName?: string;
signalingUrl?: string;
}
const DEFAULT_SIGNAL_SOURCE_NAME = 'Signal Server';
export function getSourceUrlFromSignalingUrl(signalingUrl?: string): string | undefined {
const normalizedUrl = normalizeString(signalingUrl);
if (!normalizedUrl) {
return undefined;
}
const resolvedUrl = normaliseConfiguredServerUrl(normalizedUrl, 'https');
return resolvedUrl || undefined;
}
export function buildRoomSignalSource(
source?: RoomSignalSourceInput | null,
endpoint?: Pick<ServerEndpoint, 'id' | 'name' | 'url'> | null
): RoomSignalSource {
const sourceId = normalizeString(endpoint?.id) ?? normalizeString(source?.sourceId);
const sourceUrl = endpoint
? sanitiseServerBaseUrl(endpoint.url)
: (normalizeUrl(source?.sourceUrl) ?? getSourceUrlFromSignalingUrl(source?.signalingUrl));
const sourceName = normalizeString(endpoint?.name)
?? normalizeString(source?.sourceName)
?? normalizeString(source?.fallbackName)
?? (sourceUrl ? DEFAULT_SIGNAL_SOURCE_NAME : undefined);
return {
sourceId,
sourceName,
sourceUrl
};
}
export function buildRoomSignalSelector(
source?: RoomSignalSourceInput | null
): ServerSourceSelector | undefined {
const normalizedSource = buildRoomSignalSource(source);
if (normalizedSource.sourceId) {
return { sourceId: normalizedSource.sourceId };
}
if (normalizedSource.sourceUrl) {
return { sourceUrl: normalizedSource.sourceUrl };
}
return undefined;
}
export function hasRoomSignalSource(source?: RoomSignalSourceInput | null): boolean {
return !!buildRoomSignalSelector(source);
}
export function areRoomSignalSourcesEqual(
left?: RoomSignalSourceInput | null,
right?: RoomSignalSourceInput | null
): boolean {
const normalizedLeft = buildRoomSignalSource(left);
const normalizedRight = buildRoomSignalSource(right);
return normalizedLeft.sourceId === normalizedRight.sourceId
&& normalizedLeft.sourceName === normalizedRight.sourceName
&& normalizedLeft.sourceUrl === normalizedRight.sourceUrl;
}
function normalizeString(value: string | undefined | null): string | undefined {
if (typeof value !== 'string') {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function normalizeUrl(value: string | undefined | null): string | undefined {
const normalizedValue = normalizeString(value);
return normalizedValue ? sanitiseServerBaseUrl(normalizedValue) : undefined;
}

View File

@@ -45,12 +45,14 @@ export interface DefaultServerDefinition {
} }
export interface ServerEndpointVersions { export interface ServerEndpointVersions {
serverInstanceId?: string | null;
serverVersion?: string | null; serverVersion?: string | null;
clientVersion?: string | null; clientVersion?: string | null;
} }
export interface ServerEndpoint { export interface ServerEndpoint {
id: string; id: string;
instanceId?: string;
name: string; name: string;
url: string; url: string;
isActive: boolean; isActive: boolean;
@@ -135,6 +137,7 @@ export interface ServerVersionCompatibilityResult {
} }
export interface ServerHealthCheckPayload { export interface ServerHealthCheckPayload {
serverInstanceId?: unknown;
serverVersion?: unknown; serverVersion?: unknown;
} }

View File

@@ -134,6 +134,15 @@ export class InviteComponent implements OnInit {
sourceId: context.endpoint.id, sourceId: context.endpoint.id,
sourceUrl: context.sourceUrl sourceUrl: context.sourceUrl
})); }));
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: joinResponse.server.sourceId ?? context.endpoint.id,
sourceName: joinResponse.server.sourceName ?? context.endpoint.name,
sourceUrl: joinResponse.server.sourceUrl ?? context.sourceUrl,
signalingUrl: joinResponse.signalingUrl,
fallbackName: joinResponse.server.sourceName ?? context.endpoint.name ?? invite.server.name
}, {
ensureEndpoint: true
});
this.store.dispatch( this.store.dispatch(
RoomsActions.joinRoom({ RoomsActions.joinRoom({
@@ -145,9 +154,8 @@ export class InviteComponent implements OnInit {
Array.isArray(joinResponse.server.channels) && joinResponse.server.channels.length > 0 Array.isArray(joinResponse.server.channels) && joinResponse.server.channels.length > 0
? joinResponse.server.channels ? joinResponse.server.channels
: invite.server.channels, : invite.server.channels,
sourceId: context.endpoint.id, ...resolvedSource,
sourceName: context.endpoint.name, signalingUrl: joinResponse.signalingUrl
sourceUrl: context.sourceUrl
} }
}) })
); );

View File

@@ -273,13 +273,24 @@ export class ServerSearchComponent implements OnInit {
sourceId: server.sourceId, sourceId: server.sourceId,
sourceUrl: server.sourceUrl sourceUrl: server.sourceUrl
})); }));
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: response.server.sourceId ?? server.sourceId,
sourceName: response.server.sourceName ?? server.sourceName,
sourceUrl: response.server.sourceUrl ?? server.sourceUrl,
signalingUrl: response.signalingUrl,
fallbackName: response.server.sourceName ?? server.sourceName ?? server.name
}, {
ensureEndpoint: true
});
const resolvedServer = { const resolvedServer = {
...server, ...server,
...response.server, ...response.server,
channels: channels:
Array.isArray(response.server.channels) && response.server.channels.length > 0 Array.isArray(response.server.channels) && response.server.channels.length > 0
? response.server.channels ? response.server.channels
: server.channels : server.channels,
...resolvedSource,
signalingUrl: response.signalingUrl
}; };
this.closePasswordDialog(); this.closePasswordDialog();

View File

@@ -1,3 +1,4 @@
export * from './application/server-directory.facade'; export * from './application/server-directory.facade';
export * from './domain/server-directory.constants'; export * from './domain/server-directory.constants';
export * from './domain/server-directory.models'; export * from './domain/server-directory.models';
export * from './domain/room-signal-source';

View File

@@ -29,6 +29,7 @@ import type {
ServerSourceSelector, ServerSourceSelector,
UnbanServerMemberRequest UnbanServerMemberRequest
} from '../domain/server-directory.models'; } from '../domain/server-directory.models';
import type { RoomSignalSourceInput } from '../domain/room-signal-source';
@Injectable({ providedIn: 'root' }) @Injectable({ providedIn: 'root' })
export class ServerDirectoryApiService { export class ServerDirectoryApiService {
@@ -45,14 +46,16 @@ export class ServerDirectoryApiService {
resolveEndpoint(selector?: ServerSourceSelector): ServerEndpoint | null { resolveEndpoint(selector?: ServerSourceSelector): ServerEndpoint | null {
if (selector?.sourceId) { if (selector?.sourceId) {
return this.endpointState.servers().find((endpoint) => endpoint.id === selector.sourceId) ?? null; const endpoint = this.endpointState.servers().find((candidate) => candidate.id === selector.sourceId) ?? null;
return this.endpointState.resolveCanonicalEndpoint(endpoint);
} }
if (selector?.sourceUrl) { if (selector?.sourceUrl) {
return this.endpointState.findServerByUrl(selector.sourceUrl) ?? null; return this.endpointState.resolveCanonicalEndpoint(this.endpointState.findServerByUrl(selector.sourceUrl) ?? null);
} }
return ( return this.endpointState.resolveCanonicalEndpoint(
this.endpointState.activeServer() ?? this.endpointState.activeServer() ??
this.endpointState.servers().find((endpoint) => endpoint.status !== 'incompatible') ?? this.endpointState.servers().find((endpoint) => endpoint.status !== 'incompatible') ??
this.endpointState.servers()[0] ?? this.endpointState.servers()[0] ??
@@ -92,6 +95,45 @@ export class ServerDirectoryApiService {
); );
} }
findServerAcrossActiveEndpoints(serverId: string, source?: RoomSignalSourceInput): Observable<ServerInfo | null> {
const candidateEndpoints = this.getSearchableEndpoints();
const preferredSourceUrl = source?.sourceUrl ? this.endpointState.sanitiseUrl(source.sourceUrl) : undefined;
const prioritizedEndpoints = [...candidateEndpoints].sort((left, right) => {
if (preferredSourceUrl) {
const leftMatches = this.endpointState.sanitiseUrl(left.url) === preferredSourceUrl;
const rightMatches = this.endpointState.sanitiseUrl(right.url) === preferredSourceUrl;
if (leftMatches !== rightMatches) {
return leftMatches ? -1 : 1;
}
}
if (source?.sourceId) {
const leftMatches = left.id === source.sourceId;
const rightMatches = right.id === source.sourceId;
if (leftMatches !== rightMatches) {
return leftMatches ? -1 : 1;
}
}
return 0;
});
if (prioritizedEndpoints.length === 0) {
return this.getServer(serverId, this.resolveSelector(source));
}
return forkJoin(
prioritizedEndpoints.map((endpoint) => this.getServer(serverId, {
sourceId: endpoint.id,
sourceUrl: endpoint.url
}))
).pipe(
map((servers) => servers.find((server): server is ServerInfo => !!server) ?? null)
);
}
registerServer( registerServer(
server: Omit<ServerInfo, 'createdAt'> & { id?: string; password?: string | null }, server: Omit<ServerInfo, 'createdAt'> & { id?: string; password?: string | null },
selector?: ServerSourceSelector selector?: ServerSourceSelector
@@ -221,11 +263,17 @@ export class ServerDirectoryApiService {
} }
private resolveBaseServerUrl(selector?: ServerSourceSelector): string { private resolveBaseServerUrl(selector?: ServerSourceSelector): string {
const resolvedEndpoint = this.resolveEndpoint(selector);
if (resolvedEndpoint) {
return resolvedEndpoint.url;
}
if (selector?.sourceUrl) { if (selector?.sourceUrl) {
return this.endpointState.sanitiseUrl(selector.sourceUrl); return this.endpointState.sanitiseUrl(selector.sourceUrl);
} }
return this.resolveEndpoint(selector)?.url ?? this.endpointState.getPrimaryDefaultServerUrl(); return this.endpointState.getPrimaryDefaultServerUrl();
} }
private unwrapServersResponse(response: { servers: ServerInfo[]; total: number } | ServerInfo[]): ServerInfo[] { private unwrapServersResponse(response: { servers: ServerInfo[]; total: number } | ServerInfo[]): ServerInfo[] {
@@ -245,7 +293,7 @@ export class ServerDirectoryApiService {
} }
private searchAllEndpoints(query: string): Observable<ServerInfo[]> { private searchAllEndpoints(query: string): Observable<ServerInfo[]> {
const onlineEndpoints = this.endpointState.activeServers().filter((endpoint) => endpoint.status !== 'offline'); const onlineEndpoints = this.getSearchableEndpoints();
if (onlineEndpoints.length === 0) { if (onlineEndpoints.length === 0) {
return this.searchSingleEndpoint(query, this.getApiBaseUrl(), this.endpointState.activeServer()); return this.searchSingleEndpoint(query, this.getApiBaseUrl(), this.endpointState.activeServer());
@@ -258,7 +306,7 @@ export class ServerDirectoryApiService {
} }
private getAllServersFromAllEndpoints(): Observable<ServerInfo[]> { private getAllServersFromAllEndpoints(): Observable<ServerInfo[]> {
const onlineEndpoints = this.endpointState.activeServers().filter((endpoint) => endpoint.status !== 'offline'); const onlineEndpoints = this.getSearchableEndpoints();
if (onlineEndpoints.length === 0) { if (onlineEndpoints.length === 0) {
return this.http.get<{ servers: ServerInfo[]; total: number }>(`${this.getApiBaseUrl()}/servers`).pipe( return this.http.get<{ servers: ServerInfo[]; total: number }>(`${this.getApiBaseUrl()}/servers`).pipe(
@@ -277,6 +325,30 @@ export class ServerDirectoryApiService {
).pipe(map((resultArrays) => resultArrays.flat())); ).pipe(map((resultArrays) => resultArrays.flat()));
} }
private getSearchableEndpoints(): ServerEndpoint[] {
const activeEndpoints = this.endpointState.activeServers().filter((endpoint) => endpoint.status !== 'offline');
if (activeEndpoints.length > 0) {
return activeEndpoints;
}
const activeServer = this.endpointState.activeServer();
return activeServer ? [activeServer] : [];
}
private resolveSelector(source?: RoomSignalSourceInput): ServerSourceSelector | undefined {
if (source?.sourceId) {
return { sourceId: source.sourceId };
}
if (source?.sourceUrl) {
return { sourceUrl: source.sourceUrl };
}
return undefined;
}
private deduplicateById<T extends { id: string }>(items: T[]): T[] { private deduplicateById<T extends { id: string }>(items: T[]): T[] {
const seen = new Set<string>(); const seen = new Set<string>();
@@ -298,13 +370,14 @@ export class ServerDirectoryApiService {
const candidate = server as Record<string, unknown>; const candidate = server as Record<string, unknown>;
const sourceName = this.getStringValue(candidate['sourceName']); const sourceName = this.getStringValue(candidate['sourceName']);
const sourceUrl = this.getStringValue(candidate['sourceUrl']); const sourceUrl = this.getStringValue(candidate['sourceUrl']);
const resolvedSource = this.endpointState.resolveCanonicalEndpoint(source);
return { return {
id: this.getStringValue(candidate['id']) ?? '', id: this.getStringValue(candidate['id']) ?? '',
name: this.getStringValue(candidate['name']) ?? 'Unnamed server', name: this.getStringValue(candidate['name']) ?? 'Unnamed server',
description: this.getStringValue(candidate['description']), description: this.getStringValue(candidate['description']),
topic: this.getStringValue(candidate['topic']), topic: this.getStringValue(candidate['topic']),
hostName: this.getStringValue(candidate['hostName']) ?? sourceName ?? source?.name ?? 'Unknown API', hostName: this.getStringValue(candidate['hostName']) ?? sourceName ?? resolvedSource?.name ?? 'Unknown API',
ownerId: this.getStringValue(candidate['ownerId']), ownerId: this.getStringValue(candidate['ownerId']),
ownerName: this.getStringValue(candidate['ownerName']), ownerName: this.getStringValue(candidate['ownerName']),
ownerPublicKey: this.getStringValue(candidate['ownerPublicKey']), ownerPublicKey: this.getStringValue(candidate['ownerPublicKey']),
@@ -319,9 +392,13 @@ export class ServerDirectoryApiService {
roleAssignments: this.getRoleAssignmentsValue(candidate['roleAssignments']), roleAssignments: this.getRoleAssignmentsValue(candidate['roleAssignments']),
channelPermissions: this.getChannelPermissionOverridesValue(candidate['channelPermissions']), channelPermissions: this.getChannelPermissionOverridesValue(candidate['channelPermissions']),
createdAt: this.getNumberValue(candidate['createdAt'], Date.now()), createdAt: this.getNumberValue(candidate['createdAt'], Date.now()),
sourceId: this.getStringValue(candidate['sourceId']) ?? source?.id, sourceId: this.getStringValue(candidate['sourceId']) ?? resolvedSource?.id,
sourceName: sourceName ?? source?.name, sourceName: sourceName ?? resolvedSource?.name,
sourceUrl: sourceUrl ? this.endpointState.sanitiseUrl(sourceUrl) : source ? this.endpointState.sanitiseUrl(source.url) : undefined sourceUrl: sourceUrl
? this.endpointState.sanitiseUrl(sourceUrl)
: resolvedSource
? this.endpointState.sanitiseUrl(resolvedSource.url)
: undefined
}; };
} }

View File

@@ -30,12 +30,16 @@ export class ServerEndpointHealthService {
payload.serverVersion, payload.serverVersion,
clientVersion clientVersion
); );
const serverInstanceId = typeof payload.serverInstanceId === 'string' && payload.serverInstanceId.trim().length > 0
? payload.serverInstanceId.trim()
: undefined;
if (!versionCompatibility.isCompatible) { if (!versionCompatibility.isCompatible) {
return { return {
status: 'incompatible', status: 'incompatible',
latency, latency,
versions: { versions: {
serverInstanceId,
serverVersion: versionCompatibility.serverVersion, serverVersion: versionCompatibility.serverVersion,
clientVersion clientVersion
} }
@@ -46,6 +50,7 @@ export class ServerEndpointHealthService {
status: 'online', status: 'online',
latency, latency,
versions: { versions: {
serverInstanceId,
serverVersion: versionCompatibility.serverVersion, serverVersion: versionCompatibility.serverVersion,
clientVersion clientVersion
} }

View File

@@ -75,6 +75,12 @@ export class VoiceActivityService implements OnDestroy {
this.untrackStream(peerId); this.untrackStream(peerId);
}) })
); );
this.subs.push(
this.voiceConnection.onVoiceConnected.subscribe(() => {
this.ensureAllRemoteStreamsTracked();
})
);
} }
trackLocalMic(userId: string, stream: MediaStream): void { trackLocalMic(userId: string, stream: MediaStream): void {
@@ -124,6 +130,10 @@ export class VoiceActivityService implements OnDestroy {
} }
const ctx = new AudioContext(); const ctx = new AudioContext();
if (ctx.state === 'suspended')
void ctx.resume();
const analyser = ctx.createAnalyser(); const analyser = ctx.createAnalyser();
const sources = audioTracks.map((track) => ctx.createMediaStreamSource(new MediaStream([track]))); const sources = audioTracks.map((track) => ctx.createMediaStreamSource(new MediaStream([track])));
@@ -166,6 +176,18 @@ export class VoiceActivityService implements OnDestroy {
this.stopPolling(); this.stopPolling();
} }
private ensureAllRemoteStreamsTracked(): void {
const peers = this.voiceConnection.getConnectedPeers();
for (const peerId of peers) {
const stream = this.voiceConnection.getRemoteVoiceStream(peerId);
if (stream) {
this.trackStream(peerId, stream);
}
}
}
private ensurePolling(): void { private ensurePolling(): void {
if (this.animFrameId !== null) if (this.animFrameId !== null)
return; return;

View File

@@ -18,6 +18,8 @@ import {
EMPTY, EMPTY,
Subject, Subject,
catchError, catchError,
firstValueFrom,
from,
switchMap, switchMap,
tap tap
} from 'rxjs'; } from 'rxjs';
@@ -337,35 +339,44 @@ export class ServersRailComponent {
this.joinPasswordError.set(null); this.joinPasswordError.set(null);
return this.serverDirectory.requestJoin({ return from(this.resolveRoomJoinTarget(room)).pipe(
roomId: room.id, switchMap((joinTarget) => {
userId: currentUserId, if (!joinTarget.selector) {
userPublicKey: currentUser?.oderId || currentUserId, if (this.currentRoom()?.id === room.id) {
displayName: currentUser?.displayName || 'Anonymous', this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true }));
password: password?.trim() || undefined }
}, {
sourceId: room.sourceId, return EMPTY;
sourceUrl: room.sourceUrl }
})
.pipe( return this.serverDirectory.requestJoin({
tap((response) => { roomId: room.id,
this.closePasswordDialog(); userId: currentUserId,
this.store.dispatch( userPublicKey: currentUser?.oderId || currentUserId,
RoomsActions.updateRoom({ displayName: currentUser?.displayName || 'Anonymous',
roomId: room.id, password: password?.trim() || undefined
changes: this.toRoomRefreshChanges(room, response.server) }, joinTarget.selector)
.pipe(
tap((response) => {
this.closePasswordDialog();
this.store.dispatch(
RoomsActions.updateRoom({
roomId: room.id,
changes: this.toRoomRefreshChanges(joinTarget.room, response.server, response.signalingUrl)
})
);
if (this.currentRoom()?.id === room.id) {
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false }));
}
}) })
); );
}),
if (this.currentRoom()?.id === room.id) { catchError((error: unknown) => {
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false })); this.handleBackgroundJoinError(room, error);
} return EMPTY;
}), })
catchError((error: unknown) => { );
this.handleBackgroundJoinError(room, error);
return EMPTY;
})
);
} }
private handleBackgroundJoinError(room: Room, error: unknown): void { private handleBackgroundJoinError(room: Room, error: unknown): void {
@@ -414,7 +425,17 @@ export class ServersRailComponent {
|| (typeof status === 'number' && status >= 500); || (typeof status === 'number' && status >= 500);
} }
private toRoomRefreshChanges(room: Room, server: ServerInfo): Partial<Room> { private toRoomRefreshChanges(room: Room, server: ServerInfo, signalingUrl?: string): Partial<Room> {
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: server.sourceId ?? room.sourceId,
sourceName: server.sourceName ?? room.sourceName,
sourceUrl: server.sourceUrl ?? room.sourceUrl,
signalingUrl,
fallbackName: server.sourceName ?? room.sourceName ?? room.name
}, {
ensureEndpoint: true
});
return { return {
name: server.name, name: server.name,
description: server.description, description: server.description,
@@ -432,9 +453,99 @@ export class ServersRailComponent {
Array.isArray(server.channels) && server.channels.length > 0 Array.isArray(server.channels) && server.channels.length > 0
? server.channels ? server.channels
: room.channels, : room.channels,
sourceId: server.sourceId ?? room.sourceId, ...resolvedSource
sourceName: server.sourceName ?? room.sourceName,
sourceUrl: server.sourceUrl ?? room.sourceUrl
}; };
} }
private async resolveRoomJoinTarget(room: Room): Promise<{
room: Room;
selector: ReturnType<ServerDirectoryFacade['buildRoomSignalSelector']>;
}> {
let resolvedRoom = this.applyResolvedRoomSource(room, this.serverDirectory.normaliseRoomSignalSource({
sourceId: room.sourceId,
sourceName: room.sourceName,
sourceUrl: room.sourceUrl,
fallbackName: room.sourceName ?? room.name
}, {
ensureEndpoint: !!room.sourceUrl
}));
let selector = this.serverDirectory.buildRoomSignalSelector({
sourceId: resolvedRoom.sourceId,
sourceName: resolvedRoom.sourceName,
sourceUrl: resolvedRoom.sourceUrl,
fallbackName: resolvedRoom.sourceName ?? resolvedRoom.name
}, {
ensureEndpoint: !!resolvedRoom.sourceUrl
});
const authoritativeServer = (
selector
? await firstValueFrom(this.serverDirectory.getServer(room.id, selector))
: null
) ?? await firstValueFrom(this.serverDirectory.findServerAcrossActiveEndpoints(room.id, {
sourceId: resolvedRoom.sourceId,
sourceName: resolvedRoom.sourceName,
sourceUrl: resolvedRoom.sourceUrl,
fallbackName: resolvedRoom.sourceName ?? resolvedRoom.name
}));
if (!authoritativeServer) {
return {
room: resolvedRoom,
selector
};
}
const authoritativeSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: authoritativeServer.sourceId ?? resolvedRoom.sourceId,
sourceName: authoritativeServer.sourceName ?? resolvedRoom.sourceName,
sourceUrl: authoritativeServer.sourceUrl ?? resolvedRoom.sourceUrl,
fallbackName: authoritativeServer.sourceName ?? resolvedRoom.sourceName ?? resolvedRoom.name
}, {
ensureEndpoint: !!(authoritativeServer.sourceUrl ?? resolvedRoom.sourceUrl)
});
resolvedRoom = this.applyResolvedRoomSource(resolvedRoom, authoritativeSource);
selector = this.serverDirectory.buildRoomSignalSelector({
sourceId: resolvedRoom.sourceId,
sourceName: resolvedRoom.sourceName,
sourceUrl: resolvedRoom.sourceUrl,
fallbackName: resolvedRoom.sourceName ?? resolvedRoom.name
}, {
ensureEndpoint: !!resolvedRoom.sourceUrl
});
return {
room: resolvedRoom,
selector
};
}
private applyResolvedRoomSource(room: Room, source: Pick<Room, 'sourceId' | 'sourceName' | 'sourceUrl'>): Room {
const nextRoom: Room = {
...room,
sourceId: source.sourceId,
sourceName: source.sourceName,
sourceUrl: source.sourceUrl
};
if (
room.sourceId === nextRoom.sourceId
&& room.sourceName === nextRoom.sourceName
&& room.sourceUrl === nextRoom.sourceUrl
) {
return room;
}
this.store.dispatch(RoomsActions.updateRoom({
roomId: room.id,
changes: {
sourceId: nextRoom.sourceId,
sourceName: nextRoom.sourceName,
sourceUrl: nextRoom.sourceUrl
}
}));
return nextRoom;
}
} }

View File

@@ -117,6 +117,12 @@ The signaling layer's only job is getting two peers to exchange SDP offers/answe
Each signaling URL gets its own `SignalingManager` (one WebSocket each). `SignalingTransportHandler` picks the right socket based on which server the message is for. `ServerSignalingCoordinator` tracks which peers belong to which servers and which signaling URLs, so we know when it is safe to tear down a peer connection after leaving a server. Each signaling URL gets its own `SignalingManager` (one WebSocket each). `SignalingTransportHandler` picks the right socket based on which server the message is for. `ServerSignalingCoordinator` tracks which peers belong to which servers and which signaling URLs, so we know when it is safe to tear down a peer connection after leaving a server.
Room affinity is authoritative at this layer as well. The renderer repairs each room's saved `sourceId` / `sourceUrl` from server-directory responses and routes `join_server`, `view_server`, and room-scoped signaling traffic to that room's signaling URL first. If that route fails, alternate endpoints can be tried temporarily, but server-scoped raw messages are no longer broadcast to every connected signaling manager when the route is unknown.
Cold-start routing now waits for the initial server-directory health probes so same-backend aliases can collapse to one canonical signaling endpoint before any saved rooms reconnect. When a room is reconnected on a chosen socket, its background rooms are re-joined on that same socket as well so stale per-signal memberships do not keep orphan managers alive, and reconnect replay only sends `view_server` for rooms that manager still has joined.
This is still a non-federated model. Different signaling servers do not share peer registries or relay WebRTC offers for each other, so users in the same room must converge on the same signaling endpoint to discover one another reliably.
```mermaid ```mermaid
sequenceDiagram sequenceDiagram
participant UI as App participant UI as App

View File

@@ -84,6 +84,7 @@ export const SIGNALING_TYPE_CONNECTED = 'connected';
export const SIGNALING_TYPE_SERVER_USERS = 'server_users'; export const SIGNALING_TYPE_SERVER_USERS = 'server_users';
export const SIGNALING_TYPE_USER_JOINED = 'user_joined'; export const SIGNALING_TYPE_USER_JOINED = 'user_joined';
export const SIGNALING_TYPE_USER_LEFT = 'user_left'; export const SIGNALING_TYPE_USER_LEFT = 'user_left';
export const SIGNALING_TYPE_ACCESS_DENIED = 'access_denied';
export const P2P_TYPE_STATE_REQUEST = 'state-request'; export const P2P_TYPE_STATE_REQUEST = 'state-request';
export const P2P_TYPE_VOICE_STATE_REQUEST = 'voice-state-request'; export const P2P_TYPE_VOICE_STATE_REQUEST = 'voice-state-request';

View File

@@ -33,6 +33,8 @@ export class ServerMembershipSignalingHandler<TMessage> {
return; return;
} }
this.migrateServerSignalUrl(roomId, resolvedSignalUrl);
this.dependencies.signalingCoordinator.setServerSignalUrl(roomId, resolvedSignalUrl); this.dependencies.signalingCoordinator.setServerSignalUrl(roomId, resolvedSignalUrl);
this.dependencies.signalingCoordinator.setLastJoinedServer(resolvedSignalUrl, { this.dependencies.signalingCoordinator.setLastJoinedServer(resolvedSignalUrl, {
serverId: roomId, serverId: roomId,
@@ -55,6 +57,8 @@ export class ServerMembershipSignalingHandler<TMessage> {
return; return;
} }
this.migrateServerSignalUrl(serverId, resolvedSignalUrl);
this.dependencies.signalingCoordinator.setServerSignalUrl(serverId, resolvedSignalUrl); this.dependencies.signalingCoordinator.setServerSignalUrl(serverId, resolvedSignalUrl);
this.dependencies.signalingCoordinator.setLastJoinedServer(resolvedSignalUrl, { this.dependencies.signalingCoordinator.setLastJoinedServer(resolvedSignalUrl, {
serverId, serverId,
@@ -99,7 +103,9 @@ export class ServerMembershipSignalingHandler<TMessage> {
return; return;
} }
for (const { signalUrl, serverIds } of this.dependencies.signalingCoordinator.getJoinedServerEntries()) { const joinedEntries = this.dependencies.signalingCoordinator.getJoinedServerEntries();
for (const { signalUrl, serverIds } of joinedEntries) {
for (const joinedServerId of serverIds) { for (const joinedServerId of serverIds) {
this.dependencies.signalingTransport.sendRawMessageToSignalUrl(signalUrl, { this.dependencies.signalingTransport.sendRawMessageToSignalUrl(signalUrl, {
type: SIGNALING_TYPE_LEAVE_SERVER, type: SIGNALING_TYPE_LEAVE_SERVER,
@@ -109,6 +115,11 @@ export class ServerMembershipSignalingHandler<TMessage> {
} }
this.dependencies.signalingCoordinator.clearJoinedServers(); this.dependencies.signalingCoordinator.clearJoinedServers();
for (const { signalUrl } of joinedEntries) {
this.dependencies.signalingCoordinator.pruneUnusedSignalUrl(signalUrl);
}
this.dependencies.runFullCleanup(); this.dependencies.runFullCleanup();
} }
@@ -116,11 +127,13 @@ export class ServerMembershipSignalingHandler<TMessage> {
const resolvedSignalUrl = this.dependencies.signalingCoordinator.getServerSignalUrl(serverId); const resolvedSignalUrl = this.dependencies.signalingCoordinator.getServerSignalUrl(serverId);
if (resolvedSignalUrl) { if (resolvedSignalUrl) {
this.dependencies.signalingCoordinator.removeJoinedServer(resolvedSignalUrl, serverId);
this.dependencies.signalingTransport.sendRawMessageToSignalUrl(resolvedSignalUrl, { this.dependencies.signalingTransport.sendRawMessageToSignalUrl(resolvedSignalUrl, {
type: SIGNALING_TYPE_LEAVE_SERVER, type: SIGNALING_TYPE_LEAVE_SERVER,
serverId serverId
}); });
this.dependencies.signalingCoordinator.removeJoinedServer(resolvedSignalUrl, serverId);
this.dependencies.signalingCoordinator.pruneUnusedSignalUrl(resolvedSignalUrl);
} else { } else {
this.dependencies.signalingTransport.sendRawMessage({ this.dependencies.signalingTransport.sendRawMessage({
type: SIGNALING_TYPE_LEAVE_SERVER, type: SIGNALING_TYPE_LEAVE_SERVER,
@@ -143,4 +156,26 @@ export class ServerMembershipSignalingHandler<TMessage> {
?? this.dependencies.signalingCoordinator.getServerSignalUrl(serverId) ?? this.dependencies.signalingCoordinator.getServerSignalUrl(serverId)
?? this.getCurrentSignalingUrl(); ?? this.getCurrentSignalingUrl();
} }
private migrateServerSignalUrl(serverId: string, nextSignalUrl: string): void {
const previousSignalUrl = this.dependencies.signalingCoordinator.getServerSignalUrl(serverId);
if (!previousSignalUrl || previousSignalUrl === nextSignalUrl) {
return;
}
this.dependencies.signalingTransport.sendRawMessageToSignalUrl(previousSignalUrl, {
type: SIGNALING_TYPE_LEAVE_SERVER,
serverId
});
this.dependencies.signalingCoordinator.removeJoinedServer(previousSignalUrl, serverId);
this.dependencies.signalingCoordinator.pruneUnusedSignalUrl(previousSignalUrl);
this.dependencies.logger.info('Migrated server to a new signaling route', {
previousSignalUrl,
serverId,
signalUrl: nextSignalUrl
});
}
} }

View File

@@ -110,6 +110,10 @@ export class ServerSignalingCoordinator<TMessage> {
this.lastJoinedServerBySignalUrl.set(signalUrl, joinedServer); this.lastJoinedServerBySignalUrl.set(signalUrl, joinedServer);
} }
getLastJoinedServer(signalUrl: string): JoinedServerInfo | null {
return this.lastJoinedServerBySignalUrl.get(signalUrl) ?? null;
}
clearLastJoinedServers(): void { clearLastJoinedServers(): void {
this.lastJoinedServerBySignalUrl.clear(); this.lastJoinedServerBySignalUrl.clear();
} }
@@ -156,15 +160,57 @@ export class ServerSignalingCoordinator<TMessage> {
} }
removeJoinedServer(signalUrl: string, serverId: string): void { removeJoinedServer(signalUrl: string, serverId: string): void {
this.getOrCreateMemberServerSet(signalUrl).delete(serverId); const memberServerIds = this.memberServerIdsBySignalUrl.get(signalUrl);
if (!memberServerIds) {
this.repairLastJoinedServer(signalUrl, serverId);
return;
}
memberServerIds.delete(serverId);
if (memberServerIds.size === 0) {
this.memberServerIdsBySignalUrl.delete(signalUrl);
}
this.repairLastJoinedServer(signalUrl, serverId);
} }
removeJoinedServerEverywhere(serverId: string): void { removeJoinedServerEverywhere(serverId: string): void {
for (const memberServerIds of this.memberServerIdsBySignalUrl.values()) { for (const signalUrl of Array.from(this.memberServerIdsBySignalUrl.keys())) {
memberServerIds.delete(serverId); this.removeJoinedServer(signalUrl, serverId);
this.pruneUnusedSignalUrl(signalUrl);
} }
} }
pruneUnusedSignalUrl(signalUrl: string): void {
if (this.getMemberServerIdsForSignalUrl(signalUrl).size > 0) {
return;
}
this.memberServerIdsBySignalUrl.delete(signalUrl);
this.lastJoinedServerBySignalUrl.delete(signalUrl);
const subscriptions = this.signalingSubscriptions.get(signalUrl);
if (subscriptions) {
for (const subscription of subscriptions) {
subscription.unsubscribe();
}
this.signalingSubscriptions.delete(signalUrl);
}
const manager = this.signalingManagers.get(signalUrl);
if (manager) {
manager.destroy();
this.signalingManagers.delete(signalUrl);
}
this.removeSignalUrlFromPeerTracking(signalUrl);
}
getMemberServerIdsForSignalUrl(signalUrl: string): ReadonlySet<string> { getMemberServerIdsForSignalUrl(signalUrl: string): ReadonlySet<string> {
return this.memberServerIdsBySignalUrl.get(signalUrl) ?? new Set<string>(); return this.memberServerIdsBySignalUrl.get(signalUrl) ?? new Set<string>();
} }
@@ -344,6 +390,45 @@ export class ServerSignalingCoordinator<TMessage> {
return trackedServerIds; return trackedServerIds;
} }
private repairLastJoinedServer(signalUrl: string, removedServerId: string): void {
const lastJoined = this.lastJoinedServerBySignalUrl.get(signalUrl);
if (!lastJoined) {
return;
}
const memberServerIds = this.memberServerIdsBySignalUrl.get(signalUrl);
if (!memberServerIds || memberServerIds.size === 0) {
this.lastJoinedServerBySignalUrl.delete(signalUrl);
return;
}
if (lastJoined.serverId !== removedServerId && memberServerIds.has(lastJoined.serverId)) {
return;
}
const nextServerId = memberServerIds.values().next().value as string | undefined;
if (!nextServerId) {
this.lastJoinedServerBySignalUrl.delete(signalUrl);
return;
}
this.lastJoinedServerBySignalUrl.set(signalUrl, {
...lastJoined,
serverId: nextServerId
});
}
private removeSignalUrlFromPeerTracking(signalUrl: string): void {
const peerIds = new Set<string>([...this.peerKnownSignalUrls.keys(), ...this.peerServerMap.keys()]);
for (const peerId of peerIds) {
this.removePeerSignalScope(peerId, signalUrl);
}
}
private removePeerSignalScope(peerId: string, signalUrl: string): void { private removePeerSignalScope(peerId: string, signalUrl: string): void {
const trackedSignalUrls = this.peerServerMap.get(peerId); const trackedSignalUrls = this.peerServerMap.get(peerId);

View File

@@ -1,6 +1,7 @@
import type { SignalingMessage } from '../../../shared-kernel'; import type { SignalingMessage } from '../../../shared-kernel';
import { PeerData } from '../realtime.types'; import { PeerData } from '../realtime.types';
import { import {
SIGNALING_TYPE_ACCESS_DENIED,
SIGNALING_TYPE_ANSWER, SIGNALING_TYPE_ANSWER,
SIGNALING_TYPE_CONNECTED, SIGNALING_TYPE_CONNECTED,
SIGNALING_TYPE_ICE_CANDIDATE, SIGNALING_TYPE_ICE_CANDIDATE,
@@ -98,6 +99,10 @@ export class IncomingSignalingMessageHandler {
this.handleIceCandidateSignalingMessage(message, signalUrl); this.handleIceCandidateSignalingMessage(message, signalUrl);
return; return;
case SIGNALING_TYPE_ACCESS_DENIED:
this.handleAccessDeniedSignalingMessage(message, signalUrl);
return;
default: default:
return; return;
} }
@@ -357,6 +362,16 @@ export class IncomingSignalingMessageHandler {
this.dependencies.peerManager.handleIceCandidate(fromUserId, candidate); this.dependencies.peerManager.handleIceCandidate(fromUserId, candidate);
} }
private handleAccessDeniedSignalingMessage(message: IncomingSignalingMessage, signalUrl: string): void {
if (!message.serverId) {
return;
}
// Remove the server from the coordinator for this signal URL so it won't
// be re-joined on the next reconnect cycle.
this.dependencies.signalingCoordinator.removeJoinedServer(signalUrl, message.serverId);
}
private scheduleUserJoinedFallbackOffer(peerId: string, signalUrl: string, serverId?: string): void { private scheduleUserJoinedFallbackOffer(peerId: string, signalUrl: string, serverId?: string): void {
this.clearUserJoinedFallbackOffer(peerId); this.clearUserJoinedFallbackOffer(peerId);

View File

@@ -118,6 +118,13 @@ export class SignalingTransportHandler<TMessage> {
if (serverSignalUrl && this.sendRawMessageToSignalUrl(serverSignalUrl, message)) { if (serverSignalUrl && this.sendRawMessageToSignalUrl(serverSignalUrl, message)) {
return; return;
} }
this.dependencies.logger.warn('[signaling] Missing server signal route for outbound raw message', {
serverId,
type: messageType
});
return;
} }
const connectedManagers = this.getConnectedSignalingManagers(); const connectedManagers = this.getConnectedSignalingManagers();
@@ -161,14 +168,14 @@ export class SignalingTransportHandler<TMessage> {
displayName: normalizedDisplayName displayName: normalizedDisplayName
}; };
const identifyMessage = {
type: SIGNALING_TYPE_IDENTIFY,
oderId,
displayName: normalizedDisplayName
};
if (signalUrl) { if (signalUrl) {
this.sendRawMessageToSignalUrl(signalUrl, identifyMessage); this.sendRawMessageToSignalUrl(signalUrl, {
type: SIGNALING_TYPE_IDENTIFY,
oderId,
displayName: normalizedDisplayName,
connectionScope: signalUrl
});
return; return;
} }
@@ -178,8 +185,13 @@ export class SignalingTransportHandler<TMessage> {
return; return;
} }
for (const { manager } of connectedManagers) { for (const { signalUrl: managerSignalUrl, manager } of connectedManagers) {
manager.sendRawMessage(identifyMessage); manager.sendRawMessage({
type: SIGNALING_TYPE_IDENTIFY,
oderId,
displayName: normalizedDisplayName,
connectionScope: managerSignalUrl
});
} }
} }
} }

View File

@@ -283,7 +283,8 @@ export class SignalingManager {
if (credentials) { if (credentials) {
this.sendRawMessage({ type: SIGNALING_TYPE_IDENTIFY, this.sendRawMessage({ type: SIGNALING_TYPE_IDENTIFY,
oderId: credentials.oderId, oderId: credentials.oderId,
displayName: credentials.displayName }); displayName: credentials.displayName,
connectionScope: this.lastSignalingUrl ?? undefined });
} }
const memberIds = this.getMemberServerIds(); const memberIds = this.getMemberServerIds();
@@ -296,17 +297,10 @@ export class SignalingManager {
const lastJoined = this.getLastJoinedServer(); const lastJoined = this.getLastJoinedServer();
if (lastJoined) { if (lastJoined && memberIds.has(lastJoined.serverId)) {
this.sendRawMessage({ type: SIGNALING_TYPE_VIEW_SERVER, this.sendRawMessage({ type: SIGNALING_TYPE_VIEW_SERVER,
serverId: lastJoined.serverId }); serverId: lastJoined.serverId });
} }
} else {
const lastJoined = this.getLastJoinedServer();
if (lastJoined) {
this.sendRawMessage({ type: SIGNALING_TYPE_JOIN_SERVER,
serverId: lastJoined.serverId });
}
} }
} }

View File

@@ -37,7 +37,11 @@ export const RoomsActions = createActionGroup({
'Create Room Success': props<{ room: Room }>(), 'Create Room Success': props<{ room: Room }>(),
'Create Room Failure': props<{ error: string }>(), 'Create Room Failure': props<{ error: string }>(),
'Join Room': props<{ roomId: string; password?: string; serverInfo?: Partial<ServerInfo> & { name: string } }>(), 'Join Room': props<{
roomId: string;
password?: string;
serverInfo?: Partial<ServerInfo> & { name: string; signalingUrl?: string };
}>(),
'Join Room Success': props<{ room: Room }>(), 'Join Room Success': props<{ room: Room }>(),
'Join Room Failure': props<{ error: string }>(), 'Join Room Failure': props<{ error: string }>(),

View File

@@ -13,6 +13,7 @@ import {
of, of,
from, from,
EMPTY, EMPTY,
firstValueFrom,
merge merge
} from 'rxjs'; } from 'rxjs';
import { import {
@@ -43,7 +44,10 @@ import {
saveLastViewedChatToStorage saveLastViewedChatToStorage
} from '../../infrastructure/persistence'; } from '../../infrastructure/persistence';
import { import {
areRoomSignalSourcesEqual,
CLIENT_UPDATE_REQUIRED_MESSAGE, CLIENT_UPDATE_REQUIRED_MESSAGE,
type RoomSignalSource,
type ServerInfo,
type ServerSourceSelector, type ServerSourceSelector,
ServerDirectoryFacade ServerDirectoryFacade
} from '../../domains/server-directory'; } from '../../domains/server-directory';
@@ -169,6 +173,12 @@ interface RoomPresenceSignalingMessage {
displayName?: string; displayName?: string;
} }
interface RoomSignalConnectionPlan {
fallbackSources: RoomSignalSource[];
primarySource: RoomSignalSource | null;
room: Room;
}
type BlockedRoomAccessAction = type BlockedRoomAccessAction =
| ReturnType<typeof RoomsActions.forgetRoom> | ReturnType<typeof RoomsActions.forgetRoom>
| ReturnType<typeof RoomsActions.joinRoomFailure>; | ReturnType<typeof RoomsActions.joinRoomFailure>;
@@ -196,6 +206,7 @@ export class RoomsEffects {
* join/leave sound within {@link RECONNECT_SOUND_GRACE_MS}. * join/leave sound within {@link RECONNECT_SOUND_GRACE_MS}.
*/ */
private recentlyLeftVoiceTimestamps = new Map<string, number>(); private recentlyLeftVoiceTimestamps = new Map<string, number>();
private readonly roomSignalFallbackSources = new Map<string, RoomSignalSource>();
private roomNavigationRequestVersion = 0; private roomNavigationRequestVersion = 0;
private latestNavigatedRoomId: string | null = null; private latestNavigatedRoomId: string | null = null;
@@ -388,13 +399,27 @@ export class RoomsEffects {
return from(this.db.getRoom(roomId)).pipe( return from(this.db.getRoom(roomId)).pipe(
switchMap((room) => { switchMap((room) => {
const sourceSelector = serverInfo const sourceSelector = serverInfo
? { ? this.serverDirectory.buildRoomSignalSelector({
sourceId: serverInfo.sourceId, sourceId: serverInfo.sourceId,
sourceUrl: serverInfo.sourceUrl sourceName: serverInfo.sourceName,
} sourceUrl: serverInfo.sourceUrl,
signalingUrl: serverInfo.signalingUrl,
fallbackName: serverInfo.sourceName ?? serverInfo.name
}, {
ensureEndpoint: !!(serverInfo.sourceUrl ?? serverInfo.signalingUrl)
})
: undefined; : undefined;
if (room) { if (room) {
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: serverInfo?.sourceId ?? room.sourceId,
sourceName: serverInfo?.sourceName ?? room.sourceName,
sourceUrl: serverInfo?.sourceUrl ?? room.sourceUrl,
signalingUrl: serverInfo?.signalingUrl,
fallbackName: serverInfo?.sourceName ?? room.sourceName ?? serverInfo?.name ?? room.name
}, {
ensureEndpoint: !!(serverInfo?.sourceUrl ?? room.sourceUrl ?? serverInfo?.signalingUrl)
});
const resolvedRoom: Room = { const resolvedRoom: Room = {
...room, ...room,
isPrivate: typeof serverInfo?.isPrivate === 'boolean' ? serverInfo.isPrivate : room.isPrivate, isPrivate: typeof serverInfo?.isPrivate === 'boolean' ? serverInfo.isPrivate : room.isPrivate,
@@ -403,9 +428,7 @@ export class RoomsEffects {
roles: serverInfo?.roles ?? room.roles, roles: serverInfo?.roles ?? room.roles,
roleAssignments: serverInfo?.roleAssignments ?? room.roleAssignments, roleAssignments: serverInfo?.roleAssignments ?? room.roleAssignments,
channelPermissions: serverInfo?.channelPermissions ?? room.channelPermissions, channelPermissions: serverInfo?.channelPermissions ?? room.channelPermissions,
sourceId: serverInfo?.sourceId ?? room.sourceId, ...resolvedSource,
sourceName: serverInfo?.sourceName ?? room.sourceName,
sourceUrl: serverInfo?.sourceUrl ?? room.sourceUrl,
hasPassword: hasPassword:
typeof serverInfo?.hasPassword === 'boolean' typeof serverInfo?.hasPassword === 'boolean'
? serverInfo.hasPassword ? serverInfo.hasPassword
@@ -430,6 +453,15 @@ export class RoomsEffects {
// If not in local DB but we have server info from search, create a room entry // If not in local DB but we have server info from search, create a room entry
if (serverInfo) { if (serverInfo) {
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: serverInfo.sourceId,
sourceName: serverInfo.sourceName,
sourceUrl: serverInfo.sourceUrl,
signalingUrl: serverInfo.signalingUrl,
fallbackName: serverInfo.sourceName ?? serverInfo.name
}, {
ensureEndpoint: !!(serverInfo.sourceUrl ?? serverInfo.signalingUrl)
});
const newRoom: Room = { const newRoom: Room = {
id: roomId, id: roomId,
name: serverInfo.name, name: serverInfo.name,
@@ -445,9 +477,7 @@ export class RoomsEffects {
roles: serverInfo.roles, roles: serverInfo.roles,
roleAssignments: serverInfo.roleAssignments, roleAssignments: serverInfo.roleAssignments,
channelPermissions: serverInfo.channelPermissions, channelPermissions: serverInfo.channelPermissions,
sourceId: serverInfo.sourceId, ...resolvedSource
sourceName: serverInfo.sourceName,
sourceUrl: serverInfo.sourceUrl
}; };
// Save to local DB for future reference // Save to local DB for future reference
@@ -459,6 +489,14 @@ export class RoomsEffects {
return this.serverDirectory.getServer(roomId, sourceSelector).pipe( return this.serverDirectory.getServer(roomId, sourceSelector).pipe(
switchMap((serverData) => { switchMap((serverData) => {
if (serverData) { if (serverData) {
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: serverData.sourceId,
sourceName: serverData.sourceName,
sourceUrl: serverData.sourceUrl,
fallbackName: serverData.sourceName ?? serverData.name
}, {
ensureEndpoint: !!serverData.sourceUrl
});
const newRoom: Room = { const newRoom: Room = {
id: serverData.id, id: serverData.id,
name: serverData.name, name: serverData.name,
@@ -474,9 +512,7 @@ export class RoomsEffects {
roles: serverData.roles, roles: serverData.roles,
roleAssignments: serverData.roleAssignments, roleAssignments: serverData.roleAssignments,
channelPermissions: serverData.channelPermissions, channelPermissions: serverData.channelPermissions,
sourceId: serverData.sourceId, ...resolvedSource
sourceName: serverData.sourceName,
sourceUrl: serverData.sourceUrl
}; };
this.db.saveRoom(newRoom); this.db.saveRoom(newRoom);
@@ -621,16 +657,32 @@ export class RoomsEffects {
refreshServerOwnedRoomMetadata$ = createEffect(() => refreshServerOwnedRoomMetadata$ = createEffect(() =>
this.actions$.pipe( this.actions$.pipe(
ofType(RoomsActions.joinRoomSuccess, RoomsActions.viewServerSuccess), ofType(RoomsActions.joinRoomSuccess, RoomsActions.viewServerSuccess),
switchMap(({ room }) => switchMap(({ room }) => {
this.serverDirectory.getServer(room.id, { const source = this.resolveRoomSignalSource(room);
sourceId: room.sourceId, const selector = this.resolveRoomSignalSelector(source, room.name);
sourceUrl: room.sourceUrl const roomRequest$ = selector
}).pipe( ? this.serverDirectory.getServer(room.id, selector).pipe(
switchMap((serverData) => serverData
? of(serverData)
: this.serverDirectory.findServerAcrossActiveEndpoints(room.id, source))
)
: this.serverDirectory.findServerAcrossActiveEndpoints(room.id, source);
return roomRequest$.pipe(
map((serverData) => { map((serverData) => {
if (!serverData) { if (!serverData) {
return null; return null;
} }
const resolvedSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: serverData.sourceId ?? room.sourceId,
sourceName: serverData.sourceName ?? room.sourceName,
sourceUrl: serverData.sourceUrl ?? room.sourceUrl,
fallbackName: serverData.sourceName ?? room.sourceName ?? room.name
}, {
ensureEndpoint: !!(serverData.sourceUrl ?? room.sourceUrl)
});
return RoomsActions.updateRoom({ return RoomsActions.updateRoom({
roomId: room.id, roomId: room.id,
changes: { changes: {
@@ -645,16 +697,14 @@ export class RoomsEffects {
roles: serverData.roles ?? room.roles, roles: serverData.roles ?? room.roles,
roleAssignments: serverData.roleAssignments ?? room.roleAssignments, roleAssignments: serverData.roleAssignments ?? room.roleAssignments,
channelPermissions: serverData.channelPermissions ?? room.channelPermissions, channelPermissions: serverData.channelPermissions ?? room.channelPermissions,
sourceId: serverData.sourceId ?? room.sourceId, ...resolvedSource
sourceName: serverData.sourceName ?? room.sourceName,
sourceUrl: serverData.sourceUrl ?? room.sourceUrl
} }
}); });
}), }),
filter((action): action is ReturnType<typeof RoomsActions.updateRoom> => !!action), filter((action): action is ReturnType<typeof RoomsActions.updateRoom> => !!action),
catchError(() => EMPTY) catchError(() => EMPTY)
) );
) })
) )
); );
@@ -732,6 +782,7 @@ export class RoomsEffects {
), ),
switchMap(([{ roomId }]) => { switchMap(([{ roomId }]) => {
this.db.deleteRoom(roomId); this.db.deleteRoom(roomId);
this.roomSignalFallbackSources.delete(roomId);
this.webrtc.broadcastMessage({ type: 'room-deleted', this.webrtc.broadcastMessage({ type: 'room-deleted',
roomId }); roomId });
@@ -824,6 +875,7 @@ export class RoomsEffects {
// Delete from local DB // Delete from local DB
this.db.deleteRoom(roomId); this.db.deleteRoom(roomId);
this.roomSignalFallbackSources.delete(roomId);
// Leave this specific server (doesn't affect other servers) // Leave this specific server (doesn't affect other servers)
this.webrtc.leaveRoom(roomId); this.webrtc.leaveRoom(roomId);
@@ -1322,6 +1374,12 @@ export class RoomsEffects {
if (signalingMessage.reason !== 'SERVER_NOT_FOUND') if (signalingMessage.reason !== 'SERVER_NOT_FOUND')
return EMPTY; return EMPTY;
// When multiple signal URLs are configured, the room may already
// be successfully joined on a different signal server. Only show
// the reconnect notice when the room is not reachable at all.
if (signalingMessage.serverId && this.webrtc.hasJoinedServer(signalingMessage.serverId))
return EMPTY;
return [RoomsActions.setSignalServerReconnecting({ isReconnecting: true })]; return [RoomsActions.setSignalServerReconnecting({ isReconnecting: true })];
} }
@@ -1997,23 +2055,127 @@ export class RoomsEffects {
): Promise<void> { ): Promise<void> {
const shouldShowCompatibilityError = options.showCompatibilityError ?? false; const shouldShowCompatibilityError = options.showCompatibilityError ?? false;
const navigationRequestVersion = options.navigationRequestVersion; const navigationRequestVersion = options.navigationRequestVersion;
const compatibilitySelector = this.resolveCompatibilitySelector(room); const isViewedRoom = () => room.id === this.latestNavigatedRoomId;
const isCompatible = compatibilitySelector === null
? true await this.serverDirectory.awaitInitialServerHealthCheck();
: await this.serverDirectory.ensureEndpointVersionCompatibility(compatibilitySelector);
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) { if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
return; return;
} }
if (!isCompatible) { const connectionPlan = await this.resolveRoomSignalConnectionPlan(room);
if (shouldShowCompatibilityError) {
this.store.dispatch( if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE }) return;
); }
const sessionFallbackSource = this.roomSignalFallbackSources.get(room.id);
const connectionCandidates: {
isExistingFallback?: boolean;
isFallback?: boolean;
isPrimary?: boolean;
source: RoomSignalSource;
}[] = [];
const pushConnectionCandidate = (
source: RoomSignalSource | null | undefined,
flags: { isExistingFallback?: boolean; isFallback?: boolean; isPrimary?: boolean } = {}
) => {
if (!source || !this.resolveRoomSignalSelector(source, room.name)) {
return;
}
if (connectionCandidates.some((candidate) => areRoomSignalSourcesEqual(candidate.source, source))) {
return;
}
connectionCandidates.push({
...flags,
source
});
};
if (sessionFallbackSource && this.webrtc.hasJoinedServer(room.id)) {
pushConnectionCandidate(sessionFallbackSource, { isExistingFallback: true, isFallback: true });
}
pushConnectionCandidate(connectionPlan.primarySource, { isPrimary: true });
for (const fallbackSource of connectionPlan.fallbackSources) {
pushConnectionCandidate(fallbackSource, { isFallback: true });
}
let attemptedFallback = false;
for (const candidate of connectionCandidates) {
const selector = this.resolveRoomSignalSelector(candidate.source, connectionPlan.room.name);
if (!selector) {
continue;
}
const isCompatible = await this.serverDirectory.ensureEndpointVersionCompatibility(selector);
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
return;
}
if (!isCompatible) {
if (candidate.isPrimary) {
if (shouldShowCompatibilityError) {
this.store.dispatch(
RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE })
);
}
if (isViewedRoom()) {
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false }));
}
return;
}
continue;
}
if (candidate.isFallback && !candidate.isExistingFallback && !attemptedFallback) {
attemptedFallback = true;
if (isViewedRoom()) {
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true }));
}
}
const connected = await this.connectRoomToSignalSource(
connectionPlan.room,
candidate.source,
user,
resolvedOderId,
savedRooms,
navigationRequestVersion
);
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
return;
}
if (!connected) {
continue;
}
if (candidate.isFallback) {
this.roomSignalFallbackSources.set(room.id, candidate.source);
} else {
this.roomSignalFallbackSources.delete(room.id);
}
if (shouldShowCompatibilityError) {
this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null }));
}
if (isViewedRoom()) {
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false }));
} }
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false }));
return; return;
} }
@@ -2021,49 +2183,9 @@ export class RoomsEffects {
this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null })); this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null }));
} }
const wsUrl = this.serverDirectory.getWebSocketUrl({ if (isViewedRoom()) {
sourceId: room.sourceId, this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true }));
sourceUrl: room.sourceUrl
});
const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId();
const displayName = resolveUserDisplayName(user);
const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl);
const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id);
const joinCurrentEndpointRooms = () => {
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
return;
}
this.webrtc.setCurrentServer(room.id);
this.webrtc.identify(oderId, displayName, wsUrl);
for (const backgroundRoom of backgroundRooms) {
if (!this.webrtc.hasJoinedServer(backgroundRoom.id)) {
this.webrtc.joinRoom(backgroundRoom.id, oderId, wsUrl);
}
}
if (this.webrtc.hasJoinedServer(room.id)) {
this.webrtc.switchServer(room.id, oderId, wsUrl);
} else {
this.webrtc.joinRoom(room.id, oderId, wsUrl);
}
};
if (this.webrtc.isSignalingConnectedTo(wsUrl)) {
joinCurrentEndpointRooms();
return;
} }
this.webrtc.connectToSignalingServer(wsUrl).subscribe({
next: (connected) => {
if (!connected || !this.isCurrentRoomNavigation(room.id, navigationRequestVersion))
return;
joinCurrentEndpointRooms();
},
error: () => {}
});
} }
private syncSavedRoomConnections(user: User | null, currentRoom: Room | null, savedRooms: Room[]): void { private syncSavedRoomConnections(user: User | null, currentRoom: Room | null, savedRooms: Room[]): void {
@@ -2076,10 +2198,12 @@ export class RoomsEffects {
const roomsBySignalingUrl = new Map<string, Room[]>(); const roomsBySignalingUrl = new Map<string, Room[]>();
for (const room of roomsToSync) { for (const room of roomsToSync) {
const wsUrl = this.serverDirectory.getWebSocketUrl({ const wsUrl = this.resolveRoomSignalingUrl(room);
sourceId: room.sourceId,
sourceUrl: room.sourceUrl if (!wsUrl) {
}); continue;
}
const groupedRooms = roomsBySignalingUrl.get(wsUrl) ?? []; const groupedRooms = roomsBySignalingUrl.get(wsUrl) ?? [];
if (!groupedRooms.some((groupedRoom) => groupedRoom.id === room.id)) { if (!groupedRooms.some((groupedRoom) => groupedRoom.id === room.id)) {
@@ -2149,10 +2273,7 @@ export class RoomsEffects {
continue; continue;
} }
if (this.serverDirectory.getWebSocketUrl({ if (this.resolveRoomSignalingUrl(room) !== wsUrl) {
sourceId: room.sourceId,
sourceUrl: room.sourceUrl
}) !== wsUrl) {
continue; continue;
} }
@@ -2202,6 +2323,175 @@ export class RoomsEffects {
return localStorage.getItem('metoyou_currentUserId'); return localStorage.getItem('metoyou_currentUserId');
} }
private async resolveRoomSignalConnectionPlan(room: Room): Promise<RoomSignalConnectionPlan> {
let resolvedRoom = this.repairRoomSignalSource(room, this.resolveRoomSignalSource(room));
let primarySource = this.resolveRoomSignalSource(resolvedRoom);
if (!this.webrtc.hasJoinedServer(room.id)) {
const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name);
const authoritativeServer = (
selector
? await firstValueFrom(this.serverDirectory.getServer(room.id, selector))
: null
) ?? await firstValueFrom(this.serverDirectory.findServerAcrossActiveEndpoints(room.id, primarySource));
if (authoritativeServer) {
const authoritativeSource = this.serverDirectory.normaliseRoomSignalSource({
sourceId: authoritativeServer.sourceId ?? primarySource.sourceId,
sourceName: authoritativeServer.sourceName ?? primarySource.sourceName,
sourceUrl: authoritativeServer.sourceUrl ?? primarySource.sourceUrl,
fallbackName: authoritativeServer.sourceName ?? primarySource.sourceName ?? resolvedRoom.name
}, {
ensureEndpoint: !!(authoritativeServer.sourceUrl ?? primarySource.sourceUrl)
});
resolvedRoom = this.repairRoomSignalSource(resolvedRoom, authoritativeSource);
primarySource = authoritativeSource;
}
}
const fallbackSources = this.serverDirectory.getFallbackRoomEndpoints(primarySource)
.map((endpoint) => this.serverDirectory.normaliseRoomSignalSource({
sourceId: endpoint.id,
sourceName: endpoint.name,
sourceUrl: endpoint.url,
fallbackName: endpoint.name
}))
.filter((source, index, sources) =>
sources.findIndex((candidate) => areRoomSignalSourcesEqual(candidate, source)) === index
);
return {
fallbackSources,
primarySource: this.resolveRoomSignalSelector(primarySource, resolvedRoom.name) ? primarySource : null,
room: resolvedRoom
};
}
private async connectRoomToSignalSource(
room: Room,
source: RoomSignalSource,
user: User | null,
resolvedOderId: string | undefined,
savedRooms: Room[],
navigationRequestVersion?: number
): Promise<boolean> {
const selector = this.resolveRoomSignalSelector(source, room.name);
if (!selector) {
return false;
}
const wsUrl = this.serverDirectory.getWebSocketUrl(selector);
const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId();
const displayName = resolveUserDisplayName(user);
const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl);
const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id);
const joinCurrentEndpointRooms = () => {
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
return;
}
this.webrtc.setCurrentServer(room.id);
this.webrtc.identify(oderId, displayName, wsUrl);
for (const backgroundRoom of backgroundRooms) {
this.webrtc.joinRoom(backgroundRoom.id, oderId, wsUrl);
}
if (this.webrtc.hasJoinedServer(room.id)) {
this.webrtc.switchServer(room.id, oderId, wsUrl);
} else {
this.webrtc.joinRoom(room.id, oderId, wsUrl);
}
};
if (this.webrtc.isSignalingConnectedTo(wsUrl)) {
joinCurrentEndpointRooms();
return true;
}
try {
const connected = await firstValueFrom(this.webrtc.connectToSignalingServer(wsUrl));
if (!connected || !this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
return false;
}
joinCurrentEndpointRooms();
return true;
} catch {
return false;
}
}
private resolveRoomSignalSource(
room: Pick<Room, 'name' | 'sourceId' | 'sourceName' | 'sourceUrl'>
): RoomSignalSource {
return this.serverDirectory.normaliseRoomSignalSource({
sourceId: room.sourceId,
sourceName: room.sourceName,
sourceUrl: room.sourceUrl,
fallbackName: room.sourceName ?? room.name
}, {
ensureEndpoint: !!room.sourceUrl
});
}
private repairRoomSignalSource(room: Room, source: RoomSignalSource | null): Room {
if (!source || areRoomSignalSourcesEqual(room, source)) {
return room;
}
const changes: Partial<Room> = {
sourceId: source.sourceId,
sourceName: source.sourceName,
sourceUrl: source.sourceUrl
};
this.store.dispatch(RoomsActions.updateRoom({
roomId: room.id,
changes
}));
return {
...room,
...changes
};
}
private resolveRoomSignalSelector(
source: RoomSignalSource | null | undefined,
fallbackName: string
): ServerSourceSelector | undefined {
if (!source) {
return undefined;
}
return this.serverDirectory.buildRoomSignalSelector({
...source,
fallbackName: source.sourceName ?? fallbackName
}, {
ensureEndpoint: !!source.sourceUrl
});
}
private getPreferredRoomSignalSource(room: Room): RoomSignalSource {
const fallbackSource = this.roomSignalFallbackSources.get(room.id);
if (fallbackSource && this.webrtc.hasJoinedServer(room.id)) {
return fallbackSource;
}
return this.resolveRoomSignalSource(room);
}
private resolveRoomSignalingUrl(room: Room): string {
const selector = this.resolveRoomSignalSelector(this.getPreferredRoomSignalSource(room), room.name);
return selector ? this.serverDirectory.getWebSocketUrl(selector) : '';
}
private async getBlockedRoomAccessActions( private async getBlockedRoomAccessActions(
roomId: string, roomId: string,
currentUser: { id: string; oderId: string } | null currentUser: { id: string; oderId: string } | null