wip: optimizations
This commit is contained in:
@@ -110,7 +110,6 @@ describe('dispatchIncomingMessage room-scoped sync', () => {
|
||||
currentRoom: { id: 'room-a' },
|
||||
savedRooms: [{ id: 'room-a' }]
|
||||
});
|
||||
|
||||
const action = await firstValueFrom(
|
||||
dispatchIncomingMessage(
|
||||
{
|
||||
|
||||
@@ -573,12 +573,7 @@ function handleSyncSummary(
|
||||
|
||||
return from(
|
||||
(async () => {
|
||||
const local = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);
|
||||
const localCount = local.length;
|
||||
const localLastUpdated = local.reduce(
|
||||
(maxTimestamp, message) => Math.max(maxTimestamp, message.editedAt || message.timestamp || 0),
|
||||
0
|
||||
);
|
||||
const { count: localCount, lastUpdated: localLastUpdated } = await db.getRoomMessageStats(targetRoomId);
|
||||
const remoteLastUpdated = event.lastUpdated || 0;
|
||||
const remoteCount = event.count || 0;
|
||||
const identical =
|
||||
|
||||
@@ -26,7 +26,6 @@ import {
|
||||
import {
|
||||
map,
|
||||
mergeMap,
|
||||
catchError,
|
||||
withLatestFrom,
|
||||
tap,
|
||||
filter,
|
||||
@@ -43,12 +42,9 @@ import { RealtimeSessionFacade } from '../../core/realtime';
|
||||
import { DatabaseService } from '../../infrastructure/persistence';
|
||||
import { DebuggingService } from '../../core/services/debugging.service';
|
||||
import {
|
||||
INVENTORY_LIMIT,
|
||||
FULL_SYNC_LIMIT,
|
||||
SYNC_POLL_FAST_MS,
|
||||
SYNC_POLL_SLOW_MS,
|
||||
SYNC_TIMEOUT_MS,
|
||||
getLatestTimestamp
|
||||
SYNC_TIMEOUT_MS
|
||||
} from './messages.helpers';
|
||||
|
||||
@Injectable()
|
||||
@@ -77,13 +73,8 @@ export class MessagesSyncEffects {
|
||||
if (!room)
|
||||
return EMPTY;
|
||||
|
||||
return from(
|
||||
this.db.getMessages(room.id, FULL_SYNC_LIMIT, 0)
|
||||
).pipe(
|
||||
tap((messages) => {
|
||||
const count = messages.length;
|
||||
const lastUpdated = getLatestTimestamp(messages);
|
||||
|
||||
return from(this.db.getRoomMessageStats(room.id)).pipe(
|
||||
tap(({ count, lastUpdated }) => {
|
||||
this.webrtc.sendToPeer(peerId, {
|
||||
type: 'chat-sync-summary',
|
||||
roomId: room.id,
|
||||
@@ -124,11 +115,8 @@ export class MessagesSyncEffects {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
return from(this.db.getMessages(activeRoom.id, FULL_SYNC_LIMIT, 0)).pipe(
|
||||
tap((messages) => {
|
||||
const count = messages.length;
|
||||
const lastUpdated = getLatestTimestamp(messages);
|
||||
|
||||
return from(this.db.getRoomMessageStats(activeRoom.id)).pipe(
|
||||
tap(({ count, lastUpdated }) => {
|
||||
for (const pid of peers) {
|
||||
try {
|
||||
this.webrtc.sendToPeer(pid, {
|
||||
@@ -202,37 +190,22 @@ export class MessagesSyncEffects {
|
||||
return of(MessagesActions.syncComplete());
|
||||
}
|
||||
|
||||
return from(
|
||||
this.db.getMessages(room.id, INVENTORY_LIMIT, 0)
|
||||
).pipe(
|
||||
map(() => {
|
||||
for (const pid of peers) {
|
||||
try {
|
||||
this.webrtc.sendToPeer(pid, {
|
||||
type: 'chat-inventory-request',
|
||||
roomId: room.id
|
||||
});
|
||||
} catch (error) {
|
||||
this.debugging.warn('messages', 'Failed to request peer inventory during sync poll', {
|
||||
error,
|
||||
peerId: pid,
|
||||
roomId: room.id
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return MessagesActions.startSync();
|
||||
}),
|
||||
catchError((error) => {
|
||||
this.lastSyncClean = false;
|
||||
this.debugging.warn('messages', 'Periodic sync poll failed', {
|
||||
error,
|
||||
for (const pid of peers) {
|
||||
try {
|
||||
this.webrtc.sendToPeer(pid, {
|
||||
type: 'chat-inventory-request',
|
||||
roomId: room.id
|
||||
});
|
||||
} catch (error) {
|
||||
this.debugging.warn('messages', 'Failed to request peer inventory during sync poll', {
|
||||
error,
|
||||
peerId: pid,
|
||||
roomId: room.id
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return of(MessagesActions.syncComplete());
|
||||
})
|
||||
);
|
||||
return of(MessagesActions.startSync());
|
||||
})
|
||||
)
|
||||
)
|
||||
|
||||
@@ -23,6 +23,15 @@ export const MessagesActions = createActionGroup({
|
||||
'Load Messages Success': props<{ messages: Message[] }>(),
|
||||
'Load Messages Failure': props<{ error: string }>(),
|
||||
|
||||
/**
|
||||
* Background-prefetches the initial page of messages for a room without
|
||||
* touching the active-room `loading` flag. Fired for every saved room
|
||||
* after `loadRoomsSuccess` so subsequent room navigations resolve from
|
||||
* the in-memory cache instead of paying an IPC round-trip.
|
||||
*/
|
||||
'Prefetch Room Messages': props<{ roomId: string }>(),
|
||||
'Prefetch Room Messages Success': props<{ messages: Message[] }>(),
|
||||
|
||||
/**
|
||||
* Fetches a page of messages strictly older than `beforeTimestamp` for a
|
||||
* given conversation (room + channel). Used by the chat scroll-up handler
|
||||
|
||||
@@ -25,12 +25,14 @@ import {
|
||||
mergeMap,
|
||||
catchError,
|
||||
withLatestFrom,
|
||||
switchMap
|
||||
switchMap,
|
||||
filter
|
||||
} from 'rxjs/operators';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { MessagesActions } from './messages.actions';
|
||||
import { selectCurrentUser } from '../users/users.selectors';
|
||||
import { selectCurrentRoom, selectSavedRooms } from '../rooms/rooms.selectors';
|
||||
import { RoomsActions } from '../rooms/rooms.actions';
|
||||
import { selectMessagesEntities } from './messages.selectors';
|
||||
import { RealtimeSessionFacade } from '../../core/realtime';
|
||||
import { DatabaseService } from '../../infrastructure/persistence';
|
||||
@@ -40,6 +42,7 @@ import { AttachmentFacade } from '../../domains/attachment';
|
||||
import { hasDedicatedChatEmbed } from '../../domains/chat/domain/rules/link-embed.rules';
|
||||
import { LinkMetadataService } from '../../domains/chat/application/services/link-metadata.service';
|
||||
import { TimeSyncService } from '../../core/services/time-sync.service';
|
||||
import { PlatformService } from '../../core/platform';
|
||||
import {
|
||||
DELETED_MESSAGE_CONTENT,
|
||||
Message,
|
||||
@@ -52,6 +55,8 @@ import { resolveRoomPermission } from '../../domains/access-control';
|
||||
import { dispatchIncomingMessage, IncomingMessageContext } from './messages-incoming.handlers';
|
||||
|
||||
const INITIAL_ROOM_MESSAGE_LIMIT = 30;
|
||||
/** Cap on simultaneous browser-cache prefetches for apps with many saved rooms. */
|
||||
const PREFETCH_CONCURRENCY = 3;
|
||||
|
||||
@Injectable()
|
||||
export class MessagesEffects {
|
||||
@@ -63,14 +68,26 @@ export class MessagesEffects {
|
||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||
private readonly timeSync = inject(TimeSyncService);
|
||||
private readonly linkMetadata = inject(LinkMetadataService);
|
||||
private readonly platform = inject(PlatformService);
|
||||
|
||||
/** Loads messages for a room from the local database, hydrating reactions. */
|
||||
loadMessages$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.loadMessages),
|
||||
withLatestFrom(this.store.select(selectCurrentRoom)),
|
||||
switchMap(([{ roomId }, currentRoom]) =>
|
||||
from(this.loadInitialMessages(roomId, currentRoom)).pipe(
|
||||
withLatestFrom(
|
||||
this.store.select(selectCurrentRoom),
|
||||
this.store.select(selectSavedRooms)
|
||||
),
|
||||
switchMap(([
|
||||
{ roomId },
|
||||
currentRoom,
|
||||
savedRooms
|
||||
]) => {
|
||||
const targetRoom = currentRoom?.id === roomId
|
||||
? currentRoom
|
||||
: savedRooms.find((room) => room.id === roomId) ?? null;
|
||||
|
||||
return from(this.loadInitialMessages(roomId, targetRoom)).pipe(
|
||||
mergeMap(async (messages) => {
|
||||
const hydrated = await hydrateMessages(messages, this.db);
|
||||
|
||||
@@ -78,18 +95,73 @@ export class MessagesEffects {
|
||||
this.attachments.rememberMessageRoom(message.id, message.roomId);
|
||||
}
|
||||
|
||||
void this.attachments.requestAutoDownloadsForRoom(roomId);
|
||||
|
||||
return MessagesActions.loadMessagesSuccess({ messages: hydrated });
|
||||
}),
|
||||
catchError((error) =>
|
||||
of(MessagesActions.loadMessagesFailure({ error: error.message }))
|
||||
)
|
||||
);
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
/**
|
||||
* Background-prefetch initial messages for every saved room after the
|
||||
* rooms list loads in browser. Electron avoids this path because startup
|
||||
* IPC prefetch competes with foreground room switches. Results are merged
|
||||
* into the messages slice via `upsertMany`, leaving the active-room loading
|
||||
* flag untouched.
|
||||
*/
|
||||
prefetchSavedRoomsOnLoad$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(RoomsActions.loadRoomsSuccess),
|
||||
filter(() => this.platform.isBrowser),
|
||||
mergeMap(({ rooms }) =>
|
||||
from(rooms).pipe(
|
||||
mergeMap(
|
||||
(room) => of(MessagesActions.prefetchRoomMessages({ roomId: room.id })),
|
||||
PREFETCH_CONCURRENCY
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
prefetchRoomMessages$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.prefetchRoomMessages),
|
||||
withLatestFrom(this.store.select(selectSavedRooms)),
|
||||
mergeMap(
|
||||
([{ roomId }, savedRooms]) =>
|
||||
from(this.fetchRoomMessagesForPrefetch(roomId, savedRooms.find((room) => room.id === roomId) ?? null)),
|
||||
PREFETCH_CONCURRENCY
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
private async fetchRoomMessagesForPrefetch(roomId: string, targetRoom: Room | null) {
|
||||
try {
|
||||
const messages = await this.loadInitialMessages(roomId, targetRoom);
|
||||
const hydrated = await hydrateMessages(messages, this.db);
|
||||
|
||||
for (const message of hydrated) {
|
||||
this.attachments.rememberMessageRoom(message.id, message.roomId);
|
||||
}
|
||||
|
||||
return MessagesActions.prefetchRoomMessagesSuccess({ messages: hydrated });
|
||||
} catch (error) {
|
||||
reportDebuggingError(
|
||||
this.debugging,
|
||||
'MessagesEffects.prefetchRoomMessages',
|
||||
'Failed to prefetch room messages',
|
||||
{ roomId },
|
||||
error
|
||||
);
|
||||
|
||||
return MessagesActions.prefetchRoomMessagesSuccess({ messages: [] });
|
||||
}
|
||||
}
|
||||
|
||||
/** Paginates older messages from the local DB for scroll-up history loading. */
|
||||
loadOlderMessages$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
@@ -119,9 +191,9 @@ export class MessagesEffects {
|
||||
)
|
||||
);
|
||||
|
||||
private async loadInitialMessages(roomId: string, currentRoom: Room | null): Promise<Message[]> {
|
||||
const textChannels = currentRoom?.id === roomId
|
||||
? (currentRoom.channels ?? []).filter((channel) => channel.type === 'text')
|
||||
private async loadInitialMessages(roomId: string, targetRoom: Room | null): Promise<Message[]> {
|
||||
const textChannels = targetRoom?.id === roomId
|
||||
? (targetRoom.channels ?? []).filter((channel) => channel.type === 'text')
|
||||
: [];
|
||||
|
||||
if (textChannels.length <= 1) {
|
||||
|
||||
@@ -44,33 +44,35 @@ export const initialState: MessagesState = messagesAdapter.getInitialState({
|
||||
export const messagesReducer = createReducer(
|
||||
initialState,
|
||||
|
||||
// Load messages - clear stale messages when switching to a different room
|
||||
on(MessagesActions.loadMessages, (state, { roomId }) => {
|
||||
if (state.currentRoomId && state.currentRoomId !== roomId) {
|
||||
return messagesAdapter.removeAll({
|
||||
...state,
|
||||
loading: true,
|
||||
error: null,
|
||||
currentRoomId: roomId,
|
||||
exhaustedConversations: {}
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
...state,
|
||||
loading: true,
|
||||
error: null,
|
||||
currentRoomId: roomId
|
||||
};
|
||||
}),
|
||||
// Load messages - keep cached messages from other rooms in the slice so a
|
||||
// return visit (or a prefetched room) renders immediately from memory. The
|
||||
// selectors (`selectChannelMessages`, `channelMessages` computed) already
|
||||
// filter by `currentRoom.id`, so leaving stale rooms in the entity adapter
|
||||
// is safe. Memory cost is ~30 messages per saved room; tracked at
|
||||
// /memories/repo/electron-server-switch-performance.md.
|
||||
on(MessagesActions.loadMessages, (state, { roomId }) => ({
|
||||
...state,
|
||||
loading: true,
|
||||
error: null,
|
||||
currentRoomId: roomId,
|
||||
exhaustedConversations: state.currentRoomId === roomId
|
||||
? state.exhaustedConversations
|
||||
: {}
|
||||
})),
|
||||
|
||||
on(MessagesActions.loadMessagesSuccess, (state, { messages }) =>
|
||||
messagesAdapter.setAll(messages, {
|
||||
messagesAdapter.upsertMany(messages, {
|
||||
...state,
|
||||
loading: false
|
||||
})
|
||||
),
|
||||
|
||||
// Background prefetch result: merge into the cache without touching the
|
||||
// active-room loading flag or currentRoomId.
|
||||
on(MessagesActions.prefetchRoomMessagesSuccess, (state, { messages }) =>
|
||||
messagesAdapter.upsertMany(messages, state)
|
||||
),
|
||||
|
||||
on(MessagesActions.loadMessagesFailure, (state, { error }) => ({
|
||||
...state,
|
||||
loading: false,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { createFeatureSelector, createSelector } from '@ngrx/store';
|
||||
import { MessagesState, messagesAdapter } from './messages.reducer';
|
||||
import { selectActiveChannelId, selectCurrentRoomId as selectViewedRoomId } from '../rooms/rooms.selectors';
|
||||
|
||||
/** Selects the top-level messages feature state. */
|
||||
export const selectMessagesState = createFeatureSelector<MessagesState>('messages');
|
||||
@@ -60,25 +61,24 @@ export const selectCurrentRoomId = createSelector(
|
||||
/** Selects all messages belonging to the currently active room. */
|
||||
export const selectCurrentRoomMessages = createSelector(
|
||||
selectAllMessages,
|
||||
selectCurrentRoomId,
|
||||
selectViewedRoomId,
|
||||
(messages, roomId) => roomId ? messages.filter((message) => message.roomId === roomId) : []
|
||||
);
|
||||
|
||||
/** Creates a selector that returns messages for a specific text channel within the current room. */
|
||||
export const selectChannelMessages = (channelId: string) =>
|
||||
createSelector(
|
||||
selectAllMessages,
|
||||
selectCurrentRoomId,
|
||||
(messages, roomId) => {
|
||||
if (!roomId)
|
||||
return [];
|
||||
|
||||
return messages.filter(
|
||||
(message) => message.roomId === roomId && (message.channelId || 'general') === channelId
|
||||
);
|
||||
}
|
||||
selectCurrentRoomMessages,
|
||||
(messages) => messages.filter((message) => (message.channelId || 'general') === channelId)
|
||||
);
|
||||
|
||||
/** Selects messages in the currently viewed room and active text channel. */
|
||||
export const selectActiveChannelMessages = createSelector(
|
||||
selectCurrentRoomMessages,
|
||||
selectActiveChannelId,
|
||||
(messages, channelId) => messages.filter((message) => (message.channelId || 'general') === channelId)
|
||||
);
|
||||
|
||||
/** Creates a selector that returns a single message by its ID. */
|
||||
export const selectMessageById = (id: string) =>
|
||||
createSelector(selectMessagesEntities, (entities) => entities[id]);
|
||||
|
||||
@@ -417,6 +417,63 @@ export function areRoomMembersEqual(
|
||||
secondMembers: RoomMember[] = []
|
||||
): boolean {
|
||||
const now = Date.now();
|
||||
const first = pruneRoomMembers(firstMembers, now);
|
||||
const second = pruneRoomMembers(secondMembers, now);
|
||||
|
||||
return JSON.stringify(pruneRoomMembers(firstMembers, now)) === JSON.stringify(pruneRoomMembers(secondMembers, now));
|
||||
if (first.length !== second.length) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (let memberIndex = 0; memberIndex < first.length; memberIndex++) {
|
||||
if (!areRoomMembersIdentical(first[memberIndex], second[memberIndex])) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
function areRoomMembersIdentical(firstMember: RoomMember, secondMember: RoomMember): boolean {
|
||||
if (firstMember === secondMember) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return firstMember.id === secondMember.id
|
||||
&& firstMember.oderId === secondMember.oderId
|
||||
&& firstMember.username === secondMember.username
|
||||
&& firstMember.displayName === secondMember.displayName
|
||||
&& firstMember.role === secondMember.role
|
||||
&& firstMember.avatarUrl === secondMember.avatarUrl
|
||||
&& firstMember.avatarHash === secondMember.avatarHash
|
||||
&& firstMember.avatarMime === secondMember.avatarMime
|
||||
&& firstMember.avatarUpdatedAt === secondMember.avatarUpdatedAt
|
||||
&& firstMember.profileUpdatedAt === secondMember.profileUpdatedAt
|
||||
&& firstMember.description === secondMember.description
|
||||
&& firstMember.joinedAt === secondMember.joinedAt
|
||||
&& firstMember.lastSeenAt === secondMember.lastSeenAt
|
||||
&& areRoleIdsEqual(firstMember.roleIds, secondMember.roleIds);
|
||||
}
|
||||
|
||||
function areRoleIdsEqual(
|
||||
firstRoleIds: string[] | undefined,
|
||||
secondRoleIds: string[] | undefined
|
||||
): boolean {
|
||||
if (firstRoleIds === secondRoleIds) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const first = firstRoleIds ?? [];
|
||||
const second = secondRoleIds ?? [];
|
||||
|
||||
if (first.length !== second.length) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (let roleIndex = 0; roleIndex < first.length; roleIndex++) {
|
||||
if (first[roleIndex] !== second[roleIndex]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ type BlockedRoomAccessAction =
|
||||
| ReturnType<typeof RoomsActions.forgetRoom>
|
||||
| ReturnType<typeof RoomsActions.joinRoomFailure>;
|
||||
|
||||
const VIEW_SERVER_LOAD_DELAY_MS = 75;
|
||||
const VIEW_SERVER_LOAD_DELAY_MS = 0;
|
||||
|
||||
@Injectable()
|
||||
export class RoomsEffects {
|
||||
@@ -642,9 +642,11 @@ export class RoomsEffects {
|
||||
onViewServerSuccess$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(RoomsActions.viewServerSuccess),
|
||||
switchMap(({ room }) => timer(VIEW_SERVER_LOAD_DELAY_MS).pipe(
|
||||
mergeMap(() => [MessagesActions.loadMessages({ roomId: room.id }), UsersActions.loadBans()])
|
||||
))
|
||||
switchMap(({ room }) => VIEW_SERVER_LOAD_DELAY_MS > 0
|
||||
? timer(VIEW_SERVER_LOAD_DELAY_MS).pipe(
|
||||
mergeMap(() => [MessagesActions.loadMessages({ roomId: room.id }), UsersActions.loadBans()])
|
||||
)
|
||||
: of(MessagesActions.loadMessages({ roomId: room.id }), UsersActions.loadBans()))
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user