Files
Toju/toju-app/src/app/store/messages/messages.effects.ts
Myx 84fa45985a feat: Add chat embeds v1
Youtube and Website metadata embeds
2026-04-04 04:47:04 +02:00

503 lines
16 KiB
TypeScript

/**
* Core message CRUD effects (load, send, edit, delete, react)
* and the central incoming-message dispatcher.
*
* Sync-lifecycle effects (polling, peer-connect handshakes) live in
* `messages-sync.effects.ts` to keep this file focused.
*
* The giant `incomingMessages$` switch-case has been replaced by a
* handler registry in `messages-incoming.handlers.ts`.
*/
/* eslint-disable @typescript-eslint/member-ordering */
import { Injectable, inject } from '@angular/core';
import {
Actions,
createEffect,
ofType
} from '@ngrx/effects';
import { Store } from '@ngrx/store';
import {
of,
from,
EMPTY
} from 'rxjs';
import {
mergeMap,
catchError,
withLatestFrom,
switchMap
} from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import { MessagesActions } from './messages.actions';
import { selectCurrentUser } from '../users/users.selectors';
import { selectCurrentRoom } from '../rooms/rooms.selectors';
import { selectMessagesEntities } from './messages.selectors';
import { RealtimeSessionFacade } from '../../core/realtime';
import { DatabaseService } from '../../infrastructure/persistence';
import { reportDebuggingError, trackDebuggingTaskFailure } from '../../core/helpers/debugging-helpers';
import { DebuggingService } from '../../core/services';
import { AttachmentFacade } from '../../domains/attachment';
import { LinkMetadataService } from '../../domains/chat/application/link-metadata.service';
import { TimeSyncService } from '../../core/services/time-sync.service';
import {
DELETED_MESSAGE_CONTENT,
Message,
Reaction
} from '../../shared-kernel';
import { hydrateMessages } from './messages.helpers';
import { canEditMessage } from '../../domains/chat/domain/message.rules';
import { resolveRoomPermission } from '../../domains/access-control';
import { dispatchIncomingMessage, IncomingMessageContext } from './messages-incoming.handlers';
@Injectable()
export class MessagesEffects {
private readonly actions$ = inject(Actions);
private readonly store = inject(Store);
private readonly db = inject(DatabaseService);
private readonly debugging = inject(DebuggingService);
private readonly attachments = inject(AttachmentFacade);
private readonly webrtc = inject(RealtimeSessionFacade);
private readonly timeSync = inject(TimeSyncService);
private readonly linkMetadata = inject(LinkMetadataService);
/** Loads messages for a room from the local database, hydrating reactions. */
loadMessages$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.loadMessages),
switchMap(({ roomId }) =>
from(this.db.getMessages(roomId)).pipe(
mergeMap(async (messages) => {
const hydrated = await hydrateMessages(messages, this.db);
for (const message of hydrated) {
this.attachments.rememberMessageRoom(message.id, message.roomId);
}
void this.attachments.requestAutoDownloadsForRoom(roomId);
return MessagesActions.loadMessagesSuccess({ messages: hydrated });
}),
catchError((error) =>
of(MessagesActions.loadMessagesFailure({ error: error.message }))
)
)
)
)
);
/** Constructs a new message, persists it locally, and broadcasts to all peers. */
sendMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.sendMessage),
withLatestFrom(
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom)
),
mergeMap(([
{ content, replyToId, channelId },
currentUser,
currentRoom
]) => {
if (!currentUser || !currentRoom) {
return of(MessagesActions.sendMessageFailure({ error: 'Not connected to a room' }));
}
const message: Message = {
id: uuidv4(),
roomId: currentRoom.id,
channelId: channelId || 'general',
senderId: currentUser.id,
senderName: currentUser.displayName || currentUser.username,
content,
timestamp: this.timeSync.now(),
reactions: [],
isDeleted: false,
replyToId
};
this.attachments.rememberMessageRoom(message.id, message.roomId);
this.trackBackgroundOperation(
this.db.saveMessage(message),
'Failed to persist outgoing chat message',
{
channelId: message.channelId,
contentLength: message.content.length,
messageId: message.id,
roomId: message.roomId
}
);
this.webrtc.broadcastMessage({ type: 'chat-message',
message });
return of(MessagesActions.sendMessageSuccess({ message }));
}),
catchError((error) =>
of(MessagesActions.sendMessageFailure({ error: error.message }))
)
)
);
/** Edits an existing message (author-only), updates DB, and broadcasts the change. */
editMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.editMessage),
withLatestFrom(this.store.select(selectCurrentUser)),
switchMap(([{ messageId, content }, currentUser]) => {
if (!currentUser) {
return of(MessagesActions.editMessageFailure({ error: 'Not logged in' }));
}
return from(this.db.getMessageById(messageId)).pipe(
mergeMap((existing) => {
if (!existing) {
return of(MessagesActions.editMessageFailure({ error: 'Message not found' }));
}
if (!canEditMessage(existing, currentUser.id)) {
return of(MessagesActions.editMessageFailure({ error: 'Cannot edit others messages' }));
}
const editedAt = this.timeSync.now();
this.trackBackgroundOperation(
this.db.updateMessage(messageId, { content,
editedAt }),
'Failed to persist edited chat message',
{
contentLength: content.length,
editedAt,
messageId
}
);
this.webrtc.broadcastMessage({ type: 'message-edited',
messageId,
content,
editedAt });
return of(MessagesActions.editMessageSuccess({ messageId,
content,
editedAt }));
}),
catchError((error) =>
of(MessagesActions.editMessageFailure({ error: error.message }))
)
);
})
)
);
/** Soft-deletes a message (author-only), marks it deleted in DB, and broadcasts. */
deleteMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.deleteMessage),
withLatestFrom(this.store.select(selectCurrentUser)),
switchMap(([{ messageId }, currentUser]) => {
if (!currentUser) {
return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
}
return from(this.db.getMessageById(messageId)).pipe(
mergeMap((existing) => {
if (!existing) {
return of(MessagesActions.deleteMessageFailure({ error: 'Message not found' }));
}
if (!canEditMessage(existing, currentUser.id)) {
return of(MessagesActions.deleteMessageFailure({ error: 'Cannot delete others messages' }));
}
const deletedAt = this.timeSync.now();
this.trackBackgroundOperation(
this.db.updateMessage(messageId, {
content: DELETED_MESSAGE_CONTENT,
editedAt: deletedAt,
isDeleted: true
}),
'Failed to persist message deletion',
{
deletedAt,
messageId
}
);
this.trackBackgroundOperation(
this.attachments.deleteForMessage(messageId),
'Failed to delete message attachments',
{ messageId }
);
this.webrtc.broadcastMessage({ type: 'message-deleted',
messageId,
deletedAt });
return of(MessagesActions.deleteMessageSuccess({ messageId }));
}),
catchError((error) =>
of(MessagesActions.deleteMessageFailure({ error: error.message }))
)
);
})
)
);
/** Soft-deletes any message (admin+ only). */
adminDeleteMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.adminDeleteMessage),
withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom)),
mergeMap(([
{ messageId },
currentUser,
currentRoom
]) => {
if (!currentUser) {
return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
}
const hasPermission = !!currentRoom && resolveRoomPermission(currentRoom, currentUser, 'deleteMessages');
if (!hasPermission) {
return of(MessagesActions.deleteMessageFailure({ error: 'Permission denied' }));
}
const deletedAt = this.timeSync.now();
this.trackBackgroundOperation(
this.db.updateMessage(messageId, {
content: DELETED_MESSAGE_CONTENT,
editedAt: deletedAt,
isDeleted: true
}),
'Failed to persist admin message deletion',
{
deletedBy: currentUser.id,
deletedAt,
messageId
}
);
this.trackBackgroundOperation(
this.attachments.deleteForMessage(messageId),
'Failed to delete admin-deleted message attachments',
{
deletedBy: currentUser.id,
messageId
}
);
this.webrtc.broadcastMessage({ type: 'message-deleted',
messageId,
deletedBy: currentUser.id,
deletedAt });
return of(MessagesActions.deleteMessageSuccess({ messageId }));
}),
catchError((error) =>
of(MessagesActions.deleteMessageFailure({ error: error.message }))
)
)
);
/** Adds an emoji reaction to a message, persists it, and broadcasts to peers. */
addReaction$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.addReaction),
withLatestFrom(this.store.select(selectCurrentUser)),
mergeMap(([{ messageId, emoji }, currentUser]) => {
if (!currentUser)
return EMPTY;
const reaction: Reaction = {
id: uuidv4(),
messageId,
oderId: currentUser.id,
userId: currentUser.id,
emoji,
timestamp: this.timeSync.now()
};
this.trackBackgroundOperation(
this.db.saveReaction(reaction),
'Failed to persist reaction',
{
emoji,
messageId,
reactionId: reaction.id,
userId: currentUser.id
}
);
this.webrtc.broadcastMessage({ type: 'reaction-added',
messageId,
reaction });
return of(MessagesActions.addReactionSuccess({ reaction }));
})
)
);
/** Removes the current user's reaction from a message, deletes from DB, and broadcasts. */
removeReaction$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.removeReaction),
withLatestFrom(this.store.select(selectCurrentUser)),
mergeMap(([{ messageId, emoji }, currentUser]) => {
if (!currentUser)
return EMPTY;
this.trackBackgroundOperation(
this.db.removeReaction(messageId, currentUser.id, emoji),
'Failed to persist reaction removal',
{
emoji,
messageId,
userId: currentUser.id
}
);
this.webrtc.broadcastMessage({
type: 'reaction-removed',
messageId,
oderId: currentUser.id,
emoji
});
return of(
MessagesActions.removeReactionSuccess({
messageId,
oderId: currentUser.id,
emoji
})
);
})
)
);
/**
* Fetches link metadata for newly sent or received messages that
* contain URLs but don't already have metadata attached.
*/
fetchLinkMetadata$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.sendMessageSuccess, MessagesActions.receiveMessage),
mergeMap(({ message }) => {
if (message.isDeleted || message.linkMetadata?.length)
return EMPTY;
const urls = this.linkMetadata.extractUrls(message.content);
if (urls.length === 0)
return EMPTY;
return from(this.linkMetadata.fetchAllMetadata(urls)).pipe(
mergeMap((metadata) => {
const meaningful = metadata.filter((md) => !md.failed);
if (meaningful.length === 0)
return EMPTY;
this.trackBackgroundOperation(
this.db.updateMessage(message.id, { linkMetadata: meaningful }),
'Failed to persist link metadata',
{ messageId: message.id }
);
return of(MessagesActions.updateLinkMetadata({
messageId: message.id,
linkMetadata: meaningful
}));
}),
catchError(() => EMPTY)
);
})
)
);
/**
* Removes a single link embed from a message, persists the change,
* and updates the store.
*/
removeLinkEmbed$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.removeLinkEmbed),
withLatestFrom(this.store.select(selectMessagesEntities)),
mergeMap(([{ messageId, url }, entities]) => {
const message = entities[messageId];
if (!message?.linkMetadata)
return EMPTY;
const remaining = message.linkMetadata.filter((meta) => meta.url !== url);
this.trackBackgroundOperation(
this.db.updateMessage(messageId, { linkMetadata: remaining.length ? remaining : undefined }),
'Failed to persist link embed removal',
{ messageId }
);
return of(MessagesActions.updateLinkMetadata({
messageId,
linkMetadata: remaining
}));
})
)
);
/**
* Central dispatcher for all incoming P2P messages.
* Delegates to handler functions in `messages-incoming.handlers.ts`.
*/
incomingMessages$ = createEffect(() =>
this.webrtc.onMessageReceived.pipe(
withLatestFrom(
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom)
),
mergeMap(([
event,
currentUser,
currentRoom
]) => {
const ctx: IncomingMessageContext = {
db: this.db,
webrtc: this.webrtc,
attachments: this.attachments,
debugging: this.debugging,
currentUser: currentUser ?? null,
currentRoom
};
return dispatchIncomingMessage(event, ctx).pipe(
catchError((error) => {
const eventRecord = event as unknown as Record<string, unknown>;
const messageRecord = (eventRecord['message'] && typeof eventRecord['message'] === 'object' && !Array.isArray(eventRecord['message']))
? eventRecord['message'] as Record<string, unknown>
: null;
reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming peer message', {
eventType: typeof eventRecord['type'] === 'string' ? eventRecord['type'] : 'unknown',
fromPeerId: typeof eventRecord['fromPeerId'] === 'string' ? eventRecord['fromPeerId'] : null,
messageId: typeof eventRecord['messageId'] === 'string'
? eventRecord['messageId']
: (typeof messageRecord?.['id'] === 'string' ? messageRecord['id'] : null),
roomId: typeof eventRecord['roomId'] === 'string'
? eventRecord['roomId']
: (typeof messageRecord?.['roomId'] === 'string' ? messageRecord['roomId'] : null)
}, error);
return EMPTY;
})
);
})
)
);
private trackBackgroundOperation(task: Promise<unknown> | unknown, message: string, payload: Record<string, unknown>): void {
trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload);
}
}