/** * Core message CRUD effects (load, send, edit, delete, react) * and the central incoming-message dispatcher. * * Sync-lifecycle effects (polling, peer-connect handshakes) live in * `messages-sync.effects.ts` to keep this file focused. * * The giant `incomingMessages$` switch-case has been replaced by a * handler registry in `messages-incoming.handlers.ts`. */ /* eslint-disable @typescript-eslint/member-ordering */ import { Injectable, inject } from '@angular/core'; import { Actions, createEffect, ofType } from '@ngrx/effects'; import { Store } from '@ngrx/store'; import { of, from, EMPTY } from 'rxjs'; import { mergeMap, catchError, withLatestFrom, switchMap } from 'rxjs/operators'; import { v4 as uuidv4 } from 'uuid'; import { MessagesActions } from './messages.actions'; import { selectCurrentUser } from '../users/users.selectors'; import { selectCurrentRoom } from '../rooms/rooms.selectors'; import { DatabaseService } from '../../core/services/database.service'; import { WebRTCService } from '../../core/services/webrtc.service'; import { TimeSyncService } from '../../core/services/time-sync.service'; import { AttachmentService } from '../../core/services/attachment.service'; import { Message, Reaction } from '../../core/models'; import { hydrateMessages } from './messages.helpers'; import { dispatchIncomingMessage, IncomingMessageContext } from './messages-incoming.handlers'; @Injectable() export class MessagesEffects { private readonly actions$ = inject(Actions); private readonly store = inject(Store); private readonly db = inject(DatabaseService); private readonly webrtc = inject(WebRTCService); private readonly timeSync = inject(TimeSyncService); private readonly attachments = inject(AttachmentService); /** Loads messages for a room from the local database, hydrating reactions. */ loadMessages$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.loadMessages), switchMap(({ roomId }) => from(this.db.getMessages(roomId)).pipe( mergeMap(async (messages) => { const hydrated = await hydrateMessages(messages, this.db); return MessagesActions.loadMessagesSuccess({ messages: hydrated }); }), catchError((error) => of(MessagesActions.loadMessagesFailure({ error: error.message })) ) ) ) ) ); /** Constructs a new message, persists it locally, and broadcasts to all peers. */ sendMessage$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.sendMessage), withLatestFrom( this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom) ), mergeMap(([ { content, replyToId, channelId }, currentUser, currentRoom ]) => { if (!currentUser || !currentRoom) { return of(MessagesActions.sendMessageFailure({ error: 'Not connected to a room' })); } const message: Message = { id: uuidv4(), roomId: currentRoom.id, channelId: channelId || 'general', senderId: currentUser.id, senderName: currentUser.displayName || currentUser.username, content, timestamp: this.timeSync.now(), reactions: [], isDeleted: false, replyToId }; this.db.saveMessage(message); this.webrtc.broadcastMessage({ type: 'chat-message', message }); return of(MessagesActions.sendMessageSuccess({ message })); }), catchError((error) => of(MessagesActions.sendMessageFailure({ error: error.message })) ) ) ); /** Edits an existing message (author-only), updates DB, and broadcasts the change. */ editMessage$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.editMessage), withLatestFrom(this.store.select(selectCurrentUser)), switchMap(([{ messageId, content }, currentUser]) => { if (!currentUser) { return of(MessagesActions.editMessageFailure({ error: 'Not logged in' })); } return from(this.db.getMessageById(messageId)).pipe( mergeMap((existing) => { if (!existing) { return of(MessagesActions.editMessageFailure({ error: 'Message not found' })); } if (existing.senderId !== currentUser.id) { return of(MessagesActions.editMessageFailure({ error: 'Cannot edit others messages' })); } const editedAt = this.timeSync.now(); this.db.updateMessage(messageId, { content, editedAt }); this.webrtc.broadcastMessage({ type: 'message-edited', messageId, content, editedAt }); return of(MessagesActions.editMessageSuccess({ messageId, content, editedAt })); }), catchError((error) => of(MessagesActions.editMessageFailure({ error: error.message })) ) ); }) ) ); /** Soft-deletes a message (author-only), marks it deleted in DB, and broadcasts. */ deleteMessage$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.deleteMessage), withLatestFrom(this.store.select(selectCurrentUser)), switchMap(([{ messageId }, currentUser]) => { if (!currentUser) { return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' })); } return from(this.db.getMessageById(messageId)).pipe( mergeMap((existing) => { if (!existing) { return of(MessagesActions.deleteMessageFailure({ error: 'Message not found' })); } if (existing.senderId !== currentUser.id) { return of(MessagesActions.deleteMessageFailure({ error: 'Cannot delete others messages' })); } this.db.updateMessage(messageId, { isDeleted: true }); this.webrtc.broadcastMessage({ type: 'message-deleted', messageId }); return of(MessagesActions.deleteMessageSuccess({ messageId })); }), catchError((error) => of(MessagesActions.deleteMessageFailure({ error: error.message })) ) ); }) ) ); /** Soft-deletes any message (admin+ only). */ adminDeleteMessage$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.adminDeleteMessage), withLatestFrom(this.store.select(selectCurrentUser)), mergeMap(([{ messageId }, currentUser]) => { if (!currentUser) { return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' })); } const hasPermission = currentUser.role === 'host' || currentUser.role === 'admin' || currentUser.role === 'moderator'; if (!hasPermission) { return of(MessagesActions.deleteMessageFailure({ error: 'Permission denied' })); } this.db.updateMessage(messageId, { isDeleted: true }); this.webrtc.broadcastMessage({ type: 'message-deleted', messageId, deletedBy: currentUser.id }); return of(MessagesActions.deleteMessageSuccess({ messageId })); }), catchError((error) => of(MessagesActions.deleteMessageFailure({ error: error.message })) ) ) ); /** Adds an emoji reaction to a message, persists it, and broadcasts to peers. */ addReaction$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.addReaction), withLatestFrom(this.store.select(selectCurrentUser)), mergeMap(([{ messageId, emoji }, currentUser]) => { if (!currentUser) return EMPTY; const reaction: Reaction = { id: uuidv4(), messageId, oderId: currentUser.id, userId: currentUser.id, emoji, timestamp: this.timeSync.now() }; this.db.saveReaction(reaction); this.webrtc.broadcastMessage({ type: 'reaction-added', messageId, reaction }); return of(MessagesActions.addReactionSuccess({ reaction })); }) ) ); /** Removes the current user's reaction from a message, deletes from DB, and broadcasts. */ removeReaction$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.removeReaction), withLatestFrom(this.store.select(selectCurrentUser)), mergeMap(([{ messageId, emoji }, currentUser]) => { if (!currentUser) return EMPTY; this.db.removeReaction(messageId, currentUser.id, emoji); this.webrtc.broadcastMessage({ type: 'reaction-removed', messageId, oderId: currentUser.id, emoji }); return of( MessagesActions.removeReactionSuccess({ messageId, oderId: currentUser.id, emoji }) ); }) ) ); /** * Central dispatcher for all incoming P2P messages. * Delegates to handler functions in `messages-incoming.handlers.ts`. */ incomingMessages$ = createEffect(() => this.webrtc.onMessageReceived.pipe( withLatestFrom( this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom) ), mergeMap(([ event, currentUser, currentRoom]: [any, any, any ]) => { const ctx: IncomingMessageContext = { db: this.db, webrtc: this.webrtc, attachments: this.attachments, currentUser, currentRoom }; return dispatchIncomingMessage(event, ctx); }) ) ); }