import { Injectable, inject } from '@angular/core'; import { recordDebugNetworkFileChunk } from '../../../../infrastructure/realtime/logging/debug-network-metrics'; import { RealtimeSessionFacade } from '../../../../core/realtime'; import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service'; import { MAX_AUTO_SAVE_SIZE_BYTES } from '../../domain/constants/attachment.constants'; import { shouldPersistDownloadedAttachment } from '../../domain/logic/attachment.logic'; import type { Attachment, AttachmentMeta } from '../../domain/models/attachment.model'; import { ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT, ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT, DEFAULT_ATTACHMENT_MIME_TYPE, FILE_NOT_FOUND_REQUEST_ERROR, NO_CONNECTED_PEERS_REQUEST_ERROR } from '../../domain/constants/attachment-transfer.constants'; import { type FileAnnounceEvent, type FileAnnouncePayload, type FileCancelEvent, type FileCancelPayload, type FileChunkPayload, type FileNotFoundEvent, type FileNotFoundPayload, type FileRequestEvent, type FileRequestPayload, type LocalFileWithPath } from '../../domain/models/attachment-transfer.model'; import { AttachmentPersistenceService } from './attachment-persistence.service'; import { AttachmentRuntimeStore } from './attachment-runtime.store'; import { AttachmentTransferTransportService } from './attachment-transfer-transport.service'; @Injectable({ providedIn: 'root' }) export class AttachmentTransferService { private readonly webrtc = inject(RealtimeSessionFacade); private readonly runtimeStore = inject(AttachmentRuntimeStore); private readonly attachmentStorage = inject(AttachmentStorageService); private readonly persistence = inject(AttachmentPersistenceService); private readonly transport = inject(AttachmentTransferTransportService); getAttachmentMetasForMessages(messageIds: string[]): Record { const result: Record = {}; for (const messageId of messageIds) { const attachments = this.runtimeStore.getAttachmentsForMessage(messageId); if (attachments.length > 0) { result[messageId] = attachments.map((attachment) => ({ id: attachment.id, messageId: attachment.messageId, filename: attachment.filename, size: attachment.size, mime: attachment.mime, isImage: attachment.isImage, uploaderPeerId: attachment.uploaderPeerId, filePath: undefined, savedPath: undefined })); } } return result; } registerSyncedAttachments( attachmentMap: Record, messageRoomIds?: Record ): void { if (messageRoomIds) { for (const [messageId, roomId] of Object.entries(messageRoomIds)) { this.runtimeStore.rememberMessageRoom(messageId, roomId); } } const newAttachments: Attachment[] = []; for (const [messageId, metas] of Object.entries(attachmentMap)) { const existing = [...this.runtimeStore.getAttachmentsForMessage(messageId)]; for (const meta of metas) { const alreadyKnown = existing.find((entry) => entry.id === meta.id); if (!alreadyKnown) { const attachment: Attachment = { ...meta, available: false, receivedBytes: 0 }; existing.push(attachment); newAttachments.push(attachment); } } this.runtimeStore.setAttachmentsForMessage(messageId, existing); } if (newAttachments.length > 0) { this.runtimeStore.touch(); for (const attachment of newAttachments) { void this.persistence.persistAttachmentMeta(attachment); } } } requestFromAnyPeer(messageId: string, attachment: Attachment): void { const clearedRequestError = this.clearAttachmentRequestError(attachment); const connectedPeers = this.webrtc.getConnectedPeers(); if (connectedPeers.length === 0) { attachment.requestError = NO_CONNECTED_PEERS_REQUEST_ERROR; this.runtimeStore.touch(); console.warn('[Attachments] No connected peers to request file from'); return; } if (clearedRequestError) this.runtimeStore.touch(); this.runtimeStore.setPendingRequestPeers( this.buildRequestKey(messageId, attachment.id), new Set() ); this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId); } handleFileNotFound(payload: FileNotFoundPayload): void { const { messageId, fileId } = payload; if (!messageId || !fileId) return; const attachments = this.runtimeStore.getAttachmentsForMessage(messageId); const attachment = attachments.find((entry) => entry.id === fileId); const didSendRequest = this.sendFileRequestToNextPeer(messageId, fileId, attachment?.uploaderPeerId); if (!didSendRequest && attachment) { attachment.requestError = FILE_NOT_FOUND_REQUEST_ERROR; this.runtimeStore.touch(); } } requestImageFromAnyPeer(messageId: string, attachment: Attachment): void { this.requestFromAnyPeer(messageId, attachment); } requestFile(messageId: string, attachment: Attachment): void { this.requestFromAnyPeer(messageId, attachment); } hasPendingRequest(messageId: string, fileId: string): boolean { return this.runtimeStore.hasPendingRequest(this.buildRequestKey(messageId, fileId)); } async publishAttachments( messageId: string, files: File[], uploaderPeerId?: string ): Promise { const attachments: Attachment[] = []; for (const file of files) { const fileId = crypto.randomUUID?.() ?? `${Date.now()}-${Math.random()}`; const attachment: Attachment = { id: fileId, messageId, filename: file.name, size: file.size, mime: file.type || DEFAULT_ATTACHMENT_MIME_TYPE, isImage: file.type.startsWith('image/'), uploaderPeerId, filePath: (file as LocalFileWithPath).path, available: false }; attachments.push(attachment); this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file); try { attachment.objectUrl = URL.createObjectURL(file); attachment.available = true; } catch { /* non-critical */ } if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) { void this.persistence.saveFileToDisk(attachment, file); } const fileAnnounceEvent: FileAnnounceEvent = { type: 'file-announce', messageId, file: { id: fileId, filename: attachment.filename, size: attachment.size, mime: attachment.mime, isImage: attachment.isImage, uploaderPeerId } }; this.webrtc.broadcastMessage(fileAnnounceEvent); } const existingList = this.runtimeStore.getAttachmentsForMessage(messageId); this.runtimeStore.setAttachmentsForMessage(messageId, [...existingList, ...attachments]); this.runtimeStore.touch(); for (const attachment of attachments) { void this.persistence.persistAttachmentMeta(attachment); } } handleFileAnnounce(payload: FileAnnouncePayload): void { const { messageId, file } = payload; if (!messageId || !file) return; const list = [...this.runtimeStore.getAttachmentsForMessage(messageId)]; const alreadyKnown = list.find((entry) => entry.id === file.id); if (alreadyKnown) return; const attachment: Attachment = { id: file.id, messageId, filename: file.filename, size: file.size, mime: file.mime, isImage: !!file.isImage, uploaderPeerId: file.uploaderPeerId, available: false, receivedBytes: 0 }; list.push(attachment); this.runtimeStore.setAttachmentsForMessage(messageId, list); this.runtimeStore.touch(); void this.persistence.persistAttachmentMeta(attachment); } handleFileChunk(payload: FileChunkPayload): void { const { messageId, fileId, fromPeerId, index, total, data } = payload; if ( !messageId || !fileId || typeof index !== 'number' || typeof total !== 'number' || typeof data !== 'string' ) { return; } const list = this.runtimeStore.getAttachmentsForMessage(messageId); const attachment = list.find((entry) => entry.id === fileId); if (!attachment) return; const decodedBytes = this.transport.decodeBase64(data); const assemblyKey = `${messageId}:${fileId}`; const requestKey = this.buildRequestKey(messageId, fileId); this.runtimeStore.deletePendingRequest(requestKey); this.clearAttachmentRequestError(attachment); const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total); if (!chunkBuffer[index]) { chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer; this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1); } this.updateTransferProgress(attachment, decodedBytes, fromPeerId); this.runtimeStore.touch(); this.finalizeTransferIfComplete(attachment, assemblyKey, total); } async handleFileRequest(payload: FileRequestPayload): Promise { const { messageId, fileId, fromPeerId } = payload; if (!messageId || !fileId || !fromPeerId) return; const exactKey = `${messageId}:${fileId}`; const originalFile = this.runtimeStore.getOriginalFile(exactKey) ?? this.runtimeStore.findOriginalFileByFileId(fileId); if (originalFile) { await this.transport.streamFileToPeer( fromPeerId, messageId, fileId, originalFile, () => this.isTransferCancelled(fromPeerId, messageId, fileId) ); return; } const list = this.runtimeStore.getAttachmentsForMessage(messageId); const attachment = list.find((entry) => entry.id === fileId); const diskPath = attachment ? await this.attachmentStorage.resolveExistingPath(attachment) : null; if (diskPath) { await this.transport.streamFileFromDiskToPeer( fromPeerId, messageId, fileId, diskPath, () => this.isTransferCancelled(fromPeerId, messageId, fileId) ); return; } if (attachment?.isImage) { const roomName = await this.persistence.resolveCurrentRoomName(); const legacyDiskPath = await this.attachmentStorage.resolveLegacyImagePath( attachment.filename, roomName ); if (legacyDiskPath) { await this.transport.streamFileFromDiskToPeer( fromPeerId, messageId, fileId, legacyDiskPath, () => this.isTransferCancelled(fromPeerId, messageId, fileId) ); return; } } if (attachment?.available && attachment.objectUrl) { try { const response = await fetch(attachment.objectUrl); const blob = await response.blob(); const file = new File([blob], attachment.filename, { type: attachment.mime }); await this.transport.streamFileToPeer( fromPeerId, messageId, fileId, file, () => this.isTransferCancelled(fromPeerId, messageId, fileId) ); return; } catch { /* fall through */ } } const fileNotFoundEvent: FileNotFoundEvent = { type: 'file-not-found', messageId, fileId }; this.webrtc.sendToPeer(fromPeerId, fileNotFoundEvent); } cancelRequest(messageId: string, attachment: Attachment): void { const targetPeerId = attachment.uploaderPeerId; if (!targetPeerId) return; try { const assemblyKey = `${messageId}:${attachment.id}`; this.runtimeStore.deleteChunkBuffer(assemblyKey); this.runtimeStore.deleteChunkCount(assemblyKey); attachment.receivedBytes = 0; attachment.speedBps = 0; attachment.startedAtMs = undefined; attachment.lastUpdateMs = undefined; if (attachment.objectUrl) { try { URL.revokeObjectURL(attachment.objectUrl); } catch { /* ignore */ } attachment.objectUrl = undefined; } attachment.available = false; this.runtimeStore.touch(); const fileCancelEvent: FileCancelEvent = { type: 'file-cancel', messageId, fileId: attachment.id }; this.webrtc.sendToPeer(targetPeerId, fileCancelEvent); } catch { /* best-effort */ } } handleFileCancel(payload: FileCancelPayload): void { const { messageId, fileId, fromPeerId } = payload; if (!messageId || !fileId || !fromPeerId) return; this.runtimeStore.addCancelledTransfer( this.buildTransferKey(messageId, fileId, fromPeerId) ); } async fulfillRequestWithFile( messageId: string, fileId: string, targetPeerId: string, file: File ): Promise { this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file); await this.transport.streamFileToPeer( targetPeerId, messageId, fileId, file, () => this.isTransferCancelled(targetPeerId, messageId, fileId) ); } private buildTransferKey(messageId: string, fileId: string, peerId: string): string { return `${messageId}:${fileId}:${peerId}`; } private buildRequestKey(messageId: string, fileId: string): string { return `${messageId}:${fileId}`; } private clearAttachmentRequestError(attachment: Attachment): boolean { if (!attachment.requestError) return false; attachment.requestError = undefined; return true; } private isTransferCancelled(targetPeerId: string, messageId: string, fileId: string): boolean { return this.runtimeStore.hasCancelledTransfer( this.buildTransferKey(messageId, fileId, targetPeerId) ); } private sendFileRequestToNextPeer( messageId: string, fileId: string, preferredPeerId?: string ): boolean { const connectedPeers = this.webrtc.getConnectedPeers(); const requestKey = this.buildRequestKey(messageId, fileId); const triedPeers = this.runtimeStore.getPendingRequestPeers(requestKey) ?? new Set(); let targetPeerId: string | undefined; if (preferredPeerId && connectedPeers.includes(preferredPeerId) && !triedPeers.has(preferredPeerId)) { targetPeerId = preferredPeerId; } else { targetPeerId = connectedPeers.find((peerId) => !triedPeers.has(peerId)); } if (!targetPeerId) { this.runtimeStore.deletePendingRequest(requestKey); return false; } triedPeers.add(targetPeerId); this.runtimeStore.setPendingRequestPeers(requestKey, triedPeers); const fileRequestEvent: FileRequestEvent = { type: 'file-request', messageId, fileId }; this.webrtc.sendToPeer(targetPeerId, fileRequestEvent); return true; } private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] { const existingChunkBuffer = this.runtimeStore.getChunkBuffer(assemblyKey); if (existingChunkBuffer) { return existingChunkBuffer; } const createdChunkBuffer = new Array(total); this.runtimeStore.setChunkBuffer(assemblyKey, createdChunkBuffer); this.runtimeStore.setChunkCount(assemblyKey, 0); return createdChunkBuffer; } private updateTransferProgress( attachment: Attachment, decodedBytes: Uint8Array, fromPeerId?: string ): void { const now = Date.now(); const previousReceived = attachment.receivedBytes ?? 0; attachment.receivedBytes = previousReceived + decodedBytes.byteLength; if (fromPeerId) { recordDebugNetworkFileChunk(fromPeerId, decodedBytes.byteLength, now); } if (!attachment.startedAtMs) attachment.startedAtMs = now; if (!attachment.lastUpdateMs) attachment.lastUpdateMs = now; const elapsedMs = Math.max(1, now - attachment.lastUpdateMs); const instantaneousBps = (decodedBytes.byteLength / elapsedMs) * 1000; const previousSpeed = attachment.speedBps ?? instantaneousBps; attachment.speedBps = ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT * previousSpeed + ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT * instantaneousBps; attachment.lastUpdateMs = now; } private finalizeTransferIfComplete( attachment: Attachment, assemblyKey: string, total: number ): void { const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0; const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey); if ( !completeBuffer || (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size) || !completeBuffer.every((part) => part instanceof ArrayBuffer) ) { return; } const blob = new Blob(completeBuffer, { type: attachment.mime }); attachment.available = true; attachment.objectUrl = URL.createObjectURL(blob); if (shouldPersistDownloadedAttachment(attachment)) { void this.persistence.saveFileToDisk(attachment, blob); } this.runtimeStore.deleteChunkBuffer(assemblyKey); this.runtimeStore.deleteChunkCount(assemblyKey); this.runtimeStore.touch(); void this.persistence.persistAttachmentMeta(attachment); } }