Files
Toju/toju-app/src/app/store/users/user-avatar.effects.ts
Myx d0aff6319d
All checks were successful
Queue Release Build / prepare (push) Successful in 25s
Deploy Web Apps / deploy (push) Successful in 7m8s
Queue Release Build / build-windows (push) Successful in 28m10s
Queue Release Build / build-linux (push) Successful in 44m38s
Queue Release Build / build-android (push) Successful in 18m36s
Queue Release Build / finalize (push) Successful in 1m40s
fix: should now sync with other devices
2026-06-09 22:00:39 +02:00

527 lines
16 KiB
TypeScript

/* eslint-disable @typescript-eslint/member-ordering */
import { Injectable, inject } from '@angular/core';
import {
Actions,
createEffect,
ofType
} from '@ngrx/effects';
import { Store, type Action } from '@ngrx/store';
import {
EMPTY,
from,
of
} from 'rxjs';
import {
mergeMap,
tap,
withLatestFrom
} from 'rxjs/operators';
import { ProfileAvatarFacade } from '../../domains/profile-avatar';
import {
P2P_BASE64_CHUNK_SIZE_BYTES,
decodeBase64,
iterateBlobChunks
} from '../../shared-kernel';
import type { ChatEvent, User } from '../../shared-kernel';
import { RealtimeSessionFacade } from '../../core/realtime';
import { pushProfileViaAccountSync as relayProfileViaAccountSync } from '../../infrastructure/realtime/account-sync/account-sync-profile.helper';
import { DatabaseService } from '../../infrastructure/persistence';
import { UsersActions } from './users.actions';
import { selectAllUsers, selectCurrentUser } from './users.selectors';
import { selectCurrentRoom, selectSavedRooms } from '../rooms/rooms.selectors';
import { RoomsActions } from '../rooms/rooms.actions';
import { findRoomMember } from '../rooms/room-members.helpers';
interface PendingAvatarTransfer {
displayName: string;
description?: string;
profileUpdatedAt?: number;
mime?: string;
oderId: string;
total: number;
updatedAt: number;
username: string;
chunks: (string | undefined)[];
hash?: string;
}
type AvatarVersionState = Pick<User, 'avatarUrl' | 'avatarHash' | 'avatarUpdatedAt'> | undefined;
type RoomProfileState = Pick<User,
| 'id'
| 'oderId'
| 'displayName'
| 'description'
| 'profileUpdatedAt'
| 'avatarUrl'
| 'avatarHash'
| 'avatarMime'
| 'avatarUpdatedAt'
>;
function shouldAcceptAvatarPayload(
existingUser: AvatarVersionState,
incomingUpdatedAt: number,
incomingHash?: string
): boolean {
const localUpdatedAt = existingUser?.avatarUpdatedAt ?? 0;
if (incomingUpdatedAt > localUpdatedAt) {
return true;
}
if (incomingUpdatedAt < localUpdatedAt || incomingUpdatedAt === 0) {
return false;
}
if (!existingUser?.avatarUrl) {
return true;
}
return !!incomingHash && incomingHash !== existingUser.avatarHash;
}
function hasSyncableUserData(user: Pick<User, 'avatarUpdatedAt' | 'profileUpdatedAt'> | null | undefined): boolean {
return (user?.avatarUpdatedAt ?? 0) > 0;
}
export function shouldRequestAvatarData(
existingUser: AvatarVersionState,
incomingAvatar: Pick<ChatEvent, 'avatarHash' | 'avatarUpdatedAt' | 'profileUpdatedAt'>
): boolean {
return shouldAcceptAvatarPayload(existingUser, incomingAvatar.avatarUpdatedAt ?? 0, incomingAvatar.avatarHash);
}
export function shouldApplyAvatarTransfer(
existingUser: AvatarVersionState,
transfer: Pick<PendingAvatarTransfer, 'hash' | 'updatedAt'>
): boolean {
return shouldAcceptAvatarPayload(existingUser, transfer.updatedAt, transfer.hash);
}
@Injectable()
export class UserAvatarEffects {
private readonly actions$ = inject(Actions);
private readonly store = inject(Store);
private readonly webrtc = inject(RealtimeSessionFacade);
private readonly db = inject(DatabaseService);
private readonly avatars = inject(ProfileAvatarFacade);
private readonly pendingTransfers = new Map<string, PendingAvatarTransfer>();
persistCurrentAvatar$ = createEffect(
() =>
this.actions$.pipe(
ofType(UsersActions.updateCurrentUserAvatar),
withLatestFrom(this.store.select(selectCurrentUser)),
tap(([, currentUser]) => {
if (currentUser) {
this.db.saveUser(currentUser);
}
})
),
{ dispatch: false }
);
persistRemoteAvatar$ = createEffect(
() =>
this.actions$.pipe(
ofType(UsersActions.upsertRemoteUserAvatar),
withLatestFrom(this.store.select(selectAllUsers)),
tap(([{ user }, allUsers]) => {
const mergedUser = allUsers.find((entry) => entry.id === user.id || entry.oderId === user.oderId);
const userToPersist = mergedUser ?? {
id: user.id,
oderId: user.oderId,
username: user.username,
displayName: user.displayName,
description: user.description,
profileUpdatedAt: user.profileUpdatedAt,
avatarUrl: user.avatarUrl,
avatarHash: user.avatarHash,
avatarMime: user.avatarMime,
avatarUpdatedAt: user.avatarUpdatedAt,
status: 'offline' as const,
role: 'member' as const,
joinedAt: Date.now()
};
this.db.saveUser(userToPersist);
if (!user.avatarUrl) {
return;
}
void this.avatars.persistAvatarDataUrl({
id: userToPersist.id,
username: userToPersist.username,
displayName: userToPersist.displayName
}, user.avatarUrl);
})
),
{ dispatch: false }
);
syncRoomMemberProfiles$ = createEffect(() =>
this.actions$.pipe(
ofType(UsersActions.updateCurrentUserAvatar, UsersActions.updateCurrentUserProfile, UsersActions.upsertRemoteUserAvatar),
withLatestFrom(
this.store.select(selectCurrentUser),
this.store.select(selectCurrentRoom),
this.store.select(selectSavedRooms)
),
mergeMap(([
action,
currentUser,
currentRoom,
savedRooms
]) => {
const avatarOwner = action.type === UsersActions.upsertRemoteUserAvatar.type
? action.user
: action.type === UsersActions.updateCurrentUserProfile.type
? (currentUser ? {
...currentUser,
...action.profile
} : null)
: (currentUser ? {
...currentUser,
...action.avatar
} : null);
if (!avatarOwner) {
return EMPTY;
}
const actions = this.buildRoomProfileActions(avatarOwner, currentRoom, savedRooms);
return actions.length > 0 ? actions : EMPTY;
})
)
);
broadcastCurrentProfileSummary$ = createEffect(
() =>
this.actions$.pipe(
ofType(UsersActions.updateCurrentUserAvatar, UsersActions.updateCurrentUserProfile),
withLatestFrom(this.store.select(selectCurrentUser)),
tap(([, currentUser]) => {
if (!currentUser || !hasSyncableUserData(currentUser)) {
return;
}
this.webrtc.broadcastMessage(this.buildAvatarSummary(currentUser));
void relayProfileViaAccountSync(this.webrtc, currentUser);
})
),
{ dispatch: false }
);
peerConnectedAvatarSummary$ = createEffect(
() =>
this.webrtc.onPeerConnected.pipe(
withLatestFrom(this.store.select(selectCurrentUser)),
tap(([peerId, currentUser]) => {
if (!currentUser || !hasSyncableUserData(currentUser)) {
return;
}
this.webrtc.sendToPeer(peerId, this.buildAvatarSummary(currentUser));
})
),
{ dispatch: false }
);
incomingAvatarEvents$ = createEffect(() =>
this.webrtc.onMessageReceived.pipe(
withLatestFrom(this.store.select(selectAllUsers), this.store.select(selectCurrentUser)),
mergeMap(([
event,
allUsers,
currentUser
]) => {
switch (event.type) {
case 'user-avatar-summary':
return this.handleAvatarSummary(event, allUsers, currentUser ?? null);
case 'user-avatar-request':
return this.handleAvatarRequest(event, currentUser ?? null);
case 'user-avatar-full':
return this.handleAvatarFull(event, allUsers);
case 'user-avatar-chunk':
return this.handleAvatarChunk(event, allUsers);
default:
return EMPTY;
}
})
)
);
private buildAvatarSummary(user: Pick<User, 'oderId' | 'id' | 'avatarHash' | 'avatarUpdatedAt'>): ChatEvent {
return {
type: 'user-avatar-summary',
oderId: user.oderId || user.id,
avatarHash: user.avatarHash,
avatarUpdatedAt: user.avatarUpdatedAt || 0
};
}
private handleAvatarSummary(event: ChatEvent, allUsers: User[], currentUser: User | null) {
if (!event.fromPeerId || !event.oderId || !event.avatarUpdatedAt) {
return EMPTY;
}
const currentUserKey = currentUser?.oderId || currentUser?.id;
if (currentUserKey && event.oderId === currentUserKey) {
return EMPTY;
}
const existingUser = allUsers.find((user) => user.id === event.oderId || user.oderId === event.oderId);
if (!shouldRequestAvatarData(existingUser, event)) {
return EMPTY;
}
this.webrtc.sendToPeer(event.fromPeerId, {
type: 'user-avatar-request',
oderId: event.oderId
});
return EMPTY;
}
private handleAvatarRequest(event: ChatEvent, currentUser: User | null) {
const currentUserKey = currentUser?.oderId || currentUser?.id;
if (!event.fromPeerId || !currentUser || !currentUserKey || event.oderId !== currentUserKey || !hasSyncableUserData(currentUser)) {
return EMPTY;
}
return from(this.sendAvatarToPeer(event.fromPeerId, currentUser)).pipe(mergeMap(() => EMPTY));
}
private handleAvatarFull(event: ChatEvent, allUsers: User[]) {
if (!event.oderId || typeof event.total !== 'number' || event.total < 0) {
return EMPTY;
}
if (event.total === 0) {
return from(this.buildRemoteAvatarAction({
chunks: [],
displayName: event.displayName || 'User',
description: event.description,
profileUpdatedAt: event.profileUpdatedAt,
mime: event.avatarMime,
oderId: event.oderId,
total: 0,
updatedAt: event.avatarUpdatedAt || 0,
username: event.username || (event.displayName || 'User').toLowerCase().replace(/\s+/g, '_'),
hash: event.avatarHash
}, allUsers)).pipe(
mergeMap((action) => action ? of(action) : EMPTY)
);
}
if (!event.avatarMime) {
return EMPTY;
}
this.pendingTransfers.set(event.oderId, {
chunks: new Array<string | undefined>(event.total),
displayName: event.displayName || 'User',
description: event.description,
profileUpdatedAt: event.profileUpdatedAt,
mime: event.avatarMime,
oderId: event.oderId,
total: event.total,
updatedAt: event.avatarUpdatedAt || Date.now(),
username: event.username || (event.displayName || 'User').toLowerCase().replace(/\s+/g, '_'),
hash: event.avatarHash
});
return EMPTY;
}
private handleAvatarChunk(event: ChatEvent, allUsers: User[]) {
if (!event.oderId || typeof event.index !== 'number' || typeof event.total !== 'number' || typeof event.data !== 'string') {
return EMPTY;
}
const transfer = this.pendingTransfers.get(event.oderId);
if (!transfer || transfer.total !== event.total || event.index < 0 || event.index >= transfer.total) {
return EMPTY;
}
transfer.chunks[event.index] = event.data;
if (transfer.chunks.some((chunk) => !chunk)) {
return EMPTY;
}
this.pendingTransfers.delete(event.oderId);
return from(this.buildRemoteAvatarAction(transfer, allUsers)).pipe(
mergeMap((action) => action ? of(action) : EMPTY)
);
}
private async buildRemoteAvatarAction(
transfer: PendingAvatarTransfer,
allUsers: User[]
): Promise<Action | null> {
const existingUser = allUsers.find(
(user) => user.id === transfer.oderId || user.oderId === transfer.oderId
);
if (!shouldApplyAvatarTransfer(existingUser, transfer)) {
return null;
}
const base64Chunks = transfer.chunks.filter(
(chunk): chunk is string => typeof chunk === 'string'
);
if (transfer.total > 0 && base64Chunks.length !== transfer.total) {
return null;
}
const dataUrl = transfer.total > 0
? await this.readBlobAsDataUrl(new Blob(
base64Chunks.map((chunk) => this.decodeBase64ToArrayBuffer(chunk)),
{ type: transfer.mime || 'image/webp' }
))
: undefined;
return UsersActions.upsertRemoteUserAvatar({
user: {
id: existingUser?.id || transfer.oderId,
oderId: existingUser?.oderId || transfer.oderId,
username: existingUser?.username || transfer.username,
displayName: transfer.displayName || existingUser?.displayName || 'User',
description: transfer.description ?? existingUser?.description,
profileUpdatedAt: transfer.profileUpdatedAt ?? existingUser?.profileUpdatedAt,
avatarUrl: dataUrl,
avatarHash: transfer.hash,
avatarMime: transfer.mime,
avatarUpdatedAt: transfer.updatedAt || undefined
}
});
}
private buildRoomProfileActions(
avatarOwner: RoomProfileState,
currentRoom: ReturnType<typeof selectCurrentRoom['projector']> | null,
savedRooms: ReturnType<typeof selectSavedRooms['projector']>
): Action[] {
const rooms = [currentRoom, ...savedRooms.filter((room) => room.id !== currentRoom?.id)].filter(
(room): room is NonNullable<typeof currentRoom> => !!room
);
const roomActions: Action[] = [];
const avatarOwnerId = avatarOwner.oderId || avatarOwner.id;
for (const room of rooms) {
const member = findRoomMember(room.members ?? [], avatarOwnerId);
if (!member) {
continue;
}
const nextMembers = (room.members ?? []).map((roomMember) => {
if (roomMember.id !== member.id && roomMember.oderId !== member.oderId) {
return roomMember;
}
return {
...roomMember,
displayName: avatarOwner.displayName,
description: avatarOwner.description,
profileUpdatedAt: avatarOwner.profileUpdatedAt,
avatarUrl: avatarOwner.avatarUrl,
avatarHash: avatarOwner.avatarHash,
avatarMime: avatarOwner.avatarMime,
avatarUpdatedAt: avatarOwner.avatarUpdatedAt
};
});
roomActions.push(RoomsActions.updateRoom({
roomId: room.id,
changes: { members: nextMembers }
}));
}
return roomActions;
}
private async sendAvatarToPeer(targetPeerId: string, user: User): Promise<void> {
const userKey = user.oderId || user.id;
const blob = user.avatarUrl
? await this.dataUrlToBlob(user.avatarUrl, user.avatarMime || 'image/webp')
: null;
const total = blob ? Math.ceil(blob.size / P2P_BASE64_CHUNK_SIZE_BYTES) : 0;
this.webrtc.sendToPeer(targetPeerId, {
type: 'user-avatar-full',
oderId: userKey,
username: user.username,
displayName: user.displayName,
avatarHash: user.avatarHash,
avatarMime: blob ? (user.avatarMime || blob.type || 'image/webp') : undefined,
avatarUpdatedAt: user.avatarUpdatedAt || 0,
total
});
if (!blob) {
return;
}
for await (const chunk of iterateBlobChunks(blob, P2P_BASE64_CHUNK_SIZE_BYTES)) {
await this.webrtc.sendToPeerBuffered(targetPeerId, {
type: 'user-avatar-chunk',
oderId: userKey,
avatarHash: user.avatarHash,
avatarMime: user.avatarMime || blob.type || 'image/webp',
avatarUpdatedAt: user.avatarUpdatedAt || Date.now(),
index: chunk.index,
total: chunk.total,
data: chunk.base64
});
}
}
private dataUrlToBlob(dataUrl: string, mimeType: string): Promise<Blob> {
const base64 = dataUrl.split(',', 2)[1] ?? '';
return Promise.resolve(new Blob([this.decodeBase64ToArrayBuffer(base64)], { type: mimeType }));
}
private decodeBase64ToArrayBuffer(base64: string): ArrayBuffer {
const decodedBytes = decodeBase64(base64);
return decodedBytes.buffer.slice(
decodedBytes.byteOffset,
decodedBytes.byteOffset + decodedBytes.byteLength
) as ArrayBuffer;
}
private readBlobAsDataUrl(blob: Blob): Promise<string> {
return new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = () => {
if (typeof reader.result === 'string') {
resolve(reader.result);
return;
}
reject(new Error('Failed to encode avatar image'));
};
reader.onerror = () => reject(reader.error ?? new Error('Failed to read avatar image'));
reader.readAsDataURL(blob);
});
}
}