Refactor and code designing
This commit is contained in:
@@ -1,11 +1,20 @@
|
||||
/**
|
||||
* Core message CRUD effects (load, send, edit, delete, react)
|
||||
* and the central incoming-message dispatcher.
|
||||
*
|
||||
* Sync-lifecycle effects (polling, peer-connect handshakes) live in
|
||||
* `messages-sync.effects.ts` to keep this file focused.
|
||||
*
|
||||
* The giant `incomingMessages$` switch-case has been replaced by a
|
||||
* handler registry in `messages-incoming.handlers.ts`.
|
||||
*/
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { Actions, createEffect, ofType } from '@ngrx/effects';
|
||||
import { Store } from '@ngrx/store';
|
||||
import { of, from, timer, Subject } from 'rxjs';
|
||||
import { map, mergeMap, catchError, withLatestFrom, tap, switchMap, filter, exhaustMap, repeat, takeUntil } from 'rxjs/operators';
|
||||
import { of, from, EMPTY } from 'rxjs';
|
||||
import { mergeMap, catchError, withLatestFrom, switchMap } from 'rxjs/operators';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import * as MessagesActions from './messages.actions';
|
||||
import { selectMessagesSyncing } from './messages.selectors';
|
||||
import { MessagesActions } from './messages.actions';
|
||||
import { selectCurrentUser } from '../users/users.selectors';
|
||||
import { selectCurrentRoom } from '../rooms/rooms.selectors';
|
||||
import { DatabaseService } from '../../core/services/database.service';
|
||||
@@ -13,55 +22,46 @@ import { WebRTCService } from '../../core/services/webrtc.service';
|
||||
import { TimeSyncService } from '../../core/services/time-sync.service';
|
||||
import { AttachmentService } from '../../core/services/attachment.service';
|
||||
import { Message, Reaction } from '../../core/models';
|
||||
import * as UsersActions from '../users/users.actions';
|
||||
import * as RoomsActions from '../rooms/rooms.actions';
|
||||
import { hydrateMessages } from './messages.helpers';
|
||||
import {
|
||||
dispatchIncomingMessage,
|
||||
IncomingMessageContext,
|
||||
} from './messages-incoming.handlers';
|
||||
|
||||
@Injectable()
|
||||
export class MessagesEffects {
|
||||
private actions$ = inject(Actions);
|
||||
private store = inject(Store);
|
||||
private db = inject(DatabaseService);
|
||||
private webrtc = inject(WebRTCService);
|
||||
private timeSync = inject(TimeSyncService);
|
||||
private attachments = inject(AttachmentService);
|
||||
private readonly actions$ = inject(Actions);
|
||||
private readonly store = inject(Store);
|
||||
private readonly db = inject(DatabaseService);
|
||||
private readonly webrtc = inject(WebRTCService);
|
||||
private readonly timeSync = inject(TimeSyncService);
|
||||
private readonly attachments = inject(AttachmentService);
|
||||
|
||||
private readonly INVENTORY_LIMIT = 1000; // number of recent messages to consider
|
||||
private readonly CHUNK_SIZE = 200; // chunk size for inventory/batch transfers
|
||||
private readonly SYNC_POLL_FAST_MS = 10_000; // 10s — aggressive poll
|
||||
private readonly SYNC_POLL_SLOW_MS = 900_000; // 15min — idle poll after clean sync
|
||||
private lastSyncClean = false; // true after a sync cycle with no new messages
|
||||
|
||||
// Load messages from local database (hydrate reactions from separate table)
|
||||
/** Loads messages for a room from the local database, hydrating reactions. */
|
||||
loadMessages$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.loadMessages),
|
||||
switchMap(({ roomId }) =>
|
||||
from(this.db.getMessages(roomId)).pipe(
|
||||
mergeMap(async (messages) => {
|
||||
// Hydrate each message with its reactions from the reactions table
|
||||
const hydrated = await Promise.all(
|
||||
messages.map(async (m) => {
|
||||
const reactions = await this.db.getReactionsForMessage(m.id);
|
||||
return reactions.length > 0 ? { ...m, reactions } : m;
|
||||
})
|
||||
);
|
||||
const hydrated = await hydrateMessages(messages, this.db);
|
||||
return MessagesActions.loadMessagesSuccess({ messages: hydrated });
|
||||
}),
|
||||
catchError((error) =>
|
||||
of(MessagesActions.loadMessagesFailure({ error: error.message }))
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
of(MessagesActions.loadMessagesFailure({ error: error.message })),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
// Send message
|
||||
/** Constructs a new message, persists it locally, and broadcasts to all peers. */
|
||||
sendMessage$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.sendMessage),
|
||||
withLatestFrom(
|
||||
this.store.select(selectCurrentUser),
|
||||
this.store.select(selectCurrentRoom)
|
||||
this.store.select(selectCurrentRoom),
|
||||
),
|
||||
mergeMap(([{ content, replyToId, channelId }, currentUser, currentRoom]) => {
|
||||
if (!currentUser || !currentRoom) {
|
||||
@@ -81,24 +81,18 @@ export class MessagesEffects {
|
||||
replyToId,
|
||||
};
|
||||
|
||||
// Save to local DB
|
||||
this.db.saveMessage(message);
|
||||
|
||||
// Broadcast to all peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'chat-message',
|
||||
message,
|
||||
});
|
||||
this.webrtc.broadcastMessage({ type: 'chat-message', message });
|
||||
|
||||
return of(MessagesActions.sendMessageSuccess({ message }));
|
||||
}),
|
||||
catchError((error) =>
|
||||
of(MessagesActions.sendMessageFailure({ error: error.message }))
|
||||
)
|
||||
)
|
||||
of(MessagesActions.sendMessageFailure({ error: error.message })),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
// Edit message
|
||||
/** Edits an existing message (author-only), updates DB, and broadcasts the change. */
|
||||
editMessage$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.editMessage),
|
||||
@@ -109,40 +103,29 @@ export class MessagesEffects {
|
||||
}
|
||||
|
||||
return from(this.db.getMessageById(messageId)).pipe(
|
||||
mergeMap((existingMessage) => {
|
||||
if (!existingMessage) {
|
||||
mergeMap((existing) => {
|
||||
if (!existing) {
|
||||
return of(MessagesActions.editMessageFailure({ error: 'Message not found' }));
|
||||
}
|
||||
|
||||
// Check if user owns the message
|
||||
if (existingMessage.senderId !== currentUser.id) {
|
||||
if (existing.senderId !== currentUser.id) {
|
||||
return of(MessagesActions.editMessageFailure({ error: 'Cannot edit others messages' }));
|
||||
}
|
||||
|
||||
const editedAt = this.timeSync.now();
|
||||
|
||||
// Update in DB
|
||||
this.db.updateMessage(messageId, { content, editedAt });
|
||||
|
||||
// Broadcast to peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'message-edited',
|
||||
messageId,
|
||||
content,
|
||||
editedAt,
|
||||
});
|
||||
this.webrtc.broadcastMessage({ type: 'message-edited', messageId, content, editedAt });
|
||||
|
||||
return of(MessagesActions.editMessageSuccess({ messageId, content, editedAt }));
|
||||
}),
|
||||
catchError((error) =>
|
||||
of(MessagesActions.editMessageFailure({ error: error.message }))
|
||||
)
|
||||
of(MessagesActions.editMessageFailure({ error: error.message })),
|
||||
),
|
||||
);
|
||||
})
|
||||
)
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Delete message (user's own)
|
||||
/** Soft-deletes a message (author-only), marks it deleted in DB, and broadcasts. */
|
||||
deleteMessage$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.deleteMessage),
|
||||
@@ -153,36 +136,28 @@ export class MessagesEffects {
|
||||
}
|
||||
|
||||
return from(this.db.getMessageById(messageId)).pipe(
|
||||
mergeMap((existingMessage) => {
|
||||
if (!existingMessage) {
|
||||
mergeMap((existing) => {
|
||||
if (!existing) {
|
||||
return of(MessagesActions.deleteMessageFailure({ error: 'Message not found' }));
|
||||
}
|
||||
|
||||
// Check if user owns the message
|
||||
if (existingMessage.senderId !== currentUser.id) {
|
||||
if (existing.senderId !== currentUser.id) {
|
||||
return of(MessagesActions.deleteMessageFailure({ error: 'Cannot delete others messages' }));
|
||||
}
|
||||
|
||||
// Soft delete - mark as deleted
|
||||
this.db.updateMessage(messageId, { isDeleted: true });
|
||||
|
||||
// Broadcast to peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'message-deleted',
|
||||
messageId,
|
||||
});
|
||||
this.webrtc.broadcastMessage({ type: 'message-deleted', messageId });
|
||||
|
||||
return of(MessagesActions.deleteMessageSuccess({ messageId }));
|
||||
}),
|
||||
catchError((error) =>
|
||||
of(MessagesActions.deleteMessageFailure({ error: error.message }))
|
||||
)
|
||||
of(MessagesActions.deleteMessageFailure({ error: error.message })),
|
||||
),
|
||||
);
|
||||
})
|
||||
)
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Admin delete message
|
||||
/** Soft-deletes any message (admin+ only). */
|
||||
adminDeleteMessage$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.adminDeleteMessage),
|
||||
@@ -192,38 +167,33 @@ export class MessagesEffects {
|
||||
return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
|
||||
}
|
||||
|
||||
// Check admin permission
|
||||
if (currentUser.role !== 'host' && currentUser.role !== 'admin' && currentUser.role !== 'moderator') {
|
||||
const hasPermission =
|
||||
currentUser.role === 'host' ||
|
||||
currentUser.role === 'admin' ||
|
||||
currentUser.role === 'moderator';
|
||||
|
||||
if (!hasPermission) {
|
||||
return of(MessagesActions.deleteMessageFailure({ error: 'Permission denied' }));
|
||||
}
|
||||
|
||||
// Soft delete
|
||||
this.db.updateMessage(messageId, { isDeleted: true });
|
||||
|
||||
// Broadcast to peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'message-deleted',
|
||||
messageId,
|
||||
deletedBy: currentUser.id,
|
||||
});
|
||||
this.webrtc.broadcastMessage({ type: 'message-deleted', messageId, deletedBy: currentUser.id });
|
||||
|
||||
return of(MessagesActions.deleteMessageSuccess({ messageId }));
|
||||
}),
|
||||
catchError((error) =>
|
||||
of(MessagesActions.deleteMessageFailure({ error: error.message }))
|
||||
)
|
||||
)
|
||||
of(MessagesActions.deleteMessageFailure({ error: error.message })),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
// Add reaction
|
||||
/** Adds an emoji reaction to a message, persists it, and broadcasts to peers. */
|
||||
addReaction$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.addReaction),
|
||||
withLatestFrom(this.store.select(selectCurrentUser)),
|
||||
mergeMap(([{ messageId, emoji }, currentUser]) => {
|
||||
if (!currentUser) {
|
||||
return of({ type: 'NO_OP' });
|
||||
}
|
||||
if (!currentUser) return EMPTY;
|
||||
|
||||
const reaction: Reaction = {
|
||||
id: uuidv4(),
|
||||
@@ -234,35 +204,23 @@ export class MessagesEffects {
|
||||
timestamp: this.timeSync.now(),
|
||||
};
|
||||
|
||||
// Save to DB
|
||||
this.db.saveReaction(reaction);
|
||||
|
||||
// Broadcast to peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'reaction-added',
|
||||
messageId,
|
||||
reaction,
|
||||
});
|
||||
this.webrtc.broadcastMessage({ type: 'reaction-added', messageId, reaction });
|
||||
|
||||
return of(MessagesActions.addReactionSuccess({ reaction }));
|
||||
})
|
||||
)
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Remove reaction
|
||||
/** Removes the current user's reaction from a message, deletes from DB, and broadcasts. */
|
||||
removeReaction$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.removeReaction),
|
||||
withLatestFrom(this.store.select(selectCurrentUser)),
|
||||
mergeMap(([{ messageId, emoji }, currentUser]) => {
|
||||
if (!currentUser) {
|
||||
return of({ type: 'NO_OP' });
|
||||
}
|
||||
if (!currentUser) return EMPTY;
|
||||
|
||||
// Remove from DB
|
||||
this.db.removeReaction(messageId, currentUser.id, emoji);
|
||||
|
||||
// Broadcast to peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'reaction-removed',
|
||||
messageId,
|
||||
@@ -270,412 +228,37 @@ export class MessagesEffects {
|
||||
emoji,
|
||||
});
|
||||
|
||||
return of(MessagesActions.removeReactionSuccess({ messageId, oderId: currentUser.id, emoji }));
|
||||
})
|
||||
)
|
||||
return of(
|
||||
MessagesActions.removeReactionSuccess({
|
||||
messageId,
|
||||
oderId: currentUser.id,
|
||||
emoji,
|
||||
}),
|
||||
);
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Listen to incoming messages from WebRTC peers
|
||||
/**
|
||||
* Central dispatcher for all incoming P2P messages.
|
||||
* Delegates to handler functions in `messages-incoming.handlers.ts`.
|
||||
*/
|
||||
incomingMessages$ = createEffect(() =>
|
||||
this.webrtc.onMessageReceived.pipe(
|
||||
withLatestFrom(
|
||||
this.store.select(selectCurrentUser),
|
||||
this.store.select(selectCurrentRoom)
|
||||
this.store.select(selectCurrentRoom),
|
||||
),
|
||||
mergeMap(([event, currentUser, currentRoom]: [any, any, any]) => {
|
||||
console.log('Received peer message:', event.type, event);
|
||||
|
||||
switch (event.type) {
|
||||
// Precise sync via ID inventory and targeted requests
|
||||
case 'chat-inventory-request': {
|
||||
const reqRoomId = event.roomId;
|
||||
if (!reqRoomId || !event.fromPeerId) return of({ type: 'NO_OP' });
|
||||
return from(this.db.getMessages(reqRoomId, this.INVENTORY_LIMIT, 0)).pipe(
|
||||
mergeMap(async (messages) => {
|
||||
const items = await Promise.all(
|
||||
messages.map(async (m) => {
|
||||
const reactions = await this.db.getReactionsForMessage(m.id);
|
||||
return { id: m.id, ts: m.editedAt || m.timestamp || 0, rc: reactions.length };
|
||||
})
|
||||
);
|
||||
items.sort((a, b) => a.ts - b.ts);
|
||||
console.log(`[Sync] Sending inventory of ${items.length} items for room ${reqRoomId}`);
|
||||
for (let i = 0; i < items.length; i += this.CHUNK_SIZE) {
|
||||
const chunk = items.slice(i, i + this.CHUNK_SIZE);
|
||||
this.webrtc.sendToPeer(event.fromPeerId, {
|
||||
type: 'chat-inventory',
|
||||
roomId: reqRoomId,
|
||||
items: chunk,
|
||||
total: items.length,
|
||||
index: i,
|
||||
} as any);
|
||||
}
|
||||
}),
|
||||
map(() => ({ type: 'NO_OP' }))
|
||||
);
|
||||
}
|
||||
|
||||
case 'chat-inventory': {
|
||||
const invRoomId = event.roomId;
|
||||
if (!invRoomId || !Array.isArray(event.items) || !event.fromPeerId) return of({ type: 'NO_OP' });
|
||||
// Determine which IDs we are missing or have older versions of
|
||||
return from(this.db.getMessages(invRoomId, this.INVENTORY_LIMIT, 0)).pipe(
|
||||
mergeMap(async (local) => {
|
||||
// Build local map with timestamps and reaction counts
|
||||
const localMap = new Map<string, { ts: number; rc: number }>();
|
||||
await Promise.all(
|
||||
local.map(async (m) => {
|
||||
const reactions = await this.db.getReactionsForMessage(m.id);
|
||||
localMap.set(m.id, { ts: m.editedAt || m.timestamp || 0, rc: reactions.length });
|
||||
})
|
||||
);
|
||||
const missing: string[] = [];
|
||||
for (const item of event.items as Array<{ id: string; ts: number; rc?: number }>) {
|
||||
const localEntry = localMap.get(item.id);
|
||||
if (!localEntry) {
|
||||
missing.push(item.id);
|
||||
} else if (item.ts > localEntry.ts) {
|
||||
missing.push(item.id);
|
||||
} else if (item.rc !== undefined && item.rc !== localEntry.rc) {
|
||||
missing.push(item.id);
|
||||
}
|
||||
}
|
||||
console.log(`[Sync] Inventory received: ${event.items.length} remote, ${missing.length} missing/stale`);
|
||||
// Request in chunks from the sender
|
||||
for (let i = 0; i < missing.length; i += this.CHUNK_SIZE) {
|
||||
const chunk = missing.slice(i, i + this.CHUNK_SIZE);
|
||||
this.webrtc.sendToPeer(event.fromPeerId, {
|
||||
type: 'chat-sync-request-ids',
|
||||
roomId: invRoomId,
|
||||
ids: chunk,
|
||||
} as any);
|
||||
}
|
||||
return { type: 'NO_OP' } as any;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
case 'chat-sync-request-ids': {
|
||||
const syncReqRoomId = event.roomId;
|
||||
if (!Array.isArray(event.ids) || !event.fromPeerId) return of({ type: 'NO_OP' });
|
||||
const ids: string[] = event.ids;
|
||||
return from(Promise.all(ids.map((id: string) => this.db.getMessageById(id)))).pipe(
|
||||
mergeMap(async (maybeMessages) => {
|
||||
const messages = maybeMessages.filter((m): m is Message => !!m);
|
||||
// Hydrate reactions from the separate reactions table
|
||||
const hydrated = await Promise.all(
|
||||
messages.map(async (m) => {
|
||||
const reactions = await this.db.getReactionsForMessage(m.id);
|
||||
return { ...m, reactions };
|
||||
})
|
||||
);
|
||||
// Collect attachment metadata for synced messages
|
||||
const msgIds = hydrated.map(m => m.id);
|
||||
const attachmentMetas = this.attachments.getAttachmentMetasForMessages(msgIds);
|
||||
console.log(`[Sync] Sending ${hydrated.length} messages for ${ids.length} requested IDs`);
|
||||
// Send in chunks to avoid large payloads
|
||||
for (let i = 0; i < hydrated.length; i += this.CHUNK_SIZE) {
|
||||
const chunk = hydrated.slice(i, i + this.CHUNK_SIZE);
|
||||
// Include only attachments for this chunk
|
||||
const chunkAttachments: Record<string, any> = {};
|
||||
for (const m of chunk) {
|
||||
if (attachmentMetas[m.id]) chunkAttachments[m.id] = attachmentMetas[m.id];
|
||||
}
|
||||
this.webrtc.sendToPeer(event.fromPeerId, {
|
||||
type: 'chat-sync-batch',
|
||||
roomId: syncReqRoomId || '',
|
||||
messages: chunk,
|
||||
attachments: Object.keys(chunkAttachments).length > 0 ? chunkAttachments : undefined,
|
||||
} as any);
|
||||
}
|
||||
}),
|
||||
map(() => ({ type: 'NO_OP' }))
|
||||
);
|
||||
}
|
||||
|
||||
case 'chat-sync-batch': {
|
||||
if (!Array.isArray(event.messages)) return of({ type: 'NO_OP' });
|
||||
// Register synced attachment metadata so the UI knows about them
|
||||
if (event.attachments && typeof event.attachments === 'object') {
|
||||
this.attachments.registerSyncedAttachments(event.attachments);
|
||||
}
|
||||
return from((async () => {
|
||||
const toUpsert: Message[] = [];
|
||||
for (const m of event.messages as Message[]) {
|
||||
const existing = await this.db.getMessageById(m.id);
|
||||
const ets = existing ? (existing.editedAt || existing.timestamp || 0) : -1;
|
||||
const its = m.editedAt || m.timestamp || 0;
|
||||
const isNewer = !existing || its > ets;
|
||||
|
||||
if (isNewer) {
|
||||
await this.db.saveMessage(m);
|
||||
}
|
||||
|
||||
// Persist incoming reactions to the reactions table (deduped)
|
||||
const incomingReactions = m.reactions ?? [];
|
||||
for (const r of incomingReactions) {
|
||||
await this.db.saveReaction(r);
|
||||
}
|
||||
|
||||
// Hydrate merged reactions from DB and upsert if anything changed
|
||||
if (isNewer || incomingReactions.length > 0) {
|
||||
const reactions = await this.db.getReactionsForMessage(m.id);
|
||||
toUpsert.push({ ...(isNewer ? m : existing!), reactions });
|
||||
}
|
||||
}
|
||||
|
||||
// Auto-request unavailable images from the sender
|
||||
if (event.attachments && event.fromPeerId) {
|
||||
for (const [msgId, metas] of Object.entries(event.attachments) as [string, any[]][]) {
|
||||
for (const meta of metas) {
|
||||
if (meta.isImage) {
|
||||
const atts = this.attachments.getForMessage(msgId);
|
||||
const att = atts.find((a: any) => a.id === meta.id);
|
||||
if (att && !att.available && !(att.receivedBytes && att.receivedBytes > 0)) {
|
||||
this.attachments.requestImageFromAnyPeer(msgId, att);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return toUpsert;
|
||||
})()).pipe(
|
||||
mergeMap((toUpsert) => toUpsert.length ? of(MessagesActions.syncMessages({ messages: toUpsert })) : of({ type: 'NO_OP' }))
|
||||
);
|
||||
}
|
||||
case 'voice-state':
|
||||
// Update voice state for the sender
|
||||
if (event.oderId && event.voiceState) {
|
||||
const userId = event.oderId;
|
||||
return of(UsersActions.updateVoiceState({ userId, voiceState: event.voiceState }));
|
||||
}
|
||||
break;
|
||||
|
||||
case 'chat-message':
|
||||
// Save to local DB and dispatch receive action
|
||||
// Skip if this is our own message (sent via server relay)
|
||||
if (event.message && event.message.senderId !== currentUser?.id && event.message.senderId !== currentUser?.oderId) {
|
||||
this.db.saveMessage(event.message);
|
||||
return of(MessagesActions.receiveMessage({ message: event.message }));
|
||||
}
|
||||
break;
|
||||
|
||||
case 'file-announce':
|
||||
this.attachments.handleFileAnnounce(event);
|
||||
return of({ type: 'NO_OP' });
|
||||
|
||||
case 'file-chunk':
|
||||
this.attachments.handleFileChunk(event);
|
||||
return of({ type: 'NO_OP' });
|
||||
|
||||
case 'file-request':
|
||||
// Uploader can fulfill request directly via AttachmentService
|
||||
this.attachments.handleFileRequest(event);
|
||||
return of({ type: 'NO_OP' });
|
||||
|
||||
case 'file-cancel':
|
||||
// Stop any in-progress upload to the requester
|
||||
this.attachments.handleFileCancel(event);
|
||||
return of({ type: 'NO_OP' });
|
||||
|
||||
case 'file-not-found':
|
||||
// Peer couldn't serve the file – try another peer automatically
|
||||
this.attachments.handleFileNotFound(event);
|
||||
return of({ type: 'NO_OP' });
|
||||
|
||||
case 'message-edited':
|
||||
if (event.messageId && event.content) {
|
||||
this.db.updateMessage(event.messageId, { content: event.content, editedAt: event.editedAt });
|
||||
return of(MessagesActions.editMessageSuccess({
|
||||
messageId: event.messageId,
|
||||
content: event.content,
|
||||
editedAt: event.editedAt,
|
||||
}));
|
||||
}
|
||||
break;
|
||||
|
||||
case 'message-deleted':
|
||||
if (event.messageId) {
|
||||
this.db.deleteMessage(event.messageId);
|
||||
return of(MessagesActions.deleteMessageSuccess({ messageId: event.messageId }));
|
||||
}
|
||||
break;
|
||||
|
||||
case 'reaction-added':
|
||||
if (event.messageId && event.reaction) {
|
||||
this.db.saveReaction(event.reaction);
|
||||
return of(MessagesActions.addReactionSuccess({
|
||||
reaction: event.reaction,
|
||||
}));
|
||||
}
|
||||
break;
|
||||
|
||||
case 'reaction-removed':
|
||||
if (event.messageId && event.oderId && event.emoji) {
|
||||
this.db.removeReaction(event.messageId, event.oderId, event.emoji);
|
||||
return of(MessagesActions.removeReactionSuccess({
|
||||
messageId: event.messageId,
|
||||
oderId: event.oderId,
|
||||
emoji: event.emoji,
|
||||
}));
|
||||
}
|
||||
break;
|
||||
|
||||
// Chat sync handshake: summary -> request -> full
|
||||
case 'chat-sync-summary':
|
||||
// Compare summaries and request sync if the peer has newer data
|
||||
if (!currentRoom) return of({ type: 'NO_OP' });
|
||||
return from(this.db.getMessages(currentRoom.id, 10000, 0)).pipe(
|
||||
tap((local) => {
|
||||
const localCount = local.length;
|
||||
const localLastUpdated = local.reduce((max, m) => Math.max(max, m.editedAt || m.timestamp || 0), 0);
|
||||
const remoteLastUpdated = event.lastUpdated || 0;
|
||||
const remoteCount = event.count || 0;
|
||||
|
||||
const identical = localLastUpdated === remoteLastUpdated && localCount === remoteCount;
|
||||
const needsSync = remoteLastUpdated > localLastUpdated || (remoteLastUpdated === localLastUpdated && remoteCount > localCount);
|
||||
|
||||
if (!identical && needsSync && event.fromPeerId) {
|
||||
this.webrtc.sendToPeer(event.fromPeerId, { type: 'chat-sync-request', roomId: currentRoom.id } as any);
|
||||
}
|
||||
}),
|
||||
map(() => ({ type: 'NO_OP' }))
|
||||
);
|
||||
|
||||
case 'chat-sync-request':
|
||||
if (!currentRoom || !event.fromPeerId) return of({ type: 'NO_OP' });
|
||||
return from(this.db.getMessages(currentRoom.id, 10000, 0)).pipe(
|
||||
tap((all) => {
|
||||
this.webrtc.sendToPeer(event.fromPeerId, { type: 'chat-sync-full', roomId: currentRoom.id, messages: all } as any);
|
||||
}),
|
||||
map(() => ({ type: 'NO_OP' }))
|
||||
);
|
||||
|
||||
case 'chat-sync-full':
|
||||
if (event.messages && Array.isArray(event.messages)) {
|
||||
// Merge into local DB and update store
|
||||
event.messages.forEach((m: Message) => this.db.saveMessage(m));
|
||||
return of(MessagesActions.syncMessages({ messages: event.messages }));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return of({ type: 'NO_OP' });
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
// On peer connect, broadcast local dataset summary
|
||||
peerConnectedSync$ = createEffect(
|
||||
() =>
|
||||
this.webrtc.onPeerConnected.pipe(
|
||||
withLatestFrom(this.store.select(selectCurrentRoom)),
|
||||
mergeMap(([peerId, room]) => {
|
||||
if (!room) return of({ type: 'NO_OP' });
|
||||
return from(this.db.getMessages(room.id, 10000, 0)).pipe(
|
||||
map((messages) => {
|
||||
const count = messages.length;
|
||||
const lastUpdated = messages.reduce((max, m) => Math.max(max, m.editedAt || m.timestamp || 0), 0);
|
||||
// Send summary specifically to the newly connected peer
|
||||
this.webrtc.sendToPeer(peerId, { type: 'chat-sync-summary', roomId: room.id, count, lastUpdated } as any);
|
||||
// Also request their inventory for precise reconciliation
|
||||
this.webrtc.sendToPeer(peerId, { type: 'chat-inventory-request', roomId: room.id } as any);
|
||||
return { type: 'NO_OP' };
|
||||
})
|
||||
);
|
||||
})
|
||||
),
|
||||
{ dispatch: false }
|
||||
);
|
||||
|
||||
// Kick off sync to all currently connected peers shortly after joining a room
|
||||
joinRoomSyncKickoff$ = createEffect(
|
||||
() =>
|
||||
this.actions$.pipe(
|
||||
ofType(RoomsActions.joinRoomSuccess),
|
||||
withLatestFrom(this.store.select(selectCurrentRoom)),
|
||||
mergeMap(([{ room }, currentRoom]) => {
|
||||
const activeRoom = currentRoom || room;
|
||||
if (!activeRoom) return of({ type: 'NO_OP' });
|
||||
return from(this.db.getMessages(activeRoom.id, 10000, 0)).pipe(
|
||||
tap((messages) => {
|
||||
const count = messages.length;
|
||||
const lastUpdated = messages.reduce((max, m) => Math.max(max, m.editedAt || m.timestamp || 0), 0);
|
||||
const peers = this.webrtc.getConnectedPeers();
|
||||
peers.forEach((pid) => {
|
||||
try {
|
||||
this.webrtc.sendToPeer(pid, { type: 'chat-sync-summary', roomId: activeRoom.id, count, lastUpdated } as any);
|
||||
this.webrtc.sendToPeer(pid, { type: 'chat-inventory-request', roomId: activeRoom.id } as any);
|
||||
} catch {}
|
||||
});
|
||||
}),
|
||||
map(() => ({ type: 'NO_OP' }))
|
||||
);
|
||||
})
|
||||
),
|
||||
{ dispatch: false }
|
||||
);
|
||||
|
||||
// Periodic sync poll – 10s when catching up, 15min after a clean sync
|
||||
private syncReset$ = new Subject<void>();
|
||||
|
||||
periodicSyncPoll$ = createEffect(() =>
|
||||
timer(this.SYNC_POLL_FAST_MS).pipe(
|
||||
// After each emission, decide the next delay based on last result
|
||||
repeat({ delay: () => timer(this.lastSyncClean ? this.SYNC_POLL_SLOW_MS : this.SYNC_POLL_FAST_MS) }),
|
||||
takeUntil(this.syncReset$), // restart via syncReset$ is handled externally if needed
|
||||
withLatestFrom(
|
||||
this.store.select(selectCurrentRoom)
|
||||
),
|
||||
filter(([, room]) => !!room && this.webrtc.getConnectedPeers().length > 0),
|
||||
exhaustMap(([, room]) => {
|
||||
const peers = this.webrtc.getConnectedPeers();
|
||||
if (!room || peers.length === 0) return of(MessagesActions.syncComplete());
|
||||
|
||||
return from(this.db.getMessages(room.id, this.INVENTORY_LIMIT, 0)).pipe(
|
||||
map((messages) => {
|
||||
peers.forEach((pid) => {
|
||||
try {
|
||||
this.webrtc.sendToPeer(pid, { type: 'chat-inventory-request', roomId: room.id } as any);
|
||||
} catch {}
|
||||
});
|
||||
return MessagesActions.startSync();
|
||||
}),
|
||||
catchError(() => {
|
||||
this.lastSyncClean = false;
|
||||
return of(MessagesActions.syncComplete());
|
||||
})
|
||||
);
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
// Auto-complete sync after a timeout if no sync messages arrive
|
||||
syncTimeout$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(MessagesActions.startSync),
|
||||
switchMap(() => {
|
||||
// If no syncMessages or syncComplete within 5s, auto-complete
|
||||
return new Promise<void>((resolve) => setTimeout(resolve, 5000));
|
||||
const ctx: IncomingMessageContext = {
|
||||
db: this.db,
|
||||
webrtc: this.webrtc,
|
||||
attachments: this.attachments,
|
||||
currentUser,
|
||||
currentRoom,
|
||||
};
|
||||
return dispatchIncomingMessage(event, ctx);
|
||||
}),
|
||||
withLatestFrom(this.store.select(selectMessagesSyncing)),
|
||||
filter(([, syncing]) => syncing),
|
||||
map(() => {
|
||||
// No new messages arrived during this cycle → clean sync, slow down
|
||||
this.lastSyncClean = true;
|
||||
return MessagesActions.syncComplete();
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
// When new messages actually arrive via sync, switch back to fast polling
|
||||
syncReceivedMessages$ = createEffect(
|
||||
() =>
|
||||
this.webrtc.onPeerConnected.pipe(
|
||||
// A peer (re)connecting means we may have been offline — revert to aggressive polling
|
||||
tap(() => { this.lastSyncClean = false; })
|
||||
),
|
||||
{ dispatch: false }
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user