760 lines
23 KiB
TypeScript
760 lines
23 KiB
TypeScript
import { Injectable, inject } from '@angular/core';
|
|
import { recordDebugNetworkFileChunk } from '../../../../infrastructure/realtime/logging/debug-network-metrics';
|
|
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
|
import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service';
|
|
import { MAX_AUTO_SAVE_SIZE_BYTES } from '../../domain/constants/attachment.constants';
|
|
import { shouldPersistDownloadedAttachment } from '../../domain/logic/attachment.logic';
|
|
import type { Attachment, AttachmentMeta } from '../../domain/models/attachment.model';
|
|
import {
|
|
ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT,
|
|
ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT,
|
|
DEFAULT_ATTACHMENT_MIME_TYPE,
|
|
FILE_NOT_FOUND_REQUEST_ERROR,
|
|
NO_CONNECTED_PEERS_REQUEST_ERROR
|
|
} from '../../domain/constants/attachment-transfer.constants';
|
|
import {
|
|
type FileAnnounceEvent,
|
|
type FileAnnouncePayload,
|
|
type FileCancelEvent,
|
|
type FileCancelPayload,
|
|
type FileChunkPayload,
|
|
type FileNotFoundEvent,
|
|
type FileNotFoundPayload,
|
|
type FileRequestEvent,
|
|
type FileRequestPayload,
|
|
type LocalFileWithPath
|
|
} from '../../domain/models/attachment-transfer.model';
|
|
import { AttachmentPersistenceService } from './attachment-persistence.service';
|
|
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
|
import { AttachmentTransferTransportService } from './attachment-transfer-transport.service';
|
|
|
|
interface DiskReceiveAssembly {
|
|
path: string;
|
|
receivedCount: number;
|
|
receivedIndexes: Set<number>;
|
|
total: number;
|
|
}
|
|
|
|
interface ValidFileChunkPayload {
|
|
data: string;
|
|
fileId: string;
|
|
fromPeerId?: string;
|
|
index: number;
|
|
messageId: string;
|
|
total: number;
|
|
}
|
|
|
|
@Injectable({ providedIn: 'root' })
|
|
export class AttachmentTransferService {
|
|
private readonly webrtc = inject(RealtimeSessionFacade);
|
|
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
|
private readonly attachmentStorage = inject(AttachmentStorageService);
|
|
private readonly persistence = inject(AttachmentPersistenceService);
|
|
private readonly transport = inject(AttachmentTransferTransportService);
|
|
|
|
private readonly diskReceiveAssemblies = new Map<string, DiskReceiveAssembly>();
|
|
private readonly diskReceiveChains = new Map<string, Promise<void>>();
|
|
|
|
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
|
|
const result: Record<string, AttachmentMeta[]> = {};
|
|
|
|
for (const messageId of messageIds) {
|
|
const attachments = this.runtimeStore.getAttachmentsForMessage(messageId);
|
|
|
|
if (attachments.length > 0) {
|
|
result[messageId] = attachments.map((attachment) => ({
|
|
id: attachment.id,
|
|
messageId: attachment.messageId,
|
|
filename: attachment.filename,
|
|
size: attachment.size,
|
|
mime: attachment.mime,
|
|
isImage: attachment.isImage,
|
|
uploaderPeerId: attachment.uploaderPeerId,
|
|
filePath: undefined,
|
|
savedPath: undefined
|
|
}));
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
registerSyncedAttachments(
|
|
attachmentMap: Record<string, AttachmentMeta[]>,
|
|
messageRoomIds?: Record<string, string>
|
|
): void {
|
|
if (messageRoomIds) {
|
|
for (const [messageId, roomId] of Object.entries(messageRoomIds)) {
|
|
this.runtimeStore.rememberMessageRoom(messageId, roomId);
|
|
}
|
|
}
|
|
|
|
const newAttachments: Attachment[] = [];
|
|
|
|
for (const [messageId, metas] of Object.entries(attachmentMap)) {
|
|
const existing = [...this.runtimeStore.getAttachmentsForMessage(messageId)];
|
|
|
|
for (const meta of metas) {
|
|
const alreadyKnown = existing.find((entry) => entry.id === meta.id);
|
|
|
|
if (!alreadyKnown) {
|
|
const attachment: Attachment = { ...meta,
|
|
available: false,
|
|
receivedBytes: 0 };
|
|
|
|
existing.push(attachment);
|
|
newAttachments.push(attachment);
|
|
}
|
|
}
|
|
|
|
this.runtimeStore.setAttachmentsForMessage(messageId, existing);
|
|
}
|
|
|
|
if (newAttachments.length > 0) {
|
|
this.runtimeStore.touch();
|
|
|
|
for (const attachment of newAttachments) {
|
|
void this.persistence.persistAttachmentMeta(attachment);
|
|
}
|
|
}
|
|
}
|
|
|
|
requestFromAnyPeer(messageId: string, attachment: Attachment): void {
|
|
const clearedRequestError = this.clearAttachmentRequestError(attachment);
|
|
const connectedPeers = this.webrtc.getConnectedPeers();
|
|
|
|
if (connectedPeers.length === 0) {
|
|
attachment.requestError = NO_CONNECTED_PEERS_REQUEST_ERROR;
|
|
this.runtimeStore.touch();
|
|
console.warn('[Attachments] No connected peers to request file from');
|
|
return;
|
|
}
|
|
|
|
if (clearedRequestError)
|
|
this.runtimeStore.touch();
|
|
|
|
this.runtimeStore.setPendingRequestPeers(
|
|
this.buildRequestKey(messageId, attachment.id),
|
|
new Set<string>()
|
|
);
|
|
|
|
this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId);
|
|
}
|
|
|
|
handleFileNotFound(payload: FileNotFoundPayload): void {
|
|
const { messageId, fileId } = payload;
|
|
|
|
if (!messageId || !fileId)
|
|
return;
|
|
|
|
const attachments = this.runtimeStore.getAttachmentsForMessage(messageId);
|
|
const attachment = attachments.find((entry) => entry.id === fileId);
|
|
const didSendRequest = this.sendFileRequestToNextPeer(messageId, fileId, attachment?.uploaderPeerId);
|
|
|
|
if (!didSendRequest && attachment) {
|
|
attachment.requestError = FILE_NOT_FOUND_REQUEST_ERROR;
|
|
this.runtimeStore.touch();
|
|
}
|
|
}
|
|
|
|
requestImageFromAnyPeer(messageId: string, attachment: Attachment): void {
|
|
this.requestFromAnyPeer(messageId, attachment);
|
|
}
|
|
|
|
requestFile(messageId: string, attachment: Attachment): void {
|
|
this.requestFromAnyPeer(messageId, attachment);
|
|
}
|
|
|
|
hasPendingRequest(messageId: string, fileId: string): boolean {
|
|
return this.runtimeStore.hasPendingRequest(this.buildRequestKey(messageId, fileId));
|
|
}
|
|
|
|
async publishAttachments(
|
|
messageId: string,
|
|
files: File[],
|
|
uploaderPeerId?: string
|
|
): Promise<void> {
|
|
const attachments: Attachment[] = [];
|
|
|
|
for (const file of files) {
|
|
const fileId = crypto.randomUUID?.() ?? `${Date.now()}-${Math.random()}`;
|
|
const attachment: Attachment = {
|
|
id: fileId,
|
|
messageId,
|
|
filename: file.name,
|
|
size: file.size,
|
|
mime: file.type || DEFAULT_ATTACHMENT_MIME_TYPE,
|
|
isImage: file.type.startsWith('image/'),
|
|
uploaderPeerId,
|
|
filePath: (file as LocalFileWithPath).path,
|
|
available: false
|
|
};
|
|
|
|
attachments.push(attachment);
|
|
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
|
|
|
const fileUrl = attachment.filePath && this.isPlayableMedia(attachment)
|
|
? await this.attachmentStorage.getFileUrl(attachment.filePath)
|
|
: null;
|
|
|
|
if (fileUrl) {
|
|
attachment.objectUrl = fileUrl;
|
|
attachment.available = true;
|
|
} else {
|
|
try {
|
|
attachment.objectUrl = URL.createObjectURL(file);
|
|
attachment.available = true;
|
|
} catch { /* non-critical */ }
|
|
}
|
|
|
|
if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
|
|
void this.persistence.saveFileToDisk(attachment, file);
|
|
}
|
|
|
|
const fileAnnounceEvent: FileAnnounceEvent = {
|
|
type: 'file-announce',
|
|
messageId,
|
|
file: {
|
|
id: fileId,
|
|
filename: attachment.filename,
|
|
size: attachment.size,
|
|
mime: attachment.mime,
|
|
isImage: attachment.isImage,
|
|
uploaderPeerId
|
|
}
|
|
};
|
|
|
|
this.webrtc.broadcastMessage(fileAnnounceEvent);
|
|
}
|
|
|
|
const existingList = this.runtimeStore.getAttachmentsForMessage(messageId);
|
|
|
|
this.runtimeStore.setAttachmentsForMessage(messageId, [...existingList, ...attachments]);
|
|
this.runtimeStore.touch();
|
|
|
|
for (const attachment of attachments) {
|
|
void this.persistence.persistAttachmentMeta(attachment);
|
|
}
|
|
}
|
|
|
|
handleFileAnnounce(payload: FileAnnouncePayload): void {
|
|
const { messageId, file } = payload;
|
|
|
|
if (!messageId || !file)
|
|
return;
|
|
|
|
const list = [...this.runtimeStore.getAttachmentsForMessage(messageId)];
|
|
const alreadyKnown = list.find((entry) => entry.id === file.id);
|
|
|
|
if (alreadyKnown)
|
|
return;
|
|
|
|
const attachment: Attachment = {
|
|
id: file.id,
|
|
messageId,
|
|
filename: file.filename,
|
|
size: file.size,
|
|
mime: file.mime,
|
|
isImage: !!file.isImage,
|
|
uploaderPeerId: file.uploaderPeerId,
|
|
available: false,
|
|
receivedBytes: 0
|
|
};
|
|
|
|
list.push(attachment);
|
|
this.runtimeStore.setAttachmentsForMessage(messageId, list);
|
|
this.runtimeStore.touch();
|
|
void this.persistence.persistAttachmentMeta(attachment);
|
|
}
|
|
|
|
handleFileChunk(payload: FileChunkPayload): void {
|
|
const { messageId, fileId, fromPeerId, index, total, data } = payload;
|
|
|
|
if (
|
|
!messageId || !fileId ||
|
|
typeof index !== 'number' ||
|
|
typeof total !== 'number' ||
|
|
typeof data !== 'string'
|
|
) {
|
|
return;
|
|
}
|
|
|
|
const list = this.runtimeStore.getAttachmentsForMessage(messageId);
|
|
const attachment = list.find((entry) => entry.id === fileId);
|
|
|
|
if (!attachment)
|
|
return;
|
|
|
|
if (this.shouldReceiveToDisk(attachment)) {
|
|
this.enqueueDiskFileChunk(attachment, {
|
|
data,
|
|
fileId,
|
|
fromPeerId,
|
|
index,
|
|
messageId,
|
|
total
|
|
});
|
|
|
|
return;
|
|
}
|
|
|
|
const decodedBytes = this.transport.decodeBase64(data);
|
|
const assemblyKey = `${messageId}:${fileId}`;
|
|
const requestKey = this.buildRequestKey(messageId, fileId);
|
|
|
|
this.runtimeStore.deletePendingRequest(requestKey);
|
|
this.clearAttachmentRequestError(attachment);
|
|
|
|
const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total);
|
|
|
|
if (!chunkBuffer[index]) {
|
|
chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer;
|
|
this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1);
|
|
}
|
|
|
|
this.updateTransferProgress(attachment, decodedBytes, fromPeerId);
|
|
|
|
this.runtimeStore.touch();
|
|
void this.finalizeTransferIfComplete(attachment, assemblyKey, total);
|
|
}
|
|
|
|
async handleFileRequest(payload: FileRequestPayload): Promise<void> {
|
|
const { messageId, fileId, fromPeerId } = payload;
|
|
|
|
if (!messageId || !fileId || !fromPeerId)
|
|
return;
|
|
|
|
const exactKey = `${messageId}:${fileId}`;
|
|
const originalFile = this.runtimeStore.getOriginalFile(exactKey)
|
|
?? this.runtimeStore.findOriginalFileByFileId(fileId);
|
|
|
|
if (originalFile) {
|
|
await this.transport.streamFileToPeer(
|
|
fromPeerId,
|
|
messageId,
|
|
fileId,
|
|
originalFile,
|
|
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
|
);
|
|
|
|
return;
|
|
}
|
|
|
|
const list = this.runtimeStore.getAttachmentsForMessage(messageId);
|
|
const attachment = list.find((entry) => entry.id === fileId);
|
|
const diskPath = attachment
|
|
? await this.attachmentStorage.resolveExistingPath(attachment)
|
|
: null;
|
|
|
|
if (diskPath) {
|
|
await this.transport.streamFileFromDiskToPeer(
|
|
fromPeerId,
|
|
messageId,
|
|
fileId,
|
|
diskPath,
|
|
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
|
);
|
|
|
|
return;
|
|
}
|
|
|
|
if (attachment?.isImage) {
|
|
const roomName = await this.persistence.resolveCurrentRoomName();
|
|
const legacyDiskPath = await this.attachmentStorage.resolveLegacyImagePath(
|
|
attachment.filename,
|
|
roomName
|
|
);
|
|
|
|
if (legacyDiskPath) {
|
|
await this.transport.streamFileFromDiskToPeer(
|
|
fromPeerId,
|
|
messageId,
|
|
fileId,
|
|
legacyDiskPath,
|
|
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
|
);
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (attachment?.available && attachment.objectUrl) {
|
|
try {
|
|
const response = await fetch(attachment.objectUrl);
|
|
const blob = await response.blob();
|
|
const file = new File([blob], attachment.filename, { type: attachment.mime });
|
|
|
|
await this.transport.streamFileToPeer(
|
|
fromPeerId,
|
|
messageId,
|
|
fileId,
|
|
file,
|
|
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
|
);
|
|
|
|
return;
|
|
} catch { /* fall through */ }
|
|
}
|
|
|
|
const fileNotFoundEvent: FileNotFoundEvent = {
|
|
type: 'file-not-found',
|
|
messageId,
|
|
fileId
|
|
};
|
|
|
|
this.webrtc.sendToPeer(fromPeerId, fileNotFoundEvent);
|
|
}
|
|
|
|
cancelRequest(messageId: string, attachment: Attachment): void {
|
|
const targetPeerId = attachment.uploaderPeerId;
|
|
|
|
if (!targetPeerId)
|
|
return;
|
|
|
|
try {
|
|
const assemblyKey = `${messageId}:${attachment.id}`;
|
|
|
|
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
|
this.runtimeStore.deleteChunkCount(assemblyKey);
|
|
void this.deleteDiskReceiveAssembly(assemblyKey);
|
|
|
|
attachment.receivedBytes = 0;
|
|
attachment.speedBps = 0;
|
|
attachment.startedAtMs = undefined;
|
|
attachment.lastUpdateMs = undefined;
|
|
|
|
if (attachment.objectUrl) {
|
|
try {
|
|
URL.revokeObjectURL(attachment.objectUrl);
|
|
} catch { /* ignore */ }
|
|
|
|
attachment.objectUrl = undefined;
|
|
}
|
|
|
|
attachment.available = false;
|
|
this.runtimeStore.touch();
|
|
|
|
const fileCancelEvent: FileCancelEvent = {
|
|
type: 'file-cancel',
|
|
messageId,
|
|
fileId: attachment.id
|
|
};
|
|
|
|
this.webrtc.sendToPeer(targetPeerId, fileCancelEvent);
|
|
} catch { /* best-effort */ }
|
|
}
|
|
|
|
handleFileCancel(payload: FileCancelPayload): void {
|
|
const { messageId, fileId, fromPeerId } = payload;
|
|
|
|
if (!messageId || !fileId || !fromPeerId)
|
|
return;
|
|
|
|
this.runtimeStore.addCancelledTransfer(
|
|
this.buildTransferKey(messageId, fileId, fromPeerId)
|
|
);
|
|
}
|
|
|
|
async fulfillRequestWithFile(
|
|
messageId: string,
|
|
fileId: string,
|
|
targetPeerId: string,
|
|
file: File
|
|
): Promise<void> {
|
|
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
|
await this.transport.streamFileToPeer(
|
|
targetPeerId,
|
|
messageId,
|
|
fileId,
|
|
file,
|
|
() => this.isTransferCancelled(targetPeerId, messageId, fileId)
|
|
);
|
|
}
|
|
|
|
private buildTransferKey(messageId: string, fileId: string, peerId: string): string {
|
|
return `${messageId}:${fileId}:${peerId}`;
|
|
}
|
|
|
|
private buildRequestKey(messageId: string, fileId: string): string {
|
|
return `${messageId}:${fileId}`;
|
|
}
|
|
|
|
private clearAttachmentRequestError(attachment: Attachment): boolean {
|
|
if (!attachment.requestError)
|
|
return false;
|
|
|
|
attachment.requestError = undefined;
|
|
return true;
|
|
}
|
|
|
|
private isTransferCancelled(targetPeerId: string, messageId: string, fileId: string): boolean {
|
|
return this.runtimeStore.hasCancelledTransfer(
|
|
this.buildTransferKey(messageId, fileId, targetPeerId)
|
|
);
|
|
}
|
|
|
|
private sendFileRequestToNextPeer(
|
|
messageId: string,
|
|
fileId: string,
|
|
preferredPeerId?: string
|
|
): boolean {
|
|
const connectedPeers = this.webrtc.getConnectedPeers();
|
|
const requestKey = this.buildRequestKey(messageId, fileId);
|
|
const triedPeers = this.runtimeStore.getPendingRequestPeers(requestKey) ?? new Set<string>();
|
|
|
|
let targetPeerId: string | undefined;
|
|
|
|
if (preferredPeerId && connectedPeers.includes(preferredPeerId) && !triedPeers.has(preferredPeerId)) {
|
|
targetPeerId = preferredPeerId;
|
|
} else {
|
|
targetPeerId = connectedPeers.find((peerId) => !triedPeers.has(peerId));
|
|
}
|
|
|
|
if (!targetPeerId) {
|
|
this.runtimeStore.deletePendingRequest(requestKey);
|
|
return false;
|
|
}
|
|
|
|
triedPeers.add(targetPeerId);
|
|
this.runtimeStore.setPendingRequestPeers(requestKey, triedPeers);
|
|
|
|
const fileRequestEvent: FileRequestEvent = {
|
|
type: 'file-request',
|
|
messageId,
|
|
fileId
|
|
};
|
|
|
|
this.webrtc.sendToPeer(targetPeerId, fileRequestEvent);
|
|
|
|
return true;
|
|
}
|
|
|
|
private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] {
|
|
const existingChunkBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
|
|
|
if (existingChunkBuffer) {
|
|
return existingChunkBuffer;
|
|
}
|
|
|
|
const createdChunkBuffer = new Array(total);
|
|
|
|
this.runtimeStore.setChunkBuffer(assemblyKey, createdChunkBuffer);
|
|
this.runtimeStore.setChunkCount(assemblyKey, 0);
|
|
|
|
return createdChunkBuffer;
|
|
}
|
|
|
|
private updateTransferProgress(
|
|
attachment: Attachment,
|
|
decodedBytes: Uint8Array,
|
|
fromPeerId?: string
|
|
): void {
|
|
const now = Date.now();
|
|
const previousReceived = attachment.receivedBytes ?? 0;
|
|
|
|
attachment.receivedBytes = previousReceived + decodedBytes.byteLength;
|
|
|
|
if (fromPeerId) {
|
|
recordDebugNetworkFileChunk(fromPeerId, decodedBytes.byteLength, now);
|
|
}
|
|
|
|
if (!attachment.startedAtMs)
|
|
attachment.startedAtMs = now;
|
|
|
|
if (!attachment.lastUpdateMs)
|
|
attachment.lastUpdateMs = now;
|
|
|
|
const elapsedMs = Math.max(1, now - attachment.lastUpdateMs);
|
|
const instantaneousBps = (decodedBytes.byteLength / elapsedMs) * 1000;
|
|
const previousSpeed = attachment.speedBps ?? instantaneousBps;
|
|
|
|
attachment.speedBps =
|
|
ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT * previousSpeed +
|
|
ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT * instantaneousBps;
|
|
|
|
attachment.lastUpdateMs = now;
|
|
}
|
|
|
|
private async finalizeTransferIfComplete(
|
|
attachment: Attachment,
|
|
assemblyKey: string,
|
|
total: number
|
|
): Promise<void> {
|
|
const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0;
|
|
const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
|
|
|
if (
|
|
!completeBuffer
|
|
|| (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size)
|
|
|| !completeBuffer.every((part) => part instanceof ArrayBuffer)
|
|
) {
|
|
return;
|
|
}
|
|
|
|
const blob = new Blob(completeBuffer, { type: attachment.mime });
|
|
|
|
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
|
this.runtimeStore.deleteChunkCount(assemblyKey);
|
|
|
|
if (shouldPersistDownloadedAttachment(attachment)) {
|
|
const diskPath = await this.persistence.saveFileToDisk(attachment, blob);
|
|
const fileUrl = diskPath && this.isPlayableMedia(attachment)
|
|
? await this.attachmentStorage.getFileUrl(diskPath)
|
|
: null;
|
|
|
|
if (fileUrl) {
|
|
attachment.objectUrl = fileUrl;
|
|
} else {
|
|
attachment.objectUrl = URL.createObjectURL(blob);
|
|
}
|
|
} else {
|
|
attachment.objectUrl = URL.createObjectURL(blob);
|
|
}
|
|
|
|
attachment.available = true;
|
|
this.runtimeStore.touch();
|
|
void this.persistence.persistAttachmentMeta(attachment);
|
|
}
|
|
|
|
private isPlayableMedia(attachment: Pick<Attachment, 'mime'>): boolean {
|
|
return attachment.mime.startsWith('video/') || attachment.mime.startsWith('audio/');
|
|
}
|
|
|
|
private shouldReceiveToDisk(attachment: Attachment): boolean {
|
|
return this.isPlayableMedia(attachment) && !attachment.filePath && this.attachmentStorage.canWriteFiles();
|
|
}
|
|
|
|
private enqueueDiskFileChunk(
|
|
attachment: Attachment,
|
|
payload: ValidFileChunkPayload
|
|
): void {
|
|
const assemblyKey = `${payload.messageId}:${payload.fileId}`;
|
|
const previous = this.diskReceiveChains.get(assemblyKey) ?? Promise.resolve();
|
|
const next = previous
|
|
.catch(() => undefined)
|
|
.then(() => this.handleDiskFileChunk(attachment, assemblyKey, payload))
|
|
.catch((error: unknown) => this.handleDiskReceiveFailure(attachment, assemblyKey, error));
|
|
|
|
this.diskReceiveChains.set(assemblyKey, next);
|
|
void next.finally(() => {
|
|
if (this.diskReceiveChains.get(assemblyKey) === next) {
|
|
this.diskReceiveChains.delete(assemblyKey);
|
|
}
|
|
});
|
|
}
|
|
|
|
private async handleDiskFileChunk(
|
|
attachment: Attachment,
|
|
assemblyKey: string,
|
|
payload: ValidFileChunkPayload
|
|
): Promise<void> {
|
|
const decodedBytes = this.transport.decodeBase64(payload.data);
|
|
const requestKey = this.buildRequestKey(payload.messageId, payload.fileId);
|
|
|
|
this.runtimeStore.deletePendingRequest(requestKey);
|
|
this.clearAttachmentRequestError(attachment);
|
|
|
|
const assembly = await this.getOrCreateDiskReceiveAssembly(attachment, assemblyKey, payload.total);
|
|
|
|
if (!assembly) {
|
|
throw new Error('Could not prepare media download on disk.');
|
|
}
|
|
|
|
if (assembly.receivedIndexes.has(payload.index)) {
|
|
return;
|
|
}
|
|
|
|
if (payload.index !== assembly.receivedCount) {
|
|
throw new Error('Received media chunks out of order. Retry the download.');
|
|
}
|
|
|
|
const didAppend = await this.attachmentStorage.appendBase64(assembly.path, payload.data);
|
|
|
|
if (!didAppend) {
|
|
throw new Error('Could not write media download to disk.');
|
|
}
|
|
|
|
assembly.receivedIndexes.add(payload.index);
|
|
assembly.receivedCount += 1;
|
|
this.updateTransferProgress(attachment, decodedBytes, payload.fromPeerId);
|
|
this.runtimeStore.touch();
|
|
|
|
if (assembly.receivedCount < assembly.total && (attachment.receivedBytes ?? 0) < attachment.size) {
|
|
return;
|
|
}
|
|
|
|
const fileUrl = await this.attachmentStorage.getFileUrl(assembly.path);
|
|
|
|
if (!fileUrl) {
|
|
throw new Error('Could not open completed media download from disk.');
|
|
}
|
|
|
|
attachment.savedPath = assembly.path;
|
|
attachment.objectUrl = fileUrl;
|
|
attachment.available = true;
|
|
this.diskReceiveAssemblies.delete(assemblyKey);
|
|
this.runtimeStore.touch();
|
|
void this.persistence.persistAttachmentMeta(attachment);
|
|
}
|
|
|
|
private async getOrCreateDiskReceiveAssembly(
|
|
attachment: Attachment,
|
|
assemblyKey: string,
|
|
total: number
|
|
): Promise<DiskReceiveAssembly | null> {
|
|
const existing = this.diskReceiveAssemblies.get(assemblyKey);
|
|
|
|
if (existing) {
|
|
return existing;
|
|
}
|
|
|
|
const storageContainer = await this.persistence.resolveStorageContainerName(attachment);
|
|
const path = await this.attachmentStorage.createWritableFile(attachment, storageContainer);
|
|
|
|
if (!path) {
|
|
return null;
|
|
}
|
|
|
|
const assembly: DiskReceiveAssembly = {
|
|
path,
|
|
receivedCount: 0,
|
|
receivedIndexes: new Set<number>(),
|
|
total
|
|
};
|
|
|
|
this.diskReceiveAssemblies.set(assemblyKey, assembly);
|
|
|
|
return assembly;
|
|
}
|
|
|
|
private async handleDiskReceiveFailure(
|
|
attachment: Attachment,
|
|
assemblyKey: string,
|
|
error: unknown
|
|
): Promise<void> {
|
|
await this.deleteDiskReceiveAssembly(assemblyKey);
|
|
|
|
attachment.available = false;
|
|
attachment.objectUrl = undefined;
|
|
attachment.receivedBytes = 0;
|
|
attachment.speedBps = 0;
|
|
attachment.startedAtMs = undefined;
|
|
attachment.lastUpdateMs = undefined;
|
|
attachment.requestError = error instanceof Error && error.message
|
|
? error.message
|
|
: 'Media download failed. Retry the download.';
|
|
|
|
this.runtimeStore.touch();
|
|
}
|
|
|
|
private async deleteDiskReceiveAssembly(assemblyKey: string): Promise<void> {
|
|
const assembly = this.diskReceiveAssemblies.get(assemblyKey);
|
|
|
|
this.diskReceiveAssemblies.delete(assemblyKey);
|
|
|
|
if (assembly?.path) {
|
|
await this.attachmentStorage.deleteFile(assembly.path);
|
|
}
|
|
}
|
|
}
|