Move toju-app into own its folder

This commit is contained in:
2026-03-29 23:30:37 +02:00
parent 0467a7b612
commit 8162e0444a
287 changed files with 42 additions and 34 deletions

View File

@@ -0,0 +1,664 @@
/**
* Handler functions for incoming P2P messages dispatched via WebRTC.
*
* Each handler is a pure function that receives an event and a context
* object containing the required services. Handlers return an
* `Observable<Action>` or `EMPTY` when no store action needs dispatching.
*
* The handler registry at the bottom maps event `type` strings to their
* handlers, and `dispatchIncomingMessage()` is the single entry point
* consumed by the `incomingMessages$` effect.
*/
import {
Observable,
of,
from,
EMPTY
} from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { Action } from '@ngrx/store';
import {
DELETED_MESSAGE_CONTENT,
type ChatEvent,
type Message,
type Room,
type User
} from '../../shared-kernel';
import type { RealtimeSessionFacade } from '../../core/realtime';
import type { DebuggingService } from '../../core/services';
import { AttachmentFacade, type AttachmentMeta } from '../../domains/attachment';
import { DatabaseService } from '../../infrastructure/persistence';
import { trackDebuggingTaskFailure } from '../../core/helpers/debugging-helpers';
import { MessagesActions } from './messages.actions';
import {
INVENTORY_LIMIT,
CHUNK_SIZE,
FULL_SYNC_LIMIT,
type InventoryItem,
chunkArray,
buildInventoryItem,
buildLocalInventoryMap,
findMissingIds,
hydrateMessage,
mergeIncomingMessage
} from './messages.helpers';
type AnnouncedAttachment = Pick<AttachmentMeta, 'id' | 'filename' | 'size' | 'mime' | 'isImage' | 'uploaderPeerId'>;
type AttachmentMetaMap = Record<string, AttachmentMeta[]>;
type IncomingMessageType =
| ChatEvent['type']
| 'chat-inventory'
| 'chat-sync-request-ids'
| 'chat-sync-batch'
| 'chat-sync-summary'
| 'chat-sync-request'
| 'chat-sync-full'
| 'file-announce'
| 'file-chunk'
| 'file-request'
| 'file-cancel'
| 'file-not-found';
interface IncomingMessageEvent extends Omit<ChatEvent, 'type'> {
type: IncomingMessageType;
items?: InventoryItem[];
ids?: string[];
messages?: Message[];
attachments?: AttachmentMetaMap;
total?: number;
index?: number;
count?: number;
lastUpdated?: number;
file?: AnnouncedAttachment;
fileId?: string;
}
type SyncBatchEvent = IncomingMessageEvent & {
messages: Message[];
attachments?: AttachmentMetaMap;
};
function hasMessageBatch(event: IncomingMessageEvent): event is SyncBatchEvent {
return Array.isArray(event.messages);
}
function hasAttachmentMetaMap(
attachmentMap: IncomingMessageEvent['attachments']
): attachmentMap is AttachmentMetaMap {
return typeof attachmentMap === 'object' && attachmentMap !== null;
}
/** Shared context injected into each handler function. */
export interface IncomingMessageContext {
db: DatabaseService;
webrtc: RealtimeSessionFacade;
attachments: AttachmentFacade;
debugging: DebuggingService;
currentUser: User | null;
currentRoom: Room | null;
}
/** Signature for an incoming-message handler function. */
type MessageHandler = (
event: IncomingMessageEvent,
ctx: IncomingMessageContext,
) => Observable<Action>;
/**
* Responds to a peer's inventory request by building and sending
* our local message inventory in chunks.
*/
function handleInventoryRequest(
event: IncomingMessageEvent,
{ db, webrtc, attachments }: IncomingMessageContext
): Observable<Action> {
const { roomId, fromPeerId } = event;
if (!roomId || !fromPeerId)
return EMPTY;
return from(
(async () => {
const messages = await db.getMessages(roomId, INVENTORY_LIMIT, 0);
const items = await Promise.all(
messages.map((msg) => {
const inMemoryAttachmentCount = attachments.getForMessage(msg.id).length;
return buildInventoryItem(
msg,
db,
inMemoryAttachmentCount > 0 ? inMemoryAttachmentCount : undefined
);
})
);
items.sort((firstItem, secondItem) => firstItem.ts - secondItem.ts);
for (const chunk of chunkArray(items, CHUNK_SIZE)) {
const inventoryEvent: ChatEvent = {
type: 'chat-inventory',
roomId,
items: chunk,
total: items.length,
index: 0
};
webrtc.sendToPeer(fromPeerId, inventoryEvent);
}
})()
).pipe(mergeMap(() => EMPTY));
}
/**
* Compares a peer's inventory against local state
* and requests any missing or stale messages.
*/
function handleInventory(
event: IncomingMessageEvent,
{ db, webrtc, attachments }: IncomingMessageContext
): Observable<Action> {
const { roomId, fromPeerId, items } = event;
if (!roomId || !Array.isArray(items) || !fromPeerId)
return EMPTY;
return from(
(async () => {
const local = await db.getMessages(roomId, INVENTORY_LIMIT, 0);
const inMemoryAttachmentCounts = new Map<string, number>();
for (const message of local) {
const count = attachments.getForMessage(message.id).length;
if (count > 0) {
inMemoryAttachmentCounts.set(message.id, count);
}
}
const localMap = await buildLocalInventoryMap(local, db, inMemoryAttachmentCounts);
const missing = findMissingIds(items, localMap);
for (const chunk of chunkArray(missing, CHUNK_SIZE)) {
const syncRequestIdsEvent: ChatEvent = {
type: 'chat-sync-request-ids',
roomId,
ids: chunk
};
webrtc.sendToPeer(fromPeerId, syncRequestIdsEvent);
}
})()
).pipe(mergeMap(() => EMPTY));
}
/**
* Responds to a peer's request for specific message IDs by sending
* hydrated messages along with their attachment metadata.
*/
function handleSyncRequestIds(
event: IncomingMessageEvent,
{ db, webrtc, attachments }: IncomingMessageContext
): Observable<Action> {
const { roomId, ids, fromPeerId } = event;
if (!Array.isArray(ids) || !fromPeerId)
return EMPTY;
return from(
(async () => {
const maybeMessages = await Promise.all(
(ids as string[]).map((id) => db.getMessageById(id))
);
const messages = maybeMessages.filter(
(msg): msg is Message => !!msg
);
const hydrated = await Promise.all(
messages.map((msg) => hydrateMessage(msg, db))
);
const msgIds = hydrated.map((msg) => msg.id);
const attachmentMetas =
attachments.getAttachmentMetasForMessages(msgIds);
for (const chunk of chunkArray(hydrated, CHUNK_SIZE)) {
const chunkAttachments: AttachmentMetaMap = {};
for (const hydratedMessage of chunk) {
if (attachmentMetas[hydratedMessage.id])
chunkAttachments[hydratedMessage.id] = attachmentMetas[hydratedMessage.id];
}
const syncBatchEvent: ChatEvent = {
type: 'chat-sync-batch',
roomId: roomId || '',
messages: chunk,
attachments:
Object.keys(chunkAttachments).length > 0
? chunkAttachments
: undefined
};
webrtc.sendToPeer(fromPeerId, syncBatchEvent);
}
})()
).pipe(mergeMap(() => EMPTY));
}
/**
* Processes a batch of synced messages from a peer: merges each into
* the local DB, registers attachment metadata, and auto-requests any
* missing image attachments.
*/
function handleSyncBatch(
event: IncomingMessageEvent,
{ db, attachments }: IncomingMessageContext
): Observable<Action> {
if (!hasMessageBatch(event))
return EMPTY;
if (hasAttachmentMetaMap(event.attachments)) {
attachments.registerSyncedAttachments(
event.attachments,
Object.fromEntries(event.messages.map((message) => [message.id, message.roomId]))
);
}
return from(processSyncBatch(event, db, attachments)).pipe(
mergeMap((toUpsert) =>
toUpsert.length > 0
? of(MessagesActions.syncMessages({ messages: toUpsert }))
: EMPTY
)
);
}
/** Merges each incoming message and collects those that changed. */
async function processSyncBatch(
event: SyncBatchEvent,
db: DatabaseService,
attachments: AttachmentFacade
): Promise<Message[]> {
const toUpsert: Message[] = [];
for (const incoming of event.messages) {
attachments.rememberMessageRoom(incoming.id, incoming.roomId);
const { message, changed } = await mergeIncomingMessage(incoming, db);
if (incoming.isDeleted) {
try {
await attachments.deleteForMessage(incoming.id);
} catch (error) {
throw new Error(`Failed to delete attachments for message ${incoming.id} during sync: ${message.id}. Error: ${error}`);
}
}
if (changed)
toUpsert.push(message);
}
if (hasAttachmentMetaMap(event.attachments)) {
queueWatchedAttachmentDownloads(event.attachments, attachments);
}
return toUpsert;
}
/** Queue best-effort auto-downloads for watched-room attachments. */
function queueWatchedAttachmentDownloads(
attachmentMap: AttachmentMetaMap,
attachments: AttachmentFacade
): void {
for (const msgId of Object.keys(attachmentMap)) {
attachments.queueAutoDownloadsForMessage(msgId);
}
}
/** Saves an incoming chat message to DB and dispatches receiveMessage. */
function handleChatMessage(
event: IncomingMessageEvent,
{
db,
debugging,
attachments,
currentUser
}: IncomingMessageContext
): Observable<Action> {
const msg = event.message;
if (!msg)
return EMPTY;
// Skip our own messages (reflected via server relay)
const isOwnMessage =
msg.senderId === currentUser?.id ||
msg.senderId === currentUser?.oderId;
if (isOwnMessage)
return EMPTY;
attachments.rememberMessageRoom(msg.id, msg.roomId);
trackBackgroundOperation(
db.saveMessage(msg),
debugging,
'Failed to persist incoming chat message',
{
channelId: msg.channelId || 'general',
fromPeerId: event.fromPeerId ?? null,
messageId: msg.id,
roomId: msg.roomId,
senderId: msg.senderId
}
);
return of(MessagesActions.receiveMessage({ message: msg }));
}
/** Applies a remote message edit to the local DB and store. */
function handleMessageEdited(
event: IncomingMessageEvent,
{ db, debugging }: IncomingMessageContext
): Observable<Action> {
if (!event.messageId || !event.content)
return EMPTY;
const editedAt = typeof event.editedAt === 'number'
? event.editedAt
: Date.now();
trackBackgroundOperation(
db.updateMessage(event.messageId, {
content: event.content,
editedAt
}),
debugging,
'Failed to persist incoming message edit',
{
editedAt,
fromPeerId: event.fromPeerId ?? null,
messageId: event.messageId
}
);
return of(
MessagesActions.editMessageSuccess({
messageId: event.messageId,
content: event.content,
editedAt
})
);
}
/** Applies a remote message deletion to the local DB and store. */
function handleMessageDeleted(
event: IncomingMessageEvent,
{ db, debugging, attachments }: IncomingMessageContext
): Observable<Action> {
if (!event.messageId)
return EMPTY;
const deletedAt = typeof event.deletedAt === 'number'
? event.deletedAt
: Date.now();
trackBackgroundOperation(
db.updateMessage(event.messageId, {
content: DELETED_MESSAGE_CONTENT,
editedAt: deletedAt,
isDeleted: true
}),
debugging,
'Failed to persist incoming message deletion',
{
deletedBy: event.deletedBy ?? null,
deletedAt,
fromPeerId: event.fromPeerId ?? null,
messageId: event.messageId
}
);
trackBackgroundOperation(
attachments.deleteForMessage(event.messageId),
debugging,
'Failed to delete incoming message attachments',
{
deletedBy: event.deletedBy ?? null,
fromPeerId: event.fromPeerId ?? null,
messageId: event.messageId
}
);
return of(
MessagesActions.deleteMessageSuccess({ messageId: event.messageId })
);
}
/** Saves an incoming reaction to DB and updates the store. */
function handleReactionAdded(
event: IncomingMessageEvent,
{ db, debugging }: IncomingMessageContext
): Observable<Action> {
if (!event.messageId || !event.reaction)
return EMPTY;
trackBackgroundOperation(
db.saveReaction(event.reaction),
debugging,
'Failed to persist incoming reaction',
{
emoji: event.reaction.emoji,
fromPeerId: event.fromPeerId ?? null,
messageId: event.messageId,
reactionId: event.reaction.id
}
);
return of(MessagesActions.addReactionSuccess({ reaction: event.reaction }));
}
/** Removes a reaction from DB and updates the store. */
function handleReactionRemoved(
event: IncomingMessageEvent,
{ db, debugging }: IncomingMessageContext
): Observable<Action> {
if (!event.messageId || !event.oderId || !event.emoji)
return EMPTY;
trackBackgroundOperation(
db.removeReaction(event.messageId, event.oderId, event.emoji),
debugging,
'Failed to persist incoming reaction removal',
{
emoji: event.emoji,
fromPeerId: event.fromPeerId ?? null,
messageId: event.messageId,
oderId: event.oderId
}
);
return of(
MessagesActions.removeReactionSuccess({
messageId: event.messageId,
oderId: event.oderId,
emoji: event.emoji
})
);
}
function handleFileAnnounce(
event: IncomingMessageEvent,
{ attachments }: IncomingMessageContext
): Observable<Action> {
attachments.handleFileAnnounce(event);
if (event.messageId) {
attachments.queueAutoDownloadsForMessage(event.messageId, event.file?.id);
}
return EMPTY;
}
function handleFileChunk(
event: IncomingMessageEvent,
{ attachments }: IncomingMessageContext
): Observable<Action> {
attachments.handleFileChunk(event);
return EMPTY;
}
function handleFileRequest(
event: IncomingMessageEvent,
{ attachments }: IncomingMessageContext
): Observable<Action> {
attachments.handleFileRequest(event);
return EMPTY;
}
function handleFileCancel(
event: IncomingMessageEvent,
{ attachments }: IncomingMessageContext
): Observable<Action> {
attachments.handleFileCancel(event);
return EMPTY;
}
function handleFileNotFound(
event: IncomingMessageEvent,
{ attachments }: IncomingMessageContext
): Observable<Action> {
attachments.handleFileNotFound(event);
return EMPTY;
}
/**
* Compares a peer's dataset summary and requests full sync
* if the peer has newer or more data.
*/
function handleSyncSummary(
event: IncomingMessageEvent,
{ db, webrtc, currentRoom }: IncomingMessageContext
): Observable<Action> {
if (!currentRoom)
return EMPTY;
return from(
(async () => {
const local = await db.getMessages(currentRoom.id, FULL_SYNC_LIMIT, 0);
const localCount = local.length;
const localLastUpdated = local.reduce(
(maxTimestamp, message) => Math.max(maxTimestamp, message.editedAt || message.timestamp || 0),
0
);
const remoteLastUpdated = event.lastUpdated || 0;
const remoteCount = event.count || 0;
const identical =
localLastUpdated === remoteLastUpdated && localCount === remoteCount;
const needsSync =
remoteLastUpdated > localLastUpdated ||
(remoteLastUpdated === localLastUpdated && remoteCount > localCount);
const fromPeerId = event.fromPeerId;
if (!identical && needsSync && fromPeerId) {
const syncRequestEvent: ChatEvent = {
type: 'chat-sync-request',
roomId: currentRoom.id
};
webrtc.sendToPeer(fromPeerId, syncRequestEvent);
}
})()
).pipe(mergeMap(() => EMPTY));
}
/** Responds to a peer's full sync request by sending all local messages. */
function handleSyncRequest(
event: IncomingMessageEvent,
{ db, webrtc, currentRoom }: IncomingMessageContext
): Observable<Action> {
const fromPeerId = event.fromPeerId;
if (!currentRoom || !fromPeerId)
return EMPTY;
return from(
(async () => {
const all = await db.getMessages(currentRoom.id, FULL_SYNC_LIMIT, 0);
const syncFullEvent: ChatEvent = {
type: 'chat-sync-full',
roomId: currentRoom.id,
messages: all
};
webrtc.sendToPeer(fromPeerId, syncFullEvent);
})()
).pipe(mergeMap(() => EMPTY));
}
/** Merges a full message dump from a peer into the local DB and store. */
function handleSyncFull(
event: IncomingMessageEvent,
{ db, attachments }: IncomingMessageContext
): Observable<Action> {
if (!hasMessageBatch(event))
return EMPTY;
return from(processSyncBatch(event, db, attachments)).pipe(
mergeMap((toUpsert) =>
toUpsert.length > 0
? of(MessagesActions.syncMessages({ messages: toUpsert }))
: EMPTY
)
);
}
/** Map of event types to their handler functions. */
const HANDLER_MAP: Readonly<Record<string, MessageHandler>> = {
// Inventory-based sync protocol
'chat-inventory-request': handleInventoryRequest,
'chat-inventory': handleInventory,
'chat-sync-request-ids': handleSyncRequestIds,
'chat-sync-batch': handleSyncBatch,
// Chat messages
'chat-message': handleChatMessage,
'message-edited': handleMessageEdited,
'message-deleted': handleMessageDeleted,
// Reactions
'reaction-added': handleReactionAdded,
'reaction-removed': handleReactionRemoved,
// Attachments
'file-announce': handleFileAnnounce,
'file-chunk': handleFileChunk,
'file-request': handleFileRequest,
'file-cancel': handleFileCancel,
'file-not-found': handleFileNotFound,
// Legacy sync handshake
'chat-sync-summary': handleSyncSummary,
'chat-sync-request': handleSyncRequest,
'chat-sync-full': handleSyncFull
};
/**
* Routes an incoming P2P message to the appropriate handler.
* Returns `EMPTY` if the event type is unknown or has no relevant handler.
*/
export function dispatchIncomingMessage(
event: IncomingMessageEvent,
ctx: IncomingMessageContext
): Observable<Action> {
const handler = HANDLER_MAP[event.type];
return handler ? handler(event, ctx) : EMPTY;
}
function trackBackgroundOperation(
task: Promise<unknown> | unknown,
debugging: DebuggingService,
message: string,
payload: Record<string, unknown>
): void {
trackDebuggingTaskFailure(task, debugging, 'messages', message, payload);
}