Skip to content

Commit 3773545

Browse files
live query web worker syncing
1 parent 44efe5f commit 3773545

7 files changed

Lines changed: 243 additions & 71 deletions

File tree

apps/client/src/lib/runtime-client.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Layer, ManagedRuntime } from "effect";
22
import { ApiClient } from "./api-client";
33
import { Dexie } from "./dexie";
44
import { LoroStorage } from "./services/loro-storage";
5+
import { Sync } from "./services/sync";
56
import { TempWorkspace } from "./services/temp-workspace";
67
import { WorkspaceManager } from "./services/workspace-manager";
78

@@ -10,7 +11,8 @@ const MainLayer = Layer.mergeAll(
1011
ApiClient.Default,
1112
WorkspaceManager.Default,
1213
TempWorkspace.Default,
13-
LoroStorage.Default
14+
LoroStorage.Default,
15+
Sync.Default
1416
);
1517

1618
export const RuntimeClient = ManagedRuntime.make(MainLayer);
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { Effect } from "effect";
2+
import { ApiClient } from "../api-client";
3+
import { Dexie } from "../dexie";
4+
import type { WorkspaceTable } from "../schema";
5+
import { TempWorkspace } from "./temp-workspace";
6+
import { WorkspaceManager } from "./workspace-manager";
7+
import { LoroDoc } from "loro-crdt";
8+
9+
export class Sync extends Effect.Service<Sync>()("Sync", {
10+
dependencies: [
11+
TempWorkspace.Default,
12+
WorkspaceManager.Default,
13+
ApiClient.Default,
14+
Dexie.Default,
15+
],
16+
effect: Effect.gen(function* () {
17+
const { client } = yield* ApiClient;
18+
const { initClient } = yield* Dexie;
19+
const manager = yield* WorkspaceManager;
20+
const temp = yield* TempWorkspace;
21+
22+
return {
23+
push: ({
24+
snapshot,
25+
workspace,
26+
}: {
27+
workspace: WorkspaceTable;
28+
snapshot: globalThis.Uint8Array;
29+
}) =>
30+
Effect.gen(function* () {
31+
const clientId = yield* initClient;
32+
33+
const response = yield* Effect.fromNullable(workspace.token).pipe(
34+
Effect.flatMap((token) =>
35+
client.syncData
36+
.push({
37+
// headers: { Authorization: `Bearer ${token}` },
38+
path: {
39+
workspaceId: workspace.workspaceId,
40+
},
41+
payload: { clientId, snapshot },
42+
})
43+
.pipe(
44+
Effect.map((response) => ({
45+
...response,
46+
token,
47+
}))
48+
)
49+
),
50+
Effect.orElse(() =>
51+
client.syncAuth
52+
.generateToken({
53+
payload: {
54+
clientId,
55+
snapshot,
56+
workspaceId: workspace.workspaceId,
57+
},
58+
})
59+
.pipe(
60+
Effect.tap(({ token }) =>
61+
manager.setToken({
62+
workspaceId: workspace.workspaceId,
63+
token,
64+
})
65+
)
66+
)
67+
)
68+
);
69+
70+
const doc = new LoroDoc();
71+
doc.import(response.snapshot);
72+
yield* Effect.all([
73+
manager.put({
74+
workspaceId: response.workspaceId,
75+
snapshot: response.snapshot,
76+
token: response.token,
77+
version: doc.version().encode(),
78+
}),
79+
temp.clean({
80+
workspaceId: workspace.workspaceId,
81+
}),
82+
]);
83+
}),
84+
};
85+
}),
86+
}) {}

apps/client/src/routes/$workspaceId.tsx

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ import { Worker } from "@effect/platform";
22
import { BrowserWorker } from "@effect/platform-browser";
33
import { createFileRoute } from "@tanstack/react-router";
44
import { Effect } from "effect";
5-
import { startTransition } from "react";
5+
import { startTransition, useEffect } from "react";
66
import { useActivity } from "../lib/hooks/use-activity";
77
import { RuntimeClient } from "../lib/runtime-client";
88
import { LoroStorage } from "../lib/services/loro-storage";
99
import { WorkspaceManager } from "../lib/services/workspace-manager";
1010
import { useActionEffect } from "../lib/use-action-effect";
11-
import { Bootstrap } from "../workers/schema";
11+
import { Bootstrap, LiveQuery } from "../workers/schema";
1212

1313
export const Route = createFileRoute("/$workspaceId")({
1414
component: RouteComponent,
@@ -62,6 +62,31 @@ function RouteComponent() {
6262
})
6363
);
6464

65+
useEffect(() => {
66+
const url = new URL("./src/workers/live.ts", globalThis.origin);
67+
const newWorker = new globalThis.Worker(url, { type: "module" });
68+
69+
void RuntimeClient.runPromise(
70+
Effect.gen(function* () {
71+
const pool = yield* Worker.makePoolSerialized({ size: 1 });
72+
return yield* pool.broadcast(
73+
new LiveQuery({ workspaceId: workspace.workspaceId })
74+
);
75+
}).pipe(
76+
Effect.scoped,
77+
Effect.provide(BrowserWorker.layer(() => newWorker))
78+
)
79+
);
80+
81+
newWorker.onerror = (error) => {
82+
console.error("Live query worker error", error);
83+
};
84+
85+
return () => {
86+
newWorker.terminate();
87+
};
88+
}, []);
89+
6590
return (
6691
<div>
6792
<p>{workspace.workspaceId}</p>

apps/client/src/workers/live.ts

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { WorkerRunner } from "@effect/platform";
2+
import { BrowserWorkerRunner } from "@effect/platform-browser";
3+
import { Snapshot } from "@local/sync/loro";
4+
import { liveQuery } from "dexie";
5+
import {
6+
Array,
7+
Effect,
8+
Layer,
9+
Number,
10+
Schema,
11+
Stream,
12+
SynchronizedRef,
13+
} from "effect";
14+
import { Dexie } from "../lib/dexie";
15+
import { RuntimeClient } from "../lib/runtime-client";
16+
import { Sync } from "../lib/services/sync";
17+
import { WorkspaceManager } from "../lib/services/workspace-manager";
18+
import { type LiveQuery } from "./schema";
19+
20+
const main = (params: { workspaceId: string }) =>
21+
Effect.gen(function* () {
22+
const manager = yield* WorkspaceManager;
23+
const { db } = yield* Dexie;
24+
const { push } = yield* Sync;
25+
26+
const snapshotEq = Array.getEquivalence(Number.Equivalence);
27+
28+
yield* Effect.log(`Live query workspace '${params.workspaceId}'`);
29+
30+
const workspace = yield* manager
31+
.getById({ workspaceId: params.workspaceId })
32+
.pipe(Effect.flatMap(Effect.fromNullable));
33+
34+
const live = liveQuery(() =>
35+
db.temp_workspace
36+
.where("workspaceId")
37+
.equals(params.workspaceId)
38+
.toArray()
39+
);
40+
41+
yield* Effect.forkScoped(
42+
Effect.acquireRelease(
43+
Effect.gen(function* () {
44+
yield* Effect.log("Subscribing");
45+
46+
const ref = yield* SynchronizedRef.make(0);
47+
return live.subscribe((payload) =>
48+
Effect.runPromise(
49+
Effect.gen(function* () {
50+
yield* Effect.log(`Change detected`);
51+
52+
const id = yield* ref.pipe(
53+
SynchronizedRef.updateAndGet((n) => n + 1)
54+
);
55+
56+
yield* Stream.runDrain(
57+
Stream.make(...payload).pipe(
58+
Stream.changesWith((a, b) =>
59+
snapshotEq(a.snapshot, b.snapshot)
60+
),
61+
Stream.debounce("3 seconds"),
62+
Stream.tap((message) =>
63+
Effect.gen(function* () {
64+
const streamId = yield* ref.get;
65+
if (streamId === id) {
66+
yield* Effect.log(
67+
`Syncing ${payload.length} changes`
68+
);
69+
70+
const snapshot = yield* Schema.decode(Snapshot)(
71+
message.snapshot
72+
);
73+
74+
yield* push({
75+
workspace,
76+
snapshot,
77+
});
78+
}
79+
})
80+
)
81+
)
82+
);
83+
})
84+
)
85+
);
86+
}),
87+
(subscription) =>
88+
Effect.gen(function* () {
89+
yield* Effect.log("Live query unsubscribing");
90+
return subscription.unsubscribe();
91+
})
92+
)
93+
);
94+
95+
return true;
96+
});
97+
98+
const WorkerLive = WorkerRunner.layer((params: LiveQuery) =>
99+
Effect.scoped(
100+
Effect.gen(function* () {
101+
yield* Effect.log("Startup live query connection");
102+
103+
yield* Effect.fork(main({ workspaceId: params.workspaceId }));
104+
yield* Effect.forever(Effect.never);
105+
106+
yield* Effect.log("Closed live query connection");
107+
}).pipe(Effect.mapError(() => "Live query error"))
108+
)
109+
).pipe(Layer.provide(BrowserWorkerRunner.layer));
110+
111+
RuntimeClient.runFork(WorkerRunner.launch(WorkerLive));

apps/client/src/workers/schema.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { Schema } from "effect";
22

3+
export class LiveQuery extends Schema.TaggedRequest<LiveQuery>()("LiveQuery", {
4+
failure: Schema.String,
5+
payload: { workspaceId: Schema.String },
6+
success: Schema.Boolean,
7+
}) {}
8+
39
export class Bootstrap extends Schema.TaggedRequest<Bootstrap>()("Bootstrap", {
410
failure: Schema.String,
511
payload: { workspaceId: Schema.String },

apps/client/src/workers/sync.ts

Lines changed: 8 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
import { WorkerRunner } from "@effect/platform";
22
import { BrowserWorkerRunner } from "@effect/platform-browser";
33
import { Effect, Layer } from "effect";
4-
import { LoroDoc } from "loro-crdt";
5-
import { ApiClient } from "../lib/api-client";
6-
import { Dexie } from "../lib/dexie";
74
import { RuntimeClient } from "../lib/runtime-client";
5+
import { Sync } from "../lib/services/sync";
86
import { TempWorkspace } from "../lib/services/temp-workspace";
97
import { WorkspaceManager } from "../lib/services/workspace-manager";
108
import { WorkerMessage } from "./schema";
119

1210
const WorkerLive = WorkerRunner.layerSerialized(WorkerMessage, {
1311
Bootstrap: (params) =>
1412
Effect.gen(function* () {
15-
const { client } = yield* ApiClient;
16-
const { initClient } = yield* Dexie;
13+
const { push } = yield* Sync;
1714

1815
const manager = yield* WorkspaceManager;
1916
const temp = yield* TempWorkspace;
@@ -22,77 +19,21 @@ const WorkerLive = WorkerRunner.layerSerialized(WorkerMessage, {
2219

2320
const workspace = yield* manager
2421
.getById({ workspaceId: params.workspaceId })
25-
.pipe(
26-
Effect.flatMap(Effect.fromNullable),
27-
Effect.mapError(() => "Get workspace error")
28-
);
22+
.pipe(Effect.flatMap(Effect.fromNullable));
2923

30-
const clientId = yield* initClient.pipe(
31-
Effect.mapError(() => "Init client error")
32-
);
33-
34-
const tempUpdates = yield* temp
35-
.getById({ workspaceId: workspace.workspaceId })
36-
.pipe(Effect.mapError(() => "Get temp workspace error"));
24+
const tempUpdates = yield* temp.getById({
25+
workspaceId: workspace.workspaceId,
26+
});
3727

3828
if (tempUpdates !== undefined) {
39-
const response = yield* Effect.fromNullable(workspace.token).pipe(
40-
Effect.flatMap((token) =>
41-
client.syncData
42-
.push({
43-
// headers: { Authorization: `Bearer ${token}` },
44-
path: { workspaceId: workspace.workspaceId },
45-
payload: { clientId, snapshot: tempUpdates.snapshot },
46-
})
47-
.pipe(
48-
Effect.map((response) => ({ ...response, token })),
49-
Effect.mapError(() => "Sync push error")
50-
)
51-
),
52-
Effect.orElse(() =>
53-
client.syncAuth
54-
.generateToken({
55-
payload: {
56-
clientId,
57-
snapshot: tempUpdates.snapshot,
58-
workspaceId: params.workspaceId,
59-
},
60-
})
61-
.pipe(
62-
Effect.tap(({ token }) =>
63-
manager.setToken({
64-
workspaceId: workspace.workspaceId,
65-
token,
66-
})
67-
),
68-
Effect.mapError(() => "Generate token error")
69-
)
70-
)
71-
);
72-
73-
const doc = new LoroDoc();
74-
doc.import(response.snapshot);
75-
76-
yield* manager
77-
.put({
78-
workspaceId: response.workspaceId,
79-
snapshot: response.snapshot,
80-
token: response.token,
81-
version: doc.version().encode(),
82-
})
83-
.pipe(Effect.mapError(() => "Put workspace error"));
84-
85-
yield* temp
86-
.clean({ workspaceId: workspace.workspaceId })
87-
.pipe(Effect.mapError(() => "Clean temp workspace error"));
88-
29+
yield* push({ snapshot: tempUpdates.snapshot, workspace });
8930
yield* Effect.log("Sync completed");
9031
} else {
9132
yield* Effect.log("No sync updates");
9233
}
9334

9435
return true;
95-
}),
36+
}).pipe(Effect.mapError(() => "Bootstrap error")),
9637
}).pipe(Layer.provide(BrowserWorkerRunner.layer));
9738

9839
RuntimeClient.runFork(WorkerRunner.launch(WorkerLive));

apps/server/src/group/sync-data.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { HttpApiBuilder } from "@effect/platform";
22
import { SyncApi } from "@local/sync";
33
import { SnapshotToLoroDoc } from "@local/sync/loro";
4-
import { and, eq } from "drizzle-orm";
4+
import { and, desc, eq } from "drizzle-orm";
55
import { Array, Effect, Layer, Schema } from "effect";
66
import { workspaceTable } from "../db/schema";
77
import { Drizzle } from "../drizzle";
@@ -36,6 +36,7 @@ export const SyncDataGroupLive = HttpApiBuilder.group(
3636
eq(workspaceTable.clientId, clientId)
3737
)
3838
)
39+
.orderBy(desc(workspaceTable.createdAt))
3940
.limit(1),
4041
})({ clientId, workspaceId }).pipe(Effect.flatMap(Array.head));
4142

0 commit comments

Comments
 (0)