feat: Security
This commit is contained in:
@@ -2,10 +2,22 @@
|
||||
* Message store helpers - delegates pure domain logic to `domains/chat/domain/`
|
||||
* and provides DB-dependent hydration/merge operations at the application level.
|
||||
*/
|
||||
import { Message } from '../../shared-kernel';
|
||||
import {
|
||||
Message,
|
||||
type MessageRevision
|
||||
} from '../../shared-kernel';
|
||||
import { DatabaseService } from '../../infrastructure/persistence';
|
||||
import { getMessageTimestamp, normaliseDeletedMessage } from '../../domains/chat/domain/rules/message.rules';
|
||||
import type { InventoryItem } from '../../domains/chat/domain/rules/message-sync.rules';
|
||||
import {
|
||||
computeMessageHeadHashFromMessage,
|
||||
getMessageRevision,
|
||||
shouldApplyIncomingRevision
|
||||
} from '../../domains/chat/domain/rules/message-integrity.rules';
|
||||
import {
|
||||
materializeMessageFromRevision,
|
||||
revisionBeatsMessage
|
||||
} from '../../domains/chat/domain/rules/message-revision.builder.rules';
|
||||
|
||||
// Re-export domain logic so existing callers keep working
|
||||
export {
|
||||
@@ -58,26 +70,28 @@ export async function buildInventoryItem(
|
||||
_db: DatabaseService,
|
||||
attachmentCountOverride?: number
|
||||
): Promise<InventoryItem> {
|
||||
const revision = getMessageRevision(msg);
|
||||
const headHash = msg.headHash ?? await computeMessageHeadHashFromMessage(msg, revision);
|
||||
|
||||
if (msg.isDeleted) {
|
||||
return {
|
||||
id: msg.id,
|
||||
ts: getMessageTimestamp(msg),
|
||||
rc: 0,
|
||||
ac: 0
|
||||
ac: 0,
|
||||
revision,
|
||||
headHash
|
||||
};
|
||||
}
|
||||
|
||||
const item: InventoryItem = {
|
||||
return {
|
||||
id: msg.id,
|
||||
ts: getMessageTimestamp(msg),
|
||||
rc: msg.reactions?.length ?? 0
|
||||
rc: msg.reactions?.length ?? 0,
|
||||
ac: attachmentCountOverride ?? 0,
|
||||
revision,
|
||||
headHash
|
||||
};
|
||||
|
||||
if (attachmentCountOverride !== undefined) {
|
||||
item.ac = attachmentCountOverride;
|
||||
}
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
/** Builds a local map of `{timestamp, reactionCount, attachmentCount}` keyed by message ID.
|
||||
@@ -90,25 +104,17 @@ export async function buildLocalInventoryMap(
|
||||
messages: Message[],
|
||||
_db: DatabaseService,
|
||||
attachmentCountOverrides?: ReadonlyMap<string, number>
|
||||
): Promise<Map<string, { ts: number; rc: number; ac: number }>> {
|
||||
const map = new Map<string, { ts: number; rc: number; ac: number }>();
|
||||
): Promise<Map<string, InventoryItem>> {
|
||||
const map = new Map<string, InventoryItem>();
|
||||
|
||||
for (const msg of messages) {
|
||||
if (msg.isDeleted) {
|
||||
map.set(msg.id, {
|
||||
ts: getMessageTimestamp(msg),
|
||||
rc: 0,
|
||||
ac: 0
|
||||
});
|
||||
const item = await buildInventoryItem(
|
||||
msg,
|
||||
_db,
|
||||
attachmentCountOverrides?.get(msg.id)
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
map.set(msg.id, {
|
||||
ts: getMessageTimestamp(msg),
|
||||
rc: msg.reactions?.length ?? 0,
|
||||
ac: attachmentCountOverrides?.get(msg.id) ?? 0
|
||||
});
|
||||
map.set(msg.id, item);
|
||||
}
|
||||
|
||||
return map;
|
||||
@@ -125,11 +131,22 @@ export interface MergeResult {
|
||||
* Handles message upsert and reaction deduplication, then returns
|
||||
* the fully hydrated message alongside a `changed` flag.
|
||||
*/
|
||||
export async function mergeIncomingMessage(
|
||||
incoming: Message,
|
||||
db: DatabaseService
|
||||
): Promise<MergeResult> {
|
||||
const existing = await db.getMessageById(incoming.id);
|
||||
function shouldApplyIncomingMessage(incoming: Message, existing: Message | null): boolean {
|
||||
const incomingRevision = getMessageRevision(incoming);
|
||||
const existingRevision = getMessageRevision(existing ?? undefined);
|
||||
|
||||
if (incoming.headHash) {
|
||||
const existingHeadHash = existing?.headHash
|
||||
?? '';
|
||||
|
||||
return shouldApplyIncomingRevision(
|
||||
incomingRevision,
|
||||
existingRevision,
|
||||
incoming.headHash,
|
||||
existingHeadHash
|
||||
);
|
||||
}
|
||||
|
||||
const existingTs = existing ? getMessageTimestamp(existing) : -1;
|
||||
const incomingTs = getMessageTimestamp(incoming);
|
||||
const isDeletedStateNewer =
|
||||
@@ -137,10 +154,70 @@ export async function mergeIncomingMessage(
|
||||
incomingTs === existingTs &&
|
||||
incoming.isDeleted &&
|
||||
!existing.isDeleted;
|
||||
const isNewer = !existing || incomingTs > existingTs || isDeletedStateNewer;
|
||||
|
||||
return !existing || incomingTs > existingTs || isDeletedStateNewer;
|
||||
}
|
||||
|
||||
export async function mergeIncomingRevision(
|
||||
revision: MessageRevision,
|
||||
db: DatabaseService
|
||||
): Promise<MergeResult> {
|
||||
const existing = await db.getMessageById(revision.messageId);
|
||||
|
||||
if (!revisionBeatsMessage(revision, existing)) {
|
||||
if (!existing) {
|
||||
return {
|
||||
message: materializeMessageFromRevision(null, revision),
|
||||
changed: false
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
message: normaliseDeletedMessage(existing),
|
||||
changed: false
|
||||
};
|
||||
}
|
||||
|
||||
const message = materializeMessageFromRevision(existing, revision);
|
||||
|
||||
await db.saveMessage(message);
|
||||
await db.saveMessageRevision(revision);
|
||||
|
||||
if (message.isDeleted) {
|
||||
return {
|
||||
message: normaliseDeletedMessage(message),
|
||||
changed: true
|
||||
};
|
||||
}
|
||||
|
||||
const reactions = await db.getReactionsForMessage(message.id);
|
||||
|
||||
return {
|
||||
message: {
|
||||
...message,
|
||||
reactions
|
||||
},
|
||||
changed: true
|
||||
};
|
||||
}
|
||||
|
||||
export async function mergeIncomingMessage(
|
||||
incoming: Message,
|
||||
db: DatabaseService
|
||||
): Promise<MergeResult> {
|
||||
const existing = await db.getMessageById(incoming.id);
|
||||
const isNewer = shouldApplyIncomingMessage(incoming, existing);
|
||||
|
||||
if (isNewer) {
|
||||
await db.saveMessage(incoming);
|
||||
const persisted = incoming.headHash
|
||||
? incoming
|
||||
: {
|
||||
...incoming,
|
||||
revision: getMessageRevision(incoming),
|
||||
headHash: await computeMessageHeadHashFromMessage(incoming, getMessageRevision(incoming))
|
||||
};
|
||||
|
||||
await db.saveMessage(persisted);
|
||||
}
|
||||
|
||||
// Persist incoming reactions (deduped by the DB layer)
|
||||
|
||||
Reference in New Issue
Block a user