Broadcast a cleared voice_state when voice-active sockets drop and reset mute/deafen flags on disconnect or reconnect so stale session state cannot leak to other clients. Co-authored-by: Cursor <cursoragent@cursor.com>
876 lines
25 KiB
TypeScript
876 lines
25 KiB
TypeScript
import { connectedUsers } from './state';
|
|
import { ConnectedUser } from './types';
|
|
import {
|
|
broadcastToServer,
|
|
findUserByOderId,
|
|
getServerIdsForOderId,
|
|
getUniqueUsersInServer,
|
|
isOderIdConnectedToServer,
|
|
notifyOtherConnectionsForOderId
|
|
} from './broadcast';
|
|
import {
|
|
authorizeWebSocketJoin,
|
|
findServerMembership,
|
|
usersShareServerMembership
|
|
} from '../services/server-access.service';
|
|
import { consumeSessionToken } from '../services/session-auth.service';
|
|
import {
|
|
getPluginRequirementsSnapshot,
|
|
PluginSupportError,
|
|
validatePluginEventEnvelope
|
|
} from '../services/plugin-support.service';
|
|
|
|
interface WsMessage {
|
|
[key: string]: unknown;
|
|
type: string;
|
|
}
|
|
|
|
function normalizeDisplayName(value: unknown, fallback = 'User'): string {
|
|
const normalized = typeof value === 'string' ? value.trim() : '';
|
|
|
|
return normalized || fallback;
|
|
}
|
|
|
|
function normalizeDescription(value: unknown): string | undefined {
|
|
if (typeof value !== 'string') {
|
|
return undefined;
|
|
}
|
|
|
|
const normalized = value.trim();
|
|
|
|
return normalized || undefined;
|
|
}
|
|
|
|
function normalizeProfileUpdatedAt(value: unknown): number | undefined {
|
|
return typeof value === 'number' && Number.isFinite(value) && value > 0 ? value : undefined;
|
|
}
|
|
|
|
function normalizeHomeSignalServerUrl(value: unknown): string | undefined {
|
|
if (typeof value !== 'string') {
|
|
return undefined;
|
|
}
|
|
|
|
const normalized = value.trim().replace(/\/+$/, '');
|
|
|
|
return normalized || undefined;
|
|
}
|
|
|
|
function buildPresenceUserPayload(user: ConnectedUser): {
|
|
oderId: string;
|
|
displayName: string;
|
|
description?: string;
|
|
profileUpdatedAt?: number;
|
|
homeSignalServerUrl?: string;
|
|
status: 'online' | 'away' | 'busy' | 'offline';
|
|
} {
|
|
return {
|
|
oderId: user.oderId,
|
|
displayName: normalizeDisplayName(user.displayName),
|
|
description: user.description,
|
|
profileUpdatedAt: user.profileUpdatedAt,
|
|
homeSignalServerUrl: user.homeSignalServerUrl,
|
|
status: user.status ?? 'online'
|
|
};
|
|
}
|
|
|
|
function normalizeClientInstanceId(value: unknown): string | undefined {
|
|
if (typeof value !== 'string') {
|
|
return undefined;
|
|
}
|
|
|
|
const normalized = value.trim();
|
|
|
|
return normalized || undefined;
|
|
}
|
|
|
|
function readVoiceConnected(message: WsMessage): boolean {
|
|
const voiceState = message['voiceState'];
|
|
|
|
if (!voiceState || typeof voiceState !== 'object') {
|
|
return message['isConnected'] === true;
|
|
}
|
|
|
|
return (voiceState as { isConnected?: boolean }).isConnected === true;
|
|
}
|
|
|
|
function evictStaleClientInstanceConnections(
|
|
oderId: string,
|
|
connectionScope: string | undefined,
|
|
clientInstanceId: string | undefined,
|
|
keepConnectionId: string
|
|
): void {
|
|
if (!clientInstanceId) {
|
|
return;
|
|
}
|
|
|
|
connectedUsers.forEach((candidate, connectionId) => {
|
|
if (
|
|
connectionId === keepConnectionId
|
|
|| candidate.oderId !== oderId
|
|
|| candidate.connectionScope !== connectionScope
|
|
|| candidate.clientInstanceId !== clientInstanceId
|
|
) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
candidate.ws.close();
|
|
} catch {
|
|
console.warn(`Failed to close stale connection ${connectionId} for ${oderId}`);
|
|
}
|
|
|
|
connectedUsers.delete(connectionId);
|
|
});
|
|
}
|
|
|
|
function clearVoiceActiveForOderId(oderId: string, exceptConnectionId?: string): void {
|
|
connectedUsers.forEach((candidate, connectionId) => {
|
|
if (candidate.oderId !== oderId || connectionId === exceptConnectionId) {
|
|
return;
|
|
}
|
|
|
|
candidate.voiceActive = false;
|
|
connectedUsers.set(connectionId, candidate);
|
|
});
|
|
}
|
|
|
|
function readVoiceStateServerId(snapshot: Record<string, unknown> | undefined): string | undefined {
|
|
if (!snapshot) {
|
|
return undefined;
|
|
}
|
|
|
|
const nestedVoiceState = snapshot['voiceState'];
|
|
|
|
if (nestedVoiceState && typeof nestedVoiceState === 'object') {
|
|
const nestedServerId = readMessageId((nestedVoiceState as { serverId?: unknown }).serverId);
|
|
|
|
if (nestedServerId) {
|
|
return nestedServerId;
|
|
}
|
|
}
|
|
|
|
return readMessageId(snapshot['serverId']);
|
|
}
|
|
|
|
/** Broadcast a cleared voice_state when a voice-active socket disappears without a graceful leave. */
|
|
export function finalizeVoiceDisconnectForConnection(connectionId: string): void {
|
|
const user = connectedUsers.get(connectionId);
|
|
|
|
if (!user?.authenticated || (!user.voiceActive && !user.voiceStateSnapshot)) {
|
|
return;
|
|
}
|
|
|
|
const serverId = readVoiceStateServerId(user.voiceStateSnapshot) ?? user.viewedServerId;
|
|
|
|
if (serverId && user.serverIds.has(serverId)) {
|
|
broadcastToServer(
|
|
serverId,
|
|
{
|
|
type: 'voice_state',
|
|
serverId,
|
|
oderId: user.oderId,
|
|
displayName: normalizeDisplayName(user.displayName),
|
|
voiceState: {
|
|
isConnected: false,
|
|
isMuted: false,
|
|
isDeafened: false,
|
|
isSpeaking: false
|
|
}
|
|
},
|
|
{ excludeConnectionId: connectionId }
|
|
);
|
|
}
|
|
|
|
user.voiceActive = false;
|
|
user.voiceStateSnapshot = undefined;
|
|
connectedUsers.set(connectionId, user);
|
|
clearVoiceActiveForOderId(user.oderId, connectionId);
|
|
}
|
|
|
|
function sendVoiceStateSnapshotToConnection(user: ConnectedUser, snapshot: Record<string, unknown>): void {
|
|
user.ws.send(JSON.stringify({
|
|
type: 'voice_state',
|
|
...snapshot
|
|
}));
|
|
}
|
|
|
|
function readMessageId(value: unknown): string | undefined {
|
|
if (typeof value !== 'string') {
|
|
return undefined;
|
|
}
|
|
|
|
const normalized = value.trim();
|
|
|
|
if (!normalized || normalized === 'undefined' || normalized === 'null') {
|
|
return undefined;
|
|
}
|
|
|
|
return normalized;
|
|
}
|
|
|
|
function sendPluginError(user: ConnectedUser, error: unknown, message: WsMessage): void {
|
|
if (error instanceof PluginSupportError) {
|
|
user.ws.send(
|
|
JSON.stringify({
|
|
type: 'plugin_error',
|
|
serverId: typeof message['serverId'] === 'string' ? message['serverId'] : undefined,
|
|
pluginId: typeof message['pluginId'] === 'string' ? message['pluginId'] : undefined,
|
|
eventName: typeof message['eventName'] === 'string' ? message['eventName'] : undefined,
|
|
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
|
code: error.code,
|
|
message: error.message
|
|
})
|
|
);
|
|
|
|
return;
|
|
}
|
|
|
|
console.error('Unhandled plugin websocket error:', error);
|
|
user.ws.send(
|
|
JSON.stringify({
|
|
type: 'plugin_error',
|
|
code: 'INTERNAL_ERROR',
|
|
message: 'Internal server error'
|
|
})
|
|
);
|
|
}
|
|
|
|
/** Sends the current user list for a given server to a single connected user. */
|
|
function sendServerUsers(user: ConnectedUser, serverId: string): void {
|
|
const users = getUniqueUsersInServer(serverId, user.oderId).map((cu) => buildPresenceUserPayload(cu));
|
|
|
|
user.ws.send(JSON.stringify({ type: 'server_users', serverId, users }));
|
|
}
|
|
|
|
async function sendPluginRequirements(user: ConnectedUser, serverId: string): Promise<void> {
|
|
try {
|
|
const snapshot = await getPluginRequirementsSnapshot(serverId);
|
|
|
|
user.ws.send(
|
|
JSON.stringify({
|
|
type: 'plugin_requirements',
|
|
serverId,
|
|
snapshot
|
|
})
|
|
);
|
|
} catch (error) {
|
|
sendPluginError(user, error, { type: 'plugin_requirements', serverId });
|
|
}
|
|
}
|
|
|
|
const DIRECT_SIGNALING_TYPES = new Set([
|
|
'direct-message',
|
|
'direct-message-status',
|
|
'direct-message-mutation',
|
|
'direct-message-typing',
|
|
'direct-message-sync-request',
|
|
'direct-message-sync',
|
|
'direct-call'
|
|
]);
|
|
const SERVER_SCOPED_SIGNALING_TYPES = new Set([
|
|
'server_icon_peer_request',
|
|
'server_icon_peer_data',
|
|
'server_icon_available',
|
|
'server_icon_sync_request'
|
|
]);
|
|
|
|
function sendAuthRequired(user: ConnectedUser): void {
|
|
user.ws.send(JSON.stringify({
|
|
type: 'auth_required',
|
|
message: 'identify with a valid session token before sending messages'
|
|
}));
|
|
}
|
|
|
|
async function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: string): Promise<void> {
|
|
const token = typeof message['token'] === 'string' ? message['token'].trim() : '';
|
|
|
|
if (!token) {
|
|
user.ws.send(JSON.stringify({
|
|
type: 'auth_error',
|
|
code: 'MISSING_TOKEN',
|
|
message: 'identify requires a session token'
|
|
}));
|
|
|
|
return;
|
|
}
|
|
|
|
const session = await consumeSessionToken(token);
|
|
|
|
if (!session) {
|
|
user.ws.send(JSON.stringify({
|
|
type: 'auth_error',
|
|
code: 'INVALID_TOKEN',
|
|
message: 'invalid or expired session token'
|
|
}));
|
|
|
|
return;
|
|
}
|
|
|
|
const claimedOderId = readMessageId(message['oderId']);
|
|
|
|
if (claimedOderId && claimedOderId !== session.user.id) {
|
|
user.ws.send(JSON.stringify({
|
|
type: 'auth_error',
|
|
code: 'USER_ID_MISMATCH',
|
|
message: 'oderId must match the authenticated user'
|
|
}));
|
|
|
|
return;
|
|
}
|
|
|
|
const newOderId = session.user.id;
|
|
const newScope = typeof message['connectionScope'] === 'string' ? message['connectionScope'] : undefined;
|
|
const newClientInstanceId = normalizeClientInstanceId(message['clientInstanceId']);
|
|
const previousDisplayName = normalizeDisplayName(user.displayName);
|
|
const previousDescription = user.description;
|
|
const previousProfileUpdatedAt = user.profileUpdatedAt;
|
|
const previousHomeSignalServerUrl = user.homeSignalServerUrl;
|
|
|
|
evictStaleClientInstanceConnections(newOderId, newScope, newClientInstanceId, connectionId);
|
|
|
|
user.oderId = newOderId;
|
|
user.authenticated = true;
|
|
user.clientInstanceId = newClientInstanceId;
|
|
user.displayName = normalizeDisplayName(message['displayName'], normalizeDisplayName(user.displayName));
|
|
|
|
if (Object.prototype.hasOwnProperty.call(message, 'description')) {
|
|
user.description = normalizeDescription(message['description']);
|
|
}
|
|
|
|
if (Object.prototype.hasOwnProperty.call(message, 'profileUpdatedAt')) {
|
|
user.profileUpdatedAt = normalizeProfileUpdatedAt(message['profileUpdatedAt']);
|
|
}
|
|
|
|
if (Object.prototype.hasOwnProperty.call(message, 'homeSignalServerUrl')) {
|
|
user.homeSignalServerUrl = normalizeHomeSignalServerUrl(message['homeSignalServerUrl']);
|
|
}
|
|
|
|
user.connectionScope = newScope;
|
|
connectedUsers.set(connectionId, user);
|
|
console.log(`User identified: ${user.displayName} (${user.oderId})`);
|
|
|
|
notifyOtherConnectionsForOderId(newOderId, {
|
|
type: 'account_sync_peer_online',
|
|
clientInstanceId: newClientInstanceId
|
|
}, connectionId);
|
|
|
|
const voiceSnapshot = Array.from(connectedUsers.entries()).find(([otherConnectionId, otherUser]) =>
|
|
otherConnectionId !== connectionId
|
|
&& otherUser.oderId === newOderId
|
|
&& otherUser.voiceActive
|
|
&& otherUser.voiceStateSnapshot
|
|
)?.[1]?.voiceStateSnapshot;
|
|
|
|
if (voiceSnapshot) {
|
|
sendVoiceStateSnapshotToConnection(user, voiceSnapshot);
|
|
}
|
|
|
|
if (
|
|
user.displayName === previousDisplayName
|
|
&& user.description === previousDescription
|
|
&& user.profileUpdatedAt === previousProfileUpdatedAt
|
|
&& user.homeSignalServerUrl === previousHomeSignalServerUrl
|
|
) {
|
|
return;
|
|
}
|
|
|
|
for (const serverId of user.serverIds) {
|
|
broadcastToServer(
|
|
serverId,
|
|
{
|
|
type: 'user_joined',
|
|
...buildPresenceUserPayload(user),
|
|
serverId
|
|
},
|
|
{ excludeConnectionId: connectionId }
|
|
);
|
|
}
|
|
}
|
|
|
|
async function handleJoinServer(user: ConnectedUser, message: WsMessage, connectionId: string): Promise<void> {
|
|
const sid = readMessageId(message['serverId']);
|
|
|
|
if (!sid)
|
|
return;
|
|
|
|
const authorization = await authorizeWebSocketJoin(sid, user.oderId);
|
|
|
|
if (!authorization.allowed) {
|
|
user.ws.send(
|
|
JSON.stringify({
|
|
type: 'access_denied',
|
|
serverId: sid,
|
|
reason: authorization.reason
|
|
})
|
|
);
|
|
|
|
return;
|
|
}
|
|
|
|
const isNewConnectionMembership = !user.serverIds.has(sid);
|
|
const isNewIdentityMembership = isNewConnectionMembership && !isOderIdConnectedToServer(user.oderId, sid, connectionId);
|
|
|
|
user.serverIds.add(sid);
|
|
user.viewedServerId = sid;
|
|
connectedUsers.set(connectionId, user);
|
|
console.log(
|
|
`User ${normalizeDisplayName(user.displayName)} (${user.oderId}) joined server ${sid} ` +
|
|
`(newConnection=${isNewConnectionMembership}, newIdentity=${isNewIdentityMembership})`
|
|
);
|
|
|
|
sendServerUsers(user, sid);
|
|
await sendPluginRequirements(user, sid);
|
|
|
|
if (isNewIdentityMembership) {
|
|
broadcastToServer(
|
|
sid,
|
|
{
|
|
type: 'user_joined',
|
|
...buildPresenceUserPayload(user),
|
|
serverId: sid
|
|
},
|
|
{ excludeIdentityOderId: user.oderId }
|
|
);
|
|
}
|
|
}
|
|
|
|
async function handleViewServer(user: ConnectedUser, message: WsMessage, connectionId: string): Promise<void> {
|
|
const viewSid = readMessageId(message['serverId']);
|
|
|
|
if (!viewSid)
|
|
return;
|
|
|
|
if (!user.serverIds.has(viewSid)) {
|
|
return;
|
|
}
|
|
|
|
user.viewedServerId = viewSid;
|
|
connectedUsers.set(connectionId, user);
|
|
console.log(`User ${normalizeDisplayName(user.displayName)} (${user.oderId}) viewing server ${viewSid}`);
|
|
|
|
sendServerUsers(user, viewSid);
|
|
await sendPluginRequirements(user, viewSid);
|
|
}
|
|
|
|
function handleLeaveServer(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const leaveSid = readMessageId(message['serverId']) ?? user.viewedServerId;
|
|
|
|
if (!leaveSid)
|
|
return;
|
|
|
|
user.serverIds.delete(leaveSid);
|
|
|
|
if (user.viewedServerId === leaveSid)
|
|
user.viewedServerId = undefined;
|
|
|
|
connectedUsers.set(connectionId, user);
|
|
|
|
const remainingServerIds = getServerIdsForOderId(user.oderId, connectionId);
|
|
|
|
if (remainingServerIds.includes(leaveSid)) {
|
|
return;
|
|
}
|
|
|
|
broadcastToServer(
|
|
leaveSid,
|
|
{
|
|
type: 'user_left',
|
|
oderId: user.oderId,
|
|
displayName: normalizeDisplayName(user.displayName),
|
|
serverId: leaveSid,
|
|
serverIds: remainingServerIds
|
|
},
|
|
{ excludeIdentityOderId: user.oderId }
|
|
);
|
|
}
|
|
|
|
async function canForwardRtcMessage(user: ConnectedUser, message: WsMessage, targetUserId: string): Promise<boolean> {
|
|
if (!targetUserId || targetUserId === user.oderId) {
|
|
return false;
|
|
}
|
|
|
|
if (DIRECT_SIGNALING_TYPES.has(message.type)) {
|
|
return true;
|
|
}
|
|
|
|
if (SERVER_SCOPED_SIGNALING_TYPES.has(message.type)) {
|
|
const serverId = readMessageId(message['serverId']);
|
|
|
|
if (!serverId) {
|
|
return false;
|
|
}
|
|
|
|
const senderMembership = await findServerMembership(serverId, user.oderId);
|
|
const targetMembership = await findServerMembership(serverId, targetUserId);
|
|
|
|
return !!senderMembership && !!targetMembership;
|
|
}
|
|
|
|
if (message.type === 'offer' || message.type === 'answer' || message.type === 'ice_candidate') {
|
|
return true;
|
|
}
|
|
|
|
return usersShareServerMembership(user.oderId, targetUserId);
|
|
}
|
|
|
|
async function forwardRtcMessage(user: ConnectedUser, message: WsMessage): Promise<void> {
|
|
const targetUserId = readMessageId(message['targetUserId']) ?? '';
|
|
|
|
console.log(`Forwarding ${message.type} from ${user.oderId} to ${targetUserId}`);
|
|
|
|
if (!(await canForwardRtcMessage(user, message, targetUserId))) {
|
|
console.log(`Blocked ${message.type} relay from ${user.oderId} to ${targetUserId}`);
|
|
return;
|
|
}
|
|
|
|
const targetUser = findUserByOderId(targetUserId);
|
|
|
|
if (targetUser) {
|
|
targetUser.ws.send(JSON.stringify({ ...message, fromUserId: user.oderId }));
|
|
console.log(`Successfully forwarded ${message.type} to ${targetUserId}`);
|
|
} else {
|
|
console.log(
|
|
`Target user ${targetUserId} not found. Connected users:`,
|
|
Array.from(connectedUsers.values()).map((cu) => ({ oderId: cu.oderId, displayName: cu.displayName }))
|
|
);
|
|
}
|
|
}
|
|
|
|
function handleChatMessage(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const chatSid = (message['serverId'] as string | undefined) ?? user.viewedServerId;
|
|
|
|
if (chatSid && user.serverIds.has(chatSid)) {
|
|
broadcastToServer(chatSid, {
|
|
type: 'chat_message',
|
|
serverId: chatSid,
|
|
message: message['message'],
|
|
senderId: user.oderId,
|
|
senderName: user.displayName,
|
|
clientInstanceId: user.clientInstanceId,
|
|
timestamp: Date.now()
|
|
}, { excludeConnectionId: connectionId });
|
|
}
|
|
}
|
|
|
|
function handleVoiceState(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const serverId = readMessageId(message['serverId']) ?? user.viewedServerId;
|
|
|
|
if (!serverId || !user.serverIds.has(serverId)) {
|
|
return;
|
|
}
|
|
|
|
const isConnected = readVoiceConnected(message);
|
|
|
|
if (isConnected) {
|
|
clearVoiceActiveForOderId(user.oderId, connectionId);
|
|
user.voiceActive = true;
|
|
user.voiceStateSnapshot = {
|
|
...message,
|
|
type: 'voice_state',
|
|
serverId,
|
|
oderId: user.oderId,
|
|
displayName: normalizeDisplayName(user.displayName)
|
|
};
|
|
} else {
|
|
user.voiceActive = false;
|
|
user.voiceStateSnapshot = undefined;
|
|
}
|
|
|
|
connectedUsers.set(connectionId, user);
|
|
|
|
broadcastToServer(
|
|
serverId,
|
|
{
|
|
...message,
|
|
type: 'voice_state',
|
|
serverId,
|
|
oderId: user.oderId,
|
|
displayName: normalizeDisplayName(user.displayName)
|
|
},
|
|
{ excludeConnectionId: connectionId }
|
|
);
|
|
}
|
|
|
|
function handleVoiceClientTakeover(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
notifyOtherConnectionsForOderId(user.oderId, {
|
|
type: 'voice_client_takeover',
|
|
clientInstanceId: normalizeClientInstanceId(message['clientInstanceId']) ?? user.clientInstanceId,
|
|
requestedByClientInstanceId: normalizeClientInstanceId(message['clientInstanceId']) ?? user.clientInstanceId
|
|
}, connectionId);
|
|
}
|
|
|
|
function handleAccountSync(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const payload = message['payload'];
|
|
|
|
if (!payload || typeof payload !== 'object' || typeof (payload as { type?: unknown }).type !== 'string') {
|
|
return;
|
|
}
|
|
|
|
notifyOtherConnectionsForOderId(user.oderId, {
|
|
type: 'account_sync',
|
|
clientInstanceId: normalizeClientInstanceId(message['clientInstanceId']) ?? user.clientInstanceId,
|
|
fromUserId: user.oderId,
|
|
payload
|
|
}, connectionId);
|
|
}
|
|
|
|
function handleTyping(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const typingSid = (message['serverId'] as string | undefined) ?? user.viewedServerId;
|
|
const channelId = typeof message['channelId'] === 'string' && message['channelId'].trim() ? message['channelId'].trim() : 'general';
|
|
const isTyping = message['isTyping'] !== false;
|
|
|
|
if (typingSid && user.serverIds.has(typingSid)) {
|
|
broadcastToServer(
|
|
typingSid,
|
|
{
|
|
type: 'user_typing',
|
|
serverId: typingSid,
|
|
channelId,
|
|
isTyping,
|
|
oderId: user.oderId,
|
|
displayName: user.displayName,
|
|
clientInstanceId: user.clientInstanceId
|
|
},
|
|
{ excludeConnectionId: connectionId }
|
|
);
|
|
}
|
|
}
|
|
|
|
const VALID_STATUSES = new Set([
|
|
'online',
|
|
'away',
|
|
'busy',
|
|
'offline'
|
|
]);
|
|
|
|
function handleStatusUpdate(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const status = typeof message['status'] === 'string' ? message['status'] : undefined;
|
|
|
|
if (!status || !VALID_STATUSES.has(status))
|
|
return;
|
|
|
|
user.status = status as ConnectedUser['status'];
|
|
connectedUsers.set(connectionId, user);
|
|
console.log(`User ${normalizeDisplayName(user.displayName)} (${user.oderId}) status -> ${status}`);
|
|
|
|
for (const serverId of user.serverIds) {
|
|
broadcastToServer(
|
|
serverId,
|
|
{
|
|
type: 'status_update',
|
|
oderId: user.oderId,
|
|
status
|
|
},
|
|
{ excludeConnectionId: connectionId }
|
|
);
|
|
}
|
|
}
|
|
|
|
function handleServerIconAvailable(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
|
const serverId = readMessageId(message['serverId']);
|
|
const iconUpdatedAt = typeof message['iconUpdatedAt'] === 'number' && Number.isFinite(message['iconUpdatedAt']) ? message['iconUpdatedAt'] : 0;
|
|
|
|
if (!serverId || iconUpdatedAt <= 0 || !user.serverIds.has(serverId)) {
|
|
return;
|
|
}
|
|
|
|
const availableIcons = user.serverIconUpdatedAtByServerId ?? new Map<string, number>();
|
|
|
|
availableIcons.set(serverId, iconUpdatedAt);
|
|
user.serverIconUpdatedAtByServerId = availableIcons;
|
|
connectedUsers.set(connectionId, user);
|
|
}
|
|
|
|
function handleServerIconSyncRequest(user: ConnectedUser, message: WsMessage): void {
|
|
const serverId = readMessageId(message['serverId']);
|
|
const localUpdatedAt = typeof message['iconUpdatedAt'] === 'number' && Number.isFinite(message['iconUpdatedAt']) ? message['iconUpdatedAt'] : 0;
|
|
|
|
if (!serverId) {
|
|
return;
|
|
}
|
|
|
|
const users = getUniqueUsersInServer(serverId, user.oderId)
|
|
.filter((candidate) => (candidate.serverIconUpdatedAtByServerId?.get(serverId) ?? 0) > localUpdatedAt)
|
|
.map((candidate) => ({
|
|
oderId: candidate.oderId,
|
|
displayName: normalizeDisplayName(candidate.displayName),
|
|
description: candidate.description,
|
|
profileUpdatedAt: candidate.profileUpdatedAt,
|
|
status: candidate.status ?? 'online'
|
|
}));
|
|
|
|
if (users.length === 0) {
|
|
return;
|
|
}
|
|
|
|
user.ws.send(JSON.stringify({ type: 'server_icon_sync_peers', serverId, users }));
|
|
}
|
|
|
|
async function handlePluginEvent(user: ConnectedUser, message: WsMessage, connectionId: string): Promise<void> {
|
|
const serverId = readMessageId(message['serverId']) ?? user.viewedServerId;
|
|
const pluginId = readMessageId(message['pluginId']);
|
|
const eventName = readMessageId(message['eventName']);
|
|
|
|
if (!serverId || !pluginId || !eventName || !user.serverIds.has(serverId)) {
|
|
user.ws.send(
|
|
JSON.stringify({
|
|
type: 'plugin_error',
|
|
serverId,
|
|
pluginId,
|
|
eventName,
|
|
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
|
code: 'INVALID_PLUGIN_EVENT',
|
|
message: 'Plugin event is missing required fields or server membership'
|
|
})
|
|
);
|
|
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await validatePluginEventEnvelope({
|
|
type: 'plugin_event',
|
|
serverId,
|
|
pluginId,
|
|
eventName,
|
|
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
|
payload: message['payload'],
|
|
sourcePluginUserId: typeof message['sourcePluginUserId'] === 'string' ? message['sourcePluginUserId'] : undefined
|
|
});
|
|
|
|
broadcastToServer(
|
|
serverId,
|
|
{
|
|
type: 'plugin_event',
|
|
serverId,
|
|
pluginId,
|
|
eventName,
|
|
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
|
payload: message['payload'],
|
|
sourcePluginUserId: typeof message['sourcePluginUserId'] === 'string' ? message['sourcePluginUserId'] : undefined,
|
|
sourceUserId: user.oderId,
|
|
emittedAt: Date.now()
|
|
},
|
|
{ excludeConnectionId: connectionId }
|
|
);
|
|
} catch (error) {
|
|
sendPluginError(user, error, message);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Tail of the in-flight message chain per connection.
|
|
*
|
|
* Messages from one client can arrive in the same tick (one TCP segment), but
|
|
* handlers like identify await async work. Without serialization a
|
|
* join_server can be evaluated while identify is still pending, get rejected
|
|
* as unauthenticated, and silently lose the room membership.
|
|
*/
|
|
const connectionMessageChains = new Map<string, Promise<void>>();
|
|
|
|
export function handleWebSocketMessage(connectionId: string, message: WsMessage): Promise<void> {
|
|
const prior = connectionMessageChains.get(connectionId) ?? Promise.resolve();
|
|
const current = prior.then(() => processWebSocketMessage(connectionId, message));
|
|
const tail = current.catch(() => undefined);
|
|
|
|
connectionMessageChains.set(connectionId, tail);
|
|
void tail.then(() => {
|
|
if (connectionMessageChains.get(connectionId) === tail) {
|
|
connectionMessageChains.delete(connectionId);
|
|
}
|
|
});
|
|
|
|
return current;
|
|
}
|
|
|
|
async function processWebSocketMessage(connectionId: string, message: WsMessage): Promise<void> {
|
|
const user = connectedUsers.get(connectionId);
|
|
|
|
if (!user)
|
|
return;
|
|
|
|
user.lastPong = Date.now();
|
|
connectedUsers.set(connectionId, user);
|
|
|
|
if (!user.authenticated && message.type !== 'identify' && message.type !== 'keepalive') {
|
|
sendAuthRequired(user);
|
|
return;
|
|
}
|
|
|
|
switch (message.type) {
|
|
case 'keepalive':
|
|
user.ws.send(JSON.stringify({ type: 'keepalive_ack', serverTime: Date.now() }));
|
|
break;
|
|
|
|
case 'identify':
|
|
await handleIdentify(user, message, connectionId);
|
|
break;
|
|
|
|
case 'join_server':
|
|
await handleJoinServer(user, message, connectionId);
|
|
break;
|
|
|
|
case 'view_server':
|
|
await handleViewServer(user, message, connectionId);
|
|
break;
|
|
|
|
case 'leave_server':
|
|
handleLeaveServer(user, message, connectionId);
|
|
break;
|
|
|
|
case 'offer':
|
|
case 'answer':
|
|
case 'ice_candidate':
|
|
case 'direct-message':
|
|
case 'direct-message-status':
|
|
case 'direct-message-mutation':
|
|
case 'direct-message-typing':
|
|
case 'direct-message-sync-request':
|
|
case 'direct-message-sync':
|
|
case 'direct-call':
|
|
case 'server_icon_peer_request':
|
|
case 'server_icon_peer_data':
|
|
await forwardRtcMessage(user, message);
|
|
break;
|
|
|
|
case 'chat_message':
|
|
handleChatMessage(user, message, connectionId);
|
|
break;
|
|
|
|
case 'voice_state':
|
|
handleVoiceState(user, message, connectionId);
|
|
break;
|
|
|
|
case 'voice_client_takeover':
|
|
handleVoiceClientTakeover(user, message, connectionId);
|
|
break;
|
|
|
|
case 'account_sync':
|
|
handleAccountSync(user, message, connectionId);
|
|
break;
|
|
|
|
case 'typing':
|
|
handleTyping(user, message, connectionId);
|
|
break;
|
|
|
|
case 'status_update':
|
|
handleStatusUpdate(user, message, connectionId);
|
|
break;
|
|
|
|
case 'server_icon_available':
|
|
handleServerIconAvailable(user, message, connectionId);
|
|
break;
|
|
|
|
case 'server_icon_sync_request':
|
|
handleServerIconSyncRequest(user, message);
|
|
break;
|
|
|
|
case 'plugin_event':
|
|
await handlePluginEvent(user, message, connectionId);
|
|
break;
|
|
|
|
default:
|
|
console.log('Unknown message type:', message.type);
|
|
}
|
|
}
|