1169 lines
36 KiB
TypeScript
1169 lines
36 KiB
TypeScript
/* eslint-disable @typescript-eslint/member-ordering, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion, complexity, max-statements-per-line */
|
||
import {
|
||
Injectable,
|
||
inject,
|
||
signal,
|
||
effect
|
||
} from '@angular/core';
|
||
import { take } from 'rxjs';
|
||
import { v4 as uuidv4 } from 'uuid';
|
||
import { WebRTCService } from './webrtc.service';
|
||
import { Store } from '@ngrx/store';
|
||
import { selectCurrentRoomName } from '../../store/rooms/rooms.selectors';
|
||
import { DatabaseService } from './database.service';
|
||
import { recordDebugNetworkFileChunk } from './debug-network-metrics.service';
|
||
|
||
/** Payload size, in bytes, of one chunk when a file is streamed over an RTCDataChannel (64 KiB). */
const FILE_CHUNK_SIZE_BYTES = 1024 * 64;

/** Largest file size, in bytes, that is auto-saved or auto-pushed for inline previews (10 MiB). */
export const MAX_AUTO_SAVE_SIZE_BYTES = 1024 * 1024 * 10;

/**
 * Share of the *previous* speed estimate kept by the EWMA smoother.
 * The instantaneous measurement receives the remaining share
 * (see {@link EWMA_CURRENT_WEIGHT}).
 */
const EWMA_PREVIOUS_WEIGHT = 0.7;

/** Share of the instantaneous measurement in the EWMA smoother. */
const EWMA_CURRENT_WEIGHT = 1 - EWMA_PREVIOUS_WEIGHT;

/** MIME type assumed when the sender does not supply one. */
const DEFAULT_MIME_TYPE = 'application/octet-stream';

/** localStorage key of the legacy attachment store (migration target). */
const LEGACY_STORAGE_KEY = 'metoyou_attachments';

/** Message shown to the user when there is no connected peer to ask for a file. */
const NO_CONNECTED_PEERS_REQUEST_ERROR = 'No connected peers are available to provide this file right now.';

/** Message shown to the user when none of the connected peers can provide the file. */
const FILE_NOT_FOUND_REQUEST_ERROR = 'The connected peers do not have this file right now.';
|
||
|
||
/**
 * Descriptive data for a file attached to a chat message.
 * Carries no file content and no download state.
 */
export interface AttachmentMeta {
  /** Identifier that is unique to this attachment. */
  id: string;

  /** Identifier of the message the attachment belongs to. */
  messageId: string;

  /** Name of the file as originally provided. */
  filename: string;

  /** Size of the file, in bytes. */
  size: number;

  /** MIME type of the file, for example `image/png`. */
  mime: string;

  /** True when the file is a raster or vector image. */
  isImage: boolean;

  /** Peer ID of the user who first uploaded the file. */
  uploaderPeerId?: string;

  /** Electron only: absolute path of the uploader's original file. */
  filePath?: string;

  /** Electron only: path of the locally saved disk-cache copy. */
  savedPath?: string;
}
|
||
|
||
/**
 * An attachment as tracked at runtime: its metadata plus local
 * availability, object-URL and transfer-progress state.
 */
export interface Attachment extends AttachmentMeta {
  /** True once the file content can be used locally (blob URL set). */
  available: boolean;

  /** Object URL used to render or download the file in the browser. */
  objectUrl?: string;

  /** Bytes received so far while a chunked download is running. */
  receivedBytes?: number;

  /** EWMA-smoothed download speed estimate, in bytes per second. */
  speedBps?: number;

  /** Epoch milliseconds at which the download began. */
  startedAtMs?: number;

  /** Epoch milliseconds at which the latest chunk arrived. */
  lastUpdateMs?: number;

  /** Request failure text displayed to the user in the attachment card. */
  requestError?: string;
}
|
||
|
||
/**
|
||
* Manages peer-to-peer file transfer, local persistence, and
|
||
* in-memory caching of file attachments linked to chat messages.
|
||
*
|
||
* Files are announced to peers via a `file-announce` event and
|
||
* transferred using a chunked base-64 protocol over WebRTC data
|
||
* channels. On Electron, files under {@link MAX_AUTO_SAVE_SIZE_BYTES}
|
||
* are automatically persisted to the app-data directory.
|
||
*/
|
||
@Injectable({ providedIn: 'root' })
|
||
export class AttachmentService {
|
||
private readonly webrtc = inject(WebRTCService);
private readonly ngrxStore = inject(Store);
private readonly database = inject(DatabaseService);

/** Attachments grouped by parent message: `messageId → Attachment[]`. */
private attachmentsByMessage: Map<string, Attachment[]> = new Map();

/** Counter bumped after each mutation so that signal consumers re-render. */
updated = signal(0);

/**
 * Original `File` objects kept in memory on the uploader side so
 * that file-request handlers can stream them on demand.
 * Keyed by `"messageId:fileId"`.
 */
private originalFiles: Map<string, File> = new Map();

/** Cancelled transfers, stored as `"messageId:fileId:peerId"` keys. */
private cancelledTransfers: Set<string> = new Set();

/**
 * Peers that have already been asked for a given file:
 * `"messageId:fileId" → Set<peerId>`.
 */
private pendingRequests: Map<string, Set<string>> = new Map();

/**
 * Chunk assembly buffers of transfers that are still in flight:
 * `"messageId:fileId" → ArrayBuffer[]`, indexed by chunk ordinal.
 */
private chunkBuffers: Map<string, ArrayBuffer[]> = new Map();

/**
 * How many chunks have arrived for each in-flight transfer:
 * `"messageId:fileId" → number`.
 */
private chunkCounts: Map<string, number> = new Map();

/** Set once the initial load from the database has been triggered. */
private isDatabaseInitialised = false;
|
||
|
||
/**
 * Registers an effect that starts the one-off load from the database
 * the first time the database reports that it is ready.
 */
constructor() {
  effect(() => {
    const shouldInitialise = this.database.isReady() && !this.isDatabaseInitialised;

    if (!shouldInitialise)
      return;

    // Flip the flag first so later effect runs never start a second load.
    this.isDatabaseInitialised = true;
    this.initFromDatabase();
  });
}
|
||
|
||
/**
 * Look up the attachments of a message.
 *
 * @param messageId - Message whose attachments are wanted.
 * @returns The stored list, or a new empty array when the message has none.
 */
getForMessage(messageId: string): Attachment[] {
  const known = this.attachmentsByMessage.get(messageId);

  return known ?? [];
}
|
||
|
||
/**
 * Drop all attachments of a message: revoke their object URLs, clear
 * the in-memory state scoped to the message, delete the database rows
 * (when the database is ready) and delete saved files that no other
 * message still references.
 *
 * @param messageId - Message whose attachments are removed.
 */
async deleteForMessage(messageId: string): Promise<void> {
  const cachedEntries = this.attachmentsByMessage.get(messageId) ?? [];
  const wasCached = cachedEntries.length > 0 || this.attachmentsByMessage.has(messageId);
  const pathsStillReferenced = await this.getRetainedSavedPathsForOtherMessages(messageId);
  const orphanedPaths = new Set<string>();

  cachedEntries.forEach((entry) => {
    if (entry.objectUrl) {
      try { URL.revokeObjectURL(entry.objectUrl); } catch { /* ignore */ }
    }

    // Only files that no other message points at may be removed from disk.
    if (entry.savedPath && !pathsStillReferenced.has(entry.savedPath)) {
      orphanedPaths.add(entry.savedPath);
    }
  });

  this.attachmentsByMessage.delete(messageId);
  this.clearMessageScopedState(messageId);

  if (wasCached) {
    this.touch();
  }

  if (this.database.isReady()) {
    await this.database.deleteAttachmentsForMessage(messageId);
  }

  for (const orphanedPath of orphanedPaths) {
    await this.deleteSavedFile(orphanedPath);
  }
}
|
||
|
||
/**
 * Collect shareable attachment metadata for the given messages.
 * Intended for inventory-based message synchronisation, where peers
 * learn that attachments exist without receiving their content.
 *
 * @param messageIds - Messages to collect metadata for.
 * @returns Record keyed by messageId; messages without attachments are
 *          omitted. Local paths (`filePath`, `savedPath`) are always
 *          blanked out.
 */
getAttachmentMetasForMessages(
  messageIds: string[]
): Record<string, AttachmentMeta[]> {
  const metasByMessage: Record<string, AttachmentMeta[]> = {};

  // Copies the shareable fields only and blanks the machine-local paths.
  const toShareableMeta = (source: Attachment): AttachmentMeta => {
    const { id, messageId, filename, size, mime, isImage, uploaderPeerId } = source;

    return {
      id,
      messageId,
      filename,
      size,
      mime,
      isImage,
      uploaderPeerId,
      filePath: undefined, // never share local paths
      savedPath: undefined // never share local paths
    };
  };

  for (const messageId of messageIds) {
    const known = this.attachmentsByMessage.get(messageId);

    if (!known || known.length === 0)
      continue;

    metasByMessage[messageId] = known.map(toShareableMeta);
  }

  return metasByMessage;
}
|
||
|
||
/**
 * Register attachment metadata received via message sync
 * (content is not yet available - only metadata).
 *
 * Entries whose `id` is already known for the message are skipped.
 * New entries are stored as unavailable with zero received bytes,
 * the update counter is bumped once, and each new entry is persisted.
 *
 * Security: `filePath` and `savedPath` are discarded from incoming
 * metadata. They describe the sender's machine, and this service later
 * reads those paths from the local disk when serving file requests, so
 * trusting peer-supplied values would let a peer read arbitrary local
 * files.
 *
 * @param attachmentMap - Map of `messageId → AttachmentMeta[]` from peer.
 */
registerSyncedAttachments(
  attachmentMap: Record<string, AttachmentMeta[]>
): void {
  const newAttachments: Attachment[] = [];

  for (const [messageId, metas] of Object.entries(attachmentMap)) {
    // Peer input is untrusted: ignore values that are not lists.
    if (!Array.isArray(metas))
      continue;

    const existing = this.attachmentsByMessage.get(messageId) ?? [];

    for (const meta of metas) {
      if (!meta || existing.some((entry) => entry.id === meta.id))
        continue;

      const attachment: Attachment = {
        ...meta,
        filePath: undefined, // never trust remote local paths
        savedPath: undefined, // never trust remote local paths
        available: false,
        receivedBytes: 0
      };

      existing.push(attachment);
      newAttachments.push(attachment);
    }

    if (existing.length > 0) {
      this.attachmentsByMessage.set(messageId, existing);
    }
  }

  if (newAttachments.length === 0)
    return;

  this.touch();

  for (const attachment of newAttachments) {
    void this.persistAttachmentMeta(attachment);
  }
}
|
||
|
||
/**
 * Start requesting a file from the connected peers.
 *
 * Any earlier request error on the attachment is cleared. When no peer
 * is connected the attachment is flagged with a user-facing error.
 * Otherwise the list of already-asked peers is reset and the first
 * request goes out, preferring the original uploader.
 *
 * @param messageId - Parent message.
 * @param attachment - Attachment to request.
 */
requestFromAnyPeer(messageId: string, attachment: Attachment): void {
  const hadStaleError = this.clearAttachmentRequestError(attachment);
  const peers = this.webrtc.getConnectedPeers();

  if (peers.length === 0) {
    attachment.requestError = NO_CONNECTED_PEERS_REQUEST_ERROR;
    this.touch();
    console.warn('[Attachments] No connected peers to request file from');
    return;
  }

  if (hadStaleError) {
    this.touch();
  }

  // Start from an empty "already asked" set for this file.
  this.pendingRequests.set(this.buildRequestKey(messageId, attachment.id), new Set());
  this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId);
}
|
||
|
||
/**
 * React to a `file-not-found` reply by asking the next untried peer.
 * When no request could be sent and the attachment is known locally,
 * it is flagged with a user-facing error.
 *
 * @param payload - Incoming message; `messageId` and `fileId` are read.
 */
handleFileNotFound(payload: any): void {
  const { messageId, fileId } = payload;

  if (!(messageId && fileId))
    return;

  const candidates = this.attachmentsByMessage.get(messageId) ?? [];
  const attachment = candidates.find((entry) => entry.id === fileId);
  const dispatched = this.sendFileRequestToNextPeer(messageId, fileId, attachment?.uploaderPeerId);

  if (dispatched || !attachment)
    return;

  attachment.requestError = FILE_NOT_FOUND_REQUEST_ERROR;
  this.touch();
}
|
||
|
||
/**
 * Image-flavoured entry point that simply forwards to
 * {@link requestFromAnyPeer}.
 *
 * @param messageId - Parent message.
 * @param attachment - Attachment to request.
 */
requestImageFromAnyPeer(messageId: string, attachment: Attachment): void {
  return this.requestFromAnyPeer(messageId, attachment);
}
|
||
|
||
/**
 * Forwards to {@link requestFromAnyPeer}.
 *
 * @param messageId - Parent message.
 * @param attachment - Attachment to request.
 */
requestFile(messageId: string, attachment: Attachment): void {
  return this.requestFromAnyPeer(messageId, attachment);
}
|
||
|
||
/**
 * Announce and optionally stream files attached to a newly sent
 * message to all connected peers.
 *
 * For every file, in order:
 * 1. A UUID is assigned and the original `File` is retained in memory.
 * 2. The attachment is registered locally (with an object URL when one
 *    can be created) and its metadata is persisted. This happens
 *    *before* any network work so the uploader sees the attachment
 *    immediately, and so a streaming failure cannot leave an already
 *    announced file unregistered.
 * 3. Files ≤ {@link MAX_AUTO_SAVE_SIZE_BYTES} are saved to disk
 *    (fire-and-forget).
 * 4. A `file-announce` event is broadcast to peers.
 * 5. Inline-preview media ≤ {@link MAX_AUTO_SAVE_SIZE_BYTES} is
 *    streamed right away as chunked base-64.
 *
 * @param messageId - ID of the parent message.
 * @param files - Array of user-selected `File` objects.
 * @param uploaderPeerId - Peer ID of the uploader (used by receivers
 *   to prefer the original source when requesting content).
 */
async publishAttachments(
  messageId: string,
  files: File[],
  uploaderPeerId?: string
): Promise<void> {
  for (const file of files) {
    const fileId = uuidv4();
    const attachment: Attachment = {
      id: fileId,
      messageId,
      filename: file.name,
      size: file.size,
      mime: file.type || DEFAULT_MIME_TYPE,
      isImage: file.type.startsWith('image/'),
      uploaderPeerId,
      filePath: (file as any)?.path,
      available: false
    };

    // Retain the original File so we can serve file-request later
    this.originalFiles.set(`${messageId}:${fileId}`, file);

    // Make the file immediately visible to the uploader
    try {
      attachment.objectUrl = URL.createObjectURL(file);
      attachment.available = true;
    } catch { /* non-critical */ }

    // Register and persist before any await on the network.
    const existingList = this.attachmentsByMessage.get(messageId) ?? [];

    this.attachmentsByMessage.set(messageId, [...existingList, attachment]);
    this.touch();
    void this.persistAttachmentMeta(attachment);

    // Auto-save small files to Electron disk cache
    if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
      void this.saveFileToDisk(attachment, file);
    }

    // Broadcast metadata to peers
    this.webrtc.broadcastMessage({
      type: 'file-announce',
      messageId,
      file: {
        id: fileId,
        filename: attachment.filename,
        size: attachment.size,
        mime: attachment.mime,
        isImage: attachment.isImage,
        uploaderPeerId
      }
    } as any);

    // Auto-stream small inline-preview media
    if (this.isMedia(attachment) && attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
      await this.streamFileToPeers(messageId, fileId, file);
    }
  }
}
|
||
|
||
/**
 * Handle a `file-announce` event from a peer.
 *
 * The announced file is registered as an unavailable attachment of the
 * message unless an attachment with the same id is already known.
 *
 * The payload comes from a remote peer and is validated before use:
 * - `file.id` must be a non-empty string and `file.filename` a string;
 * - `file.size` must be a finite, non-negative number;
 * - a missing or non-string `mime` falls back to
 *   {@link DEFAULT_MIME_TYPE}, so later `mime.startsWith(...)` checks
 *   cannot throw.
 *
 * Announcements that fail validation are ignored.
 *
 * @param payload - Incoming message; `messageId` and `file` are read.
 */
handleFileAnnounce(payload: any): void {
  const { messageId, file } = payload ?? {};

  if (!messageId || !file)
    return;

  const hasValidIdentity =
    typeof file.id === 'string' && file.id.length > 0 &&
    typeof file.filename === 'string';
  const hasValidSize =
    typeof file.size === 'number' && Number.isFinite(file.size) && file.size >= 0;

  if (!hasValidIdentity || !hasValidSize)
    return;

  const list = this.attachmentsByMessage.get(messageId) ?? [];

  if (list.some((entry) => entry.id === file.id))
    return;

  const attachment: Attachment = {
    id: file.id,
    messageId,
    filename: file.filename,
    size: file.size,
    mime: typeof file.mime === 'string' && file.mime ? file.mime : DEFAULT_MIME_TYPE,
    isImage: !!file.isImage,
    uploaderPeerId: file.uploaderPeerId,
    available: false,
    receivedBytes: 0
  };

  list.push(attachment);
  this.attachmentsByMessage.set(messageId, list);
  this.touch();
  void this.persistAttachmentMeta(attachment);
}
|
||
|
||
/**
 * Handle an incoming `file-chunk` event.
 *
 * Chunks are collected in {@link chunkBuffers} until every ordinal in
 * `0..total-1` has arrived, at which point the buffers are assembled
 * into a Blob and an object URL is created.
 *
 * Guarantees:
 * - `index` and `total` must be integers with `0 <= index < total`;
 *   anything else is ignored.
 * - A chunk for an attachment that is already available and has no
 *   assembly in progress is ignored, so late duplicates (for example a
 *   broadcast stream overlapping a requested one) cannot restart the
 *   assembly and overwrite the finished file.
 * - A duplicate of a chunk already stored is still reported to the
 *   debug network metrics but does not change progress, speed or the
 *   chunk count.
 * - Completion is decided by chunk presence, not byte totals.
 *   `Array.prototype.every` skips holes in a sparse array, so the
 *   assembly buffer is checked slot by slot instead.
 *
 * @param payload - Incoming message; `messageId`, `fileId`,
 *   `fromPeerId`, `index`, `total` and base-64 `data` are read.
 */
handleFileChunk(payload: any): void {
  const { messageId, fileId, fromPeerId, index, total, data } = payload;

  if (
    !messageId || !fileId ||
    typeof index !== 'number' ||
    typeof total !== 'number' ||
    !data
  )
    return;

  // Reject ordinals that cannot address a slot of the assembly buffer.
  if (!Number.isInteger(index) || !Number.isInteger(total) || index < 0 || index >= total)
    return;

  const list = this.attachmentsByMessage.get(messageId) ?? [];
  const attachment = list.find((entry) => entry.id === fileId);

  if (!attachment)
    return;

  const assemblyKey = `${messageId}:${fileId}`;

  let chunkBuffer = this.chunkBuffers.get(assemblyKey);

  // Content already complete and nothing being assembled: late duplicate.
  if (!chunkBuffer && attachment.available)
    return;

  // A stream split into a different number of chunks cannot be merged
  // into the assembly that is already running.
  if (chunkBuffer && chunkBuffer.length !== total)
    return;

  const decodedBytes = this.base64ToUint8Array(data);
  const now = Date.now();

  this.pendingRequests.delete(this.buildRequestKey(messageId, fileId));

  const clearedRequestError = this.clearAttachmentRequestError(attachment);

  if (fromPeerId)
    recordDebugNetworkFileChunk(fromPeerId, decodedBytes.byteLength, now);

  // Initialise assembly buffer on first chunk
  if (!chunkBuffer) {
    chunkBuffer = new Array<ArrayBuffer>(total);
    this.chunkBuffers.set(assemblyKey, chunkBuffer);
    this.chunkCounts.set(assemblyKey, 0);
  }

  // Duplicate ordinal: keep the first copy and leave progress untouched.
  if (chunkBuffer[index]) {
    if (clearedRequestError)
      this.touch();

    return;
  }

  chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer;
  this.chunkCounts.set(assemblyKey, (this.chunkCounts.get(assemblyKey) ?? 0) + 1);

  // Update progress stats
  attachment.receivedBytes = (attachment.receivedBytes ?? 0) + decodedBytes.byteLength;

  if (!attachment.startedAtMs)
    attachment.startedAtMs = now;

  if (!attachment.lastUpdateMs)
    attachment.lastUpdateMs = now;

  // Clamp to 1 ms so chunks arriving in the same tick do not divide by zero.
  const elapsedMs = Math.max(1, now - attachment.lastUpdateMs);
  const instantaneousBps = (decodedBytes.byteLength / elapsedMs) * 1000;
  const previousSpeed = attachment.speedBps ?? instantaneousBps;

  attachment.speedBps =
    EWMA_PREVIOUS_WEIGHT * previousSpeed +
    EWMA_CURRENT_WEIGHT * instantaneousBps;

  attachment.lastUpdateMs = now;

  this.touch(); // trigger UI update for progress bars

  // Check if assembly is complete
  if ((this.chunkCounts.get(assemblyKey) ?? 0) !== total)
    return;

  const parts: ArrayBuffer[] = [];

  for (let ordinal = 0; ordinal < total; ordinal++) {
    const part = chunkBuffer[ordinal];

    if (!(part instanceof ArrayBuffer))
      return;

    parts.push(part);
  }

  const blob = new Blob(parts, { type: attachment.mime });

  attachment.available = true;
  attachment.objectUrl = URL.createObjectURL(blob);

  if (this.shouldPersistDownloadedAttachment(attachment)) {
    void this.saveFileToDisk(attachment, blob);
  }

  // Clean up assembly state
  this.chunkBuffers.delete(assemblyKey);
  this.chunkCounts.delete(assemblyKey);
  this.touch();
  void this.persistAttachmentMeta(attachment);
}
|
||
|
||
/**
 * Handle an incoming `file-request` from a peer by streaming the
 * file content if available locally.
 *
 * A cancellation previously recorded for the same
 * (message, file, peer) triple is cleared first. A new request
 * supersedes an old `file-cancel`; without this, every re-request
 * after a cancel would stop at the first chunk.
 *
 * Lookup order:
 *  1.  In-memory original (`originalFiles` map), exact key.
 *  1b. In-memory original matched by file id only.
 *  2.  Electron `filePath` (uploader's original on disk).
 *  3.  Electron `savedPath` (disk-cache copy).
 *  3b. Electron disk-cache by room name (backward compat, images only).
 *  4.  In-memory object-URL blob (browser fallback).
 *  5.  None of the above: a `file-not-found` message is sent so the
 *      requester can try another peer.
 *
 * Security: step 3b builds a disk path from the attachment's filename,
 * which may have been announced by a remote peer. Filenames that
 * contain a path separator, or that are `.` / `..`, are never used for
 * that lookup, so a peer cannot make this client read files outside
 * the cache directory.
 *
 * @param payload - Incoming message; `messageId`, `fileId` and
 *   `fromPeerId` are read.
 */
async handleFileRequest(payload: any): Promise<void> {
  const { messageId, fileId, fromPeerId } = payload;

  if (!messageId || !fileId || !fromPeerId)
    return;

  // A fresh request overrides any earlier cancellation from this peer.
  this.cancelledTransfers.delete(this.buildTransferKey(messageId, fileId, fromPeerId));

  // 1. In-memory original
  const exactKey = `${messageId}:${fileId}`;

  let originalFile = this.originalFiles.get(exactKey);

  // 1b. Fallback: search by fileId suffix (handles rare messageId drift)
  if (!originalFile) {
    for (const [key, file] of this.originalFiles) {
      if (key.endsWith(`:${fileId}`)) {
        originalFile = file;
        break;
      }
    }
  }

  if (originalFile) {
    await this.streamFileToPeer(fromPeerId, messageId, fileId, originalFile);
    return;
  }

  const list = this.attachmentsByMessage.get(messageId) ?? [];
  const attachment = list.find((entry) => entry.id === fileId);
  const electronApi = (window as any)?.electronAPI;

  // 2. Electron filePath
  if (attachment?.filePath && electronApi?.fileExists && electronApi?.readFile) {
    try {
      if (await electronApi.fileExists(attachment.filePath)) {
        await this.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, attachment.filePath);
        return;
      }
    } catch { /* fall through */ }
  }

  // 3. Electron savedPath
  if (attachment?.savedPath && electronApi?.fileExists && electronApi?.readFile) {
    try {
      if (await electronApi.fileExists(attachment.savedPath)) {
        await this.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, attachment.savedPath);
        return;
      }
    } catch { /* fall through */ }
  }

  // 3b. Disk cache by room name (backward compatibility)
  const legacyFilename = attachment?.filename;
  const isSafeLegacyFilename =
    typeof legacyFilename === 'string' &&
    legacyFilename.length > 0 &&
    legacyFilename !== '.' &&
    legacyFilename !== '..' &&
    !/[\\/]/.test(legacyFilename);

  if (
    attachment?.isImage && isSafeLegacyFilename &&
    electronApi?.getAppDataPath && electronApi?.fileExists && electronApi?.readFile
  ) {
    try {
      const appDataPath = await electronApi.getAppDataPath();

      if (appDataPath) {
        const roomName = await this.resolveCurrentRoomName();
        const sanitisedRoom = roomName.replace(/[^\w.-]+/g, '_') || 'room';
        const diskPath = `${appDataPath}/server/${sanitisedRoom}/image/${legacyFilename}`;

        if (await electronApi.fileExists(diskPath)) {
          await this.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, diskPath);
          return;
        }
      }
    } catch { /* fall through */ }
  }

  // 4. In-memory blob
  if (attachment?.available && attachment.objectUrl) {
    try {
      const response = await fetch(attachment.objectUrl);
      const blob = await response.blob();
      const file = new File([blob], attachment.filename, { type: attachment.mime });

      await this.streamFileToPeer(fromPeerId, messageId, fileId, file);
      return;
    } catch { /* fall through */ }
  }

  // 5. File not available locally
  this.webrtc.sendToPeer(fromPeerId, {
    type: 'file-not-found',
    messageId,
    fileId
  } as any);
}
|
||
|
||
/**
 * Abort a download on the requesting side.
 *
 * Does nothing when the attachment has no `uploaderPeerId`. Otherwise
 * the chunk assembly state is dropped, progress fields and the object
 * URL are reset, and a `file-cancel` message is sent to the uploader.
 * Failures are swallowed (best-effort).
 *
 * @param messageId - Parent message.
 * @param attachment - Attachment whose download is cancelled.
 */
cancelRequest(messageId: string, attachment: Attachment): void {
  const uploaderId = attachment.uploaderPeerId;

  if (!uploaderId)
    return;

  try {
    // Drop whatever has been assembled so far
    const key = `${messageId}:${attachment.id}`;

    this.chunkBuffers.delete(key);
    this.chunkCounts.delete(key);

    // Reset progress tracking in one go
    Object.assign(attachment, {
      receivedBytes: 0,
      speedBps: 0,
      startedAtMs: undefined,
      lastUpdateMs: undefined
    });

    const staleUrl = attachment.objectUrl;

    if (staleUrl) {
      try { URL.revokeObjectURL(staleUrl); } catch { /* ignore */ }

      attachment.objectUrl = undefined;
    }

    attachment.available = false;
    this.touch();

    // Ask the uploader to stop streaming
    this.webrtc.sendToPeer(uploaderId, {
      type: 'file-cancel',
      messageId,
      fileId: attachment.id
    } as any);
  } catch { /* best-effort */ }
}
|
||
|
||
/**
 * React to a `file-cancel` message from a requester by recording the
 * transfer as cancelled. The per-peer streaming loops check this
 * record and stop early.
 *
 * @param payload - Incoming message; `messageId`, `fileId` and
 *   `fromPeerId` are read, and all three must be present.
 */
handleFileCancel(payload: any): void {
  const { messageId, fileId, fromPeerId } = payload;
  const isComplete = messageId && fileId && fromPeerId;

  if (!isComplete)
    return;

  const transferKey = this.buildTransferKey(messageId, fileId, fromPeerId);

  this.cancelledTransfers.add(transferKey);
}
|
||
|
||
/**
 * Uploader side: supply the `File` for a pending request. The file is
 * retained under `"messageId:fileId"` and then streamed to the
 * requesting peer.
 *
 * @param messageId - Parent message.
 * @param fileId - Attachment identifier.
 * @param targetPeerId - Peer that asked for the file.
 * @param file - File content to retain and stream.
 */
async fulfillRequestWithFile(
  messageId: string,
  fileId: string,
  targetPeerId: string,
  file: File
): Promise<void> {
  // buildRequestKey yields the same "messageId:fileId" key format.
  const retentionKey = this.buildRequestKey(messageId, fileId);

  this.originalFiles.set(retentionKey, file);
  await this.streamFileToPeer(targetPeerId, messageId, fileId, file);
}
|
||
|
||
/** Advance the `updated` counter by one so signal-based consumers re-render. */
private touch(): void {
  const nextVersion = this.updated() + 1;

  this.updated.set(nextVersion);
}
|
||
|
||
/**
 * Build the key used for transfer-cancellation tracking.
 *
 * @returns `"messageId:fileId:peerId"`.
 */
private buildTransferKey(messageId: string, fileId: string, peerId: string): string {
  return [messageId, fileId, peerId].join(':');
}
|
||
|
||
/**
 * Build the key used for pending-request tracking.
 *
 * @returns `"messageId:fileId"`.
 */
private buildRequestKey(messageId: string, fileId: string): string {
  return [messageId, fileId].join(':');
}
|
||
|
||
/**
 * Remove every entry whose key starts with `"<messageId>:"` from the
 * message-scoped collections: retained originals, pending requests,
 * chunk buffers, chunk counts and cancelled transfers.
 *
 * @param messageId - Message whose scoped state is discarded.
 */
private clearMessageScopedState(messageId: string): void {
  const prefix = `${messageId}:`;
  const scopedCollections: (Map<string, unknown> | Set<string>)[] = [
    this.originalFiles,
    this.pendingRequests,
    this.chunkBuffers,
    this.chunkCounts,
    this.cancelledTransfers
  ];

  for (const collection of scopedCollections) {
    // Snapshot the keys first so deleting never disturbs the iteration.
    const staleKeys = Array.from(collection.keys()).filter((key) => key.startsWith(prefix));

    staleKeys.forEach((key) => collection.delete(key));
  }
}
|
||
|
||
/**
 * Gather the `savedPath` values referenced by attachments of every
 * message other than `messageId`. The in-memory index is always
 * consulted; persisted attachments are added when the database is ready.
 *
 * @param messageId - Message to leave out of the scan.
 * @returns Saved paths that are still in use elsewhere.
 */
private async getRetainedSavedPathsForOtherMessages(messageId: string): Promise<Set<string>> {
  const retained = new Set<string>();

  this.attachmentsByMessage.forEach((entries, ownerMessageId) => {
    if (ownerMessageId === messageId)
      return;

    for (const entry of entries) {
      if (entry.savedPath) {
        retained.add(entry.savedPath);
      }
    }
  });

  if (this.database.isReady()) {
    const persisted = await this.database.getAllAttachments();

    for (const record of persisted) {
      if (record.messageId === messageId || !record.savedPath)
        continue;

      retained.add(record.savedPath);
    }
  }

  return retained;
}
|
||
|
||
/**
 * Delete a file through `window.electronAPI.deleteFile`.
 * Does nothing when that bridge function is not present.
 *
 * @param filePath - Path handed to the bridge.
 */
private async deleteSavedFile(filePath: string): Promise<void> {
  const bridge = (window as any)?.electronAPI;

  if (bridge?.deleteFile) {
    await bridge.deleteFile(filePath);
  }
}
|
||
|
||
/**
 * Reset the user-facing request error of an attachment.
 *
 * @returns `true` when an error was present and has been cleared,
 *          `false` when there was nothing to clear.
 */
private clearAttachmentRequestError(attachment: Attachment): boolean {
  const hadError = Boolean(attachment.requestError);

  if (hadError) {
    attachment.requestError = undefined;
  }

  return hadError;
}
|
||
|
||
/**
 * Tell whether the transfer of a file to a given peer has been
 * recorded as cancelled.
 */
private isTransferCancelled(targetPeerId: string, messageId: string, fileId: string): boolean {
  const transferKey = this.buildTransferKey(messageId, fileId, targetPeerId);

  return this.cancelledTransfers.has(transferKey);
}
|
||
|
||
/**
 * Tell whether a file counts as inline-previewable media, i.e. whether
 * its MIME type starts with `image/`, `video/` or `audio/`.
 */
private isMedia(attachment: { mime: string }): boolean {
  const mediaPrefixes = ['image/', 'video/', 'audio/'];

  return mediaPrefixes.some((prefix) => attachment.mime.startsWith(prefix));
}
|
||
|
||
/**
 * Decide whether a finished download is cached on disk: always for
 * files up to {@link MAX_AUTO_SAVE_SIZE_BYTES}, and for video or audio
 * files of any size.
 */
private shouldPersistDownloadedAttachment(attachment: Attachment): boolean {
  if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES)
    return true;

  return ['video/', 'audio/'].some((prefix) => attachment.mime.startsWith(prefix));
}
|
||
|
||
/**
 * Dispatch a `file-request` to a connected peer that has not been
 * asked yet. The preferred peer is chosen when it is connected and
 * untried; otherwise the first untried connected peer is used. When
 * nobody is left, the pending-request entry is dropped.
 *
 * @param messageId - Parent message.
 * @param fileId - Attachment identifier.
 * @param preferredPeerId - Peer to try first, if any.
 * @returns `true` if a request was dispatched.
 */
private sendFileRequestToNextPeer(
  messageId: string,
  fileId: string,
  preferredPeerId?: string
): boolean {
  const peers = this.webrtc.getConnectedPeers();
  const key = this.buildRequestKey(messageId, fileId);
  const alreadyAsked = this.pendingRequests.get(key) ?? new Set<string>();

  const canAskPreferred =
    !!preferredPeerId &&
    peers.includes(preferredPeerId) &&
    !alreadyAsked.has(preferredPeerId);

  const nextPeerId: string | undefined = canAskPreferred
    ? preferredPeerId
    : peers.find((peerId) => !alreadyAsked.has(peerId));

  if (!nextPeerId) {
    this.pendingRequests.delete(key);
    return false;
  }

  alreadyAsked.add(nextPeerId);
  this.pendingRequests.set(key, alreadyAsked);

  this.webrtc.sendToPeer(nextPeerId, {
    type: 'file-request',
    messageId,
    fileId
  } as any);

  return true;
}
|
||
|
||
/**
 * Send a file to all connected peers as a sequence of `file-chunk`
 * broadcasts, each carrying up to {@link FILE_CHUNK_SIZE_BYTES} bytes
 * encoded as base-64.
 *
 * @param messageId - Parent message.
 * @param fileId - Attachment identifier.
 * @param file - Content to broadcast.
 */
private async streamFileToPeers(
  messageId: string,
  fileId: string,
  file: File
): Promise<void> {
  const chunkTotal = Math.ceil(file.size / FILE_CHUNK_SIZE_BYTES);

  for (
    let ordinal = 0, start = 0;
    start < file.size;
    ordinal++, start += FILE_CHUNK_SIZE_BYTES
  ) {
    const bytes = await file.slice(start, start + FILE_CHUNK_SIZE_BYTES).arrayBuffer();

    this.webrtc.broadcastMessage({
      type: 'file-chunk',
      messageId,
      fileId,
      index: ordinal,
      total: chunkTotal,
      data: this.arrayBufferToBase64(bytes)
    } as any);
  }
}
|
||
|
||
/**
 * Send a file to one peer as a sequence of `file-chunk` messages,
 * awaiting the buffered send of each chunk before reading the next.
 * The loop stops early once the transfer is recorded as cancelled.
 *
 * @param targetPeerId - Peer that receives the chunks.
 * @param messageId - Parent message.
 * @param fileId - Attachment identifier.
 * @param file - Content to stream.
 */
private async streamFileToPeer(
  targetPeerId: string,
  messageId: string,
  fileId: string,
  file: File
): Promise<void> {
  const chunkTotal = Math.ceil(file.size / FILE_CHUNK_SIZE_BYTES);

  for (
    let ordinal = 0, start = 0;
    start < file.size;
    ordinal++, start += FILE_CHUNK_SIZE_BYTES
  ) {
    if (this.isTransferCancelled(targetPeerId, messageId, fileId))
      break;

    const bytes = await file.slice(start, start + FILE_CHUNK_SIZE_BYTES).arrayBuffer();

    await this.webrtc.sendToPeerBuffered(targetPeerId, {
      type: 'file-chunk',
      messageId,
      fileId,
      index: ordinal,
      total: chunkTotal,
      data: this.arrayBufferToBase64(bytes)
    } as any);
  }
}
|
||
|
||
/**
 * Read a file from Electron disk and stream it to a peer as
 * base-64 chunks.
 *
 * The whole file is read once through `electronAPI.readFile` (which
 * returns base-64), decoded, and re-encoded in slices of
 * {@link FILE_CHUNK_SIZE_BYTES}. Each chunk is sent with the awaited
 * `sendToPeerBuffered`, the same call `streamFileToPeer` uses, so the
 * loop yields between chunks instead of queueing the whole file on the
 * data channel in a single synchronous burst. Yielding also lets an
 * incoming `file-cancel` be processed mid-transfer, so the
 * cancellation check at the top of the loop can take effect.
 *
 * @param targetPeerId - Peer that receives the chunks.
 * @param messageId - Parent message.
 * @param fileId - Attachment identifier.
 * @param diskPath - Path handed to `electronAPI.readFile`.
 */
private async streamFileFromDiskToPeer(
  targetPeerId: string,
  messageId: string,
  fileId: string,
  diskPath: string
): Promise<void> {
  const electronApi = (window as any)?.electronAPI;
  const base64Full = await electronApi.readFile(diskPath);
  const fileBytes = this.base64ToUint8Array(base64Full);
  const totalChunks = Math.ceil(fileBytes.byteLength / FILE_CHUNK_SIZE_BYTES);

  for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
    if (this.isTransferCancelled(targetPeerId, messageId, fileId))
      break;

    const start = chunkIndex * FILE_CHUNK_SIZE_BYTES;
    const end = Math.min(fileBytes.byteLength, start + FILE_CHUNK_SIZE_BYTES);
    const slice = fileBytes.subarray(start, end);
    // subarray() shares the backing buffer; copy out exactly this window.
    const sliceBuffer = (slice.buffer as ArrayBuffer).slice(
      slice.byteOffset,
      slice.byteOffset + slice.byteLength
    );
    const base64Chunk = this.arrayBufferToBase64(sliceBuffer);

    await this.webrtc.sendToPeerBuffered(targetPeerId, {
      type: 'file-chunk',
      messageId,
      fileId,
      index: chunkIndex,
      total: totalChunks,
      data: base64Chunk
    } as any);
  }
}
|
||
|
||
/**
 * Save a file to the Electron app-data directory, organised by
 * room name and media type
 * (`<appData>/server/<room>/<video|audio|image|files>/<filename>`).
 *
 * On success `attachment.savedPath` is set and the metadata is
 * persisted. The whole operation is best-effort: it returns silently
 * when no app-data path is available and swallows any error.
 *
 * Security: the filename may have been announced by a remote peer, so
 * only its final path component is used. Names that reduce to an empty
 * string, `.` or `..` are not written at all. Without this, a name
 * such as `../../x` would let a peer write outside the cache directory.
 *
 * @param attachment - Attachment being saved; updated with `savedPath`.
 * @param blob - Content to write.
 */
private async saveFileToDisk(attachment: Attachment, blob: Blob): Promise<void> {
  try {
    const electronApi = (window as any)?.electronAPI;
    const appDataPath: string | undefined = await electronApi?.getAppDataPath?.();

    if (!appDataPath)
      return;

    // Keep the last path component only (handles both "/" and "\").
    const rawFilename = typeof attachment.filename === 'string' ? attachment.filename : '';
    const safeFilename = rawFilename.split(/[\\/]/).pop() ?? '';

    if (!safeFilename || safeFilename === '.' || safeFilename === '..')
      return;

    const roomName = await this.resolveCurrentRoomName();
    const sanitisedRoom = roomName.replace(/[^\w.-]+/g, '_') || 'room';
    const subDirectory = attachment.mime.startsWith('video/')
      ? 'video'
      : attachment.mime.startsWith('audio/')
        ? 'audio'
        : attachment.mime.startsWith('image/')
          ? 'image'
          : 'files';
    const directoryPath = `${appDataPath}/server/${sanitisedRoom}/${subDirectory}`;

    await electronApi.ensureDir(directoryPath);

    const arrayBuffer = await blob.arrayBuffer();
    const base64 = this.arrayBufferToBase64(arrayBuffer);
    const diskPath = `${directoryPath}/${safeFilename}`;

    await electronApi.writeFile(diskPath, base64);

    attachment.savedPath = diskPath;
    void this.persistAttachmentMeta(attachment);
  } catch { /* disk save is best-effort */ }
}
|
||
|
||
/**
 * Startup pass (Electron only): for every attachment that is not yet
 * available, try to restore its contents from disk.
 *
 * Sources are tried in order: `savedPath` (disk cache) first, then
 * `filePath` (uploader's original). A file restored from `filePath` that
 * is no larger than `MAX_AUTO_SAVE_SIZE_BYTES` is also handed to
 * `saveFileToDisk`. Failures for a single source or attachment are
 * ignored; `touch()` is called once if anything was restored.
 */
private async tryLoadSavedFiles(): Promise<void> {
  const bridge = (window as any)?.electronAPI;
  const hasDiskAccess = bridge?.fileExists && bridge?.readFile;

  if (!hasDiskAccess)
    return;

  try {
    let restoredAny = false;

    for (const group of this.attachmentsByMessage.values()) {
      for (const entry of group) {
        if (entry.available)
          continue;

        // First choice: savedPath (disk cache).
        if (entry.savedPath) {
          try {
            const cacheHit = await bridge.fileExists(entry.savedPath);

            if (cacheHit) {
              const encoded = await bridge.readFile(entry.savedPath);

              this.restoreAttachmentFromDisk(entry, encoded);
              restoredAny = true;
              continue;
            }
          } catch { /* fall through */ }
        }

        // Second choice: filePath (uploader's original).
        if (!entry.filePath)
          continue;

        try {
          const originalPresent = await bridge.fileExists(entry.filePath);

          if (!originalPresent)
            continue;

          const encoded = await bridge.readFile(entry.filePath);

          this.restoreAttachmentFromDisk(entry, encoded);
          restoredAny = true;

          if (entry.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
            // Re-read the freshly created object URL to obtain a Blob for saving.
            const response = await fetch(entry.objectUrl!);
            const restoredBlob = await response.blob();

            void this.saveFileToDisk(entry, restoredBlob);
          }
        } catch { /* fall through */ }
      }
    }

    if (restoredAny)
      this.touch();
  } catch { /* startup load is best-effort */ }
}
|
||
|
||
/**
 * Helper: turn base-64 file contents read from disk back into a usable
 * attachment. Decodes the data, wraps it in a Blob, exposes it through a
 * new object URL, marks the attachment available, and registers a `File`
 * in `originalFiles` under `<messageId>:<id>` for serving file requests.
 *
 * @param attachment Attachment to mutate (`objectUrl`, `available`).
 * @param base64 Base-64 encoded file contents.
 */
private restoreAttachmentFromDisk(attachment: Attachment, base64: string): void {
  const { mime, filename } = attachment;
  const decoded = this.base64ToUint8Array(base64);
  const contents = new Blob([decoded.buffer as ArrayBuffer], { type: mime });

  attachment.objectUrl = URL.createObjectURL(contents);
  attachment.available = true;

  const lookupKey = `${attachment.messageId}:${attachment.id}`;
  const servableFile = new File([contents], filename, { type: mime });

  this.originalFiles.set(lookupKey, servableFile);
}
|
||
|
||
/**
 * Write an attachment's metadata (never its file content) to the database.
 * Does nothing while the database is not ready; write errors are ignored.
 */
private async persistAttachmentMeta(attachment: Attachment): Promise<void> {
  const databaseReady = this.database.isReady();

  if (databaseReady) {
    try {
      const {
        id,
        messageId,
        filename,
        size,
        mime,
        isImage,
        uploaderPeerId,
        filePath,
        savedPath
      } = attachment;

      await this.database.saveAttachment({
        id,
        messageId,
        filename,
        size,
        mime,
        isImage,
        uploaderPeerId,
        filePath,
        savedPath
      });
    } catch { /* persistence is best-effort */ }
  }
}
|
||
|
||
/**
 * Read every stored attachment record, group the records by message id,
 * and replace the in-memory attachment map with the result. Each loaded
 * attachment starts out as not available. Errors are ignored.
 */
private async loadFromDatabase(): Promise<void> {
  try {
    const storedRecords: AttachmentMeta[] = await this.database.getAllAttachments();
    const byMessage = new Map<string, Attachment[]>();

    for (const stored of storedRecords) {
      let siblings = byMessage.get(stored.messageId);

      if (!siblings) {
        siblings = [];
        byMessage.set(stored.messageId, siblings);
      }

      const hydrated: Attachment = { ...stored, available: false };

      siblings.push(hydrated);
    }

    this.attachmentsByMessage = byMessage;
    this.touch();
  } catch { /* load is best-effort */ }
}
|
||
|
||
/**
 * One-time migration from localStorage to the database.
 *
 * Reads the legacy JSON payload stored under `LEGACY_STORAGE_KEY`, adds
 * every record that is not already known (matched by `messageId` + `id`)
 * to the in-memory map, and persists it to the database.
 *
 * Hardening over a plain parse-and-loop:
 * - A payload that is not an array is left untouched.
 * - Entries that are not objects, or lack `id` / `messageId`, are skipped
 *   so one bad record cannot abort the migration half-way and leave the
 *   in-memory map changed without a `touch()`.
 * - The legacy key is removed only after all database writes have been
 *   attempted.
 *
 * Best-effort: any error is swallowed.
 */
private async migrateFromLocalStorage(): Promise<void> {
  try {
    const raw = localStorage.getItem(LEGACY_STORAGE_KEY);

    if (!raw)
      return;

    const parsed: unknown = JSON.parse(raw);

    if (!Array.isArray(parsed))
      return;

    const pendingWrites: Promise<void>[] = [];

    for (const candidate of parsed as unknown[]) {
      if (typeof candidate !== 'object' || candidate === null)
        continue;

      const partial = candidate as Partial<AttachmentMeta>;

      if (partial.id == null || partial.messageId == null)
        continue;

      const meta = partial as AttachmentMeta;
      const existing = this.attachmentsByMessage.get(meta.messageId) ?? [];

      if (existing.some((entry) => entry.id === meta.id))
        continue;

      const attachment: Attachment = { ...meta, available: false };

      existing.push(attachment);
      this.attachmentsByMessage.set(meta.messageId, existing);
      pendingWrites.push(this.persistAttachmentMeta(attachment));
    }

    // Publish the in-memory changes before waiting on storage.
    this.touch();

    // persistAttachmentMeta swallows its own errors, so this never rejects.
    await Promise.all(pendingWrites);
    localStorage.removeItem(LEGACY_STORAGE_KEY);
  } catch { /* migration is best-effort */ }
}
|
||
|
||
/**
 * Run the initialisation phases strictly one after another:
 * load from the database, migrate legacy localStorage data, then
 * restore saved files from disk.
 */
private async initFromDatabase(): Promise<void> {
  const phases: (() => Promise<void>)[] = [
    () => this.loadFromDatabase(),
    () => this.migrateFromLocalStorage(),
    () => this.tryLoadSavedFiles()
  ];

  for (const runPhase of phases)
    await runPhase();
}
|
||
|
||
/**
 * Take the first value emitted by the `selectCurrentRoomName` selector on
 * the NgRx store and resolve with it, or with an empty string when the
 * value is falsy.
 */
private resolveCurrentRoomName(): Promise<string> {
  return new Promise<string>((resolve) => {
    const firstEmission$ = this.ngrxStore
      .select(selectCurrentRoomName)
      .pipe(take(1));

    firstEmission$.subscribe((roomName) => {
      resolve(roomName ? roomName : '');
    });
  });
}
|
||
|
||
/**
 * Encode the bytes of an ArrayBuffer as a base-64 string.
 * Each byte becomes one character of an intermediate binary string,
 * which is then passed to `btoa`.
 */
private arrayBufferToBase64(buffer: ArrayBuffer): string {
  const octets = new Uint8Array(buffer);
  let binaryString = '';

  for (const octet of octets)
    binaryString += String.fromCharCode(octet);

  return btoa(binaryString);
}
|
||
|
||
/**
 * Decode a base-64 string into its raw bytes.
 * `atob` yields one character per byte; each character code is copied
 * into the matching slot of a new Uint8Array.
 */
private base64ToUint8Array(base64: string): Uint8Array {
  const decoded = atob(base64);
  const byteCount = decoded.length;
  const output = new Uint8Array(byteCount);
  let position = 0;

  while (position < byteCount) {
    output[position] = decoded.charCodeAt(position);
    position += 1;
  }

  return output;
}
|
||
}
|