fix: recurriing network issue
All checks were successful
Queue Release Build / prepare (push) Successful in 18s
Deploy Web Apps / deploy (push) Successful in 6m32s
Queue Release Build / build-windows (push) Successful in 26m8s
Queue Release Build / build-linux (push) Successful in 40m18s
Queue Release Build / finalize (push) Successful in 42s
All checks were successful
Queue Release Build / prepare (push) Successful in 18s
Deploy Web Apps / deploy (push) Successful in 6m32s
Queue Release Build / build-windows (push) Successful in 26m8s
Queue Release Build / build-linux (push) Successful in 40m18s
Queue Release Build / finalize (push) Successful in 42s
This commit is contained in:
@@ -498,6 +498,49 @@ export class MessagesEffects {
|
||||
)
|
||||
);
|
||||
|
||||
incomingSignalingMessages$ = createEffect(() =>
|
||||
this.webrtc.onSignalingMessage.pipe(
|
||||
withLatestFrom(
|
||||
this.store.select(selectCurrentUser),
|
||||
this.store.select(selectCurrentRoom)
|
||||
),
|
||||
mergeMap(([
|
||||
event,
|
||||
currentUser,
|
||||
currentRoom
|
||||
]) => {
|
||||
if (event.type !== 'chat_message') {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
const ctx: IncomingMessageContext = {
|
||||
db: this.db,
|
||||
webrtc: this.webrtc,
|
||||
attachments: this.attachments,
|
||||
debugging: this.debugging,
|
||||
currentUser: currentUser ?? null,
|
||||
currentRoom
|
||||
};
|
||||
|
||||
return dispatchIncomingMessage({
|
||||
...event,
|
||||
type: 'chat-message',
|
||||
fromPeerId: event.fromUserId
|
||||
}, ctx).pipe(
|
||||
catchError((error) => {
|
||||
reportDebuggingError(this.debugging, 'messages', 'Failed to process incoming signaling chat message', {
|
||||
eventType: event.type,
|
||||
fromPeerId: event.fromUserId ?? null,
|
||||
roomId: event.serverId ?? null
|
||||
}, error);
|
||||
|
||||
return EMPTY;
|
||||
})
|
||||
);
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
private trackBackgroundOperation(task: Promise<unknown> | unknown, message: string, payload: Record<string, unknown>): void {
|
||||
trackDebuggingTaskFailure(task, this.debugging, 'messages', message, payload);
|
||||
}
|
||||
|
||||
161
toju-app/src/app/store/rooms/room-signaling-connection.spec.ts
Normal file
161
toju-app/src/app/store/rooms/room-signaling-connection.spec.ts
Normal file
@@ -0,0 +1,161 @@
|
||||
import { of } from 'rxjs';
|
||||
import {
|
||||
beforeEach,
|
||||
describe,
|
||||
expect,
|
||||
it,
|
||||
vi
|
||||
} from 'vitest';
|
||||
import type { Store } from '@ngrx/store';
|
||||
import type { RealtimeSessionFacade } from '../../core/realtime';
|
||||
import type { ServerDirectoryFacade } from '../../domains/server-directory';
|
||||
import type { Room, User } from '../../shared-kernel';
|
||||
import { RoomSignalingConnection } from './room-signaling-connection';
|
||||
|
||||
interface EndpointFixture {
|
||||
id: string;
|
||||
isActive: boolean;
|
||||
name: string;
|
||||
status: 'offline' | 'online';
|
||||
url: string;
|
||||
}
|
||||
|
||||
interface SignalSourceFixture {
|
||||
sourceId: string;
|
||||
sourceName?: string;
|
||||
sourceUrl: string;
|
||||
}
|
||||
|
||||
interface SignalSelectorFixture {
|
||||
sourceId: string;
|
||||
sourceUrl: string;
|
||||
}
|
||||
|
||||
interface FakeRealtimeSessionFacade {
|
||||
connectToSignalingServer: ReturnType<typeof vi.fn>;
|
||||
hasJoinedServer: ReturnType<typeof vi.fn>;
|
||||
identify: ReturnType<typeof vi.fn>;
|
||||
isSignalingConnectedTo: ReturnType<typeof vi.fn>;
|
||||
joinRoom: ReturnType<typeof vi.fn>;
|
||||
peerId: ReturnType<typeof vi.fn>;
|
||||
setCurrentServer: ReturnType<typeof vi.fn>;
|
||||
switchServer: ReturnType<typeof vi.fn>;
|
||||
}
|
||||
|
||||
interface FakeServerDirectoryFacade {
|
||||
awaitInitialServerHealthCheck: ReturnType<typeof vi.fn>;
|
||||
buildRoomSignalSelector: ReturnType<typeof vi.fn>;
|
||||
ensureEndpointVersionCompatibility: ReturnType<typeof vi.fn>;
|
||||
findServerAcrossActiveEndpoints: ReturnType<typeof vi.fn>;
|
||||
getFallbackRoomEndpoints: ReturnType<typeof vi.fn>;
|
||||
getServer: ReturnType<typeof vi.fn>;
|
||||
getWebSocketUrl: ReturnType<typeof vi.fn>;
|
||||
normaliseRoomSignalSource: ReturnType<typeof vi.fn>;
|
||||
resolveRoomEndpoint: ReturnType<typeof vi.fn>;
|
||||
}
|
||||
|
||||
interface FakeStore {
|
||||
dispatch: ReturnType<typeof vi.fn>;
|
||||
}
|
||||
|
||||
describe('RoomSignalingConnection', () => {
|
||||
const room: Room = {
|
||||
id: 'room-1',
|
||||
name: 'Room One',
|
||||
description: '',
|
||||
hostId: 'user-1',
|
||||
createdAt: 1,
|
||||
userCount: 1,
|
||||
sourceId: 'primary',
|
||||
sourceName: 'Primary',
|
||||
sourceUrl: 'https://signal.toju.app'
|
||||
};
|
||||
const user: User = {
|
||||
id: 'user-1',
|
||||
oderId: 'peer-a',
|
||||
username: 'maomao',
|
||||
displayName: 'maomao',
|
||||
status: 'online',
|
||||
createdAt: 1
|
||||
};
|
||||
|
||||
let webrtc: FakeRealtimeSessionFacade;
|
||||
let serverDirectory: FakeServerDirectoryFacade;
|
||||
let store: FakeStore;
|
||||
|
||||
beforeEach(() => {
|
||||
const endpoints = new Map<string, EndpointFixture>([
|
||||
[
|
||||
'primary',
|
||||
{
|
||||
id: 'primary',
|
||||
name: 'Primary',
|
||||
url: 'https://signal.toju.app',
|
||||
isActive: true,
|
||||
status: 'offline'
|
||||
}
|
||||
],
|
||||
[
|
||||
'fallback',
|
||||
{
|
||||
id: 'fallback',
|
||||
name: 'Sweden',
|
||||
url: 'https://signal-sweden.toju.app',
|
||||
isActive: true,
|
||||
status: 'online'
|
||||
}
|
||||
]
|
||||
]);
|
||||
|
||||
webrtc = {
|
||||
connectToSignalingServer: vi.fn((url: string) => of(url === 'wss://signal-sweden.toju.app')),
|
||||
hasJoinedServer: vi.fn(() => false),
|
||||
identify: vi.fn(),
|
||||
isSignalingConnectedTo: vi.fn(() => false),
|
||||
joinRoom: vi.fn(),
|
||||
peerId: vi.fn(() => 'peer-a'),
|
||||
setCurrentServer: vi.fn(),
|
||||
switchServer: vi.fn()
|
||||
};
|
||||
|
||||
serverDirectory = {
|
||||
awaitInitialServerHealthCheck: vi.fn(() => Promise.resolve()),
|
||||
buildRoomSignalSelector: vi.fn((source: SignalSourceFixture) => ({
|
||||
sourceId: source.sourceId,
|
||||
sourceUrl: source.sourceUrl
|
||||
})),
|
||||
ensureEndpointVersionCompatibility: vi.fn((selector: SignalSelectorFixture) =>
|
||||
Promise.resolve(selector.sourceId === 'fallback')
|
||||
),
|
||||
findServerAcrossActiveEndpoints: vi.fn(() => of(null)),
|
||||
getFallbackRoomEndpoints: vi.fn(() => [endpoints.get('fallback')]),
|
||||
getServer: vi.fn(() => of(null)),
|
||||
getWebSocketUrl: vi.fn((selector: SignalSelectorFixture) => selector.sourceUrl.replace(/^http/, 'ws')),
|
||||
normaliseRoomSignalSource: vi.fn((source: SignalSourceFixture) => ({
|
||||
sourceId: source.sourceId,
|
||||
sourceName: source.sourceName,
|
||||
sourceUrl: source.sourceUrl
|
||||
})),
|
||||
resolveRoomEndpoint: vi.fn((source: SignalSourceFixture) => endpoints.get(source.sourceId) ?? null)
|
||||
};
|
||||
|
||||
store = {
|
||||
dispatch: vi.fn()
|
||||
};
|
||||
});
|
||||
|
||||
it('tries fallback endpoints when the primary endpoint is offline', async () => {
|
||||
const connection = new RoomSignalingConnection(
|
||||
webrtc as unknown as RealtimeSessionFacade,
|
||||
serverDirectory as unknown as ServerDirectoryFacade,
|
||||
store as unknown as Store
|
||||
);
|
||||
|
||||
connection.beginRoomNavigation(room.id);
|
||||
await connection.connectToRoomSignaling(room, user, user.oderId, [room]);
|
||||
|
||||
expect(serverDirectory.ensureEndpointVersionCompatibility).toHaveBeenCalledTimes(2);
|
||||
expect(webrtc.connectToSignalingServer).toHaveBeenCalledWith('wss://signal-sweden.toju.app');
|
||||
expect(webrtc.joinRoom).toHaveBeenCalledWith(room.id, user.oderId, 'wss://signal-sweden.toju.app');
|
||||
});
|
||||
});
|
||||
@@ -135,7 +135,13 @@ export class RoomSignalingConnection {
|
||||
}
|
||||
|
||||
if (!isCompatible) {
|
||||
if (candidate.isPrimary) {
|
||||
// Warning: offline/unreachable endpoints also fail this check. Only
|
||||
// version-incompatible primary endpoints should stop fallback; transient
|
||||
// 521/522/network failures must continue to the next active endpoint.
|
||||
const endpoint = this.serverDirectory.resolveRoomEndpoint(candidate.source);
|
||||
const isEndpointIncompatible = endpoint?.status === 'incompatible';
|
||||
|
||||
if (candidate.isPrimary && isEndpointIncompatible) {
|
||||
if (shouldShowCompatibilityError) {
|
||||
this.store.dispatch(
|
||||
RoomsActions.setSignalServerCompatibilityError({ message: CLIENT_UPDATE_REQUIRED_MESSAGE })
|
||||
@@ -297,6 +303,10 @@ export class RoomSignalingConnection {
|
||||
|
||||
if (!this.webrtc.hasJoinedServer(room.id)) {
|
||||
const selector = this.resolveRoomSignalSelector(primarySource, resolvedRoom.name);
|
||||
// Warning: getServer returns null for both SERVER_NOT_FOUND and transient
|
||||
// endpoint failures. Always search active endpoints before deciding the
|
||||
// saved room source is stale, otherwise a Cloudflare/origin outage pins
|
||||
// reconnects to the dead endpoint.
|
||||
const authoritativeServer = (
|
||||
selector
|
||||
? await firstValueFrom(this.serverDirectory.getServer(room.id, selector))
|
||||
|
||||
@@ -95,12 +95,18 @@ export class RoomStateSyncEffects {
|
||||
/** Handles WebRTC signaling events for user presence (join, leave, server_users). */
|
||||
signalingMessages$ = createEffect(() =>
|
||||
this.webrtc.onSignalingMessage.pipe(
|
||||
withLatestFrom(this.store.select(selectCurrentUser), this.store.select(selectCurrentRoom), this.store.select(selectSavedRooms)),
|
||||
withLatestFrom(
|
||||
this.store.select(selectCurrentUser),
|
||||
this.store.select(selectCurrentRoom),
|
||||
this.store.select(selectSavedRooms),
|
||||
this.store.select(selectAllUsers)
|
||||
),
|
||||
mergeMap(([
|
||||
message,
|
||||
currentUser,
|
||||
currentRoom,
|
||||
savedRooms
|
||||
savedRooms,
|
||||
allUsers
|
||||
]) => {
|
||||
const signalingMessage: RoomPresenceSignalingMessage = message;
|
||||
const myId = currentUser?.oderId || currentUser?.id;
|
||||
@@ -226,6 +232,16 @@ export class RoomStateSyncEffects {
|
||||
];
|
||||
}
|
||||
|
||||
case 'voice_state': {
|
||||
const voiceEvent = {
|
||||
...signalingMessage,
|
||||
type: 'voice-state',
|
||||
fromPeerId: signalingMessage.oderId ?? signalingMessage.fromUserId
|
||||
} as ChatEvent;
|
||||
|
||||
return this.handleVoiceOrScreenState(voiceEvent, allUsers, currentUser ?? null, 'voice');
|
||||
}
|
||||
|
||||
case 'access_denied': {
|
||||
if (isWrongServer(signalingMessage.serverId, viewedServerId))
|
||||
return EMPTY;
|
||||
|
||||
Reference in New Issue
Block a user