fix: Bug - Attachments gets syncronized corrupt
This commit is contained in:
@@ -25,6 +25,13 @@ Durable rules for AI agents working on this project. Read this file at session s
|
||||
|
||||
## Lessons
|
||||
|
||||
### Never count duplicate chunks toward transfer progress, and never finalize on byte counters [attachments] [webrtc]
|
||||
|
||||
- **Trigger:** P2P attachments arrived corrupt everywhere ("only the first bytes") because concurrent auto-download triggers double-requested a file, the sender streamed it twice, and the receiver counted duplicate chunk deliveries toward `receivedBytes` — inflating it past `size`, which both dropped the remaining chunks (post-Security guard) and passed the `receivedBytes >= size` finalize shortcut over a sparse buffer.
|
||||
- **Rule:** in chunked transfer receivers, ignore an already-buffered chunk index entirely (no progress update), use dense buffers, and finalize only when every chunk index is present — never use byte totals as an alternative completion signal; dedupe streams on the sender per `(messageId, fileId, peerId)`.
|
||||
- **Why:** byte counters lie as soon as any duplicate, retry, or concurrent stream exists, and sparse-array `every`/`some` skip holes, so "looks complete" checks silently pass on partial data (same trap as the custom-emoji sparse-array lesson).
|
||||
- **Example:** `handleFileChunk` / `finalizeTransferIfComplete` in `attachment-transfer.service.ts`; multi-chunk e2e coverage via `expectMessageImageContentSha256` in `e2e/tests/chat/chat-message-features.spec.ts` (single-chunk files cannot catch assembly bugs — test with >64 KiB payloads).
|
||||
|
||||
### Don't bump E2E timeouts for sync flakes - gate on presence and read server logs [testing] [realtime]
|
||||
|
||||
- **Trigger:** a multi-client chat-sync E2E flaked on "message not visible" and the first instinct was to raise `toBeVisible` timeouts or add waits; the user correctly rejected this ("it's not a timeout issue").
|
||||
|
||||
@@ -132,6 +132,31 @@ export class ChatMessagesPage {
|
||||
}).toBe(true);
|
||||
}
|
||||
|
||||
/** SHA-256 of the bytes currently served by the rendered chat image. */
|
||||
async getMessageImageSha256(altText: string): Promise<string> {
|
||||
const image = this.getMessageImageByAlt(altText);
|
||||
|
||||
return image.evaluate(async (element) => {
|
||||
const img = element as HTMLImageElement;
|
||||
const response = await fetch(img.src);
|
||||
const buffer = await response.arrayBuffer();
|
||||
const digest = await crypto.subtle.digest('SHA-256', buffer);
|
||||
|
||||
return [...new Uint8Array(digest)]
|
||||
.map((byte) => byte.toString(16).padStart(2, '0'))
|
||||
.join('');
|
||||
});
|
||||
}
|
||||
|
||||
/** Asserts the rendered chat image is byte-identical to the sent file. */
|
||||
async expectMessageImageContentSha256(altText: string, expectedSha256: string): Promise<void> {
|
||||
await this.expectMessageImageLoaded(altText);
|
||||
await expect.poll(() => this.getMessageImageSha256(altText), {
|
||||
timeout: 30_000,
|
||||
message: `Image ${altText} should be received byte-identical (no truncated/corrupt transfer)`
|
||||
}).toBe(expectedSha256);
|
||||
}
|
||||
|
||||
getEmbedCardByTitle(title: string): Locator {
|
||||
return this.page.locator('app-chat-link-embed').filter({
|
||||
has: this.page.getByText(title, { exact: true })
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { createHash, randomBytes } from 'node:crypto';
|
||||
import { type Page } from '@playwright/test';
|
||||
import {
|
||||
test,
|
||||
@@ -182,6 +183,28 @@ test.describe('Chat messaging features', () => {
|
||||
});
|
||||
});
|
||||
|
||||
test('syncs multi-chunk image attachments byte-identical between users', async ({ createClient }) => {
|
||||
const scenario = await createChatScenario(createClient);
|
||||
const imageName = `${uniqueName('photo')}.svg`;
|
||||
const imageCaption = `Large image upload ${uniqueName('caption')}`;
|
||||
// Several P2P file chunks (64 KiB each) - regression coverage for transfers
|
||||
// that previously finalized with only the first chunks received.
|
||||
const { payload, sha256 } = createMultiChunkImagePayload(imageName);
|
||||
|
||||
await test.step('Alice sends a multi-chunk image attachment', async () => {
|
||||
await scenario.aliceMessages.attachFiles([payload]);
|
||||
await scenario.aliceMessages.sendMessage(imageCaption);
|
||||
|
||||
await scenario.aliceMessages.expectMessageImageLoaded(imageName);
|
||||
await scenario.aliceMessages.expectMessageImageContentSha256(imageName, sha256);
|
||||
});
|
||||
|
||||
await test.step('Bob receives the image fully and byte-identical', async () => {
|
||||
await expect(scenario.bobMessages.getMessageItemByText(imageCaption)).toBeVisible({ timeout: 20_000 });
|
||||
await scenario.bobMessages.expectMessageImageContentSha256(imageName, sha256);
|
||||
});
|
||||
});
|
||||
|
||||
test('renders link embeds for shared links', async ({ createClient }) => {
|
||||
const scenario = await createChatScenario(createClient);
|
||||
const messageText = `Useful docs ${MOCK_EMBED_URL}`;
|
||||
@@ -442,6 +465,24 @@ function createTextFilePayload(name: string, mimeType: string, content: string):
|
||||
};
|
||||
}
|
||||
|
||||
function createMultiChunkImagePayload(name: string): { payload: ChatDropFilePayload; sha256: string } {
|
||||
// ~300 KB of XML-safe noise inside an SVG comment so the file spans
|
||||
// multiple 64 KiB P2P transfer chunks while remaining a renderable image.
|
||||
const noise = randomBytes(225_000).toString('base64');
|
||||
const markup = buildMockSvgMarkup(name).replace('</svg>', `<!-- ${noise} --></svg>`);
|
||||
const contentBuffer = Buffer.from(markup, 'utf8');
|
||||
|
||||
return {
|
||||
payload: {
|
||||
name,
|
||||
mimeType: 'image/svg+xml',
|
||||
base64: contentBuffer.toString('base64')
|
||||
},
|
||||
sha256: createHash('sha256').update(contentBuffer)
|
||||
.digest('hex')
|
||||
};
|
||||
}
|
||||
|
||||
function buildMockSvgMarkup(label: string): string {
|
||||
return [
|
||||
'<svg xmlns="http://www.w3.org/2000/svg" width="160" height="120" viewBox="0 0 160 120">',
|
||||
|
||||
@@ -101,6 +101,14 @@ sequenceDiagram
|
||||
Note over R: shouldPersistDownloadedAttachment? Save to disk
|
||||
```
|
||||
|
||||
### Transfer integrity invariants
|
||||
|
||||
Concurrent triggers (file-announce, message sync, peer connect) can race to request the same file, and a sender can receive duplicate `file-request`s for the same attachment. The transfer service enforces these invariants so duplicate streams can never corrupt a download (regression: receivers used to finalize after only the first chunks):
|
||||
|
||||
- **Requester:** `requestFromAnyPeer` marks the request pending *synchronously* before any async work, so the manager's `hasPendingRequest` gate closes the double-request race window.
|
||||
- **Sender:** `handleFileRequest` / `fulfillRequestWithFile` track active outbound streams per `(messageId, fileId, peerId)` and ignore duplicate requests while a stream is in flight. A fresh `file-request` clears any earlier `file-cancel` marker from that peer.
|
||||
- **Receiver:** chunk buffers are dense (`Array.from({ length: total })`, never sparse `new Array(total)`); a chunk index that is already buffered is ignored entirely and never counts toward `receivedBytes`; a transfer finalizes only when *every* chunk index is present — byte counters are never a substitute for chunk completeness. Assembly state is released only after the attachment is marked `available`, and chunks arriving for an already-available attachment are dropped.
|
||||
|
||||
### Failure handling
|
||||
|
||||
If the sender cannot find the file, it replies with `file-not-found`. The transfer service then tries the next connected peer that has announced the same attachment. Either side can send `file-cancel` to abort a transfer in progress.
|
||||
|
||||
@@ -10,7 +10,7 @@ export class AttachmentRuntimeStore {
|
||||
private originalFiles = new Map<string, File>();
|
||||
private cancelledTransfers = new Set<string>();
|
||||
private pendingRequests = new Map<string, Set<string>>();
|
||||
private chunkBuffers = new Map<string, ArrayBuffer[]>();
|
||||
private chunkBuffers = new Map<string, (ArrayBuffer | undefined)[]>();
|
||||
private chunkCounts = new Map<string, number>();
|
||||
|
||||
touch(): void {
|
||||
@@ -84,6 +84,10 @@ export class AttachmentRuntimeStore {
|
||||
return this.cancelledTransfers.has(key);
|
||||
}
|
||||
|
||||
deleteCancelledTransfer(key: string): void {
|
||||
this.cancelledTransfers.delete(key);
|
||||
}
|
||||
|
||||
setPendingRequestPeers(key: string, peers: Set<string>): void {
|
||||
this.pendingRequests.set(key, peers);
|
||||
}
|
||||
@@ -100,11 +104,11 @@ export class AttachmentRuntimeStore {
|
||||
this.pendingRequests.delete(key);
|
||||
}
|
||||
|
||||
setChunkBuffer(key: string, buffer: ArrayBuffer[]): void {
|
||||
setChunkBuffer(key: string, buffer: (ArrayBuffer | undefined)[]): void {
|
||||
this.chunkBuffers.set(key, buffer);
|
||||
}
|
||||
|
||||
getChunkBuffer(key: string): ArrayBuffer[] | undefined {
|
||||
getChunkBuffer(key: string): (ArrayBuffer | undefined)[] | undefined {
|
||||
return this.chunkBuffers.get(key);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,354 @@
|
||||
import '@angular/compiler';
|
||||
import {
|
||||
beforeEach,
|
||||
describe,
|
||||
expect,
|
||||
it,
|
||||
vi
|
||||
} from 'vitest';
|
||||
import { Injector, runInInjectionContext } from '@angular/core';
|
||||
import { Store } from '@ngrx/store';
|
||||
import { of } from 'rxjs';
|
||||
|
||||
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
||||
import { AppI18nService } from '../../../../core/i18n';
|
||||
import { arrayBufferToBase64 } from '../../../../shared-kernel';
|
||||
import { decodeBase64 } from '../../../../shared-kernel/p2p-transfer.utils';
|
||||
import { AttachmentStorageService } from '../../infrastructure/services/attachment-storage.service';
|
||||
import type { Attachment } from '../../domain/models/attachment.model';
|
||||
import type { FileChunkPayload } from '../../domain/models/attachment-transfer.model';
|
||||
import { AttachmentPersistenceService } from './attachment-persistence.service';
|
||||
import { AttachmentRuntimeStore } from './attachment-runtime.store';
|
||||
import { AttachmentTransferService } from './attachment-transfer.service';
|
||||
import { AttachmentTransferTransportService } from './attachment-transfer-transport.service';
|
||||
|
||||
const MESSAGE_ID = 'msg-1';
|
||||
const FILE_ID = 'file-1';
|
||||
const PEER_ID = 'peer-1';
|
||||
|
||||
function encodeChunk(bytes: number[]): string {
|
||||
return arrayBufferToBase64(new Uint8Array(bytes).buffer);
|
||||
}
|
||||
|
||||
describe('AttachmentTransferService', () => {
|
||||
let runtimeStore: AttachmentRuntimeStore;
|
||||
let persistence: {
|
||||
whenReady: ReturnType<typeof vi.fn>;
|
||||
persistAttachmentMeta: ReturnType<typeof vi.fn>;
|
||||
tryRestoreAttachmentFromLocal: ReturnType<typeof vi.fn>;
|
||||
saveFileToDisk: ReturnType<typeof vi.fn>;
|
||||
persistUploadCopyFromSourcePath: ReturnType<typeof vi.fn>;
|
||||
resolveCurrentRoomName: ReturnType<typeof vi.fn>;
|
||||
resolveStorageContainerName: ReturnType<typeof vi.fn>;
|
||||
ensureInlineDisplayObjectUrl: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let attachmentStorage: {
|
||||
canWriteFiles: ReturnType<typeof vi.fn>;
|
||||
canCopyFiles: ReturnType<typeof vi.fn>;
|
||||
canReadFileChunks: ReturnType<typeof vi.fn>;
|
||||
getFileUrl: ReturnType<typeof vi.fn>;
|
||||
resolveExistingPath: ReturnType<typeof vi.fn>;
|
||||
resolveLegacyImagePath: ReturnType<typeof vi.fn>;
|
||||
appendBase64: ReturnType<typeof vi.fn>;
|
||||
createWritableFile: ReturnType<typeof vi.fn>;
|
||||
deleteFile: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let transport: {
|
||||
decodeBase64: ReturnType<typeof vi.fn>;
|
||||
streamFileToPeer: ReturnType<typeof vi.fn>;
|
||||
streamFileFromDiskToPeer: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let webrtc: {
|
||||
getConnectedPeers: ReturnType<typeof vi.fn>;
|
||||
broadcastMessage: ReturnType<typeof vi.fn>;
|
||||
sendToPeer: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
persistence = {
|
||||
whenReady: vi.fn(async () => undefined),
|
||||
persistAttachmentMeta: vi.fn(async () => undefined),
|
||||
tryRestoreAttachmentFromLocal: vi.fn(async () => false),
|
||||
saveFileToDisk: vi.fn(async () => null),
|
||||
persistUploadCopyFromSourcePath: vi.fn(async () => null),
|
||||
resolveCurrentRoomName: vi.fn(async () => null),
|
||||
resolveStorageContainerName: vi.fn(async () => 'room'),
|
||||
ensureInlineDisplayObjectUrl: vi.fn(async () => true)
|
||||
};
|
||||
|
||||
attachmentStorage = {
|
||||
canWriteFiles: vi.fn(() => false),
|
||||
canCopyFiles: vi.fn(() => false),
|
||||
canReadFileChunks: vi.fn(() => false),
|
||||
getFileUrl: vi.fn(async () => null),
|
||||
resolveExistingPath: vi.fn(async () => null),
|
||||
resolveLegacyImagePath: vi.fn(async () => null),
|
||||
appendBase64: vi.fn(async () => true),
|
||||
createWritableFile: vi.fn(async () => '/appdata/server/room/files/file-1'),
|
||||
deleteFile: vi.fn(async () => true)
|
||||
};
|
||||
|
||||
transport = {
|
||||
decodeBase64: vi.fn((base64: string) => decodeBase64(base64)),
|
||||
streamFileToPeer: vi.fn(async () => undefined),
|
||||
streamFileFromDiskToPeer: vi.fn(async () => undefined)
|
||||
};
|
||||
|
||||
webrtc = {
|
||||
getConnectedPeers: vi.fn(() => [PEER_ID]),
|
||||
broadcastMessage: vi.fn(),
|
||||
sendToPeer: vi.fn()
|
||||
};
|
||||
});
|
||||
|
||||
function createService(): AttachmentTransferService {
|
||||
const injector = Injector.create({
|
||||
providers: [
|
||||
AttachmentTransferService,
|
||||
AttachmentRuntimeStore,
|
||||
{ provide: Store, useValue: { select: () => of(null) } },
|
||||
{ provide: RealtimeSessionFacade, useValue: webrtc },
|
||||
{ provide: AppI18nService, useValue: { instant: (key: string) => key } },
|
||||
{ provide: AttachmentStorageService, useValue: attachmentStorage },
|
||||
{ provide: AttachmentPersistenceService, useValue: persistence },
|
||||
{ provide: AttachmentTransferTransportService, useValue: transport }
|
||||
]
|
||||
});
|
||||
const service = runInInjectionContext(injector, () => injector.get(AttachmentTransferService));
|
||||
|
||||
runtimeStore = injector.get(AttachmentRuntimeStore);
|
||||
return service;
|
||||
}
|
||||
|
||||
function registerIncomingAttachment(size: number): Attachment {
|
||||
const attachment: Attachment = {
|
||||
id: FILE_ID,
|
||||
messageId: MESSAGE_ID,
|
||||
filename: 'photo.png',
|
||||
size,
|
||||
mime: 'image/png',
|
||||
isImage: true,
|
||||
uploaderPeerId: PEER_ID,
|
||||
available: false,
|
||||
receivedBytes: 0
|
||||
};
|
||||
|
||||
runtimeStore.setAttachmentsForMessage(MESSAGE_ID, [attachment]);
|
||||
return attachment;
|
||||
}
|
||||
|
||||
function chunkPayload(index: number, total: number, bytes: number[]): FileChunkPayload {
|
||||
return {
|
||||
messageId: MESSAGE_ID,
|
||||
fileId: FILE_ID,
|
||||
fromPeerId: PEER_ID,
|
||||
index,
|
||||
total,
|
||||
data: encodeChunk(bytes)
|
||||
};
|
||||
}
|
||||
|
||||
async function readSavedBlobBytes(): Promise<number[]> {
|
||||
const lastCall = persistence.saveFileToDisk.mock.calls.at(-1);
|
||||
const blob = lastCall?.[1] as Blob;
|
||||
const buffer = await blob.arrayBuffer();
|
||||
|
||||
return [...new Uint8Array(buffer)];
|
||||
}
|
||||
|
||||
it('assembles a multi-chunk in-memory transfer intact when each chunk arrives once', async () => {
|
||||
const service = createService();
|
||||
const attachment = registerIncomingAttachment(9);
|
||||
|
||||
service.handleFileChunk(chunkPayload(0, 3, [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(1, 3, [
|
||||
4,
|
||||
5,
|
||||
6
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(2, 3, [
|
||||
7,
|
||||
8,
|
||||
9
|
||||
]));
|
||||
|
||||
await vi.waitFor(() => expect(attachment.available).toBe(true));
|
||||
|
||||
expect(persistence.saveFileToDisk).toHaveBeenCalledTimes(1);
|
||||
await expect(readSavedBlobBytes()).resolves.toEqual([
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
]);
|
||||
|
||||
expect(attachment.receivedBytes).toBe(9);
|
||||
});
|
||||
|
||||
it('ignores duplicate chunk deliveries from concurrent streams and assembles the full file', async () => {
|
||||
const service = createService();
|
||||
const attachment = registerIncomingAttachment(9);
|
||||
|
||||
// Two interleaved streams of the same file (duplicate file-request race).
|
||||
service.handleFileChunk(chunkPayload(0, 3, [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(0, 3, [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(1, 3, [
|
||||
4,
|
||||
5,
|
||||
6
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(1, 3, [
|
||||
4,
|
||||
5,
|
||||
6
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(2, 3, [
|
||||
7,
|
||||
8,
|
||||
9
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(2, 3, [
|
||||
7,
|
||||
8,
|
||||
9
|
||||
]));
|
||||
|
||||
await vi.waitFor(() => expect(attachment.available).toBe(true));
|
||||
|
||||
expect(persistence.saveFileToDisk).toHaveBeenCalledTimes(1);
|
||||
await expect(readSavedBlobBytes()).resolves.toEqual([
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
]);
|
||||
|
||||
expect(attachment.receivedBytes).toBe(9);
|
||||
});
|
||||
|
||||
it('does not finalize a transfer while chunks are still missing', () => {
|
||||
const service = createService();
|
||||
const attachment = registerIncomingAttachment(9);
|
||||
|
||||
// Duplicates inflate naive byte accounting past the file size while
|
||||
// chunk 2 has not arrived yet - the transfer must stay incomplete.
|
||||
service.handleFileChunk(chunkPayload(0, 3, [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(0, 3, [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(1, 3, [
|
||||
4,
|
||||
5,
|
||||
6
|
||||
]));
|
||||
|
||||
service.handleFileChunk(chunkPayload(1, 3, [
|
||||
4,
|
||||
5,
|
||||
6
|
||||
]));
|
||||
|
||||
expect(attachment.available).toBe(false);
|
||||
expect(persistence.saveFileToDisk).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('streams a requested file only once while the same request is already in flight', async () => {
|
||||
const service = createService();
|
||||
|
||||
registerIncomingAttachment(9);
|
||||
runtimeStore.setOriginalFile(`${MESSAGE_ID}:${FILE_ID}`, new File([new Uint8Array(9)], 'photo.png', { type: 'image/png' }));
|
||||
|
||||
let releaseStream: () => void = () => undefined;
|
||||
|
||||
transport.streamFileToPeer.mockImplementation(() => new Promise<void>((resolve) => {
|
||||
releaseStream = resolve;
|
||||
}));
|
||||
|
||||
const firstRequest = service.handleFileRequest({
|
||||
messageId: MESSAGE_ID,
|
||||
fileId: FILE_ID,
|
||||
fromPeerId: PEER_ID
|
||||
});
|
||||
const duplicateRequest = service.handleFileRequest({
|
||||
messageId: MESSAGE_ID,
|
||||
fileId: FILE_ID,
|
||||
fromPeerId: PEER_ID
|
||||
});
|
||||
|
||||
releaseStream();
|
||||
await Promise.all([firstRequest, duplicateRequest]);
|
||||
|
||||
expect(transport.streamFileToPeer).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('allows a peer to download again after it cancelled a previous transfer', async () => {
|
||||
const service = createService();
|
||||
|
||||
registerIncomingAttachment(9);
|
||||
runtimeStore.setOriginalFile(`${MESSAGE_ID}:${FILE_ID}`, new File([new Uint8Array(9)], 'photo.png', { type: 'image/png' }));
|
||||
|
||||
service.handleFileCancel({
|
||||
messageId: MESSAGE_ID,
|
||||
fileId: FILE_ID,
|
||||
fromPeerId: PEER_ID
|
||||
});
|
||||
|
||||
await service.handleFileRequest({
|
||||
messageId: MESSAGE_ID,
|
||||
fileId: FILE_ID,
|
||||
fromPeerId: PEER_ID
|
||||
});
|
||||
|
||||
expect(transport.streamFileToPeer).toHaveBeenCalledTimes(1);
|
||||
|
||||
const isCancelled = transport.streamFileToPeer.mock.calls[0][4] as () => boolean;
|
||||
|
||||
expect(isCancelled()).toBe(false);
|
||||
});
|
||||
|
||||
it('marks a request as pending synchronously so concurrent auto-download triggers cannot double-request', () => {
|
||||
const service = createService();
|
||||
const attachment = registerIncomingAttachment(9);
|
||||
|
||||
void service.requestFromAnyPeer(MESSAGE_ID, attachment);
|
||||
|
||||
expect(service.hasPendingRequest(MESSAGE_ID, FILE_ID)).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -55,6 +55,20 @@ interface ValidFileChunkPayload {
|
||||
total: number;
|
||||
}
|
||||
|
||||
function isValidFileChunkPayload(payload: FileChunkPayload): payload is FileChunkPayload & ValidFileChunkPayload {
|
||||
const { messageId, fileId, index, total, data } = payload;
|
||||
|
||||
return !!messageId && !!fileId &&
|
||||
typeof data === 'string' &&
|
||||
typeof index === 'number' &&
|
||||
typeof total === 'number' &&
|
||||
Number.isInteger(index) &&
|
||||
Number.isInteger(total) &&
|
||||
total > 0 &&
|
||||
index >= 0 &&
|
||||
index < total;
|
||||
}
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentTransferService {
|
||||
private readonly ngrxStore = inject(Store);
|
||||
@@ -67,6 +81,7 @@ export class AttachmentTransferService {
|
||||
|
||||
private readonly diskReceiveAssemblies = new Map<string, DiskReceiveAssembly>();
|
||||
private readonly diskReceiveChains = new Map<string, Promise<void>>();
|
||||
private readonly activeOutboundTransfers = new Set<string>();
|
||||
|
||||
getAttachmentMetasForMessages(messageIds: string[]): Record<string, AttachmentMeta[]> {
|
||||
const result: Record<string, AttachmentMeta[]> = {};
|
||||
@@ -135,12 +150,19 @@ export class AttachmentTransferService {
|
||||
}
|
||||
|
||||
async requestFromAnyPeer(messageId: string, attachment: Attachment): Promise<void> {
|
||||
const requestKey = this.buildRequestKey(messageId, attachment.id);
|
||||
const clearedRequestError = this.clearAttachmentRequestError(attachment);
|
||||
|
||||
// Mark the request pending synchronously so concurrent triggers (file-announce,
|
||||
// message sync, peer connect) cannot double-request the same file - a duplicate
|
||||
// request makes the sender stream the file twice and corrupts byte accounting.
|
||||
this.runtimeStore.setPendingRequestPeers(requestKey, new Set<string>());
|
||||
|
||||
if (!attachment.available) {
|
||||
const restoredLocally = await this.persistence.tryRestoreAttachmentFromLocal(attachment);
|
||||
|
||||
if (restoredLocally) {
|
||||
this.runtimeStore.deletePendingRequest(requestKey);
|
||||
this.runtimeStore.touch();
|
||||
return;
|
||||
}
|
||||
@@ -153,6 +175,7 @@ export class AttachmentTransferService {
|
||||
attachment.uploaderPeerId === currentUserId;
|
||||
|
||||
if (connectedPeers.length === 0) {
|
||||
this.runtimeStore.deletePendingRequest(requestKey);
|
||||
attachment.requestError = isUploader
|
||||
? this.appI18n.instant(UPLOADER_LOCAL_FILE_MISSING_ERROR_KEY)
|
||||
: this.appI18n.instant(NO_CONNECTED_PEERS_REQUEST_ERROR_KEY);
|
||||
@@ -165,11 +188,6 @@ export class AttachmentTransferService {
|
||||
if (clearedRequestError)
|
||||
this.runtimeStore.touch();
|
||||
|
||||
this.runtimeStore.setPendingRequestPeers(
|
||||
this.buildRequestKey(messageId, attachment.id),
|
||||
new Set<string>()
|
||||
);
|
||||
|
||||
this.sendFileRequestToNextPeer(messageId, attachment.id, attachment.uploaderPeerId);
|
||||
}
|
||||
|
||||
@@ -334,28 +352,23 @@ export class AttachmentTransferService {
|
||||
}
|
||||
|
||||
handleFileChunk(payload: FileChunkPayload): void {
|
||||
const { messageId, fileId, fromPeerId, index, total, data } = payload;
|
||||
|
||||
if (
|
||||
!messageId || !fileId ||
|
||||
typeof index !== 'number' ||
|
||||
typeof total !== 'number' ||
|
||||
typeof data !== 'string' ||
|
||||
!Number.isInteger(index) ||
|
||||
!Number.isInteger(total) ||
|
||||
total <= 0 ||
|
||||
index < 0 ||
|
||||
index >= total
|
||||
) {
|
||||
if (!isValidFileChunkPayload(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { messageId, fileId, fromPeerId, index, total, data } = payload;
|
||||
const list = this.runtimeStore.getAttachmentsForMessage(messageId);
|
||||
const attachment = list.find((entry) => entry.id === fileId);
|
||||
|
||||
if (!attachment)
|
||||
return;
|
||||
|
||||
if (attachment.available) {
|
||||
// Transfer already completed (or restored locally) - trailing chunks from
|
||||
// a redundant stream must not restart accounting or rewrite the file.
|
||||
return;
|
||||
}
|
||||
|
||||
if ((attachment.receivedBytes ?? 0) > attachment.size) {
|
||||
return;
|
||||
}
|
||||
@@ -386,11 +399,14 @@ export class AttachmentTransferService {
|
||||
|
||||
const chunkBuffer = this.getOrCreateChunkBuffer(assemblyKey, total);
|
||||
|
||||
if (!chunkBuffer[index]) {
|
||||
chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer;
|
||||
this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1);
|
||||
if (index >= chunkBuffer.length || chunkBuffer[index]) {
|
||||
// Duplicate delivery (e.g. a redundant concurrent stream) - the chunk is
|
||||
// already buffered, so it must not count toward transfer progress.
|
||||
return;
|
||||
}
|
||||
|
||||
chunkBuffer[index] = decodedBytes.buffer as ArrayBuffer;
|
||||
this.runtimeStore.setChunkCount(assemblyKey, (this.runtimeStore.getChunkCount(assemblyKey) ?? 0) + 1);
|
||||
this.updateTransferProgress(attachment, decodedBytes, fromPeerId);
|
||||
|
||||
this.runtimeStore.touch();
|
||||
@@ -403,6 +419,110 @@ export class AttachmentTransferService {
|
||||
if (!messageId || !fileId || !fromPeerId)
|
||||
return;
|
||||
|
||||
const transferKey = this.buildTransferKey(messageId, fileId, fromPeerId);
|
||||
|
||||
if (this.activeOutboundTransfers.has(transferKey)) {
|
||||
// A stream for this exact request is already in flight - sending the file
|
||||
// twice in parallel duplicates chunks and corrupts the receiver's assembly.
|
||||
return;
|
||||
}
|
||||
|
||||
// A fresh request supersedes any earlier cancellation from this peer.
|
||||
this.runtimeStore.deleteCancelledTransfer(transferKey);
|
||||
this.activeOutboundTransfers.add(transferKey);
|
||||
|
||||
try {
|
||||
await this.streamRequestedFile(messageId, fileId, fromPeerId);
|
||||
} finally {
|
||||
this.activeOutboundTransfers.delete(transferKey);
|
||||
}
|
||||
}
|
||||
|
||||
cancelRequest(messageId: string, attachment: Attachment): void {
|
||||
const targetPeerId = attachment.uploaderPeerId;
|
||||
|
||||
if (!targetPeerId)
|
||||
return;
|
||||
|
||||
try {
|
||||
const assemblyKey = `${messageId}:${attachment.id}`;
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
void this.deleteDiskReceiveAssembly(assemblyKey);
|
||||
|
||||
attachment.receivedBytes = 0;
|
||||
attachment.speedBps = 0;
|
||||
attachment.startedAtMs = undefined;
|
||||
attachment.lastUpdateMs = undefined;
|
||||
|
||||
if (attachment.objectUrl) {
|
||||
try {
|
||||
URL.revokeObjectURL(attachment.objectUrl);
|
||||
} catch { /* ignore */ }
|
||||
|
||||
attachment.objectUrl = undefined;
|
||||
}
|
||||
|
||||
attachment.available = false;
|
||||
this.runtimeStore.touch();
|
||||
|
||||
const fileCancelEvent: FileCancelEvent = {
|
||||
type: 'file-cancel',
|
||||
messageId,
|
||||
fileId: attachment.id
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(targetPeerId, fileCancelEvent);
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
|
||||
handleFileCancel(payload: FileCancelPayload): void {
|
||||
const { messageId, fileId, fromPeerId } = payload;
|
||||
|
||||
if (!messageId || !fileId || !fromPeerId)
|
||||
return;
|
||||
|
||||
this.runtimeStore.addCancelledTransfer(
|
||||
this.buildTransferKey(messageId, fileId, fromPeerId)
|
||||
);
|
||||
}
|
||||
|
||||
async fulfillRequestWithFile(
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
targetPeerId: string,
|
||||
file: File
|
||||
): Promise<void> {
|
||||
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
||||
|
||||
const transferKey = this.buildTransferKey(messageId, fileId, targetPeerId);
|
||||
|
||||
if (this.activeOutboundTransfers.has(transferKey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.runtimeStore.deleteCancelledTransfer(transferKey);
|
||||
this.activeOutboundTransfers.add(transferKey);
|
||||
|
||||
try {
|
||||
await this.transport.streamFileToPeer(
|
||||
targetPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
file,
|
||||
() => this.isTransferCancelled(targetPeerId, messageId, fileId)
|
||||
);
|
||||
} finally {
|
||||
this.activeOutboundTransfers.delete(transferKey);
|
||||
}
|
||||
}
|
||||
|
||||
private async streamRequestedFile(
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
fromPeerId: string
|
||||
): Promise<void> {
|
||||
const exactKey = `${messageId}:${fileId}`;
|
||||
const originalFile = this.runtimeStore.getOriginalFile(exactKey)
|
||||
?? this.runtimeStore.findOriginalFileByFileId(fileId);
|
||||
@@ -484,72 +604,6 @@ export class AttachmentTransferService {
|
||||
this.webrtc.sendToPeer(fromPeerId, fileNotFoundEvent);
|
||||
}
|
||||
|
||||
cancelRequest(messageId: string, attachment: Attachment): void {
|
||||
const targetPeerId = attachment.uploaderPeerId;
|
||||
|
||||
if (!targetPeerId)
|
||||
return;
|
||||
|
||||
try {
|
||||
const assemblyKey = `${messageId}:${attachment.id}`;
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
void this.deleteDiskReceiveAssembly(assemblyKey);
|
||||
|
||||
attachment.receivedBytes = 0;
|
||||
attachment.speedBps = 0;
|
||||
attachment.startedAtMs = undefined;
|
||||
attachment.lastUpdateMs = undefined;
|
||||
|
||||
if (attachment.objectUrl) {
|
||||
try {
|
||||
URL.revokeObjectURL(attachment.objectUrl);
|
||||
} catch { /* ignore */ }
|
||||
|
||||
attachment.objectUrl = undefined;
|
||||
}
|
||||
|
||||
attachment.available = false;
|
||||
this.runtimeStore.touch();
|
||||
|
||||
const fileCancelEvent: FileCancelEvent = {
|
||||
type: 'file-cancel',
|
||||
messageId,
|
||||
fileId: attachment.id
|
||||
};
|
||||
|
||||
this.webrtc.sendToPeer(targetPeerId, fileCancelEvent);
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
|
||||
handleFileCancel(payload: FileCancelPayload): void {
|
||||
const { messageId, fileId, fromPeerId } = payload;
|
||||
|
||||
if (!messageId || !fileId || !fromPeerId)
|
||||
return;
|
||||
|
||||
this.runtimeStore.addCancelledTransfer(
|
||||
this.buildTransferKey(messageId, fileId, fromPeerId)
|
||||
);
|
||||
}
|
||||
|
||||
async fulfillRequestWithFile(
|
||||
messageId: string,
|
||||
fileId: string,
|
||||
targetPeerId: string,
|
||||
file: File
|
||||
): Promise<void> {
|
||||
this.runtimeStore.setOriginalFile(`${messageId}:${fileId}`, file);
|
||||
await this.transport.streamFileToPeer(
|
||||
targetPeerId,
|
||||
messageId,
|
||||
fileId,
|
||||
file,
|
||||
() => this.isTransferCancelled(targetPeerId, messageId, fileId)
|
||||
);
|
||||
}
|
||||
|
||||
private resolveCurrentUserId(): Promise<string | null> {
|
||||
return new Promise<string | null>((resolve) => {
|
||||
this.ngrxStore
|
||||
@@ -617,14 +671,16 @@ export class AttachmentTransferService {
|
||||
return true;
|
||||
}
|
||||
|
||||
private getOrCreateChunkBuffer(assemblyKey: string, total: number): ArrayBuffer[] {
|
||||
private getOrCreateChunkBuffer(assemblyKey: string, total: number): (ArrayBuffer | undefined)[] {
|
||||
const existingChunkBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
||||
|
||||
if (existingChunkBuffer) {
|
||||
return existingChunkBuffer;
|
||||
}
|
||||
|
||||
const createdChunkBuffer = new Array(total);
|
||||
// Dense initialization - sparse arrays from `new Array(total)` skip holes in
|
||||
// `every`/`some`, which lets incomplete transfers pass completion checks.
|
||||
const createdChunkBuffer = Array.from<ArrayBuffer | undefined>({ length: total });
|
||||
|
||||
this.runtimeStore.setChunkBuffer(assemblyKey, createdChunkBuffer);
|
||||
this.runtimeStore.setChunkCount(assemblyKey, 0);
|
||||
@@ -671,18 +727,21 @@ export class AttachmentTransferService {
|
||||
const receivedChunkCount = this.runtimeStore.getChunkCount(assemblyKey) ?? 0;
|
||||
const completeBuffer = this.runtimeStore.getChunkBuffer(assemblyKey);
|
||||
|
||||
if (
|
||||
!completeBuffer
|
||||
|| (receivedChunkCount !== total && (attachment.receivedBytes ?? 0) < attachment.size)
|
||||
|| !completeBuffer.every((part) => part instanceof ArrayBuffer)
|
||||
) {
|
||||
if (!completeBuffer || receivedChunkCount < total) {
|
||||
return;
|
||||
}
|
||||
|
||||
const blob = new Blob(completeBuffer, { type: attachment.mime });
|
||||
const bufferedChunks = completeBuffer.filter(
|
||||
(part): part is ArrayBuffer => part instanceof ArrayBuffer
|
||||
);
|
||||
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
// Every chunk index must be present - byte counters are never a substitute
|
||||
// for chunk completeness, otherwise partial files finalize as corrupt blobs.
|
||||
if (bufferedChunks.length !== total || completeBuffer.length !== total) {
|
||||
return;
|
||||
}
|
||||
|
||||
const blob = new Blob(bufferedChunks, { type: attachment.mime });
|
||||
|
||||
if (shouldPersistDownloadedAttachment(attachment)) {
|
||||
await this.persistence.saveFileToDisk(attachment, blob);
|
||||
@@ -691,6 +750,12 @@ export class AttachmentTransferService {
|
||||
attachment.objectUrl = URL.createObjectURL(blob);
|
||||
|
||||
attachment.available = true;
|
||||
|
||||
// Release assembly state only after the attachment is marked available so a
|
||||
// trailing duplicate chunk cannot restart accounting in a fresh buffer.
|
||||
this.runtimeStore.deleteChunkBuffer(assemblyKey);
|
||||
this.runtimeStore.deleteChunkCount(assemblyKey);
|
||||
|
||||
this.runtimeStore.touch();
|
||||
void this.persistence.persistAttachmentMeta(attachment);
|
||||
}
|
||||
@@ -758,7 +823,7 @@ export class AttachmentTransferService {
|
||||
this.updateTransferProgress(attachment, decodedBytes, payload.fromPeerId);
|
||||
this.runtimeStore.touch();
|
||||
|
||||
if (assembly.receivedCount < assembly.total && (attachment.receivedBytes ?? 0) < attachment.size) {
|
||||
if (assembly.receivedCount < assembly.total) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user