ddd test
This commit is contained in:
@@ -0,0 +1,224 @@
|
||||
import {
|
||||
Injectable,
|
||||
effect,
|
||||
inject
|
||||
} from '@angular/core';
|
||||
import { NavigationEnd, Router } from '@angular/router';
|
||||
import { RealtimeSessionFacade } from '../../../core/realtime';
|
||||
import { DatabaseService } from '../../../infrastructure/persistence';
|
||||
import { ROOM_URL_PATTERN } from '../../../core/constants';
|
||||
import { shouldAutoRequestWhenWatched } from '../domain/attachment.logic';
|
||||
import type { Attachment, AttachmentMeta } from '../domain/attachment.models';
|
||||
import type {
|
||||
FileAnnouncePayload,
|
||||
FileCancelPayload,
|
||||
FileChunkPayload,
|
||||
FileNotFoundPayload,
|
||||
FileRequestPayload
|
||||
} from '../domain/attachment-transfer.models';
|
||||
import { AttachmentPersistenceService } from './attachment-persistence.service';
|
||||
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
||||
import { AttachmentTransferService } from './attachment-transfer.service';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentManagerService {
|
||||
get updated() {
|
||||
return this.runtimeStore.updated;
|
||||
}
|
||||
|
||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||
private readonly router = inject(Router);
|
||||
private readonly database = inject(DatabaseService);
|
||||
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
||||
private readonly persistence = inject(AttachmentPersistenceService);
|
||||
private readonly transfer = inject(AttachmentTransferService);
|
||||
|
||||
private watchedRoomId: string | null = this.extractWatchedRoomId(this.router.url);
|
||||
private isDatabaseInitialised = false;
|
||||
|
||||
constructor() {
|
||||
effect(() => {
|
||||
if (this.database.isReady() && !this.isDatabaseInitialised) {
|
||||
this.isDatabaseInitialised = true;
|
||||
void this.persistence.initFromDatabase();
|
||||
}
|
||||
});
|
||||
|
||||
this.router.events.subscribe((event) => {
|
||||
if (!(event instanceof NavigationEnd)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.watchedRoomId = this.extractWatchedRoomId(event.urlAfterRedirects || event.url);
|
||||
|
||||
if (this.watchedRoomId) {
|
||||
void this.requestAutoDownloadsForRoom(this.watchedRoomId);
|
||||
}
|
||||
});
|
||||
|
||||
this.webrtc.onPeerConnected.subscribe(() => {
|
||||
if (this.watchedRoomId) {
|
||||
void this.requestAutoDownloadsForRoom(this.watchedRoomId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
getForMessage(messageId: string): Attachment[] {
|
||||
return this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
}
|
||||
|
||||
rememberMessageRoom(messageId: string, roomId: string): void {
|
||||
if (!messageId || !roomId)
|
||||
return;
|
||||
|
||||
this.runtimeStore.rememberMessageRoom(messageId, roomId);
|
||||
}
|
||||
|
||||
queueAutoDownloadsForMessage(messageId: string, attachmentId?: string): void {
|
||||
void this.requestAutoDownloadsForMessage(messageId, attachmentId);
|
||||
}
|
||||
|
||||
async requestAutoDownloadsForRoom(roomId: string): Promise<void> {
|
||||
if (!roomId || !this.isRoomWatched(roomId))
|
||||
return;
|
||||
|
||||
if (this.database.isReady()) {
|
||||
const messages = await this.database.getMessages(roomId, 500, 0);
|
||||
|
||||
for (const message of messages) {
|
||||
this.runtimeStore.rememberMessageRoom(message.id, message.roomId);
|
||||
await this.requestAutoDownloadsForMessage(message.id);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
for (const [messageId] of this.runtimeStore.getAttachmentEntries()) {
|
||||
const attachmentRoomId = await this.persistence.resolveMessageRoomId(messageId);
|
||||
|
||||
if (attachmentRoomId === roomId) {
|
||||
await this.requestAutoDownloadsForMessage(messageId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async deleteForMessage(messageId: string): Promise<void> {
|
||||
await this.persistence.deleteForMessage(messageId);
|
||||
}
|
||||
|
||||
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
|
||||
return this.transfer.getAttachmentMetasForMessages(messageIds);
|
||||
}
|
||||
|
||||
registerSyncedAttachments(
|
||||
attachmentMap: Record<string, AttachmentMeta[]>,
|
||||
messageRoomIds?: Record<string, string>
|
||||
): void {
|
||||
this.transfer.registerSyncedAttachments(attachmentMap, messageRoomIds);
|
||||
|
||||
for (const [messageId, attachments] of Object.entries(attachmentMap)) {
|
||||
for (const attachment of attachments) {
|
||||
this.queueAutoDownloadsForMessage(messageId, attachment.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
requestFromAnyPeer(messageId: string, attachment: Attachment): void {
|
||||
this.transfer.requestFromAnyPeer(messageId, attachment);
|
||||
}
|
||||
|
||||
handleFileNotFound(payload: FileNotFoundPayload): void {
|
||||
this.transfer.handleFileNotFound(payload);
|
||||
}
|
||||
|
||||
requestImageFromAnyPeer(messageId: string, attachment: Attachment): void {
|
||||
this.transfer.requestImageFromAnyPeer(messageId, attachment);
|
||||
}
|
||||
|
||||
requestFile(messageId: string, attachment: Attachment): void {
|
||||
this.transfer.requestFile(messageId, attachment);
|
||||
}
|
||||
|
||||
async publishAttachments(
|
||||
messageId: string,
|
||||
files: File[],
|
||||
uploaderPeerId?: string
|
||||
): Promise<void> {
|
||||
await this.transfer.publishAttachments(messageId, files, uploaderPeerId);
|
||||
}
|
||||
|
||||
handleFileAnnounce(payload: FileAnnouncePayload): void {
|
||||
this.transfer.handleFileAnnounce(payload);
|
||||
|
||||
if (payload.messageId && payload.file?.id) {
|
||||
this.queueAutoDownloadsForMessage(payload.messageId, payload.file.id);
|
||||
}
|
||||
}
|
||||
|
||||
handleFileChunk(payload: FileChunkPayload): void {
|
||||
this.transfer.handleFileChunk(payload);
|
||||
}
|
||||
|
||||
async handleFileRequest(payload: FileRequestPayload): Promise<void> {
|
||||
await this.transfer.handleFileRequest(payload);
|
||||
}
|
||||
|
||||
cancelRequest(messageId: string, attachment: Attachment): void {
|
||||
this.transfer.cancelRequest(messageId, attachment);
|
||||
}
|
||||
|
||||
handleFileCancel(payload: FileCancelPayload): void {
|
||||
this.transfer.handleFileCancel(payload);
|
||||
}
|
||||
|
||||
async fulfillRequestWithFile(
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
targetPeerId: string,
|
||||
file: File
|
||||
): Promise<void> {
|
||||
await this.transfer.fulfillRequestWithFile(messageId, fileId, targetPeerId, file);
|
||||
}
|
||||
|
||||
private async requestAutoDownloadsForMessage(messageId: string, attachmentId?: string): Promise<void> {
|
||||
if (!messageId)
|
||||
return;
|
||||
|
||||
const roomId = await this.persistence.resolveMessageRoomId(messageId);
|
||||
|
||||
if (!roomId || !this.isRoomWatched(roomId) || this.webrtc.getConnectedPeers().length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const attachments = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
|
||||
for (const attachment of attachments) {
|
||||
if (attachmentId && attachment.id !== attachmentId)
|
||||
continue;
|
||||
|
||||
if (!shouldAutoRequestWhenWatched(attachment))
|
||||
continue;
|
||||
|
||||
if (attachment.available)
|
||||
continue;
|
||||
|
||||
if ((attachment.receivedBytes ?? 0) > 0)
|
||||
continue;
|
||||
|
||||
if (this.transfer.hasPendingRequest(messageId, attachment.id))
|
||||
continue;
|
||||
|
||||
this.transfer.requestFromAnyPeer(messageId, attachment);
|
||||
}
|
||||
}
|
||||
|
||||
private extractWatchedRoomId(url: string): string | null {
|
||||
const roomMatch = url.match(ROOM_URL_PATTERN);
|
||||
|
||||
return roomMatch ? roomMatch[1] : null;
|
||||
}
|
||||
|
||||
private isRoomWatched(roomId: string | null | undefined): boolean {
|
||||
return !!roomId && roomId === this.watchedRoomId;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,264 @@
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { take } from 'rxjs';
|
||||
import { Store } from '@ngrx/store';
|
||||
import { selectCurrentRoomName } from '../../../store/rooms/rooms.selectors';
|
||||
import { DatabaseService } from '../../../infrastructure/persistence';
|
||||
import { AttachmentStorageService } from '../infrastructure/attachment-storage.service';
|
||||
import type { Attachment, AttachmentMeta } from '../domain/attachment.models';
|
||||
import { MAX_AUTO_SAVE_SIZE_BYTES } from '../domain/attachment.constants';
|
||||
import { LEGACY_ATTACHMENTS_STORAGE_KEY } from '../domain/attachment-transfer.constants';
|
||||
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentPersistenceService {
|
||||
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
||||
private readonly ngrxStore = inject(Store);
|
||||
private readonly attachmentStorage = inject(AttachmentStorageService);
|
||||
private readonly database = inject(DatabaseService);
|
||||
|
||||
async deleteForMessage(messageId: string): Promise<void> {
|
||||
const attachments = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
const hadCachedAttachments = attachments.length > 0 || this.runtimeStore.hasAttachmentsForMessage(messageId);
|
||||
const retainedSavedPaths = await this.getRetainedSavedPathsForOtherMessages(messageId);
|
||||
const savedPathsToDelete = new Set<string>();
|
||||
|
||||
for (const attachment of attachments) {
|
||||
if (attachment.objectUrl) {
|
||||
try {
|
||||
URL.revokeObjectURL(attachment.objectUrl);
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
|
||||
if (attachment.savedPath && !retainedSavedPaths.has(attachment.savedPath)) {
|
||||
savedPathsToDelete.add(attachment.savedPath);
|
||||
}
|
||||
}
|
||||
|
||||
this.runtimeStore.deleteAttachmentsForMessage(messageId);
|
||||
this.runtimeStore.deleteMessageRoom(messageId);
|
||||
this.runtimeStore.clearMessageScopedState(messageId);
|
||||
|
||||
if (hadCachedAttachments) {
|
||||
this.runtimeStore.touch();
|
||||
}
|
||||
|
||||
if (this.database.isReady()) {
|
||||
await this.database.deleteAttachmentsForMessage(messageId);
|
||||
}
|
||||
|
||||
for (const diskPath of savedPathsToDelete) {
|
||||
await this.attachmentStorage.deleteFile(diskPath);
|
||||
}
|
||||
}
|
||||
|
||||
async persistAttachmentMeta(attachment: Attachment): Promise<void> {
|
||||
if (!this.database.isReady())
|
||||
return;
|
||||
|
||||
try {
|
||||
await this.database.saveAttachment({
|
||||
id: attachment.id,
|
||||
messageId: attachment.messageId,
|
||||
filename: attachment.filename,
|
||||
size: attachment.size,
|
||||
mime: attachment.mime,
|
||||
isImage: attachment.isImage,
|
||||
uploaderPeerId: attachment.uploaderPeerId,
|
||||
filePath: attachment.filePath,
|
||||
savedPath: attachment.savedPath
|
||||
});
|
||||
} catch { /* persistence is best-effort */ }
|
||||
}
|
||||
|
||||
async saveFileToDisk(attachment: Attachment, blob: Blob): Promise<void> {
|
||||
try {
|
||||
const roomName = await this.resolveCurrentRoomName();
|
||||
const diskPath = await this.attachmentStorage.saveBlob(attachment, blob, roomName);
|
||||
|
||||
if (!diskPath)
|
||||
return;
|
||||
|
||||
attachment.savedPath = diskPath;
|
||||
void this.persistAttachmentMeta(attachment);
|
||||
} catch { /* disk save is best-effort */ }
|
||||
}
|
||||
|
||||
async initFromDatabase(): Promise<void> {
|
||||
await this.loadFromDatabase();
|
||||
await this.migrateFromLocalStorage();
|
||||
await this.tryLoadSavedFiles();
|
||||
}
|
||||
|
||||
async resolveMessageRoomId(messageId: string): Promise<string | null> {
|
||||
const cachedRoomId = this.runtimeStore.getMessageRoomId(messageId);
|
||||
|
||||
if (cachedRoomId)
|
||||
return cachedRoomId;
|
||||
|
||||
if (!this.database.isReady())
|
||||
return null;
|
||||
|
||||
try {
|
||||
const message = await this.database.getMessageById(messageId);
|
||||
|
||||
if (!message?.roomId)
|
||||
return null;
|
||||
|
||||
this.runtimeStore.rememberMessageRoom(messageId, message.roomId);
|
||||
return message.roomId;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async resolveCurrentRoomName(): Promise<string> {
|
||||
return new Promise<string>((resolve) => {
|
||||
this.ngrxStore
|
||||
.select(selectCurrentRoomName)
|
||||
.pipe(take(1))
|
||||
.subscribe((name) => resolve(name || ''));
|
||||
});
|
||||
}
|
||||
|
||||
private async loadFromDatabase(): Promise<void> {
|
||||
try {
|
||||
const allRecords: AttachmentMeta[] = await this.database.getAllAttachments();
|
||||
const grouped = new Map<string, Attachment[]>();
|
||||
|
||||
for (const record of allRecords) {
|
||||
const attachment: Attachment = { ...record,
|
||||
available: false };
|
||||
const bucket = grouped.get(record.messageId) ?? [];
|
||||
|
||||
bucket.push(attachment);
|
||||
grouped.set(record.messageId, bucket);
|
||||
}
|
||||
|
||||
this.runtimeStore.replaceAttachments(grouped);
|
||||
this.runtimeStore.touch();
|
||||
} catch { /* load is best-effort */ }
|
||||
}
|
||||
|
||||
private async migrateFromLocalStorage(): Promise<void> {
|
||||
try {
|
||||
const raw = localStorage.getItem(LEGACY_ATTACHMENTS_STORAGE_KEY);
|
||||
|
||||
if (!raw)
|
||||
return;
|
||||
|
||||
const legacyRecords: AttachmentMeta[] = JSON.parse(raw);
|
||||
|
||||
for (const meta of legacyRecords) {
|
||||
const existing = [...this.runtimeStore.getAttachmentsForMessage(meta.messageId)];
|
||||
|
||||
if (!existing.find((entry) => entry.id === meta.id)) {
|
||||
const attachment: Attachment = { ...meta,
|
||||
available: false };
|
||||
|
||||
existing.push(attachment);
|
||||
this.runtimeStore.setAttachmentsForMessage(meta.messageId, existing);
|
||||
void this.persistAttachmentMeta(attachment);
|
||||
}
|
||||
}
|
||||
|
||||
localStorage.removeItem(LEGACY_ATTACHMENTS_STORAGE_KEY);
|
||||
this.runtimeStore.touch();
|
||||
} catch { /* migration is best-effort */ }
|
||||
}
|
||||
|
||||
private async tryLoadSavedFiles(): Promise<void> {
|
||||
try {
|
||||
let hasChanges = false;
|
||||
|
||||
for (const [, attachments] of this.runtimeStore.getAttachmentEntries()) {
|
||||
for (const attachment of attachments) {
|
||||
if (attachment.available)
|
||||
continue;
|
||||
|
||||
if (attachment.savedPath) {
|
||||
const savedBase64 = await this.attachmentStorage.readFile(attachment.savedPath);
|
||||
|
||||
if (savedBase64) {
|
||||
this.restoreAttachmentFromDisk(attachment, savedBase64);
|
||||
hasChanges = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (attachment.filePath) {
|
||||
const originalBase64 = await this.attachmentStorage.readFile(attachment.filePath);
|
||||
|
||||
if (originalBase64) {
|
||||
this.restoreAttachmentFromDisk(attachment, originalBase64);
|
||||
hasChanges = true;
|
||||
|
||||
if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES && attachment.objectUrl) {
|
||||
const response = await fetch(attachment.objectUrl);
|
||||
|
||||
void this.saveFileToDisk(attachment, await response.blob());
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasChanges)
|
||||
this.runtimeStore.touch();
|
||||
} catch { /* startup load is best-effort */ }
|
||||
}
|
||||
|
||||
private restoreAttachmentFromDisk(attachment: Attachment, base64: string): void {
|
||||
const bytes = this.base64ToUint8Array(base64);
|
||||
const blob = new Blob([bytes.buffer as ArrayBuffer], { type: attachment.mime });
|
||||
|
||||
attachment.objectUrl = URL.createObjectURL(blob);
|
||||
attachment.available = true;
|
||||
|
||||
this.runtimeStore.setOriginalFile(
|
||||
`${attachment.messageId}:${attachment.id}`,
|
||||
new File([blob], attachment.filename, { type: attachment.mime })
|
||||
);
|
||||
}
|
||||
|
||||
private async getRetainedSavedPathsForOtherMessages(messageId: string): Promise<Set<string>> {
|
||||
const retainedSavedPaths = new Set<string>();
|
||||
|
||||
for (const [existingMessageId, attachments] of this.runtimeStore.getAttachmentEntries()) {
|
||||
if (existingMessageId === messageId)
|
||||
continue;
|
||||
|
||||
for (const attachment of attachments) {
|
||||
if (attachment.savedPath) {
|
||||
retainedSavedPaths.add(attachment.savedPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.database.isReady()) {
|
||||
return retainedSavedPaths;
|
||||
}
|
||||
|
||||
const persistedAttachments = await this.database.getAllAttachments();
|
||||
|
||||
for (const attachment of persistedAttachments) {
|
||||
if (attachment.messageId !== messageId && attachment.savedPath) {
|
||||
retainedSavedPaths.add(attachment.savedPath);
|
||||
}
|
||||
}
|
||||
|
||||
return retainedSavedPaths;
|
||||
}
|
||||
|
||||
private base64ToUint8Array(base64: string): Uint8Array {
|
||||
const binary = atob(base64);
|
||||
const bytes = new Uint8Array(binary.length);
|
||||
|
||||
for (let index = 0; index < binary.length; index++) {
|
||||
bytes[index] = binary.charCodeAt(index);
|
||||
}
|
||||
|
||||
return bytes;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
import { Injectable, signal } from '@angular/core';
|
||||
import type { Attachment } from '../domain/attachment.models';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentRuntimeStore {
|
||||
readonly updated = signal<number>(0);
|
||||
|
||||
private attachmentsByMessage = new Map<string, Attachment[]>();
|
||||
private messageRoomIds = new Map<string, string>();
|
||||
private originalFiles = new Map<string, File>();
|
||||
private cancelledTransfers = new Set<string>();
|
||||
private pendingRequests = new Map<string, Set<string>>();
|
||||
private chunkBuffers = new Map<string, ArrayBuffer[]>();
|
||||
private chunkCounts = new Map<string, number>();
|
||||
|
||||
touch(): void {
|
||||
this.updated.set(this.updated() + 1);
|
||||
}
|
||||
|
||||
getAttachmentsForMessage(messageId: string): Attachment[] {
|
||||
return this.attachmentsByMessage.get(messageId) ?? [];
|
||||
}
|
||||
|
||||
setAttachmentsForMessage(messageId: string, attachments: Attachment[]): void {
|
||||
if (attachments.length === 0) {
|
||||
this.attachmentsByMessage.delete(messageId);
|
||||
return;
|
||||
}
|
||||
|
||||
this.attachmentsByMessage.set(messageId, attachments);
|
||||
}
|
||||
|
||||
hasAttachmentsForMessage(messageId: string): boolean {
|
||||
return this.attachmentsByMessage.has(messageId);
|
||||
}
|
||||
|
||||
deleteAttachmentsForMessage(messageId: string): void {
|
||||
this.attachmentsByMessage.delete(messageId);
|
||||
}
|
||||
|
||||
replaceAttachments(nextAttachments: Map<string, Attachment[]>): void {
|
||||
this.attachmentsByMessage = nextAttachments;
|
||||
}
|
||||
|
||||
getAttachmentEntries(): IterableIterator<[string, Attachment[]]> {
|
||||
return this.attachmentsByMessage.entries();
|
||||
}
|
||||
|
||||
rememberMessageRoom(messageId: string, roomId: string): void {
|
||||
this.messageRoomIds.set(messageId, roomId);
|
||||
}
|
||||
|
||||
getMessageRoomId(messageId: string): string | undefined {
|
||||
return this.messageRoomIds.get(messageId);
|
||||
}
|
||||
|
||||
deleteMessageRoom(messageId: string): void {
|
||||
this.messageRoomIds.delete(messageId);
|
||||
}
|
||||
|
||||
setOriginalFile(key: string, file: File): void {
|
||||
this.originalFiles.set(key, file);
|
||||
}
|
||||
|
||||
getOriginalFile(key: string): File | undefined {
|
||||
return this.originalFiles.get(key);
|
||||
}
|
||||
|
||||
findOriginalFileByFileId(fileId: string): File | null {
|
||||
for (const [key, file] of this.originalFiles) {
|
||||
if (key.endsWith(`:${fileId}`)) {
|
||||
return file;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
addCancelledTransfer(key: string): void {
|
||||
this.cancelledTransfers.add(key);
|
||||
}
|
||||
|
||||
hasCancelledTransfer(key: string): boolean {
|
||||
return this.cancelledTransfers.has(key);
|
||||
}
|
||||
|
||||
setPendingRequestPeers(key: string, peers: Set<string>): void {
|
||||
this.pendingRequests.set(key, peers);
|
||||
}
|
||||
|
||||
getPendingRequestPeers(key: string): Set<string> | undefined {
|
||||
return this.pendingRequests.get(key);
|
||||
}
|
||||
|
||||
hasPendingRequest(key: string): boolean {
|
||||
return this.pendingRequests.has(key);
|
||||
}
|
||||
|
||||
deletePendingRequest(key: string): void {
|
||||
this.pendingRequests.delete(key);
|
||||
}
|
||||
|
||||
setChunkBuffer(key: string, buffer: ArrayBuffer[]): void {
|
||||
this.chunkBuffers.set(key, buffer);
|
||||
}
|
||||
|
||||
getChunkBuffer(key: string): ArrayBuffer[] | undefined {
|
||||
return this.chunkBuffers.get(key);
|
||||
}
|
||||
|
||||
deleteChunkBuffer(key: string): void {
|
||||
this.chunkBuffers.delete(key);
|
||||
}
|
||||
|
||||
setChunkCount(key: string, count: number): void {
|
||||
this.chunkCounts.set(key, count);
|
||||
}
|
||||
|
||||
getChunkCount(key: string): number | undefined {
|
||||
return this.chunkCounts.get(key);
|
||||
}
|
||||
|
||||
deleteChunkCount(key: string): void {
|
||||
this.chunkCounts.delete(key);
|
||||
}
|
||||
|
||||
clearMessageScopedState(messageId: string): void {
|
||||
const scopedPrefix = `${messageId}:`;
|
||||
|
||||
for (const key of Array.from(this.originalFiles.keys())) {
|
||||
if (key.startsWith(scopedPrefix)) {
|
||||
this.originalFiles.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
for (const key of Array.from(this.pendingRequests.keys())) {
|
||||
if (key.startsWith(scopedPrefix)) {
|
||||
this.pendingRequests.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
for (const key of Array.from(this.chunkBuffers.keys())) {
|
||||
if (key.startsWith(scopedPrefix)) {
|
||||
this.chunkBuffers.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
for (const key of Array.from(this.chunkCounts.keys())) {
|
||||
if (key.startsWith(scopedPrefix)) {
|
||||
this.chunkCounts.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
for (const key of Array.from(this.cancelledTransfers)) {
|
||||
if (key.startsWith(scopedPrefix)) {
|
||||
this.cancelledTransfers.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { RealtimeSessionFacade } from '../../../core/realtime';
|
||||
import { AttachmentStorageService } from '../infrastructure/attachment-storage.service';
|
||||
import { FILE_CHUNK_SIZE_BYTES } from '../domain/attachment-transfer.constants';
|
||||
import { FileChunkEvent } from '../domain/attachment-transfer.models';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentTransferTransportService {
|
||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||
private readonly attachmentStorage = inject(AttachmentStorageService);
|
||||
|
||||
decodeBase64(base64: string): Uint8Array {
|
||||
const binary = atob(base64);
|
||||
const bytes = new Uint8Array(binary.length);
|
||||
|
||||
for (let index = 0; index < binary.length; index++) {
|
||||
bytes[index] = binary.charCodeAt(index);
|
||||
}
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
async streamFileToPeer(
|
||||
targetPeerId: string,
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
file: File,
|
||||
isCancelled: () => boolean
|
||||
): Promise<void> {
|
||||
const totalChunks = Math.ceil(file.size / FILE_CHUNK_SIZE_BYTES);
|
||||
|
||||
let offset = 0;
|
||||
let chunkIndex = 0;
|
||||
|
||||
while (offset < file.size) {
|
||||
if (isCancelled())
|
||||
break;
|
||||
|
||||
const slice = file.slice(offset, offset + FILE_CHUNK_SIZE_BYTES);
|
||||
const arrayBuffer = await slice.arrayBuffer();
|
||||
const base64 = this.arrayBufferToBase64(arrayBuffer);
|
||||
const fileChunkEvent: FileChunkEvent = {
|
||||
type: 'file-chunk',
|
||||
messageId,
|
||||
fileId,
|
||||
index: chunkIndex,
|
||||
total: totalChunks,
|
||||
data: base64
|
||||
};
|
||||
|
||||
await this.webrtc.sendToPeerBuffered(targetPeerId, fileChunkEvent);
|
||||
|
||||
offset += FILE_CHUNK_SIZE_BYTES;
|
||||
chunkIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
async streamFileFromDiskToPeer(
|
||||
targetPeerId: string,
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
diskPath: string,
|
||||
isCancelled: () => boolean
|
||||
): Promise<void> {
|
||||
const base64Full = await this.attachmentStorage.readFile(diskPath);
|
||||
|
||||
if (!base64Full)
|
||||
return;
|
||||
|
||||
const fileBytes = this.decodeBase64(base64Full);
|
||||
const totalChunks = Math.ceil(fileBytes.byteLength / FILE_CHUNK_SIZE_BYTES);
|
||||
|
||||
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
|
||||
if (isCancelled())
|
||||
break;
|
||||
|
||||
const start = chunkIndex * FILE_CHUNK_SIZE_BYTES;
|
||||
const end = Math.min(fileBytes.byteLength, start + FILE_CHUNK_SIZE_BYTES);
|
||||
const slice = fileBytes.subarray(start, end);
|
||||
const sliceBuffer = (slice.buffer as ArrayBuffer).slice(
|
||||
slice.byteOffset,
|
||||
slice.byteOffset + slice.byteLength
|
||||
);
|
||||
const base64Chunk = this.arrayBufferToBase64(sliceBuffer);
|
||||
const fileChunkEvent: FileChunkEvent = {
|
||||
type: 'file-chunk',
|
||||
messageId,
|
||||
fileId,
|
||||
index: chunkIndex,
|
||||
total: totalChunks,
|
||||
data: base64Chunk
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(targetPeerId, fileChunkEvent);
|
||||
}
|
||||
}
|
||||
|
||||
private arrayBufferToBase64(buffer: ArrayBuffer): string {
|
||||
let binary = '';
|
||||
|
||||
const bytes = new Uint8Array(buffer);
|
||||
|
||||
for (let index = 0; index < bytes.byteLength; index++) {
|
||||
binary += String.fromCharCode(bytes[index]);
|
||||
}
|
||||
|
||||
return btoa(binary);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,566 @@
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { recordDebugNetworkFileChunk } from '../../../infrastructure/realtime/logging/debug-network-metrics';
|
||||
import { RealtimeSessionFacade } from '../../../core/realtime';
|
||||
import { AttachmentStorageService } from '../infrastructure/attachment-storage.service';
|
||||
import { MAX_AUTO_SAVE_SIZE_BYTES } from '../domain/attachment.constants';
|
||||
import { shouldPersistDownloadedAttachment } from '../domain/attachment.logic';
|
||||
import type { Attachment, AttachmentMeta } from '../domain/attachment.models';
|
||||
import {
|
||||
ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT,
|
||||
ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT,
|
||||
DEFAULT_ATTACHMENT_MIME_TYPE,
|
||||
FILE_NOT_FOUND_REQUEST_ERROR,
|
||||
NO_CONNECTED_PEERS_REQUEST_ERROR
|
||||
} from '../domain/attachment-transfer.constants';
|
||||
import {
|
||||
type FileAnnounceEvent,
|
||||
type FileAnnouncePayload,
|
||||
type FileCancelEvent,
|
||||
type FileCancelPayload,
|
||||
type FileChunkPayload,
|
||||
type FileNotFoundEvent,
|
||||
type FileNotFoundPayload,
|
||||
type FileRequestEvent,
|
||||
type FileRequestPayload,
|
||||
type LocalFileWithPath
|
||||
} from '../domain/attachment-transfer.models';
|
||||
import { AttachmentPersistenceService } from './attachment-persistence.service';
|
||||
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
||||
import { AttachmentTransferTransportService } from './attachment-transfer-transport.service';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentTransferService {
|
||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
||||
private readonly attachmentStorage = inject(AttachmentStorageService);
|
||||
private readonly persistence = inject(AttachmentPersistenceService);
|
||||
private readonly transport = inject(AttachmentTransferTransportService);
|
||||
|
||||
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
|
||||
const result: Record<string, AttachmentMeta[]> = {};
|
||||
|
||||
for (const messageId of messageIds) {
|
||||
const attachments = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
|
||||
if (attachments.length > 0) {
|
||||
result[messageId] = attachments.map((attachment) => ({
|
||||
id: attachment.id,
|
||||
messageId: attachment.messageId,
|
||||
filename: attachment.filename,
|
||||
size: attachment.size,
|
||||
mime: attachment.mime,
|
||||
isImage: attachment.isImage,
|
||||
uploaderPeerId: attachment.uploaderPeerId,
|
||||
filePath: undefined,
|
||||
savedPath: undefined
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
registerSyncedAttachments(
|
||||
attachmentMap: Record<string, AttachmentMeta[]>,
|
||||
messageRoomIds?: Record<string, string>
|
||||
): void {
|
||||
if (messageRoomIds) {
|
||||
for (const [messageId, roomId] of Object.entries(messageRoomIds)) {
|
||||
this.runtimeStore.rememberMessageRoom(messageId, roomId);
|
||||
}
|
||||
}
|
||||
|
||||
const newAttachments: Attachment[] = [];
|
||||
|
||||
for (const [messageId, metas] of Object.entries(attachmentMap)) {
|
||||
const existing = [...this.runtimeStore.getAttachmentsForMessage(messageId)];
|
||||
|
||||
for (const meta of metas) {
|
||||
const alreadyKnown = existing.find((entry) => entry.id === meta.id);
|
||||
|
||||
if (!alreadyKnown) {
|
||||
const attachment: Attachment = { ...meta,
|
||||
available: false,
|
||||
receivedBytes: 0 };
|
||||
|
||||
existing.push(attachment);
|
||||
newAttachments.push(attachment);
|
||||
}
|
||||
}
|
||||
|
||||
this.runtimeStore.setAttachmentsForMessage(messageId, existing);
|
||||
}
|
||||
|
||||
if (newAttachments.length > 0) {
|
||||
this.runtimeStore.touch();
|
||||
|
||||
for (const attachment of newAttachments) {
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
requestFromAnyPeer(messageId: string, attachment: Attachment): void {
|
||||
const clearedRequestError = this.clearAttachmentRequestError(attachment);
|
||||
const connectedPeers = this.webrtc.getConnectedPeers();
|
||||
|
||||
if (connectedPeers.length === 0) {
|
||||
attachment.requestError = NO_CONNECTED_PEERS_REQUEST_ERROR;
|
||||
this.runtimeStore.touch();
|
||||
console.warn('[Attachments] No connected peers to request file from');
|
||||
return;
|
||||
}
|
||||
|
||||
if (clearedRequestError)
|
||||
this.runtimeStore.touch();
|
||||
|
||||
this.runtimeStore.setPendingRequestPeers(
|
||||
this.buildRequestKey(messageId, attachment.id),
|
||||
new Set<string>()
|
||||
);
|
||||
|
||||
this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId);
|
||||
}
|
||||
|
||||
handleFileNotFound(payload: FileNotFoundPayload): void {
|
||||
const { messageId, fileId } = payload;
|
||||
|
||||
if (!messageId || !fileId)
|
||||
return;
|
||||
|
||||
const attachments = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
const attachment = attachments.find((entry) => entry.id === fileId);
|
||||
const didSendRequest = this.sendFileRequestToNextPeer(messageId, fileId, attachment?.uploaderPeerId);
|
||||
|
||||
if (!didSendRequest && attachment) {
|
||||
attachment.requestError = FILE_NOT_FOUND_REQUEST_ERROR;
|
||||
this.runtimeStore.touch();
|
||||
}
|
||||
}
|
||||
|
||||
requestImageFromAnyPeer(messageId: string, attachment: Attachment): void {
|
||||
this.requestFromAnyPeer(messageId, attachment);
|
||||
}
|
||||
|
||||
requestFile(messageId: string, attachment: Attachment): void {
|
||||
this.requestFromAnyPeer(messageId, attachment);
|
||||
}
|
||||
|
||||
hasPendingRequest(messageId: string, fileId: string): boolean {
|
||||
return this.runtimeStore.hasPendingRequest(this.buildRequestKey(messageId, fileId));
|
||||
}
|
||||
|
||||
async publishAttachments(
|
||||
messageId: string,
|
||||
files: File[],
|
||||
uploaderPeerId?: string
|
||||
): Promise<void> {
|
||||
const attachments: Attachment[] = [];
|
||||
|
||||
for (const file of files) {
|
||||
const fileId = crypto.randomUUID?.() ?? `${Date.now()}-${Math.random()}`;
|
||||
const attachment: Attachment = {
|
||||
id: fileId,
|
||||
messageId,
|
||||
filename: file.name,
|
||||
size: file.size,
|
||||
mime: file.type || DEFAULT_ATTACHMENT_MIME_TYPE,
|
||||
isImage: file.type.startsWith('image/'),
|
||||
uploaderPeerId,
|
||||
filePath: (file as LocalFileWithPath).path,
|
||||
available: false
|
||||
};
|
||||
|
||||
attachments.push(attachment);
|
||||
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
||||
|
||||
try {
|
||||
attachment.objectUrl = URL.createObjectURL(file);
|
||||
attachment.available = true;
|
||||
} catch { /* non-critical */ }
|
||||
|
||||
if (attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES) {
|
||||
void this.persistence.saveFileToDisk(attachment, file);
|
||||
}
|
||||
|
||||
const fileAnnounceEvent: FileAnnounceEvent = {
|
||||
type: 'file-announce',
|
||||
messageId,
|
||||
file: {
|
||||
id: fileId,
|
||||
filename: attachment.filename,
|
||||
size: attachment.size,
|
||||
mime: attachment.mime,
|
||||
isImage: attachment.isImage,
|
||||
uploaderPeerId
|
||||
}
|
||||
};
|
||||
|
||||
this.webrtc.broadcastMessage(fileAnnounceEvent);
|
||||
}
|
||||
|
||||
const existingList = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
|
||||
this.runtimeStore.setAttachmentsForMessage(messageId, [...existingList, ...attachments]);
|
||||
this.runtimeStore.touch();
|
||||
|
||||
for (const attachment of attachments) {
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
}
|
||||
|
||||
handleFileAnnounce(payload: FileAnnouncePayload): void {
|
||||
const { messageId, file } = payload;
|
||||
|
||||
if (!messageId || !file)
|
||||
return;
|
||||
|
||||
const list = [...this.runtimeStore.getAttachmentsForMessage(messageId)];
|
||||
const alreadyKnown = list.find((entry) => entry.id === file.id);
|
||||
|
||||
if (alreadyKnown)
|
||||
return;
|
||||
|
||||
const attachment: Attachment = {
|
||||
id: file.id,
|
||||
messageId,
|
||||
filename: file.filename,
|
||||
size: file.size,
|
||||
mime: file.mime,
|
||||
isImage: !!file.isImage,
|
||||
uploaderPeerId: file.uploaderPeerId,
|
||||
available: false,
|
||||
receivedBytes: 0
|
||||
};
|
||||
|
||||
list.push(attachment);
|
||||
this.runtimeStore.setAttachmentsForMessage(messageId, list);
|
||||
this.runtimeStore.touch();
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
|
||||
handleFileChunk(payload: FileChunkPayload): void {
|
||||
const { messageId, fileId, fromPeerId, index, total, data } = payload;
|
||||
|
||||
if (
|
||||
!messageId || !fileId ||
|
||||
typeof index !== 'number' ||
|
||||
typeof total !== 'number' ||
|
||||
typeof data !== 'string'
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const list = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
const attachment = list.find((entry) => entry.id === fileId);
|
||||
|
||||
if (!attachment)
|
||||
return;
|
||||
|
||||
const decodedBytes = this.transport.decodeBase64(data);
|
||||
const assemblyKey = `${messageId}:${fileId}`;
|
||||
const requestKey = this.buildRequestKey(messageId, fileId);
|
||||
|
||||
this.runtimeStore.deletePendingRequest(requestKey);
|
||||
this.clearAttachmentRequestError(attachment);
|
||||
|
||||
const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total);
|
||||
|
||||
if (!chunkBuffer[index]) {
|
||||
chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer;
|
||||
this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1);
|
||||
}
|
||||
|
||||
this.updateTransferProgress(attachment, decodedBytes, fromPeerId);
|
||||
|
||||
this.runtimeStore.touch();
|
||||
this.finalizeTransferIfComplete(attachment, assemblyKey, total);
|
||||
}
|
||||
|
||||
async handleFileRequest(payload: FileRequestPayload): Promise<void> {
|
||||
const { messageId, fileId, fromPeerId } = payload;
|
||||
|
||||
if (!messageId || !fileId || !fromPeerId)
|
||||
return;
|
||||
|
||||
const exactKey = `${messageId}:${fileId}`;
|
||||
const originalFile = this.runtimeStore.getOriginalFile(exactKey)
|
||||
?? this.runtimeStore.findOriginalFileByFileId(fileId);
|
||||
|
||||
if (originalFile) {
|
||||
await this.transport.streamFileToPeer(
|
||||
fromPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
originalFile,
|
||||
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const list = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
const attachment = list.find((entry) => entry.id === fileId);
|
||||
const diskPath = attachment
|
||||
? await this.attachmentStorage.resolveExistingPath(attachment)
|
||||
: null;
|
||||
|
||||
if (diskPath) {
|
||||
await this.transport.streamFileFromDiskToPeer(
|
||||
fromPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
diskPath,
|
||||
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (attachment?.isImage) {
|
||||
const roomName = await this.persistence.resolveCurrentRoomName();
|
||||
const legacyDiskPath = await this.attachmentStorage.resolveLegacyImagePath(
|
||||
attachment.filename,
|
||||
roomName
|
||||
);
|
||||
|
||||
if (legacyDiskPath) {
|
||||
await this.transport.streamFileFromDiskToPeer(
|
||||
fromPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
legacyDiskPath,
|
||||
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (attachment?.available && attachment.objectUrl) {
|
||||
try {
|
||||
const response = await fetch(attachment.objectUrl);
|
||||
const blob = await response.blob();
|
||||
const file = new File([blob], attachment.filename, { type: attachment.mime });
|
||||
|
||||
await this.transport.streamFileToPeer(
|
||||
fromPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
file,
|
||||
() => this.isTransferCancelled(fromPeerId, messageId, fileId)
|
||||
);
|
||||
|
||||
return;
|
||||
} catch { /* fall through */ }
|
||||
}
|
||||
|
||||
const fileNotFoundEvent: FileNotFoundEvent = {
|
||||
type: 'file-not-found',
|
||||
messageId,
|
||||
fileId
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(fromPeerId, fileNotFoundEvent);
|
||||
}
|
||||
|
||||
cancelRequest(messageId: string, attachment: Attachment): void {
|
||||
const targetPeerId = attachment.uploaderPeerId;
|
||||
|
||||
if (!targetPeerId)
|
||||
return;
|
||||
|
||||
try {
|
||||
const assemblyKey = `${messageId}:${attachment.id}`;
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
|
||||
attachment.receivedBytes = 0;
|
||||
attachment.speedBps = 0;
|
||||
attachment.startedAtMs = undefined;
|
||||
attachment.lastUpdateMs = undefined;
|
||||
|
||||
if (attachment.objectUrl) {
|
||||
try {
|
||||
URL.revokeObjectURL(attachment.objectUrl);
|
||||
} catch { /* ignore */ }
|
||||
|
||||
attachment.objectUrl = undefined;
|
||||
}
|
||||
|
||||
attachment.available = false;
|
||||
this.runtimeStore.touch();
|
||||
|
||||
const fileCancelEvent: FileCancelEvent = {
|
||||
type: 'file-cancel',
|
||||
messageId,
|
||||
fileId: attachment.id
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(targetPeerId, fileCancelEvent);
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
|
||||
handleFileCancel(payload: FileCancelPayload): void {
|
||||
const { messageId, fileId, fromPeerId } = payload;
|
||||
|
||||
if (!messageId || !fileId || !fromPeerId)
|
||||
return;
|
||||
|
||||
this.runtimeStore.addCancelledTransfer(
|
||||
this.buildTransferKey(messageId, fileId, fromPeerId)
|
||||
);
|
||||
}
|
||||
|
||||
async fulfillRequestWithFile(
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
targetPeerId: string,
|
||||
file: File
|
||||
): Promise<void> {
|
||||
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
||||
await this.transport.streamFileToPeer(
|
||||
targetPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
file,
|
||||
() => this.isTransferCancelled(targetPeerId, messageId, fileId)
|
||||
);
|
||||
}
|
||||
|
||||
private buildTransferKey(messageId: string, fileId: string, peerId: string): string {
|
||||
return `${messageId}:${fileId}:${peerId}`;
|
||||
}
|
||||
|
||||
private buildRequestKey(messageId: string, fileId: string): string {
|
||||
return `${messageId}:${fileId}`;
|
||||
}
|
||||
|
||||
private clearAttachmentRequestError(attachment: Attachment): boolean {
|
||||
if (!attachment.requestError)
|
||||
return false;
|
||||
|
||||
attachment.requestError = undefined;
|
||||
return true;
|
||||
}
|
||||
|
||||
private isTransferCancelled(targetPeerId: string, messageId: string, fileId: string): boolean {
|
||||
return this.runtimeStore.hasCancelledTransfer(
|
||||
this.buildTransferKey(messageId, fileId, targetPeerId)
|
||||
);
|
||||
}
|
||||
|
||||
private sendFileRequestToNextPeer(
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
preferredPeerId?: string
|
||||
): boolean {
|
||||
const connectedPeers = this.webrtc.getConnectedPeers();
|
||||
const requestKey = this.buildRequestKey(messageId, fileId);
|
||||
const triedPeers = this.runtimeStore.getPendingRequestPeers(requestKey) ?? new Set<string>();
|
||||
|
||||
let targetPeerId: string | undefined;
|
||||
|
||||
if (preferredPeerId && connectedPeers.includes(preferredPeerId) && !triedPeers.has(preferredPeerId)) {
|
||||
targetPeerId = preferredPeerId;
|
||||
} else {
|
||||
targetPeerId = connectedPeers.find((peerId) => !triedPeers.has(peerId));
|
||||
}
|
||||
|
||||
if (!targetPeerId) {
|
||||
this.runtimeStore.deletePendingRequest(requestKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
triedPeers.add(targetPeerId);
|
||||
this.runtimeStore.setPendingRequestPeers(requestKey, triedPeers);
|
||||
|
||||
const fileRequestEvent: FileRequestEvent = {
|
||||
type: 'file-request',
|
||||
messageId,
|
||||
fileId
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(targetPeerId, fileRequestEvent);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] {
|
||||
const existingChunkBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
||||
|
||||
if (existingChunkBuffer) {
|
||||
return existingChunkBuffer;
|
||||
}
|
||||
|
||||
const createdChunkBuffer = new Array(total);
|
||||
|
||||
this.runtimeStore.setChunkBuffer(assemblyKey, createdChunkBuffer);
|
||||
this.runtimeStore.setChunkCount(assemblyKey, 0);
|
||||
|
||||
return createdChunkBuffer;
|
||||
}
|
||||
|
||||
private updateTransferProgress(
|
||||
attachment: Attachment,
|
||||
decodedBytes: Uint8Array,
|
||||
fromPeerId?: string
|
||||
): void {
|
||||
const now = Date.now();
|
||||
const previousReceived = attachment.receivedBytes ?? 0;
|
||||
|
||||
attachment.receivedBytes = previousReceived + decodedBytes.byteLength;
|
||||
|
||||
if (fromPeerId) {
|
||||
recordDebugNetworkFileChunk(fromPeerId, decodedBytes.byteLength, now);
|
||||
}
|
||||
|
||||
if (!attachment.startedAtMs)
|
||||
attachment.startedAtMs = now;
|
||||
|
||||
if (!attachment.lastUpdateMs)
|
||||
attachment.lastUpdateMs = now;
|
||||
|
||||
const elapsedMs = Math.max(1, now - attachment.lastUpdateMs);
|
||||
const instantaneousBps = (decodedBytes.byteLength / elapsedMs) * 1000;
|
||||
const previousSpeed = attachment.speedBps ?? instantaneousBps;
|
||||
|
||||
attachment.speedBps =
|
||||
ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT * previousSpeed +
|
||||
ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT * instantaneousBps;
|
||||
|
||||
attachment.lastUpdateMs = now;
|
||||
}
|
||||
|
||||
private finalizeTransferIfComplete(
|
||||
attachment: Attachment,
|
||||
assemblyKey: string,
|
||||
total: number
|
||||
): void {
|
||||
const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0;
|
||||
const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
||||
|
||||
if (
|
||||
!completeBuffer
|
||||
|| (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size)
|
||||
|| !completeBuffer.every((part) => part instanceof ArrayBuffer)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const blob = new Blob(completeBuffer, { type: attachment.mime });
|
||||
|
||||
attachment.available = true;
|
||||
attachment.objectUrl = URL.createObjectURL(blob);
|
||||
|
||||
if (shouldPersistDownloadedAttachment(attachment)) {
|
||||
void this.persistence.saveFileToDisk(attachment, blob);
|
||||
}
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
this.runtimeStore.touch();
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
}
|
||||
119
src/app/domains/attachment/application/attachment.facade.ts
Normal file
119
src/app/domains/attachment/application/attachment.facade.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { AttachmentManagerService } from './attachment-manager.service';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentFacade {
|
||||
get updated() {
|
||||
return this.manager.updated;
|
||||
}
|
||||
|
||||
private readonly manager = inject(AttachmentManagerService);
|
||||
|
||||
getForMessage(
|
||||
...args: Parameters<AttachmentManagerService['getForMessage']>
|
||||
): ReturnType<AttachmentManagerService['getForMessage']> {
|
||||
return this.manager.getForMessage(...args);
|
||||
}
|
||||
|
||||
rememberMessageRoom(
|
||||
...args: Parameters<AttachmentManagerService['rememberMessageRoom']>
|
||||
): ReturnType<AttachmentManagerService['rememberMessageRoom']> {
|
||||
return this.manager.rememberMessageRoom(...args);
|
||||
}
|
||||
|
||||
queueAutoDownloadsForMessage(
|
||||
...args: Parameters<AttachmentManagerService['queueAutoDownloadsForMessage']>
|
||||
): ReturnType<AttachmentManagerService['queueAutoDownloadsForMessage']> {
|
||||
return this.manager.queueAutoDownloadsForMessage(...args);
|
||||
}
|
||||
|
||||
requestAutoDownloadsForRoom(
|
||||
...args: Parameters<AttachmentManagerService['requestAutoDownloadsForRoom']>
|
||||
): ReturnType<AttachmentManagerService['requestAutoDownloadsForRoom']> {
|
||||
return this.manager.requestAutoDownloadsForRoom(...args);
|
||||
}
|
||||
|
||||
deleteForMessage(
|
||||
...args: Parameters<AttachmentManagerService['deleteForMessage']>
|
||||
): ReturnType<AttachmentManagerService['deleteForMessage']> {
|
||||
return this.manager.deleteForMessage(...args);
|
||||
}
|
||||
|
||||
getAttachmentMetasForMessages(
|
||||
...args: Parameters<AttachmentManagerService['getAttachmentMetasForMessages']>
|
||||
): ReturnType<AttachmentManagerService['getAttachmentMetasForMessages']> {
|
||||
return this.manager.getAttachmentMetasForMessages(...args);
|
||||
}
|
||||
|
||||
registerSyncedAttachments(
|
||||
...args: Parameters<AttachmentManagerService['registerSyncedAttachments']>
|
||||
): ReturnType<AttachmentManagerService['registerSyncedAttachments']> {
|
||||
return this.manager.registerSyncedAttachments(...args);
|
||||
}
|
||||
|
||||
requestFromAnyPeer(
|
||||
...args: Parameters<AttachmentManagerService['requestFromAnyPeer']>
|
||||
): ReturnType<AttachmentManagerService['requestFromAnyPeer']> {
|
||||
return this.manager.requestFromAnyPeer(...args);
|
||||
}
|
||||
|
||||
handleFileNotFound(
|
||||
...args: Parameters<AttachmentManagerService['handleFileNotFound']>
|
||||
): ReturnType<AttachmentManagerService['handleFileNotFound']> {
|
||||
return this.manager.handleFileNotFound(...args);
|
||||
}
|
||||
|
||||
requestImageFromAnyPeer(
|
||||
...args: Parameters<AttachmentManagerService['requestImageFromAnyPeer']>
|
||||
): ReturnType<AttachmentManagerService['requestImageFromAnyPeer']> {
|
||||
return this.manager.requestImageFromAnyPeer(...args);
|
||||
}
|
||||
|
||||
requestFile(
|
||||
...args: Parameters<AttachmentManagerService['requestFile']>
|
||||
): ReturnType<AttachmentManagerService['requestFile']> {
|
||||
return this.manager.requestFile(...args);
|
||||
}
|
||||
|
||||
publishAttachments(
|
||||
...args: Parameters<AttachmentManagerService['publishAttachments']>
|
||||
): ReturnType<AttachmentManagerService['publishAttachments']> {
|
||||
return this.manager.publishAttachments(...args);
|
||||
}
|
||||
|
||||
handleFileAnnounce(
|
||||
...args: Parameters<AttachmentManagerService['handleFileAnnounce']>
|
||||
): ReturnType<AttachmentManagerService['handleFileAnnounce']> {
|
||||
return this.manager.handleFileAnnounce(...args);
|
||||
}
|
||||
|
||||
handleFileChunk(
|
||||
...args: Parameters<AttachmentManagerService['handleFileChunk']>
|
||||
): ReturnType<AttachmentManagerService['handleFileChunk']> {
|
||||
return this.manager.handleFileChunk(...args);
|
||||
}
|
||||
|
||||
handleFileRequest(
|
||||
...args: Parameters<AttachmentManagerService['handleFileRequest']>
|
||||
): ReturnType<AttachmentManagerService['handleFileRequest']> {
|
||||
return this.manager.handleFileRequest(...args);
|
||||
}
|
||||
|
||||
cancelRequest(
|
||||
...args: Parameters<AttachmentManagerService['cancelRequest']>
|
||||
): ReturnType<AttachmentManagerService['cancelRequest']> {
|
||||
return this.manager.cancelRequest(...args);
|
||||
}
|
||||
|
||||
handleFileCancel(
|
||||
...args: Parameters<AttachmentManagerService['handleFileCancel']>
|
||||
): ReturnType<AttachmentManagerService['handleFileCancel']> {
|
||||
return this.manager.handleFileCancel(...args);
|
||||
}
|
||||
|
||||
fulfillRequestWithFile(
|
||||
...args: Parameters<AttachmentManagerService['fulfillRequestWithFile']>
|
||||
): ReturnType<AttachmentManagerService['fulfillRequestWithFile']> {
|
||||
return this.manager.fulfillRequestWithFile(...args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
/** Size (bytes) of each chunk when streaming a file over RTCDataChannel. */
|
||||
export const FILE_CHUNK_SIZE_BYTES = 64 * 1024; // 64 KB
|
||||
|
||||
/**
|
||||
* EWMA smoothing weight for the previous speed estimate.
|
||||
* The complementary weight is applied to the latest sample.
|
||||
*/
|
||||
export const ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT = 0.7;
|
||||
export const ATTACHMENT_TRANSFER_EWMA_CURRENT_WEIGHT = 1 - ATTACHMENT_TRANSFER_EWMA_PREVIOUS_WEIGHT;
|
||||
|
||||
/** Fallback MIME type when none is provided by the sender. */
|
||||
export const DEFAULT_ATTACHMENT_MIME_TYPE = 'application/octet-stream';
|
||||
|
||||
/** localStorage key used by the legacy attachment store during migration. */
|
||||
export const LEGACY_ATTACHMENTS_STORAGE_KEY = 'metoyou_attachments';
|
||||
|
||||
/** User-facing error when no peers are available for a request. */
|
||||
export const NO_CONNECTED_PEERS_REQUEST_ERROR = 'No connected peers are available to provide this file right now.';
|
||||
|
||||
/** User-facing error when connected peers cannot provide a requested file. */
|
||||
export const FILE_NOT_FOUND_REQUEST_ERROR = 'The connected peers do not have this file right now.';
|
||||
@@ -0,0 +1,56 @@
|
||||
import type { ChatAttachmentAnnouncement, ChatEvent } from '../../../core/models/index';
|
||||
|
||||
export type FileAnnounceEvent = ChatEvent & {
|
||||
type: 'file-announce';
|
||||
messageId: string;
|
||||
file: ChatAttachmentAnnouncement;
|
||||
};
|
||||
|
||||
export type FileChunkEvent = ChatEvent & {
|
||||
type: 'file-chunk';
|
||||
messageId: string;
|
||||
fileId: string;
|
||||
index: number;
|
||||
total: number;
|
||||
data: string;
|
||||
fromPeerId?: string;
|
||||
};
|
||||
|
||||
export type FileRequestEvent = ChatEvent & {
|
||||
type: 'file-request';
|
||||
messageId: string;
|
||||
fileId: string;
|
||||
fromPeerId?: string;
|
||||
};
|
||||
|
||||
export type FileCancelEvent = ChatEvent & {
|
||||
type: 'file-cancel';
|
||||
messageId: string;
|
||||
fileId: string;
|
||||
fromPeerId?: string;
|
||||
};
|
||||
|
||||
export type FileNotFoundEvent = ChatEvent & {
|
||||
type: 'file-not-found';
|
||||
messageId: string;
|
||||
fileId: string;
|
||||
};
|
||||
|
||||
export type FileAnnouncePayload = Pick<ChatEvent, 'messageId' | 'file'>;
|
||||
|
||||
export interface FileChunkPayload {
|
||||
messageId?: string;
|
||||
fileId?: string;
|
||||
fromPeerId?: string;
|
||||
index?: number;
|
||||
total?: number;
|
||||
data?: ChatEvent['data'];
|
||||
}
|
||||
|
||||
export type FileRequestPayload = Pick<ChatEvent, 'messageId' | 'fileId' | 'fromPeerId'>;
|
||||
export type FileCancelPayload = Pick<ChatEvent, 'messageId' | 'fileId' | 'fromPeerId'>;
|
||||
export type FileNotFoundPayload = Pick<ChatEvent, 'messageId' | 'fileId'>;
|
||||
|
||||
export type LocalFileWithPath = File & {
|
||||
path?: string;
|
||||
};
|
||||
@@ -0,0 +1,2 @@
|
||||
/** Maximum file size (bytes) that is automatically saved or pushed for inline previews. */
|
||||
export const MAX_AUTO_SAVE_SIZE_BYTES = 10 * 1024 * 1024; // 10 MB
|
||||
19
src/app/domains/attachment/domain/attachment.logic.ts
Normal file
19
src/app/domains/attachment/domain/attachment.logic.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
import { MAX_AUTO_SAVE_SIZE_BYTES } from './attachment.constants';
|
||||
import type { Attachment } from './attachment.models';
|
||||
|
||||
export function isAttachmentMedia(attachment: Pick<Attachment, 'mime'>): boolean {
|
||||
return attachment.mime.startsWith('image/') ||
|
||||
attachment.mime.startsWith('video/') ||
|
||||
attachment.mime.startsWith('audio/');
|
||||
}
|
||||
|
||||
export function shouldAutoRequestWhenWatched(attachment: Attachment): boolean {
|
||||
return attachment.isImage ||
|
||||
(isAttachmentMedia(attachment) && attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES);
|
||||
}
|
||||
|
||||
export function shouldPersistDownloadedAttachment(attachment: Pick<Attachment, 'size' | 'mime'>): boolean {
|
||||
return attachment.size <= MAX_AUTO_SAVE_SIZE_BYTES ||
|
||||
attachment.mime.startsWith('video/') ||
|
||||
attachment.mime.startsWith('audio/');
|
||||
}
|
||||
13
src/app/domains/attachment/domain/attachment.models.ts
Normal file
13
src/app/domains/attachment/domain/attachment.models.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import type { ChatAttachmentMeta } from '../../../core/models/index';
|
||||
|
||||
export type AttachmentMeta = ChatAttachmentMeta;
|
||||
|
||||
export interface Attachment extends AttachmentMeta {
|
||||
available: boolean;
|
||||
objectUrl?: string;
|
||||
receivedBytes?: number;
|
||||
speedBps?: number;
|
||||
startedAtMs?: number;
|
||||
lastUpdateMs?: number;
|
||||
requestError?: string;
|
||||
}
|
||||
3
src/app/domains/attachment/index.ts
Normal file
3
src/app/domains/attachment/index.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
export * from './application/attachment.facade';
|
||||
export * from './domain/attachment.constants';
|
||||
export * from './domain/attachment.models';
|
||||
@@ -0,0 +1,23 @@
|
||||
const ROOM_NAME_SANITIZER = /[^\w.-]+/g;
|
||||
|
||||
export function sanitizeAttachmentRoomName(roomName: string): string {
|
||||
const sanitizedRoomName = roomName.trim().replace(ROOM_NAME_SANITIZER, '_');
|
||||
|
||||
return sanitizedRoomName || 'room';
|
||||
}
|
||||
|
||||
export function resolveAttachmentStorageBucket(mime: string): 'video' | 'audio' | 'image' | 'files' {
|
||||
if (mime.startsWith('video/')) {
|
||||
return 'video';
|
||||
}
|
||||
|
||||
if (mime.startsWith('audio/')) {
|
||||
return 'audio';
|
||||
}
|
||||
|
||||
if (mime.startsWith('image/')) {
|
||||
return 'image';
|
||||
}
|
||||
|
||||
return 'files';
|
||||
}
|
||||
@@ -0,0 +1,127 @@
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { ElectronBridgeService } from '../../../core/platform/electron/electron-bridge.service';
|
||||
import type { Attachment } from '../domain/attachment.models';
|
||||
import { resolveAttachmentStorageBucket, sanitizeAttachmentRoomName } from './attachment-storage.helpers';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentStorageService {
|
||||
private readonly electronBridge = inject(ElectronBridgeService);
|
||||
|
||||
async resolveExistingPath(
|
||||
attachment: Pick<Attachment, 'filePath' | 'savedPath'>
|
||||
): Promise<string | null> {
|
||||
return this.findExistingPath([attachment.filePath, attachment.savedPath]);
|
||||
}
|
||||
|
||||
async resolveLegacyImagePath(filename: string, roomName: string): Promise<string | null> {
|
||||
const appDataPath = await this.resolveAppDataPath();
|
||||
|
||||
if (!appDataPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.findExistingPath([`${appDataPath}/server/${sanitizeAttachmentRoomName(roomName)}/image/${filename}`]);
|
||||
}
|
||||
|
||||
async readFile(filePath: string): Promise<string | null> {
|
||||
const electronApi = this.electronBridge.getApi();
|
||||
|
||||
if (!electronApi || !filePath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return await electronApi.readFile(filePath);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async saveBlob(
|
||||
attachment: Pick<Attachment, 'filename' | 'mime'>,
|
||||
blob: Blob,
|
||||
roomName: string
|
||||
): Promise<string | null> {
|
||||
const electronApi = this.electronBridge.getApi();
|
||||
const appDataPath = await this.resolveAppDataPath();
|
||||
|
||||
if (!electronApi || !appDataPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const directoryPath = `${appDataPath}/server/${sanitizeAttachmentRoomName(roomName)}/${resolveAttachmentStorageBucket(attachment.mime)}`;
|
||||
|
||||
await electronApi.ensureDir(directoryPath);
|
||||
|
||||
const arrayBuffer = await blob.arrayBuffer();
|
||||
const diskPath = `${directoryPath}/${attachment.filename}`;
|
||||
|
||||
await electronApi.writeFile(diskPath, this.arrayBufferToBase64(arrayBuffer));
|
||||
|
||||
return diskPath;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async deleteFile(filePath: string): Promise<void> {
|
||||
const electronApi = this.electronBridge.getApi();
|
||||
|
||||
if (!electronApi || !filePath) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await electronApi.deleteFile(filePath);
|
||||
} catch { /* best-effort cleanup */ }
|
||||
}
|
||||
|
||||
private async resolveAppDataPath(): Promise<string | null> {
|
||||
const electronApi = this.electronBridge.getApi();
|
||||
|
||||
if (!electronApi) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return await electronApi.getAppDataPath();
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async findExistingPath(candidates: (string | null | undefined)[]): Promise<string | null> {
|
||||
const electronApi = this.electronBridge.getApi();
|
||||
|
||||
if (!electronApi) {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (const candidatePath of candidates) {
|
||||
if (!candidatePath) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
if (await electronApi.fileExists(candidatePath)) {
|
||||
return candidatePath;
|
||||
}
|
||||
} catch { /* keep trying remaining candidates */ }
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private arrayBufferToBase64(buffer: ArrayBuffer): string {
|
||||
let binary = '';
|
||||
|
||||
const bytes = new Uint8Array(buffer);
|
||||
|
||||
for (let index = 0; index < bytes.byteLength; index++) {
|
||||
binary += String.fromCharCode(bytes[index]);
|
||||
}
|
||||
|
||||
return btoa(binary);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user