From 1cdd1c5d2bd79ccaf354e66d7a2460db951ab064 Mon Sep 17 00:00:00 2001 From: Myx Date: Wed, 18 Mar 2026 22:10:11 +0100 Subject: [PATCH] fix typing indicator on wrong server --- server/src/websocket/handler.ts | 3 +- server/src/websocket/index.ts | 3 +- src/app/core/services/attachment.service.ts | 158 +++++++++++++++++- src/app/core/services/webrtc.service.ts | 75 +++++++-- .../typing-indicator.component.ts | 24 ++- .../floating-voice-controls.component.ts | 24 ++- .../services/voice-playback.service.ts | 30 ++++ .../voice-controls.component.ts | 45 +---- .../messages/messages-incoming.handlers.ts | 45 ++--- src/app/store/messages/messages.effects.ts | 8 + src/app/store/rooms/rooms.effects.ts | 124 ++++++++++++-- 11 files changed, 431 insertions(+), 108 deletions(-) diff --git a/server/src/websocket/handler.ts b/server/src/websocket/handler.ts index 2779bb5..3a3dce0 100644 --- a/server/src/websocket/handler.ts +++ b/server/src/websocket/handler.ts @@ -88,7 +88,8 @@ function handleLeaveServer(user: ConnectedUser, message: WsMessage, connectionId type: 'user_left', oderId: user.oderId, displayName: user.displayName ?? 'Anonymous', - serverId: leaveSid + serverId: leaveSid, + serverIds: Array.from(user.serverIds) }, user.oderId); } diff --git a/server/src/websocket/index.ts b/server/src/websocket/index.ts index 15c2c32..9169080 100644 --- a/server/src/websocket/index.ts +++ b/server/src/websocket/index.ts @@ -25,7 +25,8 @@ function removeDeadConnection(connectionId: string): void { type: 'user_left', oderId: user.oderId, displayName: user.displayName, - serverId: sid + serverId: sid, + serverIds: [] }, user.oderId); }); diff --git a/src/app/core/services/attachment.service.ts b/src/app/core/services/attachment.service.ts index 0237104..b3942c2 100644 --- a/src/app/core/services/attachment.service.ts +++ b/src/app/core/services/attachment.service.ts @@ -5,6 +5,10 @@ import { signal, effect } from '@angular/core'; +import { + NavigationEnd, + Router +} from '@angular/router'; import { take } from 'rxjs'; import { v4 as uuidv4 } from 'uuid'; import { WebRTCService } from './webrtc.service'; @@ -12,6 +16,7 @@ import { Store } from '@ngrx/store'; import { selectCurrentRoomName } from '../../store/rooms/rooms.selectors'; import { DatabaseService } from './database.service'; import { recordDebugNetworkFileChunk } from './debug-network-metrics.service'; +import { ROOM_URL_PATTERN } from '../constants'; import type { ChatAttachmentAnnouncement, ChatAttachmentMeta, @@ -145,9 +150,14 @@ export class AttachmentService { private readonly webrtc = inject(WebRTCService); private readonly ngrxStore = inject(Store); private readonly database = inject(DatabaseService); + private readonly router = inject(Router); /** Primary index: `messageId → Attachment[]`. */ private attachmentsByMessage = new Map(); + /** Runtime cache of `messageId → roomId` for attachment gating. */ + private messageRoomIds = new Map(); + /** Room currently being watched in the router, or `null` outside room routes. */ + private watchedRoomId: string | null = this.extractWatchedRoomId(this.router.url); /** Incremented on every mutation so signal consumers re-render. */ updated = signal(0); @@ -190,6 +200,24 @@ export class AttachmentService { this.initFromDatabase(); } }); + + this.router.events.subscribe((event) => { + if (!(event instanceof NavigationEnd)) { + return; + } + + this.watchedRoomId = this.extractWatchedRoomId(event.urlAfterRedirects || event.url); + + if (this.watchedRoomId) { + void this.requestAutoDownloadsForRoom(this.watchedRoomId); + } + }); + + this.webrtc.onPeerConnected.subscribe(() => { + if (this.watchedRoomId) { + void this.requestAutoDownloadsForRoom(this.watchedRoomId); + } + }); } private getElectronApi(): AttachmentElectronApi | undefined { @@ -201,6 +229,44 @@ export class AttachmentService { return this.attachmentsByMessage.get(messageId) ?? []; } + /** Cache the room that owns a message so background downloads can be gated by the watched server. */ + rememberMessageRoom(messageId: string, roomId: string): void { + if (!messageId || !roomId) + return; + + this.messageRoomIds.set(messageId, roomId); + } + + /** Queue best-effort auto-download checks for a message's eligible attachments. */ + queueAutoDownloadsForMessage(messageId: string, attachmentId?: string): void { + void this.requestAutoDownloadsForMessage(messageId, attachmentId); + } + + /** Auto-request eligible missing attachments for the currently watched room. */ + async requestAutoDownloadsForRoom(roomId: string): Promise { + if (!roomId || !this.isRoomWatched(roomId)) + return; + + if (this.database.isReady()) { + const messages = await this.database.getMessages(roomId, 500, 0); + + for (const message of messages) { + this.rememberMessageRoom(message.id, message.roomId); + await this.requestAutoDownloadsForMessage(message.id); + } + + return; + } + + for (const [messageId] of this.attachmentsByMessage) { + const attachmentRoomId = await this.resolveMessageRoomId(messageId); + + if (attachmentRoomId === roomId) { + await this.requestAutoDownloadsForMessage(messageId); + } + } + } + /** Remove every attachment associated with a message. */ async deleteForMessage(messageId: string): Promise { const attachments = this.attachmentsByMessage.get(messageId) ?? []; @@ -219,6 +285,7 @@ export class AttachmentService { } this.attachmentsByMessage.delete(messageId); + this.messageRoomIds.delete(messageId); this.clearMessageScopedState(messageId); if (hadCachedAttachments) { @@ -276,8 +343,15 @@ export class AttachmentService { * @param attachmentMap - Map of `messageId → AttachmentMeta[]` from peer. */ registerSyncedAttachments( - attachmentMap: Record + attachmentMap: Record, + messageRoomIds?: Record ): void { + if (messageRoomIds) { + for (const [messageId, roomId] of Object.entries(messageRoomIds)) { + this.rememberMessageRoom(messageId, roomId); + } + } + const newAttachments: Attachment[] = []; for (const [messageId, metas] of Object.entries(attachmentMap)) { @@ -306,6 +380,7 @@ export class AttachmentService { for (const attachment of newAttachments) { void this.persistAttachmentMeta(attachment); + this.queueAutoDownloadsForMessage(attachment.messageId, attachment.id); } } } @@ -375,9 +450,9 @@ export class AttachmentService { * message to all connected peers. * * 1. Each file is assigned a UUID. - * 2. A `file-announce` event is broadcast to peers. - * 3. Inline-preview media ≤ {@link MAX_AUTO_SAVE_SIZE_BYTES} - * are immediately streamed as chunked base-64. + * 2. A `file-announce` event is broadcast to peers. + * 3. Peers watching the message's server can request any + * auto-download-eligible media on demand. * * @param messageId - ID of the parent message. * @param files - Array of user-selected `File` objects. @@ -437,10 +512,6 @@ export class AttachmentService { this.webrtc.broadcastMessage(fileAnnounceEvent); - // Auto-stream small inline-preview media - if (this.isMedia(attachment) && attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) { - await this.streamFileToPeers(messageId, fileId, file); - } } const existingList = this.attachmentsByMessage.get(messageId) ?? []; @@ -482,6 +553,7 @@ export class AttachmentService { this.attachmentsByMessage.set(messageId, list); this.touch(); void this.persistAttachmentMeta(attachment); + this.queueAutoDownloadsForMessage(messageId, attachment.id); } /** @@ -772,6 +844,38 @@ export class AttachmentService { return `${messageId}:${fileId}`; } + private async requestAutoDownloadsForMessage(messageId: string, attachmentId?: string): Promise { + if (!messageId) + return; + + const roomId = await this.resolveMessageRoomId(messageId); + + if (!roomId || !this.isRoomWatched(roomId) || this.webrtc.getConnectedPeers().length === 0) { + return; + } + + const attachments = this.attachmentsByMessage.get(messageId) ?? []; + + for (const attachment of attachments) { + if (attachmentId && attachment.id !== attachmentId) + continue; + + if (!this.shouldAutoRequestWhenWatched(attachment)) + continue; + + if (attachment.available) + continue; + + if ((attachment.receivedBytes ?? 0) > 0) + continue; + + if (this.pendingRequests.has(this.buildRequestKey(messageId, attachment.id))) + continue; + + this.requestFromAnyPeer(messageId, attachment); + } + } + private clearMessageScopedState(messageId: string): void { const scopedPrefix = `${messageId}:`; @@ -867,6 +971,12 @@ export class AttachmentService { attachment.mime.startsWith('audio/'); } + /** Auto-download only the assets that already supported eager loading when watched. */ + private shouldAutoRequestWhenWatched(attachment: Attachment): boolean { + return attachment.isImage || + (this.isMedia(attachment) && attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES); + } + /** Check whether a completed download should be cached on disk. */ private shouldPersistDownloadedAttachment(attachment: Attachment): boolean { return attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES || @@ -1167,6 +1277,38 @@ export class AttachmentService { } catch { /* load is best-effort */ } } + private extractWatchedRoomId(url: string): string | null { + const roomMatch = url.match(ROOM_URL_PATTERN); + + return roomMatch ? roomMatch[1] : null; + } + + private isRoomWatched(roomId: string | null | undefined): boolean { + return !!roomId && roomId === this.watchedRoomId; + } + + private async resolveMessageRoomId(messageId: string): Promise { + const cachedRoomId = this.messageRoomIds.get(messageId); + + if (cachedRoomId) + return cachedRoomId; + + if (!this.database.isReady()) + return null; + + try { + const message = await this.database.getMessageById(messageId); + + if (!message?.roomId) + return null; + + this.rememberMessageRoom(messageId, message.roomId); + return message.roomId; + } catch { + return null; + } + } + /** One-time migration from localStorage to the database. */ private async migrateFromLocalStorage(): Promise { try { diff --git a/src/app/core/services/webrtc.service.ts b/src/app/core/services/webrtc.service.ts index 6cd0f68..3ce8964 100644 --- a/src/app/core/services/webrtc.service.ts +++ b/src/app/core/services/webrtc.service.ts @@ -71,6 +71,7 @@ type IncomingSignalingMessage = Omit, 'type' | 'payloa oderId?: string; serverTime?: number; serverId?: string; + serverIds?: string[]; users?: SignalingUserSummary[]; displayName?: string; fromUserId?: string; @@ -92,8 +93,8 @@ export class WebRTCService implements OnDestroy { private activeServerId: string | null = null; /** The server ID where voice is currently active, or `null` when not in voice. */ private voiceServerId: string | null = null; - /** Maps each remote peer ID to the server they were discovered from. */ - private readonly peerServerMap = new Map(); + /** Maps each remote peer ID to the shared servers they currently belong to. */ + private readonly peerServerMap = new Map>(); private readonly serviceDestroyed$ = new Subject(); private remoteScreenShareRequestsEnabled = false; private readonly desiredRemoteScreenSharePeers = new Set(); @@ -275,6 +276,7 @@ export class WebRTCService implements OnDestroy { this.peerManager.peerDisconnected$.subscribe((peerId) => { this.activeRemoteScreenSharePeers.delete(peerId); + this.peerServerMap.delete(peerId); this.screenShareManager.clearScreenShareRequest(peerId); }); @@ -349,6 +351,10 @@ export class WebRTCService implements OnDestroy { if (!user.oderId) continue; + if (message.serverId) { + this.trackPeerInServer(user.oderId, message.serverId); + } + const existing = this.peerManager.activePeerConnections.get(user.oderId); const healthy = this.isPeerHealthy(existing); @@ -367,10 +373,6 @@ export class WebRTCService implements OnDestroy { this.peerManager.createPeerConnection(user.oderId, true); this.peerManager.createAndSendOffer(user.oderId); - - if (message.serverId) { - this.peerServerMap.set(user.oderId, message.serverId); - } } } @@ -379,6 +381,10 @@ export class WebRTCService implements OnDestroy { displayName: message.displayName, oderId: message.oderId }); + + if (message.oderId && message.serverId) { + this.trackPeerInServer(message.oderId, message.serverId); + } } private handleUserLeftSignalingMessage(message: IncomingSignalingMessage): void { @@ -389,8 +395,16 @@ export class WebRTCService implements OnDestroy { }); if (message.oderId) { - this.peerManager.removePeer(message.oderId); - this.peerServerMap.delete(message.oderId); + const hasRemainingSharedServers = Array.isArray(message.serverIds) + ? this.replacePeerSharedServers(message.oderId, message.serverIds) + : (message.serverId + ? this.untrackPeerFromServer(message.oderId, message.serverId) + : false); + + if (!hasRemainingSharedServers) { + this.peerManager.removePeer(message.oderId); + this.peerServerMap.delete(message.oderId); + } } } @@ -404,7 +418,7 @@ export class WebRTCService implements OnDestroy { const offerEffectiveServer = this.voiceServerId || this.activeServerId; if (offerEffectiveServer && !this.peerServerMap.has(fromUserId)) { - this.peerServerMap.set(fromUserId, offerEffectiveServer); + this.trackPeerInServer(fromUserId, offerEffectiveServer); } this.peerManager.handleOffer(fromUserId, sdp); @@ -441,8 +455,8 @@ export class WebRTCService implements OnDestroy { private closePeersNotInServer(serverId: string): void { const peersToClose: string[] = []; - this.peerServerMap.forEach((peerServerId, peerId) => { - if (peerServerId !== serverId) { + this.peerServerMap.forEach((peerServerIds, peerId) => { + if (!peerServerIds.has(serverId)) { peersToClose.push(peerId); } }); @@ -479,6 +493,45 @@ export class WebRTCService implements OnDestroy { return this.signalingManager.connect(serverUrl); } + private trackPeerInServer(peerId: string, serverId: string): void { + if (!peerId || !serverId) + return; + + const trackedServers = this.peerServerMap.get(peerId) ?? new Set(); + + trackedServers.add(serverId); + this.peerServerMap.set(peerId, trackedServers); + } + + private replacePeerSharedServers(peerId: string, serverIds: string[]): boolean { + const sharedServerIds = serverIds.filter((serverId) => this.memberServerIds.has(serverId)); + + if (sharedServerIds.length === 0) { + this.peerServerMap.delete(peerId); + return false; + } + + this.peerServerMap.set(peerId, new Set(sharedServerIds)); + return true; + } + + private untrackPeerFromServer(peerId: string, serverId: string): boolean { + const trackedServers = this.peerServerMap.get(peerId); + + if (!trackedServers) + return false; + + trackedServers.delete(serverId); + + if (trackedServers.size === 0) { + this.peerServerMap.delete(peerId); + return false; + } + + this.peerServerMap.set(peerId, trackedServers); + return true; + } + /** * Ensure the signaling WebSocket is connected, reconnecting if needed. * diff --git a/src/app/features/chat/typing-indicator/typing-indicator.component.ts b/src/app/features/chat/typing-indicator/typing-indicator.component.ts index 1b97d90..7d243f5 100644 --- a/src/app/features/chat/typing-indicator/typing-indicator.component.ts +++ b/src/app/features/chat/typing-indicator/typing-indicator.component.ts @@ -3,10 +3,13 @@ import { Component, inject, signal, - DestroyRef + DestroyRef, + effect } from '@angular/core'; import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { Store } from '@ngrx/store'; import { WebRTCService } from '../../../core/services/webrtc.service'; +import { selectCurrentRoom } from '../../../store/rooms/rooms.selectors'; import { merge, interval, @@ -23,6 +26,7 @@ interface TypingSignalingMessage { type: string; displayName: string; oderId: string; + serverId: string; } @Component({ @@ -36,6 +40,9 @@ interface TypingSignalingMessage { }) export class TypingIndicatorComponent { private readonly typingMap = new Map(); + private readonly store = inject(Store); + private readonly currentRoom = this.store.selectSignal(selectCurrentRoom); + private lastRoomId: string | null = null; typingDisplay = signal([]); typingOthersCount = signal(0); @@ -47,8 +54,10 @@ export class TypingIndicatorComponent { filter((msg): msg is TypingSignalingMessage => msg?.type === 'user_typing' && typeof msg.displayName === 'string' && - typeof msg.oderId === 'string' + typeof msg.oderId === 'string' && + typeof msg.serverId === 'string' ), + filter((msg) => msg.serverId === this.currentRoom()?.id), tap((msg) => { const now = Date.now(); @@ -77,6 +86,17 @@ export class TypingIndicatorComponent { merge(typing$, purge$) .pipe(takeUntilDestroyed(destroyRef)) .subscribe(() => this.recomputeDisplay()); + + effect(() => { + const roomId = this.currentRoom()?.id ?? null; + + if (roomId === this.lastRoomId) + return; + + this.lastRoomId = roomId; + this.typingMap.clear(); + this.recomputeDisplay(); + }); } private recomputeDisplay(): void { diff --git a/src/app/features/voice/floating-voice-controls/floating-voice-controls.component.ts b/src/app/features/voice/floating-voice-controls/floating-voice-controls.component.ts index d2dc1f5..211575d 100644 --- a/src/app/features/voice/floating-voice-controls/floating-voice-controls.component.ts +++ b/src/app/features/voice/floating-voice-controls/floating-voice-controls.component.ts @@ -4,13 +4,11 @@ import { inject, signal, computed, - OnInit, - OnDestroy + OnInit } from '@angular/core'; import { CommonModule } from '@angular/common'; import { Store } from '@ngrx/store'; import { NgIcon, provideIcons } from '@ng-icons/core'; -import { Subscription } from 'rxjs'; import { lucideMic, lucideMicOff, @@ -28,6 +26,7 @@ import { ScreenShareQuality } from '../../../core/services/webrtc'; import { UsersActions } from '../../../store/users/users.actions'; import { selectCurrentUser } from '../../../store/users/users.selectors'; import { DebugConsoleComponent, ScreenShareQualityDialogComponent } from '../../../shared'; +import { VoicePlaybackService } from '../voice-controls/services/voice-playback.service'; @Component({ selector: 'app-floating-voice-controls', @@ -55,9 +54,10 @@ import { DebugConsoleComponent, ScreenShareQualityDialogComponent } from '../../ * Floating voice controls displayed when the user navigates away from the voice-connected server. * Provides mute, deafen, screen-share, and disconnect actions in a compact overlay. */ -export class FloatingVoiceControlsComponent implements OnInit, OnDestroy { +export class FloatingVoiceControlsComponent implements OnInit { private webrtcService = inject(WebRTCService); private voiceSessionService = inject(VoiceSessionService); + private voicePlayback = inject(VoicePlaybackService); private store = inject(Store); currentUser = this.store.selectSignal(selectCurrentUser); @@ -75,8 +75,6 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy { askScreenShareQuality = signal(true); showScreenShareQualityDialog = signal(false); - private stateSubscription: Subscription | null = null; - /** Sync local mute/deafen/screen-share state from the WebRTC service on init. */ ngOnInit(): void { // Sync mute/deafen state from webrtc service @@ -84,10 +82,15 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy { this.isDeafened.set(this.webrtcService.isDeafened()); this.isScreenSharing.set(this.webrtcService.isScreenSharing()); this.syncScreenShareSettings(); - } - ngOnDestroy(): void { - this.stateSubscription?.unsubscribe(); + const settings = loadVoiceSettingsFromStorage(); + + this.voicePlayback.updateOutputVolume(settings.outputVolume / 100); + this.voicePlayback.updateDeafened(this.isDeafened()); + + if (settings.outputDevice) { + this.voicePlayback.applyOutputDevice(settings.outputDevice); + } } /** Navigate back to the voice-connected server. */ @@ -117,6 +120,7 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy { toggleDeafen(): void { this.isDeafened.update((current) => !current); this.webrtcService.toggleDeafen(this.isDeafened()); + this.voicePlayback.updateDeafened(this.isDeafened()); // When deafening, also mute if (this.isDeafened() && !this.isMuted()) { @@ -189,6 +193,8 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy { // Disable voice this.webrtcService.disableVoice(); + this.voicePlayback.teardownAll(); + this.voicePlayback.updateDeafened(false); // Update user voice state in store const user = this.currentUser(); diff --git a/src/app/features/voice/voice-controls/services/voice-playback.service.ts b/src/app/features/voice/voice-controls/services/voice-playback.service.ts index e6213e3..72d9485 100644 --- a/src/app/features/voice/voice-controls/services/voice-playback.service.ts +++ b/src/app/features/voice/voice-controls/services/voice-playback.service.ts @@ -60,6 +60,28 @@ export class VoicePlaybackService { : null; void this.applyEffectiveOutputDeviceToAllPipelines(); }); + + this.webrtc.onRemoteStream.subscribe(({ peerId }) => { + const voiceStream = this.webrtc.getRemoteVoiceStream(peerId); + + if (!voiceStream) { + this.removeRemoteAudio(peerId); + return; + } + + this.handleRemoteStream(peerId, voiceStream, this.buildPlaybackOptions()); + }); + + this.webrtc.onVoiceConnected.subscribe(() => { + const options = this.buildPlaybackOptions(true); + + this.playPendingStreams(options); + this.ensureAllRemoteStreamsPlaying(options); + }); + + this.webrtc.onPeerDisconnected.subscribe((peerId) => { + this.removeRemoteAudio(peerId); + }); } handleRemoteStream(peerId: string, stream: MediaStream, options: PlaybackOptions): void { @@ -158,6 +180,14 @@ export class VoicePlaybackService { this.pendingRemoteStreams.clear(); } + private buildPlaybackOptions(forceConnected = this.webrtc.isVoiceConnected()): PlaybackOptions { + return { + isConnected: forceConnected, + outputVolume: this.masterVolume, + isDeafened: this.deafened + }; + } + /** * Build the Web Audio graph for a remote peer: * diff --git a/src/app/features/voice/voice-controls/voice-controls.component.ts b/src/app/features/voice/voice-controls/voice-controls.component.ts index c5d77d5..565c5e1 100644 --- a/src/app/features/voice/voice-controls/voice-controls.component.ts +++ b/src/app/features/voice/voice-controls/voice-controls.component.ts @@ -10,7 +10,6 @@ import { import { CommonModule } from '@angular/common'; import { Store } from '@ngrx/store'; import { NgIcon, provideIcons } from '@ng-icons/core'; -import { Subscription } from 'rxjs'; import { lucideMic, lucideMicOff, @@ -76,7 +75,6 @@ export class VoiceControlsComponent implements OnInit, OnDestroy { private voicePlayback = inject(VoicePlaybackService); private store = inject(Store); private settingsModal = inject(SettingsModalService); - private remoteStreamSubscription: Subscription | null = null; currentUser = this.store.selectSignal(selectCurrentUser); currentRoom = this.store.selectSignal(selectCurrentRoom); @@ -110,56 +108,18 @@ export class VoiceControlsComponent implements OnInit, OnDestroy { isDeafened: this.isDeafened() }; } - - private voiceConnectedSubscription: Subscription | null = null; - async ngOnInit(): Promise { await this.loadAudioDevices(); // Load persisted voice settings and apply this.loadSettings(); this.applySettingsToWebRTC(); - - // Subscribe to remote streams to play audio from peers - this.remoteStreamSubscription = this.webrtcService.onRemoteStream.subscribe( - ({ peerId }) => { - const voiceStream = this.webrtcService.getRemoteVoiceStream(peerId); - - if (!voiceStream) { - this.voicePlayback.removeRemoteAudio(peerId); - return; - } - - this.voicePlayback.handleRemoteStream(peerId, voiceStream, this.playbackOptions()); - } - ); - - // Subscribe to voice connected event to play pending streams and ensure all remote audio is set up - this.voiceConnectedSubscription = this.webrtcService.onVoiceConnected.subscribe(() => { - const options = this.playbackOptions(); - - this.voicePlayback.playPendingStreams(options); - // Also ensure all remote streams from connected peers are playing - // This handles the case where streams were received while voice was "connected" - // from a previous session but audio elements weren't set up - this.voicePlayback.ensureAllRemoteStreamsPlaying(options); - }); - - // Clean up audio when peer disconnects - this.webrtcService.onPeerDisconnected.subscribe((peerId) => { - this.voicePlayback.removeRemoteAudio(peerId); - }); } ngOnDestroy(): void { - if (this.isConnected()) { - this.disconnect(); + if (!this.webrtcService.isVoiceConnected()) { + this.voicePlayback.teardownAll(); } - - this.voicePlayback.teardownAll(); - - this.remoteStreamSubscription?.unsubscribe(); - this.voiceConnectedSubscription?.unsubscribe(); } async loadAudioDevices(): Promise { @@ -304,6 +264,7 @@ export class VoiceControlsComponent implements OnInit, OnDestroy { // Disable voice (stops audio tracks but keeps peer connections open for chat) this.webrtcService.disableVoice(); this.voicePlayback.teardownAll(); + this.voicePlayback.updateDeafened(false); const user = this.currentUser(); diff --git a/src/app/store/messages/messages-incoming.handlers.ts b/src/app/store/messages/messages-incoming.handlers.ts index 0da7b36..fe915eb 100644 --- a/src/app/store/messages/messages-incoming.handlers.ts +++ b/src/app/store/messages/messages-incoming.handlers.ts @@ -256,7 +256,10 @@ function handleSyncBatch( return EMPTY; if (hasAttachmentMetaMap(event.attachments)) { - attachments.registerSyncedAttachments(event.attachments); + attachments.registerSyncedAttachments( + event.attachments, + Object.fromEntries(event.messages.map((message) => [message.id, message.roomId])) + ); } return from(processSyncBatch(event, db, attachments)).pipe( @@ -277,6 +280,8 @@ async function processSyncBatch( const toUpsert: Message[] = []; for (const incoming of event.messages) { + attachments.rememberMessageRoom(incoming.id, incoming.roomId); + const { message, changed } = await mergeIncomingMessage(incoming, db); if (incoming.isDeleted) { @@ -292,40 +297,31 @@ async function processSyncBatch( } if (hasAttachmentMetaMap(event.attachments)) { - requestMissingImages(event.attachments, attachments); + queueWatchedAttachmentDownloads(event.attachments, attachments); } return toUpsert; } -/** Auto-requests any unavailable image attachments from any connected peer. */ -function requestMissingImages( +/** Queue best-effort auto-downloads for watched-room attachments. */ +function queueWatchedAttachmentDownloads( attachmentMap: AttachmentMetaMap, attachments: AttachmentService ): void { - for (const [msgId, metas] of Object.entries(attachmentMap)) { - for (const meta of metas) { - if (!meta.isImage) - continue; - - const atts = attachments.getForMessage(msgId); - const matchingAttachment = atts.find((attachment) => attachment.id === meta.id); - - if ( - matchingAttachment && - !matchingAttachment.available && - !(matchingAttachment.receivedBytes && matchingAttachment.receivedBytes > 0) - ) { - attachments.requestImageFromAnyPeer(msgId, matchingAttachment); - } - } + for (const msgId of Object.keys(attachmentMap)) { + attachments.queueAutoDownloadsForMessage(msgId); } } /** Saves an incoming chat message to DB and dispatches receiveMessage. */ function handleChatMessage( event: IncomingMessageEvent, - { db, debugging, currentUser }: IncomingMessageContext + { + db, + debugging, + attachments, + currentUser + }: IncomingMessageContext ): Observable { const msg = event.message; @@ -340,6 +336,8 @@ function handleChatMessage( if (isOwnMessage) return EMPTY; + attachments.rememberMessageRoom(msg.id, msg.roomId); + trackBackgroundOperation( db.saveMessage(msg), debugging, @@ -492,6 +490,11 @@ function handleFileAnnounce( { attachments }: IncomingMessageContext ): Observable { attachments.handleFileAnnounce(event); + + if (event.messageId) { + attachments.queueAutoDownloadsForMessage(event.messageId, event.file?.id); + } + return EMPTY; } diff --git a/src/app/store/messages/messages.effects.ts b/src/app/store/messages/messages.effects.ts index e8a8e5d..8ca4bf6 100644 --- a/src/app/store/messages/messages.effects.ts +++ b/src/app/store/messages/messages.effects.ts @@ -64,6 +64,12 @@ export class MessagesEffects { mergeMap(async (messages) => { const hydrated = await hydrateMessages(messages, this.db); + for (const message of hydrated) { + this.attachments.rememberMessageRoom(message.id, message.roomId); + } + + void this.attachments.requestAutoDownloadsForRoom(roomId); + return MessagesActions.loadMessagesSuccess({ messages: hydrated }); }), catchError((error) => @@ -104,6 +110,8 @@ export class MessagesEffects { replyToId }; + this.attachments.rememberMessageRoom(message.id, message.roomId); + this.trackBackgroundOperation( this.db.saveMessage(message), 'Failed to persist outgoing chat message', diff --git a/src/app/store/rooms/rooms.effects.ts b/src/app/store/rooms/rooms.effects.ts index dd7378d..7d68b8f 100644 --- a/src/app/store/rooms/rooms.effects.ts +++ b/src/app/store/rooms/rooms.effects.ts @@ -44,6 +44,7 @@ import { } from '../../core/models/index'; import { NotificationAudioService, AppSound } from '../../core/services/notification-audio.service'; import { hasRoomBanForUser } from '../../core/helpers/room-ban.helpers'; +import { ROOM_URL_PATTERN } from '../../core/constants'; import { findRoomMember, removeRoomMember, @@ -149,6 +150,33 @@ export class RoomsEffects { ) ); + /** Reconnects saved rooms so joined servers stay online while the app is running. */ + keepSavedRoomsConnected$ = createEffect( + () => + this.actions$.pipe( + ofType( + RoomsActions.loadRoomsSuccess, + RoomsActions.forgetRoomSuccess, + RoomsActions.deleteRoomSuccess, + UsersActions.loadCurrentUserSuccess, + UsersActions.setCurrentUser + ), + withLatestFrom( + this.store.select(selectCurrentUser), + this.store.select(selectCurrentRoom), + this.store.select(selectSavedRooms) + ), + tap(([ + , user, + currentRoom, + savedRooms + ]) => { + this.syncSavedRoomConnections(user ?? null, currentRoom, savedRooms); + }) + ), + { dispatch: false } + ); + /** Creates a new room, saves it locally, and registers it with the server directory. */ createRoom$ = createEffect(() => this.actions$.pipe( @@ -319,9 +347,9 @@ export class RoomsEffects { () => this.actions$.pipe( ofType(RoomsActions.createRoomSuccess, RoomsActions.joinRoomSuccess), - withLatestFrom(this.store.select(selectCurrentUser)), - tap(([{ room }, user]) => { - this.connectToRoomSignaling(room, user ?? null); + withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectSavedRooms)), + tap(([{ room }, user, savedRooms]) => { + this.connectToRoomSignaling(room, user ?? null, undefined, savedRooms); this.router.navigate(['/room', room.id]); }) @@ -333,8 +361,8 @@ export class RoomsEffects { viewServer$ = createEffect(() => this.actions$.pipe( ofType(RoomsActions.viewServer), - withLatestFrom(this.store.select(selectCurrentUser)), - switchMap(([{ room }, user]) => { + withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectSavedRooms)), + switchMap(([{ room }, user, savedRooms]) => { if (!user) { return of(RoomsActions.joinRoomFailure({ error: 'Not logged in' })); } @@ -347,7 +375,7 @@ export class RoomsEffects { const oderId = user.oderId || this.webrtc.peerId(); - this.connectToRoomSignaling(room, user, oderId); + this.connectToRoomSignaling(room, user, oderId, savedRooms); this.router.navigate(['/room', room.id]); return of(RoomsActions.viewServerSuccess({ room })); @@ -1288,7 +1316,8 @@ export class RoomsEffects { private connectToRoomSignaling( room: Room, user: User | null, - resolvedOderId?: string + resolvedOderId?: string, + savedRooms: Room[] = [] ): void { const wsUrl = this.serverDirectory.getWebSocketUrl({ sourceId: room.sourceId, @@ -1298,17 +1327,27 @@ export class RoomsEffects { const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId(); const displayName = user?.displayName || 'Anonymous'; const sameSignalServer = currentWsUrl === wsUrl; - - if (this.webrtc.isConnected() && sameSignalServer) { + const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl); + const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id); + const joinCurrentEndpointRooms = () => { this.webrtc.setCurrentServer(room.id); + this.webrtc.identify(oderId, displayName); + + for (const backgroundRoom of backgroundRooms) { + if (!this.webrtc.hasJoinedServer(backgroundRoom.id)) { + this.webrtc.joinRoom(backgroundRoom.id, oderId); + } + } if (this.webrtc.hasJoinedServer(room.id)) { this.webrtc.switchServer(room.id, oderId); } else { - this.webrtc.identify(oderId, displayName); this.webrtc.joinRoom(room.id, oderId); } + }; + if (this.webrtc.isConnected() && sameSignalServer) { + joinCurrentEndpointRooms(); return; } @@ -1321,14 +1360,73 @@ export class RoomsEffects { if (!connected) return; - this.webrtc.setCurrentServer(room.id); - this.webrtc.identify(oderId, displayName); - this.webrtc.joinRoom(room.id, oderId); + joinCurrentEndpointRooms(); }, error: () => {} }); } + private syncSavedRoomConnections(user: User | null, currentRoom: Room | null, savedRooms: Room[]): void { + if (!user || savedRooms.length === 0) { + return; + } + + const watchedRoomId = this.extractRoomIdFromUrl(this.router.url); + const currentWsUrl = this.webrtc.getCurrentSignalingUrl(); + const targetRoom = (watchedRoomId + ? savedRooms.find((room) => room.id === watchedRoomId) ?? null + : null) + ?? (currentWsUrl ? this.findRoomBySignalingUrl(savedRooms, currentWsUrl) : null) + ?? currentRoom + ?? savedRooms[0] + ?? null; + + if (!targetRoom) { + return; + } + + this.connectToRoomSignaling(targetRoom, user, user.oderId || this.webrtc.peerId(), savedRooms); + } + + private includeRoom(rooms: Room[], room: Room): Room[] { + return rooms.some((candidate) => candidate.id === room.id) + ? rooms + : [...rooms, room]; + } + + private getRoomsForSignalingUrl(rooms: Room[], wsUrl: string): Room[] { + const seenRoomIds = new Set(); + const matchingRooms: Room[] = []; + + for (const room of rooms) { + if (seenRoomIds.has(room.id)) { + continue; + } + + if (this.serverDirectory.getWebSocketUrl({ + sourceId: room.sourceId, + sourceUrl: room.sourceUrl + }) !== wsUrl) { + continue; + } + + seenRoomIds.add(room.id); + matchingRooms.push(room); + } + + return matchingRooms; + } + + private findRoomBySignalingUrl(rooms: Room[], wsUrl: string): Room | null { + return this.getRoomsForSignalingUrl(rooms, wsUrl)[0] ?? null; + } + + private extractRoomIdFromUrl(url: string): string | null { + const roomMatch = url.match(ROOM_URL_PATTERN); + + return roomMatch ? roomMatch[1] : null; + } + private getUserRoleForRoom(room: Room, currentUser: User, currentRoom: Room | null): User['role'] | null { if (room.hostId === currentUser.id || room.hostId === currentUser.oderId) return 'host';