fix: Chats doesn't sync for multi client users
This commit is contained in:
@@ -48,6 +48,7 @@ Owns the shared, internet-reachable runtime: HTTP routes for server directory /
|
||||
|
||||
- Every database schema change ships as a TypeORM **migration**; the live database is never mutated outside the migration system.
|
||||
- WebSocket **Envelope** types are defined once in `src/websocket/types.ts` and **must** stay structurally compatible with `toju-app/src/app/shared-kernel/signaling-contracts.ts` — drift between the two is a wire-protocol break.
|
||||
- WebSocket messages from a single connection are processed **strictly in arrival order** (`handleWebSocketMessage` chains them per connection id). Concurrent handling lets a `join_server` overtake a still-awaiting `identify` and silently drop room membership.
|
||||
- User-supplied URLs are **never** fetched without going through `ssrf-guard.ts`.
|
||||
- Secrets (klipy API key, OAuth tokens, signing keys) live in `data/variables.json` or environment variables — never in code, never in logs.
|
||||
|
||||
|
||||
146
server/src/websocket/handler-ordering.spec.ts
Normal file
146
server/src/websocket/handler-ordering.spec.ts
Normal file
@@ -0,0 +1,146 @@
|
||||
import {
|
||||
describe,
|
||||
it,
|
||||
expect,
|
||||
beforeEach,
|
||||
vi
|
||||
} from 'vitest';
|
||||
import { WebSocket } from 'ws';
|
||||
import { connectedUsers } from './state';
|
||||
import { ConnectedUser } from './types';
|
||||
import { handleWebSocketMessage } from './handler';
|
||||
|
||||
vi.mock('../services/server-access.service', () => ({
|
||||
authorizeWebSocketJoin: vi.fn(async () => ({ allowed: true as const })),
|
||||
findServerMembership: vi.fn(async () => ({ id: 'membership-1' })),
|
||||
usersShareServerMembership: vi.fn(async () => false)
|
||||
}));
|
||||
|
||||
vi.mock('../services/session-auth.service', () => ({
|
||||
// Resolve on a macrotask so an unserialized handler would interleave the
|
||||
// next message before identify completes - mirrors a real DB lookup.
|
||||
consumeSessionToken: vi.fn(async (token: string) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
if (token !== 'valid-token') {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
token,
|
||||
user: {
|
||||
id: 'user-1',
|
||||
username: 'alice',
|
||||
displayName: 'Alice',
|
||||
passwordHash: 'hash',
|
||||
createdAt: Date.now()
|
||||
},
|
||||
issuedAt: Date.now(),
|
||||
expiresAt: Date.now() + 60_000
|
||||
};
|
||||
})
|
||||
}));
|
||||
|
||||
vi.mock('../services/plugin-support.service', () => ({
|
||||
getPluginRequirementsSnapshot: vi.fn(async () => ({ plugins: [] })),
|
||||
PluginSupportError: class PluginSupportError extends Error {
|
||||
code = 'TEST';
|
||||
},
|
||||
validatePluginEventEnvelope: vi.fn(async () => undefined)
|
||||
}));
|
||||
|
||||
function createMockWs(): WebSocket & { sentMessages: string[] } {
|
||||
const sent: string[] = [];
|
||||
const ws = {
|
||||
readyState: WebSocket.OPEN,
|
||||
send: (data: string) => { sent.push(data); },
|
||||
close: () => {},
|
||||
sentMessages: sent
|
||||
} as unknown as WebSocket & { sentMessages: string[] };
|
||||
|
||||
return ws;
|
||||
}
|
||||
|
||||
function createConnectedUser(connectionId: string): ConnectedUser {
|
||||
const ws = createMockWs();
|
||||
const user: ConnectedUser = {
|
||||
oderId: connectionId,
|
||||
ws,
|
||||
authenticated: false,
|
||||
serverIds: new Set(),
|
||||
displayName: 'Test User',
|
||||
lastPong: Date.now()
|
||||
};
|
||||
|
||||
connectedUsers.set(connectionId, user);
|
||||
|
||||
return user;
|
||||
}
|
||||
|
||||
function getSentMessages(user: ConnectedUser | undefined): { type: string }[] {
|
||||
const sentMessages = (user?.ws as WebSocket & { sentMessages: string[] }).sentMessages;
|
||||
|
||||
return sentMessages.map((raw) => JSON.parse(raw) as { type: string });
|
||||
}
|
||||
|
||||
describe('server websocket handler - per-connection message ordering', () => {
|
||||
beforeEach(() => {
|
||||
connectedUsers.clear();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('processes join_server after a still-running identify instead of dropping it', async () => {
|
||||
createConnectedUser('conn-1');
|
||||
|
||||
// Both messages arrive in the same tick (one TCP segment); the handler
|
||||
// must not evaluate join_server while identify is still awaiting auth.
|
||||
const identifyPromise = handleWebSocketMessage('conn-1', {
|
||||
type: 'identify',
|
||||
token: 'valid-token',
|
||||
oderId: 'user-1',
|
||||
displayName: 'Alice'
|
||||
});
|
||||
const joinPromise = handleWebSocketMessage('conn-1', {
|
||||
type: 'join_server',
|
||||
serverId: 'server-1'
|
||||
});
|
||||
|
||||
await Promise.all([identifyPromise, joinPromise]);
|
||||
|
||||
const user = connectedUsers.get('conn-1');
|
||||
|
||||
expect(user?.authenticated).toBe(true);
|
||||
expect(user?.serverIds.has('server-1')).toBe(true);
|
||||
|
||||
const authRequired = getSentMessages(user).find((message) => message.type === 'auth_required');
|
||||
|
||||
expect(authRequired).toBeUndefined();
|
||||
});
|
||||
|
||||
it('keeps processing queued messages after a handler error', async () => {
|
||||
createConnectedUser('conn-1');
|
||||
|
||||
const badIdentify = handleWebSocketMessage('conn-1', {
|
||||
type: 'identify',
|
||||
token: 'valid-token',
|
||||
oderId: 'user-1',
|
||||
displayName: 'Alice'
|
||||
});
|
||||
// Unknown types are logged and ignored - must not wedge the queue.
|
||||
const unknown = handleWebSocketMessage('conn-1', { type: 'not-a-real-type' });
|
||||
const join = handleWebSocketMessage('conn-1', {
|
||||
type: 'join_server',
|
||||
serverId: 'server-1'
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
badIdentify,
|
||||
unknown,
|
||||
join
|
||||
]);
|
||||
|
||||
const user = connectedUsers.get('conn-1');
|
||||
|
||||
expect(user?.serverIds.has('server-1')).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -705,7 +705,32 @@ async function handlePluginEvent(user: ConnectedUser, message: WsMessage, connec
|
||||
}
|
||||
}
|
||||
|
||||
export async function handleWebSocketMessage(connectionId: string, message: WsMessage): Promise<void> {
|
||||
/**
|
||||
* Tail of the in-flight message chain per connection.
|
||||
*
|
||||
* Messages from one client can arrive in the same tick (one TCP segment), but
|
||||
* handlers like identify await async work. Without serialization a
|
||||
* join_server can be evaluated while identify is still pending, get rejected
|
||||
* as unauthenticated, and silently lose the room membership.
|
||||
*/
|
||||
const connectionMessageChains = new Map<string, Promise<void>>();
|
||||
|
||||
export function handleWebSocketMessage(connectionId: string, message: WsMessage): Promise<void> {
|
||||
const prior = connectionMessageChains.get(connectionId) ?? Promise.resolve();
|
||||
const current = prior.then(() => processWebSocketMessage(connectionId, message));
|
||||
const tail = current.catch(() => undefined);
|
||||
|
||||
connectionMessageChains.set(connectionId, tail);
|
||||
void tail.then(() => {
|
||||
if (connectionMessageChains.get(connectionId) === tail) {
|
||||
connectionMessageChains.delete(connectionId);
|
||||
}
|
||||
});
|
||||
|
||||
return current;
|
||||
}
|
||||
|
||||
async function processWebSocketMessage(connectionId: string, message: WsMessage): Promise<void> {
|
||||
const user = connectedUsers.get(connectionId);
|
||||
|
||||
if (!user)
|
||||
|
||||
Reference in New Issue
Block a user