Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a49e18b9f0 |
@@ -35,6 +35,18 @@ test.describe('Direct message flow', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('delivers a live DM to the recipient conversation', async ({ createClient }) => {
|
||||||
|
const scenario = await createDmScenario(createClient);
|
||||||
|
const liveMessage = `Live DM ${uniqueName('msg')}`;
|
||||||
|
|
||||||
|
await openDmFromRoomUserCard(scenario.alice.page, 'Bob');
|
||||||
|
await scenario.alice.page.getByTestId('dm-input').fill(liveMessage);
|
||||||
|
await scenario.alice.page.getByTestId('dm-input').press('Enter');
|
||||||
|
|
||||||
|
await openDmFromRoomUserCard(scenario.bob.page, 'Alice');
|
||||||
|
await expect(scenario.bob.page.locator('app-dm-chat').getByText(liveMessage)).toBeVisible({ timeout: 20_000 });
|
||||||
|
});
|
||||||
|
|
||||||
test('shows friend and message actions on the search people list', async ({ createClient }) => {
|
test('shows friend and message actions on the search people list', async ({ createClient }) => {
|
||||||
const scenario = await createDmScenario(createClient);
|
const scenario = await createDmScenario(createClient);
|
||||||
|
|
||||||
@@ -110,6 +122,15 @@ async function registerUser(page: Page, username: string, displayName: string):
|
|||||||
await expect(page).toHaveURL(/\/search/, { timeout: 15_000 });
|
await expect(page).toHaveURL(/\/search/, { timeout: 15_000 });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function openDmFromRoomUserCard(page: Page, displayName: string): Promise<void> {
|
||||||
|
const userCard = page.locator('[data-testid^="room-user-card-"]', { hasText: displayName }).first();
|
||||||
|
|
||||||
|
await expect(userCard).toBeVisible({ timeout: 20_000 });
|
||||||
|
await userCard.getByRole('button', { name: `Message ${displayName}` }).click();
|
||||||
|
await expect(page).toHaveURL(/\/dm\//, { timeout: 15_000 });
|
||||||
|
await expect(page.getByRole('heading', { name: displayName })).toBeVisible({ timeout: 10_000 });
|
||||||
|
}
|
||||||
|
|
||||||
function uniqueName(prefix: string): string {
|
function uniqueName(prefix: string): string {
|
||||||
return `${prefix}-${Date.now()}-${Math.random().toString(36)
|
return `${prefix}-${Date.now()}-${Math.random().toString(36)
|
||||||
.slice(2, 8)}`;
|
.slice(2, 8)}`;
|
||||||
|
|||||||
@@ -286,6 +286,26 @@ function handleChatMessage(user: ConnectedUser, message: WsMessage): void {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function handleVoiceState(user: ConnectedUser, message: WsMessage): void {
|
||||||
|
const serverId = readMessageId(message['serverId']) ?? user.viewedServerId;
|
||||||
|
|
||||||
|
if (!serverId || !user.serverIds.has(serverId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
broadcastToServer(
|
||||||
|
serverId,
|
||||||
|
{
|
||||||
|
...message,
|
||||||
|
type: 'voice_state',
|
||||||
|
serverId,
|
||||||
|
oderId: user.oderId,
|
||||||
|
displayName: normalizeDisplayName(user.displayName)
|
||||||
|
},
|
||||||
|
user.oderId
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
function handleTyping(user: ConnectedUser, message: WsMessage): void {
|
function handleTyping(user: ConnectedUser, message: WsMessage): void {
|
||||||
const typingSid = (message['serverId'] as string | undefined) ?? user.viewedServerId;
|
const typingSid = (message['serverId'] as string | undefined) ?? user.viewedServerId;
|
||||||
const channelId = typeof message['channelId'] === 'string' && message['channelId'].trim() ? message['channelId'].trim() : 'general';
|
const channelId = typeof message['channelId'] === 'string' && message['channelId'].trim() ? message['channelId'].trim() : 'general';
|
||||||
@@ -461,6 +481,9 @@ export async function handleWebSocketMessage(connectionId: string, message: WsMe
|
|||||||
case 'offer':
|
case 'offer':
|
||||||
case 'answer':
|
case 'answer':
|
||||||
case 'ice_candidate':
|
case 'ice_candidate':
|
||||||
|
case 'direct-message':
|
||||||
|
case 'direct-message-status':
|
||||||
|
case 'direct-message-mutation':
|
||||||
case 'server_icon_peer_request':
|
case 'server_icon_peer_request':
|
||||||
case 'server_icon_peer_data':
|
case 'server_icon_peer_data':
|
||||||
forwardRtcMessage(user, message);
|
forwardRtcMessage(user, message);
|
||||||
@@ -470,6 +493,10 @@ export async function handleWebSocketMessage(connectionId: string, message: WsMe
|
|||||||
handleChatMessage(user, message);
|
handleChatMessage(user, message);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case 'voice_state':
|
||||||
|
handleVoiceState(user, message);
|
||||||
|
break;
|
||||||
|
|
||||||
case 'typing':
|
case 'typing':
|
||||||
handleTyping(user, message);
|
handleTyping(user, message);
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ graph TD
|
|||||||
|
|
||||||
## Message lifecycle
|
## Message lifecycle
|
||||||
|
|
||||||
Messages are created in the composer, broadcast to peers over the data channel, and rendered in the list. Editing and deletion are sender-only operations.
|
Messages are created in the composer, broadcast to peers over the data channel, and rendered in the list. Live room chat also emits a narrow `chat_message` signaling fallback so peers can receive text while the data channel is unavailable. Editing and deletion are sender-only operations.
|
||||||
|
|
||||||
```mermaid
|
```mermaid
|
||||||
sequenceDiagram
|
sequenceDiagram
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# Direct Message Domain
|
# Direct Message Domain
|
||||||
|
|
||||||
Direct messages provide local, offline-safe one-to-one messaging over the existing WebRTC data channel.
|
Direct messages provide local, offline-safe one-to-one messaging over the existing WebRTC data channel, with a signaling relay fallback when no peer data channel is available but a route to the recipient is known.
|
||||||
|
|
||||||
## Structure
|
## Structure
|
||||||
|
|
||||||
@@ -16,9 +16,10 @@ direct-message/
|
|||||||
|
|
||||||
1. `DirectMessageService.sendMessage()` stores the message locally with `QUEUED`.
|
1. `DirectMessageService.sendMessage()` stores the message locally with `QUEUED`.
|
||||||
2. `PeerDeliveryService` tries to send a `direct-message` P2P event to the recipient's current peer id.
|
2. `PeerDeliveryService` tries to send a `direct-message` P2P event to the recipient's current peer id.
|
||||||
3. If the peer is connected, the sender advances to `SENT`; otherwise the message id remains in `OfflineMessageQueueService`.
|
3. If no data channel is connected, `PeerDeliveryService` tries the recipient's known signaling route before leaving the message queued.
|
||||||
4. The recipient persists the message as `DELIVERED` and sends a `direct-message-status` event back.
|
4. If either transport sends, the sender advances to `SENT`; otherwise the message id remains in `OfflineMessageQueueService`.
|
||||||
5. Opening the conversation marks incoming messages as `ACKNOWLEDGED` and emits a status event.
|
5. The recipient persists the message as `DELIVERED` and sends a `direct-message-status` event back.
|
||||||
|
6. Opening the conversation marks incoming messages as `ACKNOWLEDGED` and emits a status event.
|
||||||
|
|
||||||
Status transitions are monotonic, so a stale `SENT` event cannot overwrite `DELIVERED` or `ACKNOWLEDGED`.
|
Status transitions are monotonic, so a stale `SENT` event cannot overwrite `DELIVERED` or `ACKNOWLEDGED`.
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,131 @@
|
|||||||
|
import {
|
||||||
|
Injector,
|
||||||
|
runInInjectionContext,
|
||||||
|
signal
|
||||||
|
} from '@angular/core';
|
||||||
|
import { Store } from '@ngrx/store';
|
||||||
|
import { Subject } from 'rxjs';
|
||||||
|
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
||||||
|
import { selectAllUsers } from '../../../../store/users/users.selectors';
|
||||||
|
import type { ChatEvent, User } from '../../../../shared-kernel';
|
||||||
|
import { PeerDeliveryService } from './peer-delivery.service';
|
||||||
|
|
||||||
|
describe('PeerDeliveryService', () => {
|
||||||
|
it('relays direct messages through signaling when no data channel is connected', () => {
|
||||||
|
const context = createServiceContext({ connectedPeers: [], routedPeers: ['bob'] });
|
||||||
|
const event: ChatEvent = {
|
||||||
|
type: 'direct-message',
|
||||||
|
directMessage: {
|
||||||
|
message: {
|
||||||
|
id: 'message-1',
|
||||||
|
conversationId: 'dm-alice-bob',
|
||||||
|
senderId: 'alice',
|
||||||
|
recipientId: 'bob',
|
||||||
|
content: 'hello',
|
||||||
|
timestamp: 1,
|
||||||
|
status: 'QUEUED'
|
||||||
|
},
|
||||||
|
sender: {
|
||||||
|
userId: 'alice',
|
||||||
|
username: 'alice',
|
||||||
|
displayName: 'Alice'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
expect(context.service.sendViaWebRTC('bob', event)).toBe(true);
|
||||||
|
expect(context.realtime.sendToPeer).not.toHaveBeenCalled();
|
||||||
|
expect(context.realtime.sendRawMessage).toHaveBeenCalledWith({
|
||||||
|
...event,
|
||||||
|
targetUserId: 'bob'
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps messages queued when neither P2P nor signaling can reach the recipient', () => {
|
||||||
|
const context = createServiceContext({ connectedPeers: [], routedPeers: [] });
|
||||||
|
|
||||||
|
expect(context.service.sendViaWebRTC('bob', { type: 'direct-message' })).toBe(false);
|
||||||
|
expect(context.realtime.sendRawMessage).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits direct messages received over signaling', () => {
|
||||||
|
const context = createServiceContext({ connectedPeers: [] });
|
||||||
|
const received: ChatEvent[] = [];
|
||||||
|
|
||||||
|
context.service.directMessageEvents$.subscribe((event) => received.push(event));
|
||||||
|
context.signalingMessages.next({ type: 'direct-message' } as ChatEvent);
|
||||||
|
|
||||||
|
expect(received).toEqual([{ type: 'direct-message' }]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
interface ServiceContextOptions {
|
||||||
|
connectedPeers: string[];
|
||||||
|
routedPeers?: string[];
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ServiceContext {
|
||||||
|
service: PeerDeliveryService;
|
||||||
|
signalingMessages: Subject<ChatEvent>;
|
||||||
|
realtime: {
|
||||||
|
getConnectedPeers: ReturnType<typeof vi.fn>;
|
||||||
|
hasSignalingRouteForPeer: ReturnType<typeof vi.fn>;
|
||||||
|
sendRawMessage: ReturnType<typeof vi.fn>;
|
||||||
|
sendToPeer: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createServiceContext(options: ServiceContextOptions): ServiceContext {
|
||||||
|
const users = signal<User[]>([createUser('alice', 'Alice'), createUser('bob', 'Bob')]);
|
||||||
|
const incomingMessages = new Subject<ChatEvent>();
|
||||||
|
const signalingMessages = new Subject<ChatEvent>();
|
||||||
|
const peerConnected = new Subject<string>();
|
||||||
|
const realtime = {
|
||||||
|
onMessageReceived: incomingMessages.asObservable(),
|
||||||
|
onSignalingMessage: signalingMessages.asObservable(),
|
||||||
|
onPeerConnected: peerConnected.asObservable(),
|
||||||
|
getConnectedPeers: vi.fn(() => options.connectedPeers),
|
||||||
|
hasSignalingRouteForPeer: vi.fn((peerId: string) => (options.routedPeers ?? []).includes(peerId)),
|
||||||
|
sendRawMessage: vi.fn(),
|
||||||
|
sendToPeer: vi.fn()
|
||||||
|
};
|
||||||
|
const store = {
|
||||||
|
selectSignal: vi.fn((selector: unknown) => {
|
||||||
|
if (selector === selectAllUsers) {
|
||||||
|
return users;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error('Unexpected selector requested by PeerDeliveryService test.');
|
||||||
|
})
|
||||||
|
};
|
||||||
|
const injector = Injector.create({
|
||||||
|
providers: [
|
||||||
|
{
|
||||||
|
provide: RealtimeSessionFacade,
|
||||||
|
useValue: realtime
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: Store,
|
||||||
|
useValue: store
|
||||||
|
}
|
||||||
|
]
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
service: runInInjectionContext(injector, () => new PeerDeliveryService()),
|
||||||
|
signalingMessages,
|
||||||
|
realtime
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createUser(id: string, displayName: string): User {
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
oderId: id,
|
||||||
|
username: displayName.toLowerCase(),
|
||||||
|
displayName,
|
||||||
|
status: 'online',
|
||||||
|
role: 'member',
|
||||||
|
joinedAt: 1
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import { Store } from '@ngrx/store';
|
|||||||
import {
|
import {
|
||||||
Subject,
|
Subject,
|
||||||
filter,
|
filter,
|
||||||
|
merge,
|
||||||
type Observable
|
type Observable
|
||||||
} from 'rxjs';
|
} from 'rxjs';
|
||||||
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
||||||
@@ -17,7 +18,10 @@ export class PeerDeliveryService {
|
|||||||
private readonly users = this.store.selectSignal(selectAllUsers);
|
private readonly users = this.store.selectSignal(selectAllUsers);
|
||||||
private readonly networkRestoredSubject = new Subject<void>();
|
private readonly networkRestoredSubject = new Subject<void>();
|
||||||
|
|
||||||
readonly directMessageEvents$: Observable<ChatEvent> = this.webrtc.onMessageReceived.pipe(
|
readonly directMessageEvents$: Observable<ChatEvent> = merge(
|
||||||
|
this.webrtc.onMessageReceived,
|
||||||
|
this.webrtc.onSignalingMessage as Observable<ChatEvent>
|
||||||
|
).pipe(
|
||||||
filter((event) => event.type === 'direct-message' || event.type === 'direct-message-status' || event.type === 'direct-message-mutation')
|
filter((event) => event.type === 'direct-message' || event.type === 'direct-message-status' || event.type === 'direct-message-mutation')
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -35,12 +39,14 @@ export class PeerDeliveryService {
|
|||||||
|
|
||||||
const peerId = this.resolvePeerId(recipientId);
|
const peerId = this.resolvePeerId(recipientId);
|
||||||
|
|
||||||
if (!peerId) {
|
let sent = false;
|
||||||
return false;
|
|
||||||
|
if (peerId) {
|
||||||
|
this.webrtc.sendToPeer(peerId, event);
|
||||||
|
sent = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.webrtc.sendToPeer(peerId, event);
|
return this.sendViaSignaling(recipientId, event) || sent;
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
handleAck(recipientId: string, event: ChatEvent): boolean {
|
handleAck(recipientId: string, event: ChatEvent): boolean {
|
||||||
@@ -77,6 +83,48 @@ export class PeerDeliveryService {
|
|||||||
return candidates.find((candidate) => connectedPeerIds.has(candidate)) ?? null;
|
return candidates.find((candidate) => connectedPeerIds.has(candidate)) ?? null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private sendViaSignaling(recipientId: string, event: ChatEvent): boolean {
|
||||||
|
if (event.type !== 'direct-message' && event.type !== 'direct-message-status' && event.type !== 'direct-message-mutation') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const targetPeerId = this.resolveSignalingPeerId(recipientId);
|
||||||
|
|
||||||
|
if (!targetPeerId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.webrtc.sendRawMessage({
|
||||||
|
...event,
|
||||||
|
targetUserId: targetPeerId
|
||||||
|
});
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveSignalingPeerId(recipientId: string): string | null {
|
||||||
|
return this.resolveCandidateIds(recipientId).find((candidate) => this.webrtc.hasSignalingRouteForPeer(candidate)) ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveCandidateIds(recipientId: string): string[] {
|
||||||
|
const user = this.users().find((candidate: User) =>
|
||||||
|
candidate.id === recipientId || candidate.oderId === recipientId || candidate.peerId === recipientId
|
||||||
|
);
|
||||||
|
|
||||||
|
return [
|
||||||
|
recipientId,
|
||||||
|
user?.oderId,
|
||||||
|
user?.peerId,
|
||||||
|
user?.id
|
||||||
|
].filter((candidate, index, candidates): candidate is string =>
|
||||||
|
!!candidate && candidates.indexOf(candidate) === index
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private isOfflineOverrideEnabled(): boolean {
|
private isOfflineOverrideEnabled(): boolean {
|
||||||
return typeof window !== 'undefined'
|
return typeof window !== 'undefined'
|
||||||
&& !!(window as Window & { metoyouDmNetworkOffline?: boolean }).metoyouDmNetworkOffline;
|
&& !!(window as Window & { metoyouDmNetworkOffline?: boolean }).metoyouDmNetworkOffline;
|
||||||
|
|||||||
@@ -157,6 +157,10 @@ The `/search` My Servers row and the server rail both read from the active user'
|
|||||||
|
|
||||||
Fallback stays temporary. If the authoritative endpoint is unavailable, the client can probe other active compatible endpoints as a last resort for the current session, but it does not rewrite the room's saved affinity to that fallback endpoint.
|
Fallback stays temporary. If the authoritative endpoint is unavailable, the client can probe other active compatible endpoints as a last resort for the current session, but it does not rewrite the room's saved affinity to that fallback endpoint.
|
||||||
|
|
||||||
|
Be careful around endpoint failure semantics. `ensureEndpointVersionCompatibility()` returns `false` for both incompatible versions and unreachable/offline endpoints. Only an endpoint with `status === 'incompatible'` should stop the fallback cascade with the update-required message. Cloudflare `521`/`522`, network timeouts, and WebSocket `1006` failures must continue to the next active compatible endpoint.
|
||||||
|
|
||||||
|
`ServerDirectoryApiService.getServer()` also returns `null` for both authoritative `SERVER_NOT_FOUND` and retryable endpoint failures. Callers that need recovery must search active endpoints before treating `null` as proof that a saved room is gone or its source is stale.
|
||||||
|
|
||||||
## Server-owned room metadata
|
## Server-owned room metadata
|
||||||
|
|
||||||
`ServerInfo` also carries the server-owned `channels` list for each room. Register and update calls persist this channel metadata on the server, and search or hydration responses return the normalised channel list so text and voice channel topology survives reloads, reconnects, and fresh joins.
|
`ServerInfo` also carries the server-owned `channels` list for each room. Register and update calls persist this channel metadata on the server, and search or hydration responses return the normalised channel list so text and voice channel topology survives reloads, reconnects, and fresh joins.
|
||||||
|
|||||||
@@ -107,6 +107,10 @@ export class ServerDirectoryApiService {
|
|||||||
return this.http.get<ServerInfo>(`${this.getApiBaseUrl(selector)}/servers/${serverId}`).pipe(
|
return this.http.get<ServerInfo>(`${this.getApiBaseUrl(selector)}/servers/${serverId}`).pipe(
|
||||||
map((server) => this.normalizeServerInfo(server, this.resolveEndpoint(selector))),
|
map((server) => this.normalizeServerInfo(server, this.resolveEndpoint(selector))),
|
||||||
catchError((error) => {
|
catchError((error) => {
|
||||||
|
// Warning: this API deliberately returns null for both authoritative
|
||||||
|
// SERVER_NOT_FOUND and retryable endpoint failures. Callers that need
|
||||||
|
// resilience must try other active endpoints before treating null as
|
||||||
|
// proof that a room no longer exists.
|
||||||
if (isServerNotFoundError(error)) {
|
if (isServerNotFoundError(error)) {
|
||||||
return of(null);
|
return of(null);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -115,18 +115,22 @@ graph TD
|
|||||||
|
|
||||||
## Signaling (WebSocket)
|
## Signaling (WebSocket)
|
||||||
|
|
||||||
The signaling layer's only job is getting two peers to exchange SDP offers/answers and ICE candidates so they can establish a direct WebRTC connection. Once the peer connection is up, signaling is only used for presence (user joined/left) and reconnection.
|
The signaling layer gets peers to exchange SDP offers/answers and ICE candidates so they can establish direct WebRTC connections. It also carries identity, room membership, presence, typing, and selected server-relayed fallback events when the peer data channel is unavailable.
|
||||||
|
|
||||||
Each signaling URL gets its own `SignalingManager` (one WebSocket each). `SignalingTransportHandler` picks the right socket based on which server the message is for. `ServerSignalingCoordinator` tracks which peers belong to which servers and which signaling URLs, so we know when it is safe to tear down a peer connection after leaving a server.
|
Each signaling URL gets its own `SignalingManager` (one WebSocket each). `SignalingTransportHandler` picks the right socket based on which server the message is for. `ServerSignalingCoordinator` tracks which peers belong to which servers and which signaling URLs, so we know when it is safe to tear down a peer connection after leaving a server.
|
||||||
|
|
||||||
Room affinity is authoritative at this layer as well. The renderer repairs each room's saved `sourceId` / `sourceUrl` from server-directory responses and routes `join_server`, `view_server`, and room-scoped signaling traffic to that room's signaling URL first. If that route fails, alternate endpoints can be tried temporarily, but server-scoped raw messages are no longer broadcast to every connected signaling manager when the route is unknown.
|
Room affinity is authoritative at this layer as well. The renderer repairs each room's saved `sourceId` / `sourceUrl` from server-directory responses and routes `join_server`, `view_server`, and room-scoped signaling traffic to that room's signaling URL first. If that route fails, alternate endpoints can be tried temporarily, but server-scoped raw messages are no longer broadcast to every connected signaling manager when the route is unknown.
|
||||||
|
|
||||||
|
Server-relayed fallbacks are intentionally narrow. Room chat (`chat_message`), direct-message events (`direct-message`, `direct-message-status`, `direct-message-mutation`), and voice presence (`voice_state`) may flow over signaling so users can still see written chat and voice roster state while P2P data channels are down. Media, attachments, message inventory sync, screen/camera state, and plugin data-channel traffic remain peer-plane responsibilities.
|
||||||
|
|
||||||
In UI/debug conversations, a **chat-server** means one of the saved rooms navigated from the server rail. Each chat-server has its own assigned signal server via `sourceId` / `sourceUrl`, and room-scoped feature/config checks must prefer that signal server before considering any global active endpoint. For example, KLIPY GIF picker visibility is resolved against the currently viewed chat-server's signal server so an unrelated offline chat-server does not hide the button everywhere.
|
In UI/debug conversations, a **chat-server** means one of the saved rooms navigated from the server rail. Each chat-server has its own assigned signal server via `sourceId` / `sourceUrl`, and room-scoped feature/config checks must prefer that signal server before considering any global active endpoint. For example, KLIPY GIF picker visibility is resolved against the currently viewed chat-server's signal server so an unrelated offline chat-server does not hide the button everywhere.
|
||||||
|
|
||||||
Cold-start routing now waits for the initial server-directory health probes so same-backend aliases can collapse to one canonical signaling endpoint before any saved rooms reconnect. When a room is reconnected on a chosen socket, its background rooms are re-joined on that same socket as well so stale per-signal memberships do not keep orphan managers alive, and reconnect replay only sends `view_server` for rooms that manager still has joined.
|
Cold-start routing now waits for the initial server-directory health probes so same-backend aliases can collapse to one canonical signaling endpoint before any saved rooms reconnect. When a room is reconnected on a chosen socket, its background rooms are re-joined on that same socket as well so stale per-signal memberships do not keep orphan managers alive, and reconnect replay only sends `view_server` for rooms that manager still has joined.
|
||||||
|
|
||||||
This is still a non-federated model. Different signaling servers do not share peer registries or relay WebRTC offers for each other, so users in the same room must converge on the same signaling endpoint to discover one another reliably.
|
This is still a non-federated model. Different signaling servers do not share peer registries or relay WebRTC offers for each other, so users in the same room must converge on the same signaling endpoint to discover one another reliably.
|
||||||
|
|
||||||
|
The fallback path is fragile by design: it only helps when a usable signaling socket exists. If a production origin returns Cloudflare `521`/`522` or the WebSocket closes with `1006`, room reconnect must continue to other active compatible endpoints instead of treating the room as missing or the client as incompatible.
|
||||||
|
|
||||||
```mermaid
|
```mermaid
|
||||||
sequenceDiagram
|
sequenceDiagram
|
||||||
participant UI as App
|
participant UI as App
|
||||||
|
|||||||
@@ -399,6 +399,7 @@ export class WebRTCService implements OnDestroy {
|
|||||||
*/
|
*/
|
||||||
broadcastMessage(event: ChatEvent): void {
|
broadcastMessage(event: ChatEvent): void {
|
||||||
this.peerMediaFacade.broadcastMessage(event);
|
this.peerMediaFacade.broadcastMessage(event);
|
||||||
|
this.relayBroadcastEvent(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -430,6 +431,12 @@ export class WebRTCService implements OnDestroy {
|
|||||||
return this.peerMediaFacade.getConnectedPeerIds();
|
return this.peerMediaFacade.getConnectedPeerIds();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hasSignalingRouteForPeer(peerId: string): boolean {
|
||||||
|
const signalUrl = this.signalingCoordinator.getPeerSignalUrl(peerId);
|
||||||
|
|
||||||
|
return !!signalUrl && this.signalingCoordinator.isSignalingConnectedTo(signalUrl);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the composite remote {@link MediaStream} for a connected peer.
|
* Get the composite remote {@link MediaStream} for a connected peer.
|
||||||
*
|
*
|
||||||
@@ -658,6 +665,26 @@ export class WebRTCService implements OnDestroy {
|
|||||||
this.peerMediaFacade.stopScreenShare();
|
this.peerMediaFacade.stopScreenShare();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private relayBroadcastEvent(event: ChatEvent): void {
|
||||||
|
if (event.type === 'chat-message' && event.message?.roomId) {
|
||||||
|
this.signalingTransportHandler.sendRawMessage({
|
||||||
|
type: 'chat_message',
|
||||||
|
serverId: event.message.roomId,
|
||||||
|
message: event.message
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === 'voice-state' && event.voiceState?.serverId) {
|
||||||
|
this.signalingTransportHandler.sendRawMessage({
|
||||||
|
...event,
|
||||||
|
type: 'voice_state',
|
||||||
|
serverId: event.voiceState.serverId
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Disconnect from the signaling server and clean up all state. */
|
/** Disconnect from the signaling server and clean up all state. */
|
||||||
disconnect(): void {
|
disconnect(): void {
|
||||||
this.leaveRoom();
|
this.leaveRoom();
|
||||||
|
|||||||
@@ -134,6 +134,14 @@ export class SignalingTransportHandler<TMessage> {
|
|||||||
const connectedManagers = this.getConnectedSignalingManagers();
|
const connectedManagers = this.getConnectedSignalingManagers();
|
||||||
|
|
||||||
if (connectedManagers.length === 0) {
|
if (connectedManagers.length === 0) {
|
||||||
|
if (messageType === 'status_update') {
|
||||||
|
this.dependencies.logger.warn('[signaling] Skipping status update without an active signaling connection', {
|
||||||
|
type: messageType
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.dependencies.logger.error('[signaling] No active signaling connection for outbound message', new Error('No signaling manager available'), {
|
this.dependencies.logger.error('[signaling] No active signaling connection for outbound message', new Error('No signaling manager available'), {
|
||||||
type: messageType
|
type: messageType
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -498,6 +498,49 @@ export class MessagesEffects {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
incomingSignalingMessages$ = createEffect(() =>
|
||||||
|
this.webrtc.onSignalingMessage.pipe(
|
||||||
|
withLatestFrom(
|
||||||
|
this.store.select(selectCurrentUser),
|
||||||
|
this.store.select(selectCurrentRoom)
|
||||||
|
),
|
||||||
|
mergeMap(([
|
||||||
|
event,
|
||||||
|
currentUser,
|
||||||
|
currentRoom
|
||||||
|
]) => {
|
||||||
|
if (event.type !== 'chat_message') {
|
||||||
|
return EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ctx: IncomingMessageContext = {
|
||||||
|
db: this.db,
|
||||||
|
webrtc: this.webrtc,
|
||||||
|
attachments: this.attachments,
|
||||||
|
debugging: this.debugging,
|
||||||
|
currentUser: currentUser ?? null,
|
||||||
|
currentRoom
|
||||||
|
};
|
||||||
|
|
||||||
|
return dispatchIncomingMessage({
|
||||||
|
...event,
|
||||||
|
type: 'chat-message',
|
||||||
|
fromPeerId: event.fromUserId
|
||||||
|
}, ctx).pipe(
|
||||||
|
catchError((error) => {
|
||||||
|
reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming signaling chat message', {
|
||||||
|
eventType: event.type,
|
||||||
|
fromPeerId: event.fromUserId ?? null,
|
||||||
|
roomId: event.serverId ?? null
|
||||||
|
}, error);
|
||||||
|
|
||||||
|
return EMPTY;
|
||||||
|
})
|
||||||
|
);
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
private trackBackgroundOperation(task: Promise<unknown> | unknown, message: string, payload: Record<string, unknown>): void {
|
private trackBackgroundOperation(task: Promise<unknown> | unknown, message: string, payload: Record<string, unknown>): void {
|
||||||
trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload);
|
trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload);
|
||||||
}
|
}
|
||||||
|
|||||||
161
toju-app/src/app/store/rooms/room-signaling-connection.spec.ts
Normal file
161
toju-app/src/app/store/rooms/room-signaling-connection.spec.ts
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
import { of } from 'rxjs';
|
||||||
|
import {
|
||||||
|
beforeEach,
|
||||||
|
describe,
|
||||||
|
expect,
|
||||||
|
it,
|
||||||
|
vi
|
||||||
|
} from 'vitest';
|
||||||
|
import type { Store } from '@ngrx/store';
|
||||||
|
import type { RealtimeSessionFacade } from '../../core/realtime';
|
||||||
|
import type { ServerDirectoryFacade } from '../../domains/server-directory';
|
||||||
|
import type { Room, User } from '../../shared-kernel';
|
||||||
|
import { RoomSignalingConnection } from './room-signaling-connection';
|
||||||
|
|
||||||
|
interface EndpointFixture {
|
||||||
|
id: string;
|
||||||
|
isActive: boolean;
|
||||||
|
name: string;
|
||||||
|
status: 'offline' | 'online';
|
||||||
|
url: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface SignalSourceFixture {
|
||||||
|
sourceId: string;
|
||||||
|
sourceName?: string;
|
||||||
|
sourceUrl: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface SignalSelectorFixture {
|
||||||
|
sourceId: string;
|
||||||
|
sourceUrl: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FakeRealtimeSessionFacade {
|
||||||
|
connectToSignalingServer: ReturnType<typeof vi.fn>;
|
||||||
|
hasJoinedServer: ReturnType<typeof vi.fn>;
|
||||||
|
identify: ReturnType<typeof vi.fn>;
|
||||||
|
isSignalingConnectedTo: ReturnType<typeof vi.fn>;
|
||||||
|
joinRoom: ReturnType<typeof vi.fn>;
|
||||||
|
peerId: ReturnType<typeof vi.fn>;
|
||||||
|
setCurrentServer: ReturnType<typeof vi.fn>;
|
||||||
|
switchServer: ReturnType<typeof vi.fn>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FakeServerDirectoryFacade {
|
||||||
|
awaitInitialServerHealthCheck: ReturnType<typeof vi.fn>;
|
||||||
|
buildRoomSignalSelector: ReturnType<typeof vi.fn>;
|
||||||
|
ensureEndpointVersionCompatibility: ReturnType<typeof vi.fn>;
|
||||||
|
findServerAcrossActiveEndpoints: ReturnType<typeof vi.fn>;
|
||||||
|
getFallbackRoomEndpoints: ReturnType<typeof vi.fn>;
|
||||||
|
getServer: ReturnType<typeof vi.fn>;
|
||||||
|
getWebSocketUrl: ReturnType<typeof vi.fn>;
|
||||||
|
normaliseRoomSignalSource: ReturnType<typeof vi.fn>;
|
||||||
|
resolveRoomEndpoint: ReturnType<typeof vi.fn>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FakeStore {
|
||||||
|
dispatch: ReturnType<typeof vi.fn>;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('RoomSignalingConnection', () => {
|
||||||
|
const room: Room = {
|
||||||
|
id: 'room-1',
|
||||||
|
name: 'Room One',
|
||||||
|
description: '',
|
||||||
|
hostId: 'user-1',
|
||||||
|
createdAt: 1,
|
||||||
|
userCount: 1,
|
||||||
|
sourceId: 'primary',
|
||||||
|
sourceName: 'Primary',
|
||||||
|
sourceUrl: 'https://signal.toju.app'
|
||||||
|
};
|
||||||
|
const user: User = {
|
||||||
|
id: 'user-1',
|
||||||
|
oderId: 'peer-a',
|
||||||
|
username: 'maomao',
|
||||||
|
displayName: 'maomao',
|
||||||
|
status: 'online',
|
||||||
|
createdAt: 1
|
||||||
|
};
|
||||||
|
|
||||||
|
let webrtc: FakeRealtimeSessionFacade;
|
||||||
|
let serverDirectory: FakeServerDirectoryFacade;
|
||||||
|
let store: FakeStore;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
const endpoints = new Map<string, EndpointFixture>([
|
||||||
|
[
|
||||||
|
'primary',
|
||||||
|
{
|
||||||
|
id: 'primary',
|
||||||
|
name: 'Primary',
|
||||||
|
url: 'https://signal.toju.app',
|
||||||
|
isActive: true,
|
||||||
|
status: 'offline'
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
'fallback',
|
||||||
|
{
|
||||||
|
id: 'fallback',
|
||||||
|
name: 'Sweden',
|
||||||
|
url: 'https://signal-sweden.toju.app',
|
||||||
|
isActive: true,
|
||||||
|
status: 'online'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
]);
|
||||||
|
|
||||||
|
webrtc = {
|
||||||
|
connectToSignalingServer: vi.fn((url: string) => of(url === 'wss://signal-sweden.toju.app')),
|
||||||
|
hasJoinedServer: vi.fn(() => false),
|
||||||
|
identify: vi.fn(),
|
||||||
|
isSignalingConnectedTo: vi.fn(() => false),
|
||||||
|
joinRoom: vi.fn(),
|
||||||
|
peerId: vi.fn(() => 'peer-a'),
|
||||||
|
setCurrentServer: vi.fn(),
|
||||||
|
switchServer: vi.fn()
|
||||||
|
};
|
||||||
|
|
||||||
|
serverDirectory = {
|
||||||
|
awaitInitialServerHealthCheck: vi.fn(() => Promise.resolve()),
|
||||||
|
buildRoomSignalSelector: vi.fn((source: SignalSourceFixture) => ({
|
||||||
|
sourceId: source.sourceId,
|
||||||
|
sourceUrl: source.sourceUrl
|
||||||
|
})),
|
||||||
|
ensureEndpointVersionCompatibility: vi.fn((selector: SignalSelectorFixture) =>
|
||||||
|
Promise.resolve(selector.sourceId === 'fallback')
|
||||||
|
),
|
||||||
|
findServerAcrossActiveEndpoints: vi.fn(() => of(null)),
|
||||||
|
getFallbackRoomEndpoints: vi.fn(() => [endpoints.get('fallback')]),
|
||||||
|
getServer: vi.fn(() => of(null)),
|
||||||
|
getWebSocketUrl: vi.fn((selector: SignalSelectorFixture) => selector.sourceUrl.replace(/^http/, 'ws')),
|
||||||
|
normaliseRoomSignalSource: vi.fn((source: SignalSourceFixture) => ({
|
||||||
|
sourceId: source.sourceId,
|
||||||
|
sourceName: source.sourceName,
|
||||||
|
sourceUrl: source.sourceUrl
|
||||||
|
})),
|
||||||
|
resolveRoomEndpoint: vi.fn((source: SignalSourceFixture) => endpoints.get(source.sourceId) ?? null)
|
||||||
|
};
|
||||||
|
|
||||||
|
store = {
|
||||||
|
dispatch: vi.fn()
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
it('tries fallback endpoints when the primary endpoint is offline', async () => {
|
||||||
|
const connection = new RoomSignalingConnection(
|
||||||
|
webrtc as unknown as RealtimeSessionFacade,
|
||||||
|
serverDirectory as unknown as ServerDirectoryFacade,
|
||||||
|
store as unknown as Store
|
||||||
|
);
|
||||||
|
|
||||||
|
connection.beginRoomNavigation(room.id);
|
||||||
|
await connection.connectToRoomSignaling(room, user, user.oderId, [room]);
|
||||||
|
|
||||||
|
expect(serverDirectory.ensureEndpointVersionCompatibility).toHaveBeenCalledTimes(2);
|
||||||
|
expect(webrtc.connectToSignalingServer).toHaveBeenCalledWith('wss://signal-sweden.toju.app');
|
||||||
|
expect(webrtc.joinRoom).toHaveBeenCalledWith(room.id, user.oderId, 'wss://signal-sweden.toju.app');
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -135,7 +135,13 @@ export class RoomSignalingConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!isCompatible) {
|
if (!isCompatible) {
|
||||||
if (candidate.isPrimary) {
|
// Warning: offline/unreachable endpoints also fail this check. Only
|
||||||
|
// version-incompatible primary endpoints should stop fallback; transient
|
||||||
|
// 521/522/network failures must continue to the next active endpoint.
|
||||||
|
const endpoint = this.serverDirectory.resolveRoomEndpoint(candidate.source);
|
||||||
|
const isEndpointIncompatible = endpoint?.status === 'incompatible';
|
||||||
|
|
||||||
|
if (candidate.isPrimary && isEndpointIncompatible) {
|
||||||
if (shouldShowCompatibilityError) {
|
if (shouldShowCompatibilityError) {
|
||||||
this.store.dispatch(
|
this.store.dispatch(
|
||||||
RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE })
|
RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE })
|
||||||
@@ -297,6 +303,10 @@ export class RoomSignalingConnection {
|
|||||||
|
|
||||||
if (!this.webrtc.hasJoinedServer(room.id)) {
|
if (!this.webrtc.hasJoinedServer(room.id)) {
|
||||||
const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name);
|
const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name);
|
||||||
|
// Warning: getServer returns null for both SERVER_NOT_FOUND and transient
|
||||||
|
// endpoint failures. Always search active endpoints before deciding the
|
||||||
|
// saved room source is stale, otherwise a Cloudflare/origin outage pins
|
||||||
|
// reconnects to the dead endpoint.
|
||||||
const authoritativeServer = (
|
const authoritativeServer = (
|
||||||
selector
|
selector
|
||||||
? await firstValueFrom(this.serverDirectory.getServer(room.id, selector))
|
? await firstValueFrom(this.serverDirectory.getServer(room.id, selector))
|
||||||
|
|||||||
@@ -95,12 +95,18 @@ export class RoomStateSyncEffects {
|
|||||||
/** Handles WebRTC signaling events for user presence (join, leave, server_users). */
|
/** Handles WebRTC signaling events for user presence (join, leave, server_users). */
|
||||||
signalingMessages$ = createEffect(() =>
|
signalingMessages$ = createEffect(() =>
|
||||||
this.webrtc.onSignalingMessage.pipe(
|
this.webrtc.onSignalingMessage.pipe(
|
||||||
withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom), this.store.select(selectSavedRooms)),
|
withLatestFrom(
|
||||||
|
this.store.select(selectCurrentUser),
|
||||||
|
this.store.select(selectCurrentRoom),
|
||||||
|
this.store.select(selectSavedRooms),
|
||||||
|
this.store.select(selectAllUsers)
|
||||||
|
),
|
||||||
mergeMap(([
|
mergeMap(([
|
||||||
message,
|
message,
|
||||||
currentUser,
|
currentUser,
|
||||||
currentRoom,
|
currentRoom,
|
||||||
savedRooms
|
savedRooms,
|
||||||
|
allUsers
|
||||||
]) => {
|
]) => {
|
||||||
const signalingMessage: RoomPresenceSignalingMessage = message;
|
const signalingMessage: RoomPresenceSignalingMessage = message;
|
||||||
const myId = currentUser?.oderId || currentUser?.id;
|
const myId = currentUser?.oderId || currentUser?.id;
|
||||||
@@ -226,6 +232,16 @@ export class RoomStateSyncEffects {
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case 'voice_state': {
|
||||||
|
const voiceEvent = {
|
||||||
|
...signalingMessage,
|
||||||
|
type: 'voice-state',
|
||||||
|
fromPeerId: signalingMessage.oderId ?? signalingMessage.fromUserId
|
||||||
|
} as ChatEvent;
|
||||||
|
|
||||||
|
return this.handleVoiceOrScreenState(voiceEvent, allUsers, currentUser ?? null, 'voice');
|
||||||
|
}
|
||||||
|
|
||||||
case 'access_denied': {
|
case 'access_denied': {
|
||||||
if (isWrongServer(signalingMessage.serverId, viewedServerId))
|
if (isWrongServer(signalingMessage.serverId, viewedServerId))
|
||||||
return EMPTY;
|
return EMPTY;
|
||||||
|
|||||||
Reference in New Issue
Block a user