Files
Toju/src/app/store/messages/messages-sync.effects.ts
2026-03-09 23:02:52 +01:00

248 lines
6.9 KiB
TypeScript

/**
* Sync-lifecycle effects for the messages store slice.
*
* These effects manage the periodic sync polling, peer-connect
* handshakes, and join-room kickoff that keep message databases
* in sync across peers.
*
* Extracted from the monolithic MessagesEffects to keep each
* class focused on a single concern.
*/
/* eslint-disable @typescript-eslint/member-ordering */
import { Injectable, inject } from '@angular/core';
import {
Actions,
createEffect,
ofType
} from '@ngrx/effects';
import { Store } from '@ngrx/store';
import {
of,
from,
timer,
Subject,
EMPTY
} from 'rxjs';
import {
map,
mergeMap,
catchError,
withLatestFrom,
tap,
filter,
exhaustMap,
switchMap,
repeat,
takeUntil
} from 'rxjs/operators';
import { MessagesActions } from './messages.actions';
import { RoomsActions } from '../rooms/rooms.actions';
import { selectMessagesSyncing } from './messages.selectors';
import { selectCurrentRoom } from '../rooms/rooms.selectors';
import { DatabaseService } from '../../core/services/database.service';
import { DebuggingService } from '../../core/services/debugging.service';
import { WebRTCService } from '../../core/services/webrtc.service';
import {
INVENTORY_LIMIT,
FULL_SYNC_LIMIT,
SYNC_POLL_FAST_MS,
SYNC_POLL_SLOW_MS,
SYNC_TIMEOUT_MS,
getLatestTimestamp
} from './messages.helpers';
/**
 * NgRx effects that drive message synchronisation between peers:
 * a handshake when a peer connects, a kickoff when a room is joined,
 * a self-rescheduling periodic poll, and a timeout that closes a sync
 * cycle.
 *
 * The three effects that send handshakes or inventory requests follow
 * the same failure policy: a failed send to one peer, or a rejected
 * database read, is logged through DebuggingService and swallowed so
 * the long-lived effect stream is never terminated by a transient
 * failure.
 */
@Injectable()
export class MessagesSyncEffects {
  private readonly actions$ = inject(Actions);
  private readonly store = inject(Store);
  private readonly db = inject(DatabaseService);
  private readonly debugging = inject(DebuggingService);
  private readonly webrtc = inject(WebRTCService);

  /**
   * Selects the delay before the next periodic poll: slow when true,
   * fast when false. Set to true when a sync cycle is closed by the
   * timeout, and reset to false on a poll failure or a peer connect.
   */
  private lastSyncClean = false;

  /**
   * Emitting on this subject terminates the periodic poll (takeUntil).
   * NOTE(review): nothing in this class emits on it, and because the
   * poll is not re-subscribed afterwards an emission would stop polling
   * for good rather than "reset" it - confirm the intended semantics.
   */
  private readonly syncReset$ = new Subject<void>();

  /**
   * When a new peer connects, sends our dataset summary (message count
   * and latest timestamp for the current room) followed by an
   * inventory request so both sides can reconcile.
   *
   * Does nothing when no room is currently selected. Send failures and
   * a rejected database read are logged and swallowed.
   */
  peerConnectedSync$ = createEffect(
    () =>
      this.webrtc.onPeerConnected.pipe(
        withLatestFrom(this.store.select(selectCurrentRoom)),
        mergeMap(([peerId, room]) => {
          if (!room)
            return EMPTY;

          return from(
            this.db.getMessages(room.id, FULL_SYNC_LIMIT, 0)
          ).pipe(
            tap((messages) => {
              const count = messages.length;
              const lastUpdated = getLatestTimestamp(messages);

              // Guard the sends the same way joinRoomSyncKickoff$ does:
              // a throwing sendToPeer must not error the effect stream.
              try {
                this.webrtc.sendToPeer(peerId, {
                  type: 'chat-sync-summary',
                  roomId: room.id,
                  count,
                  lastUpdated
                });

                this.webrtc.sendToPeer(peerId, {
                  type: 'chat-inventory-request',
                  roomId: room.id
                });
              } catch (error) {
                this.debugging.warn('messages', 'Failed to kick off sync for newly connected peer', {
                  error,
                  peerId,
                  roomId: room.id
                });
              }
            }),
            // A rejected read only aborts this one handshake; the outer
            // stream keeps listening for further peer connections.
            catchError((error) => {
              this.debugging.warn('messages', 'Failed to load messages for peer-connect sync', {
                error,
                peerId,
                roomId: room.id
              });

              return EMPTY;
            })
          );
        })
      ),
    { dispatch: false }
  );

  /**
   * When the user joins a room, sends a summary and inventory request
   * to every already-connected peer.
   *
   * Uses the store's current room when one is set, otherwise the room
   * carried by the joinRoomSuccess action. A failure to reach one peer
   * is logged and does not prevent the remaining peers from being
   * contacted; a rejected database read is logged and swallowed.
   */
  joinRoomSyncKickoff$ = createEffect(
    () =>
      this.actions$.pipe(
        ofType(RoomsActions.joinRoomSuccess),
        withLatestFrom(this.store.select(selectCurrentRoom)),
        mergeMap(([{ room }, currentRoom]) => {
          const activeRoom = currentRoom ?? room;

          if (!activeRoom)
            return EMPTY;

          return from(
            this.db.getMessages(activeRoom.id, FULL_SYNC_LIMIT, 0)
          ).pipe(
            tap((messages) => {
              const count = messages.length;
              const lastUpdated = getLatestTimestamp(messages);

              for (const pid of this.webrtc.getConnectedPeers()) {
                try {
                  this.webrtc.sendToPeer(pid, {
                    type: 'chat-sync-summary',
                    roomId: activeRoom.id,
                    count,
                    lastUpdated
                  });

                  this.webrtc.sendToPeer(pid, {
                    type: 'chat-inventory-request',
                    roomId: activeRoom.id
                  });
                } catch (error) {
                  this.debugging.warn('messages', 'Failed to kick off room sync for peer', {
                    error,
                    peerId: pid,
                    roomId: activeRoom.id
                  });
                }
              }
            }),
            // Keep the effect alive if the local read fails.
            catchError((error) => {
              this.debugging.warn('messages', 'Failed to load messages for room sync kickoff', {
                error,
                roomId: activeRoom.id
              });

              return EMPTY;
            })
          );
        })
      ),
    { dispatch: false }
  );

  /**
   * Periodic sync poll. The first tick fires after SYNC_POLL_FAST_MS;
   * each later tick is scheduled after SYNC_POLL_SLOW_MS when the last
   * cycle was clean (lastSyncClean) and SYNC_POLL_FAST_MS otherwise.
   *
   * On a tick with a current room and at least one connected peer, an
   * inventory request is sent to every connected peer and startSync is
   * dispatched. If the database read fails, lastSyncClean is cleared
   * and syncComplete is dispatched instead.
   */
  periodicSyncPoll$ = createEffect(() =>
    timer(SYNC_POLL_FAST_MS).pipe(
      repeat({
        // Evaluated after every tick, so the interval follows the
        // outcome of the most recent cycle.
        delay: () =>
          timer(
            this.lastSyncClean ? SYNC_POLL_SLOW_MS : SYNC_POLL_FAST_MS
          )
      }),
      takeUntil(this.syncReset$),
      withLatestFrom(this.store.select(selectCurrentRoom)),
      filter(
        ([, room]) =>
          !!room && this.webrtc.getConnectedPeers().length > 0
      ),
      // exhaustMap: ticks arriving while a poll is in flight are dropped.
      exhaustMap(([, room]) => {
        const peers = this.webrtc.getConnectedPeers();

        // Already guaranteed by the filter above; repeated here so that
        // `room` is narrowed to non-null for the code below.
        if (!room || peers.length === 0) {
          return of(MessagesActions.syncComplete());
        }

        // NOTE(review): the messages returned by this read are not used;
        // only its success or failure influences the outcome. Confirm
        // whether the read is still needed.
        return from(
          this.db.getMessages(room.id, INVENTORY_LIMIT, 0)
        ).pipe(
          map(() => {
            for (const pid of peers) {
              try {
                this.webrtc.sendToPeer(pid, {
                  type: 'chat-inventory-request',
                  roomId: room.id
                });
              } catch (error) {
                this.debugging.warn('messages', 'Failed to request peer inventory during sync poll', {
                  error,
                  peerId: pid,
                  roomId: room.id
                });
              }
            }

            return MessagesActions.startSync();
          }),
          catchError((error) => {
            this.lastSyncClean = false;
            this.debugging.warn('messages', 'Periodic sync poll failed', {
              error,
              roomId: room.id
            });

            return of(MessagesActions.syncComplete());
          })
        );
      })
    )
  );

  /**
   * Auto-completes a sync cycle SYNC_TIMEOUT_MS after the most recent
   * startSync, provided the store still reports a sync in progress.
   * Marks the cycle as clean, which switches the periodic poll to the
   * slow interval.
   */
  syncTimeout$ = createEffect(() =>
    this.actions$.pipe(
      ofType(MessagesActions.startSync),
      // timer() is cancelled when switchMap moves on to a newer
      // startSync, so superseded timeouts do not linger.
      switchMap(() => timer(SYNC_TIMEOUT_MS)),
      withLatestFrom(this.store.select(selectMessagesSyncing)),
      filter(([, syncing]) => syncing),
      map(() => {
        this.lastSyncClean = true;

        return MessagesActions.syncComplete();
      })
    )
  );

  /**
   * When a peer (re)connects, revert to aggressive polling in case
   * we missed messages while disconnected.
   *
   * NOTE(review): despite its name this effect reacts only to peer
   * connections; nothing in this class clears lastSyncClean when
   * messages are actually received.
   */
  syncReceivedMessages$ = createEffect(
    () =>
      this.webrtc.onPeerConnected.pipe(
        tap(() => {
          this.lastSyncClean = false;
        })
      ),
    { dispatch: false }
  );
}