From d17453627279069f2a1ea5e6261c8e52f861b299 Mon Sep 17 00:00:00 2001 From: Myx Date: Thu, 11 Jun 2026 00:04:49 +0200 Subject: [PATCH] fix: Chats doesn't sync for multi client users --- AGENTS.md | 2 + agents-docs/BUG_TRACKER.md | 90 +++++++++ agents-docs/LESSONS.md | 7 + agents-docs/features/authentication.md | 6 +- e2e/helpers/multi-device-session.ts | 101 +++++++++- e2e/tests/chat/multi-client-chat-sync.spec.ts | 176 ++++++++++++++++++ server/CONTEXT.md | 1 + server/src/websocket/handler-ordering.spec.ts | 146 +++++++++++++++ server/src/websocket/handler.ts | 27 ++- .../src/app/infrastructure/realtime/README.md | 4 +- .../account-sync-chat.helper.spec.ts | 54 ++++++ .../account-sync/account-sync-chat.helper.ts | 27 +++ .../account-sync/account-sync.effects.ts | 7 + .../account-sync/account-sync.rules.spec.ts | 6 +- .../account-sync/account-sync.rules.ts | 9 +- .../realtime/realtime-session.service.ts | 15 +- 16 files changed, 662 insertions(+), 16 deletions(-) create mode 100644 agents-docs/BUG_TRACKER.md create mode 100644 e2e/tests/chat/multi-client-chat-sync.spec.ts create mode 100644 server/src/websocket/handler-ordering.spec.ts create mode 100644 toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.spec.ts create mode 100644 toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.ts diff --git a/AGENTS.md b/AGENTS.md index 520b240..12e639d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -13,6 +13,7 @@ Reference on-demand (when the workflow triggers them — see `agents-docs/AGENT_ - `agents-docs/AGENTS_CONTEXT.md` — contract for updating `CONTEXT.md` / `CONTEXT-MAP.md` - `agents-docs/AGENTS_ADRS.md` — contract for writing architecture decision records +- `agents-docs/BUG_TRACKER.md` — Obsidian bug inbox location, allowed vault edits, and triage workflow When working in a subdomain, also read its `CONTEXT.md` first: @@ -74,6 +75,7 @@ The product client already maintains per-domain READMEs under `toju-app/src/app/ - 
**Feature docs:** `agents-docs/features/` - **Architecture decisions:** `agents-docs/adr/` - **Context map:** `agents-docs/CONTEXT-MAP.md` +- **Obsidian bug tracker:** `agents-docs/BUG_TRACKER.md` - **Product-client domain:** `toju-app/CONTEXT.md` - **Desktop-shell domain:** `electron/CONTEXT.md` - **Server domain:** `server/CONTEXT.md` diff --git a/agents-docs/BUG_TRACKER.md b/agents-docs/BUG_TRACKER.md new file mode 100644 index 0000000..cff9e26 --- /dev/null +++ b/agents-docs/BUG_TRACKER.md @@ -0,0 +1,90 @@ +# Obsidian Bug Tracker — Agent Contract + +User-maintained bug reports live outside the repo. Read this file when asked to triage, investigate, or work from the bug backlog. + +**Overrides** `agents-docs/AGENT_WORKFLOW.md` §8 (Autonomous Bug Fixing) unless the user explicitly asks you to fix a bug in code. + +--- + +## Location + +| Item | Path | +|------|------| +| Bug inbox | `/home/ludde/Nextcloud/Obsidian Vault/Log/Bugs/` | +| Attachments | `…/Bugs/attachments/<bug-note-name>/` | +| Dashboard | `/home/ludde/Nextcloud/Obsidian Vault/Log/Create bug.md` | +| Template | `/home/ludde/Nextcloud/Obsidian Vault/Log/Templates/Bug Report.md` | + +--- + +## Allowed actions on vault files + +Unless the user explicitly asks for more: + +1. **Change `status`** in a bug note's YAML frontmatter (`Open` → `Resolved` or `Closed`). +2. **Move files** (e.g. reorganize notes or attachments when instructed). + +Do **not** edit other vault fields or sections (`Investigation`, `Resolution`, description, etc.) unless the user asks. + +--- + +## Allowed reads (unrestricted) + +To understand and solve bugs you may read freely: + +- All bug notes and attachments under `Log/Bugs/` +- The full MetoYou repo (code, tests, logs, docs) +- Runtime output, test results, and debug artifacts + +Investigation findings belong in chat or in repo changes — not in the vault — unless the user asks you to update the note. 
+ +--- + +## Bug note format + +Each note is Markdown with YAML frontmatter: + +```yaml +--- +title: Bug - … +type: bug +status: Open # Open | Resolved | Closed +priority: Low | Medium | High | Critical +severity: Low | Medium | High | Critical +environment: … +created: YYYY-MM-DD HH:mm +tags: [bug] +--- +``` + +Body sections: **Description**, **Steps to Reproduce**, **Expected Result**, **Actual Result**, **Logs / Screenshots**, **Investigation**, **Resolution**. + +The dashboard (`Create bug.md`) uses Dataview; keep `type: bug` and `status` accurate so counts stay correct. + +--- + +## Workflow + +1. List open bugs: `Glob` or `ls` on `…/Log/Bugs/*.md`, filter `status: Open`. +2. Read the note and any linked attachments. +3. Investigate in the repo (read-only toward the vault). +4. Report findings to the user. +5. Only when told to fix: implement in repo (TDD, lint, build per `AGENTS.md`). +6. When a bug is done: update vault `status` to `Resolved` or `Closed` (and move files if the user specifies a convention). + +--- + +## Open bugs (snapshot 2026-06-10) + +| Title | Priority | Environment | +|-------|----------|-------------| +| Attachments gets syncronized corrupt | Critical | All major clients | +| Chats doesn't sync for multi client users | High | All | +| No android app icon | High | Android | +| No login screen mobile phone on startup | High | Android, Android Browser | +| Fresh users have the server list in dashboard completely empty until anything searched | High | — | +| Video attachment on android gets sent in the message bubble above with no preview image | High | Android | +| Local files should be remembered by client | High | — | +| Emojis should be user bound not client bound | Medium | All | + +Re-scan the folder at session start; this table is not auto-updated. 
diff --git a/agents-docs/LESSONS.md b/agents-docs/LESSONS.md index fe0ef46..8a2cba5 100644 --- a/agents-docs/LESSONS.md +++ b/agents-docs/LESSONS.md @@ -25,6 +25,13 @@ Durable rules for AI agents working on this project. Read this file at session s ## Lessons +### Don't bump E2E timeouts for sync flakes - gate on presence and read server logs [testing] [realtime] + +- **Trigger:** a multi-client chat-sync E2E flaked on "message not visible" and the first instinct was to raise `toBeVisible` timeouts or add waits; the user correctly rejected this ("it's not a timeout issue"). +- **Rule:** when a cross-user E2E assertion flakes, first gate the assertion on an observable precondition (peer visible in the members panel), then diff the signaling-server logs of a passing vs failing run (`joined server`, `user_joined`, `user_left`, `Removing dead connection`) before touching any timeout. +- **Why:** the flake was a server race — `identify` + `join_server` arriving in one TCP segment were processed concurrently, the join was dropped as unauthenticated, and room membership silently vanished; no timeout can fix a message that is never broadcast. Fixed by serializing per-connection message handling in `server/src/websocket/handler.ts`. +- **Example:** failing run showed one `joined server` for Ludde then `user_left` on sibling-client close; passing run showed two. `expectServerPeerVisible(page, displayName)` in `e2e/helpers/multi-device-session.ts` is the presence gate. + ### When renaming an Angular route, sweep every navigate/url-match/doc reference [routing] - **Trigger:** the find-servers route was renamed `/search` → `/servers` in `app.routes.ts`, but `servers-rail.component.ts` still called `router.navigate(['/search'])` (leave-server) and matched `startsWith('/search')` for the user-bar visibility signal, throwing `NG04002: 'search'` on leave and never showing the user-bar on the discovery page. 
diff --git a/agents-docs/features/authentication.md b/agents-docs/features/authentication.md index 51eb381..69b3fa8 100644 --- a/agents-docs/features/authentication.md +++ b/agents-docs/features/authentication.md @@ -54,6 +54,7 @@ Require `Authorization: Bearer`: - `oderId` must match the token's user id when provided. - `clientInstanceId` is a stable per-tab UUID generated by the product client (`metoyou.clientInstanceId` in `sessionStorage`). The signaling server uses it to distinguish multiple WebSocket connections for the same user and to route voice ownership. - Server responds with `auth_error` or `auth_required` when authentication fails. +- **Per-connection message ordering (invariant):** the server processes WebSocket messages for one connection strictly in arrival order (`handleWebSocketMessage` chains them per connection id). `identify` awaits a DB token lookup, and clients send `identify` + `join_server` back-to-back (often one TCP segment); concurrent handling let the join run mid-identify, get rejected as unauthenticated, and silently drop room membership — that connection then missed all `user_joined` / `chat_message` broadcasts (root cause of "chats don't sync for multi-client users"). 
## Multi-device sessions @@ -68,7 +69,8 @@ When the same account is logged in on multiple devices, account-owned data is ke | Data | Mechanism | |---|---| -| Server chat messages | Existing `chat_message` relay (connection-scoped broadcast) | +| Server chat messages (live) | `chat_message` signaling relay (connection-scoped broadcast) **plus** `account_sync` `chat-message` / `message-revision` to sibling devices | +| Server chat messages (catch-up) | `account_sync` `chat-sync-batch` pushed when a sibling device comes online (`account_sync_peer_online`) | | Voice / typing | Existing `voice_state` / `user_typing` relays | | Saved servers (join/leave) | `account_sync` payload `saved-room-sync` / `saved-room-remove` | | Profile avatar + card text | `account_sync` `user-avatar-full` + `user-avatar-chunk` | @@ -81,7 +83,7 @@ Client rules: - `broadcastMessage()` still fans out over peer data channels; relayable events are **also** wrapped in `account_sync` and sent on the WebSocket. - The server forwards `account_sync` to every other open connection for the same `oderId` via `notifyOtherConnectionsForOderId`. - Receivers ignore payloads whose `clientInstanceId` matches the local tab id. -- When a new device identifies, the server notifies existing connections with `account_sync_peer_online`; those devices push a full snapshot (saved rooms, friends, profile, emoji library). +- When a new device identifies, the server notifies existing connections with `account_sync_peer_online`; those devices push a full snapshot (saved rooms, **room message history**, friends, profile, emoji library). 
WebSocket envelope: diff --git a/e2e/helpers/multi-device-session.ts b/e2e/helpers/multi-device-session.ts index 82eb9a2..a09db2a 100644 --- a/e2e/helpers/multi-device-session.ts +++ b/e2e/helpers/multi-device-session.ts @@ -114,10 +114,84 @@ export async function expectCrossDeviceMessage( ): Promise { await sender.sendMessage(message); - await expect.poll(async () => { - return await receiver.getMessageItemByText(message).isVisible() - .catch(() => false); - }, { timeout }).toBe(true); + await expectSyncedMessage(receiver, message, timeout); +} + +/** Waits until a message sent elsewhere appears in the local chat history. */ +export async function expectSyncedMessage( + receiver: ChatMessagesPage, + message: string, + timeout = 90_000 +): Promise { + await receiver.waitForReady(); + + await expect(receiver.getMessageItemByText(message)).toBeVisible({ timeout }); +} + +export async function expectSyncedMessageWithResync( + page: Page, + receiver: ChatMessagesPage, + message: string, + timeout = 60_000 +): Promise { + await receiver.waitForReady(); + + const alreadyVisible = await receiver.getMessageItemByText(message) + .isVisible() + .catch(() => false); + + if (!alreadyVisible) { + await resyncChannelMessages(page); + } + + await expect(receiver.getMessageItemByText(message)).toBeVisible({ timeout }); +} + +export async function resyncChannelMessages(page: Page, channelName = 'general'): Promise { + const channel = page.locator(`button[data-channel-type="text"][data-channel-name="${channelName}"]`).first(); + + await expect(channel).toBeVisible({ timeout: 10_000 }); + await channel.click({ button: 'right' }); + await page.getByRole('button', { name: 'Resync Messages' }).click(); +} + +export async function closeClient(client: Client): Promise { + await client.context.close(); +} + +export async function registerGuestAndJoinServer( + page: Page, + credentials: MultiDeviceCredentials, + serverName: string +): Promise { + const registerPage = new RegisterPage(page); 
+ + await registerPage.goto(); + await registerPage.register(credentials.username, credentials.displayName, credentials.password); + await expect(page).toHaveURL(/\/dashboard/, { timeout: 15_000 }); + + const search = new ServerSearchPage(page); + + await search.joinServerFromSearch(serverName); + await expect(page).toHaveURL(/\/room\//, { timeout: 20_000 }); + await expect(page.locator('app-rooms-side-panel').first()).toBeVisible({ timeout: 20_000 }); +} + +export async function reopenClientInServer( + createClient: () => Promise, + credentials: MultiDeviceCredentials, + serverName: string +): Promise<{ client: Client; messages: ChatMessagesPage }> { + const client = await createClient(); + + await warmClientPage(client.page); + await loginSecondDeviceIntoServer(client.page, credentials, serverName); + + const messages = new ChatMessagesPage(client.page); + + await messages.waitForReady(); + + return { client, messages }; } async function warmClientPage(page: Page): Promise { @@ -181,6 +255,25 @@ export function membersSidePanel(page: Page) { return page.locator('app-rooms-side-panel').last(); } +export function serverMemberRow(page: Page, displayName: string) { + return membersSidePanel(page) + .locator('[role="button"], button') + .filter({ has: page.getByText(displayName, { exact: true }) }) + .first(); +} + +/** + * Gates cross-user assertions on real presence: the peer must show up in the + * members panel before chat delivery between the two users can be expected. 
+ */ +export async function expectServerPeerVisible( + page: Page, + displayName: string, + timeout = 45_000 +): Promise { + await expect(serverMemberRow(page, displayName)).toBeVisible({ timeout }); +} + export function passiveVoiceChannelJoinBadge(page: Page, channelName = MULTI_DEVICE_VOICE_CHANNEL) { return page .locator(`button[data-channel-type="voice"][data-channel-name="${channelName}"]`) diff --git a/e2e/tests/chat/multi-client-chat-sync.spec.ts b/e2e/tests/chat/multi-client-chat-sync.spec.ts new file mode 100644 index 0000000..2ccb6e7 --- /dev/null +++ b/e2e/tests/chat/multi-client-chat-sync.spec.ts @@ -0,0 +1,176 @@ +import { test, expect } from '../../fixtures/multi-client'; +import { RegisterPage } from '../../pages/register.page'; +import { ServerSearchPage } from '../../pages/server-search.page'; +import { ChatMessagesPage } from '../../pages/chat-messages.page'; +import { + MULTI_DEVICE_PASSWORD, + closeClient, + expectCrossDeviceMessage, + expectSyncedMessage, + expectSyncedMessageWithResync, + expectServerPeerVisible, + loginSecondDeviceIntoServer, + reopenClientInServer, + uniqueMultiDeviceName +} from '../../helpers/multi-device-session'; + +test.describe('Multi-client chat sync', () => { + test.describe.configure({ timeout: 360_000, retries: 1 }); + + test('syncs messages between same-user devices and late-joining users after offline gaps', async ({ createClient }) => { + const suffix = uniqueMultiDeviceName('multi-chat-sync'); + const hostCredentials = { + username: `ludde_${suffix}`, + displayName: 'Ludde', + password: MULTI_DEVICE_PASSWORD + }; + const guestCredentials = { + username: `azaaxin_${suffix}`, + displayName: 'Azaaxin', + password: MULTI_DEVICE_PASSWORD + }; + const serverName = `Multi Client Chat Sync ${suffix}`; + const sharedBaselineMessage = `Shared baseline ${suffix}`; + const soloHostMessage = `Solo host message ${suffix}`; + const liveGuestProbeMessage = `Live guest probe ${suffix}`; + const offlineGapMessage = `Offline gap 
message ${suffix}`; + const client1 = await createClient(); + const client2 = await createClient(); + const client3 = await createClient(); + + await test.step('client 1: host registers and creates the shared server', async () => { + const registerPage = new RegisterPage(client1.page); + + await registerPage.goto(); + await registerPage.register( + hostCredentials.username, + hostCredentials.displayName, + hostCredentials.password + ); + + await expect(client1.page).toHaveURL(/\/dashboard/, { timeout: 15_000 }); + + const search = new ServerSearchPage(client1.page); + + await search.createServer(serverName, { + description: 'Multi-client chat sync regression coverage' + }); + + await expect(client1.page).toHaveURL(/\/room\//, { timeout: 15_000 }); + }); + + const messages1 = new ChatMessagesPage(client1.page); + + await messages1.waitForReady(); + + await test.step('client 2: second host device joins the same server', async () => { + await loginSecondDeviceIntoServer(client2.page, hostCredentials, serverName); + }); + + const messages2 = new ChatMessagesPage(client2.page); + + await messages2.waitForReady(); + + await test.step('both host devices exchange chat while online together', async () => { + await expectCrossDeviceMessage(messages1, messages2, sharedBaselineMessage); + }); + + await test.step('close the second host browser (client 2)', async () => { + await closeClient(client2); + }); + + await test.step('client 1 sends chat while the second host device is offline', async () => { + await client1.page.bringToFront(); + await messages1.sendMessage(soloHostMessage); + await expect(messages1.getMessageItemByText(soloHostMessage)).toBeVisible({ timeout: 20_000 }); + }); + + await test.step('guest account registers ahead of joining the server', async () => { + const registerPage = new RegisterPage(client3.page); + + await registerPage.goto(); + await registerPage.register( + guestCredentials.username, + guestCredentials.displayName, + guestCredentials.password + 
); + + await expect(client3.page).toHaveURL(/\/dashboard/, { timeout: 15_000 }); + }); + + let messages3 = new ChatMessagesPage(client3.page); + + await test.step('client 3: guest joins and receives existing chat history', async () => { + // Keep the host tab active so its websocket + peer negotiation stay alive. + await client1.page.bringToFront(); + await messages1.waitForReady(); + + const search = new ServerSearchPage(client3.page); + + await search.joinServerFromSearch(serverName); + await expect(client3.page).toHaveURL(/\/room\//, { timeout: 20_000 }); + + messages3 = new ChatMessagesPage(client3.page); + await messages3.waitForReady(); + + // Presence gate: both users must see each other in the members panel + // before cross-user chat delivery can be expected. + await client1.page.bringToFront(); + await expectServerPeerVisible(client1.page, guestCredentials.displayName); + await client3.page.bringToFront(); + await expectServerPeerVisible(client3.page, hostCredentials.displayName); + + // Live delivery first - proves host <-> guest transport is actually up. + await expectCrossDeviceMessage(messages1, messages3, liveGuestProbeMessage); + + // History only replicates over P2P inventory once the peer link exists. + await client1.page.bringToFront(); + await expectSyncedMessageWithResync(client3.page, messages3, sharedBaselineMessage); + await expectSyncedMessageWithResync(client3.page, messages3, soloHostMessage); + }); + + await test.step('close the guest browser (client 3)', async () => { + await closeClient(client3); + }); + + await test.step('reopen client 2 and send a message while client 1 stays online', async () => { + await client1.page.bringToFront(); + const reopened = await reopenClientInServer(createClient, hostCredentials, serverName); + + // Same-user catch-up uses account_sync, not P2P between own devices. 
+ await expectSyncedMessageWithResync( + reopened.client.page, + reopened.messages, + soloHostMessage + ); + + await reopened.messages.sendMessage(offlineGapMessage); + await expect(reopened.messages.getMessageItemByText(offlineGapMessage)).toBeVisible({ timeout: 20_000 }); + }); + + await test.step('reopened guest client receives the offline-gap message from host device 2', async () => { + await client1.page.bringToFront(); + await messages1.waitForReady(); + + const reopenedGuest = await reopenClientInServer(createClient, guestCredentials, serverName); + + // Presence gate before relying on cross-user delivery again. + await client1.page.bringToFront(); + await expectServerPeerVisible(client1.page, guestCredentials.displayName); + await reopenedGuest.client.page.bringToFront(); + await expectServerPeerVisible(reopenedGuest.client.page, hostCredentials.displayName); + + await expectCrossDeviceMessage(messages1, reopenedGuest.messages, `Guest wake ${suffix}`); + + await expectSyncedMessageWithResync( + reopenedGuest.client.page, + reopenedGuest.messages, + offlineGapMessage + ); + }); + + await test.step('primary host device still receives the message from its second device', async () => { + await expectSyncedMessage(messages1, offlineGapMessage); + }); + }); +}); diff --git a/server/CONTEXT.md b/server/CONTEXT.md index 761a2b2..e4b0b8e 100644 --- a/server/CONTEXT.md +++ b/server/CONTEXT.md @@ -48,6 +48,7 @@ Owns the shared, internet-reachable runtime: HTTP routes for server directory / - Every database schema change ships as a TypeORM **migration**; the live database is never mutated outside the migration system. - WebSocket **Envelope** types are defined once in `src/websocket/types.ts` and **must** stay structurally compatible with `toju-app/src/app/shared-kernel/signaling-contracts.ts` — drift between the two is a wire-protocol break. 
+- WebSocket messages from a single connection are processed **strictly in arrival order** (`handleWebSocketMessage` chains them per connection id). Concurrent handling lets a `join_server` overtake a still-awaiting `identify` and silently drop room membership. - User-supplied URLs are **never** fetched without going through `ssrf-guard.ts`. - Secrets (klipy API key, OAuth tokens, signing keys) live in `data/variables.json` or environment variables — never in code, never in logs. diff --git a/server/src/websocket/handler-ordering.spec.ts b/server/src/websocket/handler-ordering.spec.ts new file mode 100644 index 0000000..93374e2 --- /dev/null +++ b/server/src/websocket/handler-ordering.spec.ts @@ -0,0 +1,146 @@ +import { + describe, + it, + expect, + beforeEach, + vi +} from 'vitest'; +import { WebSocket } from 'ws'; +import { connectedUsers } from './state'; +import { ConnectedUser } from './types'; +import { handleWebSocketMessage } from './handler'; + +vi.mock('../services/server-access.service', () => ({ + authorizeWebSocketJoin: vi.fn(async () => ({ allowed: true as const })), + findServerMembership: vi.fn(async () => ({ id: 'membership-1' })), + usersShareServerMembership: vi.fn(async () => false) +})); + +vi.mock('../services/session-auth.service', () => ({ + // Resolve on a macrotask so an unserialized handler would interleave the + // next message before identify completes - mirrors a real DB lookup. 
+ consumeSessionToken: vi.fn(async (token: string) => { + await new Promise((resolve) => setTimeout(resolve, 0)); + + if (token !== 'valid-token') { + return null; + } + + return { + token, + user: { + id: 'user-1', + username: 'alice', + displayName: 'Alice', + passwordHash: 'hash', + createdAt: Date.now() + }, + issuedAt: Date.now(), + expiresAt: Date.now() + 60_000 + }; + }) +})); + +vi.mock('../services/plugin-support.service', () => ({ + getPluginRequirementsSnapshot: vi.fn(async () => ({ plugins: [] })), + PluginSupportError: class PluginSupportError extends Error { + code = 'TEST'; + }, + validatePluginEventEnvelope: vi.fn(async () => undefined) +})); + +function createMockWs(): WebSocket & { sentMessages: string[] } { + const sent: string[] = []; + const ws = { + readyState: WebSocket.OPEN, + send: (data: string) => { sent.push(data); }, + close: () => {}, + sentMessages: sent + } as unknown as WebSocket & { sentMessages: string[] }; + + return ws; +} + +function createConnectedUser(connectionId: string): ConnectedUser { + const ws = createMockWs(); + const user: ConnectedUser = { + oderId: connectionId, + ws, + authenticated: false, + serverIds: new Set(), + displayName: 'Test User', + lastPong: Date.now() + }; + + connectedUsers.set(connectionId, user); + + return user; +} + +function getSentMessages(user: ConnectedUser | undefined): { type: string }[] { + const sentMessages = (user?.ws as WebSocket & { sentMessages: string[] }).sentMessages; + + return sentMessages.map((raw) => JSON.parse(raw) as { type: string }); +} + +describe('server websocket handler - per-connection message ordering', () => { + beforeEach(() => { + connectedUsers.clear(); + vi.clearAllMocks(); + }); + + it('processes join_server after a still-running identify instead of dropping it', async () => { + createConnectedUser('conn-1'); + + // Both messages arrive in the same tick (one TCP segment); the handler + // must not evaluate join_server while identify is still awaiting auth. 
+ const identifyPromise = handleWebSocketMessage('conn-1', { + type: 'identify', + token: 'valid-token', + oderId: 'user-1', + displayName: 'Alice' + }); + const joinPromise = handleWebSocketMessage('conn-1', { + type: 'join_server', + serverId: 'server-1' + }); + + await Promise.all([identifyPromise, joinPromise]); + + const user = connectedUsers.get('conn-1'); + + expect(user?.authenticated).toBe(true); + expect(user?.serverIds.has('server-1')).toBe(true); + + const authRequired = getSentMessages(user).find((message) => message.type === 'auth_required'); + + expect(authRequired).toBeUndefined(); + }); + + it('keeps processing queued messages after a handler error', async () => { + createConnectedUser('conn-1'); + + const badIdentify = handleWebSocketMessage('conn-1', { + type: 'identify', + token: 'valid-token', + oderId: 'user-1', + displayName: 'Alice' + }); + // Unknown types are logged and ignored - must not wedge the queue. + const unknown = handleWebSocketMessage('conn-1', { type: 'not-a-real-type' }); + const join = handleWebSocketMessage('conn-1', { + type: 'join_server', + serverId: 'server-1' + }); + + await Promise.all([ + badIdentify, + unknown, + join + ]); + + const user = connectedUsers.get('conn-1'); + + expect(user?.serverIds.has('server-1')).toBe(true); + }); +}); diff --git a/server/src/websocket/handler.ts b/server/src/websocket/handler.ts index 476b363..6bf072d 100644 --- a/server/src/websocket/handler.ts +++ b/server/src/websocket/handler.ts @@ -705,7 +705,32 @@ async function handlePluginEvent(user: ConnectedUser, message: WsMessage, connec } } -export async function handleWebSocketMessage(connectionId: string, message: WsMessage): Promise { +/** + * Tail of the in-flight message chain per connection. + * + * Messages from one client can arrive in the same tick (one TCP segment), but + * handlers like identify await async work. 
Without serialization a + * join_server can be evaluated while identify is still pending, get rejected + * as unauthenticated, and silently lose the room membership. + */ +const connectionMessageChains = new Map>(); + +export function handleWebSocketMessage(connectionId: string, message: WsMessage): Promise { + const prior = connectionMessageChains.get(connectionId) ?? Promise.resolve(); + const current = prior.then(() => processWebSocketMessage(connectionId, message)); + const tail = current.catch(() => undefined); + + connectionMessageChains.set(connectionId, tail); + void tail.then(() => { + if (connectionMessageChains.get(connectionId) === tail) { + connectionMessageChains.delete(connectionId); + } + }); + + return current; +} + +async function processWebSocketMessage(connectionId: string, message: WsMessage): Promise { const user = connectedUsers.get(connectionId); if (!user) diff --git a/toju-app/src/app/infrastructure/realtime/README.md b/toju-app/src/app/infrastructure/realtime/README.md index 30512d2..91708ff 100644 --- a/toju-app/src/app/infrastructure/realtime/README.md +++ b/toju-app/src/app/infrastructure/realtime/README.md @@ -121,6 +121,8 @@ Each signaling URL gets its own `SignalingManager` (one WebSocket each). `Signal **Identify-before-join invariant.** The server drops any `join_server` / `view_server` that arrives on a connection that has not yet `identify`-ed, so a join that races ahead of identify is silently lost and the user never appears in the presence roster. On every (re)connect, `SignalingManager.reIdentifyAndRejoin` therefore re-`identify`s and only then re-joins. For this to work the manager's credential lookup (`SignalingTransportHandler.getIdentifyCredentialsForSignalUrl`) must resolve a credential as soon as one exists for that signal URL — it falls back to the credential store when the per-URL identify cache has not been populated yet. 
Do not narrow that lookup to only the in-memory cache; doing so lets a fresh socket emit a join before any identify and reintroduces the dropped-presence bug. +The server side preserves this ordering too: `handleWebSocketMessage` serializes messages per connection, handling them strictly in arrival order (a promise chain keyed by connection id in `server/src/websocket/handler.ts`). It does not reorder messages, so a join sent before identify is still dropped and the client-side invariant above remains required. `identify` awaits a session-token DB lookup, and `identify` + `join_server` sent back-to-back often arrive in the same TCP segment; without serialization the join was evaluated mid-identify, rejected with `auth_required`, and the room membership was silently lost — the client then never received `user_joined` / `chat_message` broadcasts for that room even though same-account `account_sync` kept working, which is exactly the "chats don't sync for multi-client users" failure mode. Do not process websocket messages for one connection concurrently. + Room affinity is authoritative at this layer as well. The renderer repairs each room's saved `sourceId` / `sourceUrl` from server-directory responses and routes `join_server`, `view_server`, and room-scoped signaling traffic to that room's signaling URL first. If that route fails, alternate endpoints can be tried temporarily, but server-scoped raw messages are no longer broadcast to every connected signaling manager when the route is unknown. Server-relayed fallbacks are intentionally narrow. Room chat (`chat_message`), direct-message events (`direct-message`, `direct-message-status`, `direct-message-mutation`), and voice presence (`voice_state`) may flow over signaling so users can still see written chat and voice roster state while P2P data channels are down. Media, attachments, message inventory sync, screen/camera state, and plugin data-channel traffic remain peer-plane responsibilities. 
@@ -170,7 +172,7 @@ Browsers do not reliably fire WebSocket close events during page refresh or navi Multi-device sessions keep **multiple** open connections for the same `oderId` (different `clientInstanceId` values per tab/device). Server broadcasts exclude only the sending **connection id**, not the whole identity, so chat/typing/voice-state updates reach every logged-in device. Presence `user_joined` / `user_left` broadcasts still exclude the whole identity so other users never see duplicate join/leave events. -Account-owned state (saved servers, friends, profile avatar/card text, custom emoji library, server icons, message edits/reactions) syncs through **`account_sync`** WebSocket messages. The client wraps relayable P2P broadcast events and the server forwards them to other connections for the same identity via `notifyOtherConnectionsForOderId`. When a new device identifies, existing connections receive `account_sync_peer_online` and push a full snapshot. +Account-owned state (saved servers, friends, profile avatar/card text, custom emoji library, server icons, message edits/reactions, **chat message creates/revisions**) syncs through **`account_sync`** WebSocket messages. The client wraps relayable P2P broadcast events and the server forwards them to other connections for the same identity via `notifyOtherConnectionsForOderId`. When a new device identifies, existing connections receive `account_sync_peer_online` and push a full snapshot including chunked `chat-sync-batch` history for every saved room. RTC offers/answers/ICE are routed to the connection marked `voiceActive` for the target user (fallback: any open connection). Voice ownership is tracked per connection from `voice_state` payloads that include `clientInstanceId`. 
diff --git a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.spec.ts b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.spec.ts new file mode 100644 index 0000000..7f13f14 --- /dev/null +++ b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.spec.ts @@ -0,0 +1,54 @@ +import { pushSavedRoomMessagesViaAccountSync } from './account-sync-chat.helper'; +import type { Message } from '../../../shared-kernel'; + +function createMessage(id: string, roomId: string): Message { + return { + id, + roomId, + senderId: 'user-1', + senderName: 'User 1', + content: `message-${id}`, + timestamp: 1, + reactions: [], + isDeleted: false + }; +} + +describe('pushSavedRoomMessagesViaAccountSync', () => { + it('relays saved room messages in chat-sync-batch chunks to sibling devices', async () => { + const relayAccountSync = vi.fn(); + const roomA = [createMessage('m1', 'room-a'), createMessage('m2', 'room-a')]; + const roomB = [createMessage('m3', 'room-b')]; + const loadRoomMessages = vi.fn(async (roomId: string) => { + if (roomId === 'room-a') { + return roomA; + } + + if (roomId === 'room-b') { + return roomB; + } + + return []; + }); + + await pushSavedRoomMessagesViaAccountSync( + { relayAccountSync }, + loadRoomMessages, + ['room-a', 'room-b'] + ); + + expect(loadRoomMessages).toHaveBeenCalledWith('room-a'); + expect(loadRoomMessages).toHaveBeenCalledWith('room-b'); + expect(relayAccountSync).toHaveBeenCalledWith({ + type: 'chat-sync-batch', + roomId: 'room-a', + messages: roomA + }); + + expect(relayAccountSync).toHaveBeenCalledWith({ + type: 'chat-sync-batch', + roomId: 'room-b', + messages: roomB + }); + }); +}); diff --git a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.ts b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.ts new file mode 100644 index 0000000..10b8085 --- /dev/null +++ 
b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync-chat.helper.ts @@ -0,0 +1,27 @@ +import type { Message } from '../../../shared-kernel'; +import type { RealtimeSessionFacade } from '../../../core/realtime'; +import { + CHUNK_SIZE, + FULL_SYNC_LIMIT, + chunkArray +} from '../../../store/messages/messages.helpers'; + +export async function pushSavedRoomMessagesViaAccountSync( + webrtc: Pick, + loadRoomMessages: (roomId: string) => Promise, + roomIds: readonly string[] +): Promise { + for (const roomId of roomIds) { + const messages = await loadRoomMessages(roomId); + + for (const chunk of chunkArray(messages, CHUNK_SIZE)) { + webrtc.relayAccountSync({ + type: 'chat-sync-batch', + roomId, + messages: chunk + }); + } + } +} + +export const ACCOUNT_SYNC_MESSAGE_LIMIT = FULL_SYNC_LIMIT; diff --git a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.effects.ts b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.effects.ts index 4161811..0da5d83 100644 --- a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.effects.ts +++ b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.effects.ts @@ -20,6 +20,7 @@ import { selectCurrentUser } from '../../../store/users/users.selectors'; import { FriendService } from '../../../domains/direct-message/application/services/friend.service'; import { CustomEmojiService } from '../../../domains/custom-emoji/application/custom-emoji.service'; import { shouldApplyAccountSyncPayload } from './account-sync.rules'; +import { ACCOUNT_SYNC_MESSAGE_LIMIT, pushSavedRoomMessagesViaAccountSync } from './account-sync-chat.helper'; import { pushProfileViaAccountSync } from './account-sync-profile.helper'; import type { Room } from '../../../shared-kernel'; import type { IncomingSignalingMessage } from '../signaling/signaling-message-handler'; @@ -145,6 +146,12 @@ export class AccountSyncEffects { this.webrtc.relayAccountSync({ type: 'saved-room-sync', room }); } + 
await pushSavedRoomMessagesViaAccountSync( + this.webrtc, + (roomId) => this.db.getMessages(roomId, ACCOUNT_SYNC_MESSAGE_LIMIT, 0), + savedRooms.map((room) => room.id) + ); + const friends = await this.friends.friends(); for (const friend of friends) { diff --git a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.spec.ts b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.spec.ts index cb704bf..f9db461 100644 --- a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.spec.ts +++ b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.spec.ts @@ -5,13 +5,15 @@ import { } from './account-sync.rules'; describe('account-sync.rules', () => { - it('relays profile, emoji, room, and moderation events but not chat-message or voice-state', () => { + it('relays profile, emoji, room, moderation, and chat events but not voice-state', () => { expect(isRelayableAccountSyncEvent({ type: 'user-avatar-summary', oderId: 'u1', avatarUpdatedAt: 1 })).toBe(true); expect(isRelayableAccountSyncEvent({ type: 'custom-emoji-full', customEmoji: {} as never })).toBe(true); expect(isRelayableAccountSyncEvent({ type: 'server-icon-update', roomId: 'r1', icon: 'x', iconUpdatedAt: 1 })).toBe(true); expect(isRelayableAccountSyncEvent({ type: 'saved-room-sync', room: { id: 'r1' } as never })).toBe(true); expect(isRelayableAccountSyncEvent({ type: 'friend-added', userId: 'u2', addedAt: 1 })).toBe(true); - expect(isRelayableAccountSyncEvent({ type: 'chat-message', message: {} as never })).toBe(false); + expect(isRelayableAccountSyncEvent({ type: 'chat-message', message: {} as never })).toBe(true); + expect(isRelayableAccountSyncEvent({ type: 'message-revision', revision: {} as never })).toBe(true); + expect(isRelayableAccountSyncEvent({ type: 'chat-sync-batch', roomId: 'r1', messages: [] })).toBe(true); expect(isRelayableAccountSyncEvent({ type: 'voice-state', voiceState: {} as never })).toBe(false); }); diff --git 
a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.ts b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.ts index f832e3d..163880b 100644 --- a/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.ts +++ b/toju-app/src/app/infrastructure/realtime/account-sync/account-sync.rules.ts @@ -1,11 +1,10 @@ import type { ChatEvent } from '../../../shared-kernel'; -const DEDICATED_SIGNALING_RELAY_TYPES = new Set([ - 'chat-message', - 'voice-state' -]); - +const DEDICATED_SIGNALING_RELAY_TYPES = new Set(['voice-state']); const RELAYABLE_ACCOUNT_SYNC_TYPES = new Set([ + 'chat-message', + 'message-revision', + 'chat-sync-batch', 'user-avatar-summary', 'user-avatar-request', 'user-avatar-full', diff --git a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts index 7a9da8d..65bf2d1 100644 --- a/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts +++ b/toju-app/src/app/infrastructure/realtime/realtime-session.service.ts @@ -17,7 +17,11 @@ import { inject, OnDestroy } from '@angular/core'; -import { Observable, Subject, merge } from 'rxjs'; +import { + Observable, + Subject, + merge +} from 'rxjs'; import { ChatEvent } from '../../shared-kernel'; import type { SignalingMessage } from '../../shared-kernel'; import { @@ -767,6 +771,15 @@ export class WebRTCService implements OnDestroy { clientInstanceId }); + this.relayAccountSync({ + ...event, + message: { + ...event.message, + clientInstanceId + }, + clientInstanceId + }); + return; }