From 49925038fd3cfc81cfea972f7100c56478785732 Mon Sep 17 00:00:00 2001 From: Marcel Mraz Date: Mon, 27 Jan 2025 22:06:46 +0100 Subject: [PATCH] Switch from sqlite payload strings to buffers, utils refactor, dev logging --- excalidraw-app/App.tsx | 1 + packages/excalidraw/cloudflare/repository.ts | 56 +++++----- packages/excalidraw/cloudflare/room.ts | 4 +- packages/excalidraw/delta.ts | 2 +- packages/excalidraw/scene/Scene.ts | 2 +- packages/excalidraw/store.ts | 7 +- packages/excalidraw/sync/client.ts | 106 +++++++------------ packages/excalidraw/sync/protocol.ts | 43 ++++---- packages/excalidraw/sync/queue.ts | 1 - packages/excalidraw/sync/server.ts | 104 +++++++----------- packages/excalidraw/sync/utils.ts | 50 +++++++++ 11 files changed, 193 insertions(+), 183 deletions(-) diff --git a/excalidraw-app/App.tsx b/excalidraw-app/App.tsx index 42ea7b37fd..cd808cac71 100644 --- a/excalidraw-app/App.tsx +++ b/excalidraw-app/App.tsx @@ -868,6 +868,7 @@ const ExcalidrawWrapper = () => { let deltas: StoreDelta[] = []; + // CFDO I: going both in collaborative setting means the (acknowledge) deltas need to have applied latest changes switch (direction) { case "forward": { deltas = acknowledgedDeltas.slice(sliderVersion, value) ?? []; diff --git a/packages/excalidraw/cloudflare/repository.ts b/packages/excalidraw/cloudflare/repository.ts index 2be3390194..f1948cc87e 100644 --- a/packages/excalidraw/cloudflare/repository.ts +++ b/packages/excalidraw/cloudflare/repository.ts @@ -1,8 +1,14 @@ -import type { DeltasRepository, DELTA, SERVER_DELTA } from "../sync/protocol"; - -// CFDO: add senderId, possibly roomId as well +import type { + DeltasRepository, + CLIENT_DELTA, + SERVER_DELTA, + SERVER_DELTA_STORAGE, +} from "../sync/protocol"; +import { Network } from "../sync/utils"; + +// CFDO II: add senderId, possibly roomId as well export class DurableDeltasRepository implements DeltasRepository { - // there is a 2MB row limit, hence working max row size of 1.5 MB + // there is a 2MB row limit, hence working with max payload size of 1.5 MB // and leaving a buffer for other row metadata private static readonly MAX_PAYLOAD_SIZE = 1_500_000; @@ -15,13 +21,13 @@ export class DurableDeltasRepository implements DeltasRepository { id TEXT NOT NULL, version INTEGER NOT NULL, position INTEGER NOT NULL, - payload TEXT NOT NULL, + payload BLOB NOT NULL, createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, version, position) );`); } - public save(delta: DELTA): SERVER_DELTA | null { + public save(delta: CLIENT_DELTA): SERVER_DELTA | null { return this.storage.transactionSync(() => { const existingDelta = this.getById(delta.id); @@ -31,9 +37,8 @@ export class DurableDeltasRepository implements DeltasRepository { } try { - // CFDO: could be also a buffer - const payload = JSON.stringify(delta); - const payloadSize = new TextEncoder().encode(payload).byteLength; + const payloadBuffer = Network.toBinary(delta); + const payloadSize = payloadBuffer.byteLength; const nextVersion = this.getLastVersion() + 1; const chunksCount = Math.ceil( payloadSize / DurableDeltasRepository.MAX_PAYLOAD_SIZE, @@ -42,8 +47,7 @@ export class DurableDeltasRepository implements DeltasRepository { for (let position = 0; position < chunksCount; position++) { const start = position * DurableDeltasRepository.MAX_PAYLOAD_SIZE; const end = start + DurableDeltasRepository.MAX_PAYLOAD_SIZE; - // slicing the chunk payload - const chunkedPayload = payload.slice(start, end); + const chunkedPayload = payloadBuffer.subarray(start, end); this.storage.sql.exec( `INSERT INTO deltas (id, version, position, payload) VALUES (?, ?, ?, ?);`, @@ -73,13 +77,13 @@ export class DurableDeltasRepository implements DeltasRepository { // CFDO: for versioning we need deletions, but not for the "snapshot" update; public getAllSinceVersion(version: number): Array { const deltas = this.storage.sql - .exec( - `SELECT id, payload, version FROM deltas WHERE version > (?) ORDER BY version, position, createdAt ASC;`, + .exec( + `SELECT id, payload, version, position FROM deltas WHERE version > (?) ORDER BY version, position, createdAt ASC;`, version, ) .toArray(); - return this.restoreServerDeltas(deltas); + return this.restorePayloadChunks(deltas); } public getLastVersion(): number { @@ -93,8 +97,8 @@ export class DurableDeltasRepository implements DeltasRepository { public getById(id: string): SERVER_DELTA | null { const deltas = this.storage.sql - .exec( - `SELECT id, payload, version FROM deltas WHERE id = (?) ORDER BY position ASC`, + .exec( + `SELECT id, payload, version, position FROM deltas WHERE id = (?) ORDER BY position ASC`, id, ) .toArray(); @@ -103,7 +107,7 @@ export class DurableDeltasRepository implements DeltasRepository { return null; } - const restoredDeltas = this.restoreServerDeltas(deltas); + const restoredDeltas = this.restorePayloadChunks(deltas); if (restoredDeltas.length !== 1) { throw new Error( @@ -114,35 +118,37 @@ export class DurableDeltasRepository implements DeltasRepository { return restoredDeltas[0]; } - // CFDO: fix types (should be buffer in the first place) - private restoreServerDeltas(deltas: SERVER_DELTA[]): SERVER_DELTA[] { + private restorePayloadChunks( + deltas: Array, + ): Array { return Array.from( deltas .reduce((acc, curr) => { const delta = acc.get(curr.version); if (delta) { + const currentPayload = new Uint8Array(curr.payload); acc.set(curr.version, { ...delta, // glueing the chunks payload back - payload: delta.payload + curr.payload, + payload: Uint8Array.from([...delta.payload, ...currentPayload]), }); } else { - // let's not unnecessarily expose more props than these + // let's not unnecessarily expose more props than these (i.e. position) acc.set(curr.version, { id: curr.id, version: curr.version, - payload: curr.payload, + payload: new Uint8Array(curr.payload), }); } return acc; - }, new Map()) + // using Uint8Array instead of ArrayBuffer, as it has nicer methods + }, new Map & { payload: Uint8Array }>()) .values(), - // CFDO: temporary ).map((delta) => ({ ...delta, - payload: JSON.parse(delta.payload), + payload: Network.fromBinary(delta.payload), })); } } diff --git a/packages/excalidraw/cloudflare/room.ts b/packages/excalidraw/cloudflare/room.ts index 332a993a3f..ae1c230035 100644 --- a/packages/excalidraw/cloudflare/room.ts +++ b/packages/excalidraw/cloudflare/room.ts @@ -21,8 +21,8 @@ export class DurableRoom extends DurableObject { super(ctx, env); this.ctx.blockConcurrencyWhile(async () => { - // CFDO: snapshot should likely be a transient store - // CFDO: loaded the latest state from the db + // CFDO I: snapshot should likely be a transient store + // CFDO II: loaded the latest state from the db this.snapshot = { // CFDO: start persisting acknowledged version (not a scene version!) // CFDO: we don't persist appState, should we? diff --git a/packages/excalidraw/delta.ts b/packages/excalidraw/delta.ts index 980906d9ef..2e96f98f09 100644 --- a/packages/excalidraw/delta.ts +++ b/packages/excalidraw/delta.ts @@ -1228,7 +1228,7 @@ export class ElementsDelta implements DeltaContainer { } } else if (type === "added") { // for additions the element does not have to exist (i.e. remote update) - // CFDO: the version itself might be different! + // CFDO II: the version itself might be different! element = newElementWith( { id, version: 1 } as OrderedExcalidrawElement, { diff --git a/packages/excalidraw/scene/Scene.ts b/packages/excalidraw/scene/Scene.ts index f51a006de8..3073cbf698 100644 --- a/packages/excalidraw/scene/Scene.ts +++ b/packages/excalidraw/scene/Scene.ts @@ -296,7 +296,7 @@ class Scene { validateIndicesThrottled(_nextElements); - // CFDO: if technically this leads to modifying the indices, it should update the snapshot immediately (as it shall be an non-undoable change) + // CFDO: if this leads to modifying the indices, it should update the snapshot immediately (as it shall be an non-undoable change) this.elements = syncInvalidIndices(_nextElements); this.elementsMap.clear(); this.elements.forEach((element) => { diff --git a/packages/excalidraw/store.ts b/packages/excalidraw/store.ts index 843ed8453b..ed184ba8f6 100644 --- a/packages/excalidraw/store.ts +++ b/packages/excalidraw/store.ts @@ -76,7 +76,7 @@ export const StoreAction = { * * These updates will _eventually_ make it to the local undo / redo stacks. */ - // CFDO I: none is not really "none" anymore, as it at very least emits an ephemeral increment + // CFDO: none is not really "none" anymore, as it at very least emits an ephemeral increment // we should likely rename these somehow and keep "none" only for real "no action" cases NONE: "NONE", } as const; @@ -313,7 +313,7 @@ export class Store { this.onStoreIncrementEmitter.trigger(increment); } - // CFDO: maybe I should not update the snapshot here so that it always syncs ephemeral change after durable change, + // CFDO II: maybe I should not update the snapshot here so that it always syncs ephemeral change after durable change, // so that clients exchange the latest element versions between each other, // meaning if it will be ignored on other clients, other clients would initiate a relay with current version instead of doing nothing if (options.updateSnapshot) { @@ -454,7 +454,6 @@ export class StoreDelta { id, elements: { added, removed, updated }, }: SERVER_DELTA["payload"]) { - // CFDO: ensure typesafety const elements = ElementsDelta.create(added, removed, updated, { shouldRedistribute: false, }); @@ -622,7 +621,7 @@ export class StoreSnapshot { } private detectChangedAppState(nextObservedAppState: ObservedAppState) { - // CFDO: could we optimize by checking only reference changes? (i.e. selectedElementIds should be stable now) + // CFDO: could we optimize by checking only reference changes? (i.e. selectedElementIds should be stable now); this is not used for now return Delta.isRightDifferent(this.appState, nextObservedAppState); } diff --git a/packages/excalidraw/sync/client.ts b/packages/excalidraw/sync/client.ts index ab5e413f6c..d16ccd8669 100644 --- a/packages/excalidraw/sync/client.ts +++ b/packages/excalidraw/sync/client.ts @@ -1,11 +1,9 @@ -/* eslint-disable no-console */ import throttle from "lodash.throttle"; -import msgpack from "msgpack-lite"; import ReconnectingWebSocket, { type Event, type CloseEvent, } from "reconnecting-websocket"; -import { Utils } from "./utils"; +import { Network, Utils } from "./utils"; import { LocalDeltasQueue, type MetadataRepository, @@ -16,16 +14,17 @@ import type { StoreChange } from "../store"; import type { ExcalidrawImperativeAPI } from "../types"; import type { ExcalidrawElement, SceneElementsMap } from "../element/types"; import type { - CLIENT_MESSAGE_RAW, SERVER_DELTA, - CHANGE, + CLIENT_CHANGE, SERVER_MESSAGE, + CLIENT_MESSAGE_BINARY, } from "./protocol"; import { debounce } from "../utils"; import { randomId } from "../random"; import { orderByFractionalIndex } from "../fractionalIndex"; +import { ENV } from "../constants"; -class SocketMessage implements CLIENT_MESSAGE_RAW { +class SocketMessage implements CLIENT_MESSAGE_BINARY { constructor( public readonly type: "relay" | "pull" | "push", public readonly payload: Uint8Array, @@ -77,6 +76,7 @@ class SocketClient { window.addEventListener("online", this.onOnline); window.addEventListener("offline", this.onOffline); + // eslint-disable-next-line no-console console.debug(`Connecting to the room "${this.roomId}"...`); this.socket = new ReconnectingWebSocket( `${this.host}/connect?roomId=${this.roomId}`, @@ -103,7 +103,6 @@ class SocketClient { { leading: true, trailing: false }, ); - // CFDO: the connections seem to keep hanging for some reason public disconnect() { if (this.isDisconnected) { return; @@ -119,6 +118,7 @@ class SocketClient { this.socket?.removeEventListener("error", this.onError); this.socket?.close(); + // eslint-disable-next-line no-console console.debug(`Disconnected from the room "${this.roomId}".`); } finally { this.socket = null; @@ -135,7 +135,6 @@ class SocketClient { return; } - // CFDO: could be closed / closing / connecting if (this.isDisconnected) { this.connect(); return; @@ -143,10 +142,14 @@ class SocketClient { const { type, payload } = message; - // CFDO II: could be slowish for large payloads, thing about a better solution (i.e. msgpack 10x faster, 2x smaller) - const payloadBuffer = msgpack.encode(payload) as Uint8Array; + const payloadBuffer = Network.toBinary(payload); const payloadSize = payloadBuffer.byteLength; + if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) { + // eslint-disable-next-line no-console + console.debug("send", message, payloadSize); + } + if (payloadSize < SocketClient.MAX_MESSAGE_SIZE) { const message = new SocketMessage(type, payloadBuffer); return this.sendMessage(message); @@ -176,86 +179,55 @@ class SocketClient { return; } + if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) { + // eslint-disable-next-line no-console + console.debug("onMessage", message); + } + this.handlers.onMessage(message); }); }; private onOpen = (event: Event) => { + // eslint-disable-next-line no-console console.debug(`Connection to the room "${this.roomId}" opened.`); this.isOffline = false; this.handlers.onOpen(event); }; private onClose = (event: CloseEvent) => { + // eslint-disable-next-line no-console console.debug(`Connection to the room "${this.roomId}" closed.`, event); }; private onError = (event: Event) => { - console.debug( + // eslint-disable-next-line no-console + console.error( `Connection to the room "${this.roomId}" returned an error.`, event, ); }; - private sendMessage = ({ payload, ...metadata }: CLIENT_MESSAGE_RAW) => { - const metadataBuffer = msgpack.encode(metadata) as Uint8Array; - - // contains the length of the rest of the message, so that we could decode it server side - const headerBuffer = new ArrayBuffer(4); - new DataView(headerBuffer).setUint32(0, metadataBuffer.byteLength); - - // concatenate into [header(4 bytes)][metadata][payload] - const message = Uint8Array.from([ - ...new Uint8Array(headerBuffer), - ...metadataBuffer, - ...payload, - ]); - - // CFDO: add dev-level logging - { - const headerLength = 4; - const header = new Uint8Array(message.buffer, 0, headerLength); - const metadataLength = new DataView( - header.buffer, - header.byteOffset, - ).getUint32(0); - - const metadata = new Uint8Array( - message.buffer, - headerLength, - headerLength + metadataLength, - ); - - const payload = new Uint8Array( - message.buffer, - headerLength + metadataLength, - ); - - console.log({ - ...msgpack.decode(metadata), - payload, - }); - } - - this.socket?.send(message); + private sendMessage = (message: CLIENT_MESSAGE_BINARY) => { + this.socket?.send(Network.encodeClientMessage(message)); }; + // CFDO: should be (runtime) type-safe private async receiveMessage( message: Blob, ): Promise { const arrayBuffer = await message.arrayBuffer(); const uint8Array = new Uint8Array(arrayBuffer); - const [decodedMessage, decodeError] = Utils.try(() => - msgpack.decode(uint8Array), + const [decodedMessage, decodingError] = Utils.try(() => + Network.fromBinary(uint8Array), ); - if (decodeError) { + if (decodingError) { console.error("Failed to decode message:", message); return; } - // CFDO: should be type-safe return decodedMessage; } } @@ -285,7 +257,7 @@ export class SyncClient { >(); // #region ACKNOWLEDGED DELTAS & METADATA - // CFDO: shouldn't be stateful, only request / response + // CFDO II: shouldn't be stateful, only request / response private readonly acknowledgedDeltasMap: Map = new Map(); @@ -336,7 +308,7 @@ export class SyncClient { return new SyncClient(api, repository, queue, { host: SyncClient.HOST_URL, roomId: roomId ?? SyncClient.ROOM_ID, - // CFDO: temporary, so that all deltas are loaded and applied on init + // CFDO II: temporary, so that all deltas are loaded and applied on init lastAcknowledgedVersion: 0, }); } @@ -377,7 +349,7 @@ export class SyncClient { } } - // CFDO: should be throttled! 60 fps for live scenes, 10s or so for single player + // CFDO: should be throttled! 16ms (60 fps) for live scenes, not needed at all for single player public relay(change: StoreChange): void { if (this.client.isDisconnected) { // don't reconnect if we're explicitly disconnected @@ -414,7 +386,7 @@ export class SyncClient { // #region PRIVATE SOCKET MESSAGE HANDLERS private onOpen = (event: Event) => { - // CFDO: hack to pull everything for on init + // CFDO II: hack to pull everything for on init this.pull(0); this.push(); }; @@ -425,9 +397,8 @@ export class SyncClient { this.push(); }; - private onMessage = ({ type, payload }: SERVER_MESSAGE) => { - // CFDO: add dev-level logging - console.log({ type, payload }); + private onMessage = (serverMessage: SERVER_MESSAGE) => { + const { type, payload } = serverMessage; switch (type) { case "relayed": @@ -441,8 +412,8 @@ export class SyncClient { } }; - private handleRelayed = (payload: CHANGE) => { - // CFDO: retrieve the map already + private handleRelayed = (payload: CLIENT_CHANGE) => { + // CFDO I: retrieve the map already const nextElements = new Map( this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]), ) as SceneElementsMap; @@ -457,7 +428,6 @@ export class SyncClient { !existingElement || // new element existingElement.version < relayedElement.version // updated element ) { - // CFDO: in theory could make the yet unsynced element (due to a bug) to move to the top nextElements.set(id, relayedElement); this.relayedElementsVersionsCache.set(id, relayedElement.version); } @@ -492,7 +462,7 @@ export class SyncClient { ) as SceneElementsMap; for (const { id, version, payload } of remoteDeltas) { - // CFDO: temporary to load all deltas on init + // CFDO II: temporary to load all deltas on init this.acknowledgedDeltasMap.set(id, { delta: StoreDelta.load(payload), version, @@ -503,7 +473,7 @@ export class SyncClient { continue; } - // CFDO:strictly checking for out of order deltas; might be relaxed if it becomes a problem + // CFDO: strictly checking for out of order deltas; might be relaxed if it becomes a problem if (version !== nextAcknowledgedVersion + 1) { throw new Error( `Received out of order delta, expected "${ diff --git a/packages/excalidraw/sync/protocol.ts b/packages/excalidraw/sync/protocol.ts index bb9edabe42..715afce664 100644 --- a/packages/excalidraw/sync/protocol.ts +++ b/packages/excalidraw/sync/protocol.ts @@ -1,11 +1,11 @@ import type { StoreChange, StoreDelta } from "../store"; import type { DTO } from "../utility-types"; -export type DELTA = DTO; -export type CHANGE = DTO; +export type CLIENT_DELTA = DTO; +export type CLIENT_CHANGE = DTO; -export type RELAY_PAYLOAD = CHANGE; -export type PUSH_PAYLOAD = DELTA; +export type RELAY_PAYLOAD = CLIENT_CHANGE; +export type PUSH_PAYLOAD = CLIENT_DELTA; export type PULL_PAYLOAD = { lastAcknowledgedVersion: number }; export type CHUNK_INFO = { @@ -14,24 +14,31 @@ export type CHUNK_INFO = { count: number; }; -export type CLIENT_MESSAGE_RAW = { - type: "relay" | "pull" | "push"; - payload: Uint8Array; - chunkInfo?: CHUNK_INFO; -}; - -export type CLIENT_MESSAGE = { chunkInfo: CHUNK_INFO } & ( +export type CLIENT_MESSAGE = ( | { type: "relay"; payload: RELAY_PAYLOAD } | { type: "pull"; payload: PULL_PAYLOAD } | { type: "push"; payload: PUSH_PAYLOAD } -); +) & { chunkInfo?: CHUNK_INFO }; + +export type CLIENT_MESSAGE_BINARY = { + type: CLIENT_MESSAGE["type"]; + payload: Uint8Array; + chunkInfo?: CHUNK_INFO; +}; export type SERVER_DELTA = { - id: string; + id: CLIENT_DELTA["id"]; version: number; - // CFDO: should be type-safe - payload: Record; + payload: CLIENT_DELTA; +}; + +export type SERVER_DELTA_STORAGE = { + id: SERVER_DELTA["id"]; + version: SERVER_DELTA["version"]; + position: number; + payload: ArrayBuffer; }; + export type SERVER_MESSAGE = | { type: "relayed"; @@ -40,16 +47,16 @@ export type SERVER_MESSAGE = | { type: "acknowledged"; payload: { deltas: Array } } | { type: "rejected"; - payload: { deltas: Array; message: string }; + payload: { deltas: Array; message: string }; }; export interface DeltasRepository { - save(delta: DELTA): SERVER_DELTA | null; + save(delta: CLIENT_DELTA): SERVER_DELTA | null; getAllSinceVersion(version: number): Array; getLastVersion(): number; } -// CFDO: should come from the shared types package +// CFDO: should come from the shared types package (might need a bigger refactor) export type ExcalidrawElement = { id: string; type: any; diff --git a/packages/excalidraw/sync/queue.ts b/packages/excalidraw/sync/queue.ts index 874d77089c..c9baeacfb5 100644 --- a/packages/excalidraw/sync/queue.ts +++ b/packages/excalidraw/sync/queue.ts @@ -11,7 +11,6 @@ export interface MetadataRepository { saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise; } -// CFDO: make sure the deltas are always acknowledged (deleted from the repository) export class LocalDeltasQueue { private readonly queue: Map; private readonly repository: DeltasRepository; diff --git a/packages/excalidraw/sync/server.ts b/packages/excalidraw/sync/server.ts index 8a5181ec68..6ea34ce856 100644 --- a/packages/excalidraw/sync/server.ts +++ b/packages/excalidraw/sync/server.ts @@ -1,6 +1,5 @@ import AsyncLock from "async-lock"; -import msgpack from "msgpack-lite"; -import { Utils } from "./utils"; +import { Network, Utils } from "./utils"; import type { DeltasRepository, @@ -9,13 +8,11 @@ import type { PUSH_PAYLOAD, SERVER_MESSAGE, SERVER_DELTA, - CLIENT_MESSAGE_RAW, CHUNK_INFO, RELAY_PAYLOAD, + CLIENT_MESSAGE_BINARY, } from "./protocol"; -// CFDO: message could be binary (cbor, protobuf, etc.) - /** * Core excalidraw sync logic. */ @@ -24,12 +21,12 @@ export class ExcalidrawSyncServer { private readonly sessions: Set = new Set(); private readonly chunks = new Map< CHUNK_INFO["id"], - Map + Map >(); constructor(private readonly repository: DeltasRepository) {} - // CFDO: should send a message about collaborators (no collaborators => no need to send ephemerals) + // CFDO: optimize, should send a message about collaborators (no collaborators => no need to send ephemerals) public onConnect(client: WebSocket) { this.sessions.add(client); } @@ -42,47 +39,23 @@ export class ExcalidrawSyncServer { client: WebSocket, message: ArrayBuffer, ): Promise | void { - const [parsedMessage, parseMessageError] = Utils.try( - () => { - const headerLength = 4; - const header = new Uint8Array(message, 0, headerLength); - const metadataLength = new DataView( - header.buffer, - header.byteOffset, - ).getUint32(0); - - const metadata = new Uint8Array( - message, - headerLength, - headerLength + metadataLength, - ); - - const payload = new Uint8Array(message, headerLength + metadataLength); - const parsed = { - ...msgpack.decode(metadata), - payload, - }; - - // CFDO: add dev-level logging - console.log({ parsed }); - - return parsed; - }, + const [rawMessage, parsingError] = Utils.try(() => + Network.decodeClientMessage(message), ); - if (parseMessageError) { - console.error(parseMessageError); + if (parsingError) { + console.error(parsingError); return; } - const { type, payload, chunkInfo } = parsedMessage; + const { type, payload, chunkInfo } = rawMessage; // if there is chunkInfo, there are more than 1 chunks => process them first if (chunkInfo) { return this.processChunks(client, { type, payload, chunkInfo }); } - return this.processMessage(client, parsedMessage); + return this.processMessage(client, rawMessage); } /** @@ -90,7 +63,8 @@ export class ExcalidrawSyncServer { */ private processChunks( client: WebSocket, - message: CLIENT_MESSAGE_RAW & { chunkInfo: CHUNK_INFO }, + message: CLIENT_MESSAGE_BINARY & + Required>, ) { let shouldCleanupchunks = true; const { @@ -131,7 +105,7 @@ export class ExcalidrawSyncServer { const rawMessage = { type, payload: restoredPayload, - }; + } as Omit; return this.processMessage(client, rawMessage); } catch (error) { @@ -146,14 +120,14 @@ export class ExcalidrawSyncServer { private processMessage( client: WebSocket, - { type, payload }: Omit, + { type, payload }: Omit, ) { - const [parsedPayload, parsePayloadError] = Utils.try< - CLIENT_MESSAGE["payload"] - >(() => msgpack.decode(payload)); + const [parsedPayload, parsingError] = Utils.try( + () => Network.fromBinary(payload), + ); - if (parsePayloadError) { - console.error(parsePayloadError); + if (parsingError) { + console.error(parsingError); return; } @@ -164,7 +138,7 @@ export class ExcalidrawSyncServer { return this.pull(client, parsedPayload as PULL_PAYLOAD); case "push": // apply each one-by-one to avoid race conditions - // CFDO: in theory we do not need to block ephemeral appState changes + // CFDO: in theory we do not need to block ephemeral appState (for now we are not even using them) return this.lock.acquire("push", () => this.push(client, parsedPayload as PUSH_PAYLOAD), ); @@ -174,7 +148,7 @@ export class ExcalidrawSyncServer { } private relay(client: WebSocket, payload: RELAY_PAYLOAD) { - // CFDO: we should likely apply these to the snapshot + // CFDO I: we should likely apply these to the snapshot return this.broadcast( { type: "relayed", @@ -193,7 +167,7 @@ export class ExcalidrawSyncServer { lastAcknowledgedServerVersion - lastAcknowledgedClientVersion; if (versionĪ” < 0) { - // CFDO: restore the client from the snapshot / deltas? + // CFDO II: restore the client from the snapshot / deltas? console.error( `Panic! Client claims to have higher acknowledged version than the latest one on the server!`, ); @@ -217,15 +191,19 @@ export class ExcalidrawSyncServer { } private push(client: WebSocket, delta: PUSH_PAYLOAD) { - // CFDO: try to apply the deltas to the snapshot - const [acknowledged, error] = Utils.try(() => this.repository.save(delta)); + // CFDO I: apply latest changes to delt & apply the deltas to the snapshot + const [acknowledged, savingError] = Utils.try(() => + this.repository.save(delta), + ); - if (error || !acknowledged) { - // everything should be automatically rolled-back -> double-check + if (savingError || !acknowledged) { + // CFDO: everything should be automatically rolled-back in the db -> double-check return this.send(client, { type: "rejected", payload: { - message: error ? error.message : "Coudn't persist the delta.", + message: savingError + ? savingError.message + : "Coudn't persist the delta.", deltas: [delta], }, }); @@ -239,26 +217,26 @@ export class ExcalidrawSyncServer { }); } - private send(client: WebSocket, message: SERVER_MESSAGE) { - const [encodedMessage, encodeError] = Utils.try(() => - msgpack.encode(message), + private send(ws: WebSocket, message: SERVER_MESSAGE) { + const [encodedMessage, encodingError] = Utils.try(() => + Network.toBinary(message), ); - if (encodeError) { - console.error(encodeError); + if (encodingError) { + console.error(encodingError); return; } - client.send(encodedMessage); + ws.send(encodedMessage); } private broadcast(message: SERVER_MESSAGE, exclude?: WebSocket) { - const [encodedMessage, encodeError] = Utils.try(() => - msgpack.encode(message), + const [encodedMessage, encodingError] = Utils.try(() => + Network.toBinary(message), ); - if (encodeError) { - console.error(encodeError); + if (encodingError) { + console.error(encodingError); return; } diff --git a/packages/excalidraw/sync/utils.ts b/packages/excalidraw/sync/utils.ts index 44e63a5ff3..fd53f33a19 100644 --- a/packages/excalidraw/sync/utils.ts +++ b/packages/excalidraw/sync/utils.ts @@ -1,3 +1,7 @@ +import msgpack from "msgpack-lite"; + +import type { CLIENT_MESSAGE_BINARY } from "./protocol"; + export const Utils = { try(cb: () => T): [T, null] | [null, Error] { try { @@ -16,3 +20,49 @@ export const Utils = { } }, }; + +export const Network = { + toBinary: (payload: Record) => { + return new Uint8Array(msgpack.encode(payload)); + }, + fromBinary: (payload: Uint8Array) => { + return msgpack.decode(payload); + }, + encodeClientMessage: (message: CLIENT_MESSAGE_BINARY) => { + const { payload, ...metadata } = message; + const metadataBuffer = Network.toBinary(metadata); + + // contains the length of the rest of the message, so that we could chunk the payload and decode it server side + const headerBuffer = new ArrayBuffer(4); + new DataView(headerBuffer).setUint32(0, metadataBuffer.byteLength); + + // concatenate into [header(4 bytes)][metadata][payload] + return Uint8Array.from([ + ...new Uint8Array(headerBuffer), + ...metadataBuffer, + ...message.payload, + ]); + }, + decodeClientMessage: (message: ArrayBuffer) => { + const headerLength = 4; + const header = new Uint8Array(message, 0, headerLength); + const metadataLength = new DataView( + header.buffer, + header.byteOffset, + ).getUint32(0); + + const metadata = new Uint8Array( + message, + headerLength, + headerLength + metadataLength, + ); + + const payload = new Uint8Array(message, headerLength + metadataLength); + const rawMessage = { + ...Network.fromBinary(metadata), + payload, + } as CLIENT_MESSAGE_BINARY; + + return rawMessage; + }, +};