fix typing indicator on wrong server
Some checks failed
Queue Release Build / build-linux (push) Blocked by required conditions
Queue Release Build / prepare (push) Successful in 15s
Deploy Web Apps / deploy (push) Successful in 16m15s
Queue Release Build / finalize (push) Has been cancelled
Queue Release Build / build-windows (push) Has been cancelled

This commit is contained in:
2026-03-18 22:10:11 +01:00
parent 141de64767
commit 1cdd1c5d2b
11 changed files with 431 additions and 108 deletions

View File

@@ -88,7 +88,8 @@ function handleLeaveServer(user: ConnectedUser, message: WsMessage, connectionId
type: 'user_left',
oderId: user.oderId,
displayName: user.displayName ?? 'Anonymous',
serverId: leaveSid
serverId: leaveSid,
serverIds: Array.from(user.serverIds)
}, user.oderId);
}

View File

@@ -25,7 +25,8 @@ function removeDeadConnection(connectionId: string): void {
type: 'user_left',
oderId: user.oderId,
displayName: user.displayName,
serverId: sid
serverId: sid,
serverIds: []
}, user.oderId);
});

View File

@@ -5,6 +5,10 @@ import {
signal,
effect
} from '@angular/core';
import {
NavigationEnd,
Router
} from '@angular/router';
import { take } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { WebRTCService } from './webrtc.service';
@@ -12,6 +16,7 @@ import { Store } from '@ngrx/store';
import { selectCurrentRoomName } from '../../store/rooms/rooms.selectors';
import { DatabaseService } from './database.service';
import { recordDebugNetworkFileChunk } from './debug-network-metrics.service';
import { ROOM_URL_PATTERN } from '../constants';
import type {
ChatAttachmentAnnouncement,
ChatAttachmentMeta,
@@ -145,9 +150,14 @@ export class AttachmentService {
private readonly webrtc = inject(WebRTCService);
private readonly ngrxStore = inject(Store);
private readonly database = inject(DatabaseService);
private readonly router = inject(Router);
/** Primary index: `messageId → Attachment[]`. */
private attachmentsByMessage = new Map<string, Attachment[]>();
/** Runtime cache of `messageId → roomId` for attachment gating. */
private messageRoomIds = new Map<string, string>();
/** Room currently being watched in the router, or `null` outside room routes. */
private watchedRoomId: string | null = this.extractWatchedRoomId(this.router.url);
/** Incremented on every mutation so signal consumers re-render. */
updated = signal<number>(0);
@@ -190,6 +200,24 @@ export class AttachmentService {
this.initFromDatabase();
}
});
this.router.events.subscribe((event) => {
if (!(event instanceof NavigationEnd)) {
return;
}
this.watchedRoomId = this.extractWatchedRoomId(event.urlAfterRedirects || event.url);
if (this.watchedRoomId) {
void this.requestAutoDownloadsForRoom(this.watchedRoomId);
}
});
this.webrtc.onPeerConnected.subscribe(() => {
if (this.watchedRoomId) {
void this.requestAutoDownloadsForRoom(this.watchedRoomId);
}
});
}
private getElectronApi(): AttachmentElectronApi | undefined {
@@ -201,6 +229,44 @@ export class AttachmentService {
return this.attachmentsByMessage.get(messageId) ?? [];
}
/** Cache the room that owns a message so background downloads can be gated by the watched server. */
rememberMessageRoom(messageId: string, roomId: string): void {
if (!messageId || !roomId)
return;
this.messageRoomIds.set(messageId, roomId);
}
/** Queue best-effort auto-download checks for a message's eligible attachments. */
queueAutoDownloadsForMessage(messageId: string, attachmentId?: string): void {
void this.requestAutoDownloadsForMessage(messageId, attachmentId);
}
/** Auto-request eligible missing attachments for the currently watched room. */
async requestAutoDownloadsForRoom(roomId: string): Promise<void> {
if (!roomId || !this.isRoomWatched(roomId))
return;
if (this.database.isReady()) {
const messages = await this.database.getMessages(roomId, 500, 0);
for (const message of messages) {
this.rememberMessageRoom(message.id, message.roomId);
await this.requestAutoDownloadsForMessage(message.id);
}
return;
}
for (const [messageId] of this.attachmentsByMessage) {
const attachmentRoomId = await this.resolveMessageRoomId(messageId);
if (attachmentRoomId === roomId) {
await this.requestAutoDownloadsForMessage(messageId);
}
}
}
/** Remove every attachment associated with a message. */
async deleteForMessage(messageId: string): Promise<void> {
const attachments = this.attachmentsByMessage.get(messageId) ?? [];
@@ -219,6 +285,7 @@ export class AttachmentService {
}
this.attachmentsByMessage.delete(messageId);
this.messageRoomIds.delete(messageId);
this.clearMessageScopedState(messageId);
if (hadCachedAttachments) {
@@ -276,8 +343,15 @@ export class AttachmentService {
* @param attachmentMap - Map of `messageId → AttachmentMeta[]` from peer.
*/
registerSyncedAttachments(
attachmentMap: Record<string, AttachmentMeta[]>
attachmentMap: Record<string, AttachmentMeta[]>,
messageRoomIds?: Record<string, string>
): void {
if (messageRoomIds) {
for (const [messageId, roomId] of Object.entries(messageRoomIds)) {
this.rememberMessageRoom(messageId, roomId);
}
}
const newAttachments: Attachment[] = [];
for (const [messageId, metas] of Object.entries(attachmentMap)) {
@@ -306,6 +380,7 @@ export class AttachmentService {
for (const attachment of newAttachments) {
void this.persistAttachmentMeta(attachment);
this.queueAutoDownloadsForMessage(attachment.messageId, attachment.id);
}
}
}
@@ -375,9 +450,9 @@ export class AttachmentService {
* message to all connected peers.
*
* 1. Each file is assigned a UUID.
* 2. A `file-announce` event is broadcast to peers.
* 3. Inline-preview media ≤ {@link MAX_AUTO_SAVE_SIZE_BYTES}
* are immediately streamed as chunked base-64.
* 2. A `file-announce` event is broadcast to peers.
* 3. Peers watching the message's server can request any
* auto-download-eligible media on demand.
*
* @param messageId - ID of the parent message.
* @param files - Array of user-selected `File` objects.
@@ -437,10 +512,6 @@ export class AttachmentService {
this.webrtc.broadcastMessage(fileAnnounceEvent);
// Auto-stream small inline-preview media
if (this.isMedia(attachment) && attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
await this.streamFileToPeers(messageId, fileId, file);
}
}
const existingList = this.attachmentsByMessage.get(messageId) ?? [];
@@ -482,6 +553,7 @@ export class AttachmentService {
this.attachmentsByMessage.set(messageId, list);
this.touch();
void this.persistAttachmentMeta(attachment);
this.queueAutoDownloadsForMessage(messageId, attachment.id);
}
/**
@@ -772,6 +844,38 @@ export class AttachmentService {
return `${messageId}:${fileId}`;
}
private async requestAutoDownloadsForMessage(messageId: string, attachmentId?: string): Promise<void> {
if (!messageId)
return;
const roomId = await this.resolveMessageRoomId(messageId);
if (!roomId || !this.isRoomWatched(roomId) || this.webrtc.getConnectedPeers().length === 0) {
return;
}
const attachments = this.attachmentsByMessage.get(messageId) ?? [];
for (const attachment of attachments) {
if (attachmentId && attachment.id !== attachmentId)
continue;
if (!this.shouldAutoRequestWhenWatched(attachment))
continue;
if (attachment.available)
continue;
if ((attachment.receivedBytes ?? 0) > 0)
continue;
if (this.pendingRequests.has(this.buildRequestKey(messageId, attachment.id)))
continue;
this.requestFromAnyPeer(messageId, attachment);
}
}
private clearMessageScopedState(messageId: string): void {
const scopedPrefix = `${messageId}:`;
@@ -867,6 +971,12 @@ export class AttachmentService {
attachment.mime.startsWith('audio/');
}
/** Auto-download only the assets that already supported eager loading when watched. */
private shouldAutoRequestWhenWatched(attachment: Attachment): boolean {
return attachment.isImage ||
(this.isMedia(attachment) && attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES);
}
/** Check whether a completed download should be cached on disk. */
private shouldPersistDownloadedAttachment(attachment: Attachment): boolean {
return attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES ||
@@ -1167,6 +1277,38 @@ export class AttachmentService {
} catch { /* load is best-effort */ }
}
private extractWatchedRoomId(url: string): string | null {
const roomMatch = url.match(ROOM_URL_PATTERN);
return roomMatch ? roomMatch[1] : null;
}
private isRoomWatched(roomId: string | null | undefined): boolean {
return !!roomId && roomId === this.watchedRoomId;
}
private async resolveMessageRoomId(messageId: string): Promise<string | null> {
const cachedRoomId = this.messageRoomIds.get(messageId);
if (cachedRoomId)
return cachedRoomId;
if (!this.database.isReady())
return null;
try {
const message = await this.database.getMessageById(messageId);
if (!message?.roomId)
return null;
this.rememberMessageRoom(messageId, message.roomId);
return message.roomId;
} catch {
return null;
}
}
/** One-time migration from localStorage to the database. */
private async migrateFromLocalStorage(): Promise<void> {
try {

View File

@@ -71,6 +71,7 @@ type IncomingSignalingMessage = Omit<Partial<SignalingMessage>, 'type' | 'payloa
oderId?: string;
serverTime?: number;
serverId?: string;
serverIds?: string[];
users?: SignalingUserSummary[];
displayName?: string;
fromUserId?: string;
@@ -92,8 +93,8 @@ export class WebRTCService implements OnDestroy {
private activeServerId: string | null = null;
/** The server ID where voice is currently active, or `null` when not in voice. */
private voiceServerId: string | null = null;
/** Maps each remote peer ID to the server they were discovered from. */
private readonly peerServerMap = new Map<string, string>();
/** Maps each remote peer ID to the shared servers they currently belong to. */
private readonly peerServerMap = new Map<string, Set<string>>();
private readonly serviceDestroyed$ = new Subject<void>();
private remoteScreenShareRequestsEnabled = false;
private readonly desiredRemoteScreenSharePeers = new Set<string>();
@@ -275,6 +276,7 @@ export class WebRTCService implements OnDestroy {
this.peerManager.peerDisconnected$.subscribe((peerId) => {
this.activeRemoteScreenSharePeers.delete(peerId);
this.peerServerMap.delete(peerId);
this.screenShareManager.clearScreenShareRequest(peerId);
});
@@ -349,6 +351,10 @@ export class WebRTCService implements OnDestroy {
if (!user.oderId)
continue;
if (message.serverId) {
this.trackPeerInServer(user.oderId, message.serverId);
}
const existing = this.peerManager.activePeerConnections.get(user.oderId);
const healthy = this.isPeerHealthy(existing);
@@ -367,10 +373,6 @@ export class WebRTCService implements OnDestroy {
this.peerManager.createPeerConnection(user.oderId, true);
this.peerManager.createAndSendOffer(user.oderId);
if (message.serverId) {
this.peerServerMap.set(user.oderId, message.serverId);
}
}
}
@@ -379,6 +381,10 @@ export class WebRTCService implements OnDestroy {
displayName: message.displayName,
oderId: message.oderId
});
if (message.oderId && message.serverId) {
this.trackPeerInServer(message.oderId, message.serverId);
}
}
private handleUserLeftSignalingMessage(message: IncomingSignalingMessage): void {
@@ -389,8 +395,16 @@ export class WebRTCService implements OnDestroy {
});
if (message.oderId) {
this.peerManager.removePeer(message.oderId);
this.peerServerMap.delete(message.oderId);
const hasRemainingSharedServers = Array.isArray(message.serverIds)
? this.replacePeerSharedServers(message.oderId, message.serverIds)
: (message.serverId
? this.untrackPeerFromServer(message.oderId, message.serverId)
: false);
if (!hasRemainingSharedServers) {
this.peerManager.removePeer(message.oderId);
this.peerServerMap.delete(message.oderId);
}
}
}
@@ -404,7 +418,7 @@ export class WebRTCService implements OnDestroy {
const offerEffectiveServer = this.voiceServerId || this.activeServerId;
if (offerEffectiveServer && !this.peerServerMap.has(fromUserId)) {
this.peerServerMap.set(fromUserId, offerEffectiveServer);
this.trackPeerInServer(fromUserId, offerEffectiveServer);
}
this.peerManager.handleOffer(fromUserId, sdp);
@@ -441,8 +455,8 @@ export class WebRTCService implements OnDestroy {
private closePeersNotInServer(serverId: string): void {
const peersToClose: string[] = [];
this.peerServerMap.forEach((peerServerId, peerId) => {
if (peerServerId !== serverId) {
this.peerServerMap.forEach((peerServerIds, peerId) => {
if (!peerServerIds.has(serverId)) {
peersToClose.push(peerId);
}
});
@@ -479,6 +493,45 @@ export class WebRTCService implements OnDestroy {
return this.signalingManager.connect(serverUrl);
}
private trackPeerInServer(peerId: string, serverId: string): void {
if (!peerId || !serverId)
return;
const trackedServers = this.peerServerMap.get(peerId) ?? new Set<string>();
trackedServers.add(serverId);
this.peerServerMap.set(peerId, trackedServers);
}
private replacePeerSharedServers(peerId: string, serverIds: string[]): boolean {
const sharedServerIds = serverIds.filter((serverId) => this.memberServerIds.has(serverId));
if (sharedServerIds.length === 0) {
this.peerServerMap.delete(peerId);
return false;
}
this.peerServerMap.set(peerId, new Set(sharedServerIds));
return true;
}
private untrackPeerFromServer(peerId: string, serverId: string): boolean {
const trackedServers = this.peerServerMap.get(peerId);
if (!trackedServers)
return false;
trackedServers.delete(serverId);
if (trackedServers.size === 0) {
this.peerServerMap.delete(peerId);
return false;
}
this.peerServerMap.set(peerId, trackedServers);
return true;
}
/**
* Ensure the signaling WebSocket is connected, reconnecting if needed.
*

View File

@@ -3,10 +3,13 @@ import {
Component,
inject,
signal,
DestroyRef
DestroyRef,
effect
} from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Store } from '@ngrx/store';
import { WebRTCService } from '../../../core/services/webrtc.service';
import { selectCurrentRoom } from '../../../store/rooms/rooms.selectors';
import {
merge,
interval,
@@ -23,6 +26,7 @@ interface TypingSignalingMessage {
type: string;
displayName: string;
oderId: string;
serverId: string;
}
@Component({
@@ -36,6 +40,9 @@ interface TypingSignalingMessage {
})
export class TypingIndicatorComponent {
private readonly typingMap = new Map<string, { name: string; expiresAt: number }>();
private readonly store = inject(Store);
private readonly currentRoom = this.store.selectSignal(selectCurrentRoom);
private lastRoomId: string | null = null;
typingDisplay = signal<string[]>([]);
typingOthersCount = signal<number>(0);
@@ -47,8 +54,10 @@ export class TypingIndicatorComponent {
filter((msg): msg is TypingSignalingMessage =>
msg?.type === 'user_typing' &&
typeof msg.displayName === 'string' &&
typeof msg.oderId === 'string'
typeof msg.oderId === 'string' &&
typeof msg.serverId === 'string'
),
filter((msg) => msg.serverId === this.currentRoom()?.id),
tap((msg) => {
const now = Date.now();
@@ -77,6 +86,17 @@ export class TypingIndicatorComponent {
merge(typing$, purge$)
.pipe(takeUntilDestroyed(destroyRef))
.subscribe(() => this.recomputeDisplay());
effect(() => {
const roomId = this.currentRoom()?.id ?? null;
if (roomId === this.lastRoomId)
return;
this.lastRoomId = roomId;
this.typingMap.clear();
this.recomputeDisplay();
});
}
private recomputeDisplay(): void {

View File

@@ -4,13 +4,11 @@ import {
inject,
signal,
computed,
OnInit,
OnDestroy
OnInit
} from '@angular/core';
import { CommonModule } from '@angular/common';
import { Store } from '@ngrx/store';
import { NgIcon, provideIcons } from '@ng-icons/core';
import { Subscription } from 'rxjs';
import {
lucideMic,
lucideMicOff,
@@ -28,6 +26,7 @@ import { ScreenShareQuality } from '../../../core/services/webrtc';
import { UsersActions } from '../../../store/users/users.actions';
import { selectCurrentUser } from '../../../store/users/users.selectors';
import { DebugConsoleComponent, ScreenShareQualityDialogComponent } from '../../../shared';
import { VoicePlaybackService } from '../voice-controls/services/voice-playback.service';
@Component({
selector: 'app-floating-voice-controls',
@@ -55,9 +54,10 @@ import { DebugConsoleComponent, ScreenShareQualityDialogComponent } from '../../
* Floating voice controls displayed when the user navigates away from the voice-connected server.
* Provides mute, deafen, screen-share, and disconnect actions in a compact overlay.
*/
export class FloatingVoiceControlsComponent implements OnInit, OnDestroy {
export class FloatingVoiceControlsComponent implements OnInit {
private webrtcService = inject(WebRTCService);
private voiceSessionService = inject(VoiceSessionService);
private voicePlayback = inject(VoicePlaybackService);
private store = inject(Store);
currentUser = this.store.selectSignal(selectCurrentUser);
@@ -75,8 +75,6 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy {
askScreenShareQuality = signal(true);
showScreenShareQualityDialog = signal(false);
private stateSubscription: Subscription | null = null;
/** Sync local mute/deafen/screen-share state from the WebRTC service on init. */
ngOnInit(): void {
// Sync mute/deafen state from webrtc service
@@ -84,10 +82,15 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy {
this.isDeafened.set(this.webrtcService.isDeafened());
this.isScreenSharing.set(this.webrtcService.isScreenSharing());
this.syncScreenShareSettings();
}
ngOnDestroy(): void {
this.stateSubscription?.unsubscribe();
const settings = loadVoiceSettingsFromStorage();
this.voicePlayback.updateOutputVolume(settings.outputVolume / 100);
this.voicePlayback.updateDeafened(this.isDeafened());
if (settings.outputDevice) {
this.voicePlayback.applyOutputDevice(settings.outputDevice);
}
}
/** Navigate back to the voice-connected server. */
@@ -117,6 +120,7 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy {
toggleDeafen(): void {
this.isDeafened.update((current) => !current);
this.webrtcService.toggleDeafen(this.isDeafened());
this.voicePlayback.updateDeafened(this.isDeafened());
// When deafening, also mute
if (this.isDeafened() && !this.isMuted()) {
@@ -189,6 +193,8 @@ export class FloatingVoiceControlsComponent implements OnInit, OnDestroy {
// Disable voice
this.webrtcService.disableVoice();
this.voicePlayback.teardownAll();
this.voicePlayback.updateDeafened(false);
// Update user voice state in store
const user = this.currentUser();

View File

@@ -60,6 +60,28 @@ export class VoicePlaybackService {
: null;
void this.applyEffectiveOutputDeviceToAllPipelines();
});
this.webrtc.onRemoteStream.subscribe(({ peerId }) => {
const voiceStream = this.webrtc.getRemoteVoiceStream(peerId);
if (!voiceStream) {
this.removeRemoteAudio(peerId);
return;
}
this.handleRemoteStream(peerId, voiceStream, this.buildPlaybackOptions());
});
this.webrtc.onVoiceConnected.subscribe(() => {
const options = this.buildPlaybackOptions(true);
this.playPendingStreams(options);
this.ensureAllRemoteStreamsPlaying(options);
});
this.webrtc.onPeerDisconnected.subscribe((peerId) => {
this.removeRemoteAudio(peerId);
});
}
handleRemoteStream(peerId: string, stream: MediaStream, options: PlaybackOptions): void {
@@ -158,6 +180,14 @@ export class VoicePlaybackService {
this.pendingRemoteStreams.clear();
}
private buildPlaybackOptions(forceConnected = this.webrtc.isVoiceConnected()): PlaybackOptions {
return {
isConnected: forceConnected,
outputVolume: this.masterVolume,
isDeafened: this.deafened
};
}
/**
* Build the Web Audio graph for a remote peer:
*

View File

@@ -10,7 +10,6 @@ import {
import { CommonModule } from '@angular/common';
import { Store } from '@ngrx/store';
import { NgIcon, provideIcons } from '@ng-icons/core';
import { Subscription } from 'rxjs';
import {
lucideMic,
lucideMicOff,
@@ -76,7 +75,6 @@ export class VoiceControlsComponent implements OnInit, OnDestroy {
private voicePlayback = inject(VoicePlaybackService);
private store = inject(Store);
private settingsModal = inject(SettingsModalService);
private remoteStreamSubscription: Subscription | null = null;
currentUser = this.store.selectSignal(selectCurrentUser);
currentRoom = this.store.selectSignal(selectCurrentRoom);
@@ -110,56 +108,18 @@ export class VoiceControlsComponent implements OnInit, OnDestroy {
isDeafened: this.isDeafened()
};
}
private voiceConnectedSubscription: Subscription | null = null;
async ngOnInit(): Promise<void> {
await this.loadAudioDevices();
// Load persisted voice settings and apply
this.loadSettings();
this.applySettingsToWebRTC();
// Subscribe to remote streams to play audio from peers
this.remoteStreamSubscription = this.webrtcService.onRemoteStream.subscribe(
({ peerId }) => {
const voiceStream = this.webrtcService.getRemoteVoiceStream(peerId);
if (!voiceStream) {
this.voicePlayback.removeRemoteAudio(peerId);
return;
}
this.voicePlayback.handleRemoteStream(peerId, voiceStream, this.playbackOptions());
}
);
// Subscribe to voice connected event to play pending streams and ensure all remote audio is set up
this.voiceConnectedSubscription = this.webrtcService.onVoiceConnected.subscribe(() => {
const options = this.playbackOptions();
this.voicePlayback.playPendingStreams(options);
// Also ensure all remote streams from connected peers are playing
// This handles the case where streams were received while voice was "connected"
// from a previous session but audio elements weren't set up
this.voicePlayback.ensureAllRemoteStreamsPlaying(options);
});
// Clean up audio when peer disconnects
this.webrtcService.onPeerDisconnected.subscribe((peerId) => {
this.voicePlayback.removeRemoteAudio(peerId);
});
}
ngOnDestroy(): void {
if (this.isConnected()) {
this.disconnect();
if (!this.webrtcService.isVoiceConnected()) {
this.voicePlayback.teardownAll();
}
this.voicePlayback.teardownAll();
this.remoteStreamSubscription?.unsubscribe();
this.voiceConnectedSubscription?.unsubscribe();
}
async loadAudioDevices(): Promise<void> {
@@ -304,6 +264,7 @@ export class VoiceControlsComponent implements OnInit, OnDestroy {
// Disable voice (stops audio tracks but keeps peer connections open for chat)
this.webrtcService.disableVoice();
this.voicePlayback.teardownAll();
this.voicePlayback.updateDeafened(false);
const user = this.currentUser();

View File

@@ -256,7 +256,10 @@ function handleSyncBatch(
return EMPTY;
if (hasAttachmentMetaMap(event.attachments)) {
attachments.registerSyncedAttachments(event.attachments);
attachments.registerSyncedAttachments(
event.attachments,
Object.fromEntries(event.messages.map((message) => [message.id, message.roomId]))
);
}
return from(processSyncBatch(event, db, attachments)).pipe(
@@ -277,6 +280,8 @@ async function processSyncBatch(
const toUpsert: Message[] = [];
for (const incoming of event.messages) {
attachments.rememberMessageRoom(incoming.id, incoming.roomId);
const { message, changed } = await mergeIncomingMessage(incoming, db);
if (incoming.isDeleted) {
@@ -292,40 +297,31 @@ async function processSyncBatch(
}
if (hasAttachmentMetaMap(event.attachments)) {
requestMissingImages(event.attachments, attachments);
queueWatchedAttachmentDownloads(event.attachments, attachments);
}
return toUpsert;
}
/** Auto-requests any unavailable image attachments from any connected peer. */
function requestMissingImages(
/** Queue best-effort auto-downloads for watched-room attachments. */
function queueWatchedAttachmentDownloads(
attachmentMap: AttachmentMetaMap,
attachments: AttachmentService
): void {
for (const [msgId, metas] of Object.entries(attachmentMap)) {
for (const meta of metas) {
if (!meta.isImage)
continue;
const atts = attachments.getForMessage(msgId);
const matchingAttachment = atts.find((attachment) => attachment.id === meta.id);
if (
matchingAttachment &&
!matchingAttachment.available &&
!(matchingAttachment.receivedBytes && matchingAttachment.receivedBytes > 0)
) {
attachments.requestImageFromAnyPeer(msgId, matchingAttachment);
}
}
for (const msgId of Object.keys(attachmentMap)) {
attachments.queueAutoDownloadsForMessage(msgId);
}
}
/** Saves an incoming chat message to DB and dispatches receiveMessage. */
function handleChatMessage(
event: IncomingMessageEvent,
{ db, debugging, currentUser }: IncomingMessageContext
{
db,
debugging,
attachments,
currentUser
}: IncomingMessageContext
): Observable<Action> {
const msg = event.message;
@@ -340,6 +336,8 @@ function handleChatMessage(
if (isOwnMessage)
return EMPTY;
attachments.rememberMessageRoom(msg.id, msg.roomId);
trackBackgroundOperation(
db.saveMessage(msg),
debugging,
@@ -492,6 +490,11 @@ function handleFileAnnounce(
{ attachments }: IncomingMessageContext
): Observable<Action> {
attachments.handleFileAnnounce(event);
if (event.messageId) {
attachments.queueAutoDownloadsForMessage(event.messageId, event.file?.id);
}
return EMPTY;
}

View File

@@ -64,6 +64,12 @@ export class MessagesEffects {
mergeMap(async (messages) => {
const hydrated = await hydrateMessages(messages, this.db);
for (const message of hydrated) {
this.attachments.rememberMessageRoom(message.id, message.roomId);
}
void this.attachments.requestAutoDownloadsForRoom(roomId);
return MessagesActions.loadMessagesSuccess({ messages: hydrated });
}),
catchError((error) =>
@@ -104,6 +110,8 @@ export class MessagesEffects {
replyToId
};
this.attachments.rememberMessageRoom(message.id, message.roomId);
this.trackBackgroundOperation(
this.db.saveMessage(message),
'Failed to persist outgoing chat message',

View File

@@ -44,6 +44,7 @@ import {
} from '../../core/models/index';
import { NotificationAudioService, AppSound } from '../../core/services/notification-audio.service';
import { hasRoomBanForUser } from '../../core/helpers/room-ban.helpers';
import { ROOM_URL_PATTERN } from '../../core/constants';
import {
findRoomMember,
removeRoomMember,
@@ -149,6 +150,33 @@ export class RoomsEffects {
)
);
/** Reconnects saved rooms so joined servers stay online while the app is running. */
keepSavedRoomsConnected$ = createEffect(
() =>
this.actions$.pipe(
ofType(
RoomsActions.loadRoomsSuccess,
RoomsActions.forgetRoomSuccess,
RoomsActions.deleteRoomSuccess,
UsersActions.loadCurrentUserSuccess,
UsersActions.setCurrentUser
),
withLatestFrom(
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom),
this.store.select(selectSavedRooms)
),
tap(([
, user,
currentRoom,
savedRooms
]) => {
this.syncSavedRoomConnections(user ?? null, currentRoom, savedRooms);
})
),
{ dispatch: false }
);
/** Creates a new room, saves it locally, and registers it with the server directory. */
createRoom$ = createEffect(() =>
this.actions$.pipe(
@@ -319,9 +347,9 @@ export class RoomsEffects {
() =>
this.actions$.pipe(
ofType(RoomsActions.createRoomSuccess, RoomsActions.joinRoomSuccess),
withLatestFrom(this.store.select(selectCurrentUser)),
tap(([{ room }, user]) => {
this.connectToRoomSignaling(room, user ?? null);
withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectSavedRooms)),
tap(([{ room }, user, savedRooms]) => {
this.connectToRoomSignaling(room, user ?? null, undefined, savedRooms);
this.router.navigate(['/room', room.id]);
})
@@ -333,8 +361,8 @@ export class RoomsEffects {
viewServer$ = createEffect(() =>
this.actions$.pipe(
ofType(RoomsActions.viewServer),
withLatestFrom(this.store.select(selectCurrentUser)),
switchMap(([{ room }, user]) => {
withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectSavedRooms)),
switchMap(([{ room }, user, savedRooms]) => {
if (!user) {
return of(RoomsActions.joinRoomFailure({ error: 'Not logged in' }));
}
@@ -347,7 +375,7 @@ export class RoomsEffects {
const oderId = user.oderId || this.webrtc.peerId();
this.connectToRoomSignaling(room, user, oderId);
this.connectToRoomSignaling(room, user, oderId, savedRooms);
this.router.navigate(['/room', room.id]);
return of(RoomsActions.viewServerSuccess({ room }));
@@ -1288,7 +1316,8 @@ export class RoomsEffects {
private connectToRoomSignaling(
room: Room,
user: User | null,
resolvedOderId?: string
resolvedOderId?: string,
savedRooms: Room[] = []
): void {
const wsUrl = this.serverDirectory.getWebSocketUrl({
sourceId: room.sourceId,
@@ -1298,17 +1327,27 @@ export class RoomsEffects {
const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId();
const displayName = user?.displayName || 'Anonymous';
const sameSignalServer = currentWsUrl === wsUrl;
if (this.webrtc.isConnected() && sameSignalServer) {
const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl);
const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id);
const joinCurrentEndpointRooms = () => {
this.webrtc.setCurrentServer(room.id);
this.webrtc.identify(oderId, displayName);
for (const backgroundRoom of backgroundRooms) {
if (!this.webrtc.hasJoinedServer(backgroundRoom.id)) {
this.webrtc.joinRoom(backgroundRoom.id, oderId);
}
}
if (this.webrtc.hasJoinedServer(room.id)) {
this.webrtc.switchServer(room.id, oderId);
} else {
this.webrtc.identify(oderId, displayName);
this.webrtc.joinRoom(room.id, oderId);
}
};
if (this.webrtc.isConnected() && sameSignalServer) {
joinCurrentEndpointRooms();
return;
}
@@ -1321,14 +1360,73 @@ export class RoomsEffects {
if (!connected)
return;
this.webrtc.setCurrentServer(room.id);
this.webrtc.identify(oderId, displayName);
this.webrtc.joinRoom(room.id, oderId);
joinCurrentEndpointRooms();
},
error: () => {}
});
}
private syncSavedRoomConnections(user: User | null, currentRoom: Room | null, savedRooms: Room[]): void {
if (!user || savedRooms.length === 0) {
return;
}
const watchedRoomId = this.extractRoomIdFromUrl(this.router.url);
const currentWsUrl = this.webrtc.getCurrentSignalingUrl();
const targetRoom = (watchedRoomId
? savedRooms.find((room) => room.id === watchedRoomId) ?? null
: null)
?? (currentWsUrl ? this.findRoomBySignalingUrl(savedRooms, currentWsUrl) : null)
?? currentRoom
?? savedRooms[0]
?? null;
if (!targetRoom) {
return;
}
this.connectToRoomSignaling(targetRoom, user, user.oderId || this.webrtc.peerId(), savedRooms);
}
private includeRoom(rooms: Room[], room: Room): Room[] {
return rooms.some((candidate) => candidate.id === room.id)
? rooms
: [...rooms, room];
}
private getRoomsForSignalingUrl(rooms: Room[], wsUrl: string): Room[] {
const seenRoomIds = new Set<string>();
const matchingRooms: Room[] = [];
for (const room of rooms) {
if (seenRoomIds.has(room.id)) {
continue;
}
if (this.serverDirectory.getWebSocketUrl({
sourceId: room.sourceId,
sourceUrl: room.sourceUrl
}) !== wsUrl) {
continue;
}
seenRoomIds.add(room.id);
matchingRooms.push(room);
}
return matchingRooms;
}
private findRoomBySignalingUrl(rooms: Room[], wsUrl: string): Room | null {
return this.getRoomsForSignalingUrl(rooms, wsUrl)[0] ?? null;
}
private extractRoomIdFromUrl(url: string): string | null {
const roomMatch = url.match(ROOM_URL_PATTERN);
return roomMatch ? roomMatch[1] : null;
}
private getUserRoleForRoom(room: Room, currentUser: User, currentRoom: Room | null): User['role'] | null {
if (room.hostId === currentUser.id || room.hostId === currentUser.oderId)
return 'host';