fix: multiple bug fixes
isolated users, db backup, weird disconnect issues for long voice sessions,
This commit is contained in:
@@ -0,0 +1,104 @@
|
||||
import {
|
||||
defaultIfEmpty,
|
||||
firstValueFrom
|
||||
} from 'rxjs';
|
||||
|
||||
import { type Message } from '../../shared-kernel';
|
||||
import { dispatchIncomingMessage } from './messages-incoming.handlers';
|
||||
|
||||
function createMessage(overrides: Partial<Message> = {}): Message {
|
||||
return {
|
||||
id: 'message-1',
|
||||
roomId: 'room-1',
|
||||
senderId: 'user-1',
|
||||
senderName: 'User 1',
|
||||
content: 'hello',
|
||||
timestamp: 1,
|
||||
reactions: [],
|
||||
isDeleted: false,
|
||||
...overrides
|
||||
};
|
||||
}
|
||||
|
||||
function createContext(overrides: Record<string, unknown> = {}) {
|
||||
return {
|
||||
db: {
|
||||
getMessages: vi.fn()
|
||||
},
|
||||
webrtc: {
|
||||
sendToPeer: vi.fn()
|
||||
},
|
||||
attachments: {},
|
||||
debugging: {},
|
||||
currentUser: null,
|
||||
currentRoom: null,
|
||||
...overrides
|
||||
} as const;
|
||||
}
|
||||
|
||||
describe('dispatchIncomingMessage room-scoped sync', () => {
|
||||
it('requests sync for event room even when another room is viewed', async () => {
|
||||
const getMessages = vi.fn(async (roomId: string) => roomId === 'room-b'
|
||||
? [createMessage({ roomId: 'room-b', timestamp: 10, editedAt: 10 })]
|
||||
: [createMessage({ roomId: 'room-a', timestamp: 100, editedAt: 100 })]);
|
||||
const sendToPeer = vi.fn();
|
||||
const context = createContext({
|
||||
db: { getMessages },
|
||||
webrtc: { sendToPeer },
|
||||
currentRoom: { id: 'room-a' }
|
||||
});
|
||||
|
||||
await firstValueFrom(
|
||||
dispatchIncomingMessage(
|
||||
{
|
||||
type: 'chat-sync-summary',
|
||||
roomId: 'room-b',
|
||||
fromPeerId: 'peer-1',
|
||||
count: 2,
|
||||
lastUpdated: 20
|
||||
} as never,
|
||||
context as never
|
||||
).pipe(defaultIfEmpty(null))
|
||||
);
|
||||
|
||||
expect(getMessages).toHaveBeenCalledWith('room-b', expect.any(Number), 0);
|
||||
expect(sendToPeer).toHaveBeenCalledWith('peer-1', {
|
||||
type: 'chat-sync-request',
|
||||
roomId: 'room-b'
|
||||
});
|
||||
});
|
||||
|
||||
it('sends full sync for requested room even when another room is viewed', async () => {
|
||||
const roomBMessages = [
|
||||
createMessage({ id: 'message-b1', roomId: 'room-b', timestamp: 5 }),
|
||||
createMessage({ id: 'message-b2', roomId: 'room-b', timestamp: 15 })
|
||||
];
|
||||
const getMessages = vi.fn(async (roomId: string) => roomId === 'room-b'
|
||||
? roomBMessages
|
||||
: [createMessage({ id: 'message-a1', roomId: 'room-a', timestamp: 200 })]);
|
||||
const sendToPeer = vi.fn();
|
||||
const context = createContext({
|
||||
db: { getMessages },
|
||||
webrtc: { sendToPeer },
|
||||
currentRoom: { id: 'room-a' }
|
||||
});
|
||||
|
||||
await firstValueFrom(
|
||||
dispatchIncomingMessage(
|
||||
{
|
||||
type: 'chat-sync-request',
|
||||
roomId: 'room-b',
|
||||
fromPeerId: 'peer-2'
|
||||
} as never,
|
||||
context as never
|
||||
).pipe(defaultIfEmpty(null))
|
||||
);
|
||||
|
||||
expect(getMessages).toHaveBeenCalledWith('room-b', expect.any(Number), 0);
|
||||
expect(sendToPeer).toHaveBeenCalledWith('peer-2', {
|
||||
type: 'chat-sync-full',
|
||||
roomId: 'room-b',
|
||||
messages: roomBMessages
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -538,12 +538,14 @@ function handleSyncSummary(
|
||||
event: IncomingMessageEvent,
|
||||
{ db, webrtc, currentRoom }: IncomingMessageContext
|
||||
): Observable<Action> {
|
||||
if (!currentRoom)
|
||||
const targetRoomId = event.roomId || currentRoom?.id;
|
||||
|
||||
if (!targetRoomId)
|
||||
return EMPTY;
|
||||
|
||||
return from(
|
||||
(async () => {
|
||||
const local = await db.getMessages(currentRoom.id, FULL_SYNC_LIMIT, 0);
|
||||
const local = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);
|
||||
const localCount = local.length;
|
||||
const localLastUpdated = local.reduce(
|
||||
(maxTimestamp, message) => Math.max(maxTimestamp, message.editedAt || message.timestamp || 0),
|
||||
@@ -561,7 +563,7 @@ function handleSyncSummary(
|
||||
if (!identical && needsSync && fromPeerId) {
|
||||
const syncRequestEvent: ChatEvent = {
|
||||
type: 'chat-sync-request',
|
||||
roomId: currentRoom.id
|
||||
roomId: targetRoomId
|
||||
};
|
||||
|
||||
webrtc.sendToPeer(fromPeerId, syncRequestEvent);
|
||||
@@ -575,17 +577,18 @@ function handleSyncRequest(
|
||||
event: IncomingMessageEvent,
|
||||
{ db, webrtc, currentRoom }: IncomingMessageContext
|
||||
): Observable<Action> {
|
||||
const targetRoomId = event.roomId || currentRoom?.id;
|
||||
const fromPeerId = event.fromPeerId;
|
||||
|
||||
if (!currentRoom || !fromPeerId)
|
||||
if (!targetRoomId || !fromPeerId)
|
||||
return EMPTY;
|
||||
|
||||
return from(
|
||||
(async () => {
|
||||
const all = await db.getMessages(currentRoom.id, FULL_SYNC_LIMIT, 0);
|
||||
const all = await db.getMessages(targetRoomId, FULL_SYNC_LIMIT, 0);
|
||||
const syncFullEvent: ChatEvent = {
|
||||
type: 'chat-sync-full',
|
||||
roomId: currentRoom.id,
|
||||
roomId: targetRoomId,
|
||||
messages: all
|
||||
};
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ import {
|
||||
buildKnownUserExtras,
|
||||
isWrongServer,
|
||||
resolveRoom,
|
||||
reconcileRoomSnapshotChannels,
|
||||
sanitizeRoomSnapshot,
|
||||
normalizeIncomingBans,
|
||||
getPersistedCurrentUserId
|
||||
@@ -122,7 +123,8 @@ export class RoomStateSyncEffects {
|
||||
const actions: Action[] = [
|
||||
UsersActions.syncServerPresence({
|
||||
roomId: signalingMessage.serverId,
|
||||
users: syncedUsers
|
||||
users: syncedUsers,
|
||||
connectedPeerIds: this.webrtc.getConnectedPeers()
|
||||
})
|
||||
];
|
||||
|
||||
@@ -641,7 +643,10 @@ export class RoomStateSyncEffects {
|
||||
if (!room || !incomingRoom)
|
||||
return EMPTY;
|
||||
|
||||
const roomChanges = sanitizeRoomSnapshot(incomingRoom);
|
||||
const roomChanges = {
|
||||
...sanitizeRoomSnapshot(incomingRoom),
|
||||
channels: reconcileRoomSnapshotChannels(room.channels, incomingRoom.channels)
|
||||
};
|
||||
const bans = normalizeIncomingBans(room.id, event.bans);
|
||||
|
||||
return this.syncBansToLocalRoom(room.id, bans).pipe(
|
||||
|
||||
45
toju-app/src/app/store/rooms/rooms-helpers-snapshot.spec.ts
Normal file
45
toju-app/src/app/store/rooms/rooms-helpers-snapshot.spec.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import {
|
||||
reconcileRoomSnapshotChannels,
|
||||
sanitizeRoomSnapshot
|
||||
} from './rooms.helpers';
|
||||
|
||||
describe('room snapshot helpers', () => {
|
||||
it('drops empty channel arrays from outgoing snapshots', () => {
|
||||
expect(sanitizeRoomSnapshot({ channels: [] }).channels).toBeUndefined();
|
||||
});
|
||||
|
||||
it('keeps cached channels when incoming snapshot has none', () => {
|
||||
const cachedChannels = [
|
||||
{ id: 'general', name: 'general', type: 'text', position: 0 },
|
||||
{ id: 'updates', name: 'updates', type: 'text', position: 1 }
|
||||
] as const;
|
||||
|
||||
expect(reconcileRoomSnapshotChannels(cachedChannels as never, undefined)).toEqual(cachedChannels);
|
||||
expect(reconcileRoomSnapshotChannels(cachedChannels as never, [] as never)).toEqual(cachedChannels);
|
||||
});
|
||||
|
||||
it('keeps richer cached channels when incoming snapshot is smaller', () => {
|
||||
const cachedChannels = [
|
||||
{ id: 'general', name: 'general', type: 'text', position: 0 },
|
||||
{ id: 'updates', name: 'updates', type: 'text', position: 1 },
|
||||
{ id: 'voice', name: 'General', type: 'voice', position: 0 }
|
||||
] as const;
|
||||
const incomingChannels = [
|
||||
{ id: 'general', name: 'general', type: 'text', position: 0 }
|
||||
] as const;
|
||||
|
||||
expect(reconcileRoomSnapshotChannels(cachedChannels as never, incomingChannels as never)).toEqual(cachedChannels);
|
||||
});
|
||||
|
||||
it('accepts incoming channels when snapshot is at least as complete', () => {
|
||||
const cachedChannels = [
|
||||
{ id: 'general', name: 'general', type: 'text', position: 0 }
|
||||
] as const;
|
||||
const incomingChannels = [
|
||||
{ id: 'general', name: 'general', type: 'text', position: 0 },
|
||||
{ id: 'updates', name: 'updates', type: 'text', position: 1 }
|
||||
] as const;
|
||||
|
||||
expect(reconcileRoomSnapshotChannels(cachedChannels as never, incomingChannels as never)).toEqual(incomingChannels);
|
||||
});
|
||||
});
|
||||
@@ -57,6 +57,8 @@ export const RoomsActions = createActionGroup({
|
||||
'Forget Room': props<{ roomId: string; nextOwnerKey?: string }>(),
|
||||
'Forget Room Success': props<{ roomId: string }>(),
|
||||
|
||||
'Reset Rooms State': emptyProps(),
|
||||
|
||||
'Update Room Settings': props<{ roomId: string; settings: Partial<RoomSettings> }>(),
|
||||
'Update Room Settings Success': props<{ roomId: string; settings: RoomSettings }>(),
|
||||
'Update Room Settings Failure': props<{ error: string }>(),
|
||||
|
||||
@@ -97,6 +97,31 @@ export function resolveRoomChannels(
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Peer room-state snapshots can lag behind cached room metadata.
|
||||
* Keep richer cached channels until an equally rich or richer snapshot arrives.
|
||||
*/
|
||||
export function reconcileRoomSnapshotChannels(
|
||||
cachedChannels: Room['channels'] | undefined,
|
||||
incomingChannels: Room['channels'] | undefined
|
||||
): Room['channels'] | undefined {
|
||||
if (hasPersistedChannels(cachedChannels) && !hasPersistedChannels(incomingChannels)) {
|
||||
return cachedChannels;
|
||||
}
|
||||
|
||||
if (hasPersistedChannels(cachedChannels) && hasPersistedChannels(incomingChannels)) {
|
||||
return incomingChannels.length >= cachedChannels.length
|
||||
? incomingChannels
|
||||
: cachedChannels;
|
||||
}
|
||||
|
||||
if (hasPersistedChannels(incomingChannels)) {
|
||||
return incomingChannels;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function resolveTextChannelId(
|
||||
channels: Room['channels'] | undefined,
|
||||
preferredChannelId?: string | null
|
||||
@@ -136,7 +161,7 @@ export function sanitizeRoomSnapshot(room: Partial<Room>): Partial<Room> {
|
||||
iconUpdatedAt: typeof room.iconUpdatedAt === 'number' ? room.iconUpdatedAt : undefined,
|
||||
slowModeInterval: typeof room.slowModeInterval === 'number' ? room.slowModeInterval : undefined,
|
||||
permissions: room.permissions ? { ...room.permissions } : undefined,
|
||||
channels: Array.isArray(room.channels) ? room.channels : undefined,
|
||||
channels: hasPersistedChannels(room.channels) ? room.channels : undefined,
|
||||
members: Array.isArray(room.members) ? room.members : undefined,
|
||||
roles: Array.isArray(room.roles) ? room.roles : undefined,
|
||||
roleAssignments: Array.isArray(room.roleAssignments) ? room.roleAssignments : undefined,
|
||||
|
||||
@@ -105,6 +105,10 @@ export const initialState: RoomsState = {
|
||||
export const roomsReducer = createReducer(
|
||||
initialState,
|
||||
|
||||
on(RoomsActions.resetRoomsState, () => ({
|
||||
...initialState
|
||||
})),
|
||||
|
||||
// Load rooms
|
||||
on(RoomsActions.loadRooms, (state) => ({
|
||||
...state,
|
||||
|
||||
@@ -235,6 +235,71 @@ describe('users reducer - status', () => {
|
||||
// The buildPresenceAwareUser function takes incoming status when non-offline
|
||||
expect(state.entities['u1']?.status).toBe('online');
|
||||
});
|
||||
|
||||
it('preserves omitted live peer presence and voice state during stale server snapshot', () => {
|
||||
const remoteUser = createUser({
|
||||
id: 'u2',
|
||||
oderId: 'u2',
|
||||
displayName: 'Voice Peer',
|
||||
presenceServerIds: ['s1'],
|
||||
voiceState: {
|
||||
isConnected: true,
|
||||
isMuted: false,
|
||||
isDeafened: false,
|
||||
isSpeaking: true,
|
||||
roomId: 'voice-1',
|
||||
serverId: 's1'
|
||||
},
|
||||
cameraState: { isEnabled: true },
|
||||
screenShareState: { isSharing: true }
|
||||
});
|
||||
const withUser = usersReducer(baseState, UsersActions.userJoined({ user: remoteUser }));
|
||||
const state = usersReducer(withUser, UsersActions.syncServerPresence({
|
||||
roomId: 's1',
|
||||
users: [],
|
||||
connectedPeerIds: ['u2']
|
||||
}));
|
||||
|
||||
expect(state.entities['u2']?.presenceServerIds).toEqual(['s1']);
|
||||
expect(state.entities['u2']?.isOnline).toBe(true);
|
||||
expect(state.entities['u2']?.voiceState?.isConnected).toBe(true);
|
||||
expect(state.entities['u2']?.voiceState?.roomId).toBe('voice-1');
|
||||
expect(state.entities['u2']?.cameraState?.isEnabled).toBe(true);
|
||||
expect(state.entities['u2']?.screenShareState?.isSharing).toBe(true);
|
||||
});
|
||||
|
||||
it('clears omitted peer live state when transport is gone', () => {
|
||||
const remoteUser = createUser({
|
||||
id: 'u3',
|
||||
oderId: 'u3',
|
||||
displayName: 'Dropped Peer',
|
||||
presenceServerIds: ['s1'],
|
||||
voiceState: {
|
||||
isConnected: true,
|
||||
isMuted: false,
|
||||
isDeafened: false,
|
||||
isSpeaking: true,
|
||||
roomId: 'voice-1',
|
||||
serverId: 's1'
|
||||
},
|
||||
cameraState: { isEnabled: true },
|
||||
screenShareState: { isSharing: true }
|
||||
});
|
||||
const withUser = usersReducer(baseState, UsersActions.userJoined({ user: remoteUser }));
|
||||
const state = usersReducer(withUser, UsersActions.syncServerPresence({
|
||||
roomId: 's1',
|
||||
users: [],
|
||||
connectedPeerIds: []
|
||||
}));
|
||||
|
||||
expect(state.entities['u3']?.presenceServerIds).toBeUndefined();
|
||||
expect(state.entities['u3']?.isOnline).toBe(false);
|
||||
expect(state.entities['u3']?.status).toBe('offline');
|
||||
expect(state.entities['u3']?.voiceState?.isConnected).toBe(false);
|
||||
expect(state.entities['u3']?.voiceState?.roomId).toBeUndefined();
|
||||
expect(state.entities['u3']?.cameraState?.isEnabled).toBe(false);
|
||||
expect(state.entities['u3']?.screenShareState?.isSharing).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('manual status overrides auto idle', () => {
|
||||
|
||||
@@ -18,6 +18,7 @@ import {
|
||||
export const UsersActions = createActionGroup({
|
||||
source: 'Users',
|
||||
events: {
|
||||
'Authenticate User': props<{ user: User }>(),
|
||||
'Load Current User': emptyProps(),
|
||||
'Load Current User Success': props<{ user: User }>(),
|
||||
'Load Current User Failure': props<{ error: string }>(),
|
||||
@@ -31,7 +32,7 @@ export const UsersActions = createActionGroup({
|
||||
|
||||
'User Joined': props<{ user: User }>(),
|
||||
'User Left': props<{ userId: string; serverId?: string; serverIds?: string[] }>(),
|
||||
'Sync Server Presence': props<{ roomId: string; users: User[] }>(),
|
||||
'Sync Server Presence': props<{ roomId: string; users: User[]; connectedPeerIds?: string[] }>(),
|
||||
|
||||
'Update User': props<{ userId: string; updates: Partial<User> }>(),
|
||||
'Update User Role': props<{ userId: string; role: User['role'] }>(),
|
||||
@@ -58,6 +59,7 @@ export const UsersActions = createActionGroup({
|
||||
|
||||
'Sync Users': props<{ users: User[] }>(),
|
||||
'Clear Users': emptyProps(),
|
||||
'Reset Users State': emptyProps(),
|
||||
'Update Host': props<{ userId: string }>(),
|
||||
|
||||
'Update Voice State': props<{ userId: string; voiceState: Partial<VoiceState> }>(),
|
||||
|
||||
@@ -23,6 +23,7 @@ import {
|
||||
switchMap
|
||||
} from 'rxjs/operators';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { MessagesActions } from '../messages/messages.actions';
|
||||
import { UsersActions } from './users.actions';
|
||||
import { RoomsActions } from '../rooms/rooms.actions';
|
||||
import {
|
||||
@@ -46,6 +47,9 @@ import {
|
||||
Room,
|
||||
User
|
||||
} from '../../shared-kernel';
|
||||
import {
|
||||
setStoredCurrentUserId
|
||||
} from '../../core/storage/current-user-storage';
|
||||
import { findRoomMember, removeRoomMember } from '../rooms/room-members.helpers';
|
||||
|
||||
type IncomingModerationExtraAction =
|
||||
@@ -65,6 +69,27 @@ export class UsersEffects {
|
||||
private serverDirectory = inject(ServerDirectoryFacade);
|
||||
private webrtc = inject(RealtimeSessionFacade);
|
||||
|
||||
/** Prepares persisted state for a successful login before exposing the user in-memory. */
|
||||
authenticateUser$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
ofType(UsersActions.authenticateUser),
|
||||
switchMap(({ user }) =>
|
||||
from(this.prepareAuthenticatedUserStorage(user.id)).pipe(
|
||||
mergeMap(() => [
|
||||
MessagesActions.clearMessages(),
|
||||
UsersActions.resetUsersState(),
|
||||
RoomsActions.resetRoomsState(),
|
||||
UsersActions.setCurrentUser({ user }),
|
||||
RoomsActions.loadRooms()
|
||||
]),
|
||||
catchError((error: Error) =>
|
||||
of(UsersActions.loadCurrentUserFailure({ error: error.message || 'Failed to prepare local user state.' }))
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
// Load current user from storage
|
||||
/** Loads the persisted current user from the local database on startup. */
|
||||
loadCurrentUser$ = createEffect(() =>
|
||||
@@ -124,6 +149,11 @@ export class UsersEffects {
|
||||
};
|
||||
}
|
||||
|
||||
private async prepareAuthenticatedUserStorage(userId: string): Promise<void> {
|
||||
setStoredCurrentUserId(userId);
|
||||
await this.db.initialize();
|
||||
}
|
||||
|
||||
/** Loads all users associated with a specific room from the local database. */
|
||||
loadRoomUsers$ = createEffect(() =>
|
||||
this.actions$.pipe(
|
||||
|
||||
@@ -30,6 +30,16 @@ function mergePresenceServerIds(
|
||||
return normalizePresenceServerIds([...(existingServerIds ?? []), ...(incomingServerIds ?? [])]);
|
||||
}
|
||||
|
||||
function hasLivePeerTransport(user: User, connectedPeerIds: ReadonlySet<string>): boolean {
|
||||
if (connectedPeerIds.size === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return connectedPeerIds.has(user.id)
|
||||
|| connectedPeerIds.has(user.oderId)
|
||||
|| (!!user.peerId && connectedPeerIds.has(user.peerId));
|
||||
}
|
||||
|
||||
interface AvatarFields {
|
||||
avatarUrl?: string;
|
||||
avatarHash?: string;
|
||||
@@ -262,6 +272,10 @@ export const initialState: UsersState = usersAdapter.getInitialState({
|
||||
|
||||
export const usersReducer = createReducer(
|
||||
initialState,
|
||||
on(UsersActions.resetUsersState, () => ({
|
||||
...initialState
|
||||
})),
|
||||
|
||||
on(UsersActions.loadCurrentUser, (state) => ({
|
||||
...state,
|
||||
loading: true,
|
||||
@@ -344,10 +358,11 @@ export const usersReducer = createReducer(
|
||||
on(UsersActions.userJoined, (state, { user }) =>
|
||||
usersAdapter.upsertOne(buildPresenceAwareUser(state.entities[user.id], user), state)
|
||||
),
|
||||
on(UsersActions.syncServerPresence, (state, { roomId, users }) => {
|
||||
on(UsersActions.syncServerPresence, (state, { roomId, users, connectedPeerIds }) => {
|
||||
let nextState = state;
|
||||
|
||||
const seenUserIds = new Set<string>();
|
||||
const connectedPeerIdSet = new Set(connectedPeerIds ?? []);
|
||||
|
||||
for (const user of users) {
|
||||
seenUserIds.add(user.id);
|
||||
@@ -363,6 +378,7 @@ export const usersReducer = createReducer(
|
||||
&& user.id !== nextState.currentUserId
|
||||
&& user.presenceServerIds?.includes(roomId) === true
|
||||
&& !seenUserIds.has(user.id)
|
||||
&& !hasLivePeerTransport(user, connectedPeerIdSet)
|
||||
)
|
||||
.map((user) => ({
|
||||
id: user.id,
|
||||
|
||||
Reference in New Issue
Block a user