Files
Toju/toju-app/src/app/store/messages/messages.effects.ts
Myx b2a2d9d770 fix: Bug - Users appear as both online and offline
Align chat message sender ids with per-server presence identities so profile cards opened from message authors resolve the same live user state as the members panel.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-13 20:55:13 +02:00

761 lines
26 KiB
TypeScript

/**
* Core message CRUD effects (load, send, edit, delete, react)
* and the central incoming-message dispatcher.
*
* Sync-lifecycle effects (polling, peer-connect handshakes) live in
* `messages-sync.effects.ts` to keep this file focused.
*
* The giant `incomingMessages$` switch-case has been replaced by a
* handler registry in `messages-incoming.handlers.ts`.
*/
import { Injectable, inject } from '@angular/core';
import {
Actions,
createEffect,
ofType
} from '@ngrx/effects';
import { Store } from '@ngrx/store';
import {
of,
from,
EMPTY
} from 'rxjs';
import {
mergeMap,
catchError,
withLatestFrom,
switchMap,
filter
} from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import { MessagesActions } from './messages.actions';
import { selectCurrentUser } from '../users/users.selectors';
import { selectCurrentRoom, selectSavedRooms } from '../rooms/rooms.selectors';
import { RoomsActions } from '../rooms/rooms.actions';
import { selectMessagesEntities } from './messages.selectors';
import { RealtimeSessionFacade } from '../../core/realtime';
import { DatabaseService } from '../../infrastructure/persistence';
import { reportDebuggingError, trackDebuggingTaskFailure } from '../../core/helpers/debugging-helpers';
import { DebuggingService } from '../../core/services';
import { AttachmentFacade } from '../../domains/attachment';
import { CustomEmojiService } from '../../domains/custom-emoji';
import { hasDedicatedChatEmbed } from '../../domains/chat/domain/rules/link-embed.rules';
import { LinkMetadataService } from '../../domains/chat/application/services/link-metadata.service';
import { TimeSyncService } from '../../core/services/time-sync.service';
import { PlatformService } from '../../core/platform';
import { AppI18nService } from '../../core/i18n';
import {
Message,
Reaction,
Room
} from '../../shared-kernel';
import { hydrateMessages } from './messages.helpers';
import { canEditMessage } from '../../domains/chat/domain/rules/message.rules';
import { resolveRoomMessageSenderId } from '../../domains/chat/domain/rules/message-sender-identity.rules';
import { resolveRoomPermission } from '../../domains/access-control';
import { SignalServerAuthService } from '../../domains/authentication/application/services/signal-server-auth.service';
import { dispatchIncomingMessage, IncomingMessageContext } from './messages-incoming.handlers';
import { MessageRevisionService } from '../../domains/chat/application/services/message-revision.service';
import { materializeMessageFromRevision } from '../../domains/chat/domain/rules/message-revision.builder.rules';
import { setStoredCurrentUserId } from '../../core/storage/current-user-storage';
/** Maximum number of messages requested per database query when a room is first loaded or prefetched. */
const INITIAL_ROOM_MESSAGE_LIMIT = 30;
/** Cap on simultaneous browser-cache prefetches for apps with many saved rooms. */
const PREFETCH_CONCURRENCY = 3;
/**
 * NgRx effects for the messages slice.
 *
 * Covers loading and prefetching messages from the local database, sending,
 * editing and deleting messages, reactions, link metadata, and dispatching
 * messages that arrive from peers or over the signaling channel.
 */
@Injectable()
export class MessagesEffects {
private readonly actions$ = inject(Actions);
private readonly store = inject(Store);
private readonly db = inject(DatabaseService);
private readonly debugging = inject(DebuggingService);
private readonly attachments = inject(AttachmentFacade);
private readonly customEmoji = inject(CustomEmojiService);
private readonly webrtc = inject(RealtimeSessionFacade);
private readonly timeSync = inject(TimeSyncService);
private readonly linkMetadata = inject(LinkMetadataService);
private readonly platform = inject(PlatformService);
private readonly i18n = inject(AppI18nService);
private readonly messageRevisions = inject(MessageRevisionService);
private readonly signalServerAuth = inject(SignalServerAuthService);
/**
 * Loads messages for a room from the local database, hydrating reactions.
 *
 * The room is resolved from the current room when its id matches, otherwise
 * from the saved rooms. A newer `loadMessages` action supersedes a load that
 * is still in flight (`switchMap`). Emits `loadMessagesSuccess` with the
 * hydrated messages, or `loadMessagesFailure` with the error message.
 */
loadMessages$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.loadMessages),
    withLatestFrom(
      this.store.select(selectCurrentRoom),
      this.store.select(selectSavedRooms)
    ),
    switchMap(([action, activeRoom, knownRooms]) => {
      const { roomId } = action;
      let room: Room | null;

      if (activeRoom?.id === roomId) {
        room = activeRoom;
      } else {
        room = knownRooms.find((candidate) => candidate.id === roomId) ?? null;
      }

      const initialPage$ = from(this.loadInitialMessages(roomId, room));

      return initialPage$.pipe(
        mergeMap(async (rows) => {
          const hydrated = await hydrateMessages(rows, this.db);

          // Record which room each message belongs to for the attachment facade.
          hydrated.forEach((entry) => this.attachments.rememberMessageRoom(entry.id, entry.roomId));

          return MessagesActions.loadMessagesSuccess({ messages: hydrated });
        }),
        catchError((error) =>
          of(MessagesActions.loadMessagesFailure({ error: error.message }))
        )
      );
    })
  )
);
/**
 * Background-prefetch initial messages for every saved room after the
 * rooms list loads in browser. Electron avoids this path because startup
 * IPC prefetch competes with foreground room switches. Results are merged
 * into the messages slice via `upsertMany`, leaving the active-room loading
 * flag untouched.
 *
 * This effect only fans out one `prefetchRoomMessages` action per room, in
 * list order; the database work is throttled by `prefetchRoomMessages$`.
 */
prefetchSavedRoomsOnLoad$ = createEffect(() =>
  this.actions$.pipe(
    ofType(RoomsActions.loadRoomsSuccess),
    filter(() => this.platform.isBrowser),
    // Returning an array lets mergeMap emit each action synchronously, in order.
    mergeMap(({ rooms }) =>
      rooms.map((room) => MessagesActions.prefetchRoomMessages({ roomId: room.id }))
    )
  )
);
/**
 * Handles a single `prefetchRoomMessages` request by loading that room's
 * initial messages. At most `PREFETCH_CONCURRENCY` prefetches run at once;
 * further requests wait in mergeMap's queue.
 */
prefetchRoomMessages$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.prefetchRoomMessages),
    withLatestFrom(this.store.select(selectSavedRooms)),
    mergeMap(([action, rooms]) => {
      const matchingRoom = rooms.find((candidate) => candidate.id === action.roomId) ?? null;

      return from(this.fetchRoomMessagesForPrefetch(action.roomId, matchingRoom));
    }, PREFETCH_CONCURRENCY)
  )
);
/**
 * Loads and hydrates the initial messages of one room for prefetching.
 *
 * Never rejects: a failure is reported through the debugging service and
 * turned into a `prefetchRoomMessagesSuccess` action with no messages.
 *
 * @param roomId Id of the room to prefetch.
 * @param targetRoom Room record for `roomId`, or `null` when none is known.
 * @returns A `prefetchRoomMessagesSuccess` action.
 */
private async fetchRoomMessagesForPrefetch(roomId: string, targetRoom: Room | null) {
  try {
    const rows = await this.loadInitialMessages(roomId, targetRoom);
    const hydrated = await hydrateMessages(rows, this.db);

    hydrated.forEach((entry) => this.attachments.rememberMessageRoom(entry.id, entry.roomId));

    return MessagesActions.prefetchRoomMessagesSuccess({ messages: hydrated });
  } catch (failure) {
    reportDebuggingError(
      this.debugging,
      'MessagesEffects.prefetchRoomMessages',
      'Failed to prefetch room messages',
      { roomId },
      failure
    );

    return MessagesActions.prefetchRoomMessagesSuccess({ messages: [] });
  }
}
/**
 * Paginates older messages from the local DB for scroll-up history loading.
 *
 * Emits `loadOlderMessagesSuccess` keyed by `roomId:channelId`; `reachedEnd`
 * is set when fewer than `limit` messages came back. Failures are emitted as
 * `loadOlderMessagesFailure`.
 */
loadOlderMessages$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.loadOlderMessages),
    mergeMap((request) => {
      const { roomId, channelId, beforeTimestamp, limit } = request;
      const olderPage$ = from(this.db.getMessages(roomId, limit, 0, channelId, beforeTimestamp));

      return olderPage$.pipe(
        mergeMap(async (rows) => {
          const hydrated = await hydrateMessages(rows, this.db);

          hydrated.forEach((entry) => this.attachments.rememberMessageRoom(entry.id, entry.roomId));

          const reachedEnd = hydrated.length < limit;

          return MessagesActions.loadOlderMessagesSuccess({
            conversationKey: `${roomId}:${channelId}`,
            messages: hydrated,
            reachedEnd
          });
        }),
        catchError((error) =>
          of(MessagesActions.loadOlderMessagesFailure({ error: error.message }))
        )
      );
    })
  )
);
/**
 * Fetches the first page of messages for a room.
 *
 * With zero or one text channel a single query is issued (scoped to that
 * channel when there is one). With several text channels each is queried
 * separately, the results are de-duplicated by message id and returned in
 * ascending timestamp order.
 *
 * @param roomId Id of the room to load.
 * @param targetRoom Room record; its channels are ignored unless its id equals `roomId`.
 * @returns The messages to show initially.
 */
private async loadInitialMessages(roomId: string, targetRoom: Room | null): Promise<Message[]> {
  const channels = targetRoom?.id === roomId
    ? (targetRoom.channels ?? []).filter(({ type }) => type === 'text')
    : [];

  if (channels.length <= 1) {
    const [onlyChannel] = channels;

    return this.db.getMessages(roomId, INITIAL_ROOM_MESSAGE_LIMIT, 0, onlyChannel?.id);
  }

  const perChannel = await Promise.all(
    channels.map(({ id }) => this.db.getMessages(roomId, INITIAL_ROOM_MESSAGE_LIMIT, 0, id))
  );
  const unique = new Map<string, Message>();

  perChannel.forEach((batch) => {
    batch.forEach((entry) => unique.set(entry.id, entry));
  });

  return Array.from(unique.values()).sort((earlier, later) => earlier.timestamp - later.timestamp);
}
/**
 * Constructs a new message, persists it locally, and broadcasts to all peers.
 *
 * Emits `sendMessageSuccess` with the materialized message, or
 * `sendMessageFailure` when there is no current user/room or any step of
 * the send rejects.
 *
 * Errors are caught per message on the inner observable. Catching on the
 * outer action stream would complete the effect after the first failure and
 * silently ignore every later `sendMessage` action.
 */
sendMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.sendMessage),
    withLatestFrom(
      this.store.select(selectCurrentUser),
      this.store.select(selectCurrentRoom)
    ),
    mergeMap(([
      { id, content, replyToId, channelId },
      currentUser,
      currentRoom
    ]) => {
      if (!currentUser || !currentRoom) {
        return of(MessagesActions.sendMessageFailure({ error: this.i18n.instant('chat.effects.notConnectedToRoom') }));
      }

      return from((async () => {
        // Built inside the async body so a throw while resolving the sender
        // becomes a rejection that the catchError below turns into a failure action.
        const senderId = resolveRoomMessageSenderId(
          currentUser,
          currentRoom.sourceUrl,
          (serverUrl, fallbackUserId) => this.signalServerAuth.resolveActorUserIdForServer(serverUrl, fallbackUserId)
        );
        const draftMessage: Message = {
          id: id ?? uuidv4(),
          roomId: currentRoom.id,
          channelId: channelId || 'general',
          senderId,
          senderName: currentUser.displayName || currentUser.username,
          content,
          timestamp: this.timeSync.now(),
          reactions: [],
          isDeleted: false,
          replyToId,
          revision: 0
        };
        const revision = await this.messageRevisions.createSignedRevision({
          message: draftMessage,
          type: 'create',
          actorId: currentUser.id,
          editedAt: draftMessage.timestamp
        });
        const message = materializeMessageFromRevision(null, revision);

        setStoredCurrentUserId(currentUser.id);
        this.attachments.rememberMessageRoom(message.id, message.roomId);

        // Persist before broadcasting so peers never see a message that failed to save locally.
        await this.db.saveMessage(message);
        await this.messageRevisions.persistRevision(revision);

        this.customEmoji.pushEmojisInContent(content);
        this.webrtc.broadcastMessage({ type: 'chat-message', message });
        this.messageRevisions.broadcastRevision(revision);

        return MessagesActions.sendMessageSuccess({ message });
      })()).pipe(
        catchError((error: unknown) =>
          of(MessagesActions.sendMessageFailure({
            error: error instanceof Error ? error.message : String(error)
          }))
        )
      );
    })
  )
);
/**
 * Edits an existing message (author-only), updates DB, and broadcasts the change.
 *
 * Emits `editMessageSuccess`, or `editMessageFailure` when nobody is logged
 * in, the message is missing, the edit is not permitted, or a step rejects.
 *
 * Uses `mergeMap` rather than `switchMap`: an edit is a write, and a second
 * edit action must not unsubscribe one that is still in flight, which would
 * drop the edit or its `editMessageSuccess`.
 */
editMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.editMessage),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([{ messageId, content }, currentUser]) => {
      if (!currentUser) {
        return of(MessagesActions.editMessageFailure({ error: this.i18n.instant('chat.effects.notLoggedIn') }));
      }

      return from(this.db.getMessageById(messageId)).pipe(
        mergeMap((existing) => {
          if (!existing) {
            return of(MessagesActions.editMessageFailure({ error: this.i18n.instant('chat.effects.messageNotFound') }));
          }

          // NOTE(review): sendMessage$ stores a senderId resolved by
          // resolveRoomMessageSenderId, while this check passes currentUser.id.
          // Confirm canEditMessage accepts both identities.
          if (!canEditMessage(existing, currentUser.id)) {
            return of(MessagesActions.editMessageFailure({ error: this.i18n.instant('chat.effects.cannotEditOthers') }));
          }

          const editedAt = this.timeSync.now();

          return from((async () => {
            const revision = await this.messageRevisions.createSignedRevision({
              message: existing,
              type: 'author-edit',
              actorId: currentUser.id,
              content,
              editedAt
            });
            const updatedMessage = materializeMessageFromRevision(existing, revision);

            // Persistence is not awaited; failures are reported through trackBackgroundOperation.
            this.trackBackgroundOperation(
              this.db.saveMessage(updatedMessage),
              'Failed to persist edited chat message',
              {
                contentLength: content.length,
                editedAt,
                messageId
              }
            );
            this.trackBackgroundOperation(
              this.messageRevisions.persistRevision(revision),
              'Failed to persist edited message revision',
              { messageId, revision: revision.revision }
            );

            this.customEmoji.pushEmojisInContent(content);
            this.webrtc.broadcastMessage({ type: 'message-edited', messageId, content, editedAt });
            this.messageRevisions.broadcastRevision(revision);

            return MessagesActions.editMessageSuccess({ messageId, content, editedAt });
          })());
        }),
        catchError((error: unknown) =>
          of(MessagesActions.editMessageFailure({
            error: error instanceof Error ? error.message : String(error)
          }))
        )
      );
    })
  )
);
/**
 * Soft-deletes a message (author-only), marks it deleted in DB, and broadcasts.
 *
 * Emits `deleteMessageSuccess`, or `deleteMessageFailure` when nobody is
 * logged in, the message is missing, the deletion is not permitted, or a
 * step rejects.
 *
 * Uses `mergeMap` rather than `switchMap`: deleting several messages in
 * quick succession must not unsubscribe an earlier deletion still in
 * flight, which would drop it or its `deleteMessageSuccess`.
 */
deleteMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.deleteMessage),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([{ messageId }, currentUser]) => {
      if (!currentUser) {
        return of(MessagesActions.deleteMessageFailure({ error: this.i18n.instant('chat.effects.notLoggedIn') }));
      }

      return from(this.db.getMessageById(messageId)).pipe(
        mergeMap((existing) => {
          if (!existing) {
            return of(MessagesActions.deleteMessageFailure({ error: this.i18n.instant('chat.effects.messageNotFound') }));
          }

          // NOTE(review): sendMessage$ stores a senderId resolved by
          // resolveRoomMessageSenderId, while this check passes currentUser.id.
          // Confirm canEditMessage accepts both identities.
          if (!canEditMessage(existing, currentUser.id)) {
            return of(MessagesActions.deleteMessageFailure({ error: this.i18n.instant('chat.effects.cannotDeleteOthers') }));
          }

          const deletedAt = this.timeSync.now();

          return from((async () => {
            const revision = await this.messageRevisions.createSignedRevision({
              message: existing,
              type: 'author-delete',
              actorId: currentUser.id,
              editedAt: deletedAt,
              isDeleted: true
            });
            const deletedMessage = materializeMessageFromRevision(existing, revision);

            // Persistence and attachment cleanup are not awaited; failures are
            // reported through trackBackgroundOperation.
            this.trackBackgroundOperation(
              this.db.saveMessage(deletedMessage),
              'Failed to persist message deletion',
              { deletedAt, messageId }
            );
            this.trackBackgroundOperation(
              this.messageRevisions.persistRevision(revision),
              'Failed to persist deleted message revision',
              { messageId, revision: revision.revision }
            );
            this.trackBackgroundOperation(
              this.attachments.deleteForMessage(messageId),
              'Failed to delete message attachments',
              { messageId }
            );

            this.webrtc.broadcastMessage({ type: 'message-deleted', messageId, deletedAt });
            this.messageRevisions.broadcastRevision(revision);

            return MessagesActions.deleteMessageSuccess({ messageId });
          })());
        }),
        catchError((error: unknown) =>
          of(MessagesActions.deleteMessageFailure({
            error: error instanceof Error ? error.message : String(error)
          }))
        )
      );
    })
  )
);
/**
 * Soft-deletes any message (admin+ only).
 *
 * Requires the `deleteMessages` permission in the current room. Emits
 * `deleteMessageSuccess`, or `deleteMessageFailure` when nobody is logged
 * in, permission is missing, the message is not found, or a step rejects.
 *
 * Errors are caught per request on the inner observable, matching
 * `deleteMessage$`. Catching on the outer action stream would complete the
 * effect after the first failure and ignore later moderation requests.
 */
adminDeleteMessage$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.adminDeleteMessage),
    withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom)),
    mergeMap(([
      { messageId },
      currentUser,
      currentRoom
    ]) => {
      if (!currentUser) {
        return of(MessagesActions.deleteMessageFailure({ error: this.i18n.instant('chat.effects.notLoggedIn') }));
      }

      const hasPermission = !!currentRoom && resolveRoomPermission(currentRoom, currentUser, 'deleteMessages');

      if (!hasPermission) {
        return of(MessagesActions.deleteMessageFailure({ error: this.i18n.instant('chat.effects.permissionDenied') }));
      }

      return from(this.db.getMessageById(messageId)).pipe(
        mergeMap((existing) => {
          if (!existing) {
            return of(MessagesActions.deleteMessageFailure({ error: this.i18n.instant('chat.effects.messageNotFound') }));
          }

          const deletedAt = this.timeSync.now();

          return from((async () => {
            const revision = await this.messageRevisions.createSignedRevision({
              message: existing,
              type: 'moderate-delete',
              actorId: currentUser.id,
              editedAt: deletedAt,
              isDeleted: true
            });
            const deletedMessage = materializeMessageFromRevision(existing, revision);

            // Persistence and attachment cleanup are not awaited; failures are
            // reported through trackBackgroundOperation.
            this.trackBackgroundOperation(
              this.db.saveMessage(deletedMessage),
              'Failed to persist admin message deletion',
              { deletedBy: currentUser.id, deletedAt, messageId }
            );
            this.trackBackgroundOperation(
              this.messageRevisions.persistRevision(revision),
              'Failed to persist moderated delete revision',
              { messageId, revision: revision.revision }
            );
            this.trackBackgroundOperation(
              this.attachments.deleteForMessage(messageId),
              'Failed to delete admin-deleted message attachments',
              { deletedBy: currentUser.id, messageId }
            );

            this.webrtc.broadcastMessage({
              type: 'message-deleted',
              messageId,
              deletedBy: currentUser.id,
              deletedAt
            });
            this.messageRevisions.broadcastRevision(revision);

            return MessagesActions.deleteMessageSuccess({ messageId });
          })());
        }),
        catchError((error: unknown) =>
          of(MessagesActions.deleteMessageFailure({
            error: error instanceof Error ? error.message : String(error)
          }))
        )
      );
    })
  )
);
/**
 * Adds an emoji reaction to a message, persists it, and broadcasts to peers.
 *
 * Does nothing when there is no current user. The database write is not
 * awaited; `addReactionSuccess` is emitted immediately.
 */
addReaction$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.addReaction),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([action, me]) => {
      if (!me) {
        return EMPTY;
      }

      const { messageId, emoji } = action;
      const reactorId = me.id;
      const reaction: Reaction = {
        id: uuidv4(),
        messageId,
        oderId: reactorId,
        userId: reactorId,
        emoji,
        timestamp: this.timeSync.now()
      };
      const debugPayload = {
        emoji,
        messageId,
        reactionId: reaction.id,
        userId: reactorId
      };

      this.trackBackgroundOperation(this.db.saveReaction(reaction), 'Failed to persist reaction', debugPayload);
      this.customEmoji.pushEmojisInContent(emoji);
      this.webrtc.broadcastMessage({ type: 'reaction-added', messageId, reaction });

      return of(MessagesActions.addReactionSuccess({ reaction }));
    })
  )
);
/**
 * Removes the current user's reaction from a message, deletes from DB, and broadcasts.
 *
 * Does nothing when there is no current user. The database delete is not
 * awaited; `removeReactionSuccess` is emitted immediately.
 */
removeReaction$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.removeReaction),
    withLatestFrom(this.store.select(selectCurrentUser)),
    mergeMap(([action, me]) => {
      if (!me) {
        return EMPTY;
      }

      const { messageId, emoji } = action;
      const reactorId = me.id;

      this.trackBackgroundOperation(
        this.db.removeReaction(messageId, reactorId, emoji),
        'Failed to persist reaction removal',
        { emoji, messageId, userId: reactorId }
      );
      this.webrtc.broadcastMessage({
        type: 'reaction-removed',
        messageId,
        oderId: reactorId,
        emoji
      });

      const removed = MessagesActions.removeReactionSuccess({
        messageId,
        oderId: reactorId,
        emoji
      });

      return of(removed);
    })
  )
);
/**
 * Fetches link metadata for newly sent or received messages that
 * contain URLs but don't already have metadata attached.
 *
 * Deleted messages and URLs with a dedicated chat embed are skipped. Only
 * metadata entries not flagged as failed are persisted and emitted via
 * `updateLinkMetadata`; fetch errors are swallowed.
 */
fetchLinkMetadata$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.sendMessageSuccess, MessagesActions.receiveMessage),
    mergeMap(({ message }) => {
      const alreadyResolved = message.isDeleted || message.linkMetadata?.length;

      if (alreadyResolved) {
        return EMPTY;
      }

      const candidateUrls = this.linkMetadata
        .extractUrls(message.content)
        .filter((candidate) => !hasDedicatedChatEmbed(candidate));

      if (candidateUrls.length === 0) {
        return EMPTY;
      }

      const messageId = message.id;

      return from(this.linkMetadata.fetchAllMetadata(candidateUrls)).pipe(
        mergeMap((results) => {
          const usable = results.filter((entry) => !entry.failed);

          if (usable.length === 0) {
            return EMPTY;
          }

          this.trackBackgroundOperation(
            this.db.updateMessage(messageId, { linkMetadata: usable }),
            'Failed to persist link metadata',
            { messageId }
          );

          return of(MessagesActions.updateLinkMetadata({ messageId, linkMetadata: usable }));
        }),
        catchError(() => EMPTY)
      );
    })
  )
);
/**
 * Removes a single link embed from a message, persists the change,
 * and updates the store.
 *
 * Does nothing when the message is not in the store or has no link
 * metadata. When the last embed is removed the database field is cleared
 * (`undefined`) while the store receives an empty list.
 */
removeLinkEmbed$ = createEffect(() =>
  this.actions$.pipe(
    ofType(MessagesActions.removeLinkEmbed),
    withLatestFrom(this.store.select(selectMessagesEntities)),
    mergeMap(([action, byId]) => {
      const { messageId, url } = action;
      const embeds = byId[messageId]?.linkMetadata;

      if (!embeds) {
        return EMPTY;
      }

      const kept = embeds.filter((embed) => embed.url !== url);
      const persisted = kept.length ? kept : undefined;

      this.trackBackgroundOperation(
        this.db.updateMessage(messageId, { linkMetadata: persisted }),
        'Failed to persist link embed removal',
        { messageId }
      );

      return of(MessagesActions.updateLinkMetadata({ messageId, linkMetadata: kept }));
    })
  )
);
/**
 * Central dispatcher for all incoming P2P messages.
 * Delegates to handler functions in `messages-incoming.handlers.ts`.
 *
 * A handler failure is reported to the debugging service, together with
 * whatever identifying fields can be read off the event, and then
 * swallowed so the stream keeps processing later events.
 */
incomingMessages$ = createEffect(() =>
  this.webrtc.onMessageReceived.pipe(
    withLatestFrom(
      this.store.select(selectCurrentUser),
      this.store.select(selectCurrentRoom),
      this.store.select(selectSavedRooms)
    ),
    mergeMap(([event, currentUser, currentRoom, savedRooms]) => {
      const ctx: IncomingMessageContext = {
        db: this.db,
        webrtc: this.webrtc,
        attachments: this.attachments,
        debugging: this.debugging,
        messageRevisions: this.messageRevisions,
        currentUser: currentUser ?? null,
        currentRoom,
        savedRooms,
        getClientInstanceId: () => this.webrtc.getClientInstanceId()
      };
      const handled$ = dispatchIncomingMessage(event as Parameters<typeof dispatchIncomingMessage>[0], ctx);

      return handled$.pipe(
        catchError((error) => {
          // The event shape is not trusted here: read each field defensively.
          const readString = (value: unknown): string | null => (typeof value === 'string' ? value : null);
          const raw = event as unknown as Record<string, unknown>;
          const nested = raw['message'];
          const nestedRecord = (nested && typeof nested === 'object' && !Array.isArray(nested))
            ? nested as Record<string, unknown>
            : null;

          reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming peer message', {
            eventType: readString(raw['type']) ?? 'unknown',
            fromPeerId: readString(raw['fromPeerId']),
            // Prefer the top-level id, falling back to the embedded message.
            messageId: readString(raw['messageId']) ?? readString(nestedRecord?.['id']),
            roomId: readString(raw['roomId']) ?? readString(nestedRecord?.['roomId'])
          }, error);

          return EMPTY;
        })
      );
    })
  )
);
/**
 * Handles chat messages relayed over the signaling channel.
 *
 * Only `chat_message` events are processed. Each one is re-shaped into a
 * `chat-message` event and passed to the same `dispatchIncomingMessage`
 * used for P2P messages. The sender is `fromUserId`, falling back to
 * `senderId`; the same value is used for dispatch and for error reports.
 * Handler failures are reported and swallowed.
 */
incomingSignalingMessages$ = createEffect(() =>
  this.webrtc.onSignalingMessage.pipe(
    withLatestFrom(
      this.store.select(selectCurrentUser),
      this.store.select(selectCurrentRoom),
      this.store.select(selectSavedRooms)
    ),
    mergeMap(([
      event,
      currentUser,
      currentRoom,
      savedRooms
    ]) => {
      if (event.type !== 'chat_message') {
        return EMPTY;
      }

      const chatRelayEvent = event as {
        message?: Message & { clientInstanceId?: string };
        clientInstanceId?: string;
        fromUserId?: string;
        senderId?: string;
        serverId?: string;
      };
      // Copy the envelope's client instance id onto the message when the message lacks one.
      const signalingMessage = chatRelayEvent.message && typeof chatRelayEvent.message === 'object'
        ? {
          ...chatRelayEvent.message,
          clientInstanceId: chatRelayEvent.message.clientInstanceId ?? chatRelayEvent.clientInstanceId
        }
        : chatRelayEvent.message;
      // Resolved once so the dispatched event and the error report agree on the sender.
      const fromPeerId = event.fromUserId ?? (event as { senderId?: string }).senderId;
      const ctx: IncomingMessageContext = {
        db: this.db,
        webrtc: this.webrtc,
        attachments: this.attachments,
        debugging: this.debugging,
        messageRevisions: this.messageRevisions,
        currentUser: currentUser ?? null,
        currentRoom,
        savedRooms,
        getClientInstanceId: () => this.webrtc.getClientInstanceId()
      };

      return dispatchIncomingMessage({
        ...event,
        type: 'chat-message',
        fromPeerId,
        message: signalingMessage
      }, ctx).pipe(
        catchError((error) => {
          reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming signaling chat message', {
            eventType: event.type,
            fromPeerId: fromPeerId ?? null,
            roomId: event.serverId ?? null
          }, error);

          return EMPTY;
        })
      );
    })
  )
);
/**
 * Hands a fire-and-forget task to `trackDebuggingTaskFailure` under the
 * `'messages'` source, so the caller does not have to await it.
 *
 * @param task Promise (or plain value) produced by the background operation.
 * @param message Description passed along for failure reporting.
 * @param payload Extra context passed along for failure reporting.
 */
private trackBackgroundOperation(task: Promise<unknown> | unknown, message: string, payload: Record<string, unknown>): void {
trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload);
}
}