feat: Add pm
This commit is contained in:
@@ -0,0 +1,69 @@
|
||||
import {
|
||||
advanceDirectMessageStatus,
|
||||
createDirectConversation,
|
||||
getDirectConversationId,
|
||||
updateMessageStatusInConversation,
|
||||
upsertDirectMessage
|
||||
} from '../../domain/logic/direct-message.logic';
|
||||
import type {
|
||||
DirectMessage,
|
||||
DirectMessageParticipant
|
||||
} from '../../domain/models/direct-message.model';
|
||||
|
||||
const alice: DirectMessageParticipant = {
|
||||
userId: 'alice',
|
||||
username: 'alice',
|
||||
displayName: 'Alice'
|
||||
};
|
||||
|
||||
const bob: DirectMessageParticipant = {
|
||||
userId: 'bob',
|
||||
username: 'bob',
|
||||
displayName: 'Bob'
|
||||
};
|
||||
|
||||
describe('DirectMessageService domain flow', () => {
|
||||
it('should create conversation', () => {
|
||||
const conversation = createDirectConversation(alice, bob, 10);
|
||||
|
||||
expect(conversation.id).toBe(getDirectConversationId('alice', 'bob'));
|
||||
expect(conversation.participants).toEqual(['alice', 'bob']);
|
||||
expect(conversation.unreadCount).toBe(0);
|
||||
});
|
||||
|
||||
it('should send message', () => {
|
||||
const conversation = createDirectConversation(alice, bob, 10);
|
||||
const queuedMessage = createMessage('message-1', 'QUEUED');
|
||||
const withQueuedMessage = upsertDirectMessage(conversation, queuedMessage, false);
|
||||
const withSentMessage = updateMessageStatusInConversation(withQueuedMessage, queuedMessage.id, 'SENT');
|
||||
|
||||
expect(withSentMessage.messages[0].status).toBe('SENT');
|
||||
});
|
||||
|
||||
it('should queue message when offline', () => {
|
||||
const conversation = createDirectConversation(alice, bob, 10);
|
||||
const queuedMessage = createMessage('message-1', 'QUEUED');
|
||||
const updatedConversation = upsertDirectMessage(conversation, queuedMessage, false);
|
||||
|
||||
expect(updatedConversation.messages[0].status).toBe('QUEUED');
|
||||
});
|
||||
|
||||
it('should update status correctly', () => {
|
||||
expect(advanceDirectMessageStatus('QUEUED', 'SENT')).toBe('SENT');
|
||||
expect(advanceDirectMessageStatus('SENT', 'DELIVERED')).toBe('DELIVERED');
|
||||
expect(advanceDirectMessageStatus('DELIVERED', 'SENT')).toBe('DELIVERED');
|
||||
expect(advanceDirectMessageStatus('DELIVERED', 'ACKNOWLEDGED')).toBe('ACKNOWLEDGED');
|
||||
});
|
||||
});
|
||||
|
||||
function createMessage(id: string, status: DirectMessage['status']): DirectMessage {
|
||||
return {
|
||||
id,
|
||||
conversationId: getDirectConversationId('alice', 'bob'),
|
||||
senderId: 'alice',
|
||||
recipientId: 'bob',
|
||||
content: 'Hello',
|
||||
timestamp: 20,
|
||||
status
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,566 @@
|
||||
/* eslint-disable @typescript-eslint/member-ordering */
|
||||
import {
|
||||
Injectable,
|
||||
computed,
|
||||
effect,
|
||||
inject,
|
||||
signal
|
||||
} from '@angular/core';
|
||||
import { Router } from '@angular/router';
|
||||
import { Store } from '@ngrx/store';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { DirectMessageRepository } from '../../infrastructure/direct-message.repository';
|
||||
import { OfflineMessageQueueService } from './offline-message-queue.service';
|
||||
import { PeerDeliveryService } from './peer-delivery.service';
|
||||
import {
|
||||
advanceDirectMessageStatus,
|
||||
createDirectConversation,
|
||||
getDirectConversationId,
|
||||
updateMessageStatusInConversation,
|
||||
upsertDirectMessage
|
||||
} from '../../domain/logic/direct-message.logic';
|
||||
import {
|
||||
DirectMessage,
|
||||
DirectMessageConversation,
|
||||
DirectMessageEventPayload,
|
||||
DirectMessageMutationEventPayload,
|
||||
DirectMessageStatus,
|
||||
DirectMessageStatusEventPayload,
|
||||
toDirectMessageParticipant
|
||||
} from '../../domain/models/direct-message.model';
|
||||
import type {
|
||||
ChatEvent,
|
||||
Reaction,
|
||||
User
|
||||
} from '../../../../shared-kernel';
|
||||
import { selectCurrentUser } from '../../../../store/users/users.selectors';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class DirectMessageService {
|
||||
private readonly repository = inject(DirectMessageRepository);
|
||||
private readonly offlineQueue = inject(OfflineMessageQueueService);
|
||||
private readonly delivery = inject(PeerDeliveryService);
|
||||
private readonly store = inject(Store);
|
||||
private readonly router = inject(Router);
|
||||
private readonly currentUser = this.store.selectSignal(selectCurrentUser);
|
||||
private readonly conversationsSignal = signal<DirectMessageConversation[]>([]);
|
||||
private readonly selectedConversationIdSignal = signal<string | null>(null);
|
||||
private loadedOwnerId: string | null = null;
|
||||
|
||||
readonly conversations = computed(() => [...this.conversationsSignal()].sort(
|
||||
(firstConversation, secondConversation) => secondConversation.lastMessageAt - firstConversation.lastMessageAt
|
||||
));
|
||||
readonly selectedConversationId = this.selectedConversationIdSignal.asReadonly();
|
||||
readonly selectedConversation = computed(() => {
|
||||
const selectedId = this.selectedConversationIdSignal();
|
||||
|
||||
return selectedId
|
||||
? this.conversationsSignal().find((conversation) => conversation.id === selectedId) ?? null
|
||||
: null;
|
||||
});
|
||||
readonly totalUnreadCount = computed(() => this.conversationsSignal().reduce(
|
||||
(total, conversation) => total + conversation.unreadCount,
|
||||
0
|
||||
));
|
||||
|
||||
constructor() {
|
||||
effect(() => {
|
||||
const ownerId = this.getCurrentUserId();
|
||||
|
||||
void this.loadForOwner(ownerId);
|
||||
});
|
||||
|
||||
this.delivery.directMessageEvents$.subscribe((event) => {
|
||||
void this.handlePeerEvent(event);
|
||||
});
|
||||
|
||||
this.delivery.peerConnected$.subscribe(() => {
|
||||
void this.retryPending();
|
||||
});
|
||||
|
||||
this.delivery.networkRestored$.subscribe(() => {
|
||||
void this.retryPending();
|
||||
});
|
||||
}
|
||||
|
||||
async createConversation(user: User): Promise<DirectMessageConversation> {
|
||||
const currentUser = this.requireCurrentUser();
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
|
||||
await this.loadForOwner(ownerId);
|
||||
|
||||
const currentParticipant = toDirectMessageParticipant(currentUser);
|
||||
const peerParticipant = toDirectMessageParticipant(user);
|
||||
const conversationId = getDirectConversationId(currentParticipant.userId, peerParticipant.userId);
|
||||
const existingConversation = this.conversationsSignal().find((conversation) => conversation.id === conversationId);
|
||||
|
||||
if (existingConversation) {
|
||||
this.selectedConversationIdSignal.set(existingConversation.id);
|
||||
return existingConversation;
|
||||
}
|
||||
|
||||
const conversation = createDirectConversation(currentParticipant, peerParticipant, Date.now());
|
||||
|
||||
await this.persistConversation(ownerId, conversation);
|
||||
this.selectedConversationIdSignal.set(conversation.id);
|
||||
return conversation;
|
||||
}
|
||||
|
||||
async openConversation(conversationId: string): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
|
||||
await this.loadForOwner(ownerId);
|
||||
this.selectedConversationIdSignal.set(conversationId);
|
||||
await this.markRead(conversationId);
|
||||
}
|
||||
|
||||
closeConversationView(conversationId?: string | null): void {
|
||||
if (!conversationId || this.selectedConversationIdSignal() === conversationId) {
|
||||
this.selectedConversationIdSignal.set(null);
|
||||
}
|
||||
}
|
||||
|
||||
async forgetConversation(conversationId: string): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = await this.repository.getConversation(ownerId, conversationId);
|
||||
|
||||
if (!conversation) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.repository.deleteConversation(ownerId, conversationId);
|
||||
|
||||
for (const message of conversation.messages) {
|
||||
await this.offlineQueue.markDelivered(ownerId, message.id);
|
||||
}
|
||||
|
||||
this.conversationsSignal.update((conversations) => conversations.filter((entry) => entry.id !== conversationId));
|
||||
|
||||
if (this.selectedConversationIdSignal() === conversationId) {
|
||||
this.selectedConversationIdSignal.set(null);
|
||||
}
|
||||
}
|
||||
|
||||
async sendMessage(conversationId: string, content: string, replyToId?: string): Promise<DirectMessage> {
|
||||
const normalizedContent = content.trim();
|
||||
|
||||
if (!normalizedContent) {
|
||||
throw new Error('Cannot send an empty direct message.');
|
||||
}
|
||||
|
||||
const currentUser = this.requireCurrentUser();
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = await this.requireConversation(ownerId, conversationId);
|
||||
const senderId = currentUser.oderId || currentUser.id;
|
||||
const recipientId = conversation.participants.find((participantId) => participantId !== senderId);
|
||||
|
||||
if (!recipientId) {
|
||||
throw new Error('Direct message conversation has no recipient.');
|
||||
}
|
||||
|
||||
const message: DirectMessage = {
|
||||
id: uuidv4(),
|
||||
conversationId,
|
||||
senderId,
|
||||
recipientId,
|
||||
content: normalizedContent,
|
||||
timestamp: Date.now(),
|
||||
status: 'QUEUED',
|
||||
reactions: [],
|
||||
isDeleted: false,
|
||||
replyToId
|
||||
};
|
||||
|
||||
await this.persistConversation(ownerId, upsertDirectMessage(conversation, message, false));
|
||||
await this.attemptDelivery(ownerId, message);
|
||||
return message;
|
||||
}
|
||||
|
||||
async editMessage(conversationId: string, messageId: string, content: string): Promise<void> {
|
||||
const normalizedContent = content.trim();
|
||||
|
||||
if (!normalizedContent) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.applyAndSendMutation(conversationId, {
|
||||
conversationId,
|
||||
messageId,
|
||||
type: 'edit',
|
||||
content: normalizedContent,
|
||||
editedAt: Date.now(),
|
||||
updatedAt: Date.now()
|
||||
});
|
||||
}
|
||||
|
||||
async deleteMessage(conversationId: string, messageId: string): Promise<void> {
|
||||
await this.applyAndSendMutation(conversationId, {
|
||||
conversationId,
|
||||
messageId,
|
||||
type: 'delete',
|
||||
updatedAt: Date.now()
|
||||
});
|
||||
}
|
||||
|
||||
async addReaction(conversationId: string, messageId: string, emoji: string): Promise<void> {
|
||||
const userId = this.getCurrentUserIdOrThrow();
|
||||
const reaction: Reaction = {
|
||||
id: uuidv4(),
|
||||
messageId,
|
||||
oderId: userId,
|
||||
userId,
|
||||
emoji,
|
||||
timestamp: Date.now()
|
||||
};
|
||||
|
||||
await this.applyAndSendMutation(conversationId, {
|
||||
conversationId,
|
||||
messageId,
|
||||
type: 'reaction-add',
|
||||
reaction,
|
||||
updatedAt: reaction.timestamp
|
||||
});
|
||||
}
|
||||
|
||||
async toggleReaction(conversationId: string, messageId: string, emoji: string): Promise<void> {
|
||||
const userId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = await this.requireConversation(userId, conversationId);
|
||||
const message = conversation.messages.find((entry) => entry.id === messageId);
|
||||
const existingReaction = message?.reactions?.find((reaction) =>
|
||||
reaction.emoji === emoji && (reaction.userId === userId || reaction.oderId === userId)
|
||||
);
|
||||
|
||||
if (existingReaction) {
|
||||
await this.applyAndSendMutation(conversationId, {
|
||||
conversationId,
|
||||
messageId,
|
||||
type: 'reaction-remove',
|
||||
oderId: userId,
|
||||
emoji,
|
||||
updatedAt: Date.now()
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.addReaction(conversationId, messageId, emoji);
|
||||
}
|
||||
|
||||
requestPeerAvatarSync(conversationId: string): void {
|
||||
const currentUserId = this.getCurrentUserId();
|
||||
const conversation = this.conversationsSignal().find((entry) => entry.id === conversationId);
|
||||
const peerId = conversation?.participants.find((participantId) => participantId !== currentUserId);
|
||||
|
||||
if (peerId) {
|
||||
this.delivery.requestUserAvatar(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
currentUserId(): string | null {
|
||||
return this.getCurrentUserId();
|
||||
}
|
||||
|
||||
async updateStatus(messageId: string, status: DirectMessageStatus): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = this.conversationsSignal().find((entry) => entry.messages.some((message) => message.id === messageId));
|
||||
|
||||
if (!conversation) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.persistConversation(ownerId, updateMessageStatusInConversation(conversation, messageId, status));
|
||||
}
|
||||
|
||||
async receiveMessage(message: DirectMessage, sender: User): Promise<void> {
|
||||
await this.handleIncomingMessage({
|
||||
message,
|
||||
sender: toDirectMessageParticipant(sender)
|
||||
});
|
||||
}
|
||||
|
||||
async markRead(conversationId: string): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const currentUserId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = await this.requireConversation(ownerId, conversationId);
|
||||
const updatedConversation = { ...conversation, unreadCount: 0 };
|
||||
|
||||
await this.persistConversation(ownerId, updatedConversation);
|
||||
await this.repository.markRead(ownerId, conversationId);
|
||||
|
||||
for (const message of updatedConversation.messages) {
|
||||
if (message.recipientId !== currentUserId || message.status === 'ACKNOWLEDGED') {
|
||||
continue;
|
||||
}
|
||||
|
||||
const nextStatus = advanceDirectMessageStatus(message.status, 'ACKNOWLEDGED');
|
||||
|
||||
if (nextStatus !== message.status) {
|
||||
await this.persistConversation(ownerId, updateMessageStatusInConversation(updatedConversation, message.id, nextStatus));
|
||||
}
|
||||
|
||||
this.sendStatusUpdate(message.senderId, {
|
||||
conversationId,
|
||||
messageId: message.id,
|
||||
status: 'ACKNOWLEDGED',
|
||||
updatedAt: Date.now()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async retryPending(): Promise<void> {
|
||||
const ownerId = this.getCurrentUserId();
|
||||
|
||||
if (!ownerId) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.loadForOwner(ownerId);
|
||||
|
||||
const pendingMessageIds = await this.offlineQueue.retryPending(ownerId);
|
||||
const messages = this.conversationsSignal().flatMap((conversation) => conversation.messages);
|
||||
|
||||
for (const messageId of pendingMessageIds) {
|
||||
const message = messages.find((entry) => entry.id === messageId);
|
||||
|
||||
if (message) {
|
||||
await this.attemptDelivery(ownerId, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async handlePeerEvent(event: ChatEvent): Promise<void> {
|
||||
if (event.type === 'direct-message' && event.directMessage) {
|
||||
await this.handleIncomingMessage(event.directMessage);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'direct-message-status' && event.directMessageStatus) {
|
||||
await this.handleIncomingStatus(event.directMessageStatus);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'direct-message-mutation' && event.directMessageMutation) {
|
||||
await this.handleIncomingMutation(event.directMessageMutation);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleIncomingMessage(payload: DirectMessageEventPayload): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const currentUser = this.requireCurrentUser();
|
||||
const currentParticipant = toDirectMessageParticipant(currentUser);
|
||||
const sender = payload.sender;
|
||||
const conversationId = payload.message.conversationId
|
||||
|| getDirectConversationId(currentParticipant.userId, sender.userId);
|
||||
const existingConversation = this.conversationsSignal().find((conversation) => conversation.id === conversationId)
|
||||
?? createDirectConversation(currentParticipant, sender, payload.message.timestamp);
|
||||
const incomingMessage: DirectMessage = {
|
||||
...payload.message,
|
||||
conversationId,
|
||||
status: advanceDirectMessageStatus(payload.message.status, 'DELIVERED')
|
||||
};
|
||||
const shouldIncrementUnread = !this.isConversationVisible(conversationId);
|
||||
|
||||
await this.persistConversation(ownerId, upsertDirectMessage(existingConversation, incomingMessage, shouldIncrementUnread));
|
||||
this.sendStatusUpdate(incomingMessage.senderId, {
|
||||
conversationId,
|
||||
messageId: incomingMessage.id,
|
||||
status: 'DELIVERED',
|
||||
updatedAt: Date.now()
|
||||
});
|
||||
|
||||
if (!shouldIncrementUnread) {
|
||||
await this.markRead(conversationId);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleIncomingStatus(payload: DirectMessageStatusEventPayload): Promise<void> {
|
||||
await this.updateStatus(payload.messageId, payload.status);
|
||||
|
||||
if (payload.status === 'DELIVERED' || payload.status === 'ACKNOWLEDGED') {
|
||||
await this.offlineQueue.markDelivered(this.getCurrentUserIdOrThrow(), payload.messageId);
|
||||
}
|
||||
}
|
||||
|
||||
private isConversationVisible(conversationId: string): boolean {
|
||||
const currentUrl = this.router.url.split(/[?#]/, 1)[0];
|
||||
|
||||
if (!currentUrl.startsWith('/dm/')) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
return decodeURIComponent(currentUrl.slice('/dm/'.length)) === conversationId;
|
||||
} catch {
|
||||
return currentUrl.slice('/dm/'.length) === conversationId;
|
||||
}
|
||||
}
|
||||
|
||||
private async handleIncomingMutation(payload: DirectMessageMutationEventPayload): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = await this.requireConversation(ownerId, payload.conversationId);
|
||||
|
||||
await this.persistConversation(ownerId, this.applyMutation(conversation, payload));
|
||||
}
|
||||
|
||||
private async applyAndSendMutation(
|
||||
conversationId: string,
|
||||
payload: DirectMessageMutationEventPayload
|
||||
): Promise<void> {
|
||||
const ownerId = this.getCurrentUserIdOrThrow();
|
||||
const conversation = await this.requireConversation(ownerId, conversationId);
|
||||
const updatedConversation = this.applyMutation(conversation, payload);
|
||||
const recipientId = conversation.participants.find((participantId) => participantId !== ownerId);
|
||||
|
||||
await this.persistConversation(ownerId, updatedConversation);
|
||||
|
||||
if (recipientId) {
|
||||
this.delivery.sendViaWebRTC(recipientId, {
|
||||
type: 'direct-message-mutation',
|
||||
directMessageMutation: payload
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private applyMutation(
|
||||
conversation: DirectMessageConversation,
|
||||
payload: DirectMessageMutationEventPayload
|
||||
): DirectMessageConversation {
|
||||
const messages = conversation.messages.map((message) => {
|
||||
if (message.id !== payload.messageId) {
|
||||
return message;
|
||||
}
|
||||
|
||||
if (payload.type === 'edit' && payload.content) {
|
||||
return {
|
||||
...message,
|
||||
content: payload.content,
|
||||
editedAt: payload.editedAt ?? payload.updatedAt,
|
||||
isDeleted: false
|
||||
};
|
||||
}
|
||||
|
||||
if (payload.type === 'delete') {
|
||||
return {
|
||||
...message,
|
||||
content: '',
|
||||
isDeleted: true,
|
||||
editedAt: payload.updatedAt
|
||||
};
|
||||
}
|
||||
|
||||
if (payload.type === 'reaction-add' && payload.reaction) {
|
||||
const reactions = (message.reactions ?? []).filter((reaction) =>
|
||||
!(reaction.emoji === payload.reaction?.emoji && reaction.userId === payload.reaction.userId)
|
||||
);
|
||||
|
||||
return {
|
||||
...message,
|
||||
reactions: [...reactions, payload.reaction]
|
||||
};
|
||||
}
|
||||
|
||||
if (payload.type === 'reaction-remove' && payload.oderId && payload.emoji) {
|
||||
return {
|
||||
...message,
|
||||
reactions: (message.reactions ?? []).filter((reaction) =>
|
||||
!(reaction.emoji === payload.emoji && (reaction.userId === payload.oderId || reaction.oderId === payload.oderId))
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
return message;
|
||||
});
|
||||
|
||||
return { ...conversation, messages };
|
||||
}
|
||||
|
||||
private async attemptDelivery(ownerId: string, message: DirectMessage): Promise<void> {
|
||||
const currentUser = this.requireCurrentUser();
|
||||
const sent = this.delivery.sendViaWebRTC(message.recipientId, {
|
||||
type: 'direct-message',
|
||||
directMessage: {
|
||||
message,
|
||||
sender: toDirectMessageParticipant(currentUser)
|
||||
}
|
||||
});
|
||||
|
||||
if (!sent) {
|
||||
await this.offlineQueue.enqueue(ownerId, message.id);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.offlineQueue.markDelivered(ownerId, message.id);
|
||||
await this.updateStatus(message.id, 'SENT');
|
||||
}
|
||||
|
||||
private sendStatusUpdate(recipientId: string, payload: DirectMessageStatusEventPayload): void {
|
||||
this.delivery.handleAck(recipientId, {
|
||||
type: 'direct-message-status',
|
||||
directMessageStatus: payload
|
||||
});
|
||||
}
|
||||
|
||||
private async loadForOwner(ownerId: string | null): Promise<void> {
|
||||
if (!ownerId) {
|
||||
this.loadedOwnerId = null;
|
||||
this.conversationsSignal.set([]);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.loadedOwnerId === ownerId) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.loadedOwnerId = ownerId;
|
||||
this.conversationsSignal.set(await this.repository.loadConversations(ownerId));
|
||||
}
|
||||
|
||||
private async persistConversation(ownerId: string, conversation: DirectMessageConversation): Promise<void> {
|
||||
await this.repository.saveConversation(ownerId, conversation);
|
||||
this.conversationsSignal.update((conversations) => {
|
||||
const nextConversations = conversations.filter((entry) => entry.id !== conversation.id);
|
||||
|
||||
nextConversations.push(conversation);
|
||||
return nextConversations;
|
||||
});
|
||||
}
|
||||
|
||||
private async requireConversation(ownerId: string, conversationId: string): Promise<DirectMessageConversation> {
|
||||
await this.loadForOwner(ownerId);
|
||||
|
||||
const conversation = this.conversationsSignal().find((entry) => entry.id === conversationId)
|
||||
?? await this.repository.getConversation(ownerId, conversationId);
|
||||
|
||||
if (!conversation) {
|
||||
throw new Error('Direct message conversation not found.');
|
||||
}
|
||||
|
||||
return conversation;
|
||||
}
|
||||
|
||||
private requireCurrentUser(): User {
|
||||
const currentUser = this.currentUser();
|
||||
|
||||
if (!currentUser) {
|
||||
throw new Error('Cannot use direct messages without a current user.');
|
||||
}
|
||||
|
||||
return currentUser;
|
||||
}
|
||||
|
||||
private getCurrentUserId(): string | null {
|
||||
const user = this.currentUser();
|
||||
|
||||
return user?.oderId || user?.id || null;
|
||||
}
|
||||
|
||||
private getCurrentUserIdOrThrow(): string {
|
||||
const ownerId = this.getCurrentUserId();
|
||||
|
||||
if (!ownerId) {
|
||||
throw new Error('Cannot use direct messages without a current user.');
|
||||
}
|
||||
|
||||
return ownerId;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
import { FriendRepository } from '../../infrastructure/friend.repository';
|
||||
|
||||
describe('FriendService storage contract', () => {
|
||||
let repository: FriendRepository;
|
||||
|
||||
beforeEach(() => {
|
||||
installLocalStorageMock();
|
||||
repository = new FriendRepository();
|
||||
});
|
||||
|
||||
it('should add friend', async () => {
|
||||
await repository.addFriend('alice', { userId: 'bob', addedAt: 10 });
|
||||
|
||||
expect(await repository.loadFriends('alice')).toEqual([{ userId: 'bob', addedAt: 10 }]);
|
||||
});
|
||||
|
||||
it('should remove friend', async () => {
|
||||
await repository.addFriend('alice', { userId: 'bob', addedAt: 10 });
|
||||
await repository.removeFriend('alice', 'bob');
|
||||
|
||||
expect(await repository.loadFriends('alice')).toEqual([]);
|
||||
});
|
||||
|
||||
it('should persist friends', async () => {
|
||||
await repository.addFriend('alice', { userId: 'bob', addedAt: 10 });
|
||||
const reloadedRepository = new FriendRepository();
|
||||
|
||||
expect(await reloadedRepository.loadFriends('alice')).toEqual([{ userId: 'bob', addedAt: 10 }]);
|
||||
});
|
||||
});
|
||||
|
||||
function installLocalStorageMock(): void {
|
||||
const values = new Map<string, string>();
|
||||
|
||||
vi.stubGlobal('localStorage', {
|
||||
getItem: (key: string) => values.get(key) ?? null,
|
||||
setItem: (key: string, value: string) => values.set(key, value),
|
||||
removeItem: (key: string) => values.delete(key),
|
||||
clear: () => values.clear()
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
import {
|
||||
Injectable,
|
||||
computed,
|
||||
effect,
|
||||
inject,
|
||||
signal
|
||||
} from '@angular/core';
|
||||
import { Store } from '@ngrx/store';
|
||||
import { FriendRepository } from '../../infrastructure/friend.repository';
|
||||
import type { Friend } from '../../domain/models/direct-message.model';
|
||||
import { selectCurrentUser } from '../../../../store/users/users.selectors';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class FriendService {
|
||||
private readonly repository = inject(FriendRepository);
|
||||
private readonly store = inject(Store);
|
||||
private readonly currentUser = this.store.selectSignal(selectCurrentUser);
|
||||
private readonly friendsSignal = signal<Friend[]>([]);
|
||||
private loadedOwnerId: string | null = null;
|
||||
|
||||
readonly friends = this.friendsSignal.asReadonly();
|
||||
readonly friendIds = computed(() => new Set(this.friendsSignal().map((friend) => friend.userId)));
|
||||
|
||||
constructor() {
|
||||
effect(() => {
|
||||
const ownerId = this.currentUser()?.oderId || this.currentUser()?.id || null;
|
||||
|
||||
void this.loadForOwner(ownerId);
|
||||
});
|
||||
}
|
||||
|
||||
async addFriend(userId: string): Promise<void> {
|
||||
const ownerId = await this.requireOwnerId();
|
||||
const friend: Friend = { userId, addedAt: Date.now() };
|
||||
|
||||
await this.repository.addFriend(ownerId, friend);
|
||||
await this.loadForOwner(ownerId, true);
|
||||
}
|
||||
|
||||
async removeFriend(userId: string): Promise<void> {
|
||||
const ownerId = await this.requireOwnerId();
|
||||
|
||||
await this.repository.removeFriend(ownerId, userId);
|
||||
await this.loadForOwner(ownerId, true);
|
||||
}
|
||||
|
||||
isFriend(userId: string): boolean {
|
||||
return this.friendIds().has(userId);
|
||||
}
|
||||
|
||||
async toggleFriend(userId: string): Promise<void> {
|
||||
if (this.isFriend(userId)) {
|
||||
await this.removeFriend(userId);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.addFriend(userId);
|
||||
}
|
||||
|
||||
private async loadForOwner(ownerId: string | null, force = false): Promise<void> {
|
||||
if (!ownerId) {
|
||||
this.loadedOwnerId = null;
|
||||
this.friendsSignal.set([]);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!force && this.loadedOwnerId === ownerId) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.loadedOwnerId = ownerId;
|
||||
this.friendsSignal.set(await this.repository.loadFriends(ownerId));
|
||||
}
|
||||
|
||||
private async requireOwnerId(): Promise<string> {
|
||||
const ownerId = this.currentUser()?.oderId || this.currentUser()?.id;
|
||||
|
||||
if (!ownerId) {
|
||||
throw new Error('Cannot manage friends without a current user.');
|
||||
}
|
||||
|
||||
await this.loadForOwner(ownerId);
|
||||
return ownerId;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { OfflineQueueRepository } from '../../infrastructure/offline-queue.repository';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class OfflineMessageQueueService {
|
||||
private readonly repository = inject(OfflineQueueRepository);
|
||||
|
||||
enqueue(ownerId: string, messageId: string): Promise<void> {
|
||||
return this.repository.enqueue(ownerId, messageId);
|
||||
}
|
||||
|
||||
retryPending(ownerId: string): Promise<string[]> {
|
||||
return this.repository.load(ownerId);
|
||||
}
|
||||
|
||||
markDelivered(ownerId: string, messageId: string): Promise<void> {
|
||||
return this.repository.remove(ownerId, messageId);
|
||||
}
|
||||
|
||||
clear(ownerId: string): Promise<void> {
|
||||
return this.repository.clear(ownerId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
import { OfflineQueueRepository } from '../../infrastructure/offline-queue.repository';
|
||||
|
||||
describe('OfflineMessageQueueService storage contract', () => {
|
||||
let repository: OfflineQueueRepository;
|
||||
|
||||
beforeEach(() => {
|
||||
installLocalStorageMock();
|
||||
repository = new OfflineQueueRepository();
|
||||
});
|
||||
|
||||
it('should enqueue messages', async () => {
|
||||
await repository.enqueue('alice', 'message-1');
|
||||
await repository.enqueue('alice', 'message-1');
|
||||
|
||||
expect(await repository.load('alice')).toEqual(['message-1']);
|
||||
});
|
||||
|
||||
it('should retry on reconnect', async () => {
|
||||
await repository.enqueue('alice', 'message-1');
|
||||
await repository.enqueue('alice', 'message-2');
|
||||
|
||||
expect(await repository.load('alice')).toEqual(['message-1', 'message-2']);
|
||||
});
|
||||
|
||||
it('should clear delivered messages', async () => {
|
||||
await repository.enqueue('alice', 'message-1');
|
||||
await repository.remove('alice', 'message-1');
|
||||
|
||||
expect(await repository.load('alice')).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
function installLocalStorageMock(): void {
|
||||
const values = new Map<string, string>();
|
||||
|
||||
vi.stubGlobal('localStorage', {
|
||||
getItem: (key: string) => values.get(key) ?? null,
|
||||
setItem: (key: string, value: string) => values.set(key, value),
|
||||
removeItem: (key: string) => values.delete(key),
|
||||
clear: () => values.clear()
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
/* eslint-disable @typescript-eslint/member-ordering */
|
||||
import { Injectable, inject } from '@angular/core';
|
||||
import { Store } from '@ngrx/store';
|
||||
import {
|
||||
Subject,
|
||||
filter,
|
||||
type Observable
|
||||
} from 'rxjs';
|
||||
import { RealtimeSessionFacade } from '../../../../core/realtime';
|
||||
import { selectAllUsers } from '../../../../store/users/users.selectors';
|
||||
import type { ChatEvent, User } from '../../../../shared-kernel';
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class PeerDeliveryService {
|
||||
private readonly webrtc = inject(RealtimeSessionFacade);
|
||||
private readonly store = inject(Store);
|
||||
private readonly users = this.store.selectSignal(selectAllUsers);
|
||||
private readonly networkRestoredSubject = new Subject<void>();
|
||||
|
||||
readonly directMessageEvents$: Observable<ChatEvent> = this.webrtc.onMessageReceived.pipe(
|
||||
filter((event) => event.type === 'direct-message' || event.type === 'direct-message-status' || event.type === 'direct-message-mutation')
|
||||
);
|
||||
|
||||
readonly peerConnected$ = this.webrtc.onPeerConnected;
|
||||
readonly networkRestored$ = this.networkRestoredSubject.asObservable();
|
||||
|
||||
constructor() {
|
||||
this.installNetworkTestHooks();
|
||||
}
|
||||
|
||||
sendViaWebRTC(recipientId: string, event: ChatEvent): boolean {
|
||||
if (this.isOfflineOverrideEnabled()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const peerId = this.resolvePeerId(recipientId);
|
||||
|
||||
if (!peerId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
this.webrtc.sendToPeer(peerId, event);
|
||||
return true;
|
||||
}
|
||||
|
||||
handleAck(recipientId: string, event: ChatEvent): boolean {
|
||||
return this.sendViaWebRTC(recipientId, event);
|
||||
}
|
||||
|
||||
requestUserAvatar(recipientId: string): boolean {
|
||||
return this.sendViaWebRTC(recipientId, {
|
||||
type: 'user-avatar-request',
|
||||
oderId: recipientId
|
||||
});
|
||||
}
|
||||
|
||||
syncOnReconnect(onReconnect: () => void): void {
|
||||
this.peerConnected$.subscribe(() => onReconnect());
|
||||
}
|
||||
|
||||
private resolvePeerId(recipientId: string): string | null {
|
||||
const connectedPeerIds = new Set(this.webrtc.getConnectedPeers());
|
||||
|
||||
if (connectedPeerIds.has(recipientId)) {
|
||||
return recipientId;
|
||||
}
|
||||
|
||||
const user = this.users().find((candidate: User) =>
|
||||
candidate.id === recipientId || candidate.oderId === recipientId || candidate.peerId === recipientId
|
||||
);
|
||||
const candidates = [
|
||||
user?.oderId,
|
||||
user?.peerId,
|
||||
user?.id
|
||||
].filter((candidate): candidate is string => !!candidate);
|
||||
|
||||
return candidates.find((candidate) => connectedPeerIds.has(candidate)) ?? null;
|
||||
}
|
||||
|
||||
private isOfflineOverrideEnabled(): boolean {
|
||||
return typeof window !== 'undefined'
|
||||
&& !!(window as Window & { metoyouDmNetworkOffline?: boolean }).metoyouDmNetworkOffline;
|
||||
}
|
||||
|
||||
private installNetworkTestHooks(): void {
|
||||
if (typeof window === 'undefined') {
|
||||
return;
|
||||
}
|
||||
|
||||
const testWindow = window as Window & {
|
||||
simulateOffline?: () => void;
|
||||
simulateOnline?: () => void;
|
||||
metoyouDmNetworkOffline?: boolean;
|
||||
};
|
||||
|
||||
testWindow.simulateOffline = () => {
|
||||
testWindow.metoyouDmNetworkOffline = true;
|
||||
};
|
||||
|
||||
testWindow.simulateOnline = () => {
|
||||
testWindow.metoyouDmNetworkOffline = false;
|
||||
this.networkRestoredSubject.next();
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user