diff --git a/agents-docs/LESSONS.md b/agents-docs/LESSONS.md index 8a2cba5..59d28bb 100644 --- a/agents-docs/LESSONS.md +++ b/agents-docs/LESSONS.md @@ -25,6 +25,13 @@ Durable rules for AI agents working on this project. Read this file at session s ## Lessons +### Never count duplicate chunks toward transfer progress, and never finalize on byte counters [attachments] [webrtc] + +- **Trigger:** P2P attachments arrived corrupt everywhere ("only the first bytes") because concurrent auto-download triggers double-requested a file, the sender streamed it twice, and the receiver counted duplicate chunk deliveries toward `receivedBytes` — inflating it past `size`, which both dropped the remaining chunks (post-Security guard) and passed the `receivedBytes >= size` finalize shortcut over a sparse buffer. +- **Rule:** in chunked transfer receivers, ignore an already-buffered chunk index entirely (no progress update), use dense buffers, and finalize only when every chunk index is present — never use byte totals as an alternative completion signal; dedupe streams on the sender per `(messageId, fileId, peerId)`. +- **Why:** byte counters lie as soon as any duplicate, retry, or concurrent stream exists, and sparse-array `every`/`some` skip holes, so "looks complete" checks silently pass on partial data (same trap as the custom-emoji sparse-array lesson). +- **Example:** `handleFileChunk` / `finalizeTransferIfComplete` in `attachment-transfer.service.ts`; multi-chunk e2e coverage via `expectMessageImageContentSha256` in `e2e/tests/chat/chat-message-features.spec.ts` (single-chunk files cannot catch assembly bugs — test with >64 KiB payloads). + ### Don't bump E2E timeouts for sync flakes - gate on presence and read server logs [testing] [realtime] - **Trigger:** a multi-client chat-sync E2E flaked on "message not visible" and the first instinct was to raise `toBeVisible` timeouts or add waits; the user correctly rejected this ("it's not a timeout issue"). diff --git a/e2e/pages/chat-messages.page.ts b/e2e/pages/chat-messages.page.ts index dca5ad0..a48f744 100644 --- a/e2e/pages/chat-messages.page.ts +++ b/e2e/pages/chat-messages.page.ts @@ -132,6 +132,31 @@ export class ChatMessagesPage { }).toBe(true); } + /** SHA-256 of the bytes currently served by the rendered chat image. */ + async getMessageImageSha256(altText: string): Promise { + const image = this.getMessageImageByAlt(altText); + + return image.evaluate(async (element) => { + const img = element as HTMLImageElement; + const response = await fetch(img.src); + const buffer = await response.arrayBuffer(); + const digest = await crypto.subtle.digest('SHA-256', buffer); + + return [...new Uint8Array(digest)] + .map((byte) => byte.toString(16).padStart(2, '0')) + .join(''); + }); + } + + /** Asserts the rendered chat image is byte-identical to the sent file. */ + async expectMessageImageContentSha256(altText: string, expectedSha256: string): Promise { + await this.expectMessageImageLoaded(altText); + await expect.poll(() => this.getMessageImageSha256(altText), { + timeout: 30_000, + message: `Image ${altText} should be received byte-identical (no truncated/corrupt transfer)` + }).toBe(expectedSha256); + } + getEmbedCardByTitle(title: string): Locator { return this.page.locator('app-chat-link-embed').filter({ has: this.page.getByText(title, { exact: true }) diff --git a/e2e/tests/chat/chat-message-features.spec.ts b/e2e/tests/chat/chat-message-features.spec.ts index 25c0f47..55a58a0 100644 --- a/e2e/tests/chat/chat-message-features.spec.ts +++ b/e2e/tests/chat/chat-message-features.spec.ts @@ -1,3 +1,4 @@ +import { createHash, randomBytes } from 'node:crypto'; import { type Page } from '@playwright/test'; import { test, @@ -182,6 +183,28 @@ test.describe('Chat messaging features', () => { }); }); + test('syncs multi-chunk image attachments byte-identical between users', async ({ createClient }) => { + const scenario = await createChatScenario(createClient); + const imageName = `${uniqueName('photo')}.svg`; + const imageCaption = `Large image upload ${uniqueName('caption')}`; + // Several P2P file chunks (64 KiB each) - regression coverage for transfers + // that previously finalized with only the first chunks received. + const { payload, sha256 } = createMultiChunkImagePayload(imageName); + + await test.step('Alice sends a multi-chunk image attachment', async () => { + await scenario.aliceMessages.attachFiles([payload]); + await scenario.aliceMessages.sendMessage(imageCaption); + + await scenario.aliceMessages.expectMessageImageLoaded(imageName); + await scenario.aliceMessages.expectMessageImageContentSha256(imageName, sha256); + }); + + await test.step('Bob receives the image fully and byte-identical', async () => { + await expect(scenario.bobMessages.getMessageItemByText(imageCaption)).toBeVisible({ timeout: 20_000 }); + await scenario.bobMessages.expectMessageImageContentSha256(imageName, sha256); + }); + }); + test('renders link embeds for shared links', async ({ createClient }) => { const scenario = await createChatScenario(createClient); const messageText = `Useful docs ${MOCK_EMBED_URL}`; @@ -442,6 +465,24 @@ function createTextFilePayload(name: string, mimeType: string, content: string): }; } +function createMultiChunkImagePayload(name: string): { payload: ChatDropFilePayload; sha256: string } { + // ~300 KB of XML-safe noise inside an SVG comment so the file spans + // multiple 64 KiB P2P transfer chunks while remaining a renderable image. + const noise = randomBytes(225_000).toString('base64'); + const markup = buildMockSvgMarkup(name).replace('', ``); + const contentBuffer = Buffer.from(markup, 'utf8'); + + return { + payload: { + name, + mimeType: 'image/svg+xml', + base64: contentBuffer.toString('base64') + }, + sha256: createHash('sha256').update(contentBuffer) + .digest('hex') + }; +} + function buildMockSvgMarkup(label: string): string { return [ '', diff --git a/toju-app/src/app/domains/attachment/README.md b/toju-app/src/app/domains/attachment/README.md index ce4f0b2..a529a20 100644 --- a/toju-app/src/app/domains/attachment/README.md +++ b/toju-app/src/app/domains/attachment/README.md @@ -101,6 +101,14 @@ sequenceDiagram Note over R: shouldPersistDownloadedAttachment? Save to disk ``` +### Transfer integrity invariants + +Concurrent triggers (file-announce, message sync, peer connect) can race to request the same file, and a sender can receive duplicate `file-request`s for the same attachment. The transfer service enforces these invariants so duplicate streams can never corrupt a download (regression: receivers used to finalize after only the first chunks): + +- **Requester:** `requestFromAnyPeer` marks the request pending *synchronously* before any async work, so the manager's `hasPendingRequest` gate closes the double-request race window. +- **Sender:** `handleFileRequest` / `fulfillRequestWithFile` track active outbound streams per `(messageId, fileId, peerId)` and ignore duplicate requests while a stream is in flight. A fresh `file-request` clears any earlier `file-cancel` marker from that peer. +- **Receiver:** chunk buffers are dense (`Array.from({ length: total })`, never sparse `new Array(total)`); a chunk index that is already buffered is ignored entirely and never counts toward `receivedBytes`; a transfer finalizes only when *every* chunk index is present — byte counters are never a substitute for chunk completeness. Assembly state is released only after the attachment is marked `available`, and chunks arriving for an already-available attachment are dropped. + ### Failure handling If the sender cannot find the file, it replies with `file-not-found`. The transfer service then tries the next connected peer that has announced the same attachment. Either side can send `file-cancel` to abort a transfer in progress. diff --git a/toju-app/src/app/domains/attachment/application/services/attachment-runtime.store.ts b/toju-app/src/app/domains/attachment/application/services/attachment-runtime.store.ts index 05bd041..3644b8e 100644 --- a/toju-app/src/app/domains/attachment/application/services/attachment-runtime.store.ts +++ b/toju-app/src/app/domains/attachment/application/services/attachment-runtime.store.ts @@ -10,7 +10,7 @@ export class AttachmentRuntimeStore { private originalFiles = new Map(); private cancelledTransfers = new Set(); private pendingRequests = new Map>(); - private chunkBuffers = new Map(); + private chunkBuffers = new Map(); private chunkCounts = new Map(); touch(): void { @@ -84,6 +84,10 @@ export class AttachmentRuntimeStore { return this.cancelledTransfers.has(key); } + deleteCancelledTransfer(key: string): void { + this.cancelledTransfers.delete(key); + } + setPendingRequestPeers(key: string, peers: Set): void { this.pendingRequests.set(key, peers); } @@ -100,11 +104,11 @@ export class AttachmentRuntimeStore { this.pendingRequests.delete(key); } - setChunkBuffer(key: string, buffer: ArrayBuffer[]): void { + setChunkBuffer(key: string, buffer: (ArrayBuffer | undefined)[]): void { this.chunkBuffers.set(key, buffer); } - getChunkBuffer(key: string): ArrayBuffer[] | undefined { + getChunkBuffer(key: string): (ArrayBuffer | undefined)[] | undefined { return this.chunkBuffers.get(key); } diff --git a/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.spec.ts b/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.spec.ts new file mode 100644 index 0000000..de0d87e --- /dev/null +++ b/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.spec.ts @@ -0,0 +1,354 @@ +import '@angular/compiler'; +import { + beforeEach, + describe, + expect, + it, + vi +} from 'vitest'; +import { Injector, runInInjectionContext } from '@angular/core'; +import { Store } from '@ngrx/store'; +import { of } from 'rxjs'; + +import { RealtimeSessionFacade } from '../../../../core/realtime'; +import { AppI18nService } from '../../../../core/i18n'; +import { arrayBufferToBase64 } from '../../../../shared-kernel'; +import { decodeBase64 } from '../../../../shared-kernel/p2p-transfer.utils'; +import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service'; +import type { Attachment } from '../../domain/models/attachment.model'; +import type { FileChunkPayload } from '../../domain/models/attachment-transfer.model'; +import { AttachmentPersistenceService } from './attachment-persistence.service'; +import { AttachmentRuntimeStore } from './attachment-runtime.store'; +import { AttachmentTransferService } from './attachment-transfer.service'; +import { AttachmentTransferTransportService } from './attachment-transfer-transport.service'; + +const MESSAGE_ID = 'msg-1'; +const FILE_ID = 'file-1'; +const PEER_ID = 'peer-1'; + +function encodeChunk(bytes: number[]): string { + return arrayBufferToBase64(new Uint8Array(bytes).buffer); +} + +describe('AttachmentTransferService', () => { + let runtimeStore: AttachmentRuntimeStore; + let persistence: { + whenReady: ReturnType; + persistAttachmentMeta: ReturnType; + tryRestoreAttachmentFromLocal: ReturnType; + saveFileToDisk: ReturnType; + persistUploadCopyFromSourcePath: ReturnType; + resolveCurrentRoomName: ReturnType; + resolveStorageContainerName: ReturnType; + ensureInlineDisplayObjectUrl: ReturnType; + }; + let attachmentStorage: { + canWriteFiles: ReturnType; + canCopyFiles: ReturnType; + canReadFileChunks: ReturnType; + getFileUrl: ReturnType; + resolveExistingPath: ReturnType; + resolveLegacyImagePath: ReturnType; + appendBase64: ReturnType; + createWritableFile: ReturnType; + deleteFile: ReturnType; + }; + let transport: { + decodeBase64: ReturnType; + streamFileToPeer: ReturnType; + streamFileFromDiskToPeer: ReturnType; + }; + let webrtc: { + getConnectedPeers: ReturnType; + broadcastMessage: ReturnType; + sendToPeer: ReturnType; + }; + + beforeEach(() => { + persistence = { + whenReady: vi.fn(async () => undefined), + persistAttachmentMeta: vi.fn(async () => undefined), + tryRestoreAttachmentFromLocal: vi.fn(async () => false), + saveFileToDisk: vi.fn(async () => null), + persistUploadCopyFromSourcePath: vi.fn(async () => null), + resolveCurrentRoomName: vi.fn(async () => null), + resolveStorageContainerName: vi.fn(async () => 'room'), + ensureInlineDisplayObjectUrl: vi.fn(async () => true) + }; + + attachmentStorage = { + canWriteFiles: vi.fn(() => false), + canCopyFiles: vi.fn(() => false), + canReadFileChunks: vi.fn(() => false), + getFileUrl: vi.fn(async () => null), + resolveExistingPath: vi.fn(async () => null), + resolveLegacyImagePath: vi.fn(async () => null), + appendBase64: vi.fn(async () => true), + createWritableFile: vi.fn(async () => '/appdata/server/room/files/file-1'), + deleteFile: vi.fn(async () => true) + }; + + transport = { + decodeBase64: vi.fn((base64: string) => decodeBase64(base64)), + streamFileToPeer: vi.fn(async () => undefined), + streamFileFromDiskToPeer: vi.fn(async () => undefined) + }; + + webrtc = { + getConnectedPeers: vi.fn(() => [PEER_ID]), + broadcastMessage: vi.fn(), + sendToPeer: vi.fn() + }; + }); + + function createService(): AttachmentTransferService { + const injector = Injector.create({ + providers: [ + AttachmentTransferService, + AttachmentRuntimeStore, + { provide: Store, useValue: { select: () => of(null) } }, + { provide: RealtimeSessionFacade, useValue: webrtc }, + { provide: AppI18nService, useValue: { instant: (key: string) => key } }, + { provide: AttachmentStorageService, useValue: attachmentStorage }, + { provide: AttachmentPersistenceService, useValue: persistence }, + { provide: AttachmentTransferTransportService, useValue: transport } + ] + }); + const service = runInInjectionContext(injector, () => injector.get(AttachmentTransferService)); + + runtimeStore = injector.get(AttachmentRuntimeStore); + return service; + } + + function registerIncomingAttachment(size: number): Attachment { + const attachment: Attachment = { + id: FILE_ID, + messageId: MESSAGE_ID, + filename: 'photo.png', + size, + mime: 'image/png', + isImage: true, + uploaderPeerId: PEER_ID, + available: false, + receivedBytes: 0 + }; + + runtimeStore.setAttachmentsForMessage(MESSAGE_ID, [attachment]); + return attachment; + } + + function chunkPayload(index: number, total: number, bytes: number[]): FileChunkPayload { + return { + messageId: MESSAGE_ID, + fileId: FILE_ID, + fromPeerId: PEER_ID, + index, + total, + data: encodeChunk(bytes) + }; + } + + async function readSavedBlobBytes(): Promise { + const lastCall = persistence.saveFileToDisk.mock.calls.at(-1); + const blob = lastCall?.[1] as Blob; + const buffer = await blob.arrayBuffer(); + + return [...new Uint8Array(buffer)]; + } + + it('assembles a multi-chunk in-memory transfer intact when each chunk arrives once', async () => { + const service = createService(); + const attachment = registerIncomingAttachment(9); + + service.handleFileChunk(chunkPayload(0, 3, [ + 1, + 2, + 3 + ])); + + service.handleFileChunk(chunkPayload(1, 3, [ + 4, + 5, + 6 + ])); + + service.handleFileChunk(chunkPayload(2, 3, [ + 7, + 8, + 9 + ])); + + await vi.waitFor(() => expect(attachment.available).toBe(true)); + + expect(persistence.saveFileToDisk).toHaveBeenCalledTimes(1); + await expect(readSavedBlobBytes()).resolves.toEqual([ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9 + ]); + + expect(attachment.receivedBytes).toBe(9); + }); + + it('ignores duplicate chunk deliveries from concurrent streams and assembles the full file', async () => { + const service = createService(); + const attachment = registerIncomingAttachment(9); + + // Two interleaved streams of the same file (duplicate file-request race). + service.handleFileChunk(chunkPayload(0, 3, [ + 1, + 2, + 3 + ])); + + service.handleFileChunk(chunkPayload(0, 3, [ + 1, + 2, + 3 + ])); + + service.handleFileChunk(chunkPayload(1, 3, [ + 4, + 5, + 6 + ])); + + service.handleFileChunk(chunkPayload(1, 3, [ + 4, + 5, + 6 + ])); + + service.handleFileChunk(chunkPayload(2, 3, [ + 7, + 8, + 9 + ])); + + service.handleFileChunk(chunkPayload(2, 3, [ + 7, + 8, + 9 + ])); + + await vi.waitFor(() => expect(attachment.available).toBe(true)); + + expect(persistence.saveFileToDisk).toHaveBeenCalledTimes(1); + await expect(readSavedBlobBytes()).resolves.toEqual([ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9 + ]); + + expect(attachment.receivedBytes).toBe(9); + }); + + it('does not finalize a transfer while chunks are still missing', () => { + const service = createService(); + const attachment = registerIncomingAttachment(9); + + // Duplicates inflate naive byte accounting past the file size while + // chunk 2 has not arrived yet - the transfer must stay incomplete. + service.handleFileChunk(chunkPayload(0, 3, [ + 1, + 2, + 3 + ])); + + service.handleFileChunk(chunkPayload(0, 3, [ + 1, + 2, + 3 + ])); + + service.handleFileChunk(chunkPayload(1, 3, [ + 4, + 5, + 6 + ])); + + service.handleFileChunk(chunkPayload(1, 3, [ + 4, + 5, + 6 + ])); + + expect(attachment.available).toBe(false); + expect(persistence.saveFileToDisk).not.toHaveBeenCalled(); + }); + + it('streams a requested file only once while the same request is already in flight', async () => { + const service = createService(); + + registerIncomingAttachment(9); + runtimeStore.setOriginalFile(`${MESSAGE_ID}:${FILE_ID}`, new File([new Uint8Array(9)], 'photo.png', { type: 'image/png' })); + + let releaseStream: () => void = () => undefined; + + transport.streamFileToPeer.mockImplementation(() => new Promise((resolve) => { + releaseStream = resolve; + })); + + const firstRequest = service.handleFileRequest({ + messageId: MESSAGE_ID, + fileId: FILE_ID, + fromPeerId: PEER_ID + }); + const duplicateRequest = service.handleFileRequest({ + messageId: MESSAGE_ID, + fileId: FILE_ID, + fromPeerId: PEER_ID + }); + + releaseStream(); + await Promise.all([firstRequest, duplicateRequest]); + + expect(transport.streamFileToPeer).toHaveBeenCalledTimes(1); + }); + + it('allows a peer to download again after it cancelled a previous transfer', async () => { + const service = createService(); + + registerIncomingAttachment(9); + runtimeStore.setOriginalFile(`${MESSAGE_ID}:${FILE_ID}`, new File([new Uint8Array(9)], 'photo.png', { type: 'image/png' })); + + service.handleFileCancel({ + messageId: MESSAGE_ID, + fileId: FILE_ID, + fromPeerId: PEER_ID + }); + + await service.handleFileRequest({ + messageId: MESSAGE_ID, + fileId: FILE_ID, + fromPeerId: PEER_ID + }); + + expect(transport.streamFileToPeer).toHaveBeenCalledTimes(1); + + const isCancelled = transport.streamFileToPeer.mock.calls[0][4] as () => boolean; + + expect(isCancelled()).toBe(false); + }); + + it('marks a request as pending synchronously so concurrent auto-download triggers cannot double-request', () => { + const service = createService(); + const attachment = registerIncomingAttachment(9); + + void service.requestFromAnyPeer(MESSAGE_ID, attachment); + + expect(service.hasPendingRequest(MESSAGE_ID, FILE_ID)).toBe(true); + }); +}); diff --git a/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.ts b/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.ts index 612986c..38bf551 100644 --- a/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.ts +++ b/toju-app/src/app/domains/attachment/application/services/attachment-transfer.service.ts @@ -55,6 +55,20 @@ interface ValidFileChunkPayload { total: number; } +function isValidFileChunkPayload(payload: FileChunkPayload): payload is FileChunkPayload & ValidFileChunkPayload { + const { messageId, fileId, index, total, data } = payload; + + return !!messageId && !!fileId && + typeof data === 'string' && + typeof index === 'number' && + typeof total === 'number' && + Number.isInteger(index) && + Number.isInteger(total) && + total > 0 && + index >= 0 && + index < total; +} + @Injectable({ providedIn: 'root' }) export class AttachmentTransferService { private readonly ngrxStore = inject(Store); @@ -67,6 +81,7 @@ export class AttachmentTransferService { private readonly diskReceiveAssemblies = new Map(); private readonly diskReceiveChains = new Map>(); + private readonly activeOutboundTransfers = new Set(); getAttachmentMetasForMessages(messageIds: string[]): Record { const result: Record = {}; @@ -135,12 +150,19 @@ export class AttachmentTransferService { } async requestFromAnyPeer(messageId: string, attachment: Attachment): Promise { + const requestKey = this.buildRequestKey(messageId, attachment.id); const clearedRequestError = this.clearAttachmentRequestError(attachment); + // Mark the request pending synchronously so concurrent triggers (file-announce, + // message sync, peer connect) cannot double-request the same file - a duplicate + // request makes the sender stream the file twice and corrupts byte accounting. + this.runtimeStore.setPendingRequestPeers(requestKey, new Set()); + if (!attachment.available) { const restoredLocally = await this.persistence.tryRestoreAttachmentFromLocal(attachment); if (restoredLocally) { + this.runtimeStore.deletePendingRequest(requestKey); this.runtimeStore.touch(); return; } @@ -153,6 +175,7 @@ export class AttachmentTransferService { attachment.uploaderPeerId === currentUserId; if (connectedPeers.length === 0) { + this.runtimeStore.deletePendingRequest(requestKey); attachment.requestError = isUploader ? this.appI18n.instant(UPLOADER_LOCAL_FILE_MISSING_ERROR_KEY) : this.appI18n.instant(NO_CONNECTED_PEERS_REQUEST_ERROR_KEY); @@ -165,11 +188,6 @@ export class AttachmentTransferService { if (clearedRequestError) this.runtimeStore.touch(); - this.runtimeStore.setPendingRequestPeers( - this.buildRequestKey(messageId, attachment.id), - new Set() - ); - this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId); } @@ -334,28 +352,23 @@ export class AttachmentTransferService { } handleFileChunk(payload: FileChunkPayload): void { - const { messageId, fileId, fromPeerId, index, total, data } = payload; - - if ( - !messageId || !fileId || - typeof index !== 'number' || - typeof total !== 'number' || - typeof data !== 'string' || - !Number.isInteger(index) || - !Number.isInteger(total) || - total <= 0 || - index < 0 || - index >= total - ) { + if (!isValidFileChunkPayload(payload)) { return; } + const { messageId, fileId, fromPeerId, index, total, data } = payload; const list = this.runtimeStore.getAttachmentsForMessage(messageId); const attachment = list.find((entry) => entry.id === fileId); if (!attachment) return; + if (attachment.available) { + // Transfer already completed (or restored locally) - trailing chunks from + // a redundant stream must not restart accounting or rewrite the file. + return; + } + if ((attachment.receivedBytes ?? 0) > attachment.size) { return; } @@ -386,11 +399,14 @@ export class AttachmentTransferService { const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total); - if (!chunkBuffer[index]) { - chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer; - this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1); + if (index >= chunkBuffer.length || chunkBuffer[index]) { + // Duplicate delivery (e.g. a redundant concurrent stream) - the chunk is + // already buffered, so it must not count toward transfer progress. + return; } + chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer; + this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1); this.updateTransferProgress(attachment, decodedBytes, fromPeerId); this.runtimeStore.touch(); @@ -403,6 +419,110 @@ export class AttachmentTransferService { if (!messageId || !fileId || !fromPeerId) return; + const transferKey = this.buildTransferKey(messageId, fileId, fromPeerId); + + if (this.activeOutboundTransfers.has(transferKey)) { + // A stream for this exact request is already in flight - sending the file + // twice in parallel duplicates chunks and corrupts the receiver's assembly. + return; + } + + // A fresh request supersedes any earlier cancellation from this peer. + this.runtimeStore.deleteCancelledTransfer(transferKey); + this.activeOutboundTransfers.add(transferKey); + + try { + await this.streamRequestedFile(messageId, fileId, fromPeerId); + } finally { + this.activeOutboundTransfers.delete(transferKey); + } + } + + cancelRequest(messageId: string, attachment: Attachment): void { + const targetPeerId = attachment.uploaderPeerId; + + if (!targetPeerId) + return; + + try { + const assemblyKey = `${messageId}:${attachment.id}`; + + this.runtimeStore.deleteChunkBuffer(assemblyKey); + this.runtimeStore.deleteChunkCount(assemblyKey); + void this.deleteDiskReceiveAssembly(assemblyKey); + + attachment.receivedBytes = 0; + attachment.speedBps = 0; + attachment.startedAtMs = undefined; + attachment.lastUpdateMs = undefined; + + if (attachment.objectUrl) { + try { + URL.revokeObjectURL(attachment.objectUrl); + } catch { /* ignore */ } + + attachment.objectUrl = undefined; + } + + attachment.available = false; + this.runtimeStore.touch(); + + const fileCancelEvent: FileCancelEvent = { + type: 'file-cancel', + messageId, + fileId: attachment.id + }; + + this.webrtc.sendToPeer(targetPeerId, fileCancelEvent); + } catch { /* best-effort */ } + } + + handleFileCancel(payload: FileCancelPayload): void { + const { messageId, fileId, fromPeerId } = payload; + + if (!messageId || !fileId || !fromPeerId) + return; + + this.runtimeStore.addCancelledTransfer( + this.buildTransferKey(messageId, fileId, fromPeerId) + ); + } + + async fulfillRequestWithFile( + messageId: string, + fileId: string, + targetPeerId: string, + file: File + ): Promise { + this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file); + + const transferKey = this.buildTransferKey(messageId, fileId, targetPeerId); + + if (this.activeOutboundTransfers.has(transferKey)) { + return; + } + + this.runtimeStore.deleteCancelledTransfer(transferKey); + this.activeOutboundTransfers.add(transferKey); + + try { + await this.transport.streamFileToPeer( + targetPeerId, + messageId, + fileId, + file, + () => this.isTransferCancelled(targetPeerId, messageId, fileId) + ); + } finally { + this.activeOutboundTransfers.delete(transferKey); + } + } + + private async streamRequestedFile( + messageId: string, + fileId: string, + fromPeerId: string + ): Promise { const exactKey = `${messageId}:${fileId}`; const originalFile = this.runtimeStore.getOriginalFile(exactKey) ?? this.runtimeStore.findOriginalFileByFileId(fileId); @@ -484,72 +604,6 @@ export class AttachmentTransferService { this.webrtc.sendToPeer(fromPeerId, fileNotFoundEvent); } - cancelRequest(messageId: string, attachment: Attachment): void { - const targetPeerId = attachment.uploaderPeerId; - - if (!targetPeerId) - return; - - try { - const assemblyKey = `${messageId}:${attachment.id}`; - - this.runtimeStore.deleteChunkBuffer(assemblyKey); - this.runtimeStore.deleteChunkCount(assemblyKey); - void this.deleteDiskReceiveAssembly(assemblyKey); - - attachment.receivedBytes = 0; - attachment.speedBps = 0; - attachment.startedAtMs = undefined; - attachment.lastUpdateMs = undefined; - - if (attachment.objectUrl) { - try { - URL.revokeObjectURL(attachment.objectUrl); - } catch { /* ignore */ } - - attachment.objectUrl = undefined; - } - - attachment.available = false; - this.runtimeStore.touch(); - - const fileCancelEvent: FileCancelEvent = { - type: 'file-cancel', - messageId, - fileId: attachment.id - }; - - this.webrtc.sendToPeer(targetPeerId, fileCancelEvent); - } catch { /* best-effort */ } - } - - handleFileCancel(payload: FileCancelPayload): void { - const { messageId, fileId, fromPeerId } = payload; - - if (!messageId || !fileId || !fromPeerId) - return; - - this.runtimeStore.addCancelledTransfer( - this.buildTransferKey(messageId, fileId, fromPeerId) - ); - } - - async fulfillRequestWithFile( - messageId: string, - fileId: string, - targetPeerId: string, - file: File - ): Promise { - this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file); - await this.transport.streamFileToPeer( - targetPeerId, - messageId, - fileId, - file, - () => this.isTransferCancelled(targetPeerId, messageId, fileId) - ); - } - private resolveCurrentUserId(): Promise { return new Promise((resolve) => { this.ngrxStore @@ -617,14 +671,16 @@ export class AttachmentTransferService { return true; } - private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] { + private getOrCreateChunkBuffer(assemblyKey: string, total: number): (ArrayBuffer | undefined)[] { const existingChunkBuffer = this.runtimeStore.getChunkBuffer(assemblyKey); if (existingChunkBuffer) { return existingChunkBuffer; } - const createdChunkBuffer = new Array(total); + // Dense initialization - sparse arrays from `new Array(total)` skip holes in + // `every`/`some`, which lets incomplete transfers pass completion checks. + const createdChunkBuffer = Array.from({ length: total }); this.runtimeStore.setChunkBuffer(assemblyKey, createdChunkBuffer); this.runtimeStore.setChunkCount(assemblyKey, 0); @@ -671,18 +727,21 @@ export class AttachmentTransferService { const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0; const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey); - if ( - !completeBuffer - || (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size) - || !completeBuffer.every((part) => part instanceof ArrayBuffer) - ) { + if (!completeBuffer || receivedChunkCount < total) { return; } - const blob = new Blob(completeBuffer, { type: attachment.mime }); + const bufferedChunks = completeBuffer.filter( + (part): part is ArrayBuffer => part instanceof ArrayBuffer + ); - this.runtimeStore.deleteChunkBuffer(assemblyKey); - this.runtimeStore.deleteChunkCount(assemblyKey); + // Every chunk index must be present - byte counters are never a substitute + // for chunk completeness, otherwise partial files finalize as corrupt blobs. + if (bufferedChunks.length !== total || completeBuffer.length !== total) { + return; + } + + const blob = new Blob(bufferedChunks, { type: attachment.mime }); if (shouldPersistDownloadedAttachment(attachment)) { await this.persistence.saveFileToDisk(attachment, blob); @@ -691,6 +750,12 @@ export class AttachmentTransferService { attachment.objectUrl = URL.createObjectURL(blob); attachment.available = true; + + // Release assembly state only after the attachment is marked available so a + // trailing duplicate chunk cannot restart accounting in a fresh buffer. + this.runtimeStore.deleteChunkBuffer(assemblyKey); + this.runtimeStore.deleteChunkCount(assemblyKey); + this.runtimeStore.touch(); void this.persistence.persistAttachmentMeta(attachment); } @@ -758,7 +823,7 @@ export class AttachmentTransferService { this.updateTransferProgress(attachment, decodedBytes, payload.fromPeerId); this.runtimeStore.touch(); - if (assembly.receivedCount < assembly.total && (attachment.receivedBytes ?? 0) < attachment.size) { + if (assembly.receivedCount < assembly.total) { return; }