This commit is contained in:
2025-12-28 05:37:19 +01:00
commit 87c722b5ae
74 changed files with 10264 additions and 0 deletions

View File

@@ -0,0 +1,509 @@
import { Injectable, inject } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Store } from '@ngrx/store';
import { of, from } from 'rxjs';
import { map, mergeMap, catchError, withLatestFrom, tap, switchMap } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import * as MessagesActions from './messages.actions';
import { selectCurrentUser } from '../users/users.selectors';
import { selectCurrentRoom } from '../rooms/rooms.selectors';
import { DatabaseService } from '../../core/services/database.service';
import { WebRTCService } from '../../core/services/webrtc.service';
import { TimeSyncService } from '../../core/services/time-sync.service';
import { Message, Reaction } from '../../core/models';
import * as UsersActions from '../users/users.actions';
import * as RoomsActions from '../rooms/rooms.actions';
@Injectable()
export class MessagesEffects {
private actions$ = inject(Actions);
private store = inject(Store);
private db = inject(DatabaseService);
private webrtc = inject(WebRTCService);
private timeSync = inject(TimeSyncService);
private readonly INVENTORY_LIMIT = 1000; // number of recent messages to consider
private readonly CHUNK_SIZE = 200; // chunk size for inventory/batch transfers
// Load messages from local database
loadMessages$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.loadMessages),
switchMap(({ roomId }) =>
from(this.db.getMessages(roomId)).pipe(
map((messages) => MessagesActions.loadMessagesSuccess({ messages })),
catchError((error) =>
of(MessagesActions.loadMessagesFailure({ error: error.message }))
)
)
)
)
);
// Send message
sendMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.sendMessage),
withLatestFrom(
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom)
),
mergeMap(([{ content, replyToId }, currentUser, currentRoom]) => {
if (!currentUser || !currentRoom) {
return of(MessagesActions.sendMessageFailure({ error: 'Not connected to a room' }));
}
const message: Message = {
id: uuidv4(),
roomId: currentRoom.id,
senderId: currentUser.id,
senderName: currentUser.displayName || currentUser.username,
content,
timestamp: this.timeSync.now(),
reactions: [],
isDeleted: false,
replyToId,
};
// Save to local DB
this.db.saveMessage(message);
// Broadcast to all peers
this.webrtc.broadcastMessage({
type: 'chat-message',
message,
});
return of(MessagesActions.sendMessageSuccess({ message }));
}),
catchError((error) =>
of(MessagesActions.sendMessageFailure({ error: error.message }))
)
)
);
// Edit message
editMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.editMessage),
withLatestFrom(this.store.select(selectCurrentUser)),
switchMap(([{ messageId, content }, currentUser]) => {
if (!currentUser) {
return of(MessagesActions.editMessageFailure({ error: 'Not logged in' }));
}
return from(this.db.getMessageById(messageId)).pipe(
mergeMap((existingMessage) => {
if (!existingMessage) {
return of(MessagesActions.editMessageFailure({ error: 'Message not found' }));
}
// Check if user owns the message
if (existingMessage.senderId !== currentUser.id) {
return of(MessagesActions.editMessageFailure({ error: 'Cannot edit others messages' }));
}
const editedAt = this.timeSync.now();
// Update in DB
this.db.updateMessage(messageId, { content, editedAt });
// Broadcast to peers
this.webrtc.broadcastMessage({
type: 'message-edited',
messageId,
content,
editedAt,
});
return of(MessagesActions.editMessageSuccess({ messageId, content, editedAt }));
}),
catchError((error) =>
of(MessagesActions.editMessageFailure({ error: error.message }))
)
);
})
)
);
// Delete message (user's own)
deleteMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.deleteMessage),
withLatestFrom(this.store.select(selectCurrentUser)),
switchMap(([{ messageId }, currentUser]) => {
if (!currentUser) {
return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
}
return from(this.db.getMessageById(messageId)).pipe(
mergeMap((existingMessage) => {
if (!existingMessage) {
return of(MessagesActions.deleteMessageFailure({ error: 'Message not found' }));
}
// Check if user owns the message
if (existingMessage.senderId !== currentUser.id) {
return of(MessagesActions.deleteMessageFailure({ error: 'Cannot delete others messages' }));
}
// Soft delete - mark as deleted
this.db.updateMessage(messageId, { isDeleted: true });
// Broadcast to peers
this.webrtc.broadcastMessage({
type: 'message-deleted',
messageId,
});
return of(MessagesActions.deleteMessageSuccess({ messageId }));
}),
catchError((error) =>
of(MessagesActions.deleteMessageFailure({ error: error.message }))
)
);
})
)
);
// Admin delete message
adminDeleteMessage$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.adminDeleteMessage),
withLatestFrom(this.store.select(selectCurrentUser)),
mergeMap(([{ messageId }, currentUser]) => {
if (!currentUser) {
return of(MessagesActions.deleteMessageFailure({ error: 'Not logged in' }));
}
// Check admin permission
if (currentUser.role !== 'host' && currentUser.role !== 'admin' && currentUser.role !== 'moderator') {
return of(MessagesActions.deleteMessageFailure({ error: 'Permission denied' }));
}
// Soft delete
this.db.updateMessage(messageId, { isDeleted: true });
// Broadcast to peers
this.webrtc.broadcastMessage({
type: 'message-deleted',
messageId,
deletedBy: currentUser.id,
});
return of(MessagesActions.deleteMessageSuccess({ messageId }));
}),
catchError((error) =>
of(MessagesActions.deleteMessageFailure({ error: error.message }))
)
)
);
// Add reaction
addReaction$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.addReaction),
withLatestFrom(this.store.select(selectCurrentUser)),
mergeMap(([{ messageId, emoji }, currentUser]) => {
if (!currentUser) {
return of({ type: 'NO_OP' });
}
const reaction: Reaction = {
id: uuidv4(),
messageId,
oderId: currentUser.id,
userId: currentUser.id,
emoji,
timestamp: this.timeSync.now(),
};
// Save to DB
this.db.saveReaction(reaction);
// Broadcast to peers
this.webrtc.broadcastMessage({
type: 'reaction-added',
reaction,
});
return of(MessagesActions.addReactionSuccess({ reaction }));
})
)
);
// Remove reaction
removeReaction$ = createEffect(() =>
this.actions$.pipe(
ofType(MessagesActions.removeReaction),
withLatestFrom(this.store.select(selectCurrentUser)),
mergeMap(([{ messageId, emoji }, currentUser]) => {
if (!currentUser) {
return of({ type: 'NO_OP' });
}
// Remove from DB
this.db.removeReaction(messageId, currentUser.id, emoji);
// Broadcast to peers
this.webrtc.broadcastMessage({
type: 'reaction-removed',
messageId,
oderId: currentUser.id,
emoji,
});
return of(MessagesActions.removeReactionSuccess({ messageId, oderId: currentUser.id, emoji }));
})
)
);
// Listen to incoming messages from WebRTC peers
incomingMessages$ = createEffect(() =>
this.webrtc.onMessageReceived.pipe(
withLatestFrom(
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom)
),
mergeMap(([event, currentUser, currentRoom]: [any, any, any]) => {
console.log('Received peer message:', event.type, event);
switch (event.type) {
// Precise sync via ID inventory and targeted requests
case 'chat-inventory-request': {
if (!currentRoom || !event.fromPeerId) return of({ type: 'NO_OP' });
return from(this.db.getMessages(currentRoom.id, this.INVENTORY_LIMIT, 0)).pipe(
tap((messages) => {
const items = messages
.map((m) => ({ id: m.id, ts: m.editedAt || m.timestamp || 0 }))
.sort((a, b) => a.ts - b.ts);
for (let i = 0; i < items.length; i += this.CHUNK_SIZE) {
const chunk = items.slice(i, i + this.CHUNK_SIZE);
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'chat-inventory',
roomId: currentRoom.id,
items: chunk,
total: items.length,
index: i,
} as any);
}
}),
map(() => ({ type: 'NO_OP' }))
);
}
case 'chat-inventory': {
if (!currentRoom || !Array.isArray(event.items) || !event.fromPeerId) return of({ type: 'NO_OP' });
// Determine which IDs we are missing or have older versions of
return from(this.db.getMessages(currentRoom.id, this.INVENTORY_LIMIT, 0)).pipe(
mergeMap(async (local) => {
const localMap = new Map(local.map((m) => [m.id, m.editedAt || m.timestamp || 0]));
const missing: string[] = [];
for (const { id, ts } of event.items as Array<{ id: string; ts: number }>) {
const lts = localMap.get(id);
if (lts === undefined || ts > lts) {
missing.push(id);
}
}
// Request in chunks from the sender
for (let i = 0; i < missing.length; i += this.CHUNK_SIZE) {
const chunk = missing.slice(i, i + this.CHUNK_SIZE);
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'chat-sync-request-ids',
roomId: currentRoom.id,
ids: chunk,
} as any);
}
return { type: 'NO_OP' } as any;
})
);
}
case 'chat-sync-request-ids': {
if (!currentRoom || !Array.isArray(event.ids) || !event.fromPeerId) return of({ type: 'NO_OP' });
const ids: string[] = event.ids;
return from(Promise.all(ids.map((id: string) => this.db.getMessageById(id)))).pipe(
tap((maybeMessages) => {
const messages = maybeMessages.filter((m): m is Message => !!m);
// Send in chunks to avoid large payloads
for (let i = 0; i < messages.length; i += this.CHUNK_SIZE) {
const chunk = messages.slice(i, i + this.CHUNK_SIZE);
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'chat-sync-batch',
roomId: currentRoom.id,
messages: chunk,
} as any);
}
}),
map(() => ({ type: 'NO_OP' }))
);
}
case 'chat-sync-batch': {
if (!currentRoom || !Array.isArray(event.messages)) return of({ type: 'NO_OP' });
return from((async () => {
const accepted: Message[] = [];
for (const m of event.messages as Message[]) {
const existing = await this.db.getMessageById(m.id);
const ets = existing ? (existing.editedAt || existing.timestamp || 0) : -1;
const its = m.editedAt || m.timestamp || 0;
if (!existing || its > ets) {
await this.db.saveMessage(m);
accepted.push(m);
}
}
return accepted;
})()).pipe(
mergeMap((accepted) => accepted.length ? of(MessagesActions.syncMessages({ messages: accepted })) : of({ type: 'NO_OP' }))
);
}
case 'voice-state':
// Update voice state for the sender
if (event.oderId && event.voiceState) {
const userId = event.oderId;
return of(UsersActions.updateVoiceState({ userId, voiceState: event.voiceState }));
}
break;
case 'chat-message':
// Save to local DB and dispatch receive action
// Skip if this is our own message (sent via server relay)
if (event.message && event.message.senderId !== currentUser?.id && event.message.senderId !== currentUser?.oderId) {
this.db.saveMessage(event.message);
return of(MessagesActions.receiveMessage({ message: event.message }));
}
break;
case 'message-edited':
if (event.messageId && event.content) {
this.db.updateMessage(event.messageId, { content: event.content, editedAt: event.editedAt });
return of(MessagesActions.editMessageSuccess({
messageId: event.messageId,
content: event.content,
editedAt: event.editedAt,
}));
}
break;
case 'message-deleted':
if (event.messageId) {
this.db.deleteMessage(event.messageId);
return of(MessagesActions.deleteMessageSuccess({ messageId: event.messageId }));
}
break;
case 'reaction-added':
if (event.messageId && event.reaction) {
this.db.saveReaction(event.reaction);
return of(MessagesActions.addReactionSuccess({
reaction: event.reaction,
}));
}
break;
case 'reaction-removed':
if (event.messageId && event.oderId && event.emoji) {
this.db.removeReaction(event.messageId, event.oderId, event.emoji);
return of(MessagesActions.removeReactionSuccess({
messageId: event.messageId,
oderId: event.oderId,
emoji: event.emoji,
}));
}
break;
// Chat sync handshake: summary -> request -> full
case 'chat-sync-summary':
// Compare summaries and request sync if the peer has newer data
if (!currentRoom) return of({ type: 'NO_OP' });
return from(this.db.getMessages(currentRoom.id, 10000, 0)).pipe(
tap((local) => {
const localCount = local.length;
const localLastUpdated = local.reduce((max, m) => Math.max(max, m.editedAt || m.timestamp || 0), 0);
const remoteLastUpdated = event.lastUpdated || 0;
const remoteCount = event.count || 0;
const identical = localLastUpdated === remoteLastUpdated && localCount === remoteCount;
const needsSync = remoteLastUpdated > localLastUpdated || (remoteLastUpdated === localLastUpdated && remoteCount > localCount);
if (!identical && needsSync && event.fromPeerId) {
this.webrtc.sendToPeer(event.fromPeerId, { type: 'chat-sync-request', roomId: currentRoom.id } as any);
}
}),
map(() => ({ type: 'NO_OP' }))
);
case 'chat-sync-request':
if (!currentRoom || !event.fromPeerId) return of({ type: 'NO_OP' });
return from(this.db.getMessages(currentRoom.id, 10000, 0)).pipe(
tap((all) => {
this.webrtc.sendToPeer(event.fromPeerId, { type: 'chat-sync-full', roomId: currentRoom.id, messages: all } as any);
}),
map(() => ({ type: 'NO_OP' }))
);
case 'chat-sync-full':
if (event.messages && Array.isArray(event.messages)) {
// Merge into local DB and update store
event.messages.forEach((m: Message) => this.db.saveMessage(m));
return of(MessagesActions.syncMessages({ messages: event.messages }));
}
break;
}
return of({ type: 'NO_OP' });
})
)
);
// On peer connect, broadcast local dataset summary
peerConnectedSync$ = createEffect(
() =>
this.webrtc.onPeerConnected.pipe(
withLatestFrom(this.store.select(selectCurrentRoom)),
mergeMap(([peerId, room]) => {
if (!room) return of({ type: 'NO_OP' });
return from(this.db.getMessages(room.id, 10000, 0)).pipe(
map((messages) => {
const count = messages.length;
const lastUpdated = messages.reduce((max, m) => Math.max(max, m.editedAt || m.timestamp || 0), 0);
// Send summary specifically to the newly connected peer
this.webrtc.sendToPeer(peerId, { type: 'chat-sync-summary', roomId: room.id, count, lastUpdated } as any);
// Also request their inventory for precise reconciliation
this.webrtc.sendToPeer(peerId, { type: 'chat-inventory-request', roomId: room.id } as any);
return { type: 'NO_OP' };
})
);
})
),
{ dispatch: false }
);
// Kick off sync to all currently connected peers shortly after joining a room
joinRoomSyncKickoff$ = createEffect(
() =>
this.actions$.pipe(
ofType(RoomsActions.joinRoomSuccess),
withLatestFrom(this.store.select(selectCurrentRoom)),
mergeMap(([{ room }, currentRoom]) => {
const activeRoom = currentRoom || room;
if (!activeRoom) return of({ type: 'NO_OP' });
return from(this.db.getMessages(activeRoom.id, 10000, 0)).pipe(
tap((messages) => {
const count = messages.length;
const lastUpdated = messages.reduce((max, m) => Math.max(max, m.editedAt || m.timestamp || 0), 0);
const peers = this.webrtc.getConnectedPeers();
peers.forEach((pid) => {
try {
this.webrtc.sendToPeer(pid, { type: 'chat-sync-summary', roomId: activeRoom.id, count, lastUpdated } as any);
this.webrtc.sendToPeer(pid, { type: 'chat-inventory-request', roomId: activeRoom.id } as any);
} catch {}
});
}),
map(() => ({ type: 'NO_OP' }))
);
})
),
{ dispatch: false }
);
}