/**
 * Handler functions for incoming P2P messages dispatched via WebRTC.
 *
 * Each handler is a pure function that receives an event and a context
 * object containing the required services. Handlers return an
 * `Observable` or `EMPTY` when no store action needs dispatching.
 *
 * The handler registry at the bottom maps event `type` strings to their
 * handlers, and `dispatchIncomingMessage()` is the single entry point
 * consumed by the `incomingMessages$` effect.
 */
import { Observable, of, from, EMPTY } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { Action } from '@ngrx/store';
import { DELETED_MESSAGE_CONTENT, type ChatEvent, type Message, type Room, type User } from '../../shared-kernel';
import type { RealtimeSessionFacade } from '../../core/realtime';
import type { DebuggingService } from '../../core/services';
import { AttachmentFacade, type AttachmentMeta } from '../../domains/attachment';
import { DatabaseService } from '../../infrastructure/persistence';
import { trackDebuggingTaskFailure } from '../../core/helpers/debugging-helpers';
import { MessagesActions } from './messages.actions';
import {
  INVENTORY_LIMIT,
  CHUNK_SIZE,
  FULL_SYNC_LIMIT,
  type InventoryItem,
  chunkArray,
  buildInventoryItem,
  buildLocalInventoryMap,
  findMissingIds,
  hydrateMessage,
  mergeIncomingMessage
} from './messages.helpers';

// NOTE(review): `Pick`, `Record` and `Omit` below - and `Observable` in the
// handler signatures - are written without type arguments. That does not look
// intentional; presumably the arguments were lost somewhere. Confirm against
// the original file before relying on these declarations.

/** Shape of the `file` field carried on `IncomingMessageEvent`. */
type AnnouncedAttachment = Pick;

/** Attachment metadata lookup; the sync handlers in this file index it by message id. */
type AttachmentMetaMap = Record;

/** Event `type` values accepted here: every `ChatEvent` type plus the sync and file-transfer types listed. */
type IncomingMessageType =
  | ChatEvent['type']
  | 'chat-inventory'
  | 'chat-sync-request-ids'
  | 'chat-sync-batch'
  | 'chat-sync-summary'
  | 'chat-sync-request'
  | 'chat-sync-full'
  | 'file-announce'
  | 'file-chunk'
  | 'file-request'
  | 'file-cancel'
  | 'file-not-found';

/**
 * Incoming event as seen by the handlers. Every payload field is optional,
 * so each handler validates the fields it needs before acting.
 */
interface IncomingMessageEvent extends Omit {
  type: IncomingMessageType;
  items?: InventoryItem[];
  ids?: string[];
  messages?: Message[];
  attachments?: AttachmentMetaMap;
  total?: number;
  index?: number;
  count?: number;
  lastUpdated?: number;
  file?: AnnouncedAttachment;
  fileId?: string;
}

/** An incoming event whose `messages` array is known to be present. */
type SyncBatchEvent = IncomingMessageEvent & {
  messages: Message[];
  attachments?: AttachmentMetaMap;
};

/** Type guard: true when `event.messages` is an array. */
function hasMessageBatch(event: IncomingMessageEvent): event is SyncBatchEvent {
  return Array.isArray(event.messages);
}

/** Type guard: true when `attachmentMap` is a non-null object. */
function hasAttachmentMetaMap(
  attachmentMap: IncomingMessageEvent['attachments']
): attachmentMap is AttachmentMetaMap {
  return typeof attachmentMap === 'object' && attachmentMap !== null;
}

/** Shared context injected into each handler function. */
export interface IncomingMessageContext {
  db: DatabaseService;
  webrtc: RealtimeSessionFacade;
  attachments: AttachmentFacade;
  debugging: DebuggingService;
  currentUser: User | null;
  currentRoom: Room | null;
  /** Checked together with `currentRoom` when deciding whether a room id is known. */
  savedRooms?: Room[];
}

/** Signature for an incoming-message handler function. */
type MessageHandler = (
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext,
) => Observable;

/**
 * Responds to a peer's inventory request by building and sending
 * our local message inventory in chunks.
 *
 * Returns `EMPTY` immediately when the event lacks `roomId` or `fromPeerId`,
 * or when the room is not known to this client. Otherwise the work runs
 * asynchronously and the returned stream never emits a store action.
 */
function handleInventoryRequest(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable {
  const { db, webrtc, attachments } = ctx;
  const { roomId, fromPeerId } = event;

  if (!roomId || !fromPeerId || !isKnownRoomId(roomId, ctx))
    return EMPTY;

  return from(
    (async () => {
      const messages = await db.getMessages(roomId, INVENTORY_LIMIT, 0);
      const items = await Promise.all(
        messages.map((msg) => {
          const inMemoryAttachmentCount = attachments.getForMessage(msg.id).length;

          // Pass the in-memory count only when it is non-zero; otherwise pass undefined.
          return buildInventoryItem(
            msg,
            db,
            inMemoryAttachmentCount > 0 ? inMemoryAttachmentCount : undefined
          );
        })
      );

      // Ascending by each item's `ts`.
      items.sort((firstItem, secondItem) => firstItem.ts - secondItem.ts);

      for (const chunk of chunkArray(items, CHUNK_SIZE)) {
        // NOTE(review): `index` is 0 for every chunk. Nothing in this file reads
        // it, but confirm whether receivers expect a per-chunk index or offset.
        const inventoryEvent: ChatEvent = {
          type: 'chat-inventory',
          roomId,
          items: chunk,
          total: items.length,
          index: 0
        };

        webrtc.sendToPeer(fromPeerId, inventoryEvent);
      }
    })()
  ).pipe(mergeMap(() => EMPTY));
}

/**
 * Compares a peer's inventory against local state
 * and requests any missing or stale messages.
*/
function handleInventory(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { db, webrtc, attachments } = ctx;
  const { roomId, fromPeerId, items } = event;

  // Drop malformed events and inventories for rooms this client does not know.
  if (!roomId || !Array.isArray(items) || !fromPeerId || !isKnownRoomId(roomId, ctx)) {
    return EMPTY;
  }

  const requestMissingMessages = async (): Promise<void> => {
    const localMessages = await db.getMessages(roomId, INVENTORY_LIMIT, 0);

    // Attachment counts currently held in memory, keyed by message id.
    // Messages with no in-memory attachments are simply left out of the map.
    const attachmentCounts = new Map<string, number>();

    for (const localMessage of localMessages) {
      const attachmentCount = attachments.getForMessage(localMessage.id).length;

      if (attachmentCount > 0) {
        attachmentCounts.set(localMessage.id, attachmentCount);
      }
    }

    const localInventory = await buildLocalInventoryMap(localMessages, db, attachmentCounts);
    const idsToRequest = findMissingIds(items, localInventory);

    // Ask for the missing messages in CHUNK_SIZE groups so no single request grows unbounded.
    for (const idChunk of chunkArray(idsToRequest, CHUNK_SIZE)) {
      const syncRequestIdsEvent: ChatEvent = {
        type: 'chat-sync-request-ids',
        roomId,
        ids: idChunk
      };

      webrtc.sendToPeer(fromPeerId, syncRequestIdsEvent);
    }
  };

  // The handler only talks to the peer; it never emits a store action.
  return from(requestMissingMessages()).pipe(mergeMap(() => EMPTY));
}

/**
 * Responds to a peer's request for specific message IDs by sending
 * hydrated messages along with their attachment metadata.
*/
function handleSyncRequestIds(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { db, webrtc, attachments } = ctx;
  const { roomId, ids, fromPeerId } = event;

  if (!roomId || !Array.isArray(ids) || !fromPeerId || !isKnownRoomId(roomId, ctx)) {
    return EMPTY;
  }

  // `ids` is supplied by the remote peer, so it is validated rather than
  // asserted: non-string entries are dropped and repeated ids are looked up
  // (and sent back) only once.
  const requestedIds = Array.from(
    new Set(ids.filter((id): id is string => typeof id === 'string'))
  );

  const sendRequestedMessages = async (): Promise<void> => {
    const lookedUp = await Promise.all(requestedIds.map((id) => db.getMessageById(id)));

    // Only messages that exist locally AND belong to the requested room are shared.
    const roomMessages = lookedUp.filter(
      (msg): msg is Message => !!msg && msg.roomId === roomId
    );
    const hydrated = await Promise.all(roomMessages.map((msg) => hydrateMessage(msg, db)));
    const attachmentMetas = attachments.getAttachmentMetasForMessages(
      hydrated.map((msg) => msg.id)
    );

    for (const chunk of chunkArray(hydrated, CHUNK_SIZE)) {
      // Attach metadata only for the messages contained in this chunk.
      const chunkAttachments: AttachmentMetaMap = {};

      for (const hydratedMessage of chunk) {
        const metas = attachmentMetas[hydratedMessage.id];

        if (metas) {
          chunkAttachments[hydratedMessage.id] = metas;
        }
      }

      const syncBatchEvent: ChatEvent = {
        type: 'chat-sync-batch',
        roomId,
        messages: chunk,
        // Omit the field entirely when the chunk has no attachment metadata.
        attachments: Object.keys(chunkAttachments).length > 0 ? chunkAttachments : undefined
      };

      webrtc.sendToPeer(fromPeerId, syncBatchEvent);
    }
  };

  // Replies go straight to the peer; no store action is emitted.
  return from(sendRequestedMessages()).pipe(mergeMap(() => EMPTY));
}

/**
 * Processes a batch of synced messages from a peer: merges each into
 * the local DB, registers attachment metadata, and auto-requests any
 * missing image attachments.
*/
function handleSyncBatch(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  if (!hasMessageBatch(event)) {
    return EMPTY;
  }

  // Restrict the batch to rooms this client knows before touching any state.
  const scopedEvent = scopeMessageBatchToKnownRooms(event, ctx);

  if (!scopedEvent) {
    return EMPTY;
  }

  const { db, attachments } = ctx;

  if (hasAttachmentMetaMap(scopedEvent.attachments)) {
    // Message id -> room id lookup passed along with the synced metadata.
    const roomIdByMessageId = Object.fromEntries(
      scopedEvent.messages.map((message) => [message.id, message.roomId])
    );

    attachments.registerSyncedAttachments(scopedEvent.attachments, roomIdByMessageId);
  }

  // Emit a single syncMessages action, and only when something actually changed.
  return from(processSyncBatch(scopedEvent, db, attachments)).pipe(
    mergeMap((toUpsert) =>
      toUpsert.length > 0
        ? of(MessagesActions.syncMessages({ messages: toUpsert }))
        : EMPTY
    )
  );
}

/**
 * Merges each incoming message and collects those that changed.
 *
 * Messages are processed one at a time, in order. For a message flagged
 * `isDeleted`, its attachments are deleted as well. After the loop, downloads
 * are queued for every message id present in the event's attachment map.
 *
 * @returns the merged messages that `mergeIncomingMessage` reported as changed.
 * @throws Error when deleting attachments for a deleted message fails.
 */
async function processSyncBatch(
  event: SyncBatchEvent,
  db: DatabaseService,
  attachments: AttachmentFacade
): Promise<Message[]> {
  // Yield to the event loop every YIELD_EVERY messages so Angular change
  // detection and user input aren't starved while a large sync batch
  // (e.g. from a bulk plugin import) drains serial DB writes.
  const YIELD_EVERY = 50;
  const toUpsert: Message[] = [];
  let processed = 0;

  for (const incoming of event.messages) {
    attachments.rememberMessageRoom(incoming.id, incoming.roomId);

    const { message, changed } = await mergeIncomingMessage(incoming, db);

    if (incoming.isDeleted) {
      try {
        await attachments.deleteForMessage(incoming.id);
      } catch (error) {
        // NOTE(review): this aborts the rest of the batch, so messages already
        // merged above are not returned to the caller - confirm that is intended.
        throw new Error(`Failed to delete attachments for message ${incoming.id} during sync: ${message.id}. Error: ${error}`);
      }
    }

    if (changed) {
      toUpsert.push(message);
    }

    processed += 1;

    if (processed % YIELD_EVERY === 0) {
      // A zero-delay timer hands control back to the event loop before continuing.
      await new Promise<void>((resolve) => setTimeout(resolve, 0));
    }
  }

  if (hasAttachmentMetaMap(event.attachments)) {
    queueWatchedAttachmentDownloads(event.attachments, attachments);
  }

  return toUpsert;
}

/** Queue best-effort auto-downloads for watched-room attachments.
*/
function queueWatchedAttachmentDownloads(
  attachmentMap: AttachmentMetaMap,
  attachments: AttachmentFacade
): void {
  // Only the keys (message ids) matter here; the metadata values are not read.
  Object.keys(attachmentMap).forEach((msgId) => {
    attachments.queueAutoDownloadsForMessage(msgId);
  });
}

/**
 * Saves an incoming chat message to DB and dispatches receiveMessage.
 *
 * Returns `EMPTY` when the event carries no message, when the message's room
 * is not known to this client, or when the message was sent by the current user.
 * The DB write runs in the background; a failure is reported through
 * `trackBackgroundOperation` and does not block the store update.
 */
function handleChatMessage(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { db, debugging, attachments, currentUser } = ctx;
  const msg = event.message;

  if (!msg || !isKnownRoomId(msg.roomId, ctx)) {
    return EMPTY;
  }

  // Skip our own messages (reflected via server relay)
  const sentByCurrentUser =
    msg.senderId === currentUser?.id || msg.senderId === currentUser?.oderId;

  if (sentByCurrentUser) {
    return EMPTY;
  }

  attachments.rememberMessageRoom(msg.id, msg.roomId);

  const failureContext = {
    channelId: msg.channelId || 'general',
    fromPeerId: event.fromPeerId ?? null,
    messageId: msg.id,
    roomId: msg.roomId,
    senderId: msg.senderId
  };

  trackBackgroundOperation(
    db.saveMessage(msg),
    debugging,
    'Failed to persist incoming chat message',
    failureContext
  );

  return of(MessagesActions.receiveMessage({ message: msg }));
}

/**
 * Applies a remote message edit to the local DB and store.
 *
 * Returns `EMPTY` when `messageId` or `content` is missing or empty. When the
 * event has no numeric `editedAt`, the current time is used instead.
 */
function handleMessageEdited(
  event: IncomingMessageEvent,
  { db, debugging }: IncomingMessageContext
): Observable<Action> {
  const { messageId, content } = event;

  if (!messageId || !content) {
    return EMPTY;
  }

  const editedAt = typeof event.editedAt === 'number' ? event.editedAt : Date.now();

  trackBackgroundOperation(
    db.updateMessage(messageId, { content, editedAt }),
    debugging,
    'Failed to persist incoming message edit',
    {
      editedAt,
      fromPeerId: event.fromPeerId ?? null,
      messageId
    }
  );

  return of(MessagesActions.editMessageSuccess({ messageId, content, editedAt }));
}

/** Applies a remote message deletion to the local DB and store.
*/
function handleMessageDeleted(
  event: IncomingMessageEvent,
  { db, debugging, attachments }: IncomingMessageContext
): Observable<Action> {
  const { messageId } = event;

  if (!messageId) {
    return EMPTY;
  }

  // Fall back to the local clock when the peer did not send a numeric timestamp.
  const deletedAt = typeof event.deletedAt === 'number' ? event.deletedAt : Date.now();
  const deletedBy = event.deletedBy ?? null;
  const fromPeerId = event.fromPeerId ?? null;

  // The row is kept and overwritten with the deleted-message placeholder.
  trackBackgroundOperation(
    db.updateMessage(messageId, {
      content: DELETED_MESSAGE_CONTENT,
      editedAt: deletedAt,
      isDeleted: true
    }),
    debugging,
    'Failed to persist incoming message deletion',
    { deletedBy, deletedAt, fromPeerId, messageId }
  );

  trackBackgroundOperation(
    attachments.deleteForMessage(messageId),
    debugging,
    'Failed to delete incoming message attachments',
    { deletedBy, fromPeerId, messageId }
  );

  return of(MessagesActions.deleteMessageSuccess({ messageId }));
}

/**
 * Saves an incoming reaction to DB and updates the store.
 * Returns `EMPTY` when the event lacks `messageId` or `reaction`.
 */
function handleReactionAdded(
  event: IncomingMessageEvent,
  { db, debugging }: IncomingMessageContext
): Observable<Action> {
  const { messageId, reaction } = event;

  if (!messageId || !reaction) {
    return EMPTY;
  }

  trackBackgroundOperation(
    db.saveReaction(reaction),
    debugging,
    'Failed to persist incoming reaction',
    {
      emoji: reaction.emoji,
      fromPeerId: event.fromPeerId ?? null,
      messageId,
      reactionId: reaction.id
    }
  );

  return of(MessagesActions.addReactionSuccess({ reaction }));
}

/**
 * Removes a reaction from DB and updates the store.
 * Returns `EMPTY` when `messageId`, `oderId` or `emoji` is missing.
 */
function handleReactionRemoved(
  event: IncomingMessageEvent,
  { db, debugging }: IncomingMessageContext
): Observable<Action> {
  const { messageId, oderId, emoji } = event;

  if (!messageId || !oderId || !emoji) {
    return EMPTY;
  }

  trackBackgroundOperation(
    db.removeReaction(messageId, oderId, emoji),
    debugging,
    'Failed to persist incoming reaction removal',
    {
      emoji,
      fromPeerId: event.fromPeerId ?? null,
      messageId,
      oderId
    }
  );

  return of(MessagesActions.removeReactionSuccess({ messageId, oderId, emoji }));
}

/**
 * Forwards a `file-announce` event to the attachment facade and, when the
 * event names a message, queues auto-downloads for it (passing the announced
 * file's id when present). Never emits a store action.
 */
function handleFileAnnounce(
  event: IncomingMessageEvent,
  { attachments }: IncomingMessageContext
): Observable<Action> {
  attachments.handleFileAnnounce(event);

  if (event.messageId) {
    attachments.queueAutoDownloadsForMessage(event.messageId, event.file?.id);
  }

  return EMPTY;
}

/** Forwards a `file-chunk` event to the attachment facade. Never emits a store action. */
function handleFileChunk(
  event: IncomingMessageEvent,
  { attachments }: IncomingMessageContext
): Observable<Action> {
  attachments.handleFileChunk(event);

  return EMPTY;
}

/** Forwards a `file-request` event to the attachment facade. Never emits a store action. */
function handleFileRequest(
  event: IncomingMessageEvent,
  { attachments }: IncomingMessageContext
): Observable<Action> {
  attachments.handleFileRequest(event);

  return EMPTY;
}

/** Forwards a `file-cancel` event to the attachment facade. Never emits a store action. */
function handleFileCancel(
  event: IncomingMessageEvent,
  { attachments }: IncomingMessageContext
): Observable<Action> {
  attachments.handleFileCancel(event);

  return EMPTY;
}

/** Forwards a `file-not-found` event to the attachment facade. Never emits a store action. */
function handleFileNotFound(
  event: IncomingMessageEvent,
  { attachments }: IncomingMessageContext
): Observable<Action> {
  attachments.handleFileNotFound(event);

  return EMPTY;
}

/**
 * Compares a peer's dataset summary and requests full sync
 * if the peer has newer or more data.
*/
function handleSyncSummary(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { db, webrtc, currentRoom } = ctx;
  // Summaries without a room id are taken to refer to the room currently open.
  const targetRoomId = event.roomId || currentRoom?.id;

  if (!targetRoomId || !isKnownRoomId(targetRoomId, ctx)) {
    return EMPTY;
  }

  const requestSyncIfBehind = async (): Promise<void> => {
    const local = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);
    const localCount = local.length;
    // Newest local change: the edit time when set, otherwise the creation time.
    const localLastUpdated = local.reduce(
      (maxTimestamp, message) =>
        Math.max(maxTimestamp, message.editedAt || message.timestamp || 0),
      0
    );
    const remoteLastUpdated = event.lastUpdated || 0;
    const remoteCount = event.count || 0;

    // The peer is ahead when it has a newer change, or the same newest change
    // but more messages. Either condition already implies the two summaries
    // differ, so no separate "identical" check is needed.
    const peerIsAhead =
      remoteLastUpdated > localLastUpdated ||
      (remoteLastUpdated === localLastUpdated && remoteCount > localCount);
    const fromPeerId = event.fromPeerId;

    if (peerIsAhead && fromPeerId) {
      const syncRequestEvent: ChatEvent = {
        type: 'chat-sync-request',
        roomId: targetRoomId
      };

      webrtc.sendToPeer(fromPeerId, syncRequestEvent);
    }
  };

  return from(requestSyncIfBehind()).pipe(mergeMap(() => EMPTY));
}

/**
 * Responds to a peer's full sync request by sending all local messages.
 *
 * "All" is bounded by `FULL_SYNC_LIMIT`. Falls back to the current room when
 * the event has no `roomId`, and returns `EMPTY` for unknown rooms or when
 * the sender is not identified.
 */
function handleSyncRequest(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { db, webrtc, currentRoom } = ctx;
  const targetRoomId = event.roomId || currentRoom?.id;
  const fromPeerId = event.fromPeerId;

  if (!targetRoomId || !fromPeerId || !isKnownRoomId(targetRoomId, ctx)) {
    return EMPTY;
  }

  const sendAllMessages = async (): Promise<void> => {
    const all = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);

    // Ship as chunked chat-sync-batch events instead of a single
    // chat-sync-full payload. A monolithic dump of up to FULL_SYNC_LIMIT
    // messages can exceed the WebRTC SCTP per-message size ceiling and be
    // silently dropped - especially after bulk plugin imports.
    for (const chunk of chunkArray(all, CHUNK_SIZE)) {
      const syncBatchEvent: ChatEvent = {
        type: 'chat-sync-batch',
        roomId: targetRoomId,
        messages: chunk
      };

      webrtc.sendToPeer(fromPeerId, syncBatchEvent);
    }
  };

  return from(sendAllMessages()).pipe(mergeMap(() => EMPTY));
}

/**
 * Merges a full message dump from a peer into the local DB and store.
 *
 * The dump is first restricted to known rooms. A `syncMessages` action is
 * emitted only when at least one message changed.
 */
function handleSyncFull(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  if (!hasMessageBatch(event)) {
    return EMPTY;
  }

  const scopedEvent = scopeMessageBatchToKnownRooms(event, ctx);

  if (!scopedEvent) {
    return EMPTY;
  }

  return from(processSyncBatch(scopedEvent, ctx.db, ctx.attachments)).pipe(
    mergeMap((toUpsert) =>
      toUpsert.length > 0
        ? of(MessagesActions.syncMessages({ messages: toUpsert }))
        : EMPTY
    )
  );
}

/** Map of event types to their handler functions. */
const HANDLER_MAP: Readonly<Record<string, MessageHandler>> = {
  // Inventory-based sync protocol
  'chat-inventory-request': handleInventoryRequest,
  'chat-inventory': handleInventory,
  'chat-sync-request-ids': handleSyncRequestIds,
  'chat-sync-batch': handleSyncBatch,

  // Chat messages
  'chat-message': handleChatMessage,
  'message-edited': handleMessageEdited,
  'message-deleted': handleMessageDeleted,

  // Reactions
  'reaction-added': handleReactionAdded,
  'reaction-removed': handleReactionRemoved,

  // Attachments
  'file-announce': handleFileAnnounce,
  'file-chunk': handleFileChunk,
  'file-request': handleFileRequest,
  'file-cancel': handleFileCancel,
  'file-not-found': handleFileNotFound,

  // Legacy sync handshake
  'chat-sync-summary': handleSyncSummary,
  'chat-sync-request': handleSyncRequest,
  'chat-sync-full': handleSyncFull
};

/**
 * Routes an incoming P2P message to the appropriate handler.
 * Returns `EMPTY` if the event type is unknown or has no relevant handler.
 */
export function dispatchIncomingMessage(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  // `event.type` is chosen by the remote peer. A plain property read would also
  // find inherited members such as `constructor` or `toString` and invoke them
  // as handlers, so only keys defined on the registry itself are honoured.
  const isRegistered = Object.prototype.hasOwnProperty.call(HANDLER_MAP, event.type);
  const handler = isRegistered ? HANDLER_MAP[event.type] : undefined;

  return handler ? handler(event, ctx) : EMPTY;
}

/** True when `roomId` is the current room or one of the saved rooms in `ctx`. */
function isKnownRoomId(roomId: string | undefined, ctx: IncomingMessageContext): boolean {
  if (!roomId) {
    return false;
  }

  if (ctx.currentRoom?.id === roomId) {
    return true;
  }

  return (ctx.savedRooms ?? []).some((room) => room.id === roomId);
}

/**
 * Narrows a message batch to messages that belong to known rooms.
 *
 * @returns a copy of the event with filtered `messages` and matching
 *          `attachments`, or `null` when the event names an unknown room or
 *          no message survives the filter.
 */
function scopeMessageBatchToKnownRooms(
  event: SyncBatchEvent,
  ctx: IncomingMessageContext
): SyncBatchEvent | null {
  // An explicit room id on the event must itself be known.
  if (event.roomId && !isKnownRoomId(event.roomId, ctx)) {
    return null;
  }

  // Each message is also checked on its own room id.
  const messages = event.messages.filter((message) => isKnownRoomId(message.roomId, ctx));

  if (messages.length === 0) {
    return null;
  }

  return {
    ...event,
    attachments: filterAttachmentMapToMessages(event.attachments, messages),
    messages
  };
}

/**
 * Keeps only the attachment-map entries whose key is the id of one of
 * `messages`.
 *
 * @returns the filtered map, or `undefined` when the input is not a map or
 *          nothing remains after filtering.
 */
function filterAttachmentMapToMessages(
  attachmentMap: IncomingMessageEvent['attachments'],
  messages: Message[]
): AttachmentMetaMap | undefined {
  if (!hasAttachmentMetaMap(attachmentMap)) {
    return undefined;
  }

  const keptMessageIds = new Set(messages.map((message) => message.id));
  const keptEntries = Object.entries(attachmentMap).filter(([messageId]) =>
    keptMessageIds.has(messageId)
  );

  if (keptEntries.length === 0) {
    return undefined;
  }

  return Object.fromEntries(keptEntries);
}

/**
 * Hands a fire-and-forget task to `trackDebuggingTaskFailure` under the
 * `'messages'` label, together with a message and payload describing it.
 */
function trackBackgroundOperation(
  task: Promise<unknown> | unknown,
  debugging: DebuggingService,
  message: string,
  payload: Record<string, unknown>
): void {
  trackDebuggingTaskFailure(task, debugging, 'messages', message, payload);
}