567 lines
17 KiB
TypeScript
567 lines
17 KiB
TypeScript
import { Injectable, inject } from '@angular/core';
|
|
import { recordDebugNetworkFileChunk } from '../../../infrastructure/realtime/logging/debug-network-metrics';
|
|
import { RealtimeSessionFacade } from '../../../core/realtime';
|
|
import { AttachmentStorageService } from '../infrastructure/attachment-storage.service';
|
|
import { MAX_AUTO_SAVE_SIZE_BYTES } from '../domain/attachment.constants';
|
|
import { shouldPersistDownloadedAttachment } from '../domain/attachment.logic';
|
|
import type { Attachment, AttachmentMeta } from '../domain/attachment.models';
|
|
import {
|
|
ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT,
|
|
ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT,
|
|
DEFAULT_ATTACHMENT_MIME_TYPE,
|
|
FILE_NOT_FOUND_REQUEST_ERROR,
|
|
NO_CONNECTED_PEERS_REQUEST_ERROR
|
|
} from '../domain/attachment-transfer.constants';
|
|
import {
|
|
type FileAnnounceEvent,
|
|
type FileAnnouncePayload,
|
|
type FileCancelEvent,
|
|
type FileCancelPayload,
|
|
type FileChunkPayload,
|
|
type FileNotFoundEvent,
|
|
type FileNotFoundPayload,
|
|
type FileRequestEvent,
|
|
type FileRequestPayload,
|
|
type LocalFileWithPath
|
|
} from '../domain/attachment-transfer.models';
|
|
import { AttachmentPersistenceService } from './attachment-persistence.service';
|
|
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
|
import { AttachmentTransferTransportService } from './attachment-transfer-transport.service';
|
|
|
|
@Injectable({ providedIn: 'root' })
|
|
export class AttachmentTransferService {
|
|
private readonly webrtc = inject(RealtimeSessionFacade);
|
|
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
|
private readonly attachmentStorage = inject(AttachmentStorageService);
|
|
private readonly persistence = inject(AttachmentPersistenceService);
|
|
private readonly transport = inject(AttachmentTransferTransportService);
|
|
|
|
/**
 * Builds a lookup of attachment metadata keyed by message id.
 *
 * Only messages that currently have at least one attachment in the runtime
 * store get an entry. Each returned meta copies the descriptive fields of the
 * attachment and explicitly sets `filePath` and `savedPath` to `undefined`.
 *
 * @param messageIds Message ids to look up.
 * @returns Map of message id to the metadata of its attachments.
 */
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
  const metasByMessage: Record<string, AttachmentMeta[]> = {};

  for (const requestedId of messageIds) {
    const known = this.runtimeStore.getAttachmentsForMessage(requestedId);

    // Messages without attachments are left out of the result entirely.
    if (known.length === 0) {
      continue;
    }

    metasByMessage[requestedId] = known.map(
      ({ id, messageId, filename, size, mime, isImage, uploaderPeerId }) => ({
        id,
        messageId,
        filename,
        size,
        mime,
        isImage,
        uploaderPeerId,
        filePath: undefined,
        savedPath: undefined
      })
    );
  }

  return metasByMessage;
}
|
|
|
|
/**
 * Merges attachment metadata received for a set of messages into the runtime store.
 *
 * When `messageRoomIds` is given, every message/room pair is remembered first.
 * For each message in `attachmentMap`, metas whose id is not yet known for that
 * message are added as placeholders (`available: false`, `receivedBytes: 0`),
 * and the message's list is written back to the store. If anything new was
 * added, the store is touched once and each new attachment's metadata is handed
 * to the persistence service (not awaited).
 *
 * @param attachmentMap Attachment metadata keyed by message id.
 * @param messageRoomIds Optional map of message id to room id.
 */
registerSyncedAttachments(
  attachmentMap: Record<string, AttachmentMeta[]>,
  messageRoomIds?: Record<string, string>
): void {
  if (messageRoomIds) {
    Object.entries(messageRoomIds).forEach(([messageId, roomId]) => {
      this.runtimeStore.rememberMessageRoom(messageId, roomId);
    });
  }

  const added: Attachment[] = [];

  Object.entries(attachmentMap).forEach(([messageId, metas]) => {
    // Work on a copy so the stored list is only replaced via the setter below.
    const merged = [...this.runtimeStore.getAttachmentsForMessage(messageId)];

    for (const meta of metas) {
      if (merged.some((entry) => entry.id === meta.id)) {
        continue;
      }

      const placeholder: Attachment = {
        ...meta,
        available: false,
        receivedBytes: 0
      };

      merged.push(placeholder);
      added.push(placeholder);
    }

    this.runtimeStore.setAttachmentsForMessage(messageId, merged);
  });

  if (added.length === 0) {
    return;
  }

  this.runtimeStore.touch();

  added.forEach((placeholder) => {
    void this.persistence.persistAttachmentMeta(placeholder);
  });
}
|
|
|
|
/**
 * Starts a fresh request for an attachment's content from the connected peers.
 *
 * Any previous request error on the attachment is cleared first. With no
 * connected peers the attachment is marked with
 * `NO_CONNECTED_PEERS_REQUEST_ERROR` and a warning is logged. Otherwise the
 * set of already-tried peers for this request is reset and the first request
 * is sent, with the attachment's uploader passed as the preferred peer.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param attachment Attachment to request; its `requestError` may be mutated.
 */
requestFromAnyPeer(messageId: string, attachment: Attachment): void {
  const hadStaleError = this.clearAttachmentRequestError(attachment);
  const peerCount = this.webrtc.getConnectedPeers().length;

  if (peerCount > 0) {
    // The cleared error only needs publishing when a request actually goes out;
    // the no-peer branch below touches the store itself.
    if (hadStaleError) {
      this.runtimeStore.touch();
    }

    const requestKey = this.buildRequestKey(messageId, attachment.id);

    this.runtimeStore.setPendingRequestPeers(requestKey, new Set<string>());
    this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId);

    return;
  }

  attachment.requestError = NO_CONNECTED_PEERS_REQUEST_ERROR;
  this.runtimeStore.touch();
  console.warn('[Attachments] No connected peers to request file from');
}
|
|
|
|
/**
 * Reacts to a peer reporting that it does not have the requested file.
 *
 * The request is retried against the next untried peer. If no request could be
 * sent and the attachment is known locally, it is flagged with
 * `FILE_NOT_FOUND_REQUEST_ERROR` and the store is touched.
 *
 * @param payload File-not-found payload; ignored when `messageId` or `fileId` is missing.
 */
handleFileNotFound(payload: FileNotFoundPayload): void {
  const { messageId, fileId } = payload;

  if (!(messageId && fileId)) {
    return;
  }

  const target = this.runtimeStore
    .getAttachmentsForMessage(messageId)
    .find((entry) => entry.id === fileId);
  const retried = this.sendFileRequestToNextPeer(messageId, fileId, target?.uploaderPeerId);

  if (retried || !target) {
    return;
  }

  target.requestError = FILE_NOT_FOUND_REQUEST_ERROR;
  this.runtimeStore.touch();
}
|
|
|
|
/**
 * Requests an image attachment; delegates unchanged to `requestFromAnyPeer`.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param attachment Attachment to request.
 */
requestImageFromAnyPeer(messageId: string, attachment: Attachment): void {
  // No image-specific handling here: same path as any other attachment.
  this.requestFromAnyPeer(messageId, attachment);
}
|
|
|
|
/**
 * Requests a file attachment; delegates unchanged to `requestFromAnyPeer`.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param attachment Attachment to request.
 */
requestFile(messageId: string, attachment: Attachment): void {
  // Same request path as `requestImageFromAnyPeer`.
  this.requestFromAnyPeer(messageId, attachment);
}
|
|
|
|
/**
 * Reports whether the runtime store holds a pending request for the given file.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param fileId Id of the attachment.
 * @returns Whatever the runtime store reports for the derived request key.
 */
hasPendingRequest(messageId: string, fileId: string): boolean {
  const requestKey = this.buildRequestKey(messageId, fileId);

  return this.runtimeStore.hasPendingRequest(requestKey);
}
|
|
|
|
/**
 * Registers local files as attachments of a message and announces them to peers.
 *
 * For each file, in order: an id is generated, the original `File` is kept in
 * the runtime store under `messageId:fileId`, an object URL is created
 * (failure is tolerated and leaves the attachment unavailable), files no
 * larger than `MAX_AUTO_SAVE_SIZE_BYTES` are handed to the persistence service
 * for saving (not awaited), and a `file-announce` event is broadcast.
 * After all files are processed the new attachments are appended to the
 * message's list, the store is touched, and their metadata is persisted
 * (not awaited).
 *
 * @param messageId Id of the message the files belong to.
 * @param files Files to publish.
 * @param uploaderPeerId Optional peer id recorded as the uploader.
 */
async publishAttachments(
  messageId: string,
  files: File[],
  uploaderPeerId?: string
): Promise<void> {
  const published: Attachment[] = [];

  for (const file of files) {
    // Fall back to a timestamp/random id where `crypto.randomUUID` is missing.
    const id = crypto.randomUUID?.() ?? `${Date.now()}-${Math.random()}`;
    const entry: Attachment = {
      id,
      messageId,
      filename: file.name,
      size: file.size,
      mime: file.type || DEFAULT_ATTACHMENT_MIME_TYPE,
      isImage: file.type.startsWith('image/'),
      uploaderPeerId,
      filePath: (file as LocalFileWithPath).path,
      available: false
    };

    published.push(entry);
    this.runtimeStore.setOriginalFile(`${messageId}:${id}`, file);

    try {
      entry.objectUrl = URL.createObjectURL(file);
      entry.available = true;
    } catch { /* non-critical */ }

    if (entry.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
      void this.persistence.saveFileToDisk(entry, file);
    }

    const announcement: FileAnnounceEvent = {
      type: 'file-announce',
      messageId,
      file: {
        id,
        filename: entry.filename,
        size: entry.size,
        mime: entry.mime,
        isImage: entry.isImage,
        uploaderPeerId
      }
    };

    this.webrtc.broadcastMessage(announcement);
  }

  const previous = this.runtimeStore.getAttachmentsForMessage(messageId);

  this.runtimeStore.setAttachmentsForMessage(messageId, [...previous, ...published]);
  this.runtimeStore.touch();

  published.forEach((entry) => {
    void this.persistence.persistAttachmentMeta(entry);
  });
}
|
|
|
|
/**
 * Registers an attachment announced by a peer.
 *
 * Unknown files are added to the message's attachment list as unavailable
 * placeholders with zero received bytes; the store is touched and the metadata
 * is handed to the persistence service (not awaited). Announcements for an
 * already-known file id are ignored.
 *
 * @param payload Announce payload; ignored when `messageId` or `file` is missing.
 */
handleFileAnnounce(payload: FileAnnouncePayload): void {
  const { messageId, file } = payload;

  if (!messageId || !file) {
    return;
  }

  const known = this.runtimeStore.getAttachmentsForMessage(messageId);

  if (known.some((entry) => entry.id === file.id)) {
    return;
  }

  const announced: Attachment = {
    id: file.id,
    messageId,
    filename: file.filename,
    size: file.size,
    mime: file.mime,
    isImage: Boolean(file.isImage),
    uploaderPeerId: file.uploaderPeerId,
    available: false,
    receivedBytes: 0
  };

  // A new array is stored rather than mutating the list returned by the store.
  this.runtimeStore.setAttachmentsForMessage(messageId, [...known, announced]);
  this.runtimeStore.touch();
  void this.persistence.persistAttachmentMeta(announced);
}
|
|
|
|
/**
 * Stores one incoming file chunk and finalizes the transfer when it is complete.
 *
 * The payload originates from a remote peer, so it is validated before use:
 * - `messageId`/`fileId` must be present and `index`/`total`/`data` must have
 *   the expected primitive types;
 * - `total` must be a positive integer that is a valid array length, and
 *   `index` must be an integer in `[0, total)`.
 * Chunks for attachments that are not known locally are ignored.
 *
 * A valid chunk clears the pending-request marker and any request error. Only
 * the first copy of a given index is stored and counted towards progress, so
 * retransmitted chunks cannot inflate `receivedBytes`.
 *
 * @param payload Chunk payload received from a peer.
 */
handleFileChunk(payload: FileChunkPayload): void {
  const { messageId, fileId, fromPeerId, index, total, data } = payload;

  if (
    !messageId || !fileId ||
    typeof index !== 'number' ||
    typeof total !== 'number' ||
    typeof data !== 'string'
  ) {
    return;
  }

  // Largest length `new Array(n)` accepts; anything above throws RangeError.
  const maxChunkCount = 2 ** 32 - 1;

  // Reject malformed coordinates before they can grow or corrupt the buffer.
  if (
    !Number.isInteger(total) || total <= 0 || total > maxChunkCount ||
    !Number.isInteger(index) || index < 0 || index >= total
  ) {
    return;
  }

  const attachment = this.runtimeStore
    .getAttachmentsForMessage(messageId)
    .find((entry) => entry.id === fileId);

  if (!attachment) {
    return;
  }

  const decodedBytes = this.transport.decodeBase64(data);
  const assemblyKey = `${messageId}:${fileId}`;
  const requestKey = this.buildRequestKey(messageId, fileId);

  // Data is arriving, so the request is no longer pending or failed.
  this.runtimeStore.deletePendingRequest(requestKey);
  this.clearAttachmentRequestError(attachment);

  const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total);

  // An existing buffer keeps the length it was created with; never write past
  // it, and never store or count the same index twice.
  if (index < chunkBuffer.length && !chunkBuffer[index]) {
    const { buffer, byteOffset, byteLength } = decodedBytes;

    // Keep exactly the decoded bytes: if the view covers only part of its
    // backing buffer, copy that window instead of storing the whole buffer.
    chunkBuffer[index] = (
      byteOffset === 0 && byteLength === buffer.byteLength
        ? buffer
        : buffer.slice(byteOffset, byteOffset + byteLength)
    ) as ArrayBuffer;

    this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1);
    this.updateTransferProgress(attachment, decodedBytes, fromPeerId);
  }

  this.runtimeStore.touch();
  this.finalizeTransferIfComplete(attachment, assemblyKey, total);
}
|
|
|
|
/**
 * Serves a peer's request for a file, trying each local source in turn.
 *
 * Sources, in order:
 * 1. the original `File` kept in the runtime store (by exact key, then by file id);
 * 2. an existing on-disk path resolved by the attachment storage service;
 * 3. for image attachments, a legacy image path for the current room;
 * 4. the attachment's object URL, re-read into a `File`.
 * The first source that is found is streamed to the requester and the method
 * returns. If none is found — or reading/streaming from the object URL
 * throws — a `file-not-found` event is sent back to the requester.
 *
 * @param payload Request payload; ignored when `messageId`, `fileId` or
 *   `fromPeerId` is missing.
 */
async handleFileRequest(payload: FileRequestPayload): Promise<void> {
  const { messageId, fileId, fromPeerId } = payload;

  if (!(messageId && fileId && fromPeerId)) {
    return;
  }

  // One cancellation probe shared by every streaming attempt below.
  const isCancelled = (): boolean => this.isTransferCancelled(fromPeerId, messageId, fileId);

  const inMemoryFile = this.runtimeStore.getOriginalFile(`${messageId}:${fileId}`)
    ?? this.runtimeStore.findOriginalFileByFileId(fileId);

  if (inMemoryFile) {
    await this.transport.streamFileToPeer(fromPeerId, messageId, fileId, inMemoryFile, isCancelled);

    return;
  }

  const attachment = this.runtimeStore
    .getAttachmentsForMessage(messageId)
    .find((entry) => entry.id === fileId);
  const diskPath = attachment
    ? await this.attachmentStorage.resolveExistingPath(attachment)
    : null;

  if (diskPath) {
    await this.transport.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, diskPath, isCancelled);

    return;
  }

  if (attachment?.isImage) {
    const roomName = await this.persistence.resolveCurrentRoomName();
    const legacyDiskPath = await this.attachmentStorage.resolveLegacyImagePath(
      attachment.filename,
      roomName
    );

    if (legacyDiskPath) {
      await this.transport.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, legacyDiskPath, isCancelled);

      return;
    }
  }

  if (attachment?.available && attachment.objectUrl) {
    try {
      const response = await fetch(attachment.objectUrl);
      const blob = await response.blob();
      const rebuiltFile = new File([blob], attachment.filename, { type: attachment.mime });

      await this.transport.streamFileToPeer(fromPeerId, messageId, fileId, rebuiltFile, isCancelled);

      return;
    } catch { /* fall through */ }
  }

  const notFound: FileNotFoundEvent = {
    type: 'file-not-found',
    messageId,
    fileId
  };

  this.webrtc.sendToPeer(fromPeerId, notFound);
}
|
|
|
|
/**
 * Cancels an in-progress download and resets the attachment's transfer state.
 *
 * Does nothing when the attachment has no `uploaderPeerId`. Otherwise the
 * partial chunk buffer and count are dropped, progress/speed/timestamps are
 * reset, any object URL is revoked and cleared, the attachment is marked
 * unavailable, the store is touched, and a `file-cancel` event is sent to the
 * uploader. The whole sequence is best-effort: any exception is swallowed.
 *
 * NOTE(review): the cancel event is addressed to the uploader only, while
 * requests may be sent to other connected peers — confirm this is intended.
 * NOTE(review): the pending-request entry for this file is not cleared here — verify.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param attachment Attachment whose transfer is cancelled; mutated in place.
 */
cancelRequest(messageId: string, attachment: Attachment): void {
  const { uploaderPeerId: targetPeerId } = attachment;

  if (!targetPeerId) {
    return;
  }

  try {
    const assemblyKey = `${messageId}:${attachment.id}`;

    this.runtimeStore.deleteChunkBuffer(assemblyKey);
    this.runtimeStore.deleteChunkCount(assemblyKey);

    attachment.receivedBytes = 0;
    attachment.speedBps = 0;
    attachment.startedAtMs = undefined;
    attachment.lastUpdateMs = undefined;

    const staleUrl = attachment.objectUrl;

    if (staleUrl) {
      try {
        URL.revokeObjectURL(staleUrl);
      } catch { /* ignore */ }

      attachment.objectUrl = undefined;
    }

    attachment.available = false;
    this.runtimeStore.touch();

    const cancellation: FileCancelEvent = {
      type: 'file-cancel',
      messageId,
      fileId: attachment.id
    };

    this.webrtc.sendToPeer(targetPeerId, cancellation);
  } catch { /* best-effort */ }
}
|
|
|
|
/**
 * Records that a peer cancelled its download of a file.
 *
 * The transfer key (message, file, peer) is added to the runtime store's
 * cancelled set, which `isTransferCancelled` consults.
 *
 * NOTE(review): nothing in this method removes the entry again; confirm it is
 * cleared elsewhere so a later re-request from the same peer is not treated as
 * cancelled.
 *
 * @param payload Cancel payload; ignored when `messageId`, `fileId` or
 *   `fromPeerId` is missing.
 */
handleFileCancel(payload: FileCancelPayload): void {
  const { messageId, fileId, fromPeerId } = payload;

  if (!(messageId && fileId && fromPeerId)) {
    return;
  }

  const transferKey = this.buildTransferKey(messageId, fileId, fromPeerId);

  this.runtimeStore.addCancelledTransfer(transferKey);
}
|
|
|
|
/**
 * Answers a peer's request with a caller-supplied file.
 *
 * The file is first stored in the runtime store as the original for
 * `messageId:fileId`, then streamed to the target peer. Streaming consults
 * the cancelled-transfer state through the supplied callback.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param fileId Id of the attachment.
 * @param targetPeerId Peer that should receive the file.
 * @param file File content to send.
 */
async fulfillRequestWithFile(
  messageId: string,
  fileId: string,
  targetPeerId: string,
  file: File
): Promise<void> {
  const originalKey = `${messageId}:${fileId}`;
  const isCancelled = (): boolean => this.isTransferCancelled(targetPeerId, messageId, fileId);

  this.runtimeStore.setOriginalFile(originalKey, file);

  await this.transport.streamFileToPeer(targetPeerId, messageId, fileId, file, isCancelled);
}
|
|
|
|
/**
 * Builds the key identifying one file transfer to or from one peer.
 *
 * @returns `messageId`, `fileId` and `peerId` joined with `:`.
 */
private buildTransferKey(messageId: string, fileId: string, peerId: string): string {
  return [messageId, fileId, peerId].join(':');
}
|
|
|
|
/**
 * Builds the key identifying a file request, independent of peer.
 *
 * @returns `messageId` and `fileId` joined with `:`.
 */
private buildRequestKey(messageId: string, fileId: string): string {
  return [messageId, fileId].join(':');
}
|
|
|
|
/**
 * Removes a request error from the attachment, if one is set.
 *
 * @param attachment Attachment to reset; mutated only when it has an error.
 * @returns `true` when an error was cleared, `false` when there was none.
 */
private clearAttachmentRequestError(attachment: Attachment): boolean {
  const hadError = Boolean(attachment.requestError);

  if (hadError) {
    attachment.requestError = undefined;
  }

  return hadError;
}
|
|
|
|
/**
 * Checks whether the transfer of a file to the given peer is marked cancelled.
 *
 * @param targetPeerId Peer the file is being sent to.
 * @param messageId Id of the message the attachment belongs to.
 * @param fileId Id of the attachment.
 * @returns Whether the runtime store lists this transfer as cancelled.
 */
private isTransferCancelled(targetPeerId: string, messageId: string, fileId: string): boolean {
  // Note the argument order: the key is built as message, file, peer.
  const transferKey = this.buildTransferKey(messageId, fileId, targetPeerId);

  return this.runtimeStore.hasCancelledTransfer(transferKey);
}
|
|
|
|
/**
 * Sends a `file-request` to the next connected peer that has not been tried yet.
 *
 * The preferred peer is used when it is connected and untried; otherwise the
 * first untried connected peer is chosen. The chosen peer is recorded in the
 * request's tried set before the event is sent. When no candidate remains the
 * pending request is deleted.
 *
 * @param messageId Id of the message the attachment belongs to.
 * @param fileId Id of the attachment.
 * @param preferredPeerId Peer to try first, if any.
 * @returns `true` when a request was sent, `false` when no peer was left to try.
 */
private sendFileRequestToNextPeer(
  messageId: string,
  fileId: string,
  preferredPeerId?: string
): boolean {
  const peers = this.webrtc.getConnectedPeers();
  const requestKey = this.buildRequestKey(messageId, fileId);
  const attempted = this.runtimeStore.getPendingRequestPeers(requestKey) ?? new Set<string>();
  const isUntried = (peerId: string): boolean => !attempted.has(peerId);

  const nextPeerId = preferredPeerId && peers.includes(preferredPeerId) && isUntried(preferredPeerId)
    ? preferredPeerId
    : peers.find(isUntried);

  if (!nextPeerId) {
    this.runtimeStore.deletePendingRequest(requestKey);

    return false;
  }

  attempted.add(nextPeerId);
  this.runtimeStore.setPendingRequestPeers(requestKey, attempted);

  const request: FileRequestEvent = {
    type: 'file-request',
    messageId,
    fileId
  };

  this.webrtc.sendToPeer(nextPeerId, request);

  return true;
}
|
|
|
|
/**
 * Returns the chunk buffer for a transfer, creating it on first use.
 *
 * A new buffer is an array of length `total` with no slots filled; it is
 * stored in the runtime store together with a chunk count of 0. An existing
 * buffer is returned as-is, regardless of `total`.
 *
 * @param assemblyKey Key identifying the transfer being assembled.
 * @param total Number of chunks expected; used only when creating the buffer.
 * @returns The buffer that chunks are written into by index.
 */
private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] {
  let buffer = this.runtimeStore.getChunkBuffer(assemblyKey);

  if (!buffer) {
    buffer = new Array<ArrayBuffer>(total);
    this.runtimeStore.setChunkBuffer(assemblyKey, buffer);
    this.runtimeStore.setChunkCount(assemblyKey, 0);
  }

  return buffer;
}
|
|
|
|
/**
 * Updates an attachment's byte counter, timestamps and smoothed speed for one chunk.
 *
 * `receivedBytes` grows by the chunk size. When `fromPeerId` is given the
 * chunk is also reported to the debug network metrics. The speed is an
 * exponentially weighted average of the previous speed and the chunk's
 * instantaneous rate, using the `ATTACHMENT_TRANSFER_EWMA_*` weights. The
 * elapsed time is clamped to at least 1 ms, and with no previous update time
 * the elapsed time is taken as that 1 ms minimum.
 *
 * @param attachment Attachment being received; mutated in place.
 * @param decodedBytes Decoded bytes of the chunk just received.
 * @param fromPeerId Peer that sent the chunk, if known.
 */
private updateTransferProgress(
  attachment: Attachment,
  decodedBytes: Uint8Array,
  fromPeerId?: string
): void {
  const now = Date.now();
  const chunkSize = decodedBytes.byteLength;

  attachment.receivedBytes = (attachment.receivedBytes ?? 0) + chunkSize;

  if (fromPeerId) {
    recordDebugNetworkFileChunk(fromPeerId, chunkSize, now);
  }

  if (!attachment.startedAtMs) {
    attachment.startedAtMs = now;
  }

  // A missing previous timestamp is treated as "now"; the clamp then yields 1 ms.
  const previousUpdateMs = attachment.lastUpdateMs || now;
  const elapsedMs = Math.max(1, now - previousUpdateMs);
  const instantaneousBps = (chunkSize / elapsedMs) * 1000;
  const previousSpeed = attachment.speedBps ?? instantaneousBps;

  attachment.speedBps =
    ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT * previousSpeed +
    ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT * instantaneousBps;
  attachment.lastUpdateMs = now;
}
|
|
|
|
/**
 * Assembles the received chunks into a Blob once the transfer is complete.
 *
 * The transfer counts as complete when the chunk count equals `total` or the
 * received byte count has reached the attachment's size, AND every slot of the
 * chunk buffer holds an `ArrayBuffer`. On completion the attachment gets a
 * fresh object URL and is marked available, the blob is handed to the
 * persistence service for saving when `shouldPersistDownloadedAttachment`
 * allows it (not awaited), the assembly state is dropped, the store is
 * touched, and the attachment metadata is persisted (not awaited).
 *
 * @param attachment Attachment being received; mutated in place on completion.
 * @param assemblyKey Key identifying the transfer being assembled.
 * @param total Number of chunks the sender announced.
 */
private finalizeTransferIfComplete(
  attachment: Attachment,
  assemblyKey: string,
  total: number
): void {
  const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0;
  const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);

  if (
    !completeBuffer
    || (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size)
  ) {
    return;
  }

  // The buffer is created with `new Array(total)` and is therefore sparse.
  // `Array.prototype.every` skips empty slots, so it would report "all
  // present" even with chunks missing; walk every index explicitly instead.
  for (let slot = 0; slot < completeBuffer.length; slot++) {
    if (!(completeBuffer[slot] instanceof ArrayBuffer)) {
      return;
    }
  }

  const blob = new Blob(completeBuffer, { type: attachment.mime });

  // Release a previously created URL so its blob is not kept alive.
  if (attachment.objectUrl) {
    try {
      URL.revokeObjectURL(attachment.objectUrl);
    } catch { /* ignore */ }
  }

  attachment.available = true;
  attachment.objectUrl = URL.createObjectURL(blob);

  if (shouldPersistDownloadedAttachment(attachment)) {
    void this.persistence.saveFileToDisk(attachment, blob);
  }

  this.runtimeStore.deleteChunkBuffer(assemblyKey);
  this.runtimeStore.deleteChunkCount(assemblyKey);
  this.runtimeStore.touch();
  void this.persistence.persistAttachmentMeta(attachment);
}
|
|
}
|