/** * Sync-lifecycle effects for the messages store slice. * * These effects manage the periodic sync polling, peer-connect * handshakes, and room-activation kickoff that keep message databases * in sync across peers. * * Extracted from the monolithic MessagesEffects to keep each * class focused on a single concern. */ /* eslint-disable @typescript-eslint/member-ordering */ import { Injectable, inject } from '@angular/core'; import { Actions, createEffect, ofType } from '@ngrx/effects'; import { Store } from '@ngrx/store'; import { of, from, timer, Subject, EMPTY } from 'rxjs'; import { map, mergeMap, catchError, withLatestFrom, tap, filter, exhaustMap, switchMap, repeat, startWith } from 'rxjs/operators'; import { MessagesActions } from './messages.actions'; import { RoomsActions } from '../rooms/rooms.actions'; import { selectMessagesSyncing } from './messages.selectors'; import { selectCurrentRoom } from '../rooms/rooms.selectors'; import { RealtimeSessionFacade } from '../../core/realtime'; import { DatabaseService } from '../../infrastructure/persistence'; import { DebuggingService } from '../../core/services/debugging.service'; import { INVENTORY_LIMIT, FULL_SYNC_LIMIT, SYNC_POLL_FAST_MS, SYNC_POLL_SLOW_MS, SYNC_TIMEOUT_MS, getLatestTimestamp } from './messages.helpers'; @Injectable() export class MessagesSyncEffects { private readonly actions$ = inject(Actions); private readonly store = inject(Store); private readonly db = inject(DatabaseService); private readonly debugging = inject(DebuggingService); private readonly webrtc = inject(RealtimeSessionFacade); /** Tracks whether the last sync cycle found no new messages. */ private lastSyncClean = false; /** Subject to reset the periodic sync timer. */ private readonly syncReset$ = new Subject(); /** * When a new peer connects, sends our dataset summary and an * inventory request so both sides can reconcile. */ peerConnectedSync$ = createEffect( () => this.webrtc.onPeerConnected.pipe( withLatestFrom(this.store.select(selectCurrentRoom)), mergeMap(([peerId, room]) => { if (!room) return EMPTY; return from( this.db.getMessages(room.id, FULL_SYNC_LIMIT, 0) ).pipe( tap((messages) => { const count = messages.length; const lastUpdated = getLatestTimestamp(messages); this.webrtc.sendToPeer(peerId, { type: 'chat-sync-summary', roomId: room.id, count, lastUpdated }); this.webrtc.sendToPeer(peerId, { type: 'chat-inventory-request', roomId: room.id }); }) ); }) ), { dispatch: false } ); /** * When the user joins or views a room, sends a summary and inventory * request to every already-connected peer. */ roomActivationSyncKickoff$ = createEffect( () => this.actions$.pipe( ofType(RoomsActions.joinRoomSuccess, RoomsActions.viewServerSuccess), withLatestFrom(this.store.select(selectCurrentRoom)), switchMap(([{ room }, currentRoom]) => { const requestedRoomId = room.id; return timer(75).pipe( withLatestFrom(this.store.select(selectCurrentRoom)), switchMap(([, latestCurrentRoom]) => { const activeRoom = latestCurrentRoom ?? currentRoom ?? room; const peers = this.webrtc.getConnectedPeers(); if (!activeRoom || activeRoom.id !== requestedRoomId || peers.length === 0) { return EMPTY; } return from(this.db.getMessages(activeRoom.id, FULL_SYNC_LIMIT, 0)).pipe( tap((messages) => { const count = messages.length; const lastUpdated = getLatestTimestamp(messages); for (const pid of peers) { try { this.webrtc.sendToPeer(pid, { type: 'chat-sync-summary', roomId: activeRoom.id, count, lastUpdated }); this.webrtc.sendToPeer(pid, { type: 'chat-inventory-request', roomId: activeRoom.id }); } catch (error) { this.debugging.warn('messages', 'Failed to kick off room sync for peer', { error, peerId: pid, roomId: activeRoom.id }); } } }) ); }) ); }) ), { dispatch: false } ); /** * Reset the polling cadence when the active room changes so the next * room does not inherit a stale slow-poll delay. */ resetPeriodicSyncOnRoomActivation$ = createEffect( () => this.actions$.pipe( ofType(RoomsActions.joinRoomSuccess, RoomsActions.viewServerSuccess), tap(() => { this.lastSyncClean = false; this.syncReset$.next(); }) ), { dispatch: false } ); /** * Alternates between fast (10 s) and slow (15 min) sync intervals. * Sends inventory requests to all connected peers for the active room. */ periodicSyncPoll$ = createEffect(() => this.syncReset$.pipe( startWith(undefined), switchMap(() => timer(SYNC_POLL_FAST_MS).pipe( repeat({ delay: () => timer( this.lastSyncClean ? SYNC_POLL_SLOW_MS : SYNC_POLL_FAST_MS ) }), withLatestFrom(this.store.select(selectCurrentRoom)), filter( ([, room]) => !!room && this.webrtc.getConnectedPeers().length > 0 ), exhaustMap(([, room]) => { const peers = this.webrtc.getConnectedPeers(); if (!room || peers.length === 0) { return of(MessagesActions.syncComplete()); } return from( this.db.getMessages(room.id, INVENTORY_LIMIT, 0) ).pipe( map(() => { for (const pid of peers) { try { this.webrtc.sendToPeer(pid, { type: 'chat-inventory-request', roomId: room.id }); } catch (error) { this.debugging.warn('messages', 'Failed to request peer inventory during sync poll', { error, peerId: pid, roomId: room.id }); } } return MessagesActions.startSync(); }), catchError((error) => { this.lastSyncClean = false; this.debugging.warn('messages', 'Periodic sync poll failed', { error, roomId: room.id }); return of(MessagesActions.syncComplete()); }) ); }) ) ) ) ); /** * Auto-completes a sync cycle after a timeout if no messages arrive. * Switches to slow polling when the cycle is clean. */ syncTimeout$ = createEffect(() => this.actions$.pipe( ofType(MessagesActions.startSync), switchMap(() => from( new Promise((resolve) => setTimeout(resolve, SYNC_TIMEOUT_MS)) )), withLatestFrom(this.store.select(selectMessagesSyncing)), filter(([, syncing]) => syncing), map(() => { this.lastSyncClean = true; return MessagesActions.syncComplete(); }) ) ); /** * When a peer (re)connects, revert to aggressive polling in case * we missed messages while disconnected. */ syncReceivedMessages$ = createEffect( () => this.webrtc.onPeerConnected.pipe( tap(() => { this.lastSyncClean = false; }) ), { dispatch: false } ); }