WIP sync client

mrazator/delta-based-sync
Marcel Mraz 2 months ago
parent 508cfbc843
commit f12ed8e0b2
No known key found for this signature in database
GPG Key ID: 4EBD6E62DC830CD2

@ -54,6 +54,7 @@ import Collab, {
collabAPIAtom,
isCollaboratingAtom,
isOfflineAtom,
syncAPIAtom,
} from "./collab/Collab";
import {
exportToBackend,
@ -363,11 +364,20 @@ const ExcalidrawWrapper = () => {
const [, setShareDialogState] = useAtom(shareDialogStateAtom);
const [collabAPI] = useAtom(collabAPIAtom);
const [syncAPI] = useAtom(syncAPIAtom);
const [isCollaborating] = useAtomWithInitialValue(isCollaboratingAtom, () => {
return isCollaborationLink(window.location.href);
});
const collabError = useAtomValue(collabErrorIndicatorAtom);
useEffect(() => {
syncAPI?.reconnect();
return () => {
syncAPI?.disconnect();
};
}, [syncAPI]);
useHandleLibrary({
excalidrawAPI,
adapter: LibraryIndexedDBAdapter,
@ -671,7 +681,7 @@ const ExcalidrawWrapper = () => {
// some appState like selections should also be transfered (we could even persist it)
if (!elementsChange.isEmpty()) {
console.log(elementsChange);
syncAPI?.push("durable", [elementsChange]);
}
};

@ -88,7 +88,9 @@ import type {
ReconciledExcalidrawElement,
RemoteExcalidrawElement,
} from "../../packages/excalidraw/data/reconcile";
import { ExcalidrawSyncClient } from "../../packages/excalidraw/sync/client";
export const syncAPIAtom = atom<ExcalidrawSyncClient | null>(null);
export const collabAPIAtom = atom<CollabAPI | null>(null);
export const isCollaboratingAtom = atom(false);
export const isOfflineAtom = atom(false);
@ -234,6 +236,10 @@ class Collab extends PureComponent<CollabProps, CollabState> {
};
appJotaiStore.set(collabAPIAtom, collabAPI);
appJotaiStore.set(
syncAPIAtom,
new ExcalidrawSyncClient(this.excalidrawAPI),
);
if (import.meta.env.MODE === ENV.TEST || import.meta.env.DEV) {
window.collab = window.collab || ({} as Window["collab"]);

@ -1,2 +1,3 @@
node_modules
types
.wrangler

@ -32,6 +32,7 @@ import type {
} from "./element/types";
import { orderByFractionalIndex, syncMovedIndices } from "./fractionalIndex";
import { getNonDeletedGroupIds } from "./groups";
import { randomId } from "./random";
import { getObservedAppState } from "./store";
import type {
AppState,
@ -795,27 +796,33 @@ export class AppStateChange implements Change<AppState> {
}
}
type ElementPartial<T extends ExcalidrawElement = ExcalidrawElement> = Omit<
ElementUpdate<Ordered<T>>,
"seed"
>;
type ElementPartial<T extends ExcalidrawElement = ExcalidrawElement> =
ElementUpdate<Ordered<T>>;
/**
* Elements change is a low level primitive to capture a change between two sets of elements.
* It does so by encapsulating forward and backward `Delta`s, allowing to time-travel in both directions.
*/
export class ElementsChange implements Change<SceneElementsMap> {
public readonly id: string;
private constructor(
private readonly added: Record<string, Delta<ElementPartial>>,
private readonly removed: Record<string, Delta<ElementPartial>>,
private readonly updated: Record<string, Delta<ElementPartial>>,
) {}
options: { changeId: string },
) {
this.id = options.changeId;
}
public static create(
added: Record<string, Delta<ElementPartial>>,
removed: Record<string, Delta<ElementPartial>>,
updated: Record<string, Delta<ElementPartial>>,
options = { shouldRedistribute: false },
options: { changeId: string; shouldRedistribute: boolean } = {
changeId: randomId(),
shouldRedistribute: false,
},
) {
let change: ElementsChange;
@ -840,9 +847,13 @@ export class ElementsChange implements Change<SceneElementsMap> {
}
}
change = new ElementsChange(nextAdded, nextRemoved, nextUpdated);
change = new ElementsChange(nextAdded, nextRemoved, nextUpdated, {
changeId: options.changeId,
});
} else {
change = new ElementsChange(added, removed, updated);
change = new ElementsChange(added, removed, updated, {
changeId: options.changeId,
});
}
if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) {
@ -985,12 +996,13 @@ export class ElementsChange implements Change<SceneElementsMap> {
return ElementsChange.create({}, {}, {});
}
public static load(data: {
added: Record<string, Delta<ElementPartial>>;
removed: Record<string, Delta<ElementPartial>>;
updated: Record<string, Delta<ElementPartial>>;
}) {
return ElementsChange.create(data.added, data.removed, data.updated);
public static load(payload: string) {
const { id, added, removed, updated } = JSON.parse(payload);
return ElementsChange.create(added, removed, updated, {
changeId: id,
shouldRedistribute: false,
});
}
public inverse(): ElementsChange {
@ -1077,6 +1089,7 @@ export class ElementsChange implements Change<SceneElementsMap> {
const updated = applyLatestChangesInternal(this.updated);
return ElementsChange.create(added, removed, updated, {
changeId: this.id,
shouldRedistribute: true, // redistribute the deltas as `isDeleted` could have been updated
});
}
@ -1101,9 +1114,9 @@ export class ElementsChange implements Change<SceneElementsMap> {
flags,
);
const addedElements = applyDeltas(this.added);
const removedElements = applyDeltas(this.removed);
const updatedElements = applyDeltas(this.updated);
const addedElements = applyDeltas("added", this.added);
const removedElements = applyDeltas("removed", this.removed);
const updatedElements = applyDeltas("updated", this.updated);
const affectedElements = this.resolveConflicts(elements, nextElements);
@ -1156,22 +1169,27 @@ export class ElementsChange implements Change<SceneElementsMap> {
}
}
private static createApplier = (
nextElements: SceneElementsMap,
snapshot: Map<string, OrderedExcalidrawElement>,
flags: {
containsVisibleDifference: boolean;
containsZindexDifference: boolean;
},
) => {
const getElement = ElementsChange.createGetter(
nextElements,
snapshot,
flags,
);
private static createApplier =
(
nextElements: SceneElementsMap,
snapshot: Map<string, OrderedExcalidrawElement>,
flags: {
containsVisibleDifference: boolean;
containsZindexDifference: boolean;
},
) =>
(
type: "added" | "removed" | "updated",
deltas: Record<string, Delta<ElementPartial>>,
) => {
const getElement = ElementsChange.createGetter(
type,
nextElements,
snapshot,
flags,
);
return (deltas: Record<string, Delta<ElementPartial>>) =>
Object.entries(deltas).reduce((acc, [id, delta]) => {
return Object.entries(deltas).reduce((acc, [id, delta]) => {
const element = getElement(id, delta.inserted);
if (element) {
@ -1182,10 +1200,11 @@ export class ElementsChange implements Change<SceneElementsMap> {
return acc;
}, new Map<string, OrderedExcalidrawElement>());
};
};
private static createGetter =
(
type: "added" | "removed" | "updated",
elements: SceneElementsMap,
snapshot: Map<string, OrderedExcalidrawElement>,
flags: {
@ -1211,6 +1230,15 @@ export class ElementsChange implements Change<SceneElementsMap> {
) {
flags.containsVisibleDifference = true;
}
} else if (type === "added") {
// for additions the element does not have to exist (i.e. remote update)
// TODO: the version itself might be different!
element = newElementWith(
{ id, version: 1 } as OrderedExcalidrawElement,
{
...partial,
},
);
}
}
@ -1574,8 +1602,7 @@ export class ElementsChange implements Change<SceneElementsMap> {
private static stripIrrelevantProps(
partial: Partial<OrderedExcalidrawElement>,
): ElementPartial {
const { id, updated, version, versionNonce, seed, ...strippedPartial } =
partial;
const { id, updated, version, versionNonce, ...strippedPartial } = partial;
return strippedPartial;
}

@ -8,7 +8,7 @@ import type {
export class DurableChangesRepository implements ChangesRepository {
constructor(private storage: DurableObjectStorage) {
// #region DEV ONLY
this.storage.sql.exec(`DROP TABLE IF EXISTS changes;`);
// this.storage.sql.exec(`DROP TABLE IF EXISTS changes;`);
// #endregion
this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS changes(

@ -719,6 +719,7 @@ class App extends React.Component<AppProps, AppState> {
addFiles: this.addFiles,
resetScene: this.resetScene,
getSceneElementsIncludingDeleted: this.getSceneElementsIncludingDeleted,
store: this.store,
history: {
clear: this.resetHistory,
},

@ -140,8 +140,8 @@
"start": "node ../../scripts/buildExample.mjs && vite",
"build:example": "node ../../scripts/buildExample.mjs",
"size": "yarn build:umd && size-limit",
"cf:deploy": "wrangler deploy",
"cf:dev": "wrangler dev",
"cf:typegen": "wrangler types"
"sync:deploy": "wrangler deploy",
"sync:dev": "wrangler dev",
"sync:typegen": "wrangler types"
}
}

@ -1,44 +1,147 @@
/* eslint-disable no-console */
import { Utils } from "./utils";
import type { CLIENT_CHANGE, SERVER_CHANGE } from "./protocol";
import { ElementsChange } from "../change";
import type { ExcalidrawImperativeAPI } from "../types";
import type { SceneElementsMap } from "../element/types";
import type { CLIENT_CHANGE, PUSH_PAYLOAD, SERVER_CHANGE } from "./protocol";
import throttle from "lodash.throttle";
class ExcalidrawSyncClient {
export class ExcalidrawSyncClient {
// TODO: add prod url
private static readonly HOST_URL = "ws://localhost:8787";
private static readonly RECONNECT_INTERVAL = 10_000;
private roomId: string;
private lastAcknowledgedVersion: number;
private lastAcknowledgedVersion = 0;
private readonly api: ExcalidrawImperativeAPI;
private readonly roomId: string;
private readonly queuedChanges: Map<string, CLIENT_CHANGE> = new Map();
private get localChanges() {
return Array.from(this.queuedChanges.values());
}
private server: WebSocket | null = null;
private get isConnected() {
return this.server?.readyState === WebSocket.OPEN;
}
constructor(roomId: string = "test_room_1") {
private isConnecting: { done: (error?: Error) => void } | null = null;
constructor(api: ExcalidrawImperativeAPI, roomId: string = "test_room_1") {
this.api = api;
this.roomId = roomId;
// TODO: persist in idb
this.lastAcknowledgedVersion = 0;
}
public connect() {
this.server = new WebSocket(
`${ExcalidrawSyncClient.HOST_URL}/connect?roomId=${this.roomId}`,
);
public reconnect = throttle(
async () => {
try {
if (this.isConnected) {
console.debug("Already connected to the sync server.");
return;
}
this.server.addEventListener("open", this.onOpen);
this.server.addEventListener("message", this.onMessage);
this.server.addEventListener("close", this.onClose);
this.server.addEventListener("error", this.onError);
}
if (this.isConnecting !== null) {
console.debug("Already reconnecting to the sync server...");
return;
}
console.trace("Reconnecting to the sync server...");
const isConnecting = {
done: () => {},
};
// ensure there won't be multiple reconnection attempts
this.isConnecting = isConnecting;
public disconnect() {
if (this.server) {
this.server.removeEventListener("open", this.onOpen);
this.server.removeEventListener("message", this.onMessage);
this.server.removeEventListener("close", this.onClose);
this.server.removeEventListener("error", this.onError);
this.server.close();
return await new Promise<void>((resolve, reject) => {
this.server = new WebSocket(
`${ExcalidrawSyncClient.HOST_URL}/connect?roomId=${this.roomId}`,
);
// wait for 10 seconds before timing out
const timeoutId = setTimeout(() => {
reject("Connecting the sync server timed out");
}, 10_000);
// resolved when opened, rejected on error
isConnecting.done = (error?: Error) => {
clearTimeout(timeoutId);
if (error) {
reject(error);
} else {
resolve();
}
};
this.server.addEventListener("message", this.onMessage);
this.server.addEventListener("close", this.onClose);
this.server.addEventListener("error", this.onError);
this.server.addEventListener("open", this.onOpen);
});
} catch (e) {
console.error("Failed to connect to sync server:", e);
this.disconnect(e as Error);
}
},
ExcalidrawSyncClient.RECONNECT_INTERVAL,
{ leading: true },
);
public disconnect = throttle(
(error?: Error) => {
try {
this.server?.removeEventListener("message", this.onMessage);
this.server?.removeEventListener("close", this.onClose);
this.server?.removeEventListener("error", this.onError);
this.server?.removeEventListener("open", this.onOpen);
if (error) {
this.isConnecting?.done(error);
}
} finally {
this.isConnecting = null;
this.server = null;
this.reconnect();
}
},
ExcalidrawSyncClient.RECONNECT_INTERVAL,
{ leading: true },
);
private onOpen = async () => {
if (!this.isConnected) {
throw new Error(
"Received open event, but the connection is still not ready.",
);
}
}
private onOpen = () => this.sync();
if (!this.isConnecting) {
throw new Error(
"Can't resolve connection without `isConnecting` callback.",
);
}
// resolve the current connection
this.isConnecting.done();
// initiate pull
this.pull();
};
private onClose = () =>
this.disconnect(
new Error(`Received "closed" event on the sync connection`),
);
private onError = (event: Event) =>
this.disconnect(
new Error(`Received "${event.type}" on the sync connection`),
);
// TODO: could be an array buffer
private onMessage = (event: MessageEvent) => {
@ -62,82 +165,126 @@ class ExcalidrawSyncClient {
}
};
private onClose = () => this.disconnect();
private onError = (error: Event) => console.error("WebSocket error:", error);
public sync() {
const remoteChanges = this.send({
private pull = (): void => {
this.send({
type: "pull",
payload: { lastAcknowledgedVersion: this.lastAcknowledgedVersion },
payload: {
lastAcknowledgedVersion: this.lastAcknowledgedVersion,
},
});
// TODO: apply remote changes
// const localChanges: Array<CLIENT_CHANGE> = [];
// // TODO: apply local changes (unacknowledged)
// this.push(localChanges, 'durable');
}
};
public pull() {
return this.send({
type: "pull",
payload: { lastAcknowledgedVersion: this.lastAcknowledgedVersion },
});
}
public push = (
type: "durable" | "ephemeral" = "durable",
changes: Array<CLIENT_CHANGE> = [],
): void => {
const payload: PUSH_PAYLOAD = { type, changes: [] };
public push(changes: Array<CLIENT_CHANGE>, type: "durable" | "ephemeral") {
return this.send({
type: "push",
payload: { type, changes },
});
}
if (type === "durable") {
// TODO: persist in idb (with insertion order)
for (const change of changes) {
this.queuedChanges.set(change.id, change);
}
// batch all queued changes
payload.changes = this.localChanges;
} else {
payload.changes = changes;
}
public relay(buffer: ArrayBuffer) {
return this.send({
if (payload.changes.length > 0) {
this.send({
type: "push",
payload,
});
}
};
public relay(buffer: ArrayBuffer): void {
this.send({
type: "relay",
payload: { buffer },
});
}
private handleMessage(message: string) {
const [result, error] = Utils.try(() => JSON.parse(message));
// TODO: refactor by applying all operations to store, not to the elements
private handleAcknowledged(payload: { changes: Array<SERVER_CHANGE> }) {
const { changes: remoteChanges } = payload;
if (error) {
console.error("Failed to parse message:", message);
return;
}
const oldAcknowledgedVersion = this.lastAcknowledgedVersion;
let elements = new Map(
this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]),
) as SceneElementsMap;
const { type, payload } = result;
switch (type) {
case "relayed":
return this.handleRelayed(payload);
case "acknowledged":
return this.handleAcknowledged(payload);
case "rejected":
return this.handleRejected(payload);
default:
console.error("Unknown message type:", type);
}
}
console.log("remote changes", remoteChanges);
console.log("local changes", this.localChanges);
private handleRelayed(payload: { changes: Array<CLIENT_CHANGE> }) {
console.log("Relayed message received:", payload);
// Process relayed changes
}
try {
// apply remote changes
for (const remoteChange of remoteChanges) {
if (this.queuedChanges.has(remoteChange.id)) {
// local change acknowledge by the server, safe to remove
this.queuedChanges.delete(remoteChange.id);
} else {
[elements] = ElementsChange.load(remoteChange.payload).applyTo(
elements,
this.api.store.snapshot.elements,
);
private handleAcknowledged(payload: { changes: Array<SERVER_CHANGE> }) {
console.log("Acknowledged message received:", payload);
// Handle acknowledged changes
// TODO: we might not need to be that strict here
if (this.lastAcknowledgedVersion + 1 !== remoteChange.version) {
throw new Error(
`Received out of order change, expected "${
this.lastAcknowledgedVersion + 1
}", but received "${remoteChange.version}"`,
);
}
}
this.lastAcknowledgedVersion = remoteChange.version;
}
// apply local changes
// TODO: only necessary when remote changes modified same element properties!
for (const localChange of this.localChanges) {
[elements] = localChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
}
this.api.updateScene({
elements: Array.from(elements.values()),
storeAction: "update",
});
// push all queued changes
this.push();
} catch (e) {
console.error("Failed to apply acknowledged changes:", e);
// rollback the last acknowledged version
this.lastAcknowledgedVersion = oldAcknowledgedVersion;
// pull again to get the latest changes
this.pull();
}
}
private handleRejected(payload: { ids: Array<string>; message: string }) {
// handle rejected changes
console.error("Rejected message received:", payload);
// Handle rejected changes
}
private send(message: { type: string; payload: any }) {
if (this.server && this.server.readyState === WebSocket.OPEN) {
this.server.send(JSON.stringify(message));
} else {
console.error("WebSocket is not open. Unable to send message.");
private handleRelayed(payload: { changes: Array<CLIENT_CHANGE> }) {
// apply relayed changes / buffer
console.log("Relayed message received:", payload);
}
private send(message: { type: string; payload: any }): void {
if (!this.isConnected) {
console.error("Can't send a message without an active connection!");
return;
}
this.server?.send(JSON.stringify(message));
}
}

@ -1,3 +1,5 @@
import type { ElementsChange } from "../change";
export type RELAY_PAYLOAD = { buffer: ArrayBuffer };
export type PULL_PAYLOAD = { lastAcknowledgedVersion: number };
export type PUSH_PAYLOAD = {
@ -5,11 +7,7 @@ export type PUSH_PAYLOAD = {
changes: Array<CLIENT_CHANGE>;
};
export type CLIENT_CHANGE = {
id: string;
appStateChange: any;
elementsChange: any;
};
export type CLIENT_CHANGE = ElementsChange;
export type CLIENT_MESSAGE =
| { type: "relay"; payload: RELAY_PAYLOAD }
@ -23,7 +21,10 @@ export type SERVER_MESSAGE =
payload: { changes: Array<CLIENT_CHANGE> } | RELAY_PAYLOAD;
}
| { type: "acknowledged"; payload: { changes: Array<SERVER_CHANGE> } }
| { type: "rejected"; payload: { ids: Array<string>; message: string } };
| {
type: "rejected";
payload: { changes: Array<CLIENT_CHANGE>; message: string };
};
export interface ChangesRepository {
saveAll(changes: Array<CLIENT_CHANGE>): Array<SERVER_CHANGE>;

@ -78,6 +78,7 @@ export class ExcalidrawSyncServer {
}
if (versionΔ > 0) {
// TODO: for versioning we need deletions, but not for the "snapshot" update
const changes = this.changesRepository.getSinceVersion(
lastAcknowledgedClientVersion,
);
@ -106,8 +107,8 @@ export class ExcalidrawSyncServer {
return this.send(client, {
type: "rejected",
payload: {
ids: changes.map((i) => i.id),
message: error.message,
changes,
},
});
}

@ -756,6 +756,7 @@ export interface ExcalidrawImperativeAPI {
history: {
clear: InstanceType<typeof App>["resetHistory"];
};
store: InstanceType<typeof App>["store"];
getSceneElements: InstanceType<typeof App>["getSceneElements"];
getAppState: () => InstanceType<typeof App>["state"];
getFiles: () => InstanceType<typeof App>["files"];

@ -10145,27 +10145,13 @@ stringify-object@^3.3.0:
is-obj "^1.0.1"
is-regexp "^1.0.0"
"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1:
"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@6.0.1, strip-ansi@^3.0.0, strip-ansi@^6.0.0, strip-ansi@^6.0.1, strip-ansi@^7.0.1:
version "6.0.1"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9"
integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==
dependencies:
ansi-regex "^5.0.1"
strip-ansi@^3.0.0:
version "3.0.1"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-3.0.1.tgz#6a385fb8853d952d5ff05d0e8aaf94278dc63dcf"
integrity sha512-VhumSSbBqDTP8p2ZLKj40UjBCV4+v8bUSEpUb4KjRgWk9pbqGF4REFj6KEagidb2f/M6AzC0EmFyDNGaw9OCzg==
dependencies:
ansi-regex "^2.0.0"
strip-ansi@^7.0.1:
version "7.1.0"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-7.1.0.tgz#d5b6568ca689d8561370b0707685d22434faff45"
integrity sha512-iq6eVVI64nQQTRYq2KtEg2d2uU7LElhTJwsH4YzIHZshxlgZms/wIc4VoDQTlG/IvVIrBKG06CrZnp0qv7hkcQ==
dependencies:
ansi-regex "^6.0.1"
strip-bom@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/strip-bom/-/strip-bom-3.0.0.tgz#2334c18e9c759f7bdd56fdef7e9ae3d588e68ed3"

Loading…
Cancel
Save