refactor: Cleaning rooms store
This commit is contained in:
453
toju-app/src/app/store/rooms/room-signaling-connection.ts
Normal file
453
toju-app/src/app/store/rooms/room-signaling-connection.ts
Normal file
@@ -0,0 +1,453 @@
|
||||
import { Store } from '@ngrx/store';
|
||||
import { firstValueFrom } from 'rxjs';
|
||||
import { Room, User } from '../../shared-kernel';
|
||||
import {
|
||||
type RoomSignalSource,
|
||||
type ServerSourceSelector,
|
||||
type ServerDirectoryFacade,
|
||||
areRoomSignalSourcesEqual,
|
||||
CLIENT_UPDATE_REQUIRED_MESSAGE
|
||||
} from '../../domains/server-directory';
|
||||
import { RealtimeSessionFacade } from '../../core/realtime';
|
||||
import { RoomsActions } from './rooms.actions';
|
||||
import { resolveUserDisplayName, extractRoomIdFromUrl } from './rooms.helpers';
|
||||
|
||||
export interface RoomSignalConnectionPlan {
|
||||
fallbackSources: RoomSignalSource[];
|
||||
primarySource: RoomSignalSource | null;
|
||||
room: Room;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encapsulates signaling-server connection, fallback cascade, and
|
||||
* navigation versioning logic that was previously inlined in
|
||||
* {@link RoomsEffects}.
|
||||
*
|
||||
* Instantiated by `RoomsEffects` (not an Angular service) - it receives
|
||||
* the required dependencies through its constructor.
|
||||
*/
|
||||
export class RoomSignalingConnection {
|
||||
latestNavigatedRoomId: string | null = null;
|
||||
private readonly roomSignalFallbackSources = new Map<string, RoomSignalSource>();
|
||||
private roomNavigationRequestVersion = 0;
|
||||
|
||||
constructor(
|
||||
private readonly webrtc: RealtimeSessionFacade,
|
||||
private readonly serverDirectory: ServerDirectoryFacade,
|
||||
private readonly store: Store
|
||||
) {}
|
||||
|
||||
// ── Navigation versioning ──────────────────────────────────────
|
||||
|
||||
beginRoomNavigation(roomId: string): number {
|
||||
this.roomNavigationRequestVersion += 1;
|
||||
this.latestNavigatedRoomId = roomId;
|
||||
|
||||
return this.roomNavigationRequestVersion;
|
||||
}
|
||||
|
||||
isCurrentRoomNavigation(roomId: string, navigationRequestVersion?: number): boolean {
|
||||
if (typeof navigationRequestVersion !== 'number') {
|
||||
return true;
|
||||
}
|
||||
|
||||
return navigationRequestVersion === this.roomNavigationRequestVersion
|
||||
&& roomId === this.latestNavigatedRoomId;
|
||||
}
|
||||
|
||||
deleteRoomFallbackSource(roomId: string): void {
|
||||
this.roomSignalFallbackSources.delete(roomId);
|
||||
}
|
||||
|
||||
// ── Primary connection entry points ────────────────────────────
|
||||
|
||||
async connectToRoomSignaling(
|
||||
room: Room,
|
||||
user: User | null,
|
||||
resolvedOderId?: string,
|
||||
savedRooms: Room[] = [],
|
||||
options: { showCompatibilityError?: boolean; navigationRequestVersion?: number } = {}
|
||||
): Promise<void> {
|
||||
const shouldShowCompatibilityError = options.showCompatibilityError ?? false;
|
||||
const navigationRequestVersion = options.navigationRequestVersion;
|
||||
const isViewedRoom = () => room.id === this.latestNavigatedRoomId;
|
||||
|
||||
await this.serverDirectory.awaitInitialServerHealthCheck();
|
||||
|
||||
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const connectionPlan = await this.resolveRoomSignalConnectionPlan(room);
|
||||
|
||||
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const sessionFallbackSource = this.roomSignalFallbackSources.get(room.id);
|
||||
const connectionCandidates: {
|
||||
isExistingFallback?: boolean;
|
||||
isFallback?: boolean;
|
||||
isPrimary?: boolean;
|
||||
source: RoomSignalSource;
|
||||
}[] = [];
|
||||
const pushConnectionCandidate = (
|
||||
source: RoomSignalSource | null | undefined,
|
||||
flags: { isExistingFallback?: boolean; isFallback?: boolean; isPrimary?: boolean } = {}
|
||||
) => {
|
||||
if (!source || !this.resolveRoomSignalSelector(source, room.name)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectionCandidates.some((candidate) => areRoomSignalSourcesEqual(candidate.source, source))) {
|
||||
return;
|
||||
}
|
||||
|
||||
connectionCandidates.push({
|
||||
...flags,
|
||||
source
|
||||
});
|
||||
};
|
||||
|
||||
if (sessionFallbackSource && this.webrtc.hasJoinedServer(room.id)) {
|
||||
pushConnectionCandidate(sessionFallbackSource, { isExistingFallback: true, isFallback: true });
|
||||
}
|
||||
|
||||
pushConnectionCandidate(connectionPlan.primarySource, { isPrimary: true });
|
||||
|
||||
for (const fallbackSource of connectionPlan.fallbackSources) {
|
||||
pushConnectionCandidate(fallbackSource, { isFallback: true });
|
||||
}
|
||||
|
||||
let attemptedFallback = false;
|
||||
|
||||
for (const candidate of connectionCandidates) {
|
||||
const selector = this.resolveRoomSignalSelector(candidate.source, connectionPlan.room.name);
|
||||
|
||||
if (!selector) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const isCompatible = await this.serverDirectory.ensureEndpointVersionCompatibility(selector);
|
||||
|
||||
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isCompatible) {
|
||||
if (candidate.isPrimary) {
|
||||
if (shouldShowCompatibilityError) {
|
||||
this.store.dispatch(
|
||||
RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE })
|
||||
);
|
||||
}
|
||||
|
||||
if (isViewedRoom()) {
|
||||
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false }));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (candidate.isFallback && !candidate.isExistingFallback && !attemptedFallback) {
|
||||
attemptedFallback = true;
|
||||
|
||||
if (isViewedRoom()) {
|
||||
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true }));
|
||||
}
|
||||
}
|
||||
|
||||
const connected = await this.connectRoomToSignalSource(
|
||||
connectionPlan.room,
|
||||
candidate.source,
|
||||
user,
|
||||
resolvedOderId,
|
||||
savedRooms,
|
||||
navigationRequestVersion
|
||||
);
|
||||
|
||||
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!connected) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (candidate.isFallback) {
|
||||
this.roomSignalFallbackSources.set(room.id, candidate.source);
|
||||
} else {
|
||||
this.roomSignalFallbackSources.delete(room.id);
|
||||
}
|
||||
|
||||
if (shouldShowCompatibilityError) {
|
||||
this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null }));
|
||||
}
|
||||
|
||||
if (isViewedRoom()) {
|
||||
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: false }));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (shouldShowCompatibilityError) {
|
||||
this.store.dispatch(RoomsActions.setSignalServerCompatibilityError({ message: null }));
|
||||
}
|
||||
|
||||
if (isViewedRoom()) {
|
||||
this.store.dispatch(RoomsActions.setSignalServerReconnecting({ isReconnecting: true }));
|
||||
}
|
||||
}
|
||||
|
||||
syncSavedRoomConnections(
|
||||
user: User | null,
|
||||
currentRoom: Room | null,
|
||||
savedRooms: Room[],
|
||||
currentUrl: string
|
||||
): void {
|
||||
if (!user || savedRooms.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const watchedRoomId = extractRoomIdFromUrl(currentUrl);
|
||||
const roomsToSync = currentRoom ? this.includeRoom(savedRooms, currentRoom) : savedRooms;
|
||||
const roomsBySignalingUrl = new Map<string, Room[]>();
|
||||
|
||||
for (const room of roomsToSync) {
|
||||
const wsUrl = this.resolveRoomSignalingUrl(room);
|
||||
|
||||
if (!wsUrl) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const groupedRooms = roomsBySignalingUrl.get(wsUrl) ?? [];
|
||||
|
||||
if (!groupedRooms.some((groupedRoom) => groupedRoom.id === room.id)) {
|
||||
groupedRooms.push(room);
|
||||
}
|
||||
|
||||
roomsBySignalingUrl.set(wsUrl, groupedRooms);
|
||||
}
|
||||
|
||||
for (const groupedRooms of roomsBySignalingUrl.values()) {
|
||||
const preferredRoom = groupedRooms.find((room) => room.id === watchedRoomId)
|
||||
?? (currentRoom && groupedRooms.some((room) => room.id === currentRoom.id)
|
||||
? currentRoom
|
||||
: null)
|
||||
?? groupedRooms[0]
|
||||
?? null;
|
||||
|
||||
if (!preferredRoom) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const shouldShowCompatibilityError = preferredRoom.id === watchedRoomId
|
||||
|| (!!currentRoom && preferredRoom.id === currentRoom.id);
|
||||
|
||||
void this.connectToRoomSignaling(preferredRoom, user, user.oderId || this.webrtc.peerId(), roomsToSync, {
|
||||
showCompatibilityError: shouldShowCompatibilityError
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
resolveRoomSignalingUrl(room: Room): string {
|
||||
const selector = this.resolveRoomSignalSelector(this.getPreferredRoomSignalSource(room), room.name);
|
||||
|
||||
return selector ? this.serverDirectory.getWebSocketUrl(selector) : '';
|
||||
}
|
||||
|
||||
resolveRoomSignalSource(
|
||||
room: Pick<Room, 'name' | 'sourceId' | 'sourceName' | 'sourceUrl'>
|
||||
): RoomSignalSource {
|
||||
return this.serverDirectory.normaliseRoomSignalSource({
|
||||
sourceId: room.sourceId,
|
||||
sourceName: room.sourceName,
|
||||
sourceUrl: room.sourceUrl,
|
||||
fallbackName: room.sourceName ?? room.name
|
||||
}, {
|
||||
ensureEndpoint: !!room.sourceUrl
|
||||
});
|
||||
}
|
||||
|
||||
resolveRoomSignalSelector(
|
||||
source: RoomSignalSource | null | undefined,
|
||||
fallbackName: string
|
||||
): ServerSourceSelector | undefined {
|
||||
if (!source) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return this.serverDirectory.buildRoomSignalSelector({
|
||||
...source,
|
||||
fallbackName: source.sourceName ?? fallbackName
|
||||
}, {
|
||||
ensureEndpoint: !!source.sourceUrl
|
||||
});
|
||||
}
|
||||
|
||||
// ── Internal helpers ───────────────────────────────────────────
|
||||
|
||||
private async resolveRoomSignalConnectionPlan(room: Room): Promise<RoomSignalConnectionPlan> {
|
||||
let resolvedRoom = this.repairRoomSignalSource(room, this.resolveRoomSignalSource(room));
|
||||
let primarySource = this.resolveRoomSignalSource(resolvedRoom);
|
||||
|
||||
if (!this.webrtc.hasJoinedServer(room.id)) {
|
||||
const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name);
|
||||
const authoritativeServer = (
|
||||
selector
|
||||
? await firstValueFrom(this.serverDirectory.getServer(room.id, selector))
|
||||
: null
|
||||
) ?? await firstValueFrom(this.serverDirectory.findServerAcrossActiveEndpoints(room.id, primarySource));
|
||||
|
||||
if (authoritativeServer) {
|
||||
const authoritativeSource = this.serverDirectory.normaliseRoomSignalSource({
|
||||
sourceId: authoritativeServer.sourceId ?? primarySource.sourceId,
|
||||
sourceName: authoritativeServer.sourceName ?? primarySource.sourceName,
|
||||
sourceUrl: authoritativeServer.sourceUrl ?? primarySource.sourceUrl,
|
||||
fallbackName: authoritativeServer.sourceName ?? primarySource.sourceName ?? resolvedRoom.name
|
||||
}, {
|
||||
ensureEndpoint: !!(authoritativeServer.sourceUrl ?? primarySource.sourceUrl)
|
||||
});
|
||||
|
||||
resolvedRoom = this.repairRoomSignalSource(resolvedRoom, authoritativeSource);
|
||||
primarySource = authoritativeSource;
|
||||
}
|
||||
}
|
||||
|
||||
const fallbackSources = this.serverDirectory.getFallbackRoomEndpoints(primarySource)
|
||||
.map((endpoint) => this.serverDirectory.normaliseRoomSignalSource({
|
||||
sourceId: endpoint.id,
|
||||
sourceName: endpoint.name,
|
||||
sourceUrl: endpoint.url,
|
||||
fallbackName: endpoint.name
|
||||
}))
|
||||
.filter((source, index, sources) =>
|
||||
sources.findIndex((candidate) => areRoomSignalSourcesEqual(candidate, source)) === index
|
||||
);
|
||||
|
||||
return {
|
||||
fallbackSources,
|
||||
primarySource: this.resolveRoomSignalSelector(primarySource, resolvedRoom.name) ? primarySource : null,
|
||||
room: resolvedRoom
|
||||
};
|
||||
}
|
||||
|
||||
private async connectRoomToSignalSource(
|
||||
room: Room,
|
||||
source: RoomSignalSource,
|
||||
user: User | null,
|
||||
resolvedOderId: string | undefined,
|
||||
savedRooms: Room[],
|
||||
navigationRequestVersion?: number
|
||||
): Promise<boolean> {
|
||||
const selector = this.resolveRoomSignalSelector(source, room.name);
|
||||
|
||||
if (!selector) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const wsUrl = this.serverDirectory.getWebSocketUrl(selector);
|
||||
const oderId = resolvedOderId || user?.oderId || this.webrtc.peerId();
|
||||
const displayName = resolveUserDisplayName(user);
|
||||
const sameSignalRooms = this.getRoomsForSignalingUrl(this.includeRoom(savedRooms, room), wsUrl);
|
||||
const backgroundRooms = sameSignalRooms.filter((candidate) => candidate.id !== room.id);
|
||||
const joinCurrentEndpointRooms = () => {
|
||||
if (!this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.webrtc.setCurrentServer(room.id);
|
||||
this.webrtc.identify(oderId, displayName, wsUrl);
|
||||
|
||||
for (const backgroundRoom of backgroundRooms) {
|
||||
this.webrtc.joinRoom(backgroundRoom.id, oderId, wsUrl);
|
||||
}
|
||||
|
||||
if (this.webrtc.hasJoinedServer(room.id)) {
|
||||
this.webrtc.switchServer(room.id, oderId, wsUrl);
|
||||
} else {
|
||||
this.webrtc.joinRoom(room.id, oderId, wsUrl);
|
||||
}
|
||||
};
|
||||
|
||||
if (this.webrtc.isSignalingConnectedTo(wsUrl)) {
|
||||
joinCurrentEndpointRooms();
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
const connected = await firstValueFrom(this.webrtc.connectToSignalingServer(wsUrl));
|
||||
|
||||
if (!connected || !this.isCurrentRoomNavigation(room.id, navigationRequestVersion)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
joinCurrentEndpointRooms();
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private repairRoomSignalSource(room: Room, source: RoomSignalSource | null): Room {
|
||||
if (!source || areRoomSignalSourcesEqual(room, source)) {
|
||||
return room;
|
||||
}
|
||||
|
||||
const changes: Partial<Room> = {
|
||||
sourceId: source.sourceId,
|
||||
sourceName: source.sourceName,
|
||||
sourceUrl: source.sourceUrl
|
||||
};
|
||||
|
||||
this.store.dispatch(RoomsActions.updateRoom({
|
||||
roomId: room.id,
|
||||
changes
|
||||
}));
|
||||
|
||||
return {
|
||||
...room,
|
||||
...changes
|
||||
};
|
||||
}
|
||||
|
||||
private getPreferredRoomSignalSource(room: Room): RoomSignalSource {
|
||||
const fallbackSource = this.roomSignalFallbackSources.get(room.id);
|
||||
|
||||
if (fallbackSource && this.webrtc.hasJoinedServer(room.id)) {
|
||||
return fallbackSource;
|
||||
}
|
||||
|
||||
return this.resolveRoomSignalSource(room);
|
||||
}
|
||||
|
||||
private includeRoom(rooms: Room[], room: Room): Room[] {
|
||||
return rooms.some((candidate) => candidate.id === room.id)
|
||||
? rooms
|
||||
: [...rooms, room];
|
||||
}
|
||||
|
||||
private getRoomsForSignalingUrl(rooms: Room[], wsUrl: string): Room[] {
|
||||
const seenRoomIds = new Set<string>();
|
||||
const matchingRooms: Room[] = [];
|
||||
|
||||
for (const room of rooms) {
|
||||
if (seenRoomIds.has(room.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (this.resolveRoomSignalingUrl(room) !== wsUrl) {
|
||||
continue;
|
||||
}
|
||||
|
||||
seenRoomIds.add(room.id);
|
||||
matchingRooms.push(room);
|
||||
}
|
||||
|
||||
return matchingRooms;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user