Switch from sqlite payload strings to buffers, utils refactor, dev logging

mrazator/delta-based-sync
Marcel Mraz 4 weeks ago
parent 05ba0339fe
commit 49925038fd
No known key found for this signature in database
GPG Key ID: 4EBD6E62DC830CD2

@ -868,6 +868,7 @@ const ExcalidrawWrapper = () => {
let deltas: StoreDelta[] = []; let deltas: StoreDelta[] = [];
// CFDO I: going both in collaborative setting means the (acknowledge) deltas need to have applied latest changes
switch (direction) { switch (direction) {
case "forward": { case "forward": {
deltas = acknowledgedDeltas.slice(sliderVersion, value) ?? []; deltas = acknowledgedDeltas.slice(sliderVersion, value) ?? [];

@ -1,8 +1,14 @@
import type { DeltasRepository, DELTA, SERVER_DELTA } from "../sync/protocol"; import type {
DeltasRepository,
// CFDO: add senderId, possibly roomId as well CLIENT_DELTA,
SERVER_DELTA,
SERVER_DELTA_STORAGE,
} from "../sync/protocol";
import { Network } from "../sync/utils";
// CFDO II: add senderId, possibly roomId as well
export class DurableDeltasRepository implements DeltasRepository { export class DurableDeltasRepository implements DeltasRepository {
// there is a 2MB row limit, hence working max row size of 1.5 MB // there is a 2MB row limit, hence working with max payload size of 1.5 MB
// and leaving a buffer for other row metadata // and leaving a buffer for other row metadata
private static readonly MAX_PAYLOAD_SIZE = 1_500_000; private static readonly MAX_PAYLOAD_SIZE = 1_500_000;
@ -15,13 +21,13 @@ export class DurableDeltasRepository implements DeltasRepository {
id TEXT NOT NULL, id TEXT NOT NULL,
version INTEGER NOT NULL, version INTEGER NOT NULL,
position INTEGER NOT NULL, position INTEGER NOT NULL,
payload TEXT NOT NULL, payload BLOB NOT NULL,
createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, version, position) PRIMARY KEY (id, version, position)
);`); );`);
} }
public save(delta: DELTA): SERVER_DELTA | null { public save(delta: CLIENT_DELTA): SERVER_DELTA | null {
return this.storage.transactionSync(() => { return this.storage.transactionSync(() => {
const existingDelta = this.getById(delta.id); const existingDelta = this.getById(delta.id);
@ -31,9 +37,8 @@ export class DurableDeltasRepository implements DeltasRepository {
} }
try { try {
// CFDO: could be also a buffer const payloadBuffer = Network.toBinary(delta);
const payload = JSON.stringify(delta); const payloadSize = payloadBuffer.byteLength;
const payloadSize = new TextEncoder().encode(payload).byteLength;
const nextVersion = this.getLastVersion() + 1; const nextVersion = this.getLastVersion() + 1;
const chunksCount = Math.ceil( const chunksCount = Math.ceil(
payloadSize / DurableDeltasRepository.MAX_PAYLOAD_SIZE, payloadSize / DurableDeltasRepository.MAX_PAYLOAD_SIZE,
@ -42,8 +47,7 @@ export class DurableDeltasRepository implements DeltasRepository {
for (let position = 0; position < chunksCount; position++) { for (let position = 0; position < chunksCount; position++) {
const start = position * DurableDeltasRepository.MAX_PAYLOAD_SIZE; const start = position * DurableDeltasRepository.MAX_PAYLOAD_SIZE;
const end = start + DurableDeltasRepository.MAX_PAYLOAD_SIZE; const end = start + DurableDeltasRepository.MAX_PAYLOAD_SIZE;
// slicing the chunk payload const chunkedPayload = payloadBuffer.subarray(start, end);
const chunkedPayload = payload.slice(start, end);
this.storage.sql.exec( this.storage.sql.exec(
`INSERT INTO deltas (id, version, position, payload) VALUES (?, ?, ?, ?);`, `INSERT INTO deltas (id, version, position, payload) VALUES (?, ?, ?, ?);`,
@ -73,13 +77,13 @@ export class DurableDeltasRepository implements DeltasRepository {
// CFDO: for versioning we need deletions, but not for the "snapshot" update; // CFDO: for versioning we need deletions, but not for the "snapshot" update;
public getAllSinceVersion(version: number): Array<SERVER_DELTA> { public getAllSinceVersion(version: number): Array<SERVER_DELTA> {
const deltas = this.storage.sql const deltas = this.storage.sql
.exec<SERVER_DELTA>( .exec<SERVER_DELTA_STORAGE>(
`SELECT id, payload, version FROM deltas WHERE version > (?) ORDER BY version, position, createdAt ASC;`, `SELECT id, payload, version, position FROM deltas WHERE version > (?) ORDER BY version, position, createdAt ASC;`,
version, version,
) )
.toArray(); .toArray();
return this.restoreServerDeltas(deltas); return this.restorePayloadChunks(deltas);
} }
public getLastVersion(): number { public getLastVersion(): number {
@ -93,8 +97,8 @@ export class DurableDeltasRepository implements DeltasRepository {
public getById(id: string): SERVER_DELTA | null { public getById(id: string): SERVER_DELTA | null {
const deltas = this.storage.sql const deltas = this.storage.sql
.exec<SERVER_DELTA>( .exec<SERVER_DELTA_STORAGE>(
`SELECT id, payload, version FROM deltas WHERE id = (?) ORDER BY position ASC`, `SELECT id, payload, version, position FROM deltas WHERE id = (?) ORDER BY position ASC`,
id, id,
) )
.toArray(); .toArray();
@ -103,7 +107,7 @@ export class DurableDeltasRepository implements DeltasRepository {
return null; return null;
} }
const restoredDeltas = this.restoreServerDeltas(deltas); const restoredDeltas = this.restorePayloadChunks(deltas);
if (restoredDeltas.length !== 1) { if (restoredDeltas.length !== 1) {
throw new Error( throw new Error(
@ -114,35 +118,37 @@ export class DurableDeltasRepository implements DeltasRepository {
return restoredDeltas[0]; return restoredDeltas[0];
} }
// CFDO: fix types (should be buffer in the first place) private restorePayloadChunks(
private restoreServerDeltas(deltas: SERVER_DELTA[]): SERVER_DELTA[] { deltas: Array<SERVER_DELTA_STORAGE>,
): Array<SERVER_DELTA> {
return Array.from( return Array.from(
deltas deltas
.reduce((acc, curr) => { .reduce((acc, curr) => {
const delta = acc.get(curr.version); const delta = acc.get(curr.version);
if (delta) { if (delta) {
const currentPayload = new Uint8Array(curr.payload);
acc.set(curr.version, { acc.set(curr.version, {
...delta, ...delta,
// glueing the chunks payload back // glueing the chunks payload back
payload: delta.payload + curr.payload, payload: Uint8Array.from([...delta.payload, ...currentPayload]),
}); });
} else { } else {
// let's not unnecessarily expose more props than these // let's not unnecessarily expose more props than these (i.e. position)
acc.set(curr.version, { acc.set(curr.version, {
id: curr.id, id: curr.id,
version: curr.version, version: curr.version,
payload: curr.payload, payload: new Uint8Array(curr.payload),
}); });
} }
return acc; return acc;
}, new Map<number, SERVER_DELTA>()) // using Uint8Array instead of ArrayBuffer, as it has nicer methods
}, new Map<number, Omit<SERVER_DELTA_STORAGE, "payload" | "position"> & { payload: Uint8Array }>())
.values(), .values(),
// CFDO: temporary
).map((delta) => ({ ).map((delta) => ({
...delta, ...delta,
payload: JSON.parse(delta.payload), payload: Network.fromBinary(delta.payload),
})); }));
} }
} }

@ -21,8 +21,8 @@ export class DurableRoom extends DurableObject {
super(ctx, env); super(ctx, env);
this.ctx.blockConcurrencyWhile(async () => { this.ctx.blockConcurrencyWhile(async () => {
// CFDO: snapshot should likely be a transient store // CFDO I: snapshot should likely be a transient store
// CFDO: loaded the latest state from the db // CFDO II: loaded the latest state from the db
this.snapshot = { this.snapshot = {
// CFDO: start persisting acknowledged version (not a scene version!) // CFDO: start persisting acknowledged version (not a scene version!)
// CFDO: we don't persist appState, should we? // CFDO: we don't persist appState, should we?

@ -1228,7 +1228,7 @@ export class ElementsDelta implements DeltaContainer<SceneElementsMap> {
} }
} else if (type === "added") { } else if (type === "added") {
// for additions the element does not have to exist (i.e. remote update) // for additions the element does not have to exist (i.e. remote update)
// CFDO: the version itself might be different! // CFDO II: the version itself might be different!
element = newElementWith( element = newElementWith(
{ id, version: 1 } as OrderedExcalidrawElement, { id, version: 1 } as OrderedExcalidrawElement,
{ {

@ -296,7 +296,7 @@ class Scene {
validateIndicesThrottled(_nextElements); validateIndicesThrottled(_nextElements);
// CFDO: if technically this leads to modifying the indices, it should update the snapshot immediately (as it shall be an non-undoable change) // CFDO: if this leads to modifying the indices, it should update the snapshot immediately (as it shall be an non-undoable change)
this.elements = syncInvalidIndices(_nextElements); this.elements = syncInvalidIndices(_nextElements);
this.elementsMap.clear(); this.elementsMap.clear();
this.elements.forEach((element) => { this.elements.forEach((element) => {

@ -76,7 +76,7 @@ export const StoreAction = {
* *
* These updates will _eventually_ make it to the local undo / redo stacks. * These updates will _eventually_ make it to the local undo / redo stacks.
*/ */
// CFDO I: none is not really "none" anymore, as it at very least emits an ephemeral increment // CFDO: none is not really "none" anymore, as it at very least emits an ephemeral increment
// we should likely rename these somehow and keep "none" only for real "no action" cases // we should likely rename these somehow and keep "none" only for real "no action" cases
NONE: "NONE", NONE: "NONE",
} as const; } as const;
@ -313,7 +313,7 @@ export class Store {
this.onStoreIncrementEmitter.trigger(increment); this.onStoreIncrementEmitter.trigger(increment);
} }
// CFDO: maybe I should not update the snapshot here so that it always syncs ephemeral change after durable change, // CFDO II: maybe I should not update the snapshot here so that it always syncs ephemeral change after durable change,
// so that clients exchange the latest element versions between each other, // so that clients exchange the latest element versions between each other,
// meaning if it will be ignored on other clients, other clients would initiate a relay with current version instead of doing nothing // meaning if it will be ignored on other clients, other clients would initiate a relay with current version instead of doing nothing
if (options.updateSnapshot) { if (options.updateSnapshot) {
@ -454,7 +454,6 @@ export class StoreDelta {
id, id,
elements: { added, removed, updated }, elements: { added, removed, updated },
}: SERVER_DELTA["payload"]) { }: SERVER_DELTA["payload"]) {
// CFDO: ensure typesafety
const elements = ElementsDelta.create(added, removed, updated, { const elements = ElementsDelta.create(added, removed, updated, {
shouldRedistribute: false, shouldRedistribute: false,
}); });
@ -622,7 +621,7 @@ export class StoreSnapshot {
} }
private detectChangedAppState(nextObservedAppState: ObservedAppState) { private detectChangedAppState(nextObservedAppState: ObservedAppState) {
// CFDO: could we optimize by checking only reference changes? (i.e. selectedElementIds should be stable now) // CFDO: could we optimize by checking only reference changes? (i.e. selectedElementIds should be stable now); this is not used for now
return Delta.isRightDifferent(this.appState, nextObservedAppState); return Delta.isRightDifferent(this.appState, nextObservedAppState);
} }

@ -1,11 +1,9 @@
/* eslint-disable no-console */
import throttle from "lodash.throttle"; import throttle from "lodash.throttle";
import msgpack from "msgpack-lite";
import ReconnectingWebSocket, { import ReconnectingWebSocket, {
type Event, type Event,
type CloseEvent, type CloseEvent,
} from "reconnecting-websocket"; } from "reconnecting-websocket";
import { Utils } from "./utils"; import { Network, Utils } from "./utils";
import { import {
LocalDeltasQueue, LocalDeltasQueue,
type MetadataRepository, type MetadataRepository,
@ -16,16 +14,17 @@ import type { StoreChange } from "../store";
import type { ExcalidrawImperativeAPI } from "../types"; import type { ExcalidrawImperativeAPI } from "../types";
import type { ExcalidrawElement, SceneElementsMap } from "../element/types"; import type { ExcalidrawElement, SceneElementsMap } from "../element/types";
import type { import type {
CLIENT_MESSAGE_RAW,
SERVER_DELTA, SERVER_DELTA,
CHANGE, CLIENT_CHANGE,
SERVER_MESSAGE, SERVER_MESSAGE,
CLIENT_MESSAGE_BINARY,
} from "./protocol"; } from "./protocol";
import { debounce } from "../utils"; import { debounce } from "../utils";
import { randomId } from "../random"; import { randomId } from "../random";
import { orderByFractionalIndex } from "../fractionalIndex"; import { orderByFractionalIndex } from "../fractionalIndex";
import { ENV } from "../constants";
class SocketMessage implements CLIENT_MESSAGE_RAW { class SocketMessage implements CLIENT_MESSAGE_BINARY {
constructor( constructor(
public readonly type: "relay" | "pull" | "push", public readonly type: "relay" | "pull" | "push",
public readonly payload: Uint8Array, public readonly payload: Uint8Array,
@ -77,6 +76,7 @@ class SocketClient {
window.addEventListener("online", this.onOnline); window.addEventListener("online", this.onOnline);
window.addEventListener("offline", this.onOffline); window.addEventListener("offline", this.onOffline);
// eslint-disable-next-line no-console
console.debug(`Connecting to the room "${this.roomId}"...`); console.debug(`Connecting to the room "${this.roomId}"...`);
this.socket = new ReconnectingWebSocket( this.socket = new ReconnectingWebSocket(
`${this.host}/connect?roomId=${this.roomId}`, `${this.host}/connect?roomId=${this.roomId}`,
@ -103,7 +103,6 @@ class SocketClient {
{ leading: true, trailing: false }, { leading: true, trailing: false },
); );
// CFDO: the connections seem to keep hanging for some reason
public disconnect() { public disconnect() {
if (this.isDisconnected) { if (this.isDisconnected) {
return; return;
@ -119,6 +118,7 @@ class SocketClient {
this.socket?.removeEventListener("error", this.onError); this.socket?.removeEventListener("error", this.onError);
this.socket?.close(); this.socket?.close();
// eslint-disable-next-line no-console
console.debug(`Disconnected from the room "${this.roomId}".`); console.debug(`Disconnected from the room "${this.roomId}".`);
} finally { } finally {
this.socket = null; this.socket = null;
@ -135,7 +135,6 @@ class SocketClient {
return; return;
} }
// CFDO: could be closed / closing / connecting
if (this.isDisconnected) { if (this.isDisconnected) {
this.connect(); this.connect();
return; return;
@ -143,10 +142,14 @@ class SocketClient {
const { type, payload } = message; const { type, payload } = message;
// CFDO II: could be slowish for large payloads, thing about a better solution (i.e. msgpack 10x faster, 2x smaller) const payloadBuffer = Network.toBinary(payload);
const payloadBuffer = msgpack.encode(payload) as Uint8Array;
const payloadSize = payloadBuffer.byteLength; const payloadSize = payloadBuffer.byteLength;
if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) {
// eslint-disable-next-line no-console
console.debug("send", message, payloadSize);
}
if (payloadSize < SocketClient.MAX_MESSAGE_SIZE) { if (payloadSize < SocketClient.MAX_MESSAGE_SIZE) {
const message = new SocketMessage(type, payloadBuffer); const message = new SocketMessage(type, payloadBuffer);
return this.sendMessage(message); return this.sendMessage(message);
@ -176,86 +179,55 @@ class SocketClient {
return; return;
} }
if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) {
// eslint-disable-next-line no-console
console.debug("onMessage", message);
}
this.handlers.onMessage(message); this.handlers.onMessage(message);
}); });
}; };
private onOpen = (event: Event) => { private onOpen = (event: Event) => {
// eslint-disable-next-line no-console
console.debug(`Connection to the room "${this.roomId}" opened.`); console.debug(`Connection to the room "${this.roomId}" opened.`);
this.isOffline = false; this.isOffline = false;
this.handlers.onOpen(event); this.handlers.onOpen(event);
}; };
private onClose = (event: CloseEvent) => { private onClose = (event: CloseEvent) => {
// eslint-disable-next-line no-console
console.debug(`Connection to the room "${this.roomId}" closed.`, event); console.debug(`Connection to the room "${this.roomId}" closed.`, event);
}; };
private onError = (event: Event) => { private onError = (event: Event) => {
console.debug( // eslint-disable-next-line no-console
console.error(
`Connection to the room "${this.roomId}" returned an error.`, `Connection to the room "${this.roomId}" returned an error.`,
event, event,
); );
}; };
private sendMessage = ({ payload, ...metadata }: CLIENT_MESSAGE_RAW) => { private sendMessage = (message: CLIENT_MESSAGE_BINARY) => {
const metadataBuffer = msgpack.encode(metadata) as Uint8Array; this.socket?.send(Network.encodeClientMessage(message));
// contains the length of the rest of the message, so that we could decode it server side
const headerBuffer = new ArrayBuffer(4);
new DataView(headerBuffer).setUint32(0, metadataBuffer.byteLength);
// concatenate into [header(4 bytes)][metadata][payload]
const message = Uint8Array.from([
...new Uint8Array(headerBuffer),
...metadataBuffer,
...payload,
]);
// CFDO: add dev-level logging
{
const headerLength = 4;
const header = new Uint8Array(message.buffer, 0, headerLength);
const metadataLength = new DataView(
header.buffer,
header.byteOffset,
).getUint32(0);
const metadata = new Uint8Array(
message.buffer,
headerLength,
headerLength + metadataLength,
);
const payload = new Uint8Array(
message.buffer,
headerLength + metadataLength,
);
console.log({
...msgpack.decode(metadata),
payload,
});
}
this.socket?.send(message);
}; };
// CFDO: should be (runtime) type-safe
private async receiveMessage( private async receiveMessage(
message: Blob, message: Blob,
): Promise<SERVER_MESSAGE | undefined> { ): Promise<SERVER_MESSAGE | undefined> {
const arrayBuffer = await message.arrayBuffer(); const arrayBuffer = await message.arrayBuffer();
const uint8Array = new Uint8Array(arrayBuffer); const uint8Array = new Uint8Array(arrayBuffer);
const [decodedMessage, decodeError] = Utils.try<SERVER_MESSAGE>(() => const [decodedMessage, decodingError] = Utils.try<SERVER_MESSAGE>(() =>
msgpack.decode(uint8Array), Network.fromBinary(uint8Array),
); );
if (decodeError) { if (decodingError) {
console.error("Failed to decode message:", message); console.error("Failed to decode message:", message);
return; return;
} }
// CFDO: should be type-safe
return decodedMessage; return decodedMessage;
} }
} }
@ -285,7 +257,7 @@ export class SyncClient {
>(); >();
// #region ACKNOWLEDGED DELTAS & METADATA // #region ACKNOWLEDGED DELTAS & METADATA
// CFDO: shouldn't be stateful, only request / response // CFDO II: shouldn't be stateful, only request / response
private readonly acknowledgedDeltasMap: Map<string, AcknowledgedDelta> = private readonly acknowledgedDeltasMap: Map<string, AcknowledgedDelta> =
new Map(); new Map();
@ -336,7 +308,7 @@ export class SyncClient {
return new SyncClient(api, repository, queue, { return new SyncClient(api, repository, queue, {
host: SyncClient.HOST_URL, host: SyncClient.HOST_URL,
roomId: roomId ?? SyncClient.ROOM_ID, roomId: roomId ?? SyncClient.ROOM_ID,
// CFDO: temporary, so that all deltas are loaded and applied on init // CFDO II: temporary, so that all deltas are loaded and applied on init
lastAcknowledgedVersion: 0, lastAcknowledgedVersion: 0,
}); });
} }
@ -377,7 +349,7 @@ export class SyncClient {
} }
} }
// CFDO: should be throttled! 60 fps for live scenes, 10s or so for single player // CFDO: should be throttled! 16ms (60 fps) for live scenes, not needed at all for single player
public relay(change: StoreChange): void { public relay(change: StoreChange): void {
if (this.client.isDisconnected) { if (this.client.isDisconnected) {
// don't reconnect if we're explicitly disconnected // don't reconnect if we're explicitly disconnected
@ -414,7 +386,7 @@ export class SyncClient {
// #region PRIVATE SOCKET MESSAGE HANDLERS // #region PRIVATE SOCKET MESSAGE HANDLERS
private onOpen = (event: Event) => { private onOpen = (event: Event) => {
// CFDO: hack to pull everything for on init // CFDO II: hack to pull everything for on init
this.pull(0); this.pull(0);
this.push(); this.push();
}; };
@ -425,9 +397,8 @@ export class SyncClient {
this.push(); this.push();
}; };
private onMessage = ({ type, payload }: SERVER_MESSAGE) => { private onMessage = (serverMessage: SERVER_MESSAGE) => {
// CFDO: add dev-level logging const { type, payload } = serverMessage;
console.log({ type, payload });
switch (type) { switch (type) {
case "relayed": case "relayed":
@ -441,8 +412,8 @@ export class SyncClient {
} }
}; };
private handleRelayed = (payload: CHANGE) => { private handleRelayed = (payload: CLIENT_CHANGE) => {
// CFDO: retrieve the map already // CFDO I: retrieve the map already
const nextElements = new Map( const nextElements = new Map(
this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]), this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]),
) as SceneElementsMap; ) as SceneElementsMap;
@ -457,7 +428,6 @@ export class SyncClient {
!existingElement || // new element !existingElement || // new element
existingElement.version < relayedElement.version // updated element existingElement.version < relayedElement.version // updated element
) { ) {
// CFDO: in theory could make the yet unsynced element (due to a bug) to move to the top
nextElements.set(id, relayedElement); nextElements.set(id, relayedElement);
this.relayedElementsVersionsCache.set(id, relayedElement.version); this.relayedElementsVersionsCache.set(id, relayedElement.version);
} }
@ -492,7 +462,7 @@ export class SyncClient {
) as SceneElementsMap; ) as SceneElementsMap;
for (const { id, version, payload } of remoteDeltas) { for (const { id, version, payload } of remoteDeltas) {
// CFDO: temporary to load all deltas on init // CFDO II: temporary to load all deltas on init
this.acknowledgedDeltasMap.set(id, { this.acknowledgedDeltasMap.set(id, {
delta: StoreDelta.load(payload), delta: StoreDelta.load(payload),
version, version,
@ -503,7 +473,7 @@ export class SyncClient {
continue; continue;
} }
// CFDO:strictly checking for out of order deltas; might be relaxed if it becomes a problem // CFDO: strictly checking for out of order deltas; might be relaxed if it becomes a problem
if (version !== nextAcknowledgedVersion + 1) { if (version !== nextAcknowledgedVersion + 1) {
throw new Error( throw new Error(
`Received out of order delta, expected "${ `Received out of order delta, expected "${

@ -1,11 +1,11 @@
import type { StoreChange, StoreDelta } from "../store"; import type { StoreChange, StoreDelta } from "../store";
import type { DTO } from "../utility-types"; import type { DTO } from "../utility-types";
export type DELTA = DTO<StoreDelta>; export type CLIENT_DELTA = DTO<StoreDelta>;
export type CHANGE = DTO<StoreChange>; export type CLIENT_CHANGE = DTO<StoreChange>;
export type RELAY_PAYLOAD = CHANGE; export type RELAY_PAYLOAD = CLIENT_CHANGE;
export type PUSH_PAYLOAD = DELTA; export type PUSH_PAYLOAD = CLIENT_DELTA;
export type PULL_PAYLOAD = { lastAcknowledgedVersion: number }; export type PULL_PAYLOAD = { lastAcknowledgedVersion: number };
export type CHUNK_INFO = { export type CHUNK_INFO = {
@ -14,24 +14,31 @@ export type CHUNK_INFO = {
count: number; count: number;
}; };
export type CLIENT_MESSAGE_RAW = { export type CLIENT_MESSAGE = (
type: "relay" | "pull" | "push";
payload: Uint8Array;
chunkInfo?: CHUNK_INFO;
};
export type CLIENT_MESSAGE = { chunkInfo: CHUNK_INFO } & (
| { type: "relay"; payload: RELAY_PAYLOAD } | { type: "relay"; payload: RELAY_PAYLOAD }
| { type: "pull"; payload: PULL_PAYLOAD } | { type: "pull"; payload: PULL_PAYLOAD }
| { type: "push"; payload: PUSH_PAYLOAD } | { type: "push"; payload: PUSH_PAYLOAD }
); ) & { chunkInfo?: CHUNK_INFO };
export type CLIENT_MESSAGE_BINARY = {
type: CLIENT_MESSAGE["type"];
payload: Uint8Array;
chunkInfo?: CHUNK_INFO;
};
export type SERVER_DELTA = { export type SERVER_DELTA = {
id: string; id: CLIENT_DELTA["id"];
version: number; version: number;
// CFDO: should be type-safe payload: CLIENT_DELTA;
payload: Record<string, any>; };
export type SERVER_DELTA_STORAGE = {
id: SERVER_DELTA["id"];
version: SERVER_DELTA["version"];
position: number;
payload: ArrayBuffer;
}; };
export type SERVER_MESSAGE = export type SERVER_MESSAGE =
| { | {
type: "relayed"; type: "relayed";
@ -40,16 +47,16 @@ export type SERVER_MESSAGE =
| { type: "acknowledged"; payload: { deltas: Array<SERVER_DELTA> } } | { type: "acknowledged"; payload: { deltas: Array<SERVER_DELTA> } }
| { | {
type: "rejected"; type: "rejected";
payload: { deltas: Array<DELTA>; message: string }; payload: { deltas: Array<CLIENT_DELTA>; message: string };
}; };
export interface DeltasRepository { export interface DeltasRepository {
save(delta: DELTA): SERVER_DELTA | null; save(delta: CLIENT_DELTA): SERVER_DELTA | null;
getAllSinceVersion(version: number): Array<SERVER_DELTA>; getAllSinceVersion(version: number): Array<SERVER_DELTA>;
getLastVersion(): number; getLastVersion(): number;
} }
// CFDO: should come from the shared types package // CFDO: should come from the shared types package (might need a bigger refactor)
export type ExcalidrawElement = { export type ExcalidrawElement = {
id: string; id: string;
type: any; type: any;

@ -11,7 +11,6 @@ export interface MetadataRepository {
saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise<void>; saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise<void>;
} }
// CFDO: make sure the deltas are always acknowledged (deleted from the repository)
export class LocalDeltasQueue { export class LocalDeltasQueue {
private readonly queue: Map<string, StoreDelta>; private readonly queue: Map<string, StoreDelta>;
private readonly repository: DeltasRepository; private readonly repository: DeltasRepository;

@ -1,6 +1,5 @@
import AsyncLock from "async-lock"; import AsyncLock from "async-lock";
import msgpack from "msgpack-lite"; import { Network, Utils } from "./utils";
import { Utils } from "./utils";
import type { import type {
DeltasRepository, DeltasRepository,
@ -9,13 +8,11 @@ import type {
PUSH_PAYLOAD, PUSH_PAYLOAD,
SERVER_MESSAGE, SERVER_MESSAGE,
SERVER_DELTA, SERVER_DELTA,
CLIENT_MESSAGE_RAW,
CHUNK_INFO, CHUNK_INFO,
RELAY_PAYLOAD, RELAY_PAYLOAD,
CLIENT_MESSAGE_BINARY,
} from "./protocol"; } from "./protocol";
// CFDO: message could be binary (cbor, protobuf, etc.)
/** /**
* Core excalidraw sync logic. * Core excalidraw sync logic.
*/ */
@ -24,12 +21,12 @@ export class ExcalidrawSyncServer {
private readonly sessions: Set<WebSocket> = new Set(); private readonly sessions: Set<WebSocket> = new Set();
private readonly chunks = new Map< private readonly chunks = new Map<
CHUNK_INFO["id"], CHUNK_INFO["id"],
Map<CHUNK_INFO["position"], CLIENT_MESSAGE_RAW["payload"]> Map<CHUNK_INFO["position"], CLIENT_MESSAGE_BINARY["payload"]>
>(); >();
constructor(private readonly repository: DeltasRepository) {} constructor(private readonly repository: DeltasRepository) {}
// CFDO: should send a message about collaborators (no collaborators => no need to send ephemerals) // CFDO: optimize, should send a message about collaborators (no collaborators => no need to send ephemerals)
public onConnect(client: WebSocket) { public onConnect(client: WebSocket) {
this.sessions.add(client); this.sessions.add(client);
} }
@ -42,47 +39,23 @@ export class ExcalidrawSyncServer {
client: WebSocket, client: WebSocket,
message: ArrayBuffer, message: ArrayBuffer,
): Promise<void> | void { ): Promise<void> | void {
const [parsedMessage, parseMessageError] = Utils.try<CLIENT_MESSAGE_RAW>( const [rawMessage, parsingError] = Utils.try<CLIENT_MESSAGE_BINARY>(() =>
() => { Network.decodeClientMessage(message),
const headerLength = 4;
const header = new Uint8Array(message, 0, headerLength);
const metadataLength = new DataView(
header.buffer,
header.byteOffset,
).getUint32(0);
const metadata = new Uint8Array(
message,
headerLength,
headerLength + metadataLength,
); );
const payload = new Uint8Array(message, headerLength + metadataLength); if (parsingError) {
const parsed = { console.error(parsingError);
...msgpack.decode(metadata),
payload,
};
// CFDO: add dev-level logging
console.log({ parsed });
return parsed;
},
);
if (parseMessageError) {
console.error(parseMessageError);
return; return;
} }
const { type, payload, chunkInfo } = parsedMessage; const { type, payload, chunkInfo } = rawMessage;
// if there is chunkInfo, there are more than 1 chunks => process them first // if there is chunkInfo, there are more than 1 chunks => process them first
if (chunkInfo) { if (chunkInfo) {
return this.processChunks(client, { type, payload, chunkInfo }); return this.processChunks(client, { type, payload, chunkInfo });
} }
return this.processMessage(client, parsedMessage); return this.processMessage(client, rawMessage);
} }
/** /**
@ -90,7 +63,8 @@ export class ExcalidrawSyncServer {
*/ */
private processChunks( private processChunks(
client: WebSocket, client: WebSocket,
message: CLIENT_MESSAGE_RAW & { chunkInfo: CHUNK_INFO }, message: CLIENT_MESSAGE_BINARY &
Required<Pick<CLIENT_MESSAGE_BINARY, "chunkInfo">>,
) { ) {
let shouldCleanupchunks = true; let shouldCleanupchunks = true;
const { const {
@ -131,7 +105,7 @@ export class ExcalidrawSyncServer {
const rawMessage = { const rawMessage = {
type, type,
payload: restoredPayload, payload: restoredPayload,
}; } as Omit<CLIENT_MESSAGE_BINARY, "chunkInfo">;
return this.processMessage(client, rawMessage); return this.processMessage(client, rawMessage);
} catch (error) { } catch (error) {
@ -146,14 +120,14 @@ export class ExcalidrawSyncServer {
private processMessage( private processMessage(
client: WebSocket, client: WebSocket,
{ type, payload }: Omit<CLIENT_MESSAGE_RAW, "chunkInfo">, { type, payload }: Omit<CLIENT_MESSAGE_BINARY, "chunkInfo">,
) { ) {
const [parsedPayload, parsePayloadError] = Utils.try< const [parsedPayload, parsingError] = Utils.try<CLIENT_MESSAGE["payload"]>(
CLIENT_MESSAGE["payload"] () => Network.fromBinary(payload),
>(() => msgpack.decode(payload)); );
if (parsePayloadError) { if (parsingError) {
console.error(parsePayloadError); console.error(parsingError);
return; return;
} }
@ -164,7 +138,7 @@ export class ExcalidrawSyncServer {
return this.pull(client, parsedPayload as PULL_PAYLOAD); return this.pull(client, parsedPayload as PULL_PAYLOAD);
case "push": case "push":
// apply each one-by-one to avoid race conditions // apply each one-by-one to avoid race conditions
// CFDO: in theory we do not need to block ephemeral appState changes // CFDO: in theory we do not need to block ephemeral appState (for now we are not even using them)
return this.lock.acquire("push", () => return this.lock.acquire("push", () =>
this.push(client, parsedPayload as PUSH_PAYLOAD), this.push(client, parsedPayload as PUSH_PAYLOAD),
); );
@ -174,7 +148,7 @@ export class ExcalidrawSyncServer {
} }
private relay(client: WebSocket, payload: RELAY_PAYLOAD) { private relay(client: WebSocket, payload: RELAY_PAYLOAD) {
// CFDO: we should likely apply these to the snapshot // CFDO I: we should likely apply these to the snapshot
return this.broadcast( return this.broadcast(
{ {
type: "relayed", type: "relayed",
@ -193,7 +167,7 @@ export class ExcalidrawSyncServer {
lastAcknowledgedServerVersion - lastAcknowledgedClientVersion; lastAcknowledgedServerVersion - lastAcknowledgedClientVersion;
if (versionΔ < 0) { if (versionΔ < 0) {
// CFDO: restore the client from the snapshot / deltas? // CFDO II: restore the client from the snapshot / deltas?
console.error( console.error(
`Panic! Client claims to have higher acknowledged version than the latest one on the server!`, `Panic! Client claims to have higher acknowledged version than the latest one on the server!`,
); );
@ -217,15 +191,19 @@ export class ExcalidrawSyncServer {
} }
private push(client: WebSocket, delta: PUSH_PAYLOAD) { private push(client: WebSocket, delta: PUSH_PAYLOAD) {
// CFDO: try to apply the deltas to the snapshot // CFDO I: apply latest changes to delt & apply the deltas to the snapshot
const [acknowledged, error] = Utils.try(() => this.repository.save(delta)); const [acknowledged, savingError] = Utils.try(() =>
this.repository.save(delta),
);
if (error || !acknowledged) { if (savingError || !acknowledged) {
// everything should be automatically rolled-back -> double-check // CFDO: everything should be automatically rolled-back in the db -> double-check
return this.send(client, { return this.send(client, {
type: "rejected", type: "rejected",
payload: { payload: {
message: error ? error.message : "Coudn't persist the delta.", message: savingError
? savingError.message
: "Coudn't persist the delta.",
deltas: [delta], deltas: [delta],
}, },
}); });
@ -239,26 +217,26 @@ export class ExcalidrawSyncServer {
}); });
} }
private send(client: WebSocket, message: SERVER_MESSAGE) { private send(ws: WebSocket, message: SERVER_MESSAGE) {
const [encodedMessage, encodeError] = Utils.try<Uint8Array>(() => const [encodedMessage, encodingError] = Utils.try(() =>
msgpack.encode(message), Network.toBinary(message),
); );
if (encodeError) { if (encodingError) {
console.error(encodeError); console.error(encodingError);
return; return;
} }
client.send(encodedMessage); ws.send(encodedMessage);
} }
private broadcast(message: SERVER_MESSAGE, exclude?: WebSocket) { private broadcast(message: SERVER_MESSAGE, exclude?: WebSocket) {
const [encodedMessage, encodeError] = Utils.try<Uint8Array>(() => const [encodedMessage, encodingError] = Utils.try(() =>
msgpack.encode(message), Network.toBinary(message),
); );
if (encodeError) { if (encodingError) {
console.error(encodeError); console.error(encodingError);
return; return;
} }

@ -1,3 +1,7 @@
import msgpack from "msgpack-lite";
import type { CLIENT_MESSAGE_BINARY } from "./protocol";
export const Utils = { export const Utils = {
try<T>(cb: () => T): [T, null] | [null, Error] { try<T>(cb: () => T): [T, null] | [null, Error] {
try { try {
@ -16,3 +20,49 @@ export const Utils = {
} }
}, },
}; };
export const Network = {
toBinary: (payload: Record<string, unknown>) => {
return new Uint8Array(msgpack.encode(payload));
},
fromBinary: (payload: Uint8Array) => {
return msgpack.decode(payload);
},
encodeClientMessage: (message: CLIENT_MESSAGE_BINARY) => {
const { payload, ...metadata } = message;
const metadataBuffer = Network.toBinary(metadata);
// contains the length of the rest of the message, so that we could chunk the payload and decode it server side
const headerBuffer = new ArrayBuffer(4);
new DataView(headerBuffer).setUint32(0, metadataBuffer.byteLength);
// concatenate into [header(4 bytes)][metadata][payload]
return Uint8Array.from([
...new Uint8Array(headerBuffer),
...metadataBuffer,
...message.payload,
]);
},
decodeClientMessage: (message: ArrayBuffer) => {
const headerLength = 4;
const header = new Uint8Array(message, 0, headerLength);
const metadataLength = new DataView(
header.buffer,
header.byteOffset,
).getUint32(0);
const metadata = new Uint8Array(
message,
headerLength,
headerLength + metadataLength,
);
const payload = new Uint8Array(message, headerLength + metadataLength);
const rawMessage = {
...Network.fromBinary(metadata),
payload,
} as CLIENT_MESSAGE_BINARY;
return rawMessage;
},
};

Loading…
Cancel
Save