fix: Bug - Files lose host on reload
Persist large uploads under app data on publish and restore, and re-announce hosted attachments after reload so peers can download again. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -4,7 +4,10 @@ import {
|
|||||||
inject
|
inject
|
||||||
} from '@angular/core';
|
} from '@angular/core';
|
||||||
import { NavigationEnd, Router } from '@angular/router';
|
import { NavigationEnd, Router } from '@angular/router';
|
||||||
|
import { Store } from '@ngrx/store';
|
||||||
|
import { take } from 'rxjs';
|
||||||
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
||||||
|
import { selectCurrentUserId } from '../../../../store/users/users.selectors';
|
||||||
import { DatabaseService } from '../../../../infrastructure/persistence';
|
import { DatabaseService } from '../../../../infrastructure/persistence';
|
||||||
import { yieldToAttachmentHydrationLoop } from '../../domain/logic/attachment-blob.rules';
|
import { yieldToAttachmentHydrationLoop } from '../../domain/logic/attachment-blob.rules';
|
||||||
import {
|
import {
|
||||||
@@ -32,6 +35,7 @@ export class AttachmentManagerService {
|
|||||||
|
|
||||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||||
private readonly router = inject(Router);
|
private readonly router = inject(Router);
|
||||||
|
private readonly store = inject(Store);
|
||||||
private readonly database = inject(DatabaseService);
|
private readonly database = inject(DatabaseService);
|
||||||
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
private readonly runtimeStore = inject(AttachmentRuntimeStore);
|
||||||
private readonly persistence = inject(AttachmentPersistenceService);
|
private readonly persistence = inject(AttachmentPersistenceService);
|
||||||
@@ -45,9 +49,10 @@ export class AttachmentManagerService {
|
|||||||
effect(() => {
|
effect(() => {
|
||||||
if (this.database.isReady() && !this.isDatabaseInitialised) {
|
if (this.database.isReady() && !this.isDatabaseInitialised) {
|
||||||
this.isDatabaseInitialised = true;
|
this.isDatabaseInitialised = true;
|
||||||
void this.persistence.initFromDatabase().then(() => {
|
void this.persistence.initFromDatabase().then(async () => {
|
||||||
if (this.watchedRoomId) {
|
if (this.watchedRoomId) {
|
||||||
void this.restoreLocalAttachmentsForRoom(this.watchedRoomId);
|
await this.restoreLocalAttachmentsForRoom(this.watchedRoomId);
|
||||||
|
await this.announceHostedAttachments();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -68,7 +73,10 @@ export class AttachmentManagerService {
|
|||||||
|
|
||||||
this.webrtc.onPeerConnected.subscribe(() => {
|
this.webrtc.onPeerConnected.subscribe(() => {
|
||||||
if (this.watchedRoomId) {
|
if (this.watchedRoomId) {
|
||||||
void this.restoreLocalAttachmentsForRoom(this.watchedRoomId);
|
void this.restoreLocalAttachmentsForRoom(this.watchedRoomId).then(async () => {
|
||||||
|
await this.announceHostedAttachments();
|
||||||
|
});
|
||||||
|
|
||||||
void this.requestAutoDownloadsForRoom(this.watchedRoomId);
|
void this.requestAutoDownloadsForRoom(this.watchedRoomId);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -324,6 +332,15 @@ export class AttachmentManagerService {
|
|||||||
return getWatchedAttachmentRoomIdFromUrl(url);
|
return getWatchedAttachmentRoomIdFromUrl(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async announceHostedAttachments(): Promise<void> {
|
||||||
|
const currentUserId = await new Promise<string | null>((resolve) => {
|
||||||
|
this.store.select(selectCurrentUserId).pipe(take(1))
|
||||||
|
.subscribe((userId) => resolve(userId));
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.transfer.reannounceHostedAttachments(currentUserId);
|
||||||
|
}
|
||||||
|
|
||||||
private isRoomWatched(roomId: string | null | undefined): boolean {
|
private isRoomWatched(roomId: string | null | undefined): boolean {
|
||||||
return !!roomId && roomId === this.watchedRoomId;
|
return !!roomId && roomId === this.watchedRoomId;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import {
|
|||||||
signal
|
signal
|
||||||
} from '@angular/core';
|
} from '@angular/core';
|
||||||
import { Store } from '@ngrx/store';
|
import { Store } from '@ngrx/store';
|
||||||
|
import { of } from 'rxjs';
|
||||||
|
|
||||||
import { DatabaseService } from '../../../../infrastructure/persistence';
|
import { DatabaseService } from '../../../../infrastructure/persistence';
|
||||||
import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service';
|
import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service';
|
||||||
@@ -51,6 +52,7 @@ describe('AttachmentPersistenceService', () => {
|
|||||||
savedPath: '/appdata/photo.png'
|
savedPath: '/appdata/photo.png'
|
||||||
}
|
}
|
||||||
])),
|
])),
|
||||||
|
getAttachmentsForMessage: vi.fn(() => Promise.resolve([])),
|
||||||
getMessageById: vi.fn(() => Promise.resolve(null)),
|
getMessageById: vi.fn(() => Promise.resolve(null)),
|
||||||
saveAttachment: vi.fn(() => Promise.resolve()),
|
saveAttachment: vi.fn(() => Promise.resolve()),
|
||||||
deleteAttachmentsForMessage: vi.fn(() => Promise.resolve())
|
deleteAttachmentsForMessage: vi.fn(() => Promise.resolve())
|
||||||
@@ -64,6 +66,9 @@ describe('AttachmentPersistenceService', () => {
|
|||||||
getFileSize: vi.fn(() => Promise.resolve(3)),
|
getFileSize: vi.fn(() => Promise.resolve(3)),
|
||||||
getFileUrl: vi.fn(() => Promise.resolve(null)),
|
getFileUrl: vi.fn(() => Promise.resolve(null)),
|
||||||
canReadFileChunks: vi.fn(() => true),
|
canReadFileChunks: vi.fn(() => true),
|
||||||
|
canCopyFiles: vi.fn(() => true),
|
||||||
|
createWritableFile: vi.fn(async () => '/appdata/server/room/files/setup.exe'),
|
||||||
|
copyFile: vi.fn(async () => true),
|
||||||
providesInlineObjectUrl: vi.fn(() => false)
|
providesInlineObjectUrl: vi.fn(() => false)
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
@@ -75,7 +80,7 @@ describe('AttachmentPersistenceService', () => {
|
|||||||
AttachmentRuntimeStore,
|
AttachmentRuntimeStore,
|
||||||
{ provide: DatabaseService, useValue: database },
|
{ provide: DatabaseService, useValue: database },
|
||||||
{ provide: AttachmentStorageService, useValue: attachmentStorage },
|
{ provide: AttachmentStorageService, useValue: attachmentStorage },
|
||||||
{ provide: Store, useValue: { select: () => ({ pipe: () => ({ subscribe: () => {} }) }) } }
|
{ provide: Store, useValue: { select: () => of('room-1') } }
|
||||||
]
|
]
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -169,4 +174,36 @@ describe('AttachmentPersistenceService', () => {
|
|||||||
expect(attachmentStorage.readFile).not.toHaveBeenCalled();
|
expect(attachmentStorage.readFile).not.toHaveBeenCalled();
|
||||||
expect(attachmentStorage.readFileChunk).not.toHaveBeenCalled();
|
expect(attachmentStorage.readFileChunk).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('copies an external upload path into app data and hydrates generic files without loading a blob', async () => {
|
||||||
|
attachmentStorage.resolveExistingPath
|
||||||
|
.mockResolvedValueOnce(null)
|
||||||
|
.mockResolvedValue('/appdata/server/room/files/setup.exe');
|
||||||
|
|
||||||
|
const service = createService();
|
||||||
|
const attachment = {
|
||||||
|
id: 'att-setup',
|
||||||
|
messageId: 'msg-1',
|
||||||
|
filename: 'setup.exe',
|
||||||
|
size: 628 * 1024 * 1024,
|
||||||
|
mime: 'application/octet-stream',
|
||||||
|
isImage: false,
|
||||||
|
filePath: '/home/ludde/Downloads/setup.exe',
|
||||||
|
available: false
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(service.ensurePersistedUploadHost(attachment)).resolves.toBe(true);
|
||||||
|
|
||||||
|
expect(attachment.savedPath).toBe('/appdata/server/room/files/setup.exe');
|
||||||
|
expect(attachment.available).toBe(true);
|
||||||
|
expect(attachment.objectUrl).toBeUndefined();
|
||||||
|
expect(attachmentStorage.copyFile).toHaveBeenCalledWith(
|
||||||
|
'/home/ludde/Downloads/setup.exe',
|
||||||
|
'/appdata/server/room/files/setup.exe'
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(attachmentStorage.readFile).not.toHaveBeenCalled();
|
||||||
|
expect(attachmentStorage.readFileChunk).not.toHaveBeenCalled();
|
||||||
|
expect(database.saveAttachment).toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import {
|
|||||||
} from '../../domain/logic/attachment-blob.rules';
|
} from '../../domain/logic/attachment-blob.rules';
|
||||||
import { isBlobObjectUrl, needsBlobObjectUrlForInlineDisplay } from '../../domain/logic/attachment-display-url.rules';
|
import { isBlobObjectUrl, needsBlobObjectUrlForInlineDisplay } from '../../domain/logic/attachment-display-url.rules';
|
||||||
import { mergeAttachmentLocalPaths } from '../../domain/logic/attachment-persistence.rules';
|
import { mergeAttachmentLocalPaths } from '../../domain/logic/attachment-persistence.rules';
|
||||||
|
import { isAttachmentMedia } from '../../domain/logic/attachment.logic';
|
||||||
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
||||||
|
|
||||||
@Injectable({ providedIn: 'root' })
|
@Injectable({ providedIn: 'root' })
|
||||||
@@ -118,7 +119,7 @@ export class AttachmentPersistenceService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async tryRestoreAttachmentFromLocal(attachment: Attachment): Promise<boolean> {
|
async tryRestoreAttachmentFromLocal(attachment: Attachment): Promise<boolean> {
|
||||||
const restored = await this.ensureInlineDisplayObjectUrl(attachment);
|
const restored = await this.ensurePersistedUploadHost(attachment);
|
||||||
|
|
||||||
if (restored) {
|
if (restored) {
|
||||||
attachment.requestError = undefined;
|
attachment.requestError = undefined;
|
||||||
@@ -127,6 +128,41 @@ export class AttachmentPersistenceService {
|
|||||||
return restored;
|
return restored;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async ensurePersistedUploadHost(attachment: Attachment): Promise<boolean> {
|
||||||
|
const existingPath = await this.attachmentStorage.resolveExistingPath(attachment);
|
||||||
|
|
||||||
|
if (existingPath) {
|
||||||
|
return this.hydrateAttachmentFromStoredPath(attachment, existingPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!attachment.filePath?.trim() || !this.attachmentStorage.canCopyFiles()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const savedPath = await this.persistUploadCopyFromSourcePath(attachment, attachment.filePath);
|
||||||
|
|
||||||
|
if (!savedPath) {
|
||||||
|
attachment.filePath = undefined;
|
||||||
|
void this.persistAttachmentMeta(attachment);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.hydrateAttachmentFromStoredPath(attachment, savedPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async hydrateAttachmentFromStoredPath(attachment: Attachment, diskPath: string): Promise<boolean> {
|
||||||
|
attachment.savedPath = diskPath;
|
||||||
|
|
||||||
|
if (isAttachmentMedia(attachment)) {
|
||||||
|
return this.ensureInlineDisplayObjectUrl(attachment);
|
||||||
|
}
|
||||||
|
|
||||||
|
attachment.available = true;
|
||||||
|
void this.persistAttachmentMeta(attachment);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
async ensureInlineDisplayObjectUrl(attachment: Attachment): Promise<boolean> {
|
async ensureInlineDisplayObjectUrl(attachment: Attachment): Promise<boolean> {
|
||||||
if (!needsBlobObjectUrlForInlineDisplay(attachment.objectUrl)) {
|
if (!needsBlobObjectUrlForInlineDisplay(attachment.objectUrl)) {
|
||||||
return true;
|
return true;
|
||||||
|
|||||||
@@ -479,4 +479,61 @@ describe('AttachmentTransferService', () => {
|
|||||||
expect(attachmentStorage.appendBase64).not.toHaveBeenCalled();
|
expect(attachmentStorage.appendBase64).not.toHaveBeenCalled();
|
||||||
expect(persistence.saveFileToDisk).toHaveBeenCalledTimes(1);
|
expect(persistence.saveFileToDisk).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('copies oversized generic uploads with a source path into app data when publishing', async () => {
|
||||||
|
attachmentStorage.canCopyFiles.mockReturnValue(true);
|
||||||
|
attachmentStorage.canPersistSize.mockReturnValue(true);
|
||||||
|
persistence.persistUploadCopyFromSourcePath.mockResolvedValue('/appdata/server/room/files/setup.exe');
|
||||||
|
|
||||||
|
const service = createService();
|
||||||
|
const file = new File([new Uint8Array(11 * 1024 * 1024)], 'setup.exe', { type: 'application/octet-stream' });
|
||||||
|
|
||||||
|
Object.defineProperty(file, 'path', { value: '/home/ludde/setup.exe' });
|
||||||
|
|
||||||
|
await service.publishAttachments(MESSAGE_ID, [file], PEER_ID);
|
||||||
|
|
||||||
|
expect(persistence.persistUploadCopyFromSourcePath).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('streams a restored oversized generic file from app data when the in-memory upload is gone', async () => {
|
||||||
|
attachmentStorage.resolveExistingPath.mockResolvedValue('/appdata/server/room/files/setup.exe');
|
||||||
|
|
||||||
|
const service = createService();
|
||||||
|
const attachment = registerIncomingGenericFile(12 * 1024 * 1024);
|
||||||
|
|
||||||
|
attachment.savedPath = '/appdata/server/room/files/setup.exe';
|
||||||
|
|
||||||
|
await service.handleFileRequest({
|
||||||
|
messageId: MESSAGE_ID,
|
||||||
|
fileId: FILE_ID,
|
||||||
|
fromPeerId: 'peer-2'
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(transport.streamFileFromDiskToPeer).toHaveBeenCalledWith(
|
||||||
|
'peer-2',
|
||||||
|
MESSAGE_ID,
|
||||||
|
FILE_ID,
|
||||||
|
'/appdata/server/room/files/setup.exe',
|
||||||
|
expect.any(Function)
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('re-announces hosted attachments that can still be served from disk', async () => {
|
||||||
|
attachmentStorage.resolveExistingPath.mockResolvedValue('/appdata/server/room/files/setup.exe');
|
||||||
|
|
||||||
|
const service = createService();
|
||||||
|
const attachment = registerIncomingGenericFile(12 * 1024 * 1024);
|
||||||
|
|
||||||
|
attachment.uploaderPeerId = PEER_ID;
|
||||||
|
attachment.savedPath = '/appdata/server/room/files/setup.exe';
|
||||||
|
attachment.available = true;
|
||||||
|
|
||||||
|
await service.reannounceHostedAttachments(PEER_ID);
|
||||||
|
|
||||||
|
expect(webrtc.broadcastMessage).toHaveBeenCalledWith(expect.objectContaining({
|
||||||
|
type: 'file-announce',
|
||||||
|
messageId: MESSAGE_ID,
|
||||||
|
file: expect.objectContaining({ id: FILE_ID })
|
||||||
|
}));
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import { isSharingFromThisDevice } from '../../domain/logic/attachment-sharing.r
|
|||||||
import {
|
import {
|
||||||
canReceiveAttachment,
|
canReceiveAttachment,
|
||||||
isAttachmentMedia,
|
isAttachmentMedia,
|
||||||
shouldCopyUploaderMediaToAppData,
|
shouldCopyLargeUploaderFileToAppData,
|
||||||
shouldPersistDownloadedAttachment,
|
shouldPersistDownloadedAttachment,
|
||||||
shouldStreamAttachmentReceiveToDisk
|
shouldStreamAttachmentReceiveToDisk
|
||||||
} from '../../domain/logic/attachment.logic';
|
} from '../../domain/logic/attachment.logic';
|
||||||
@@ -764,7 +764,7 @@ export class AttachmentTransferService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldCopyUploaderMediaToAppData(
|
if (shouldCopyLargeUploaderFileToAppData(
|
||||||
attachment,
|
attachment,
|
||||||
attachment.filePath,
|
attachment.filePath,
|
||||||
this.attachmentStorage.canCopyFiles()
|
this.attachmentStorage.canCopyFiles()
|
||||||
@@ -782,6 +782,41 @@ export class AttachmentTransferService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async reannounceHostedAttachments(currentUserId: string | null | undefined): Promise<void> {
|
||||||
|
if (!currentUserId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [, attachments] of this.runtimeStore.getAttachmentEntries()) {
|
||||||
|
for (const attachment of attachments) {
|
||||||
|
if (!isSharingFromThisDevice(attachment, currentUserId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const canServe = await this.attachmentStorage.resolveExistingPath(attachment);
|
||||||
|
|
||||||
|
if (!canServe) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const fileAnnounceEvent: FileAnnounceEvent = {
|
||||||
|
type: 'file-announce',
|
||||||
|
messageId: attachment.messageId,
|
||||||
|
file: {
|
||||||
|
id: attachment.id,
|
||||||
|
filename: attachment.filename,
|
||||||
|
size: attachment.size,
|
||||||
|
mime: attachment.mime,
|
||||||
|
isImage: attachment.isImage,
|
||||||
|
uploaderPeerId: attachment.uploaderPeerId
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
this.webrtc.broadcastMessage(fileAnnounceEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async applySavedPathObjectUrl(attachment: Attachment, savedPath: string | null): Promise<void> {
|
private async applySavedPathObjectUrl(attachment: Attachment, savedPath: string | null): Promise<void> {
|
||||||
if (!savedPath) {
|
if (!savedPath) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import {
|
|||||||
getWatchedAttachmentRoomIdFromUrl,
|
getWatchedAttachmentRoomIdFromUrl,
|
||||||
isDirectMessageAttachmentRoomId,
|
isDirectMessageAttachmentRoomId,
|
||||||
shouldCopyUploaderMediaToAppData,
|
shouldCopyUploaderMediaToAppData,
|
||||||
|
shouldCopyLargeUploaderFileToAppData,
|
||||||
shouldStreamAttachmentReceiveToDisk,
|
shouldStreamAttachmentReceiveToDisk,
|
||||||
canReceiveAttachment
|
canReceiveAttachment
|
||||||
} from './attachment.logic';
|
} from './attachment.logic';
|
||||||
@@ -35,6 +36,16 @@ describe('attachment logic', () => {
|
|||||||
}, '/home/ludde/video.mp4', true)).toBe(true);
|
}, '/home/ludde/video.mp4', true)).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('copies any oversized upload with a source path into app data', () => {
|
||||||
|
expect(shouldCopyLargeUploaderFileToAppData({
|
||||||
|
size: 628 * 1024 * 1024
|
||||||
|
}, '/home/ludde/setup.exe', true)).toBe(true);
|
||||||
|
|
||||||
|
expect(shouldCopyLargeUploaderFileToAppData({
|
||||||
|
size: 1024
|
||||||
|
}, '/home/ludde/setup.exe', true)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
it('skips app-data copy for small uploads and missing source paths', () => {
|
it('skips app-data copy for small uploads and missing source paths', () => {
|
||||||
expect(shouldCopyUploaderMediaToAppData({
|
expect(shouldCopyUploaderMediaToAppData({
|
||||||
size: 1024,
|
size: 1024,
|
||||||
|
|||||||
@@ -26,10 +26,18 @@ export function shouldCopyUploaderMediaToAppData(
|
|||||||
attachment: Pick<Attachment, 'size' | 'mime'>,
|
attachment: Pick<Attachment, 'size' | 'mime'>,
|
||||||
sourcePath?: string | null,
|
sourcePath?: string | null,
|
||||||
canCopyFiles = false
|
canCopyFiles = false
|
||||||
|
): boolean {
|
||||||
|
return shouldCopyLargeUploaderFileToAppData(attachment, sourcePath, canCopyFiles) &&
|
||||||
|
(attachment.mime.startsWith('video/') || attachment.mime.startsWith('audio/'));
|
||||||
|
}
|
||||||
|
|
||||||
|
export function shouldCopyLargeUploaderFileToAppData(
|
||||||
|
attachment: Pick<Attachment, 'size'>,
|
||||||
|
sourcePath?: string | null,
|
||||||
|
canCopyFiles = false
|
||||||
): boolean {
|
): boolean {
|
||||||
return canCopyFiles &&
|
return canCopyFiles &&
|
||||||
!!sourcePath &&
|
!!sourcePath?.trim() &&
|
||||||
(attachment.mime.startsWith('video/') || attachment.mime.startsWith('audio/')) &&
|
|
||||||
attachment.size > MAX_AUTO_SAVE_SIZE_BYTES;
|
attachment.size > MAX_AUTO_SAVE_SIZE_BYTES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user