Some checks failed
Deploy Web Apps / deploy (push) Successful in 5m52s
Build Android APK / build-android-apk (push) Failing after 23m15s
Queue Release Build / prepare (push) Successful in 1m42s
Queue Release Build / build-linux (push) Failing after 9m33s
Queue Release Build / build-windows (push) Successful in 26m5s
Queue Release Build / finalize (push) Has been skipped
826 lines
25 KiB
TypeScript
826 lines
25 KiB
TypeScript
import { Injectable, inject } from '@angular/core';
|
|
import { take } from 'rxjs';
|
|
import { Store } from '@ngrx/store';
|
|
import { recordDebugNetworkFileChunk } from '../../../../infrastructure/realtime/logging/debug-network-metrics';
|
|
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
|
import { AppI18nService } from '../../../../core/i18n';
|
|
import { selectCurrentUserId } from '../../../../store/users/users.selectors';
|
|
import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service';
|
|
import { MAX_AUTO_SAVE_SIZE_BYTES } from '../../domain/constants/attachment.constants';
|
|
import { isImageAttachment, resolvePublishAttachmentIsImage } from '../../domain/logic/attachment-image.rules';
|
|
import { shouldCopyUploaderMediaToAppData, shouldPersistDownloadedAttachment } from '../../domain/logic/attachment.logic';
|
|
import type { Attachment, AttachmentMeta } from '../../domain/models/attachment.model';
|
|
import {
|
|
ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT,
|
|
ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT,
|
|
DEFAULT_ATTACHMENT_MIME_TYPE,
|
|
ATTACHMENT_DOWNLOAD_FAILED_KEY,
|
|
ATTACHMENT_CHUNKS_OUT_OF_ORDER_KEY,
|
|
ATTACHMENT_OPEN_DOWNLOAD_FAILED_KEY,
|
|
ATTACHMENT_PREPARE_DOWNLOAD_FAILED_KEY,
|
|
ATTACHMENT_WRITE_DOWNLOAD_FAILED_KEY,
|
|
FILE_NOT_FOUND_REQUEST_ERROR_KEY,
|
|
NO_CONNECTED_PEERS_REQUEST_ERROR_KEY,
|
|
UPLOADER_LOCAL_FILE_MISSING_ERROR_KEY
|
|
} from '../../domain/constants/attachment-transfer.constants';
|
|
import {
|
|
type FileAnnounceEvent,
|
|
type FileAnnouncePayload,
|
|
type FileCancelEvent,
|
|
type FileCancelPayload,
|
|
type FileChunkPayload,
|
|
type FileNotFoundEvent,
|
|
type FileNotFoundPayload,
|
|
type FileRequestEvent,
|
|
type FileRequestPayload,
|
|
type LocalFileWithPath
|
|
} from '../../domain/models/attachment-transfer.model';
|
|
import { AttachmentPersistenceService } from './attachment-persistence.service';
|
|
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
|
import { AttachmentTransferTransportService } from './attachment-transfer-transport.service';
|
|
|
|
/** Bookkeeping for one attachment whose chunks are appended to a file on disk. */
interface DiskReceiveAssembly {
  /** Number of chunks the sender announced for this transfer. */
  total: number;
  /** How many chunks have been appended to `path` so far. */
  receivedCount: number;
  /** Indexes of the chunks that were already appended (used to drop duplicates). */
  receivedIndexes: Set<number>;
  /** Path of the file the chunks are appended to. */
  path: string;
}
|
|
|
|
/** A file chunk whose required fields have already been type-checked. */
interface ValidFileChunkPayload {
  /** Message the attachment belongs to. */
  messageId: string;
  /** Attachment the chunk belongs to. */
  fileId: string;
  /** Position of this chunk within the transfer. */
  index: number;
  /** Number of chunks announced for the transfer. */
  total: number;
  /** Base64-encoded chunk content. */
  data: string;
  /** Peer that sent the chunk, when known. */
  fromPeerId?: string;
}
|
|
|
|
@Injectable({ providedIn: 'root' })
|
|
export class AttachmentTransferService {
|
|
private readonly ngrxStore = inject(Store);
|
|
private readonly webrtc = inject(RealtimeSessionFacade);
|
|
private readonly appI18n = inject(AppI18nService);
|
|
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
|
private readonly attachmentStorage = inject(AttachmentStorageService);
|
|
private readonly persistence = inject(AttachmentPersistenceService);
|
|
private readonly transport = inject(AttachmentTransferTransportService);
|
|
|
|
private readonly diskReceiveAssemblies = new Map<string, DiskReceiveAssembly>();
|
|
private readonly diskReceiveChains = new Map<string, Promise<void>>();
|
|
|
|
/**
 * Collects attachment metadata for the given messages.
 *
 * Messages without any known attachment are left out of the result, and the
 * `filePath` / `savedPath` fields are always emitted as `undefined`.
 *
 * @param messageIds Messages to look up in the runtime store.
 * @returns Metadata lists keyed by message id.
 */
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
  const metasByMessage: Record<string, AttachmentMeta[]> = {};

  messageIds.forEach((messageId) => {
    const known = this.runtimeStore.getAttachmentsForMessage(messageId);

    if (known.length === 0) {
      return;
    }

    metasByMessage[messageId] = known.map(
      ({ id, messageId: ownerMessageId, filename, size, mime, isImage, uploaderPeerId }) => ({
        id,
        messageId: ownerMessageId,
        filename,
        size,
        mime,
        isImage,
        uploaderPeerId,
        filePath: undefined,
        savedPath: undefined
      })
    );
  });

  return metasByMessage;
}
|
|
|
|
/**
 * Merges attachment metadata received through sync into the runtime store.
 *
 * Waits for persistence to be ready, records the optional message-to-room
 * mapping, then adds every attachment whose id is not yet known for its
 * message as an unavailable placeholder with zero received bytes. Metadata of
 * newly added attachments is persisted without awaiting the write.
 *
 * @param attachmentMap Attachment metadata keyed by message id.
 * @param messageRoomIds Optional room id per message id.
 */
async registerSyncedAttachments(
  attachmentMap: Record<string, AttachmentMeta[]>,
  messageRoomIds?: Record<string, string>
): Promise<void> {
  await this.persistence.whenReady();

  Object.entries(messageRoomIds ?? {}).forEach(([messageId, roomId]) => {
    this.runtimeStore.rememberMessageRoom(messageId, roomId);
  });

  const added: Attachment[] = [];

  for (const [messageId, metas] of Object.entries(attachmentMap)) {
    const merged = [...this.runtimeStore.getAttachmentsForMessage(messageId)];

    for (const meta of metas) {
      if (merged.some((entry) => entry.id === meta.id)) {
        continue;
      }

      const placeholder: Attachment = {
        ...meta,
        available: false,
        receivedBytes: 0
      };

      merged.push(placeholder);
      added.push(placeholder);
    }

    this.runtimeStore.setAttachmentsForMessage(messageId, merged);
  }

  if (added.length === 0) {
    return;
  }

  this.runtimeStore.touch();

  added.forEach((attachment) => {
    void this.persistence.persistAttachmentMeta(attachment);
  });
}
|
|
|
|
/**
 * Tries to obtain an attachment: first from local storage, then from peers.
 *
 * Any previous request error is cleared up front. When the attachment is not
 * available and can be restored locally, no peer is contacted. Without
 * connected peers a request error is recorded (a dedicated message is used
 * when the current user is the uploader). Otherwise the set of tried peers is
 * reset and a request goes to the next peer, preferring the uploader.
 *
 * @param messageId Message the attachment belongs to.
 * @param attachment Attachment to obtain; mutated in place.
 */
async requestFromAnyPeer(messageId: string, attachment: Attachment): Promise<void> {
  const hadRequestError = this.clearAttachmentRequestError(attachment);

  if (!attachment.available && await this.persistence.tryRestoreAttachmentFromLocal(attachment)) {
    this.runtimeStore.touch();
    return;
  }

  const peers = this.webrtc.getConnectedPeers();
  const selfId = await this.resolveCurrentUserId();

  if (peers.length === 0) {
    const isUploader = !!attachment.uploaderPeerId &&
      !!selfId &&
      attachment.uploaderPeerId === selfId;
    const errorKey = isUploader
      ? UPLOADER_LOCAL_FILE_MISSING_ERROR_KEY
      : NO_CONNECTED_PEERS_REQUEST_ERROR_KEY;

    attachment.requestError = this.appI18n.instant(errorKey);
    this.runtimeStore.touch();
    console.warn('[Attachments] No connected peers to request file from');
    return;
  }

  if (hadRequestError) {
    this.runtimeStore.touch();
  }

  // Start with an empty "already asked" set so every connected peer is eligible again.
  const requestKey = this.buildRequestKey(messageId, attachment.id);

  this.runtimeStore.setPendingRequestPeers(requestKey, new Set<string>());
  this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId);
}
|
|
|
|
/**
 * Reacts to a peer reporting that it does not have the requested file.
 *
 * The request is forwarded to the next peer that has not been asked yet; when
 * no such peer exists and the attachment is known, a request error is stored
 * on it.
 *
 * @param payload `file-not-found` payload; ignored without message or file id.
 */
handleFileNotFound(payload: FileNotFoundPayload): void {
  const { messageId, fileId } = payload;

  if (!(messageId && fileId)) {
    return;
  }

  const attachment = this.runtimeStore
    .getAttachmentsForMessage(messageId)
    .find((entry) => entry.id === fileId);

  if (this.sendFileRequestToNextPeer(messageId, fileId, attachment?.uploaderPeerId)) {
    return;
  }

  if (!attachment) {
    return;
  }

  attachment.requestError = this.appI18n.instant(FILE_NOT_FOUND_REQUEST_ERROR_KEY);
  this.runtimeStore.touch();
}
|
|
|
|
/**
 * Requests an image attachment; delegates to {@link requestFromAnyPeer}
 * with the same arguments.
 */
requestImageFromAnyPeer(messageId: string, attachment: Attachment): Promise<void> {
  const pendingRequest = this.requestFromAnyPeer(messageId, attachment);

  return pendingRequest;
}
|
|
|
|
/**
 * Requests a file attachment; delegates to {@link requestFromAnyPeer}
 * with the same arguments.
 */
requestFile(messageId: string, attachment: Attachment): Promise<void> {
  const pendingRequest = this.requestFromAnyPeer(messageId, attachment);

  return pendingRequest;
}
|
|
|
|
/**
 * Reports whether the runtime store holds a pending request for the file.
 *
 * @param messageId Message the attachment belongs to.
 * @param fileId Attachment id.
 */
hasPendingRequest(messageId: string, fileId: string): boolean {
  const requestKey = this.buildRequestKey(messageId, fileId);

  return this.runtimeStore.hasPendingRequest(requestKey);
}
|
|
|
|
/**
 * Registers local files as attachments of a message and announces them to peers.
 *
 * For every file this creates an attachment record, remembers the `File` so it
 * can be streamed on request, resolves a URL for display, stores a copy where
 * the size and storage rules allow it, and broadcasts a `file-announce` event.
 * Once all files are processed the attachments are appended to the message's
 * list in the runtime store and their metadata is persisted (not awaited).
 *
 * @param messageId Message the files are attached to.
 * @param files Files selected by the user.
 * @param uploaderPeerId Peer id recorded as the uploader, when known.
 */
async publishAttachments(
  messageId: string,
  files: File[],
  uploaderPeerId?: string
): Promise<void> {
  const attachments: Attachment[] = [];

  for (const file of files) {
    const fileId = crypto.randomUUID?.() ?? `${Date.now()}-${Math.random()}`;
    const attachment: Attachment = {
      id: fileId,
      messageId,
      filename: file.name,
      size: file.size,
      mime: file.type || DEFAULT_ATTACHMENT_MIME_TYPE,
      isImage: resolvePublishAttachmentIsImage(file),
      uploaderPeerId,
      filePath: (file as LocalFileWithPath).path,
      available: false
    };

    attachments.push(attachment);
    this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);

    const sourceFileUrl = attachment.filePath && this.isPlayableMedia(attachment)
      ? await this.attachmentStorage.getFileUrl(attachment.filePath)
      : null;

    // Tracks the blob URL minted here so it can be released if a saved-copy URL replaces it.
    let ownedBlobUrl: string | undefined;

    if (sourceFileUrl) {
      attachment.objectUrl = sourceFileUrl;
      attachment.available = true;
    } else {
      try {
        ownedBlobUrl = URL.createObjectURL(file);
        attachment.objectUrl = ownedBlobUrl;
        attachment.available = true;
      } catch { /* non-critical */ }
    }

    if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
      // Small files are saved in the background; the URL resolved above stays in use.
      void this.persistence.saveFileToDisk(attachment, file);
    } else if (shouldCopyUploaderMediaToAppData(
      attachment,
      attachment.filePath,
      this.attachmentStorage.canCopyFiles()
    ) && attachment.filePath) {
      const savedPath = await this.persistence.persistUploadCopyFromSourcePath(attachment, attachment.filePath);

      if (savedPath) {
        await this.useSavedCopyUrl(attachment, savedPath, ownedBlobUrl);
      }
    } else if (this.isPlayableMedia(attachment) && this.attachmentStorage.canWriteFiles()) {
      // Reaching this branch already implies size > MAX_AUTO_SAVE_SIZE_BYTES.
      const savedPath = await this.persistence.saveFileToDisk(attachment, file);

      if (savedPath) {
        await this.useSavedCopyUrl(attachment, savedPath, ownedBlobUrl);
      }
    }

    const fileAnnounceEvent: FileAnnounceEvent = {
      type: 'file-announce',
      messageId,
      file: {
        id: fileId,
        filename: attachment.filename,
        size: attachment.size,
        mime: attachment.mime,
        isImage: attachment.isImage,
        uploaderPeerId
      }
    };

    this.webrtc.broadcastMessage(fileAnnounceEvent);
  }

  const existingList = this.runtimeStore.getAttachmentsForMessage(messageId);

  this.runtimeStore.setAttachmentsForMessage(messageId, [...existingList, ...attachments]);
  this.runtimeStore.touch();

  for (const attachment of attachments) {
    void this.persistence.persistAttachmentMeta(attachment);
  }
}

/**
 * Points the attachment at the URL of its stored copy and releases the blob
 * URL that the new URL replaces, so the blob URL does not leak.
 */
private async useSavedCopyUrl(
  attachment: Attachment,
  savedPath: string,
  supersededBlobUrl?: string
): Promise<void> {
  const fileUrl = await this.attachmentStorage.getFileUrl(savedPath);

  if (!fileUrl) {
    return;
  }

  attachment.objectUrl = fileUrl;
  attachment.available = true;

  if (supersededBlobUrl && supersededBlobUrl !== fileUrl) {
    try {
      URL.revokeObjectURL(supersededBlobUrl);
    } catch { /* ignore */ }
  }
}
|
|
|
|
/**
 * Records an attachment announced by a peer.
 *
 * Announcements without message id or file description, and announcements for
 * an attachment id already known for the message, are ignored. A new
 * attachment starts as unavailable with zero received bytes; its metadata is
 * persisted without awaiting the write.
 *
 * @param payload `file-announce` payload.
 */
handleFileAnnounce(payload: FileAnnouncePayload): void {
  const { messageId, file } = payload;

  if (!messageId || !file) {
    return;
  }

  const known = this.runtimeStore.getAttachmentsForMessage(messageId);

  if (known.some((entry) => entry.id === file.id)) {
    return;
  }

  const { id, filename, size, mime, uploaderPeerId } = file;
  const announced: Attachment = {
    id,
    messageId,
    filename,
    size,
    mime,
    isImage: isImageAttachment({
      filename,
      isImage: !!file.isImage,
      mime
    }),
    uploaderPeerId,
    available: false,
    receivedBytes: 0
  };

  this.runtimeStore.setAttachmentsForMessage(messageId, [...known, announced]);
  this.runtimeStore.touch();
  void this.persistence.persistAttachmentMeta(announced);
}
|
|
|
|
/**
 * Handles one incoming file chunk.
 *
 * Malformed payloads and chunks for unknown attachments are dropped. Chunks of
 * attachments that should be received to disk are queued for the disk
 * pipeline; all others are collected in the in-memory chunk buffer, and the
 * transfer is finalized once complete.
 *
 * @param payload `file-chunk` payload received from a peer.
 */
handleFileChunk(payload: FileChunkPayload): void {
  const { messageId, fileId, fromPeerId, index, total, data } = payload;

  if (
    !messageId || !fileId ||
    typeof index !== 'number' ||
    typeof total !== 'number' ||
    typeof data !== 'string'
  ) {
    return;
  }

  // Counters come from a remote peer. Reject values that cannot address a
  // buffer slot: `new Array(total)` throws a RangeError for negative,
  // fractional or oversized lengths, and an out-of-range index would grow the
  // buffer. NOTE(review): index 0 with total 0 is still accepted in case the
  // sender emits a single chunk for an empty file — confirm against the transport.
  const maxArrayLength = 0xFFFFFFFF;

  if (
    !Number.isInteger(total) || total < 0 || total > maxArrayLength ||
    !Number.isInteger(index) || index < 0 || index >= Math.max(total, 1)
  ) {
    return;
  }

  const list = this.runtimeStore.getAttachmentsForMessage(messageId);
  const attachment = list.find((entry) => entry.id === fileId);

  if (!attachment) {
    return;
  }

  if (this.shouldReceiveToDisk(attachment)) {
    this.enqueueDiskFileChunk(attachment, {
      data,
      fileId,
      fromPeerId,
      index,
      messageId,
      total
    });

    return;
  }

  const decodedBytes = this.transport.decodeBase64(data);
  const assemblyKey = `${messageId}:${fileId}`;
  const requestKey = this.buildRequestKey(messageId, fileId);

  this.runtimeStore.deletePendingRequest(requestKey);

  const clearedRequestError = this.clearAttachmentRequestError(attachment);
  const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total);

  // The buffer may have been created for a different `total`; never write past it.
  if (index >= Math.max(chunkBuffer.length, 1)) {
    if (clearedRequestError) {
      this.runtimeStore.touch();
    }

    return;
  }

  // A repeated chunk must not be counted twice towards the received bytes
  // (the disk pipeline drops duplicates the same way).
  if (chunkBuffer[index]) {
    if (clearedRequestError) {
      this.runtimeStore.touch();
    }

    return;
  }

  chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer;
  this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1);

  this.updateTransferProgress(attachment, decodedBytes, fromPeerId);

  this.runtimeStore.touch();
  void this.finalizeTransferIfComplete(attachment, assemblyKey, total);
}
|
|
|
|
/**
 * Serves a peer's request for a file.
 *
 * Sources are tried in this order: the original `File` kept in the runtime
 * store (exact key first, then lookup by file id), an existing path resolved
 * by attachment storage, a legacy image path (image attachments only), and
 * finally the bytes behind the attachment's object URL. If none yields a file,
 * a `file-not-found` event is sent back to the requester.
 *
 * NOTE(review): nothing in this file removes a cancelled-transfer key, so a
 * re-request after a cancel may be reported as cancelled unless the store or
 * the transport resets it — confirm.
 *
 * @param payload `file-request` payload; ignored unless message, file and
 *   requesting peer ids are all present.
 */
async handleFileRequest(payload: FileRequestPayload): Promise<void> {
  const { messageId, fileId, fromPeerId } = payload;

  if (!(messageId && fileId && fromPeerId)) {
    return;
  }

  // One cancellation probe shared by every streaming attempt below.
  const isCancelled = (): boolean => this.isTransferCancelled(fromPeerId, messageId, fileId);

  const inMemoryFile = this.runtimeStore.getOriginalFile(`${messageId}:${fileId}`)
    ?? this.runtimeStore.findOriginalFileByFileId(fileId);

  if (inMemoryFile) {
    await this.transport.streamFileToPeer(fromPeerId, messageId, fileId, inMemoryFile, isCancelled);
    return;
  }

  const attachment = this.runtimeStore
    .getAttachmentsForMessage(messageId)
    .find((entry) => entry.id === fileId);
  const storedPath = attachment
    ? await this.attachmentStorage.resolveExistingPath(attachment)
    : null;

  if (storedPath) {
    await this.transport.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, storedPath, isCancelled);
    return;
  }

  if (attachment?.isImage) {
    const roomName = await this.persistence.resolveCurrentRoomName();
    const legacyPath = await this.attachmentStorage.resolveLegacyImagePath(attachment.filename, roomName);

    if (legacyPath) {
      await this.transport.streamFileFromDiskToPeer(fromPeerId, messageId, fileId, legacyPath, isCancelled);
      return;
    }
  }

  if (attachment?.available && attachment.objectUrl) {
    try {
      const response = await fetch(attachment.objectUrl);
      const content = await response.blob();
      const rebuiltFile = new File([content], attachment.filename, { type: attachment.mime });

      await this.transport.streamFileToPeer(fromPeerId, messageId, fileId, rebuiltFile, isCancelled);
      return;
    } catch { /* fall through */ }
  }

  const notFound: FileNotFoundEvent = {
    type: 'file-not-found',
    messageId,
    fileId
  };

  this.webrtc.sendToPeer(fromPeerId, notFound);
}
|
|
|
|
/**
 * Cancels a download in progress.
 *
 * Local receive state (chunk buffer, chunk count, partial file on disk,
 * progress counters and object URL) is always discarded. When the uploader is
 * known, a `file-cancel` event is also sent to it. Both steps are best-effort.
 *
 * @param messageId Message the attachment belongs to.
 * @param attachment Attachment being downloaded; mutated in place.
 */
cancelRequest(messageId: string, attachment: Attachment): void {
  const assemblyKey = `${messageId}:${attachment.id}`;

  // Local cleanup does not depend on a peer to notify, so it runs first and
  // unconditionally; a download without a recorded uploader is cancellable too.
  try {
    this.runtimeStore.deleteChunkBuffer(assemblyKey);
    this.runtimeStore.deleteChunkCount(assemblyKey);
    // Fire-and-forget; a failed file deletion must not surface as an unhandled rejection.
    void this.deleteDiskReceiveAssembly(assemblyKey).catch(() => undefined);

    attachment.receivedBytes = 0;
    attachment.speedBps = 0;
    attachment.startedAtMs = undefined;
    attachment.lastUpdateMs = undefined;

    if (attachment.objectUrl) {
      try {
        URL.revokeObjectURL(attachment.objectUrl);
      } catch { /* ignore */ }

      attachment.objectUrl = undefined;
    }

    attachment.available = false;
    this.runtimeStore.touch();
  } catch { /* best-effort */ }

  const targetPeerId = attachment.uploaderPeerId;

  if (!targetPeerId) {
    return;
  }

  try {
    const fileCancelEvent: FileCancelEvent = {
      type: 'file-cancel',
      messageId,
      fileId: attachment.id
    };

    this.webrtc.sendToPeer(targetPeerId, fileCancelEvent);
  } catch { /* best-effort */ }
}
|
|
|
|
/**
 * Marks the transfer of a file to the cancelling peer as cancelled.
 *
 * @param payload `file-cancel` payload; ignored unless message, file and
 *   sender ids are all present.
 */
handleFileCancel(payload: FileCancelPayload): void {
  const { messageId, fileId, fromPeerId } = payload;

  if (!(messageId && fileId && fromPeerId)) {
    return;
  }

  const transferKey = this.buildTransferKey(messageId, fileId, fromPeerId);

  this.runtimeStore.addCancelledTransfer(transferKey);
}
|
|
|
|
/**
 * Streams a caller-supplied file to a peer and keeps it as the original file
 * for this message/file pair.
 *
 * @param messageId Message the attachment belongs to.
 * @param fileId Attachment id.
 * @param targetPeerId Peer that receives the file.
 * @param file File content to stream.
 */
async fulfillRequestWithFile(
  messageId: string,
  fileId: string,
  targetPeerId: string,
  file: File
): Promise<void> {
  const originalFileKey = `${messageId}:${fileId}`;
  const isCancelled = (): boolean => this.isTransferCancelled(targetPeerId, messageId, fileId);

  this.runtimeStore.setOriginalFile(originalFileKey, file);

  await this.transport.streamFileToPeer(targetPeerId, messageId, fileId, file, isCancelled);
}
|
|
|
|
/** Resolves with the first value emitted by the `selectCurrentUserId` selector. */
private resolveCurrentUserId(): Promise<string | null> {
  const firstUserId$ = this.ngrxStore
    .select(selectCurrentUserId)
    .pipe(take(1));

  return new Promise<string | null>((resolve) => {
    firstUserId$.subscribe((userId) => {
      resolve(userId);
    });
  });
}
|
|
|
|
/** Builds the `messageId:fileId:peerId` key that identifies a transfer to one peer. */
private buildTransferKey(messageId: string, fileId: string, peerId: string): string {
  return [messageId, fileId, peerId].join(':');
}
|
|
|
|
/** Builds the `messageId:fileId` key that identifies a pending file request. */
private buildRequestKey(messageId: string, fileId: string): string {
  return [messageId, fileId].join(':');
}
|
|
|
|
/**
 * Removes a request error from the attachment.
 *
 * @returns `true` when an error was present and has been cleared.
 */
private clearAttachmentRequestError(attachment: Attachment): boolean {
  const hadError = Boolean(attachment.requestError);

  if (hadError) {
    attachment.requestError = undefined;
  }

  return hadError;
}
|
|
|
|
/** Reports whether the transfer of the file to the given peer is marked as cancelled. */
private isTransferCancelled(targetPeerId: string, messageId: string, fileId: string): boolean {
  const transferKey = this.buildTransferKey(messageId, fileId, targetPeerId);

  return this.runtimeStore.hasCancelledTransfer(transferKey);
}
|
|
|
|
/**
 * Sends a `file-request` to the next connected peer that has not been asked
 * yet for this file.
 *
 * The preferred peer is chosen when it is connected and untried; otherwise the
 * first untried connected peer is used. When nobody is left, the pending
 * request is removed.
 *
 * @param messageId Message the attachment belongs to.
 * @param fileId Attachment id.
 * @param preferredPeerId Peer to ask first, when given.
 * @returns `true` when a request was sent, `false` when no peer was left.
 */
private sendFileRequestToNextPeer(
  messageId: string,
  fileId: string,
  preferredPeerId?: string
): boolean {
  const connectedPeers = this.webrtc.getConnectedPeers();
  const requestKey = this.buildRequestKey(messageId, fileId);
  const alreadyAsked = this.runtimeStore.getPendingRequestPeers(requestKey) ?? new Set<string>();
  const isUntried = (peerId: string): boolean => !alreadyAsked.has(peerId);

  const nextPeerId: string | undefined =
    preferredPeerId && connectedPeers.includes(preferredPeerId) && isUntried(preferredPeerId)
      ? preferredPeerId
      : connectedPeers.find((peerId) => isUntried(peerId));

  if (!nextPeerId) {
    this.runtimeStore.deletePendingRequest(requestKey);
    return false;
  }

  alreadyAsked.add(nextPeerId);
  this.runtimeStore.setPendingRequestPeers(requestKey, alreadyAsked);

  const request: FileRequestEvent = {
    type: 'file-request',
    messageId,
    fileId
  };

  this.webrtc.sendToPeer(nextPeerId, request);

  return true;
}
|
|
|
|
/**
 * Returns the chunk buffer stored for the assembly key, creating an empty one
 * with `total` slots (and a chunk count of 0) when none exists yet.
 */
private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] {
  const cached = this.runtimeStore.getChunkBuffer(assemblyKey);

  if (cached) {
    return cached;
  }

  const fresh = new Array(total);

  this.runtimeStore.setChunkBuffer(assemblyKey, fresh);
  this.runtimeStore.setChunkCount(assemblyKey, 0);

  return fresh;
}
|
|
|
|
/**
 * Adds a received chunk to the attachment's progress counters.
 *
 * Updates `receivedBytes`, initialises `startedAtMs` on the first chunk,
 * reports the chunk to the debug network metrics when the sender is known, and
 * refreshes `speedBps` as a weighted blend of the previous speed and the speed
 * of this chunk.
 *
 * NOTE(review): for the first chunk no earlier timestamp exists, so the speed
 * sample is computed over the 1 ms minimum interval — confirm this seed value
 * is intended.
 *
 * @param attachment Attachment being received; mutated in place.
 * @param decodedBytes Decoded content of the chunk.
 * @param fromPeerId Peer that sent the chunk, when known.
 */
private updateTransferProgress(
  attachment: Attachment,
  decodedBytes: Uint8Array,
  fromPeerId?: string
): void {
  const now = Date.now();
  const chunkSize = decodedBytes.byteLength;

  attachment.receivedBytes = (attachment.receivedBytes ?? 0) + chunkSize;

  if (fromPeerId) {
    recordDebugNetworkFileChunk(fromPeerId, chunkSize, now);
  }

  if (!attachment.startedAtMs) {
    attachment.startedAtMs = now;
  }

  // Without a previous update the interval is zero and is clamped to 1 ms below.
  const sinceLastUpdateMs = attachment.lastUpdateMs ? now - attachment.lastUpdateMs : 0;
  const elapsedMs = Math.max(1, sinceLastUpdateMs);
  const currentBps = (chunkSize / elapsedMs) * 1000;
  const earlierBps = attachment.speedBps ?? currentBps;

  attachment.speedBps =
    ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT * earlierBps +
    ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT * currentBps;

  attachment.lastUpdateMs = now;
}
|
|
|
|
/**
 * Turns a fully received in-memory chunk buffer into a usable attachment.
 *
 * Does nothing until the buffer exists, the chunk count or the byte counter
 * indicates completion, and every slot of the buffer holds a chunk. On
 * completion the buffer is released, the content is persisted when the
 * attachment rules ask for it, and the attachment is exposed through a new
 * object URL and marked available.
 *
 * @param attachment Attachment being received; mutated in place.
 * @param assemblyKey Key of the chunk buffer in the runtime store.
 * @param total Number of chunks announced for the transfer.
 */
private async finalizeTransferIfComplete(
  attachment: Attachment,
  assemblyKey: string,
  total: number
): Promise<void> {
  const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0;
  const chunkBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);

  if (!chunkBuffer) {
    return;
  }

  if (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size) {
    return;
  }

  // The buffer is created with `new Array(total)` and is therefore sparse until
  // filled. `Array.prototype.every` skips empty slots, so completeness has to be
  // checked slot by slot; otherwise a buffer with missing chunks could be
  // assembled into a corrupt file.
  const parts: ArrayBuffer[] = [];

  for (let slot = 0; slot < chunkBuffer.length; slot += 1) {
    const part = chunkBuffer[slot];

    if (!(part instanceof ArrayBuffer)) {
      return;
    }

    parts.push(part);
  }

  const blob = new Blob(parts, { type: attachment.mime });

  // Release the buffer before awaiting so a concurrent call returns early.
  this.runtimeStore.deleteChunkBuffer(assemblyKey);
  this.runtimeStore.deleteChunkCount(assemblyKey);

  if (shouldPersistDownloadedAttachment(attachment)) {
    try {
      await this.persistence.saveFileToDisk(attachment, blob);
    } catch (error: unknown) {
      // The content is still held in memory; a failed save must not leave the
      // finished download unavailable.
      console.warn('[Attachments] Failed to persist downloaded file', error);
    }
  }

  attachment.objectUrl = URL.createObjectURL(blob);

  attachment.available = true;
  this.runtimeStore.touch();
  void this.persistence.persistAttachmentMeta(attachment);
}
|
|
|
|
/** Reports whether the attachment's MIME type starts with `video/` or `audio/`. */
private isPlayableMedia(attachment: Pick<Attachment, 'mime'>): boolean {
  const { mime } = attachment;

  return ['video/', 'audio/'].some((prefix) => mime.startsWith(prefix));
}
|
|
|
|
/**
 * Decides whether incoming chunks are written to disk instead of memory:
 * playable media without a `filePath`, when storage can write files.
 */
private shouldReceiveToDisk(attachment: Attachment): boolean {
  if (!this.isPlayableMedia(attachment) || attachment.filePath) {
    return false;
  }

  return this.attachmentStorage.canWriteFiles();
}
|
|
|
|
/**
 * Queues a chunk for the disk pipeline of its attachment.
 *
 * Chunks of the same attachment are chained so they are handled one after
 * another; a failure while handling a chunk is routed to
 * `handleDiskReceiveFailure`. The chain entry is removed once the last queued
 * chunk has settled.
 *
 * @param attachment Attachment being received.
 * @param payload Validated chunk payload.
 */
private enqueueDiskFileChunk(
  attachment: Attachment,
  payload: ValidFileChunkPayload
): void {
  const assemblyKey = `${payload.messageId}:${payload.fileId}`;
  const tail = this.diskReceiveChains.get(assemblyKey) ?? Promise.resolve();

  // A rejected predecessor must not block the chunks queued after it.
  const settledTail = tail.catch(() => undefined);
  const processed = settledTail.then(() => this.handleDiskFileChunk(attachment, assemblyKey, payload));
  const guarded = processed.catch(
    (error: unknown) => this.handleDiskReceiveFailure(attachment, assemblyKey, error)
  );

  this.diskReceiveChains.set(assemblyKey, guarded);

  void guarded.finally(() => {
    // Only the most recently queued link may clear the map entry.
    if (this.diskReceiveChains.get(assemblyKey) !== guarded) {
      return;
    }

    this.diskReceiveChains.delete(assemblyKey);
  });
}
|
|
|
|
/**
 * Appends one chunk to the attachment's file on disk and completes the
 * transfer after the last chunk.
 *
 * Duplicate chunks are ignored. Chunks must arrive in order, because they are
 * appended to the file. After the final chunk the file path is stored on the
 * attachment, a display URL is prepared, and the attachment is marked
 * available.
 *
 * @param attachment Attachment being received; mutated in place.
 * @param assemblyKey Key of the disk assembly (`messageId:fileId`).
 * @param payload Validated chunk payload.
 * @throws Error with a translated message when the target file cannot be
 *   prepared, a chunk arrives out of order, the append fails, or the finished
 *   file cannot be opened for display.
 */
private async handleDiskFileChunk(
  attachment: Attachment,
  assemblyKey: string,
  payload: ValidFileChunkPayload
): Promise<void> {
  const decodedBytes = this.transport.decodeBase64(payload.data);
  const requestKey = this.buildRequestKey(payload.messageId, payload.fileId);

  this.runtimeStore.deletePendingRequest(requestKey);
  this.clearAttachmentRequestError(attachment);

  const assembly = await this.getOrCreateDiskReceiveAssembly(attachment, assemblyKey, payload.total);

  if (!assembly) {
    throw new Error(this.appI18n.instant(ATTACHMENT_PREPARE_DOWNLOAD_FAILED_KEY));
  }

  if (assembly.receivedIndexes.has(payload.index)) {
    return;
  }

  // Chunks are appended, so the next index must equal the number already written.
  if (payload.index !== assembly.receivedCount) {
    throw new Error(this.appI18n.instant(ATTACHMENT_CHUNKS_OUT_OF_ORDER_KEY));
  }

  const didAppend = await this.attachmentStorage.appendBase64(assembly.path, payload.data);

  if (!didAppend) {
    throw new Error(this.appI18n.instant(ATTACHMENT_WRITE_DOWNLOAD_FAILED_KEY));
  }

  assembly.receivedIndexes.add(payload.index);
  assembly.receivedCount += 1;
  this.updateTransferProgress(attachment, decodedBytes, payload.fromPeerId);
  this.runtimeStore.touch();

  if (assembly.receivedCount < assembly.total && (attachment.receivedBytes ?? 0) < attachment.size) {
    return;
  }

  const previousSavedPath = attachment.savedPath;

  attachment.savedPath = assembly.path;

  try {
    const restoredForDisplay = await this.persistence.ensureInlineDisplayObjectUrl(attachment);

    if (!restoredForDisplay) {
      throw new Error(this.appI18n.instant(ATTACHMENT_OPEN_DOWNLOAD_FAILED_KEY));
    }
  } catch (error: unknown) {
    // The failure path deletes the assembly file; do not leave `savedPath`
    // pointing at a file that is about to disappear.
    attachment.savedPath = previousSavedPath;
    throw error;
  }

  attachment.available = true;
  this.diskReceiveAssemblies.delete(assemblyKey);
  this.runtimeStore.touch();
  void this.persistence.persistAttachmentMeta(attachment);
}
|
|
|
|
/**
 * Returns the disk assembly registered for the key, or creates a writable
 * file and registers a fresh assembly for it.
 *
 * @param attachment Attachment being received.
 * @param assemblyKey Key of the disk assembly.
 * @param total Number of chunks announced for the transfer.
 * @returns The assembly, or `null` when no writable file could be created.
 */
private async getOrCreateDiskReceiveAssembly(
  attachment: Attachment,
  assemblyKey: string,
  total: number
): Promise<DiskReceiveAssembly | null> {
  const registered = this.diskReceiveAssemblies.get(assemblyKey);

  if (registered) {
    return registered;
  }

  const containerName = await this.persistence.resolveStorageContainerName(attachment);
  const targetPath = await this.attachmentStorage.createWritableFile(attachment, containerName);

  if (!targetPath) {
    return null;
  }

  const created: DiskReceiveAssembly = {
    path: targetPath,
    receivedCount: 0,
    receivedIndexes: new Set<number>(),
    total
  };

  this.diskReceiveAssemblies.set(assemblyKey, created);

  return created;
}
|
|
|
|
/**
 * Resets an attachment after its disk download failed.
 *
 * The partial file is removed, all progress fields are cleared, and the error
 * message (or a generic translated message) is stored as the request error.
 *
 * @param attachment Attachment whose download failed; mutated in place.
 * @param assemblyKey Key of the disk assembly to discard.
 * @param error Failure that interrupted the download.
 */
private async handleDiskReceiveFailure(
  attachment: Attachment,
  assemblyKey: string,
  error: unknown
): Promise<void> {
  try {
    await this.deleteDiskReceiveAssembly(assemblyKey);
  } catch (cleanupError: unknown) {
    // A failed cleanup must not stop the state reset below or hide the
    // original download error.
    console.warn('[Attachments] Failed to remove partial download', cleanupError);
  }

  attachment.available = false;
  attachment.objectUrl = undefined;
  attachment.receivedBytes = 0;
  attachment.speedBps = 0;
  attachment.startedAtMs = undefined;
  attachment.lastUpdateMs = undefined;
  attachment.requestError = error instanceof Error && error.message
    ? error.message
    : this.appI18n.instant(ATTACHMENT_DOWNLOAD_FAILED_KEY);

  this.runtimeStore.touch();
}
|
|
|
|
/** Unregisters the disk assembly for the key and deletes its file, if it has one. */
private async deleteDiskReceiveAssembly(assemblyKey: string): Promise<void> {
  const partialPath = this.diskReceiveAssemblies.get(assemblyKey)?.path;

  this.diskReceiveAssemblies.delete(assemblyKey);

  if (!partialPath) {
    return;
  }

  await this.attachmentStorage.deleteFile(partialPath);
}
|
|
}
|