Skip to content

Commit dc43835

Browse files
committed
feat(hocuspocus): scope history queries to the collab document room
Resolve version list and watch data using the WebSocket document name instead of trusting client-supplied ids, and reply on the same connection with unicast stateless payloads. Made-with: Cursor
1 parent b14394b commit dc43835

2 files changed

Lines changed: 173 additions & 67 deletions

File tree

packages/hocuspocus.server/src/hocuspocus.server.ts

Lines changed: 77 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,106 @@
1+
import type { Connection } from '@hocuspocus/server'
12
import { Server } from '@hocuspocus/server'
23

34
import HocuspocusConfig from './config/hocuspocus.config'
5+
import { handleHistoryStateless } from './lib/history-stateless'
46
import { logger } from './lib/logger'
5-
import { prisma, shutdownDatabase } from './lib/prisma'
7+
import { shutdownDatabase } from './lib/prisma'
68
import { disconnectRedis } from './lib/redis'
7-
import type { HistoryPayload } from './types'
9+
import type { HistoryPayload } from './types/document.types'
810
import { verifyJWT } from './utils'
911

1012
process.env.NODE_ENV = process.env.NODE_ENV || 'development'
1113

12-
// Create logger for WebSocket server
1314
const wsLogger = logger.child({ service: 'websocket' })
1415

15-
async function handleHistoryEvents(payload: HistoryPayload, _context: any, _document: any) {
16-
const { type, documentId } = payload
17-
18-
switch (type) {
19-
case 'history.list': {
20-
const docs = await prisma.documents.findMany({
21-
where: { documentId },
22-
orderBy: { createdAt: 'desc' },
23-
select: { version: true, commitMessage: true, createdAt: true }
24-
})
25-
return docs
26-
}
27-
28-
case 'history.watch': {
29-
const doc = await prisma.documents.findFirst({
30-
where: { documentId, version: payload.version },
31-
select: { data: true, version: true, commitMessage: true, createdAt: true }
32-
})
33-
34-
if (!doc) return null
35-
36-
// Convert Buffer to Base64 string for transport
37-
return {
38-
data: Buffer.from(doc.data).toString('base64'),
39-
version: doc.version,
40-
commitMessage: doc.commitMessage,
41-
createdAt: doc.createdAt
42-
}
43-
}
44-
45-
case 'history.prev':
46-
return prisma.documents.findFirst({
47-
where: { documentId, version: { lt: payload.currentVersion || 0 } },
48-
orderBy: { version: 'desc' }
49-
})
50-
51-
case 'history.next':
52-
return prisma.documents.findFirst({
53-
where: { documentId, version: { gt: payload.currentVersion || 0 } },
54-
orderBy: { version: 'asc' }
55-
})
56-
57-
default:
58-
return payload
59-
}
16+
function sendHistoryResponse(
17+
connection: Connection,
18+
type: string,
19+
response: unknown,
20+
error?: string
21+
) {
22+
connection.sendStateless(
23+
JSON.stringify({
24+
msg: 'history.response',
25+
type,
26+
response,
27+
...(error ? { error } : {})
28+
})
29+
)
6030
}
6131

62-
const broadcastToAll = (document: any, payload: any) => {
63-
document.broadcastStateless(JSON.stringify(payload))
32+
/** Hocuspocus document name === provider `name` / room id; never trust client `documentId` for Prisma. */
33+
function roomDocumentId(document: { name?: string }): string | null {
34+
const n = document?.name
35+
return typeof n === 'string' && n.length > 0 ? n : null
6436
}
6537

6638
const statelessExtension = {
67-
async onStateless({ payload, context, document, _connection }: any) {
68-
const parsedPayload = JSON.parse(payload)
39+
async onStateless({
40+
payload,
41+
connection,
42+
document
43+
}: {
44+
payload: string
45+
connection: Connection
46+
document: { name?: string; broadcastStateless: (p: string) => void }
47+
}) {
48+
let parsedPayload: {
49+
msg?: string
50+
type?: string
51+
documentId?: string
52+
version?: number
53+
currentVersion?: number
54+
}
55+
try {
56+
parsedPayload = JSON.parse(payload) as typeof parsedPayload
57+
} catch {
58+
return
59+
}
6960

7061
if (parsedPayload.msg === 'history') {
62+
const canonicalId = roomDocumentId(document)
63+
const type = parsedPayload.type
64+
65+
if (!canonicalId || !type) {
66+
wsLogger.warn(
67+
{ parsedPayload, hasRoomName: Boolean(canonicalId) },
68+
'history stateless missing room or type'
69+
)
70+
if (type) sendHistoryResponse(connection, type, null, 'history_failed')
71+
return
72+
}
73+
74+
if (parsedPayload.documentId != null && parsedPayload.documentId !== canonicalId) {
75+
wsLogger.warn(
76+
{ clientDocumentId: parsedPayload.documentId, canonicalId },
77+
'history stateless documentId does not match connection room'
78+
)
79+
sendHistoryResponse(connection, type, null, 'history_failed')
80+
return
81+
}
82+
83+
const historyPayload: HistoryPayload = {
84+
type,
85+
documentId: canonicalId,
86+
version: parsedPayload.version,
87+
currentVersion: parsedPayload.currentVersion
88+
}
89+
7190
try {
72-
const response = await handleHistoryEvents(parsedPayload, context, document)
73-
broadcastToAll(document, {
74-
msg: 'history.response',
75-
type: parsedPayload.type,
76-
response
77-
})
91+
const response = await handleHistoryStateless(historyPayload)
92+
sendHistoryResponse(connection, type, response)
7893
} catch (error) {
7994
wsLogger.error({ err: error }, 'Error handling history event')
95+
sendHistoryResponse(connection, type, null, 'history_failed')
8096
}
81-
} else {
82-
document.broadcastStateless(JSON.stringify(parsedPayload))
97+
return
8398
}
99+
100+
document.broadcastStateless(JSON.stringify(parsedPayload))
84101
}
85102
}
86103

87-
// Get the base configuration and add our stateless extension
88104
const baseConfig = HocuspocusConfig()
89105
const serverConfig = {
90106
...baseConfig,
@@ -98,7 +114,6 @@ const serverConfig = {
98114

99115
try {
100116
const tokenData = JSON.parse(token)
101-
// Extract deviceType from token (sent by webapp)
102117
const deviceType = tokenData.deviceType || 'desktop'
103118

104119
if (tokenData.accessToken) {
@@ -109,7 +124,6 @@ const serverConfig = {
109124
return { user, slug: tokenData.slug || '', documentId: documentName, deviceType }
110125
}
111126

112-
// Token invalid
113127
if (process.env.NODE_ENV === 'production') {
114128
throw new Error('Invalid authentication token')
115129
}
@@ -118,7 +132,6 @@ const serverConfig = {
118132
return { user: null, slug: tokenData.slug || '', documentId: documentName, deviceType }
119133
}
120134

121-
// No accessToken, allow with slug only
122135
return { user: null, slug: tokenData.slug || '', documentId: documentName, deviceType }
123136
} catch (error) {
124137
wsLogger.error({ err: error, documentName }, 'Auth error')
@@ -132,10 +145,8 @@ const serverConfig = {
132145
}
133146
}
134147

135-
// Configure and start the server
136148
const server = new Server(serverConfig)
137149

138-
// Start listening
139150
server.listen()
140151

141152
wsLogger.info({
@@ -145,7 +156,6 @@ wsLogger.info({
145156
url: `ws://localhost:${baseConfig.port}`
146157
})
147158

148-
// Graceful shutdown
149159
const shutdown = async () => {
150160
wsLogger.info('Shutting down WebSocket server gracefully...')
151161

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import type { HistoryPayload } from '../types/document.types'
2+
import { prisma } from './prisma'
3+
4+
/** Metadata rows for the version sidebar (no Yjs payload). */
5+
export type HistoryVersionMeta = {
6+
version: number
7+
commitMessage: string | null
8+
createdAt: Date
9+
}
10+
11+
/** Full row for editor hydration (base64 Yjs). */
12+
export type HistorySnapshot = {
13+
data: string
14+
version: number
15+
commitMessage: string | null
16+
createdAt: Date
17+
}
18+
19+
/** New list shape: sidebar list + latest body in one response (one network round-trip). */
20+
export type HistoryListResult = {
21+
versions: HistoryVersionMeta[]
22+
latestSnapshot: HistorySnapshot | null
23+
}
24+
25+
function toSnapshot(doc: {
26+
data: Buffer | Uint8Array
27+
version: number
28+
commitMessage: string | null
29+
createdAt: Date
30+
}): HistorySnapshot {
31+
return {
32+
data: Buffer.from(doc.data).toString('base64'),
33+
version: doc.version,
34+
commitMessage: doc.commitMessage,
35+
createdAt: doc.createdAt
36+
}
37+
}
38+
39+
/**
40+
* Document version history over Hocuspocus stateless channel.
41+
* `history.prev` / `history.next` are reserved for future UI; not used by the webapp today.
42+
*/
43+
export async function handleHistoryStateless(payload: HistoryPayload): Promise<unknown> {
44+
const { type, documentId } = payload
45+
46+
switch (type) {
47+
case 'history.list': {
48+
const versions = await prisma.documents.findMany({
49+
where: { documentId },
50+
orderBy: { createdAt: 'desc' },
51+
select: { version: true, commitMessage: true, createdAt: true }
52+
})
53+
54+
if (versions.length === 0) {
55+
return { versions: [], latestSnapshot: null } satisfies HistoryListResult
56+
}
57+
58+
const latestVersion = versions[0].version
59+
const full = await prisma.documents.findFirst({
60+
where: { documentId, version: latestVersion },
61+
select: { data: true, version: true, commitMessage: true, createdAt: true }
62+
})
63+
64+
return {
65+
versions,
66+
latestSnapshot: full ? toSnapshot(full) : null
67+
} satisfies HistoryListResult
68+
}
69+
70+
case 'history.watch': {
71+
const doc = await prisma.documents.findFirst({
72+
where: { documentId, version: payload.version },
73+
select: { data: true, version: true, commitMessage: true, createdAt: true }
74+
})
75+
76+
if (!doc) return null
77+
78+
return toSnapshot(doc)
79+
}
80+
81+
case 'history.prev':
82+
return prisma.documents.findFirst({
83+
where: { documentId, version: { lt: payload.currentVersion || 0 } },
84+
orderBy: { version: 'desc' }
85+
})
86+
87+
case 'history.next':
88+
return prisma.documents.findFirst({
89+
where: { documentId, version: { gt: payload.currentVersion || 0 } },
90+
orderBy: { version: 'asc' }
91+
})
92+
93+
default:
94+
return payload
95+
}
96+
}

0 commit comments

Comments
 (0)