Remote connection
This commit is contained in:
418
src/app/core/services/attachment.service.ts
Normal file
418
src/app/core/services/attachment.service.ts
Normal file
@@ -0,0 +1,418 @@
|
||||
import { Injectable, inject, signal } from '@angular/core';
|
||||
import { Subject } from 'rxjs';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { WebRTCService } from './webrtc.service';
|
||||
import { Store } from '@ngrx/store';
|
||||
import { selectCurrentRoomName } from '../../store/rooms/rooms.selectors';
|
||||
|
||||
export interface AttachmentMeta {
|
||||
id: string;
|
||||
messageId: string;
|
||||
filename: string;
|
||||
size: number;
|
||||
mime: string;
|
||||
isImage: boolean;
|
||||
uploaderPeerId?: string;
|
||||
filePath?: string; // Electron-only: absolute path to original file
|
||||
}
|
||||
|
||||
export interface Attachment extends AttachmentMeta {
|
||||
available: boolean;
|
||||
objectUrl?: string;
|
||||
receivedBytes?: number;
|
||||
// Runtime-only stats
|
||||
speedBps?: number;
|
||||
startedAtMs?: number;
|
||||
lastUpdateMs?: number;
|
||||
}
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class AttachmentService {
|
||||
private readonly webrtc = inject(WebRTCService);
|
||||
// Injected NgRx store
|
||||
private readonly ngrxStore = inject(Store);
|
||||
private readonly STORAGE_KEY = 'metoyou_attachments';
|
||||
|
||||
// messageId -> attachments
|
||||
private attachmentsByMessage = new Map<string, Attachment[]>();
|
||||
// expose updates if needed
|
||||
updated = signal<number>(0);
|
||||
|
||||
// Keep original files for uploaders to fulfill requests
|
||||
private originals = new Map<string, File>(); // key: messageId:fileId
|
||||
// Notify UI when original is missing and uploader needs to reselect
|
||||
readonly onMissingOriginal = new Subject<{ messageId: string; fileId: string; fromPeerId: string }>();
|
||||
// Track cancelled transfers (uploader side) keyed by messageId:fileId:peerId
|
||||
private cancelledTransfers = new Set<string>();
|
||||
private makeKey(messageId: string, fileId: string, peerId: string): string { return `${messageId}:${fileId}:${peerId}`; }
|
||||
private isCancelled(targetPeerId: string, messageId: string, fileId: string): boolean {
|
||||
return this.cancelledTransfers.has(this.makeKey(messageId, fileId, targetPeerId));
|
||||
}
|
||||
|
||||
constructor() {
|
||||
this.loadPersisted();
|
||||
}
|
||||
|
||||
getForMessage(messageId: string): Attachment[] {
|
||||
return this.attachmentsByMessage.get(messageId) || [];
|
||||
}
|
||||
|
||||
// Publish attachments for a sent message and stream images <=10MB
|
||||
async publishAttachments(messageId: string, files: File[], uploaderPeerId?: string): Promise<void> {
|
||||
const attachments: Attachment[] = [];
|
||||
for (const file of files) {
|
||||
const id = uuidv4();
|
||||
const meta: Attachment = {
|
||||
id,
|
||||
messageId,
|
||||
filename: file.name,
|
||||
size: file.size,
|
||||
mime: file.type || 'application/octet-stream',
|
||||
isImage: file.type.startsWith('image/'),
|
||||
uploaderPeerId,
|
||||
filePath: (file as any)?.path,
|
||||
available: false,
|
||||
};
|
||||
attachments.push(meta);
|
||||
|
||||
// Save original for request-based transfer
|
||||
this.originals.set(`${messageId}:${id}`, file);
|
||||
|
||||
// Ensure uploader sees their own image immediately
|
||||
if (meta.isImage) {
|
||||
try {
|
||||
const url = URL.createObjectURL(file);
|
||||
meta.objectUrl = url;
|
||||
meta.available = true;
|
||||
// Auto-save only for images ≤10MB
|
||||
if (meta.size <= 10 * 1024 * 1024) {
|
||||
void this.saveImageToDisk(meta, file);
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
|
||||
// Announce to peers
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'file-announce',
|
||||
messageId,
|
||||
file: {
|
||||
id,
|
||||
filename: meta.filename,
|
||||
size: meta.size,
|
||||
mime: meta.mime,
|
||||
isImage: meta.isImage,
|
||||
uploaderPeerId,
|
||||
},
|
||||
} as any);
|
||||
|
||||
// Stream image content if small enough (<= 10MB)
|
||||
if (meta.isImage && meta.size <= 10 * 1024 * 1024) {
|
||||
await this.streamFileToPeers(messageId, id, file);
|
||||
}
|
||||
}
|
||||
|
||||
this.attachmentsByMessage.set(messageId, [ ...(this.attachmentsByMessage.get(messageId) || []), ...attachments ]);
|
||||
this.updated.set(this.updated() + 1);
|
||||
this.persist();
|
||||
}
|
||||
|
||||
private async streamFileToPeers(messageId: string, fileId: string, file: File): Promise<void> {
|
||||
const chunkSize = 64 * 1024; // 64KB
|
||||
const totalChunks = Math.ceil(file.size / chunkSize);
|
||||
let offset = 0;
|
||||
let index = 0;
|
||||
while (offset < file.size) {
|
||||
const slice = file.slice(offset, offset + chunkSize);
|
||||
const arrayBuffer = await slice.arrayBuffer();
|
||||
// Convert to base64 for JSON transport
|
||||
const base64 = this.arrayBufferToBase64(arrayBuffer);
|
||||
this.webrtc.broadcastMessage({
|
||||
type: 'file-chunk',
|
||||
messageId,
|
||||
fileId,
|
||||
index,
|
||||
total: totalChunks,
|
||||
data: base64,
|
||||
} as any);
|
||||
offset += chunkSize;
|
||||
index++;
|
||||
}
|
||||
}
|
||||
|
||||
// Incoming events from peers
|
||||
handleFileAnnounce(payload: any): void {
|
||||
const { messageId, file } = payload;
|
||||
if (!messageId || !file) return;
|
||||
const list = this.attachmentsByMessage.get(messageId) || [];
|
||||
const exists = list.find((a: Attachment) => a.id === file.id);
|
||||
if (!exists) {
|
||||
list.push({
|
||||
id: file.id,
|
||||
messageId,
|
||||
filename: file.filename,
|
||||
size: file.size,
|
||||
mime: file.mime,
|
||||
isImage: !!file.isImage,
|
||||
uploaderPeerId: file.uploaderPeerId,
|
||||
available: false,
|
||||
receivedBytes: 0,
|
||||
});
|
||||
this.attachmentsByMessage.set(messageId, list);
|
||||
this.updated.set(this.updated() + 1);
|
||||
this.persist();
|
||||
}
|
||||
}
|
||||
|
||||
handleFileChunk(payload: any): void {
|
||||
const { messageId, fileId, index, total, data } = payload;
|
||||
if (!messageId || !fileId || typeof index !== 'number' || typeof total !== 'number' || !data) return;
|
||||
const list = this.attachmentsByMessage.get(messageId) || [];
|
||||
const att = list.find((a: Attachment) => a.id === fileId);
|
||||
if (!att) return;
|
||||
|
||||
// Decode base64 and append to Blob parts
|
||||
const bytes = this.base64ToUint8Array(data);
|
||||
const partsKey = `${messageId}:${fileId}:parts`;
|
||||
const countKey = `${messageId}:${fileId}:count`;
|
||||
let parts = (this as any)[partsKey] as ArrayBuffer[] | undefined;
|
||||
if (!parts) {
|
||||
parts = new Array(total);
|
||||
(this as any)[partsKey] = parts;
|
||||
(this as any)[countKey] = 0;
|
||||
}
|
||||
if (!parts[index]) {
|
||||
parts[index] = bytes.buffer as ArrayBuffer;
|
||||
(this as any)[countKey] = ((this as any)[countKey] as number) + 1;
|
||||
}
|
||||
const now = Date.now();
|
||||
const prevReceived = att.receivedBytes || 0;
|
||||
att.receivedBytes = prevReceived + bytes.byteLength;
|
||||
if (!att.startedAtMs) att.startedAtMs = now;
|
||||
if (!att.lastUpdateMs) att.lastUpdateMs = now;
|
||||
const deltaMs = Math.max(1, now - att.lastUpdateMs);
|
||||
const instBps = (bytes.byteLength / deltaMs) * 1000;
|
||||
const prevSpeed = att.speedBps || instBps;
|
||||
// EWMA smoothing
|
||||
att.speedBps = 0.7 * prevSpeed + 0.3 * instBps;
|
||||
att.lastUpdateMs = now;
|
||||
// Trigger UI update for real-time progress
|
||||
this.updated.set(this.updated() + 1);
|
||||
|
||||
const receivedCount = (this as any)[countKey] as number;
|
||||
if (receivedCount === total || (att.receivedBytes || 0) >= att.size) {
|
||||
const finalParts = (this as any)[partsKey] as ArrayBuffer[];
|
||||
if (finalParts.every((p) => p instanceof ArrayBuffer)) {
|
||||
const blob = new Blob(finalParts, { type: att.mime });
|
||||
att.available = true;
|
||||
att.objectUrl = URL.createObjectURL(blob);
|
||||
// Auto-save small images to disk under app data: server/<room>/image
|
||||
if (att.isImage && att.size <= 10 * 1024 * 1024) {
|
||||
void this.saveImageToDisk(att, blob);
|
||||
}
|
||||
// Final update
|
||||
delete (this as any)[partsKey];
|
||||
delete (this as any)[countKey];
|
||||
this.updated.set(this.updated() + 1);
|
||||
this.persist();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async saveImageToDisk(att: Attachment, blob: Blob): Promise<void> {
|
||||
try {
|
||||
const w: any = window as any;
|
||||
const appData: string | undefined = await w?.electronAPI?.getAppDataPath?.();
|
||||
if (!appData) return;
|
||||
const roomName = await new Promise<string>((resolve) => {
|
||||
let name = '';
|
||||
const sub = this.ngrxStore.select(selectCurrentRoomName).subscribe((n) => { name = n || ''; resolve(name); sub.unsubscribe(); });
|
||||
});
|
||||
const safeRoom = roomName.replace(/[^\w.-]+/g, '_') || 'room';
|
||||
const dir = `${appData}/server/${safeRoom}/image`;
|
||||
await w.electronAPI.ensureDir(dir);
|
||||
const arrayBuffer = await blob.arrayBuffer();
|
||||
const base64 = this.arrayBufferToBase64(arrayBuffer);
|
||||
const path = `${dir}/${att.filename}`;
|
||||
await w.electronAPI.writeFile(path, base64);
|
||||
} catch {}
|
||||
}
|
||||
|
||||
requestFile(messageId: string, att: Attachment): void {
|
||||
const target = att.uploaderPeerId;
|
||||
if (!target) return;
|
||||
const connected = this.webrtc.getConnectedPeers();
|
||||
if (!connected.includes(target)) {
|
||||
console.warn('Uploader peer not connected:', target);
|
||||
return;
|
||||
}
|
||||
this.webrtc.sendToPeer(target, {
|
||||
type: 'file-request',
|
||||
messageId,
|
||||
fileId: att.id,
|
||||
} as any);
|
||||
}
|
||||
|
||||
// Cancel an in-progress request from the requester side
|
||||
cancelRequest(messageId: string, att: Attachment): void {
|
||||
const target = att.uploaderPeerId;
|
||||
if (!target) return;
|
||||
try {
|
||||
// Reset local assembly state
|
||||
const partsKey = `${messageId}:${att.id}:parts`;
|
||||
const countKey = `${messageId}:${att.id}:count`;
|
||||
delete (this as any)[partsKey];
|
||||
delete (this as any)[countKey];
|
||||
att.receivedBytes = 0;
|
||||
att.speedBps = 0;
|
||||
att.startedAtMs = undefined;
|
||||
att.lastUpdateMs = undefined;
|
||||
if (att.objectUrl) {
|
||||
try { URL.revokeObjectURL(att.objectUrl); } catch {}
|
||||
att.objectUrl = undefined;
|
||||
}
|
||||
att.available = false;
|
||||
this.updated.set(this.updated() + 1);
|
||||
// Notify uploader to stop streaming
|
||||
this.webrtc.sendToPeer(target, {
|
||||
type: 'file-cancel',
|
||||
messageId,
|
||||
fileId: att.id,
|
||||
} as any);
|
||||
} catch {}
|
||||
}
|
||||
|
||||
// When we receive a request and we are the uploader, stream the original file if available
|
||||
async handleFileRequest(payload: any): Promise<void> {
|
||||
const { messageId, fileId, fromPeerId } = payload;
|
||||
if (!messageId || !fileId || !fromPeerId) return;
|
||||
const original = this.originals.get(`${messageId}:${fileId}`);
|
||||
if (original) {
|
||||
await this.streamFileToPeer(fromPeerId, messageId, fileId, original);
|
||||
return;
|
||||
}
|
||||
// Try Electron file path fallback
|
||||
const list = this.attachmentsByMessage.get(messageId) || [];
|
||||
const att = list.find((a: Attachment) => a.id === fileId);
|
||||
const w: any = window as any;
|
||||
if (att?.filePath && w?.electronAPI?.fileExists && w?.electronAPI?.readFile) {
|
||||
try {
|
||||
const exists = await w.electronAPI.fileExists(att.filePath);
|
||||
if (exists) {
|
||||
const base64 = await w.electronAPI.readFile(att.filePath);
|
||||
const bytes = this.base64ToUint8Array(base64);
|
||||
const chunkSize = 64 * 1024;
|
||||
const totalChunks = Math.ceil(bytes.byteLength / chunkSize);
|
||||
for (let i = 0; i < totalChunks; i++) {
|
||||
if (this.isCancelled(fromPeerId, messageId, fileId)) break;
|
||||
const slice = bytes.subarray(i * chunkSize, Math.min(bytes.byteLength, (i + 1) * chunkSize));
|
||||
const slicedBuffer = (slice.buffer as ArrayBuffer).slice(slice.byteOffset, slice.byteOffset + slice.byteLength);
|
||||
const b64 = this.arrayBufferToBase64(slicedBuffer);
|
||||
this.webrtc.sendToPeer(fromPeerId, {
|
||||
type: 'file-chunk',
|
||||
messageId,
|
||||
fileId,
|
||||
index: i,
|
||||
total: totalChunks,
|
||||
data: b64,
|
||||
} as any);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
// Fallback: prompt reselect
|
||||
this.onMissingOriginal.next({ messageId, fileId, fromPeerId });
|
||||
}
|
||||
|
||||
private async streamFileToPeer(targetPeerId: string, messageId: string, fileId: string, file: File): Promise<void> {
|
||||
const chunkSize = 64 * 1024; // 64KB
|
||||
const totalChunks = Math.ceil(file.size / chunkSize);
|
||||
let offset = 0;
|
||||
let index = 0;
|
||||
while (offset < file.size) {
|
||||
if (this.isCancelled(targetPeerId, messageId, fileId)) break;
|
||||
const slice = file.slice(offset, offset + chunkSize);
|
||||
const arrayBuffer = await slice.arrayBuffer();
|
||||
const base64 = this.arrayBufferToBase64(arrayBuffer);
|
||||
await this.webrtc.sendToPeerBuffered(targetPeerId, {
|
||||
type: 'file-chunk',
|
||||
messageId,
|
||||
fileId,
|
||||
index,
|
||||
total: totalChunks,
|
||||
data: base64,
|
||||
} as any);
|
||||
offset += chunkSize;
|
||||
index++;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle cancellation message (uploader side): stop any in-progress stream to requester
|
||||
handleFileCancel(payload: any): void {
|
||||
const { messageId, fileId, fromPeerId } = payload;
|
||||
if (!messageId || !fileId || !fromPeerId) return;
|
||||
this.cancelledTransfers.add(this.makeKey(messageId, fileId, fromPeerId));
|
||||
// Optionally clear original if desired (keep for re-request)
|
||||
}
|
||||
|
||||
// Fulfill a pending request with a user-provided file (uploader side)
|
||||
async fulfillRequestWithFile(messageId: string, fileId: string, targetPeerId: string, file: File): Promise<void> {
|
||||
this.originals.set(`${messageId}:${fileId}`, file);
|
||||
await this.streamFileToPeer(targetPeerId, messageId, fileId, file);
|
||||
}
|
||||
|
||||
private persist(): void {
|
||||
try {
|
||||
const all: Attachment[] = Array.from(this.attachmentsByMessage.values()).flat();
|
||||
const minimal = all.map((a: Attachment) => ({
|
||||
id: a.id,
|
||||
messageId: a.messageId,
|
||||
filename: a.filename,
|
||||
size: a.size,
|
||||
mime: a.mime,
|
||||
isImage: a.isImage,
|
||||
uploaderPeerId: a.uploaderPeerId,
|
||||
filePath: a.filePath,
|
||||
available: false,
|
||||
}));
|
||||
localStorage.setItem(this.STORAGE_KEY, JSON.stringify(minimal));
|
||||
} catch {}
|
||||
}
|
||||
|
||||
private loadPersisted(): void {
|
||||
try {
|
||||
const raw = localStorage.getItem(this.STORAGE_KEY);
|
||||
if (!raw) return;
|
||||
const list: AttachmentMeta[] = JSON.parse(raw);
|
||||
const grouped = new Map<string, Attachment[]>();
|
||||
for (const a of list) {
|
||||
const att: Attachment = { ...a, available: false };
|
||||
const arr = grouped.get(a.messageId) || [];
|
||||
arr.push(att);
|
||||
grouped.set(a.messageId, arr);
|
||||
}
|
||||
this.attachmentsByMessage = grouped;
|
||||
this.updated.set(this.updated() + 1);
|
||||
} catch {}
|
||||
}
|
||||
|
||||
private arrayBufferToBase64(buffer: ArrayBuffer): string {
|
||||
let binary = '';
|
||||
const bytes = new Uint8Array(buffer);
|
||||
const len = bytes.byteLength;
|
||||
for (let i = 0; i < len; i++) {
|
||||
binary += String.fromCharCode(bytes[i]);
|
||||
}
|
||||
return btoa(binary);
|
||||
}
|
||||
|
||||
private base64ToUint8Array(base64: string): Uint8Array {
|
||||
const binary = atob(base64);
|
||||
const len = binary.length;
|
||||
const bytes = new Uint8Array(len);
|
||||
for (let i = 0; i < len; i++) {
|
||||
bytes[i] = binary.charCodeAt(i);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user