Some checks failed
Queue Release Build / build-linux (push) Blocked by required conditions
Queue Release Build / prepare (push) Successful in 15s
Deploy Web Apps / deploy (push) Successful in 16m15s
Queue Release Build / finalize (push) Has been cancelled
Queue Release Build / build-windows (push) Has been cancelled
427 lines
14 KiB
TypeScript
427 lines
14 KiB
TypeScript
/**
 * Core message CRUD effects (load, send, edit, delete, react)
 * and the central incoming-message dispatcher.
 *
 * Sync-lifecycle effects (polling, peer-connect handshakes) live in
 * `messages-sync.effects.ts` to keep this file focused.
 *
 * The giant `incomingMessages$` switch-case has been replaced by a
 * handler registry in `messages-incoming.handlers.ts`.
 */
|
|
/* eslint-disable @typescript-eslint/member-ordering */
|
|
import { Injectable, inject } from '@angular/core';
|
|
import {
|
|
Actions,
|
|
createEffect,
|
|
ofType
|
|
} from '@ngrx/effects';
|
|
import { Store } from '@ngrx/store';
|
|
import {
|
|
of,
|
|
from,
|
|
EMPTY
|
|
} from 'rxjs';
|
|
import {
|
|
mergeMap,
|
|
catchError,
|
|
withLatestFrom,
|
|
switchMap
|
|
} from 'rxjs/operators';
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
import { MessagesActions } from './messages.actions';
|
|
import { selectCurrentUser } from '../users/users.selectors';
|
|
import { selectCurrentRoom } from '../rooms/rooms.selectors';
|
|
import { DatabaseService } from '../../core/services/database.service';
|
|
import { reportDebuggingError, trackDebuggingTaskFailure } from '../../core/helpers/debugging-helpers';
|
|
import { DebuggingService } from '../../core/services';
|
|
import { WebRTCService } from '../../core/services/webrtc.service';
|
|
import { TimeSyncService } from '../../core/services/time-sync.service';
|
|
import { AttachmentService } from '../../core/services/attachment.service';
|
|
import {
|
|
DELETED_MESSAGE_CONTENT,
|
|
Message,
|
|
Reaction
|
|
} from '../../core/models/index';
|
|
import { hydrateMessages } from './messages.helpers';
|
|
import { dispatchIncomingMessage, IncomingMessageContext } from './messages-incoming.handlers';
|
|
|
|
@Injectable()
|
|
export class MessagesEffects {
|
|
// --- Injected collaborators -------------------------------------------

/** Stream of dispatched NgRx actions that every effect below filters with `ofType`. */
private readonly actions$ = inject(Actions);

/** Application store; read here for the current user and current room. */
private readonly store = inject(Store);

/** Local persistence for messages and reactions. */
private readonly db = inject(DatabaseService);

/** Sink handed to the debugging helpers when background work or incoming messages fail. */
private readonly debugging = inject(DebuggingService);

/** Peer transport: broadcasts outgoing events and exposes `onMessageReceived`. */
private readonly webrtc = inject(WebRTCService);

/** Clock used for every message, edit, deletion, and reaction timestamp. */
private readonly timeSync = inject(TimeSyncService);

/** Attachment bookkeeping (message-to-room mapping, auto-downloads, cleanup on delete). */
private readonly attachments = inject(AttachmentService);
|
|
|
|
/**
 * Loads the stored messages of a room and hydrates them via `hydrateMessages`.
 *
 * For each hydrated message the attachment service is told which room the
 * message belongs to, then auto-downloads for the room are requested
 * (fire-and-forget) before `loadMessagesSuccess` is emitted. Any failure in
 * the lookup or hydration is mapped to `loadMessagesFailure`.
 *
 * `switchMap` means a newer `loadMessages` action supersedes a load that is
 * still in flight.
 */
loadMessages$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.loadMessages),
    switchMap(({ roomId }) => {
      // Turns the raw DB rows into the success action, with attachment side effects.
      const toSuccessAction = async (storedMessages: Awaited<ReturnType<DatabaseService['getMessages']>>) => {
        const hydratedMessages = await hydrateMessages(storedMessages, this.db);

        for (const entry of hydratedMessages) {
          this.attachments.rememberMessageRoom(entry.id, entry.roomId);
        }

        // Intentionally not awaited: downloads proceed in the background.
        void this.attachments.requestAutoDownloadsForRoom(roomId);

        return MessagesActions.loadMessagesSuccess({ messages: hydratedMessages });
      };

      return from(this.db.getMessages(roomId)).pipe(
        mergeMap(toSuccessAction),
        catchError((error) =>
          of(MessagesActions.loadMessagesFailure({ error: error.message }))
        )
      );
    })
  )
);
|
|
|
|
/**
 * Constructs a new message, persists it locally, and broadcasts it to all peers.
 *
 * Flow per `sendMessage` action:
 *  1. Requires both a current user and a current room; otherwise emits
 *     `sendMessageFailure` with 'Not connected to a room'.
 *  2. Builds the `Message` (fresh UUID, time-synced timestamp, channel
 *     defaulting to 'general').
 *  3. Registers the message/room mapping with the attachment service.
 *  4. Persists the message as a tracked background operation (not awaited).
 *  5. Broadcasts a `chat-message` event and emits `sendMessageSuccess`.
 *
 * Errors thrown synchronously while building, persisting, or broadcasting are
 * handled INSIDE the `mergeMap` projection. A `catchError` on the outer action
 * stream would emit one failure action and then complete the effect, so every
 * later `sendMessage` action would be silently ignored.
 */
sendMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.sendMessage),
    withLatestFrom(
      this.store.select(selectCurrentUser),
      this.store.select(selectCurrentRoom)
    ),
    mergeMap(([
      { content, replyToId, channelId },
      currentUser,
      currentRoom
    ]) => {
      if (!currentUser || !currentRoom) {
        return of(MessagesActions.sendMessageFailure({ error: 'Not connected to a room' }));
      }

      try {
        const message: Message = {
          id: uuidv4(),
          roomId: currentRoom.id,
          channelId: channelId || 'general',
          senderId: currentUser.id,
          senderName: currentUser.displayName || currentUser.username,
          content,
          timestamp: this.timeSync.now(),
          reactions: [],
          isDeleted: false,
          replyToId
        };

        this.attachments.rememberMessageRoom(message.id, message.roomId);

        // Persistence is fire-and-forget; failures are reported via the debugging helper.
        this.trackBackgroundOperation(
          this.db.saveMessage(message),
          'Failed to persist outgoing chat message',
          {
            channelId: message.channelId,
            contentLength: message.content.length,
            messageId: message.id,
            roomId: message.roomId
          }
        );

        this.webrtc.broadcastMessage({ type: 'chat-message',
          message });

        return of(MessagesActions.sendMessageSuccess({ message }));
      } catch (error: unknown) {
        // Keep the failure local to this action so the effect stays subscribed.
        return of(MessagesActions.sendMessageFailure({
          error: error instanceof Error ? error.message : String(error)
        }));
      }
    })
  )
);
|
|
|
|
/**
 * Edits an existing message (author-only), updates the DB, and broadcasts the change.
 *
 * Emits `editMessageFailure` when there is no current user ('Not logged in'),
 * the message is not in the local DB ('Message not found'), or the current
 * user is not the sender ('Cannot edit others messages'). Otherwise the new
 * content and a time-synced `editedAt` are persisted as a tracked background
 * operation, a `message-edited` event is broadcast, and `editMessageSuccess`
 * is emitted.
 *
 * The outer flattening operator is `mergeMap`, not `switchMap`: the lookup is
 * asynchronous, and `switchMap` would unsubscribe from a pending lookup when a
 * second edit action arrived, dropping the first edit with no success or
 * failure action.
 */
editMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.editMessage),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([{ messageId, content }, currentUser]) => {
      if (!currentUser) {
        return of(MessagesActions.editMessageFailure({ error: 'Not logged in' }));
      }

      return from(this.db.getMessageById(messageId)).pipe(
        mergeMap((existing) => {
          if (!existing) {
            return of(MessagesActions.editMessageFailure({ error: 'Message not found' }));
          }

          // Only the original sender may edit.
          if (existing.senderId !== currentUser.id) {
            return of(MessagesActions.editMessageFailure({ error: 'Cannot edit others messages' }));
          }

          const editedAt = this.timeSync.now();

          // Persistence is fire-and-forget; failures are reported via the debugging helper.
          this.trackBackgroundOperation(
            this.db.updateMessage(messageId, { content,
              editedAt }),
            'Failed to persist edited chat message',
            {
              contentLength: content.length,
              editedAt,
              messageId
            }
          );

          this.webrtc.broadcastMessage({ type: 'message-edited',
            messageId,
            content,
            editedAt });

          return of(MessagesActions.editMessageSuccess({ messageId,
            content,
            editedAt }));
        }),
        // Inner catchError: a failed edit must not terminate the effect.
        catchError((error) =>
          of(MessagesActions.editMessageFailure({ error: error.message }))
        )
      );
    })
  )
);
|
|
|
|
/**
 * Soft-deletes a message (author-only), marks it deleted in the DB, and broadcasts.
 *
 * Emits `deleteMessageFailure` when there is no current user ('Not logged in'),
 * the message is not in the local DB ('Message not found'), or the current
 * user is not the sender ('Cannot delete others messages'). Otherwise:
 *  - the stored message is overwritten with `DELETED_MESSAGE_CONTENT`,
 *    `isDeleted: true`, and `editedAt` set to the deletion time;
 *  - the message's attachments are deleted;
 *  - a `message-deleted` event is broadcast;
 *  - `deleteMessageSuccess` is emitted.
 * Both persistence steps are tracked background operations (not awaited).
 *
 * The outer flattening operator is `mergeMap`, not `switchMap`: with
 * `switchMap`, deleting two messages in quick succession would cancel the
 * first deletion while its lookup was still pending, so it would never be
 * persisted or broadcast.
 */
deleteMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.deleteMessage),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([{ messageId }, currentUser]) => {
      if (!currentUser) {
        return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
      }

      return from(this.db.getMessageById(messageId)).pipe(
        mergeMap((existing) => {
          if (!existing) {
            return of(MessagesActions.deleteMessageFailure({ error: 'Message not found' }));
          }

          // Only the original sender may delete through this effect.
          if (existing.senderId !== currentUser.id) {
            return of(MessagesActions.deleteMessageFailure({ error: 'Cannot delete others messages' }));
          }

          const deletedAt = this.timeSync.now();

          this.trackBackgroundOperation(
            this.db.updateMessage(messageId, {
              content: DELETED_MESSAGE_CONTENT,
              editedAt: deletedAt,
              isDeleted: true
            }),
            'Failed to persist message deletion',
            {
              deletedAt,
              messageId
            }
          );

          this.trackBackgroundOperation(
            this.attachments.deleteForMessage(messageId),
            'Failed to delete message attachments',
            { messageId }
          );

          this.webrtc.broadcastMessage({ type: 'message-deleted',
            messageId,
            deletedAt });

          return of(MessagesActions.deleteMessageSuccess({ messageId }));
        }),
        // Inner catchError: a failed deletion must not terminate the effect.
        catchError((error) =>
          of(MessagesActions.deleteMessageFailure({ error: error.message }))
        )
      );
    })
  )
);
|
|
|
|
/**
 * Soft-deletes any message on behalf of a privileged user.
 *
 * Permitted roles are 'host', 'admin', and 'moderator'; any other role yields
 * `deleteMessageFailure` with 'Permission denied'. Unlike `deleteMessage$`,
 * this effect does not look the message up first. It directly overwrites the
 * stored message with `DELETED_MESSAGE_CONTENT` / `isDeleted: true`, deletes
 * the message's attachments, and broadcasts `message-deleted` including
 * `deletedBy`. It reuses the `deleteMessageSuccess` / `deleteMessageFailure`
 * actions.
 *
 * Errors thrown synchronously while persisting or broadcasting are handled
 * INSIDE the `mergeMap` projection. A `catchError` on the outer action stream
 * would emit one failure action and then complete the effect, disabling admin
 * deletion for the rest of the session.
 */
adminDeleteMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.adminDeleteMessage),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([{ messageId }, currentUser]) => {
      if (!currentUser) {
        return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
      }

      const hasPermission =
        currentUser.role === 'host' ||
        currentUser.role === 'admin' ||
        currentUser.role === 'moderator';

      if (!hasPermission) {
        return of(MessagesActions.deleteMessageFailure({ error: 'Permission denied' }));
      }

      try {
        const deletedAt = this.timeSync.now();

        this.trackBackgroundOperation(
          this.db.updateMessage(messageId, {
            content: DELETED_MESSAGE_CONTENT,
            editedAt: deletedAt,
            isDeleted: true
          }),
          'Failed to persist admin message deletion',
          {
            deletedBy: currentUser.id,
            deletedAt,
            messageId
          }
        );

        this.trackBackgroundOperation(
          this.attachments.deleteForMessage(messageId),
          'Failed to delete admin-deleted message attachments',
          {
            deletedBy: currentUser.id,
            messageId
          }
        );

        this.webrtc.broadcastMessage({ type: 'message-deleted',
          messageId,
          deletedBy: currentUser.id,
          deletedAt });

        return of(MessagesActions.deleteMessageSuccess({ messageId }));
      } catch (error: unknown) {
        // Keep the failure local to this action so the effect stays subscribed.
        return of(MessagesActions.deleteMessageFailure({
          error: error instanceof Error ? error.message : String(error)
        }));
      }
    })
  )
);
|
|
|
|
/**
 * Adds an emoji reaction from the current user to a message.
 *
 * Without a current user the action is ignored (nothing is emitted).
 * Otherwise a `Reaction` with a fresh UUID and time-synced timestamp is
 * built, saved as a tracked background operation, broadcast as
 * `reaction-added`, and reported through `addReactionSuccess`.
 */
addReaction$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.addReaction),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([request, currentUser]) => {
      if (!currentUser) {
        return EMPTY;
      }

      const { messageId, emoji } = request;
      const reactorId = currentUser.id;

      // The model carries the reacting user's id under both `oderId` and `userId`.
      const reaction: Reaction = {
        id: uuidv4(),
        messageId,
        oderId: reactorId,
        userId: reactorId,
        emoji,
        timestamp: this.timeSync.now()
      };

      const failureContext = {
        emoji,
        messageId,
        reactionId: reaction.id,
        userId: reactorId
      };

      this.trackBackgroundOperation(
        this.db.saveReaction(reaction),
        'Failed to persist reaction',
        failureContext
      );

      this.webrtc.broadcastMessage({
        type: 'reaction-added',
        messageId,
        reaction
      });

      return of(MessagesActions.addReactionSuccess({ reaction }));
    })
  )
);
|
|
|
|
/**
 * Removes the current user's reaction (identified by message and emoji).
 *
 * Without a current user the action is ignored (nothing is emitted).
 * Otherwise the reaction is removed from the DB as a tracked background
 * operation, a `reaction-removed` event is broadcast, and
 * `removeReactionSuccess` is emitted.
 */
removeReaction$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.removeReaction),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([request, currentUser]) => {
      if (!currentUser) {
        return EMPTY;
      }

      const { messageId, emoji } = request;
      // Named after the `oderId` field that the broadcast event and action expect.
      const oderId = currentUser.id;

      this.trackBackgroundOperation(
        this.db.removeReaction(messageId, oderId, emoji),
        'Failed to persist reaction removal',
        {
          emoji,
          messageId,
          userId: oderId
        }
      );

      this.webrtc.broadcastMessage({ type: 'reaction-removed', messageId, oderId, emoji });

      const removed = MessagesActions.removeReactionSuccess({ messageId, oderId, emoji });

      return of(removed);
    })
  )
);
|
|
|
|
/**
 * Central dispatcher for all incoming P2P messages.
 *
 * Every event from `webrtc.onMessageReceived` is paired with the latest
 * current user and room, wrapped in an `IncomingMessageContext`, and passed
 * to `dispatchIncomingMessage` (see `messages-incoming.handlers.ts`).
 *
 * If the handler's observable errors, the failure is reported through
 * `reportDebuggingError` with whatever identifying fields can be read from
 * the event, and nothing is emitted for that event. The stream of incoming
 * messages itself keeps running.
 */
incomingMessages$ = createEffect(() =>
  this.webrtc.onMessageReceived.pipe(
    withLatestFrom(
      this.store.select(selectCurrentUser),
      this.store.select(selectCurrentRoom)
    ),
    mergeMap(([event, currentUser, currentRoom]) => {
      const ctx: IncomingMessageContext = {
        db: this.db,
        webrtc: this.webrtc,
        attachments: this.attachments,
        debugging: this.debugging,
        currentUser: currentUser ?? null,
        currentRoom
      };

      return dispatchIncomingMessage(event, ctx).pipe(
        catchError((error) => {
          // Treat the event as an opaque record; its shape is not trusted here.
          const raw = event as unknown as Record<string, unknown>;
          const candidate = raw['message'];
          const nested = candidate && typeof candidate === 'object' && !Array.isArray(candidate)
            ? candidate as Record<string, unknown>
            : null;

          // Yields the value only when it is a string, otherwise null.
          const textOrNull = (value: unknown): string | null =>
            typeof value === 'string' ? value : null;

          reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming peer message', {
            eventType: textOrNull(raw['type']) ?? 'unknown',
            fromPeerId: textOrNull(raw['fromPeerId']),
            // Prefer top-level ids; fall back to the nested message object.
            messageId: textOrNull(raw['messageId']) ?? textOrNull(nested?.['id']),
            roomId: textOrNull(raw['roomId']) ?? textOrNull(nested?.['roomId'])
          }, error);

          return EMPTY;
        })
      );
    })
  )
);
|
|
|
|
/**
 * Forwards a fire-and-forget task to `trackDebuggingTaskFailure`, supplying
 * this class's debugging service and the 'messages' label.
 *
 * @param task The operation's result (typically a promise) to be tracked.
 * @param message Description passed through for use if the task fails.
 * @param payload Context fields passed through alongside the message.
 */
private trackBackgroundOperation(
  task: Promise<unknown> | unknown,
  message: string,
  payload: Record<string, unknown>
): void {
  trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload);
}
|
|
}
|