Files
Toju/toju-app/src/app/store/rooms/room-members-sync.effects.ts
Myx de2d3300d4 fix: Fix users unable to see or hear each other in voice channels due to
stale server sockets, passive non-initiators, and race conditions
during peer connection setup.

Fix users unable to see or hear each other in voice channels due to
stale server sockets, passive non-initiators, and race conditions
during peer connection setup.

Server:
- Close stale WebSocket connections sharing the same oderId in
  handleIdentify instead of letting them linger up to 45s
- Make user_joined/user_left broadcasts identity-aware so duplicate
  sockets don't produce phantom join/leave events
- Include serverIds in user_left payload for multi-room presence
- Simplify findUserByOderId now that stale sockets are cleaned up

Client - signaling:
- Add fallback offer system with 1s timer for missed user_joined races
- Add non-initiator takeover after 5s when the initiator fails to send
  an offer (NON_INITIATOR_GIVE_UP_MS)
- Scope peerServerMap per signaling URL to prevent cross-server
  collisions
- Add socket identity guards on all signaling event handlers
- Replace canReusePeerConnection with hasActivePeerConnection and
  isPeerConnectionNegotiating with extended grace periods

Client - peer connections:
- Extract replaceUnusablePeer helper to deduplicate stale peer
  replacement in offer and ICE handlers
- Add stale connectionstatechange guard to ignore events from replaced
  RTCPeerConnection instances
- Use deterministic initiator election in peer recovery reconnects
- Track createdAt on PeerData for staleness detection

Client - presence:
- Add multi-room presence tracking via presenceServerIds on User
- Replace clearUsers + individual userJoined with syncServerPresence
  for atomic server roster updates
- Make userLeft handle partial server removal instead of full eviction

Documentation:
- Add server-side connection hygiene, non-initiator takeover, and stale
  peer replacement sections to the realtime README
2026-04-04 02:47:58 +02:00

529 lines
15 KiB
TypeScript

/* eslint-disable @typescript-eslint/member-ordering */
import { Injectable, inject } from '@angular/core';
import {
Actions,
createEffect,
ofType
} from '@ngrx/effects';
import { Action } from '@ngrx/store';
import { Store } from '@ngrx/store';
import { EMPTY } from 'rxjs';
import {
mergeMap,
tap,
withLatestFrom
} from 'rxjs/operators';
import {
ChatEvent,
Room,
RoomMember,
User
} from '../../shared-kernel';
import { RealtimeSessionFacade } from '../../core/realtime';
import { UsersActions } from '../users/users.actions';
import { selectCurrentUser } from '../users/users.selectors';
import { RoomsActions } from './rooms.actions';
import { selectCurrentRoom, selectSavedRooms } from './rooms.selectors';
import {
areRoomMembersEqual,
findRoomMember,
mergeRoomMembers,
pruneRoomMembers,
removeRoomMember,
roomMemberFromUser,
touchRoomMemberLastSeen,
transferRoomOwnership,
updateRoomMemberRole,
upsertRoomMember
} from './room-members.helpers';
/**
 * NgRx effects that keep each room's stored `members` roster up to date.
 *
 * Roster changes come from four sources visible in this class:
 * - room lifecycle actions (create / join / view),
 * - current-user and role actions from the users slice,
 * - signaling presence messages (`server_users`, `user_joined`, `user_left`),
 * - peer-to-peer chat events (`member-roster-request`, `member-roster`,
 *   `member-leave`, `host-change`, `role-change`).
 *
 * Every roster write goes through `RoomsActions.updateRoom`, and is skipped
 * when `areRoomMembersEqual` reports no change.
 */
@Injectable()
export class RoomMembersSyncEffects {
  private readonly actions$ = inject(Actions);
  private readonly store = inject(Store);
  private readonly webrtc = inject(RealtimeSessionFacade);

  /** Ensure the local user is recorded in a room as soon as it becomes active. */
  ensureCurrentMemberOnRoomEntry$ = createEffect(() =>
    this.actions$.pipe(
      ofType(
        RoomsActions.createRoomSuccess,
        RoomsActions.joinRoomSuccess,
        RoomsActions.viewServerSuccess
      ),
      withLatestFrom(this.store.select(selectCurrentUser)),
      mergeMap(([{ room }, currentUser]) => {
        if (!currentUser)
          return EMPTY;

        const members = upsertRoomMember(
          room.members ?? [],
          this.buildCurrentUserMember(room, currentUser, true)
        );
        const actions = this.createRoomMemberUpdateActions(room, members);

        return actions.length > 0 ? actions : EMPTY;
      })
    )
  );

  /** Keep the viewed room's local member record aligned with the current profile. */
  syncCurrentUserIntoCurrentRoom$ = createEffect(() =>
    this.actions$.pipe(
      ofType(
        UsersActions.loadCurrentUserSuccess,
        UsersActions.setCurrentUser,
        UsersActions.updateCurrentUser
      ),
      withLatestFrom(
        this.store.select(selectCurrentUser),
        this.store.select(selectCurrentRoom)
      ),
      // The triggering action itself is ignored; only the latest store state matters.
      mergeMap(([
        , currentUser,
        currentRoom
      ]) => {
        if (!currentUser || !currentRoom)
          return EMPTY;

        const members = upsertRoomMember(
          currentRoom.members ?? [],
          this.buildCurrentUserMember(currentRoom, currentUser, true)
        );
        const actions = this.createRoomMemberUpdateActions(currentRoom, members);

        return actions.length > 0 ? actions : EMPTY;
      })
    )
  );

  /** Persist room-role changes into the stored member roster for the active room. */
  syncRoleChangesIntoCurrentRoom$ = createEffect(() =>
    this.actions$.pipe(
      ofType(UsersActions.updateUserRole),
      withLatestFrom(this.store.select(selectCurrentRoom)),
      mergeMap(([{ userId, role }, currentRoom]) => {
        if (!currentRoom)
          return EMPTY;

        const members = updateRoomMemberRole(currentRoom.members ?? [], userId, role);
        const actions = this.createRoomMemberUpdateActions(currentRoom, members);

        return actions.length > 0 ? actions : EMPTY;
      })
    )
  );

  /**
   * Update persisted room rosters when signaling presence changes arrive.
   *
   * Messages without a string `serverId`, or whose `serverId` matches neither
   * the current room nor a saved room, are ignored.
   */
  signalingPresenceIntoRoomMembers$ = createEffect(() =>
    this.webrtc.onSignalingMessage.pipe(
      withLatestFrom(
        this.store.select(selectCurrentRoom),
        this.store.select(selectSavedRooms),
        this.store.select(selectCurrentUser)
      ),
      mergeMap(([
        message,
        currentRoom,
        savedRooms,
        currentUser
      ]) => {
        const signalingMessage: {
          type: string;
          serverId?: string;
          users?: { oderId: string; displayName: string }[];
          oderId?: string;
          displayName?: string;
        } = message;
        const roomId = typeof signalingMessage.serverId === 'string' ? signalingMessage.serverId : undefined;
        const room = this.resolveRoom(roomId, currentRoom, savedRooms);

        if (!room)
          return EMPTY;

        // Identifier used to skip the local user's own presence entries.
        const myId = currentUser?.oderId || currentUser?.id;

        switch (signalingMessage.type) {
          case 'server_users': {
            if (!Array.isArray(signalingMessage.users))
              return EMPTY;

            let members = room.members ?? [];

            for (const user of signalingMessage.users) {
              if (!user?.oderId || user.oderId === myId)
                continue;

              members = upsertRoomMember(members, this.buildPresenceMember(room, user));
            }

            const actions = this.createRoomMemberUpdateActions(room, members);

            return actions.length > 0 ? actions : EMPTY;
          }

          case 'user_joined': {
            if (!signalingMessage.oderId || signalingMessage.oderId === myId)
              return EMPTY;

            const joinedUser = {
              oderId: signalingMessage.oderId,
              displayName: signalingMessage.displayName
            };
            const members = upsertRoomMember(
              room.members ?? [],
              this.buildPresenceMember(room, joinedUser)
            );
            const actions = this.createRoomMemberUpdateActions(room, members);

            return actions.length > 0 ? actions : EMPTY;
          }

          case 'user_left': {
            if (!signalingMessage.oderId)
              return EMPTY;

            // A signaling-level leave only refreshes lastSeenAt; the member is
            // not removed here (removal happens on the 'member-leave' peer event).
            const members = touchRoomMemberLastSeen(room.members ?? [], signalingMessage.oderId, Date.now());
            const actions = this.createRoomMemberUpdateActions(room, members);

            return actions.length > 0 ? actions : EMPTY;
          }

          default:
            return EMPTY;
        }
      })
    )
  );

  /** Request the latest member roster whenever a new peer data channel opens. */
  peerConnectedRosterSync$ = createEffect(
    () =>
      this.webrtc.onPeerConnected.pipe(
        withLatestFrom(this.store.select(selectCurrentRoom)),
        tap(([peerId, currentRoom]) => {
          if (!currentRoom)
            return;

          // Best-effort, matching roomEntryRosterSync$: a failed send must not
          // error the effect stream.
          try {
            this.webrtc.sendToPeer(peerId, {
              type: 'member-roster-request',
              roomId: currentRoom.id
            });
          } catch {
            /* peer may have disconnected */
          }
        })
      ),
    { dispatch: false }
  );

  /** Kick off room-member sync when entering or switching to a room. */
  roomEntryRosterSync$ = createEffect(
    () =>
      this.actions$.pipe(
        ofType(
          RoomsActions.createRoomSuccess,
          RoomsActions.joinRoomSuccess,
          RoomsActions.viewServerSuccess
        ),
        tap(({ room }) => {
          for (const peerId of this.webrtc.getConnectedPeers()) {
            try {
              this.webrtc.sendToPeer(peerId, {
                type: 'member-roster-request',
                roomId: room.id
              });
            } catch {
              /* peer may have disconnected */
            }
          }
        })
      ),
    { dispatch: false }
  );

  /** Handle peer-to-peer member roster sync and explicit leave messages. */
  incomingRoomMemberEvents$ = createEffect(() =>
    this.webrtc.onMessageReceived.pipe(
      withLatestFrom(
        this.store.select(selectCurrentRoom),
        this.store.select(selectSavedRooms),
        this.store.select(selectCurrentUser)
      ),
      mergeMap(([
        event,
        currentRoom,
        savedRooms,
        currentUser
      ]) => {
        switch (event.type) {
          case 'member-roster-request': {
            const actions = this.handleMemberRosterRequest(event, currentRoom, savedRooms, currentUser ?? null);

            return actions.length > 0 ? actions : EMPTY;
          }

          case 'member-roster': {
            const actions = this.handleMemberRoster(event, currentRoom, savedRooms, currentUser ?? null);

            return actions.length > 0 ? actions : EMPTY;
          }

          case 'member-leave': {
            const actions = this.handleMemberLeave(event, currentRoom, savedRooms);

            return actions.length > 0 ? actions : EMPTY;
          }

          case 'host-change': {
            const actions = this.handleIncomingHostChange(event, currentRoom, savedRooms, currentUser ?? null);

            return actions.length > 0 ? actions : EMPTY;
          }

          case 'role-change': {
            const actions = this.handleIncomingRoleChange(event, currentRoom, savedRooms);

            return actions.length > 0 ? actions : EMPTY;
          }

          default:
            return EMPTY;
        }
      })
    )
  );

  /**
   * Look up a room by id, preferring the currently viewed room over saved rooms.
   *
   * @returns the matching room, or `null` when `roomId` is missing or unknown.
   */
  private resolveRoom(roomId: string | undefined, currentRoom: Room | null, savedRooms: Room[]): Room | null {
    if (!roomId)
      return null;

    if (currentRoom?.id === roomId)
      return currentRoom;

    return savedRooms.find((room) => room.id === roomId) ?? null;
  }

  /**
   * Build the roster entry for the local user in `room`.
   *
   * Role resolution: `'host'` when `room.hostId` equals the user's `id`;
   * otherwise `currentUser.role` for the current room, or the existing
   * member's role (default `'member'`) for any other room.
   * An existing entry's `id` and `joinedAt` are preserved.
   */
  private buildCurrentUserMember(room: Room, currentUser: User, isCurrentRoom: boolean): RoomMember {
    const existingMember = findRoomMember(room.members ?? [], currentUser.oderId || currentUser.id);
    const role = room.hostId === currentUser.id
      ? 'host'
      : (isCurrentRoom ? currentUser.role : existingMember?.role ?? 'member');

    return {
      ...roomMemberFromUser(currentUser, Date.now(), role),
      id: existingMember?.id ?? currentUser.id,
      joinedAt: existingMember?.joinedAt ?? currentUser.joinedAt ?? Date.now(),
      avatarUrl: currentUser.avatarUrl ?? existingMember?.avatarUrl,
      role
    };
  }

  /**
   * Build a roster entry for a remote user announced via signaling presence.
   *
   * Existing `id`, `username`, `avatarUrl`, `role` and `joinedAt` are kept when
   * the user is already in the roster; `lastSeenAt` is always set to now.
   * A missing display name falls back to the existing one, then to `'User'`.
   */
  private buildPresenceMember(
    room: Room,
    data: { oderId: string; displayName?: string }
  ): RoomMember {
    const existingMember = findRoomMember(room.members ?? [], data.oderId);
    const now = Date.now();

    return {
      id: existingMember?.id ?? data.oderId,
      oderId: data.oderId,
      username:
        existingMember?.username ??
        // Derive a username from the display name: lower-case, whitespace -> '_'.
        (data.displayName || 'User').toLowerCase().replace(/\s+/g, '_'),
      displayName: data.displayName || existingMember?.displayName || 'User',
      avatarUrl: existingMember?.avatarUrl,
      role: existingMember?.role ?? 'member',
      joinedAt: existingMember?.joinedAt ?? now,
      lastSeenAt: now
    };
  }

  /**
   * Produce the `updateRoom` action needed to store `members` on `room`.
   *
   * @returns an empty array when the roster is unchanged per `areRoomMembersEqual`.
   */
  private createRoomMemberUpdateActions(room: Room, members: RoomMember[]): Action[] {
    return areRoomMembersEqual(room.members ?? [], members)
      ? []
      : [RoomsActions.updateRoom({ roomId: room.id, changes: { members } })];
  }

  /**
   * Answer a peer's `member-roster-request` with this client's roster for the
   * requested room (including the local user, when known).
   *
   * @returns the action that stores the same roster locally, if it changed.
   */
  private handleMemberRosterRequest(
    event: ChatEvent,
    currentRoom: Room | null,
    savedRooms: Room[],
    currentUser: User | null
  ): Action[] {
    const room = this.resolveRoom(event.roomId, currentRoom, savedRooms);

    if (!room || !event.fromPeerId)
      return [];

    const isCurrentRoom = currentRoom?.id === room.id;
    let members = room.members ?? [];

    if (currentUser) {
      members = upsertRoomMember(
        members,
        this.buildCurrentUserMember(room, currentUser, isCurrentRoom)
      );
    }

    // Best-effort reply: a throw here would error the surrounding effect
    // stream, and the local roster update below does not depend on delivery.
    try {
      this.webrtc.sendToPeer(event.fromPeerId, {
        type: 'member-roster',
        roomId: room.id,
        members
      });
    } catch {
      /* peer may have disconnected */
    }

    return this.createRoomMemberUpdateActions(room, members);
  }

  /**
   * Merge a roster received from a peer into the stored roster for that room,
   * then re-apply the local user's own entry on top of the merged result.
   */
  private handleMemberRoster(
    event: ChatEvent,
    currentRoom: Room | null,
    savedRooms: Room[],
    currentUser: User | null
  ): Action[] {
    const room = this.resolveRoom(event.roomId, currentRoom, savedRooms);

    if (!room || !Array.isArray(event.members))
      return [];

    let members = mergeRoomMembers(room.members ?? [], event.members);

    if (currentUser) {
      members = upsertRoomMember(
        members,
        this.buildCurrentUserMember(room, currentUser, currentRoom?.id === room.id)
      );
    }

    return this.createRoomMemberUpdateActions(room, members);
  }

  /**
   * Remove a member who explicitly left a room. When the room is the one being
   * viewed, also dispatch `UsersActions.userLeft` for that room.
   * An event without a string `roomId` is applied to the current room.
   */
  private handleMemberLeave(
    event: ChatEvent,
    currentRoom: Room | null,
    savedRooms: Room[]
  ): Action[] {
    const roomId = typeof event.roomId === 'string' ? event.roomId : currentRoom?.id;
    const room = this.resolveRoom(roomId, currentRoom, savedRooms);

    if (!room)
      return [];

    const actions = this.createRoomMemberUpdateActions(
      room,
      removeRoomMember(room.members ?? [], event.targetUserId, event.oderId)
    );
    const departedUserId = event.oderId ?? event.targetUserId;

    if (currentRoom?.id === room.id && departedUserId) {
      actions.push(
        UsersActions.userLeft({
          userId: departedUserId,
          serverId: room.id
        })
      );
    }

    return actions;
  }

  /**
   * Apply a `host-change` event: always store the new `hostId` and roster on
   * the target room; for the viewed room additionally update user roles.
   * An event without a string `roomId` is applied to the current room.
   */
  private handleIncomingHostChange(
    event: ChatEvent,
    currentRoom: Room | null,
    savedRooms: Room[],
    currentUser: User | null
  ): Action[] {
    const roomId = typeof event.roomId === 'string' ? event.roomId : currentRoom?.id;
    const room = this.resolveRoom(roomId, currentRoom, savedRooms);

    if (!room)
      return [];

    // Prefer the roster shipped with the event; otherwise derive it locally by
    // transferring ownership within the stored roster.
    const members = Array.isArray(event.members)
      ? pruneRoomMembers(event.members)
      : transferRoomOwnership(
        room.members ?? [],
        event.hostId || event.hostOderId
          ? {
            id: event.hostId,
            oderId: event.hostOderId
          }
          : null,
        {
          id: event.previousHostId ?? event.previousHostOderId ?? '',
          oderId: event.previousHostOderId
        }
      );
    const hostId = typeof event.hostId === 'string' ? event.hostId : '';
    const actions: Action[] = [
      RoomsActions.updateRoom({
        roomId: room.id,
        changes: {
          hostId,
          members
        }
      })
    ];

    // updateUserRole / updateCurrentUser are folded into the *current* room's
    // roster by syncRoleChangesIntoCurrentRoom$ / syncCurrentUserIntoCurrentRoom$,
    // so a host change in a background room must not emit them. This matches
    // the gating in handleIncomingRoleChange and handleMemberLeave.
    if (currentRoom?.id !== room.id)
      return actions;

    for (const previousHostKey of new Set([event.previousHostId, event.previousHostOderId].filter((value): value is string => !!value))) {
      actions.push(
        UsersActions.updateUserRole({
          userId: previousHostKey,
          role: 'member'
        })
      );
    }

    for (const nextHostKey of new Set([event.hostId, event.hostOderId].filter((value): value is string => !!value))) {
      actions.push(
        UsersActions.updateUserRole({
          userId: nextHostKey,
          role: 'host'
        })
      );
    }

    if (currentUser) {
      // Require the event identifier to be present before comparing: otherwise
      // a missing identifier on both sides (undefined === undefined) would
      // wrongly match the local user.
      const isCurrentUserNextHost =
        (!!event.hostId && event.hostId === currentUser.id)
        || (!!event.hostOderId && event.hostOderId === currentUser.oderId);
      const isCurrentUserPreviousHost =
        (!!event.previousHostId && event.previousHostId === currentUser.id)
        || (!!event.previousHostOderId && event.previousHostOderId === currentUser.oderId);

      if (isCurrentUserPreviousHost && !isCurrentUserNextHost) {
        actions.push(UsersActions.updateCurrentUser({ updates: { role: 'member' } }));
      }

      if (isCurrentUserNextHost) {
        actions.push(UsersActions.updateCurrentUser({ updates: { role: 'host' } }));
      }
    }

    return actions;
  }

  /**
   * Apply a `role-change` event to the target room's roster and, when that
   * room is the one being viewed, mirror it into the users slice.
   * An event without a string `roomId` is applied to the current room.
   */
  private handleIncomingRoleChange(
    event: ChatEvent,
    currentRoom: Room | null,
    savedRooms: Room[]
  ): Action[] {
    const roomId = typeof event.roomId === 'string' ? event.roomId : currentRoom?.id;
    const room = this.resolveRoom(roomId, currentRoom, savedRooms);

    if (!room || !event.targetUserId || !event.role)
      return [];

    const actions = this.createRoomMemberUpdateActions(
      room,
      updateRoomMemberRole(room.members ?? [], event.targetUserId, event.role)
    );

    if (currentRoom?.id === room.id) {
      actions.push(
        UsersActions.updateUserRole({
          userId: event.targetUserId,
          role: event.role
        })
      );
    }

    return actions;
  }
}