feat: plugins v1
This commit is contained in:
221
server/src/websocket/handler-plugin.spec.ts
Normal file
221
server/src/websocket/handler-plugin.spec.ts
Normal file
@@ -0,0 +1,221 @@
|
||||
import {
|
||||
beforeEach,
|
||||
describe,
|
||||
expect,
|
||||
it,
|
||||
vi
|
||||
} from 'vitest';
|
||||
import { WebSocket } from 'ws';
|
||||
import { ConnectedUser } from './types';
|
||||
import { connectedUsers } from './state';
|
||||
|
||||
const pluginSupportMocks = vi.hoisted(() => {
|
||||
class MockPluginSupportError extends Error {
|
||||
constructor(
|
||||
readonly status: number,
|
||||
readonly code: string,
|
||||
message: string
|
||||
) {
|
||||
super(message);
|
||||
this.name = 'PluginSupportError';
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
getPluginRequirementsSnapshot: vi.fn(),
|
||||
PluginSupportError: MockPluginSupportError,
|
||||
validatePluginEventEnvelope: vi.fn()
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('../services/server-access.service', () => ({
|
||||
authorizeWebSocketJoin: vi.fn(async () => ({ allowed: true as const }))
|
||||
}));
|
||||
|
||||
vi.mock('../services/plugin-support.service', () => pluginSupportMocks);
|
||||
|
||||
import { handleWebSocketMessage } from './handler';
|
||||
|
||||
interface SentMessageStore {
|
||||
sentMessages: string[];
|
||||
}
|
||||
|
||||
function createMockWs(): WebSocket & SentMessageStore {
|
||||
const sentMessages: string[] = [];
|
||||
const socket = {
|
||||
readyState: WebSocket.OPEN,
|
||||
send: (data: string) => {
|
||||
sentMessages.push(data);
|
||||
},
|
||||
close: () => {},
|
||||
sentMessages
|
||||
} as unknown as WebSocket & SentMessageStore;
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
function createConnectedUser(
|
||||
connectionId: string,
|
||||
oderId: string,
|
||||
overrides: Partial<ConnectedUser> = {}
|
||||
): ConnectedUser {
|
||||
const user: ConnectedUser = {
|
||||
displayName: `User ${oderId}`,
|
||||
lastPong: Date.now(),
|
||||
oderId,
|
||||
serverIds: new Set(),
|
||||
ws: createMockWs(),
|
||||
...overrides
|
||||
};
|
||||
|
||||
connectedUsers.set(connectionId, user);
|
||||
return user;
|
||||
}
|
||||
|
||||
function readSentMessages(user: ConnectedUser): Record<string, unknown>[] {
|
||||
return (user.ws as unknown as SentMessageStore).sentMessages.map((messageText) => JSON.parse(messageText) as Record<string, unknown>);
|
||||
}
|
||||
|
||||
describe('server websocket handler - plugin support', () => {
|
||||
beforeEach(() => {
|
||||
connectedUsers.clear();
|
||||
pluginSupportMocks.getPluginRequirementsSnapshot.mockReset();
|
||||
pluginSupportMocks.validatePluginEventEnvelope.mockReset();
|
||||
pluginSupportMocks.getPluginRequirementsSnapshot.mockResolvedValue({
|
||||
eventDefinitions: [],
|
||||
requirements: [],
|
||||
serverId: 'server-1',
|
||||
updatedAt: 0
|
||||
});
|
||||
|
||||
pluginSupportMocks.validatePluginEventEnvelope.mockResolvedValue({ direction: 'serverRelay' });
|
||||
});
|
||||
|
||||
it('sends plugin requirement snapshots after joining a server', async () => {
|
||||
const alice = createConnectedUser('conn-1', 'alice');
|
||||
|
||||
pluginSupportMocks.getPluginRequirementsSnapshot.mockResolvedValue({
|
||||
eventDefinitions: [
|
||||
{
|
||||
direction: 'serverRelay',
|
||||
eventName: 'e2e:relay',
|
||||
maxPayloadBytes: 2048,
|
||||
pluginId: 'e2e.plugin-api',
|
||||
scope: 'server',
|
||||
updatedAt: 2
|
||||
}
|
||||
],
|
||||
requirements: [
|
||||
{
|
||||
pluginId: 'e2e.plugin-api',
|
||||
status: 'required',
|
||||
updatedAt: 1
|
||||
}
|
||||
],
|
||||
serverId: 'server-1',
|
||||
updatedAt: 2
|
||||
});
|
||||
|
||||
await handleWebSocketMessage('conn-1', { type: 'join_server', serverId: 'server-1' });
|
||||
|
||||
const messages = readSentMessages(alice);
|
||||
const pluginRequirements = messages.find((message) => message['type'] === 'plugin_requirements');
|
||||
|
||||
expect(pluginRequirements?.['serverId']).toBe('server-1');
|
||||
expect(pluginRequirements?.['snapshot']).toEqual(expect.objectContaining({ updatedAt: 2 }));
|
||||
});
|
||||
|
||||
it('validates and relays plugin events to other joined users', async () => {
|
||||
const alice = createConnectedUser('conn-1', 'alice', { viewedServerId: 'server-1' });
|
||||
const bob = createConnectedUser('conn-2', 'bob', { viewedServerId: 'server-1' });
|
||||
|
||||
alice.serverIds.add('server-1');
|
||||
bob.serverIds.add('server-1');
|
||||
|
||||
await handleWebSocketMessage('conn-1', {
|
||||
type: 'plugin_event',
|
||||
eventId: 'event-1',
|
||||
eventName: 'e2e:relay',
|
||||
payload: { ok: true },
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1',
|
||||
sourcePluginUserId: 'fixture-user'
|
||||
});
|
||||
|
||||
expect(pluginSupportMocks.validatePluginEventEnvelope).toHaveBeenCalledWith({
|
||||
type: 'plugin_event',
|
||||
eventId: 'event-1',
|
||||
eventName: 'e2e:relay',
|
||||
payload: { ok: true },
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1',
|
||||
sourcePluginUserId: 'fixture-user'
|
||||
});
|
||||
|
||||
const bobMessages = readSentMessages(bob);
|
||||
const relayedEvent = bobMessages.find((message) => message['type'] === 'plugin_event');
|
||||
|
||||
expect(relayedEvent).toEqual(expect.objectContaining({
|
||||
eventId: 'event-1',
|
||||
eventName: 'e2e:relay',
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1',
|
||||
sourcePluginUserId: 'fixture-user',
|
||||
sourceUserId: 'alice'
|
||||
}));
|
||||
|
||||
expect(typeof relayedEvent?.['emittedAt']).toBe('number');
|
||||
});
|
||||
|
||||
it('returns plugin errors for invalid plugin event messages', async () => {
|
||||
const alice = createConnectedUser('conn-1', 'alice');
|
||||
|
||||
await handleWebSocketMessage('conn-1', {
|
||||
type: 'plugin_event',
|
||||
eventName: 'e2e:relay',
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1'
|
||||
});
|
||||
|
||||
const pluginError = readSentMessages(alice).find((message) => message['type'] === 'plugin_error');
|
||||
|
||||
expect(pluginError).toEqual(expect.objectContaining({
|
||||
code: 'INVALID_PLUGIN_EVENT',
|
||||
eventName: 'e2e:relay',
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1'
|
||||
}));
|
||||
|
||||
expect(pluginSupportMocks.validatePluginEventEnvelope).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('forwards plugin support validation errors to the sending user', async () => {
|
||||
const alice = createConnectedUser('conn-1', 'alice', { viewedServerId: 'server-1' });
|
||||
|
||||
alice.serverIds.add('server-1');
|
||||
pluginSupportMocks.validatePluginEventEnvelope.mockRejectedValue(new pluginSupportMocks.PluginSupportError(
|
||||
400,
|
||||
'PLUGIN_EVENT_NOT_RELAYABLE',
|
||||
'P2P plugin events must not be relayed by the signal server'
|
||||
));
|
||||
|
||||
await handleWebSocketMessage('conn-1', {
|
||||
type: 'plugin_event',
|
||||
eventId: 'event-p2p',
|
||||
eventName: 'e2e:p2p',
|
||||
payload: { hint: true },
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1'
|
||||
});
|
||||
|
||||
const pluginError = readSentMessages(alice).find((message) => message['type'] === 'plugin_error');
|
||||
|
||||
expect(pluginError).toEqual(expect.objectContaining({
|
||||
code: 'PLUGIN_EVENT_NOT_RELAYABLE',
|
||||
eventId: 'event-p2p',
|
||||
eventName: 'e2e:p2p',
|
||||
pluginId: 'e2e.plugin-api',
|
||||
serverId: 'server-1'
|
||||
}));
|
||||
});
|
||||
});
|
||||
@@ -8,6 +8,11 @@ import {
|
||||
isOderIdConnectedToServer
|
||||
} from './broadcast';
|
||||
import { authorizeWebSocketJoin } from '../services/server-access.service';
|
||||
import {
|
||||
getPluginRequirementsSnapshot,
|
||||
PluginSupportError,
|
||||
validatePluginEventEnvelope
|
||||
} from '../services/plugin-support.service';
|
||||
|
||||
interface WsMessage {
|
||||
[key: string]: unknown;
|
||||
@@ -50,6 +55,29 @@ function readMessageId(value: unknown): string | undefined {
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function sendPluginError(user: ConnectedUser, error: unknown, message: WsMessage): void {
|
||||
if (error instanceof PluginSupportError) {
|
||||
user.ws.send(JSON.stringify({
|
||||
type: 'plugin_error',
|
||||
serverId: typeof message['serverId'] === 'string' ? message['serverId'] : undefined,
|
||||
pluginId: typeof message['pluginId'] === 'string' ? message['pluginId'] : undefined,
|
||||
eventName: typeof message['eventName'] === 'string' ? message['eventName'] : undefined,
|
||||
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
||||
code: error.code,
|
||||
message: error.message
|
||||
}));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
console.error('Unhandled plugin websocket error:', error);
|
||||
user.ws.send(JSON.stringify({
|
||||
type: 'plugin_error',
|
||||
code: 'INTERNAL_ERROR',
|
||||
message: 'Internal server error'
|
||||
}));
|
||||
}
|
||||
|
||||
/** Sends the current user list for a given server to a single connected user. */
|
||||
function sendServerUsers(user: ConnectedUser, serverId: string): void {
|
||||
const users = getUniqueUsersInServer(serverId, user.oderId)
|
||||
@@ -64,6 +92,20 @@ function sendServerUsers(user: ConnectedUser, serverId: string): void {
|
||||
user.ws.send(JSON.stringify({ type: 'server_users', serverId, users }));
|
||||
}
|
||||
|
||||
async function sendPluginRequirements(user: ConnectedUser, serverId: string): Promise<void> {
|
||||
try {
|
||||
const snapshot = await getPluginRequirementsSnapshot(serverId);
|
||||
|
||||
user.ws.send(JSON.stringify({
|
||||
type: 'plugin_requirements',
|
||||
serverId,
|
||||
snapshot
|
||||
}));
|
||||
} catch (error) {
|
||||
sendPluginError(user, error, { type: 'plugin_requirements', serverId });
|
||||
}
|
||||
}
|
||||
|
||||
function handleIdentify(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
||||
const newOderId = readMessageId(message['oderId']) ?? connectionId;
|
||||
const newScope = typeof message['connectionScope'] === 'string' ? message['connectionScope'] : undefined;
|
||||
@@ -137,6 +179,7 @@ async function handleJoinServer(user: ConnectedUser, message: WsMessage, connect
|
||||
);
|
||||
|
||||
sendServerUsers(user, sid);
|
||||
await sendPluginRequirements(user, sid);
|
||||
|
||||
if (isNewIdentityMembership) {
|
||||
broadcastToServer(sid, {
|
||||
@@ -151,17 +194,22 @@ async function handleJoinServer(user: ConnectedUser, message: WsMessage, connect
|
||||
}
|
||||
}
|
||||
|
||||
function handleViewServer(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
||||
async function handleViewServer(user: ConnectedUser, message: WsMessage, connectionId: string): Promise<void> {
|
||||
const viewSid = readMessageId(message['serverId']);
|
||||
|
||||
if (!viewSid)
|
||||
return;
|
||||
|
||||
if (!user.serverIds.has(viewSid)) {
|
||||
return;
|
||||
}
|
||||
|
||||
user.viewedServerId = viewSid;
|
||||
connectedUsers.set(connectionId, user);
|
||||
console.log(`User ${normalizeDisplayName(user.displayName)} (${user.oderId}) viewing server ${viewSid}`);
|
||||
|
||||
sendServerUsers(user, viewSid);
|
||||
await sendPluginRequirements(user, viewSid);
|
||||
}
|
||||
|
||||
function handleLeaveServer(user: ConnectedUser, message: WsMessage, connectionId: string): void {
|
||||
@@ -268,6 +316,52 @@ function handleStatusUpdate(user: ConnectedUser, message: WsMessage, connectionI
|
||||
}
|
||||
}
|
||||
|
||||
async function handlePluginEvent(user: ConnectedUser, message: WsMessage): Promise<void> {
|
||||
const serverId = readMessageId(message['serverId']) ?? user.viewedServerId;
|
||||
const pluginId = readMessageId(message['pluginId']);
|
||||
const eventName = readMessageId(message['eventName']);
|
||||
|
||||
if (!serverId || !pluginId || !eventName || !user.serverIds.has(serverId)) {
|
||||
user.ws.send(JSON.stringify({
|
||||
type: 'plugin_error',
|
||||
serverId,
|
||||
pluginId,
|
||||
eventName,
|
||||
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
||||
code: 'INVALID_PLUGIN_EVENT',
|
||||
message: 'Plugin event is missing required fields or server membership'
|
||||
}));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await validatePluginEventEnvelope({
|
||||
type: 'plugin_event',
|
||||
serverId,
|
||||
pluginId,
|
||||
eventName,
|
||||
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
||||
payload: message['payload'],
|
||||
sourcePluginUserId: typeof message['sourcePluginUserId'] === 'string' ? message['sourcePluginUserId'] : undefined
|
||||
});
|
||||
|
||||
broadcastToServer(serverId, {
|
||||
type: 'plugin_event',
|
||||
serverId,
|
||||
pluginId,
|
||||
eventName,
|
||||
eventId: typeof message['eventId'] === 'string' ? message['eventId'] : undefined,
|
||||
payload: message['payload'],
|
||||
sourcePluginUserId: typeof message['sourcePluginUserId'] === 'string' ? message['sourcePluginUserId'] : undefined,
|
||||
sourceUserId: user.oderId,
|
||||
emittedAt: Date.now()
|
||||
}, user.oderId);
|
||||
} catch (error) {
|
||||
sendPluginError(user, error, message);
|
||||
}
|
||||
}
|
||||
|
||||
export async function handleWebSocketMessage(connectionId: string, message: WsMessage): Promise<void> {
|
||||
const user = connectedUsers.get(connectionId);
|
||||
|
||||
@@ -290,7 +384,7 @@ export async function handleWebSocketMessage(connectionId: string, message: WsMe
|
||||
break;
|
||||
|
||||
case 'view_server':
|
||||
handleViewServer(user, message, connectionId);
|
||||
await handleViewServer(user, message, connectionId);
|
||||
break;
|
||||
|
||||
case 'leave_server':
|
||||
@@ -315,6 +409,10 @@ export async function handleWebSocketMessage(connectionId: string, message: WsMe
|
||||
handleStatusUpdate(user, message, connectionId);
|
||||
break;
|
||||
|
||||
case 'plugin_event':
|
||||
await handlePluginEvent(user, message);
|
||||
break;
|
||||
|
||||
default:
|
||||
console.log('Unknown message type:', message.type);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user