All checks were successful
Queue Release Build / prepare (push) Successful in 23s
Deploy Web Apps / deploy (push) Successful in 7m36s
Queue Release Build / build-windows (push) Successful in 28m3s
Queue Release Build / build-linux (push) Successful in 44m14s
Queue Release Build / finalize (push) Successful in 39s
752 lines
20 KiB
TypeScript
752 lines
20 KiB
TypeScript
/**
 * Handler functions for incoming P2P messages dispatched via WebRTC.
 *
 * Each handler is a standalone function that receives an event and a context
 * object containing the required services. Handlers return an
 * `Observable<Action>`, or `EMPTY` when no store action needs dispatching.
 *
 * The handler registry at the bottom maps event `type` strings to their
 * handlers, and `dispatchIncomingMessage()` is the single entry point
 * consumed by the `incomingMessages$` effect.
 */
|
|
import {
|
|
Observable,
|
|
of,
|
|
from,
|
|
EMPTY
|
|
} from 'rxjs';
|
|
import { mergeMap } from 'rxjs/operators';
|
|
import { Action } from '@ngrx/store';
|
|
import {
|
|
DELETED_MESSAGE_CONTENT,
|
|
type ChatEvent,
|
|
type Message,
|
|
type Room,
|
|
type User
|
|
} from '../../shared-kernel';
|
|
import type { RealtimeSessionFacade } from '../../core/realtime';
|
|
import type { DebuggingService } from '../../core/services';
|
|
import { AttachmentFacade, type AttachmentMeta } from '../../domains/attachment';
|
|
import { DatabaseService } from '../../infrastructure/persistence';
|
|
import { trackDebuggingTaskFailure } from '../../core/helpers/debugging-helpers';
|
|
import { MessagesActions } from './messages.actions';
|
|
import {
|
|
INVENTORY_LIMIT,
|
|
CHUNK_SIZE,
|
|
FULL_SYNC_LIMIT,
|
|
type InventoryItem,
|
|
chunkArray,
|
|
buildInventoryItem,
|
|
buildLocalInventoryMap,
|
|
findMissingIds,
|
|
hydrateMessage,
|
|
mergeIncomingMessage
|
|
} from './messages.helpers';
|
|
|
|
/** The subset of `AttachmentMeta` fields carried in the `file` field of an incoming event. */
type AnnouncedAttachment = Pick<
  AttachmentMeta,
  'id' | 'filename' | 'size' | 'mime' | 'isImage' | 'uploaderPeerId'
>;

/** Attachment metadata lists keyed by string (message id in the sync handlers of this file). */
type AttachmentMetaMap = Record<string, AttachmentMeta[]>;

/** Every `type` value an incoming event may carry: the `ChatEvent` types plus sync and file-transfer types. */
type IncomingMessageType =
  | ChatEvent['type']
  | 'chat-inventory'
  | 'chat-sync-request-ids'
  | 'chat-sync-batch'
  | 'chat-sync-summary'
  | 'chat-sync-request'
  | 'chat-sync-full'
  | 'file-announce'
  | 'file-chunk'
  | 'file-request'
  | 'file-cancel'
  | 'file-not-found';

/**
 * A `ChatEvent` whose `type` is widened to `IncomingMessageType`, extended with
 * the optional payload fields read by the sync and attachment handlers.
 */
interface IncomingMessageEvent extends Omit<ChatEvent, 'type'> {
  type: IncomingMessageType;
  /** Inventory entries (read by the `chat-inventory` handler). */
  items?: InventoryItem[];
  /** Requested message ids (read by the `chat-sync-request-ids` handler). */
  ids?: string[];
  /** Message payload (read by the `chat-sync-batch` / `chat-sync-full` handlers). */
  messages?: Message[];
  /** Attachment metadata accompanying `messages`. */
  attachments?: AttachmentMetaMap;
  total?: number;
  index?: number;
  /** Remote message count (read by the `chat-sync-summary` handler). */
  count?: number;
  /** Remote last-updated timestamp (read by the `chat-sync-summary` handler). */
  lastUpdated?: number;
  /** Announced file metadata (read by the `file-announce` handler). */
  file?: AnnouncedAttachment;
  fileId?: string;
}

/** An incoming event that is known to carry a `messages` array. */
type SyncBatchEvent = IncomingMessageEvent & {
  messages: Message[];
  attachments?: AttachmentMetaMap;
};
|
|
|
|
/** Type guard: true when the event's `messages` field is an array. */
function hasMessageBatch(event: IncomingMessageEvent): event is SyncBatchEvent {
  const { messages } = event;

  return Array.isArray(messages);
}
|
|
|
|
/**
 * Type guard: true when `attachmentMap` is a non-null, non-array object.
 *
 * The value arrives from a remote peer, so its runtime shape is not guaranteed
 * by the static type. Arrays are rejected explicitly: `typeof [] === 'object'`,
 * but an array is not a keyed `AttachmentMetaMap` and must not be narrowed to one.
 */
function hasAttachmentMetaMap(
  attachmentMap: IncomingMessageEvent['attachments']
): attachmentMap is AttachmentMetaMap {
  return typeof attachmentMap === 'object'
    && attachmentMap !== null
    && !Array.isArray(attachmentMap);
}
|
|
|
|
/** Services and session state handed to every incoming-message handler. */
export interface IncomingMessageContext {
  /** Local persistence used to read and write messages and reactions. */
  db: DatabaseService;
  /** Realtime session used to send replies to individual peers. */
  webrtc: RealtimeSessionFacade;
  /** Attachment facade used for attachment metadata and file-transfer events. */
  attachments: AttachmentFacade;
  /** Receives failure reports from background persistence tasks. */
  debugging: DebuggingService;
  /** The local user, or `null` when there is none. */
  currentUser: User | null;
  /** The currently open room, or `null` when there is none. */
  currentRoom: Room | null;
  /** Additional rooms whose ids are treated as known when scoping events. */
  savedRooms?: Room[];
}

/** Shape shared by every incoming-message handler function. */
type MessageHandler = (
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
) => Observable<Action>;
|
|
|
|
/**
 * Answers a peer's inventory request: loads up to `INVENTORY_LIMIT` local
 * messages for the room, builds one inventory item per message, sorts the
 * items by ascending `ts`, and sends them back to the requester in
 * `CHUNK_SIZE` chunks. Never emits a store action.
 *
 * Requests without a room id or sender, or for a room that is not known
 * locally, are ignored.
 */
function handleInventoryRequest(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { roomId, fromPeerId } = event;

  if (!roomId || !fromPeerId || !isKnownRoomId(roomId, ctx))
    return EMPTY;

  const { db, webrtc, attachments } = ctx;

  const sendInventory = async (): Promise<void> => {
    const storedMessages = await db.getMessages(roomId, INVENTORY_LIMIT, 0);
    const inventory = await Promise.all(
      storedMessages.map((stored) => {
        // Pass the in-memory attachment count only when there is at least one.
        const liveCount = attachments.getForMessage(stored.id).length;
        const countHint = liveCount > 0 ? liveCount : undefined;

        return buildInventoryItem(stored, db, countHint);
      })
    );

    inventory.sort((left, right) => left.ts - right.ts);

    for (const slice of chunkArray(inventory, CHUNK_SIZE)) {
      // NOTE(review): every chunk is sent with `index: 0`; confirm whether
      // receivers are expected to get a per-chunk index here.
      const reply: ChatEvent = {
        type: 'chat-inventory',
        roomId,
        items: slice,
        total: inventory.length,
        index: 0
      };

      webrtc.sendToPeer(fromPeerId, reply);
    }
  };

  return from(sendInventory()).pipe(mergeMap(() => EMPTY));
}
|
|
|
|
/**
 * Diffs a peer's inventory chunk against the local inventory for the room
 * and asks that peer, in `CHUNK_SIZE` chunks of ids, for whatever
 * `findMissingIds` reports. Never emits a store action.
 *
 * Events without a room id, an `items` array or a sender, or for a room that
 * is not known locally, are ignored.
 */
function handleInventory(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { roomId, fromPeerId, items } = event;

  if (!roomId || !Array.isArray(items) || !fromPeerId || !isKnownRoomId(roomId, ctx))
    return EMPTY;

  const { db, webrtc, attachments } = ctx;

  const requestMissing = async (): Promise<void> => {
    const localMessages = await db.getMessages(roomId, INVENTORY_LIMIT, 0);

    // Only messages that currently have in-memory attachments get an entry.
    const liveAttachmentCounts = new Map<string, number>();

    localMessages.forEach(({ id }) => {
      const liveCount = attachments.getForMessage(id).length;

      if (liveCount > 0)
        liveAttachmentCounts.set(id, liveCount);
    });

    const localInventory = await buildLocalInventoryMap(localMessages, db, liveAttachmentCounts);
    const missingIds = findMissingIds(items, localInventory);

    for (const idSlice of chunkArray(missingIds, CHUNK_SIZE)) {
      const request: ChatEvent = {
        type: 'chat-sync-request-ids',
        roomId,
        ids: idSlice
      };

      webrtc.sendToPeer(fromPeerId, request);
    }
  };

  return from(requestMissing()).pipe(mergeMap(() => EMPTY));
}
|
|
|
|
/**
 * Responds to a peer's request for specific message ids by sending the
 * matching messages (passed through `hydrateMessage`) together with their
 * attachment metadata, in `CHUNK_SIZE` chunks. Never emits a store action.
 *
 * The `ids` list is supplied by a remote peer, so it is sanitised before use:
 * entries that are not non-empty strings are dropped and duplicates are
 * collapsed (first occurrence wins). This avoids duplicate DB lookups and
 * duplicate messages in the reply. Messages that do not exist locally or that
 * belong to a different room than the request are skipped.
 *
 * Events without a room id, an `ids` array or a sender, or for a room that is
 * not known locally, are ignored.
 */
function handleSyncRequestIds(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { db, webrtc, attachments } = ctx;
  const { roomId, ids, fromPeerId } = event;

  if (!roomId || !Array.isArray(ids) || !fromPeerId || !isKnownRoomId(roomId, ctx))
    return EMPTY;

  // Validate and de-duplicate peer-supplied ids; Set keeps insertion order.
  const requestedIds = [
    ...new Set(
      ids.filter((id): id is string => typeof id === 'string' && id.length > 0)
    )
  ];

  if (requestedIds.length === 0)
    return EMPTY;

  return from(
    (async () => {
      const maybeMessages = await Promise.all(
        requestedIds.map((id) => db.getMessageById(id))
      );
      // Never leak messages from another room, even if the id exists locally.
      const messages = maybeMessages.filter(
        (msg): msg is Message => !!msg && msg.roomId === roomId
      );
      const hydrated = await Promise.all(
        messages.map((msg) => hydrateMessage(msg, db))
      );
      const attachmentMetas = attachments.getAttachmentMetasForMessages(
        hydrated.map((msg) => msg.id)
      );

      for (const chunk of chunkArray(hydrated, CHUNK_SIZE)) {
        // Each batch carries only the attachment metadata of its own messages.
        const chunkAttachments: AttachmentMetaMap = {};

        for (const hydratedMessage of chunk) {
          if (attachmentMetas[hydratedMessage.id])
            chunkAttachments[hydratedMessage.id] = attachmentMetas[hydratedMessage.id];
        }

        const syncBatchEvent: ChatEvent = {
          type: 'chat-sync-batch',
          roomId,
          messages: chunk,
          attachments:
            Object.keys(chunkAttachments).length > 0
              ? chunkAttachments
              : undefined
        };

        webrtc.sendToPeer(fromPeerId, syncBatchEvent);
      }
    })()
  ).pipe(mergeMap(() => EMPTY));
}
|
|
|
|
/**
 * Handles a batch of synced messages from a peer. The batch is first
 * restricted to rooms known locally; any accompanying attachment metadata is
 * registered with the attachment facade; then every message is merged via
 * `processSyncBatch`. Emits a single `syncMessages` action containing the
 * messages that changed, or nothing when none did.
 */
function handleSyncBatch(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const scopedEvent = hasMessageBatch(event)
    ? scopeMessageBatchToKnownRooms(event, ctx)
    : null;

  if (!scopedEvent)
    return EMPTY;

  const { db, attachments } = ctx;
  const syncedAttachments = scopedEvent.attachments;

  if (hasAttachmentMetaMap(syncedAttachments)) {
    // Message id -> room id lookup handed over alongside the metadata.
    const roomIdByMessageId = Object.fromEntries(
      scopedEvent.messages.map((message) => [message.id, message.roomId])
    );

    attachments.registerSyncedAttachments(syncedAttachments, roomIdByMessageId);
  }

  return from(processSyncBatch(scopedEvent, db, attachments)).pipe(
    mergeMap((changedMessages) =>
      changedMessages.length === 0
        ? EMPTY
        : of(MessagesActions.syncMessages({ messages: changedMessages }))
    )
  );
}
|
|
|
|
/**
 * Merges every message of a sync batch, one after another, and returns the
 * merged messages that `mergeIncomingMessage` reported as changed.
 *
 * For each incoming message the room is remembered on the attachment facade,
 * the message is merged, and - when the incoming copy is flagged as deleted -
 * its attachments are deleted. After the loop, auto-downloads are queued for
 * every message id present in the batch's attachment map.
 *
 * NOTE(review): a failed attachment deletion rethrows and aborts the rest of
 * the batch; messages merged before the failure are not returned.
 *
 * @throws Error when deleting attachments for a deleted message fails.
 */
async function processSyncBatch(
  event: SyncBatchEvent,
  db: DatabaseService,
  attachments: AttachmentFacade
): Promise<Message[]> {
  // Yield to the event loop every YIELD_EVERY messages so Angular change
  // detection and user input aren't starved while a large sync batch
  // (e.g. from a bulk plugin import) drains serial DB writes.
  const YIELD_EVERY = 50;
  const changedMessages: Message[] = [];

  for (const [position, incoming] of event.messages.entries()) {
    attachments.rememberMessageRoom(incoming.id, incoming.roomId);

    const merged = await mergeIncomingMessage(incoming, db);

    if (incoming.isDeleted) {
      try {
        await attachments.deleteForMessage(incoming.id);
      } catch (error) {
        throw new Error(`Failed to delete attachments for message ${incoming.id} during sync: ${merged.message.id}. Error: ${error}`);
      }
    }

    if (merged.changed)
      changedMessages.push(merged.message);

    const processedSoFar = position + 1;

    if (processedSoFar % YIELD_EVERY === 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, 0));
    }
  }

  const syncedAttachments = event.attachments;

  if (hasAttachmentMetaMap(syncedAttachments)) {
    queueWatchedAttachmentDownloads(syncedAttachments, attachments);
  }

  return changedMessages;
}
|
|
|
|
/** Asks the attachment facade to queue auto-downloads for every message id in the map. */
function queueWatchedAttachmentDownloads(
  attachmentMap: AttachmentMetaMap,
  attachments: AttachmentFacade
): void {
  Object.keys(attachmentMap).forEach((messageId) => {
    attachments.queueAutoDownloadsForMessage(messageId);
  });
}
|
|
|
|
/**
 * Handles an incoming chat message: remembers its room on the attachment
 * facade, persists it in the background, and emits `receiveMessage`.
 *
 * Nothing happens when the event has no message, the message's room is not
 * known locally, or the sender is the current user.
 */
function handleChatMessage(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const incoming = event.message;

  if (!incoming || !isKnownRoomId(incoming.roomId, ctx))
    return EMPTY;

  // Skip our own messages (reflected via server relay)
  const self = ctx.currentUser;
  const sentByUs =
    incoming.senderId === self?.id ||
    incoming.senderId === self?.oderId;

  if (sentByUs)
    return EMPTY;

  ctx.attachments.rememberMessageRoom(incoming.id, incoming.roomId);

  // Persistence is fire-and-forget; failures are reported to debugging.
  const failureDetails = {
    channelId: incoming.channelId || 'general',
    fromPeerId: event.fromPeerId ?? null,
    messageId: incoming.id,
    roomId: incoming.roomId,
    senderId: incoming.senderId
  };

  trackBackgroundOperation(
    ctx.db.saveMessage(incoming),
    ctx.debugging,
    'Failed to persist incoming chat message',
    failureDetails
  );

  return of(MessagesActions.receiveMessage({ message: incoming }));
}
|
|
|
|
/**
 * Applies a remote edit: persists the new content and edit time in the
 * background and emits `editMessageSuccess`. Events without a message id or
 * with falsy content are ignored. When the event carries no numeric
 * `editedAt`, the current time is used instead.
 */
function handleMessageEdited(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { messageId, content, editedAt: remoteEditedAt } = event;

  if (!messageId || !content)
    return EMPTY;

  const editedAt = typeof remoteEditedAt === 'number'
    ? remoteEditedAt
    : Date.now();

  const pendingWrite = ctx.db.updateMessage(messageId, { content, editedAt });

  trackBackgroundOperation(
    pendingWrite,
    ctx.debugging,
    'Failed to persist incoming message edit',
    {
      editedAt,
      fromPeerId: event.fromPeerId ?? null,
      messageId
    }
  );

  return of(MessagesActions.editMessageSuccess({ messageId, content, editedAt }));
}
|
|
|
|
/**
 * Applies a remote deletion: in the background, replaces the stored message
 * content with `DELETED_MESSAGE_CONTENT`, marks it deleted, and deletes its
 * attachments; then emits `deleteMessageSuccess`. Events without a message id
 * are ignored. When the event carries no numeric `deletedAt`, the current
 * time is used instead.
 */
function handleMessageDeleted(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { messageId } = event;

  if (!messageId)
    return EMPTY;

  const { db, debugging, attachments } = ctx;
  const deletedBy = event.deletedBy ?? null;
  const fromPeerId = event.fromPeerId ?? null;
  const deletedAt = typeof event.deletedAt === 'number'
    ? event.deletedAt
    : Date.now();

  // Tombstone the stored message first ...
  const tombstoneWrite = db.updateMessage(messageId, {
    content: DELETED_MESSAGE_CONTENT,
    editedAt: deletedAt,
    isDeleted: true
  });

  trackBackgroundOperation(
    tombstoneWrite,
    debugging,
    'Failed to persist incoming message deletion',
    {
      deletedBy,
      deletedAt,
      fromPeerId,
      messageId
    }
  );

  // ... then drop its attachments; both run as tracked background tasks.
  trackBackgroundOperation(
    attachments.deleteForMessage(messageId),
    debugging,
    'Failed to delete incoming message attachments',
    {
      deletedBy,
      fromPeerId,
      messageId
    }
  );

  return of(MessagesActions.deleteMessageSuccess({ messageId }));
}
|
|
|
|
/**
 * Persists an incoming reaction in the background and emits
 * `addReactionSuccess`. Events without a message id or reaction are ignored.
 */
function handleReactionAdded(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { messageId, reaction } = event;

  if (!messageId || !reaction)
    return EMPTY;

  const failureDetails = {
    emoji: reaction.emoji,
    fromPeerId: event.fromPeerId ?? null,
    messageId,
    reactionId: reaction.id
  };

  trackBackgroundOperation(
    ctx.db.saveReaction(reaction),
    ctx.debugging,
    'Failed to persist incoming reaction',
    failureDetails
  );

  return of(MessagesActions.addReactionSuccess({ reaction }));
}
|
|
|
|
/**
 * Removes a reaction from the DB in the background and emits
 * `removeReactionSuccess`. Events missing the message id, `oderId` or emoji
 * are ignored.
 */
function handleReactionRemoved(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { messageId, oderId, emoji } = event;

  if (!messageId || !oderId || !emoji)
    return EMPTY;

  const failureDetails = {
    emoji,
    fromPeerId: event.fromPeerId ?? null,
    messageId,
    oderId
  };

  trackBackgroundOperation(
    ctx.db.removeReaction(messageId, oderId, emoji),
    ctx.debugging,
    'Failed to persist incoming reaction removal',
    failureDetails
  );

  return of(MessagesActions.removeReactionSuccess({ messageId, oderId, emoji }));
}
|
|
|
|
/**
 * Forwards a file announcement to the attachment facade and, when the event
 * names a message, queues auto-downloads for that message (passing the
 * announced file's id, if any). Never emits a store action.
 */
function handleFileAnnounce(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { attachments } = ctx;
  const { messageId } = event;

  attachments.handleFileAnnounce(event);

  if (messageId) {
    attachments.queueAutoDownloadsForMessage(messageId, event.file?.id);
  }

  return EMPTY;
}
|
|
|
|
/** Forwards a file chunk event to the attachment facade; never emits a store action. */
function handleFileChunk(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  ctx.attachments.handleFileChunk(event);

  return EMPTY;
}
|
|
|
|
/** Forwards a file request event to the attachment facade; never emits a store action. */
function handleFileRequest(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  ctx.attachments.handleFileRequest(event);

  return EMPTY;
}
|
|
|
|
/** Forwards a file cancel event to the attachment facade; never emits a store action. */
function handleFileCancel(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  ctx.attachments.handleFileCancel(event);

  return EMPTY;
}
|
|
|
|
/** Forwards a file-not-found event to the attachment facade; never emits a store action. */
function handleFileNotFound(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  ctx.attachments.handleFileNotFound(event);

  return EMPTY;
}
|
|
|
|
/**
 * Compares a peer's dataset summary (`count`, `lastUpdated`) with the local
 * messages of the target room and sends a `chat-sync-request` back when the
 * peer is ahead. Never emits a store action.
 *
 * The target room is the event's room id, falling back to the current room.
 * Summaries for rooms that are not known locally are ignored.
 */
function handleSyncSummary(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const targetRoomId = event.roomId || ctx.currentRoom?.id;

  if (!targetRoomId || !isKnownRoomId(targetRoomId, ctx))
    return EMPTY;

  const { db, webrtc } = ctx;

  const compareAndRequest = async (): Promise<void> => {
    const localMessages = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);

    // Latest edit time (or creation time) across the local messages.
    let localLastUpdated = 0;

    for (const message of localMessages) {
      localLastUpdated = Math.max(
        localLastUpdated,
        message.editedAt || message.timestamp || 0
      );
    }

    const localCount = localMessages.length;
    const remoteLastUpdated = event.lastUpdated || 0;
    const remoteCount = event.count || 0;

    // The peer is ahead when its data is newer, or equally new but larger.
    // Either condition already rules out identical summaries, so no separate
    // equality check is needed.
    const peerIsNewer = remoteLastUpdated > localLastUpdated;
    const peerHasMore =
      remoteLastUpdated === localLastUpdated && remoteCount > localCount;
    const fromPeerId = event.fromPeerId;

    if ((peerIsNewer || peerHasMore) && fromPeerId) {
      const request: ChatEvent = {
        type: 'chat-sync-request',
        roomId: targetRoomId
      };

      webrtc.sendToPeer(fromPeerId, request);
    }
  };

  return from(compareAndRequest()).pipe(mergeMap(() => EMPTY));
}
|
|
|
|
/**
 * Answers a peer's full sync request by sending up to `FULL_SYNC_LIMIT`
 * local messages of the target room back to the requester. Never emits a
 * store action.
 *
 * The target room is the event's room id, falling back to the current room.
 * Requests without a sender, or for rooms not known locally, are ignored.
 */
function handleSyncRequest(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const { fromPeerId } = event;
  const targetRoomId = event.roomId || ctx.currentRoom?.id;

  if (!targetRoomId || !fromPeerId || !isKnownRoomId(targetRoomId, ctx))
    return EMPTY;

  const { db, webrtc } = ctx;

  const sendEverything = async (): Promise<void> => {
    const roomMessages = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);

    // Ship as chunked chat-sync-batch events instead of a single
    // chat-sync-full payload. A monolithic dump of up to FULL_SYNC_LIMIT
    // messages can exceed the WebRTC SCTP per-message size ceiling and be
    // silently dropped - especially after bulk plugin imports.
    for (const slice of chunkArray(roomMessages, CHUNK_SIZE)) {
      const batch: ChatEvent = {
        type: 'chat-sync-batch',
        roomId: targetRoomId,
        messages: slice
      };

      webrtc.sendToPeer(fromPeerId, batch);
    }
  };

  return from(sendEverything()).pipe(mergeMap(() => EMPTY));
}
|
|
|
|
/**
 * Handles a full message dump from a peer: restricts it to rooms known
 * locally, merges every message via `processSyncBatch`, and emits a single
 * `syncMessages` action with the messages that changed (nothing when none
 * did).
 */
function handleSyncFull(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const scopedEvent = hasMessageBatch(event)
    ? scopeMessageBatchToKnownRooms(event, ctx)
    : null;

  if (!scopedEvent)
    return EMPTY;

  const merging = processSyncBatch(scopedEvent, ctx.db, ctx.attachments);

  return from(merging).pipe(
    mergeMap((changedMessages) =>
      changedMessages.length === 0
        ? EMPTY
        : of(MessagesActions.syncMessages({ messages: changedMessages }))
    )
  );
}
|
|
|
|
/**
 * Registry that maps an event `type` string to the handler responsible for
 * it. Consulted by `dispatchIncomingMessage()`.
 */
const HANDLER_MAP: Readonly<Record<string, MessageHandler>> = {
  // --- Inventory-based sync protocol ---
  'chat-inventory-request': handleInventoryRequest,
  'chat-inventory': handleInventory,
  'chat-sync-request-ids': handleSyncRequestIds,
  'chat-sync-batch': handleSyncBatch,

  // --- Chat messages ---
  'chat-message': handleChatMessage,
  'message-edited': handleMessageEdited,
  'message-deleted': handleMessageDeleted,

  // --- Reactions ---
  'reaction-added': handleReactionAdded,
  'reaction-removed': handleReactionRemoved,

  // --- Attachments ---
  'file-announce': handleFileAnnounce,
  'file-chunk': handleFileChunk,
  'file-request': handleFileRequest,
  'file-cancel': handleFileCancel,
  'file-not-found': handleFileNotFound,

  // --- Legacy sync handshake ---
  'chat-sync-summary': handleSyncSummary,
  'chat-sync-request': handleSyncRequest,
  'chat-sync-full': handleSyncFull
};
|
|
|
|
/**
 * Routes an incoming P2P message to the appropriate handler.
 * Returns `EMPTY` if the event type is unknown or has no relevant handler.
 *
 * The `type` string is chosen by the remote peer, so the registry lookup is
 * restricted to the registry's OWN keys. A bare `HANDLER_MAP[type]` on a plain
 * object would also resolve inherited members such as `constructor`,
 * `toString` or `__proto__`, which are not handlers and would either throw
 * when invoked or return something that is not an Observable.
 *
 * @param event The incoming event; its `type` selects the handler.
 * @param ctx Services and session state passed through to the handler.
 * @returns The handler's observable, or `EMPTY` when no handler is registered.
 */
export function dispatchIncomingMessage(
  event: IncomingMessageEvent,
  ctx: IncomingMessageContext
): Observable<Action> {
  const eventType: unknown = event.type;

  if (
    typeof eventType !== 'string'
    || !Object.prototype.hasOwnProperty.call(HANDLER_MAP, eventType)
  )
    return EMPTY;

  const handler = HANDLER_MAP[eventType];

  return handler ? handler(event, ctx) : EMPTY;
}
|
|
|
|
/**
 * Tells whether `roomId` refers to the current room or to one of the saved
 * rooms in the context. A missing or empty id is never known.
 */
function isKnownRoomId(roomId: string | undefined, ctx: IncomingMessageContext): boolean {
  if (!roomId)
    return false;

  if (ctx.currentRoom?.id === roomId)
    return true;

  const savedRooms = ctx.savedRooms ?? [];

  return savedRooms.some((savedRoom) => savedRoom.id === roomId);
}
|
|
|
|
/**
 * Restricts a message batch to rooms that are known locally.
 *
 * Returns `null` when the batch names a room id that is not known, or when
 * none of its messages belong to a known room. Otherwise returns a copy of
 * the event that keeps only the messages from known rooms and only the
 * attachment entries belonging to those messages.
 */
function scopeMessageBatchToKnownRooms(
  event: SyncBatchEvent,
  ctx: IncomingMessageContext
): SyncBatchEvent | null {
  const batchRoomId = event.roomId;

  if (batchRoomId && !isKnownRoomId(batchRoomId, ctx))
    return null;

  const acceptedMessages = event.messages.filter(
    ({ roomId }) => isKnownRoomId(roomId, ctx)
  );

  if (acceptedMessages.length === 0)
    return null;

  return {
    ...event,
    attachments: filterAttachmentMapToMessages(event.attachments, acceptedMessages),
    messages: acceptedMessages
  };
}
|
|
|
|
/**
 * Keeps only the attachment-map entries whose key is the id of one of the
 * given messages. Returns `undefined` when there is no usable map or when no
 * entry survives the filter.
 */
function filterAttachmentMapToMessages(
  attachmentMap: IncomingMessageEvent['attachments'],
  messages: Message[]
): AttachmentMetaMap | undefined {
  if (!hasAttachmentMetaMap(attachmentMap))
    return undefined;

  const wantedIds = new Set<string>();

  for (const message of messages)
    wantedIds.add(message.id);

  const keptEntries = Object.entries(attachmentMap).filter(
    (entry) => wantedIds.has(entry[0])
  );

  if (keptEntries.length === 0)
    return undefined;

  return Object.fromEntries(keptEntries);
}
|
|
|
|
/**
 * Hands a background task to `trackDebuggingTaskFailure` under the
 * `'messages'` source label, together with a description and payload.
 */
function trackBackgroundOperation(
  task: Promise<unknown> | unknown,
  debugging: DebuggingService,
  message: string,
  payload: Record<string, unknown>
): void {
  const source = 'messages';

  trackDebuggingTaskFailure(task, debugging, source, message, payload);
}
|