diff --git a/electron/cqrs/queries/handlers/getMessages.ts b/electron/cqrs/queries/handlers/getMessages.ts index b5a3286..eaafd70 100644 --- a/electron/cqrs/queries/handlers/getMessages.ts +++ b/electron/cqrs/queries/handlers/getMessages.ts @@ -7,19 +7,34 @@ import { getCurrentUserScope } from '../../current-user-scope'; export async function handleGetMessages(query: GetMessagesQuery, dataSource: DataSource) { const repo = dataSource.getRepository(MessageEntity); - const { roomId, limit = 100, offset = 0 } = query.payload; + const { roomId, limit = 100, offset = 0, channelId, beforeTimestamp } = query.payload; const currentUserId = await getCurrentUserScope(dataSource); if (!currentUserId) { return []; } - const rows = await repo.find({ - where: { roomId, ownerUserId: currentUserId }, - order: { timestamp: 'DESC' }, - take: limit, - skip: offset - }); + const rowsQuery = repo.createQueryBuilder('message') + .where('message.roomId = :roomId', { roomId }) + .andWhere('message.ownerUserId = :currentUserId', { currentUserId }) + .orderBy('message.timestamp', 'DESC') + .take(limit) + .skip(offset); + + if (channelId === 'general') { + rowsQuery.andWhere('(message.channelId = :channelId OR message.channelId IS NULL OR message.channelId = :emptyChannelId)', { + channelId, + emptyChannelId: '' + }); + } else if (channelId) { + rowsQuery.andWhere('message.channelId = :channelId', { channelId }); + } + + if (typeof beforeTimestamp === 'number') { + rowsQuery.andWhere('message.timestamp < :beforeTimestamp', { beforeTimestamp }); + } + + const rows = await rowsQuery.getMany(); const chronologicalRows = [...rows].reverse(); const reactionsByMessageId = await loadMessageReactionsMap(dataSource, chronologicalRows.map((row) => row.id)); diff --git a/electron/cqrs/types.ts b/electron/cqrs/types.ts index 3f39bdb..011560e 100644 --- a/electron/cqrs/types.ts +++ b/electron/cqrs/types.ts @@ -230,7 +230,16 @@ export type Command = | SaveMetaCommand | ClearAllDataCommand; -export interface GetMessagesQuery { type: typeof QueryType.GetMessages; payload: { roomId: string; limit?: number; offset?: number } } +export interface GetMessagesQuery { + type: typeof QueryType.GetMessages; + payload: { + roomId: string; + limit?: number; + offset?: number; + channelId?: string; + beforeTimestamp?: number; + }; +} export interface GetMessagesSinceQuery { type: typeof QueryType.GetMessagesSince; payload: { roomId: string; sinceTimestamp: number } } export interface GetMessageByIdQuery { type: typeof QueryType.GetMessageById; payload: { messageId: string } } export interface GetReactionsForMessageQuery { type: typeof QueryType.GetReactionsForMessage; payload: { messageId: string } } diff --git a/toju-app/src/app/domains/chat/domain/rules/message-sync.rules.ts b/toju-app/src/app/domains/chat/domain/rules/message-sync.rules.ts index e68bdbd..2cbda02 100644 --- a/toju-app/src/app/domains/chat/domain/rules/message-sync.rules.ts +++ b/toju-app/src/app/domains/chat/domain/rules/message-sync.rules.ts @@ -1,5 +1,11 @@ -/** Maximum number of recent messages to include in sync inventories. */ -export const INVENTORY_LIMIT = 1000; +/** Maximum number of messages to include in sync inventories. + * + * The inventory protocol now ships every message in the room (id, ts, rc, ac) + * chunked at `CHUNK_SIZE`, so peers converge on the full history regardless + * of how lopsided their message counts are. The constant remains as a safety + * ceiling for pathological rooms. + */ +export const INVENTORY_LIMIT = 1_000_000; /** Number of messages per chunk for inventory / batch transfers. */ export const CHUNK_SIZE = 200; @@ -14,7 +20,7 @@ export const SYNC_POLL_SLOW_MS = 900_000; export const SYNC_TIMEOUT_MS = 5_000; /** Large limit used for legacy full-sync operations. */ -export const FULL_SYNC_LIMIT = 10_000; +export const FULL_SYNC_LIMIT = 1_000_000; /** Inventory item representing a message's sync state. */ export interface InventoryItem { diff --git a/toju-app/src/app/domains/chat/feature/chat-messages/chat-messages.component.html b/toju-app/src/app/domains/chat/feature/chat-messages/chat-messages.component.html index e1a5ac3..4ac8da7 100644 --- a/toju-app/src/app/domains/chat/feature/chat-messages/chat-messages.component.html +++ b/toju-app/src/app/domains/chat/feature/chat-messages/chat-messages.component.html @@ -11,6 +11,8 @@ [isAdmin]="isAdmin()" [bottomPadding]="composerBottomPadding()" [conversationKey]="conversationKey()" + [loadingOlder]="loadingOlder()" + [conversationExhausted]="conversationExhausted()" (replyRequested)="setReplyTo($event)" (deleteRequested)="handleDeleteRequested($event)" (editSaved)="handleEditSaved($event)" @@ -20,6 +22,7 @@ (imageOpened)="openLightbox($event)" (imageContextMenuRequested)="openImageContextMenu($event)" (embedRemoved)="handleEmbedRemoved($event)" + (loadOlderRequested)="handleLoadOlderRequested($event)" />
`${this.currentRoom()?.id ?? 'no-room'}:${this.activeChannelId() ?? 'general'}`); + readonly conversationExhausted = toSignal( + toObservable(this.conversationKey).pipe( + switchMap((key) => this.store.select(selectConversationExhausted(key))) + ), + { initialValue: false } + ); readonly klipyEnabled = computed(() => this.klipy.isEnabled(this.currentRoom())); readonly composerBottomPadding = signal(140); readonly klipyGifPickerAnchorRight = signal(16); @@ -213,6 +224,22 @@ export class ChatMessagesComponent { ); } + handleLoadOlderRequested(event: { beforeTimestamp: number; limit: number }): void { + const roomId = this.currentRoom()?.id; + + if (!roomId) + return; + + this.store.dispatch( + MessagesActions.loadOlderMessages({ + roomId, + channelId: this.activeChannelId() ?? 'general', + beforeTimestamp: event.beforeTimestamp, + limit: event.limit + }) + ); + } + toggleKlipyGifPicker(): void { const nextState = !this.showKlipyGifPicker(); diff --git a/toju-app/src/app/domains/chat/feature/chat-messages/components/message-item/chat-message-item.component.ts b/toju-app/src/app/domains/chat/feature/chat-messages/components/message-item/chat-message-item.component.ts index 2efdcfc..21f0249 100644 --- a/toju-app/src/app/domains/chat/feature/chat-messages/components/message-item/chat-message-item.component.ts +++ b/toju-app/src/app/domains/chat/feature/chat-messages/components/message-item/chat-message-item.component.ts @@ -2,6 +2,7 @@ import { CommonModule } from '@angular/common'; import { FormsModule } from '@angular/forms'; import { + ChangeDetectionStrategy, Component, computed, ElementRef, @@ -153,6 +154,7 @@ interface MissingPluginEmbedFallback { ], templateUrl: './chat-message-item.component.html', styleUrl: './chat-message-item.component.scss', + changeDetection: ChangeDetectionStrategy.OnPush, host: { style: 'display: contents;' } diff --git a/toju-app/src/app/domains/chat/feature/chat-messages/components/message-list/chat-message-list.component.ts b/toju-app/src/app/domains/chat/feature/chat-messages/components/message-list/chat-message-list.component.ts index 4211d64..ea516f2 100644 --- a/toju-app/src/app/domains/chat/feature/chat-messages/components/message-list/chat-message-list.component.ts +++ b/toju-app/src/app/domains/chat/feature/chat-messages/components/message-list/chat-message-list.component.ts @@ -2,6 +2,7 @@ import { CommonModule } from '@angular/common'; import { AfterViewChecked, + ChangeDetectionStrategy, Component, ElementRef, OnDestroy, @@ -48,6 +49,7 @@ declare global { ThemeNodeDirective ], templateUrl: './chat-message-list.component.html', + changeDetection: ChangeDetectionStrategy.OnPush, host: { style: 'display: contents;' } @@ -82,6 +84,16 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { readonly imageOpened = output(); readonly imageContextMenuRequested = output(); readonly embedRemoved = output(); + /** + * Emitted when the user scrolls up past the in-store window and the + * component needs the parent to fetch an older page from the DB. + */ + readonly loadOlderRequested = output<{ beforeTimestamp: number; limit: number }>(); + + /** True while a DB-backed older-page fetch dispatched by the parent is in flight. */ + readonly loadingOlder = input(false); + /** True once the parent has paginated all the way back to the start of DB history. */ + readonly conversationExhausted = input(false); private readonly PAGE_SIZE = 50; @@ -141,6 +153,21 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { return lookup; }); + /** + * O(1) index of messages by id, built once per `allMessages()` change. + * Used by `findRepliedMessage` so each rendered row doing a reply lookup + * costs a Map.get instead of an Array.find over the full message list. + */ + private readonly messagesById = computed>(() => { + const index = new Map(); + + for (const message of this.allMessages()) { + index.set(message.id, message); + } + + return index; + }); + private bottomScrollObserver: MutationObserver | null = null; private bottomScrollTimer: ReturnType | null = null; private boundOnImageLoad: (() => void) | null = null; @@ -150,12 +177,41 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { private lastMessageCount = 0; private initialScrollPending = true; private prismHighlightScheduled = false; + /** + * Set when an older-page DB fetch is in flight. While true, the + * `onMessagesChanged` effect treats incoming message-count growth as a + * prepend (older history arriving) and preserves the user's scroll + * position instead of running sticky-bottom / new-messages-indicator + * logic. + */ + private pendingOlderFetchScrollHeight: number | null = null; private readonly onConversationChanged = effect(() => { void this.conversationKey(); this.resetScrollingState(); }); + /** + * Clears the in-flight older-fetch flag when the parent reports the + * load has finished (regardless of how many rows were returned, even + * zero). Without this, `loadingMore` would stick on if the DB had no + * rows older than the cursor. + */ + private readonly onLoadingOlderChanged = effect(() => { + const inFlight = this.loadingOlder(); + + if (!inFlight && this.pendingOlderFetchScrollHeight !== null) { + // If onMessagesChanged already consumed the pending state because + // rows arrived, this is a no-op; otherwise we clear it now. + queueMicrotask(() => { + if (this.pendingOlderFetchScrollHeight !== null) { + this.pendingOlderFetchScrollHeight = null; + this.loadingMore.set(false); + } + }); + } + }); + private readonly onMessagesChanged = effect(() => { const currentCount = this.channelMessages().length; const element = this.messagesContainer?.nativeElement; @@ -170,6 +226,36 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { return; } + // Handle older-history backfill: messages were prepended, not appended. + // Reveal the new rows by widening the display window, and preserve the + // user's visual scroll position across the height change. We skip the + // sticky-bottom / new-messages-indicator logic entirely for this path. + if (this.pendingOlderFetchScrollHeight !== null && currentCount > this.lastMessageCount) { + const previousScrollHeight = this.pendingOlderFetchScrollHeight; + const previousScrollTop = element.scrollTop; + const newlyLoaded = currentCount - this.lastMessageCount; + + this.pendingOlderFetchScrollHeight = null; + this.displayLimit.update((limit) => limit + newlyLoaded); + + requestAnimationFrame(() => { + requestAnimationFrame(() => { + const container = this.messagesContainer?.nativeElement; + + if (container) { + const newScrollHeight = container.scrollHeight; + + container.scrollTop = previousScrollTop + (newScrollHeight - previousScrollHeight); + } + + this.loadingMore.set(false); + }); + }); + + this.lastMessageCount = currentCount; + return; + } + const distanceFromBottom = element.scrollHeight - element.scrollTop - element.clientHeight; const newMessages = currentCount > this.lastMessageCount; const forceLocalSendScroll = this.shouldForceLocalSendScroll(); @@ -232,7 +318,7 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { if (!messageId) return undefined; - return this.allMessages().find((message) => message.id === messageId); + return this.messagesById().get(messageId); } onScroll(): void { @@ -252,32 +338,68 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { this.stopBottomScrollWatch(); } - if (element.scrollTop < 150 && this.hasMoreMessages() && !this.loadingMore()) { - this.loadMore(); + if (element.scrollTop < 150 && !this.loadingMore()) { + const canFetchOlderFromDb = + !this.hasMoreMessages() + && !this.conversationExhausted() + && !this.loadingOlder() + && this.channelMessages().length > 0; + + if (this.hasMoreMessages() || canFetchOlderFromDb) { + this.loadMore(); + } } } loadMore(): void { - if (this.loadingMore() || !this.hasMoreMessages()) + if (this.loadingMore()) return; - this.loadingMore.set(true); + // Case 1: there are still in-store messages above the rendered window. + // Just widen the display window and preserve scroll position. + if (this.hasMoreMessages()) { + this.loadingMore.set(true); - const element = this.messagesContainer?.nativeElement; - const previousScrollHeight = element?.scrollHeight ?? 0; + const element = this.messagesContainer?.nativeElement; + const previousScrollHeight = element?.scrollHeight ?? 0; - this.displayLimit.update((limit) => limit + this.PAGE_SIZE); + this.displayLimit.update((limit) => limit + this.PAGE_SIZE); - requestAnimationFrame(() => { requestAnimationFrame(() => { - if (element) { - const newScrollHeight = element.scrollHeight; + requestAnimationFrame(() => { + if (element) { + const newScrollHeight = element.scrollHeight; - element.scrollTop += newScrollHeight - previousScrollHeight; - } + element.scrollTop += newScrollHeight - previousScrollHeight; + } - this.loadingMore.set(false); + this.loadingMore.set(false); + }); }); + + return; + } + + // Case 2: in-store window is exhausted. Ask the parent to fetch the + // next older page from the DB. The parent dispatches loadOlderMessages + // and the resulting store update is handled by onMessagesChanged via + // pendingOlderFetchScrollHeight (prepend-aware scroll preservation). + if (this.loadingOlder() || this.conversationExhausted()) + return; + + const all = this.channelMessages(); + + if (all.length === 0) + return; + + const oldest = all[0]; + const element = this.messagesContainer?.nativeElement; + + this.loadingMore.set(true); + this.pendingOlderFetchScrollHeight = element?.scrollHeight ?? 0; + this.loadOlderRequested.emit({ + beforeTimestamp: oldest.timestamp, + limit: this.PAGE_SIZE }); } @@ -359,6 +481,8 @@ export class ChatMessageListComponent implements AfterViewChecked, OnDestroy { this.showNewMessagesBar.set(false); this.lastMessageCount = 0; this.displayLimit.set(this.PAGE_SIZE); + this.pendingOlderFetchScrollHeight = null; + this.loadingMore.set(false); } private startBottomScrollWatch(): void { diff --git a/toju-app/src/app/domains/plugins/application/services/plugin-client-api.service.ts b/toju-app/src/app/domains/plugins/application/services/plugin-client-api.service.ts index 1484000..26f8284 100644 --- a/toju-app/src/app/domains/plugins/application/services/plugin-client-api.service.ts +++ b/toju-app/src/app/domains/plugins/application/services/plugin-client-api.service.ts @@ -17,6 +17,7 @@ import type { User } from '../../../../shared-kernel'; import { MessagesActions } from '../../../../store/messages/messages.actions'; +import { CHUNK_SIZE, chunkArray } from '../../../../store/messages/messages.helpers'; import { selectCurrentRoomMessages } from '../../../../store/messages/messages.selectors'; import { RoomsActions } from '../../../../store/rooms/rooms.actions'; import { @@ -27,6 +28,8 @@ import { } from '../../../../store/rooms/rooms.selectors'; import { UsersActions } from '../../../../store/users/users.actions'; import { selectAllUsers, selectCurrentUser } from '../../../../store/users/users.selectors'; +import { defaultChannels } from '../../../../store/rooms/room-channels.defaults'; +import { isChannelNameTaken, normalizeChannelName } from '../../../../store/rooms/room-channels.rules'; import type { PluginApiAvatarUpdate, PluginApiActionContext, @@ -77,11 +80,11 @@ export class PluginClientApiService { channels: { addAudioChannel: (request) => { requireCapability('channels.manage'); - this.store.dispatch(RoomsActions.addChannel({ channel: createChannel(request, 'voice') })); + this.addPluginManagedChannel(pluginId, createChannel(request, 'voice')); }, addTextChannel: (request) => { requireCapability('channels.manage'); - this.store.dispatch(RoomsActions.addChannel({ channel: createChannel(request, 'text') })); + this.addPluginManagedChannel(pluginId, createChannel(request, 'text')); }, addVideoChannel: (request) => { requireCapability('channels.manage'); @@ -743,9 +746,86 @@ export class PluginClientApiService { } this.store.dispatch(MessagesActions.syncMessages({ messages: normalizedMessages })); + + // Broadcast imported history to peers in CHUNK_SIZE batches so they don't + // depend on the inventory-limited background sync to discover bulk imports. + for (const chunk of chunkArray(normalizedMessages, CHUNK_SIZE)) { + this.voice.broadcastMessage({ + type: 'chat-sync-batch', + roomId, + messages: chunk + } as unknown as ChatEvent); + } + this.logger.info(pluginId, 'Historical messages imported', { count: normalizedMessages.length }); } + private addPluginManagedChannel(pluginId: string, channel: Channel): void { + const room = this.currentRoom(); + const currentUser = this.currentUser(); + + if (!room || !currentUser) { + return; + } + + const isOwner = room.hostId === currentUser.id || room.hostId === currentUser.oderId; + const isServerAdmin = currentUser.role === 'admin' || currentUser.role === 'host'; + const canManageChannels = resolveRoomPermission(room, currentUser, 'manageChannels'); + + if (!isOwner && !isServerAdmin && !canManageChannels) { + this.logger.warn(pluginId, 'Plugin channel creation denied by room permissions', { + channelId: channel.id, + roomId: room.id + }); + + return; + } + + const existingChannels = room.channels ?? defaultChannels(); + const normalizedName = normalizeChannelName(channel.name); + const channelExists = existingChannels.some((entry) => entry.id === channel.id) || + isChannelNameTaken(existingChannels, normalizedName, channel.type); + + if (!normalizedName || channelExists) { + return; + } + + const channels = [ + ...existingChannels, + { ...channel, + name: normalizedName } + ]; + + this.store.dispatch(RoomsActions.updateRoom({ roomId: room.id, + changes: { channels } })); + + void this.db.updateRoom(room.id, { channels }).catch((error: unknown) => { + this.logger.warn(pluginId, 'Failed to persist plugin-created channel', error); + }); + + this.realtime.broadcastMessage({ + type: 'channels-update', + roomId: room.id, + channels + }); + + this.serverDirectory.updateServer(room.id, { + actingRole: isOwner ? 'host' : undefined, + channels, + currentOwnerId: currentUser.id + }, { + sourceId: room.sourceId, + sourceUrl: room.sourceUrl + }).subscribe({ + error: () => {} + }); + + this.logger.info(pluginId, 'Plugin channel created', { + channelId: channel.id, + roomId: room.id + }); + } + private persistPluginMessageUpdate(pluginId: string, messageId: string, updates: Partial): void { void this.db.updateMessage(messageId, updates).catch((error: unknown) => { this.logger.warn(pluginId, 'Failed to persist plugin message update', error); diff --git a/toju-app/src/app/infrastructure/persistence/README.md b/toju-app/src/app/infrastructure/persistence/README.md index 7240b82..d138aba 100644 --- a/toju-app/src/app/infrastructure/persistence/README.md +++ b/toju-app/src/app/infrastructure/persistence/README.md @@ -57,7 +57,7 @@ The persisted `rooms` store is a local cache of room metadata. Channel topology ### Browser (IndexedDB) -All operations run inside IndexedDB transactions in the renderer thread. The browser backend resolves the active database name from the logged-in user, reusing a legacy shared database only when it already belongs to that same account. Queries like `getMessages` pull all messages for a room via the `roomId` index, sort them by timestamp in JS, then apply limit/offset. Deleted messages are normalised on read (content replaced with a sentinel string). +All operations run inside IndexedDB transactions in the renderer thread. The browser backend resolves the active database name from the logged-in user, reusing a legacy shared database only when it already belongs to that same account. Queries like `getMessages` pull all messages for a room via the `roomId` index, optionally filter to a text channel, sort them by timestamp in JS, then apply limit/offset. Deleted messages are normalised on read (content replaced with a sentinel string). ```mermaid sequenceDiagram @@ -66,11 +66,11 @@ sequenceDiagram participant BDB as BrowserDatabaseService participant IDB as IndexedDB - Eff->>DB: getMessages(roomId, 50) - DB->>BDB: getMessages(roomId, 50) + Eff->>DB: getMessages(roomId, 50, 0, channelId?) + DB->>BDB: getMessages(roomId, 50, 0, channelId?) BDB->>IDB: tx.objectStore("messages")
.index("roomId").getAll(roomId) IDB-->>BDB: Message[] - Note over BDB: Sort by timestamp, slice, normalise + Note over BDB: Optional channel filter, sort, slice, normalise BDB-->>DB: Message[] DB-->>Eff: Message[] ``` diff --git a/toju-app/src/app/infrastructure/persistence/browser-database.service.ts b/toju-app/src/app/infrastructure/persistence/browser-database.service.ts index 93b7993..e4031ec 100644 --- a/toju-app/src/app/infrastructure/persistence/browser-database.service.ts +++ b/toju-app/src/app/infrastructure/persistence/browser-database.service.ts @@ -70,12 +70,28 @@ export class BrowserDatabaseService { * @param roomId - Target room. * @param limit - Maximum number of messages to return. * @param offset - Number of newer messages to skip (for pagination). + * @param channelId - Optional channel scope; 'general' includes null/empty. + * @param beforeTimestamp - Optional cursor; only messages strictly older + * than this timestamp are returned. Used for + * scroll-up history pagination. */ - async getMessages(roomId: string, limit = 100, offset = 0): Promise { + async getMessages( + roomId: string, + limit = 100, + offset = 0, + channelId?: string, + beforeTimestamp?: number + ): Promise { const allRoomMessages = await this.getAllFromIndex( STORE_MESSAGES, 'roomId', roomId ); - const sortedMessages = allRoomMessages.sort((first, second) => first.timestamp - second.timestamp); + const scopedMessages = channelId + ? allRoomMessages.filter((message) => (message.channelId || 'general') === channelId) + : allRoomMessages; + const cursorFiltered = beforeTimestamp === undefined + ? scopedMessages + : scopedMessages.filter((message) => message.timestamp < beforeTimestamp); + const sortedMessages = cursorFiltered.sort((first, second) => first.timestamp - second.timestamp); const endIndex = Math.max(sortedMessages.length - offset, 0); const startIndex = Math.max(endIndex - limit, 0); const messages = sortedMessages.slice(startIndex, endIndex); diff --git a/toju-app/src/app/infrastructure/persistence/database.service.ts b/toju-app/src/app/infrastructure/persistence/database.service.ts index 10f5498..2e2da63 100644 --- a/toju-app/src/app/infrastructure/persistence/database.service.ts +++ b/toju-app/src/app/infrastructure/persistence/database.service.ts @@ -49,8 +49,19 @@ export class DatabaseService { /** Persist a single chat message. */ saveMessage(message: Message) { return this.backend.saveMessage(message); } - /** Retrieve the latest messages for a room with optional pagination. */ - getMessages(roomId: string, limit = 100, offset = 0) { return this.backend.getMessages(roomId, limit, offset); } + /** Retrieve the latest messages for a room or channel with optional pagination. + * + * When `beforeTimestamp` is provided, only messages strictly older than that + * timestamp are returned. This is how scroll-up history loading paginates + * backwards through the DB without holding the whole history in memory. + */ + getMessages( + roomId: string, + limit = 100, + offset = 0, + channelId?: string, + beforeTimestamp?: number + ) { return this.backend.getMessages(roomId, limit, offset, channelId, beforeTimestamp); } /** Retrieve messages newer than a given timestamp for a room. */ getMessagesSince(roomId: string, sinceTimestamp: number) { return this.backend.getMessagesSince(roomId, sinceTimestamp); } diff --git a/toju-app/src/app/infrastructure/persistence/electron-database.service.ts b/toju-app/src/app/infrastructure/persistence/electron-database.service.ts index 7949e16..7c3e6fa 100644 --- a/toju-app/src/app/infrastructure/persistence/electron-database.service.ts +++ b/toju-app/src/app/infrastructure/persistence/electron-database.service.ts @@ -42,9 +42,21 @@ export class ElectronDatabaseService { * @param roomId - Target room. * @param limit - Maximum number of messages to return. * @param offset - Number of newer messages to skip (for pagination). + * @param channelId - Optional channel scope; 'general' includes null/empty. + * @param beforeTimestamp - Optional cursor; only messages strictly older + * than this timestamp are returned (scroll-up paging). */ - getMessages(roomId: string, limit = 100, offset = 0): Promise { - return this.api.query({ type: 'get-messages', payload: { roomId, limit, offset } }); + getMessages( + roomId: string, + limit = 100, + offset = 0, + channelId?: string, + beforeTimestamp?: number + ): Promise { + return this.api.query({ + type: 'get-messages', + payload: { roomId, limit, offset, channelId, beforeTimestamp } + }); } getMessagesSince(roomId: string, sinceTimestamp: number): Promise { diff --git a/toju-app/src/app/store/messages/messages-incoming.handlers.spec.ts b/toju-app/src/app/store/messages/messages-incoming.handlers.spec.ts index afa91f1..4930653 100644 --- a/toju-app/src/app/store/messages/messages-incoming.handlers.spec.ts +++ b/toju-app/src/app/store/messages/messages-incoming.handlers.spec.ts @@ -95,7 +95,7 @@ describe('dispatchIncomingMessage room-scoped sync', () => { expect(getMessages).toHaveBeenCalledWith('room-b', expect.any(Number), 0); expect(sendToPeer).toHaveBeenCalledWith('peer-2', { - type: 'chat-sync-full', + type: 'chat-sync-batch', roomId: 'room-b', messages: roomBMessages }); diff --git a/toju-app/src/app/store/messages/messages-incoming.handlers.ts b/toju-app/src/app/store/messages/messages-incoming.handlers.ts index 3ef017a..61a2b50 100644 --- a/toju-app/src/app/store/messages/messages-incoming.handlers.ts +++ b/toju-app/src/app/store/messages/messages-incoming.handlers.ts @@ -289,6 +289,12 @@ async function processSyncBatch( attachments: AttachmentFacade ): Promise { const toUpsert: Message[] = []; + // Yield to the event loop every YIELD_EVERY messages so Angular change + // detection and user input aren't starved while a large sync batch + // (e.g. from a bulk plugin import) drains serial DB writes. + const YIELD_EVERY = 50; + + let processed = 0; for (const incoming of event.messages) { attachments.rememberMessageRoom(incoming.id, incoming.roomId); @@ -305,6 +311,12 @@ async function processSyncBatch( if (changed) toUpsert.push(message); + + processed += 1; + + if (processed % YIELD_EVERY === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } } if (hasAttachmentMetaMap(event.attachments)) { @@ -603,13 +615,20 @@ function handleSyncRequest( return from( (async () => { const all = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0); - const syncFullEvent: ChatEvent = { - type: 'chat-sync-full', - roomId: targetRoomId, - messages: all - }; - webrtc.sendToPeer(fromPeerId, syncFullEvent); + // Ship as chunked chat-sync-batch events instead of a single + // chat-sync-full payload. A monolithic dump of up to FULL_SYNC_LIMIT + // messages can exceed the WebRTC SCTP per-message size ceiling and be + // silently dropped - especially after bulk plugin imports. + for (const chunk of chunkArray(all, CHUNK_SIZE)) { + const syncBatchEvent: ChatEvent = { + type: 'chat-sync-batch', + roomId: targetRoomId, + messages: chunk + }; + + webrtc.sendToPeer(fromPeerId, syncBatchEvent); + } })() ).pipe(mergeMap(() => EMPTY)); } diff --git a/toju-app/src/app/store/messages/messages.actions.ts b/toju-app/src/app/store/messages/messages.actions.ts index 03686dc..ea2f54f 100644 --- a/toju-app/src/app/store/messages/messages.actions.ts +++ b/toju-app/src/app/store/messages/messages.actions.ts @@ -23,6 +23,24 @@ export const MessagesActions = createActionGroup({ 'Load Messages Success': props<{ messages: Message[] }>(), 'Load Messages Failure': props<{ error: string }>(), + /** + * Fetches a page of messages strictly older than `beforeTimestamp` for a + * given conversation (room + channel). Used by the chat scroll-up handler + * to backfill history from the local database on demand. + */ + 'Load Older Messages': props<{ + roomId: string; + channelId: string; + beforeTimestamp: number; + limit: number; + }>(), + 'Load Older Messages Success': props<{ + conversationKey: string; + messages: Message[]; + reachedEnd: boolean; + }>(), + 'Load Older Messages Failure': props<{ error: string }>(), + /** Sends a new chat message to the current room and broadcasts to peers. */ 'Send Message': props<{ content: string; replyToId?: string; channelId?: string }>(), 'Send Message Success': props<{ message: Message }>(), diff --git a/toju-app/src/app/store/messages/messages.effects.ts b/toju-app/src/app/store/messages/messages.effects.ts index 068c611..9620bf7 100644 --- a/toju-app/src/app/store/messages/messages.effects.ts +++ b/toju-app/src/app/store/messages/messages.effects.ts @@ -43,7 +43,8 @@ import { TimeSyncService } from '../../core/services/time-sync.service'; import { DELETED_MESSAGE_CONTENT, Message, - Reaction + Reaction, + Room } from '../../shared-kernel'; import { hydrateMessages } from './messages.helpers'; import { canEditMessage } from '../../domains/chat/domain/rules/message.rules'; @@ -67,8 +68,9 @@ export class MessagesEffects { loadMessages$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.loadMessages), - switchMap(({ roomId }) => - from(this.db.getMessages(roomId, INITIAL_ROOM_MESSAGE_LIMIT, 0)).pipe( + withLatestFrom(this.store.select(selectCurrentRoom)), + switchMap(([{ roomId }, currentRoom]) => + from(this.loadInitialMessages(roomId, currentRoom)).pipe( mergeMap(async (messages) => { const hydrated = await hydrateMessages(messages, this.db); @@ -88,6 +90,58 @@ export class MessagesEffects { ) ); + /** Paginates older messages from the local DB for scroll-up history loading. */ + loadOlderMessages$ = createEffect(() => + this.actions$.pipe( + ofType(MessagesActions.loadOlderMessages), + mergeMap(({ roomId, channelId, beforeTimestamp, limit }) => + from( + this.db.getMessages(roomId, limit, 0, channelId, beforeTimestamp) + ).pipe( + mergeMap(async (messages) => { + const hydrated = await hydrateMessages(messages, this.db); + + for (const message of hydrated) { + this.attachments.rememberMessageRoom(message.id, message.roomId); + } + + return MessagesActions.loadOlderMessagesSuccess({ + conversationKey: `${roomId}:${channelId}`, + messages: hydrated, + reachedEnd: hydrated.length < limit + }); + }), + catchError((error) => + of(MessagesActions.loadOlderMessagesFailure({ error: error.message })) + ) + ) + ) + ) + ); + + private async loadInitialMessages(roomId: string, currentRoom: Room | null): Promise { + const textChannels = currentRoom?.id === roomId + ? (currentRoom.channels ?? []).filter((channel) => channel.type === 'text') + : []; + + if (textChannels.length <= 1) { + return this.db.getMessages(roomId, INITIAL_ROOM_MESSAGE_LIMIT, 0, textChannels[0]?.id); + } + + const channelMessageSets = await Promise.all( + textChannels.map((channel) => this.db.getMessages(roomId, INITIAL_ROOM_MESSAGE_LIMIT, 0, channel.id)) + ); + const messagesById = new Map(); + + for (const messages of channelMessageSets) { + for (const message of messages) { + messagesById.set(message.id, message); + } + } + + return [...messagesById.values()].sort((first, second) => first.timestamp - second.timestamp); + } + /** Constructs a new message, persists it locally, and broadcasts to all peers. */ sendMessage$ = createEffect(() => this.actions$.pipe( diff --git a/toju-app/src/app/store/messages/messages.helpers.ts b/toju-app/src/app/store/messages/messages.helpers.ts index f4ea065..9da9ce4 100644 --- a/toju-app/src/app/store/messages/messages.helpers.ts +++ b/toju-app/src/app/store/messages/messages.helpers.ts @@ -45,10 +45,17 @@ export async function hydrateMessages( return messages.map((msg) => msg.isDeleted ? normaliseDeletedMessage(msg) : msg); } -/** Builds a sync inventory item from a message and its reaction count. */ +/** Builds a sync inventory item from a message and its reaction count. + * + * Reactions are read from the already-hydrated `msg.reactions` array (the + * persistence layer joins them in via `getMessages`), and attachment counts + * only come from the in-memory override. We deliberately avoid per-message + * DB lookups here so a whole-room inventory stays O(1) DB calls even when + * the room contains tens of thousands of messages. + */ export async function buildInventoryItem( msg: Message, - db: DatabaseService, + _db: DatabaseService, attachmentCountOverride?: number ): Promise { if (msg.isDeleted) { @@ -60,50 +67,49 @@ export async function buildInventoryItem( }; } - const reactions = await db.getReactionsForMessage(msg.id); - const attachments = - attachmentCountOverride === undefined - ? await db.getAttachmentsForMessage(msg.id) - : []; - - return { id: msg.id, + const item: InventoryItem = { + id: msg.id, ts: getMessageTimestamp(msg), - rc: reactions.length, - ac: attachmentCountOverride ?? attachments.length }; + rc: msg.reactions?.length ?? 0 + }; + + if (attachmentCountOverride !== undefined) { + item.ac = attachmentCountOverride; + } + + return item; } -/** Builds a local map of `{timestamp, reactionCount, attachmentCount}` keyed by message ID. */ +/** Builds a local map of `{timestamp, reactionCount, attachmentCount}` keyed by message ID. + * + * As with {@link buildInventoryItem}, reactions come from the already-hydrated + * `msg.reactions` array and attachment counts only come from the in-memory + * override map. + */ export async function buildLocalInventoryMap( messages: Message[], - db: DatabaseService, + _db: DatabaseService, attachmentCountOverrides?: ReadonlyMap ): Promise> { const map = new Map(); - await Promise.all( - messages.map(async (msg) => { - if (msg.isDeleted) { - map.set(msg.id, { - ts: getMessageTimestamp(msg), - rc: 0, - ac: 0 - }); + for (const msg of messages) { + if (msg.isDeleted) { + map.set(msg.id, { + ts: getMessageTimestamp(msg), + rc: 0, + ac: 0 + }); - return; - } + continue; + } - const reactions = await db.getReactionsForMessage(msg.id); - const attachmentCountOverride = attachmentCountOverrides?.get(msg.id); - const attachments = - attachmentCountOverride === undefined - ? await db.getAttachmentsForMessage(msg.id) - : []; - - map.set(msg.id, { ts: getMessageTimestamp(msg), - rc: reactions.length, - ac: attachmentCountOverride ?? attachments.length }); - }) - ); + map.set(msg.id, { + ts: getMessageTimestamp(msg), + rc: msg.reactions?.length ?? 0, + ac: attachmentCountOverrides?.get(msg.id) ?? 0 + }); + } return map; } diff --git a/toju-app/src/app/store/messages/messages.reducer.ts b/toju-app/src/app/store/messages/messages.reducer.ts index 14d8cdf..ada7960 100644 --- a/toju-app/src/app/store/messages/messages.reducer.ts +++ b/toju-app/src/app/store/messages/messages.reducer.ts @@ -13,10 +13,18 @@ export interface MessagesState extends EntityState { loading: boolean; /** Whether a peer-to-peer sync cycle is in progress. */ syncing: boolean; + /** Whether a scroll-up older-page fetch is currently in flight. */ + loadingOlder: boolean; /** Most recent error message from message operations. */ error: string | null; /** ID of the room whose messages are currently loaded. */ currentRoomId: string | null; + /** + * Conversation keys (`${roomId}:${channelId}`) that have been paginated + * all the way back to the start of the local DB history. Used by the + * scroll-up handler to stop issuing further DB pages. + */ + exhaustedConversations: Record; } export const messagesAdapter: EntityAdapter = createEntityAdapter({ @@ -27,8 +35,10 @@ export const messagesAdapter: EntityAdapter = createEntityAdapter ({ + ...state, + loadingOlder: true, + error: null + })), + + on(MessagesActions.loadOlderMessagesSuccess, (state, { conversationKey, messages, reachedEnd }) => + messagesAdapter.upsertMany(messages, { + ...state, + loadingOlder: false, + exhaustedConversations: reachedEnd + ? { ...state.exhaustedConversations, + [conversationKey]: true } + : state.exhaustedConversations + }) + ), + + on(MessagesActions.loadOlderMessagesFailure, (state, { error }) => ({ + ...state, + loadingOlder: false, + error + })), + // Send message on(MessagesActions.sendMessage, (state) => ({ ...state, @@ -202,7 +237,10 @@ export const messagesReducer = createReducer( return messagesAdapter.upsertMany(merged, { ...state, - syncing: false + syncing: false, + // Peer sync may have inserted messages older than our current oldest; + // reopen pagination so the scroll-up handler revisits the DB. + exhaustedConversations: {} }); }), @@ -221,7 +259,8 @@ export const messagesReducer = createReducer( on(MessagesActions.clearMessages, (state) => messagesAdapter.removeAll({ ...state, - currentRoomId: null + currentRoomId: null, + exhaustedConversations: {} }) ) ); diff --git a/toju-app/src/app/store/messages/messages.selectors.ts b/toju-app/src/app/store/messages/messages.selectors.ts index 0d7efc8..43d7210 100644 --- a/toju-app/src/app/store/messages/messages.selectors.ts +++ b/toju-app/src/app/store/messages/messages.selectors.ts @@ -36,6 +36,21 @@ export const selectMessagesSyncing = createSelector( (state) => state.syncing ); +/** Whether a scroll-up older-page DB fetch is currently in flight. */ +export const selectMessagesLoadingOlder = createSelector( + selectMessagesState, + (state) => state.loadingOlder +); + +/** Whether the given conversation (`${roomId}:${channelId}`) has been + * paginated all the way back to the start of the local DB history. + */ +export const selectConversationExhausted = (conversationKey: string) => + createSelector( + selectMessagesState, + (state) => state.exhaustedConversations[conversationKey] === true + ); + /** Selects the ID of the room whose messages are currently loaded. */ export const selectCurrentRoomId = createSelector( selectMessagesState,