Fix private calls
This commit is contained in:
@@ -70,17 +70,20 @@ export class AttachmentPersistenceService {
|
||||
} catch { /* persistence is best-effort */ }
|
||||
}
|
||||
|
||||
async saveFileToDisk(attachment: Attachment, blob: Blob): Promise<void> {
|
||||
async saveFileToDisk(attachment: Attachment, blob: Blob): Promise<string | null> {
|
||||
try {
|
||||
const roomName = await this.resolveCurrentRoomName();
|
||||
const diskPath = await this.attachmentStorage.saveBlob(attachment, blob, roomName);
|
||||
const storageContainer = await this.resolveStorageContainerName(attachment);
|
||||
const diskPath = await this.attachmentStorage.saveBlob(attachment, blob, storageContainer);
|
||||
|
||||
if (!diskPath)
|
||||
return;
|
||||
return null;
|
||||
|
||||
attachment.savedPath = diskPath;
|
||||
void this.persistAttachmentMeta(attachment);
|
||||
return diskPath;
|
||||
} catch { /* disk save is best-effort */ }
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async initFromDatabase(): Promise<void> {
|
||||
@@ -120,6 +123,10 @@ export class AttachmentPersistenceService {
|
||||
});
|
||||
}
|
||||
|
||||
async resolveStorageContainerName(attachment: Pick<Attachment, 'messageId'>): Promise<string> {
|
||||
return this.runtimeStore.getMessageRoomId(attachment.messageId) ?? await this.resolveCurrentRoomName();
|
||||
}
|
||||
|
||||
private async loadFromDatabase(): Promise<void> {
|
||||
try {
|
||||
const allRecords: AttachmentMeta[] = await this.database.getAllAttachments();
|
||||
@@ -176,6 +183,11 @@ export class AttachmentPersistenceService {
|
||||
continue;
|
||||
|
||||
if (attachment.savedPath) {
|
||||
if (await this.restoreMediaAttachmentFromFileUrl(attachment, attachment.savedPath)) {
|
||||
hasChanges = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const savedBase64 = await this.attachmentStorage.readFile(attachment.savedPath);
|
||||
|
||||
if (savedBase64) {
|
||||
@@ -186,6 +198,11 @@ export class AttachmentPersistenceService {
|
||||
}
|
||||
|
||||
if (attachment.filePath) {
|
||||
if (await this.restoreMediaAttachmentFromFileUrl(attachment, attachment.filePath)) {
|
||||
hasChanges = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const originalBase64 = await this.attachmentStorage.readFile(attachment.filePath);
|
||||
|
||||
if (originalBase64) {
|
||||
@@ -222,6 +239,26 @@ export class AttachmentPersistenceService {
|
||||
);
|
||||
}
|
||||
|
||||
private async restoreMediaAttachmentFromFileUrl(attachment: Attachment, filePath: string): Promise<boolean> {
|
||||
if (!this.isPlayableMedia(attachment)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const fileUrl = await this.attachmentStorage.getFileUrl(filePath);
|
||||
|
||||
if (!fileUrl) {
|
||||
return false;
|
||||
}
|
||||
|
||||
attachment.objectUrl = fileUrl;
|
||||
attachment.available = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
private isPlayableMedia(attachment: Pick<Attachment, 'mime'>): boolean {
|
||||
return attachment.mime.startsWith('video/') || attachment.mime.startsWith('audio/');
|
||||
}
|
||||
|
||||
private async getRetainedSavedPathsForOtherMessages(messageId: string): Promise<Set<string>> {
|
||||
const retainedSavedPaths = new Set<string>();
|
||||
|
||||
|
||||
@@ -49,6 +49,11 @@ export class AttachmentTransferTransportService {
|
||||
diskPath: string,
|
||||
isCancelled: () => boolean
|
||||
): Promise<void> {
|
||||
if (this.attachmentStorage.canReadFileChunks()) {
|
||||
await this.streamFileFromDiskChunksToPeer(targetPeerId, messageId, fileId, diskPath, isCancelled);
|
||||
return;
|
||||
}
|
||||
|
||||
const base64Full = await this.attachmentStorage.readFile(diskPath);
|
||||
|
||||
if (!base64Full)
|
||||
@@ -78,7 +83,45 @@ export class AttachmentTransferTransportService {
|
||||
data: base64Chunk
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(targetPeerId, fileChunkEvent);
|
||||
await this.webrtc.sendToPeerBuffered(targetPeerId, fileChunkEvent);
|
||||
}
|
||||
}
|
||||
|
||||
private async streamFileFromDiskChunksToPeer(
|
||||
targetPeerId: string,
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
diskPath: string,
|
||||
isCancelled: () => boolean
|
||||
): Promise<void> {
|
||||
const fileSize = await this.attachmentStorage.getFileSize(diskPath);
|
||||
|
||||
if (fileSize === null)
|
||||
return;
|
||||
|
||||
const totalChunks = Math.ceil(fileSize / FILE_CHUNK_SIZE_BYTES);
|
||||
|
||||
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
|
||||
if (isCancelled())
|
||||
break;
|
||||
|
||||
const start = chunkIndex * FILE_CHUNK_SIZE_BYTES;
|
||||
const end = Math.min(fileSize, start + FILE_CHUNK_SIZE_BYTES);
|
||||
const base64Chunk = await this.attachmentStorage.readFileChunk(diskPath, start, end);
|
||||
|
||||
if (base64Chunk === null)
|
||||
return;
|
||||
|
||||
const fileChunkEvent: FileChunkEvent = {
|
||||
type: 'file-chunk',
|
||||
messageId,
|
||||
fileId,
|
||||
index: chunkIndex,
|
||||
total: totalChunks,
|
||||
data: base64Chunk
|
||||
};
|
||||
|
||||
await this.webrtc.sendToPeerBuffered(targetPeerId, fileChunkEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,22 @@ import { AttachmentPersistenceService } from './attachment-persistence.service';
|
||||
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
||||
import { AttachmentTransferTransportService } from './attachment-transfer-transport.service';
|
||||
|
||||
interface DiskReceiveAssembly {
|
||||
path: string;
|
||||
receivedCount: number;
|
||||
receivedIndexes: Set<number>;
|
||||
total: number;
|
||||
}
|
||||
|
||||
interface ValidFileChunkPayload {
|
||||
data: string;
|
||||
fileId: string;
|
||||
fromPeerId?: string;
|
||||
index: number;
|
||||
messageId: string;
|
||||
total: number;
|
||||
}
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentTransferService {
|
||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||
@@ -36,6 +52,9 @@ export class AttachmentTransferService {
|
||||
private readonly persistence = inject(AttachmentPersistenceService);
|
||||
private readonly transport = inject(AttachmentTransferTransportService);
|
||||
|
||||
private readonly diskReceiveAssemblies = new Map<string, DiskReceiveAssembly>();
|
||||
private readonly diskReceiveChains = new Map<string, Promise<void>>();
|
||||
|
||||
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
|
||||
const result: Record<string, AttachmentMeta[]> = {};
|
||||
|
||||
@@ -174,10 +193,19 @@ export class AttachmentTransferService {
|
||||
attachments.push(attachment);
|
||||
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
||||
|
||||
try {
|
||||
attachment.objectUrl = URL.createObjectURL(file);
|
||||
const fileUrl = attachment.filePath && this.isPlayableMedia(attachment)
|
||||
? await this.attachmentStorage.getFileUrl(attachment.filePath)
|
||||
: null;
|
||||
|
||||
if (fileUrl) {
|
||||
attachment.objectUrl = fileUrl;
|
||||
attachment.available = true;
|
||||
} catch { /* non-critical */ }
|
||||
} else {
|
||||
try {
|
||||
attachment.objectUrl = URL.createObjectURL(file);
|
||||
attachment.available = true;
|
||||
} catch { /* non-critical */ }
|
||||
}
|
||||
|
||||
if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
|
||||
void this.persistence.saveFileToDisk(attachment, file);
|
||||
@@ -257,6 +285,19 @@ export class AttachmentTransferService {
|
||||
if (!attachment)
|
||||
return;
|
||||
|
||||
if (this.shouldReceiveToDisk(attachment)) {
|
||||
this.enqueueDiskFileChunk(attachment, {
|
||||
data,
|
||||
fileId,
|
||||
fromPeerId,
|
||||
index,
|
||||
messageId,
|
||||
total
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const decodedBytes = this.transport.decodeBase64(data);
|
||||
const assemblyKey = `${messageId}:${fileId}`;
|
||||
const requestKey = this.buildRequestKey(messageId, fileId);
|
||||
@@ -274,7 +315,7 @@ export class AttachmentTransferService {
|
||||
this.updateTransferProgress(attachment, decodedBytes, fromPeerId);
|
||||
|
||||
this.runtimeStore.touch();
|
||||
this.finalizeTransferIfComplete(attachment, assemblyKey, total);
|
||||
void this.finalizeTransferIfComplete(attachment, assemblyKey, total);
|
||||
}
|
||||
|
||||
async handleFileRequest(payload: FileRequestPayload): Promise<void> {
|
||||
@@ -375,6 +416,7 @@ export class AttachmentTransferService {
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
void this.deleteDiskReceiveAssembly(assemblyKey);
|
||||
|
||||
attachment.receivedBytes = 0;
|
||||
attachment.speedBps = 0;
|
||||
@@ -533,11 +575,11 @@ export class AttachmentTransferService {
|
||||
attachment.lastUpdateMs = now;
|
||||
}
|
||||
|
||||
private finalizeTransferIfComplete(
|
||||
private async finalizeTransferIfComplete(
|
||||
attachment: Attachment,
|
||||
assemblyKey: string,
|
||||
total: number
|
||||
): void {
|
||||
): Promise<void> {
|
||||
const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0;
|
||||
const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
||||
|
||||
@@ -551,16 +593,167 @@ export class AttachmentTransferService {
|
||||
|
||||
const blob = new Blob(completeBuffer, { type: attachment.mime });
|
||||
|
||||
attachment.available = true;
|
||||
attachment.objectUrl = URL.createObjectURL(blob);
|
||||
|
||||
if (shouldPersistDownloadedAttachment(attachment)) {
|
||||
void this.persistence.saveFileToDisk(attachment, blob);
|
||||
}
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
|
||||
if (shouldPersistDownloadedAttachment(attachment)) {
|
||||
const diskPath = await this.persistence.saveFileToDisk(attachment, blob);
|
||||
const fileUrl = diskPath && this.isPlayableMedia(attachment)
|
||||
? await this.attachmentStorage.getFileUrl(diskPath)
|
||||
: null;
|
||||
|
||||
if (fileUrl) {
|
||||
attachment.objectUrl = fileUrl;
|
||||
} else {
|
||||
attachment.objectUrl = URL.createObjectURL(blob);
|
||||
}
|
||||
} else {
|
||||
attachment.objectUrl = URL.createObjectURL(blob);
|
||||
}
|
||||
|
||||
attachment.available = true;
|
||||
this.runtimeStore.touch();
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
|
||||
private isPlayableMedia(attachment: Pick<Attachment, 'mime'>): boolean {
|
||||
return attachment.mime.startsWith('video/') || attachment.mime.startsWith('audio/');
|
||||
}
|
||||
|
||||
private shouldReceiveToDisk(attachment: Attachment): boolean {
|
||||
return this.isPlayableMedia(attachment) && !attachment.filePath && this.attachmentStorage.canWriteFiles();
|
||||
}
|
||||
|
||||
private enqueueDiskFileChunk(
|
||||
attachment: Attachment,
|
||||
payload: ValidFileChunkPayload
|
||||
): void {
|
||||
const assemblyKey = `${payload.messageId}:${payload.fileId}`;
|
||||
const previous = this.diskReceiveChains.get(assemblyKey) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.catch(() => undefined)
|
||||
.then(() => this.handleDiskFileChunk(attachment, assemblyKey, payload))
|
||||
.catch((error: unknown) => this.handleDiskReceiveFailure(attachment, assemblyKey, error));
|
||||
|
||||
this.diskReceiveChains.set(assemblyKey, next);
|
||||
void next.finally(() => {
|
||||
if (this.diskReceiveChains.get(assemblyKey) === next) {
|
||||
this.diskReceiveChains.delete(assemblyKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async handleDiskFileChunk(
|
||||
attachment: Attachment,
|
||||
assemblyKey: string,
|
||||
payload: ValidFileChunkPayload
|
||||
): Promise<void> {
|
||||
const decodedBytes = this.transport.decodeBase64(payload.data);
|
||||
const requestKey = this.buildRequestKey(payload.messageId, payload.fileId);
|
||||
|
||||
this.runtimeStore.deletePendingRequest(requestKey);
|
||||
this.clearAttachmentRequestError(attachment);
|
||||
|
||||
const assembly = await this.getOrCreateDiskReceiveAssembly(attachment, assemblyKey, payload.total);
|
||||
|
||||
if (!assembly) {
|
||||
throw new Error('Could not prepare media download on disk.');
|
||||
}
|
||||
|
||||
if (assembly.receivedIndexes.has(payload.index)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload.index !== assembly.receivedCount) {
|
||||
throw new Error('Received media chunks out of order. Retry the download.');
|
||||
}
|
||||
|
||||
const didAppend = await this.attachmentStorage.appendBase64(assembly.path, payload.data);
|
||||
|
||||
if (!didAppend) {
|
||||
throw new Error('Could not write media download to disk.');
|
||||
}
|
||||
|
||||
assembly.receivedIndexes.add(payload.index);
|
||||
assembly.receivedCount += 1;
|
||||
this.updateTransferProgress(attachment, decodedBytes, payload.fromPeerId);
|
||||
this.runtimeStore.touch();
|
||||
|
||||
if (assembly.receivedCount < assembly.total && (attachment.receivedBytes ?? 0) < attachment.size) {
|
||||
return;
|
||||
}
|
||||
|
||||
const fileUrl = await this.attachmentStorage.getFileUrl(assembly.path);
|
||||
|
||||
if (!fileUrl) {
|
||||
throw new Error('Could not open completed media download from disk.');
|
||||
}
|
||||
|
||||
attachment.savedPath = assembly.path;
|
||||
attachment.objectUrl = fileUrl;
|
||||
attachment.available = true;
|
||||
this.diskReceiveAssemblies.delete(assemblyKey);
|
||||
this.runtimeStore.touch();
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
|
||||
private async getOrCreateDiskReceiveAssembly(
|
||||
attachment: Attachment,
|
||||
assemblyKey: string,
|
||||
total: number
|
||||
): Promise<DiskReceiveAssembly | null> {
|
||||
const existing = this.diskReceiveAssemblies.get(assemblyKey);
|
||||
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const storageContainer = await this.persistence.resolveStorageContainerName(attachment);
|
||||
const path = await this.attachmentStorage.createWritableFile(attachment, storageContainer);
|
||||
|
||||
if (!path) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const assembly: DiskReceiveAssembly = {
|
||||
path,
|
||||
receivedCount: 0,
|
||||
receivedIndexes: new Set<number>(),
|
||||
total
|
||||
};
|
||||
|
||||
this.diskReceiveAssemblies.set(assemblyKey, assembly);
|
||||
|
||||
return assembly;
|
||||
}
|
||||
|
||||
private async handleDiskReceiveFailure(
|
||||
attachment: Attachment,
|
||||
assemblyKey: string,
|
||||
error: unknown
|
||||
): Promise<void> {
|
||||
await this.deleteDiskReceiveAssembly(assemblyKey);
|
||||
|
||||
attachment.available = false;
|
||||
attachment.objectUrl = undefined;
|
||||
attachment.receivedBytes = 0;
|
||||
attachment.speedBps = 0;
|
||||
attachment.startedAtMs = undefined;
|
||||
attachment.lastUpdateMs = undefined;
|
||||
attachment.requestError = error instanceof Error && error.message
|
||||
? error.message
|
||||
: 'Media download failed. Retry the download.';
|
||||
|
||||
this.runtimeStore.touch();
|
||||
}
|
||||
|
||||
private async deleteDiskReceiveAssembly(assemblyKey: string): Promise<void> {
|
||||
const assembly = this.diskReceiveAssemblies.get(assemblyKey);
|
||||
|
||||
this.diskReceiveAssemblies.delete(assemblyKey);
|
||||
|
||||
if (assembly?.path) {
|
||||
await this.attachmentStorage.deleteFile(assembly.path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user