Big commit

This commit is contained in:
2026-03-02 00:13:34 +01:00
parent d146138fca
commit 6d7465ff18
54 changed files with 5999 additions and 2291 deletions

View File

@@ -1,10 +1,11 @@
import { Injectable, inject } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Store } from '@ngrx/store';
import { of, from } from 'rxjs';
import { map, mergeMap, catchError, withLatestFrom, tap, switchMap } from 'rxjs/operators';
import { of, from, timer, Subject } from 'rxjs';
import { map, mergeMap, catchError, withLatestFrom, tap, switchMap, filter, exhaustMap, repeat, takeUntil } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import * as MessagesActions from './messages.actions';
import { selectMessagesSyncing } from './messages.selectors';
import { selectCurrentUser } from '../users/users.selectors';
import { selectCurrentRoom } from '../rooms/rooms.selectors';
import { DatabaseService } from '../../core/services/database.service';
@@ -26,14 +27,26 @@ export class MessagesEffects {
private readonly INVENTORY_LIMIT = 1000; // number of recent messages to consider
private readonly CHUNK_SIZE = 200; // chunk size for inventory/batch transfers
private readonly SYNC_POLL_FAST_MS = 10_000; // 10s — aggressive poll
private readonly SYNC_POLL_SLOW_MS = 900_000; // 15min — idle poll after clean sync
private lastSyncClean = false; // true after a sync cycle with no new messages
// Load messages from local database
// Load messages from local database (hydrate reactions from separate table)
loadMessages$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.loadMessages),
switchMap(({ roomId }) =>
from(this.db.getMessages(roomId)).pipe(
map((messages) => MessagesActions.loadMessagesSuccess({ messages })),
mergeMap(async (messages) => {
// Hydrate each message with its reactions from the reactions table
const hydrated = await Promise.all(
messages.map(async (m) => {
const reactions = await this.db.getReactionsForMessage(m.id);
return reactions.length > 0 ? { ...m, reactions } : m;
})
);
return MessagesActions.loadMessagesSuccess({ messages: hydrated });
}),
catchError((error) =>
of(MessagesActions.loadMessagesFailure({ error: error.message }))
)
@@ -50,7 +63,7 @@ export class MessagesEffects {
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom)
),
mergeMap(([{ content, replyToId }, currentUser, currentRoom]) => {
mergeMap(([{ content, replyToId, channelId }, currentUser, currentRoom]) => {
if (!currentUser || !currentRoom) {
return of(MessagesActions.sendMessageFailure({ error: 'Not connected to a room' }));
}
@@ -58,6 +71,7 @@ export class MessagesEffects {
const message: Message = {
id: uuidv4(),
roomId: currentRoom.id,
channelId: channelId || 'general',
senderId: currentUser.id,
senderName: currentUser.displayName || currentUser.username,
content,
@@ -226,6 +240,7 @@ export class MessagesEffects {
// Broadcast to peers
this.webrtc.broadcastMessage({
type: 'reaction-added',
messageId,
reaction,
});
@@ -273,17 +288,23 @@ export class MessagesEffects {
switch (event.type) {
// Precise sync via ID inventory and targeted requests
case 'chat-inventory-request': {
if (!currentRoom || !event.fromPeerId) return of({ type: 'NO_OP' });
return from(this.db.getMessages(currentRoom.id, this.INVENTORY_LIMIT, 0)).pipe(
tap((messages) => {
const items = messages
.map((m) => ({ id: m.id, ts: m.editedAt || m.timestamp || 0 }))
.sort((a, b) => a.ts - b.ts);
const reqRoomId = event.roomId;
if (!reqRoomId || !event.fromPeerId) return of({ type: 'NO_OP' });
return from(this.db.getMessages(reqRoomId, this.INVENTORY_LIMIT, 0)).pipe(
mergeMap(async (messages) => {
const items = await Promise.all(
messages.map(async (m) => {
const reactions = await this.db.getReactionsForMessage(m.id);
return { id: m.id, ts: m.editedAt || m.timestamp || 0, rc: reactions.length };
})
);
items.sort((a, b) => a.ts - b.ts);
console.log(`[Sync] Sending inventory of ${items.length} items for room ${reqRoomId}`);
for (let i = 0; i < items.length; i += this.CHUNK_SIZE) {
const chunk = items.slice(i, i + this.CHUNK_SIZE);
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'chat-inventory',
roomId: currentRoom.id,
roomId: reqRoomId,
items: chunk,
total: items.length,
index: i,
@@ -295,24 +316,37 @@ export class MessagesEffects {
}
case 'chat-inventory': {
if (!currentRoom || !Array.isArray(event.items) || !event.fromPeerId) return of({ type: 'NO_OP' });
const invRoomId = event.roomId;
if (!invRoomId || !Array.isArray(event.items) || !event.fromPeerId) return of({ type: 'NO_OP' });
// Determine which IDs we are missing or have older versions of
return from(this.db.getMessages(currentRoom.id, this.INVENTORY_LIMIT, 0)).pipe(
return from(this.db.getMessages(invRoomId, this.INVENTORY_LIMIT, 0)).pipe(
mergeMap(async (local) => {
const localMap = new Map(local.map((m) => [m.id, m.editedAt || m.timestamp || 0]));
// Build local map with timestamps and reaction counts
const localMap = new Map<string, { ts: number; rc: number }>();
await Promise.all(
local.map(async (m) => {
const reactions = await this.db.getReactionsForMessage(m.id);
localMap.set(m.id, { ts: m.editedAt || m.timestamp || 0, rc: reactions.length });
})
);
const missing: string[] = [];
for (const { id, ts } of event.items as Array<{ id: string; ts: number }>) {
const lts = localMap.get(id);
if (lts === undefined || ts > lts) {
missing.push(id);
for (const item of event.items as Array<{ id: string; ts: number; rc?: number }>) {
const localEntry = localMap.get(item.id);
if (!localEntry) {
missing.push(item.id);
} else if (item.ts > localEntry.ts) {
missing.push(item.id);
} else if (item.rc !== undefined && item.rc !== localEntry.rc) {
missing.push(item.id);
}
}
console.log(`[Sync] Inventory received: ${event.items.length} remote, ${missing.length} missing/stale`);
// Request in chunks from the sender
for (let i = 0; i < missing.length; i += this.CHUNK_SIZE) {
const chunk = missing.slice(i, i + this.CHUNK_SIZE);
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'chat-sync-request-ids',
roomId: currentRoom.id,
roomId: invRoomId,
ids: chunk,
} as any);
}
@@ -322,18 +356,36 @@ export class MessagesEffects {
}
case 'chat-sync-request-ids': {
if (!currentRoom || !Array.isArray(event.ids) || !event.fromPeerId) return of({ type: 'NO_OP' });
const syncReqRoomId = event.roomId;
if (!Array.isArray(event.ids) || !event.fromPeerId) return of({ type: 'NO_OP' });
const ids: string[] = event.ids;
return from(Promise.all(ids.map((id: string) => this.db.getMessageById(id)))).pipe(
tap((maybeMessages) => {
mergeMap(async (maybeMessages) => {
const messages = maybeMessages.filter((m): m is Message => !!m);
// Hydrate reactions from the separate reactions table
const hydrated = await Promise.all(
messages.map(async (m) => {
const reactions = await this.db.getReactionsForMessage(m.id);
return { ...m, reactions };
})
);
// Collect attachment metadata for synced messages
const msgIds = hydrated.map(m => m.id);
const attachmentMetas = this.attachments.getAttachmentMetasForMessages(msgIds);
console.log(`[Sync] Sending ${hydrated.length} messages for ${ids.length} requested IDs`);
// Send in chunks to avoid large payloads
for (let i = 0; i < messages.length; i += this.CHUNK_SIZE) {
const chunk = messages.slice(i, i + this.CHUNK_SIZE);
for (let i = 0; i < hydrated.length; i += this.CHUNK_SIZE) {
const chunk = hydrated.slice(i, i + this.CHUNK_SIZE);
// Include only attachments for this chunk
const chunkAttachments: Record<string, any> = {};
for (const m of chunk) {
if (attachmentMetas[m.id]) chunkAttachments[m.id] = attachmentMetas[m.id];
}
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'chat-sync-batch',
roomId: currentRoom.id,
roomId: syncReqRoomId || '',
messages: chunk,
attachments: Object.keys(chunkAttachments).length > 0 ? chunkAttachments : undefined,
} as any);
}
}),
@@ -342,21 +394,54 @@ export class MessagesEffects {
}
case 'chat-sync-batch': {
if (!currentRoom || !Array.isArray(event.messages)) return of({ type: 'NO_OP' });
if (!Array.isArray(event.messages)) return of({ type: 'NO_OP' });
// Register synced attachment metadata so the UI knows about them
if (event.attachments && typeof event.attachments === 'object') {
this.attachments.registerSyncedAttachments(event.attachments);
}
return from((async () => {
const accepted: Message[] = [];
const toUpsert: Message[] = [];
for (const m of event.messages as Message[]) {
const existing = await this.db.getMessageById(m.id);
const ets = existing ? (existing.editedAt || existing.timestamp || 0) : -1;
const its = m.editedAt || m.timestamp || 0;
if (!existing || its > ets) {
const isNewer = !existing || its > ets;
if (isNewer) {
await this.db.saveMessage(m);
accepted.push(m);
}
// Persist incoming reactions to the reactions table (deduped)
const incomingReactions = m.reactions ?? [];
for (const r of incomingReactions) {
await this.db.saveReaction(r);
}
// Hydrate merged reactions from DB and upsert if anything changed
if (isNewer || incomingReactions.length > 0) {
const reactions = await this.db.getReactionsForMessage(m.id);
toUpsert.push({ ...(isNewer ? m : existing!), reactions });
}
}
return accepted;
// Auto-request unavailable images from the sender
if (event.attachments && event.fromPeerId) {
for (const [msgId, metas] of Object.entries(event.attachments) as [string, any[]][]) {
for (const meta of metas) {
if (meta.isImage) {
const atts = this.attachments.getForMessage(msgId);
const att = atts.find((a: any) => a.id === meta.id);
if (att && !att.available && !(att.receivedBytes && att.receivedBytes > 0)) {
this.attachments.requestImageFromAnyPeer(msgId, att);
}
}
}
}
}
return toUpsert;
})()).pipe(
mergeMap((accepted) => accepted.length ? of(MessagesActions.syncMessages({ messages: accepted })) : of({ type: 'NO_OP' }))
mergeMap((toUpsert) => toUpsert.length ? of(MessagesActions.syncMessages({ messages: toUpsert })) : of({ type: 'NO_OP' }))
);
}
case 'voice-state':
@@ -394,6 +479,11 @@ export class MessagesEffects {
this.attachments.handleFileCancel(event);
return of({ type: 'NO_OP' });
case 'file-not-found':
// Peer couldn't serve the file try another peer automatically
this.attachments.handleFileNotFound(event);
return of({ type: 'NO_OP' });
case 'message-edited':
if (event.messageId && event.content) {
this.db.updateMessage(event.messageId, { content: event.content, editedAt: event.editedAt });
@@ -526,4 +616,66 @@ export class MessagesEffects {
),
{ dispatch: false }
);
// Periodic sync poll 10s when catching up, 15min after a clean sync
private syncReset$ = new Subject<void>();
periodicSyncPoll$ = createEffect(() =>
timer(this.SYNC_POLL_FAST_MS).pipe(
// After each emission, decide the next delay based on last result
repeat({ delay: () => timer(this.lastSyncClean ? this.SYNC_POLL_SLOW_MS : this.SYNC_POLL_FAST_MS) }),
takeUntil(this.syncReset$), // restart via syncReset$ is handled externally if needed
withLatestFrom(
this.store.select(selectCurrentRoom)
),
filter(([, room]) => !!room && this.webrtc.getConnectedPeers().length > 0),
exhaustMap(([, room]) => {
const peers = this.webrtc.getConnectedPeers();
if (!room || peers.length === 0) return of(MessagesActions.syncComplete());
return from(this.db.getMessages(room.id, this.INVENTORY_LIMIT, 0)).pipe(
map((messages) => {
peers.forEach((pid) => {
try {
this.webrtc.sendToPeer(pid, { type: 'chat-inventory-request', roomId: room.id } as any);
} catch {}
});
return MessagesActions.startSync();
}),
catchError(() => {
this.lastSyncClean = false;
return of(MessagesActions.syncComplete());
})
);
})
)
);
// Auto-complete sync after a timeout if no sync messages arrive
syncTimeout$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.startSync),
switchMap(() => {
// If no syncMessages or syncComplete within 5s, auto-complete
return new Promise<void>((resolve) => setTimeout(resolve, 5000));
}),
withLatestFrom(this.store.select(selectMessagesSyncing)),
filter(([, syncing]) => syncing),
map(() => {
// No new messages arrived during this cycle → clean sync, slow down
this.lastSyncClean = true;
return MessagesActions.syncComplete();
})
)
);
// When new messages actually arrive via sync, switch back to fast polling
syncReceivedMessages$ = createEffect(
() =>
this.webrtc.onPeerConnected.pipe(
// A peer (re)connecting means we may have been offline — revert to aggressive polling
tap(() => { this.lastSyncClean = false; })
),
{ dispatch: false }
);
}