@@ -3,72 +3,135 @@ import z from 'zod';
33import { type RawData , WebSocket } from 'ws' ;
44import { WebSocketMessageSchema } from '@srcbook/shared' ;
55
6- const VALID_TOPIC_RE = / ^ [ a - z A - Z 0 - 9 _ : ] + $ / ;
6+ type TopicPart = { dynamic : false ; segment : string } | { dynamic : true ; parameter : string } ;
7+
8+ export type MessageContextType < Key extends string = string > = {
9+ topic : string ;
10+ event : string ;
11+ params : Record < Key , string > ;
12+ } ;
13+
14+ type TopicMatch = Pick < MessageContextType , 'topic' | 'params' > ;
15+
16+ export interface ConnectionContextType {
17+ reply : ( topic : string , event : string , payload : Record < string , any > ) => void ;
18+ }
719
820/**
9- * Channel is responsible for dispatching incoming and outgoing messages for a given topic.
21+ * Channel is responsible for dispatching incoming messages for a given topic.
22+ *
23+ * Topics are strings that represent a channel for messages. Topics
24+ * can be broken into multiple parts separated by a colon. The following
25+ * are all examples of valid topics:
26+ *
27+ * - session
28+ * - session:123
29+ * - room:123:users:456:messages
30+ *
31+ * When we define a topic, we can use the `<variable>` syntax to indicate a
32+ * wildcard match. For example, the topic `room:<roomId>:messages` would match
33+ * `room:123:messages`, `room:456:messages`, etc.
34+ *
35+ * The wildcard syntax must be between two colons (or at the start/end of the string).
36+ * The text inside must be a valid JavaScript identifier.
1037 *
1138 * Examples:
1239 *
13- * const channel = new Channel("session") // matches "session" only
14- * const channel = new Channel("session:* ") // matches "session:123", "session:456", etc.
40+ * const channel = new Channel("session") // matches "session" only
41+ * const channel = new Channel("session:<sessionId> ") // matches "session:123", "session:456", etc.
1542 *
1643 */
1744export class Channel {
45+ // The topic pattern, e.g. "sessions:<sessionId>"
1846 readonly topic : string ;
1947
20- readonly events : {
21- incoming : Record <
22- string ,
23- { schema : z . ZodTypeAny ; handler : ( payload : Record < string , any > ) => void }
24- > ;
25- outgoing : Record < string , z . ZodTypeAny > ;
26- } = { incoming : { } , outgoing : { } } ;
27-
28- private wildcardMatch = false ;
48+ // The parts of the topic string, e.g. "sessions" and "<sessionId>" for "sessions:<sessionId>"
49+ private readonly parts : TopicPart [ ] ;
50+
51+ readonly events : Record <
52+ string ,
53+ {
54+ schema : z . ZodTypeAny ;
55+ handler : (
56+ payload : Record < string , any > ,
57+ context : MessageContextType ,
58+ conn : ConnectionContextType ,
59+ ) => void ;
60+ }
61+ > = { } ;
2962
3063 constructor ( topic : string ) {
31- if ( topic . endsWith ( ':*' ) ) {
32- // Remove asterisk from topic
33- topic = topic . slice ( 0 , - 1 ) ;
34- this . wildcardMatch = true ;
35- }
64+ this . topic = topic ;
65+ this . parts = this . splitIntoParts ( topic ) ;
66+ }
67+
68+ private splitIntoParts ( topic : string ) {
69+ const parts : TopicPart [ ] = [ ] ;
70+
71+ for ( const part of topic . split ( ':' ) ) {
72+ const parameter = part . match ( / ^ < ( [ a - z A - Z _ ] + [ a - z A - Z 0 - 9 _ ] * ) > $ / ) ;
3673
37- if ( ! VALID_TOPIC_RE . test ( topic ) ) {
38- throw new Error ( `Invalid channel topic '${ topic } '` ) ;
74+ if ( parameter !== null ) {
75+ parts . push ( { dynamic : true , parameter : parameter [ 1 ] as string } ) ;
76+ continue ;
77+ }
78+
79+ if ( / ^ [ a - z A - Z 0 - 9 _ ] + $ / . test ( part ) ) {
80+ parts . push ( { dynamic : false , segment : part } ) ;
81+ continue ;
82+ }
83+
84+ throw new Error ( `Invalid channel topic: ${ topic } ` ) ;
3985 }
4086
41- this . topic = topic ;
87+ return parts ;
4288 }
4389
44- matches ( topic : string ) {
45- if ( topic === this . topic ) {
46- return true ;
90+ match ( topic : string ) : TopicMatch | null {
91+ const parts = topic . split ( ':' ) ;
92+
93+ if ( parts . length !== this . parts . length ) {
94+ return null ;
4795 }
4896
49- if ( this . wildcardMatch ) {
50- return topic . startsWith ( this . topic ) && topic . length > this . topic . length ;
97+ const match : TopicMatch = {
98+ topic : topic ,
99+ params : { } ,
100+ } ;
101+
102+ for ( let i = 0 , len = this . parts . length ; i < len ; i ++ ) {
103+ const thisPart = this . parts [ i ] as TopicPart ;
104+
105+ if ( thisPart . dynamic ) {
106+ match . params [ thisPart . parameter ] = parts [ i ] as string ;
107+ continue ;
108+ } else if ( thisPart . segment === parts [ i ] ) {
109+ continue ;
110+ }
111+
112+ return null ;
51113 }
52114
53- return false ;
115+ return match ;
54116 }
55117
56- incoming < T extends z . ZodTypeAny > (
118+ on < T extends z . ZodTypeAny > (
57119 event : string ,
58120 schema : T ,
59- handler : ( payload : z . infer < T > ) => void ,
121+ handler : (
122+ payload : z . infer < T > ,
123+ context : MessageContextType ,
124+ conn : ConnectionContextType ,
125+ ) => void ,
60126 ) {
61- this . events . incoming [ event ] = { schema, handler } ;
62- return this ;
63- }
64-
65- outgoing < T extends z . ZodTypeAny > ( event : string , schema : T ) {
66- this . events . outgoing [ event ] = schema ;
127+ this . events [ event ] = { schema, handler } ;
67128 return this ;
68129 }
69130}
70131
71132type ConnectionType = {
133+ // Reply only to this connection, not to all connections.
134+ reply : ( topic : string , event : string , payload : Record < string , any > ) => void ;
72135 socket : WebSocket ;
73136 subscriptions : string [ ] ;
74137} ;
@@ -90,7 +153,13 @@ export default class WebSocketServer {
90153 return ;
91154 }
92155
93- const connection = { socket, subscriptions : [ ] } ;
156+ const connection = {
157+ socket,
158+ subscriptions : [ ] ,
159+ reply : ( topic : string , event : string , payload : Record < string , any > ) => {
160+ this . send ( connection , topic , event , payload ) ;
161+ } ,
162+ } ;
94163
95164 this . connections . push ( connection ) ;
96165
@@ -115,23 +184,9 @@ export default class WebSocketServer {
115184 }
116185
117186 broadcast ( topic : string , event : string , payload : Record < string , any > ) {
118- const channel = this . findChannel ( topic ) ;
119-
120- if ( channel === undefined ) {
121- throw new Error ( `Cannot broadcast to unknown topic '${ topic } '` ) ;
122- }
123-
124- const schema = channel . events . outgoing [ event ] ;
125-
126- if ( schema === undefined ) {
127- throw new Error ( `Cannot broadcast to unknown event '${ event } '` ) ;
128- }
129-
130- const validatedPayload = schema . parse ( payload ) ;
131-
132187 for ( const conn of this . connections ) {
133188 if ( conn . subscriptions . includes ( topic ) ) {
134- conn . socket . send ( JSON . stringify ( [ topic , event , validatedPayload ] ) ) ;
189+ this . send ( conn , topic , event , payload ) ;
135190 }
136191 }
137192 }
@@ -140,9 +195,9 @@ export default class WebSocketServer {
140195 const parsed = JSON . parse ( message . toString ( 'utf8' ) ) ;
141196 const [ topic , event , payload ] = WebSocketMessageSchema . parse ( parsed ) ;
142197
143- const channel = this . findChannel ( topic ) ;
198+ const channelMatch = this . findChannelMatch ( topic ) ;
144199
145- if ( channel === undefined ) {
200+ if ( channelMatch === null ) {
146201 console . warn ( `Server received unknown topic '${ topic } '` ) ;
147202 return ;
148203 }
@@ -157,7 +212,9 @@ export default class WebSocketServer {
157212 return ;
158213 }
159214
160- const registeredEvent = channel . events . incoming [ event ] ;
215+ const { channel, match } = channelMatch ;
216+
217+ const registeredEvent = channel . events [ event ] ;
161218
162219 if ( registeredEvent === undefined ) {
163220 console . warn ( `Server received unknown event '${ event } ' for topic '${ topic } '` ) ;
@@ -175,20 +232,28 @@ export default class WebSocketServer {
175232 return ;
176233 }
177234
178- handler ( result . data ) ;
235+ handler ( result . data , { topic : match . topic , event : event , params : match . params } , conn ) ;
179236 }
180237
181- private findChannel ( topic : string ) {
238+ private findChannelMatch ( topic : string ) : { channel : Channel ; match : TopicMatch } | null {
182239 for ( const channel of this . channels ) {
183- if ( channel . matches ( topic ) ) {
184- return channel ;
240+ const match = channel . match ( topic ) ;
241+
242+ if ( match !== null ) {
243+ return { channel, match } ;
185244 }
186245 }
246+
247+ return null ;
187248 }
188249
189250 private removeConnection ( socket : WebSocket ) {
190251 this . connections = this . connections . filter ( ( conn ) => {
191252 return conn . socket !== socket ;
192253 } ) ;
193254 }
255+
256+ private send ( conn : ConnectionType , topic : string , event : string , payload : Record < string , any > ) {
257+ conn . socket . send ( JSON . stringify ( [ topic , event , payload ] ) ) ;
258+ }
194259}
0 commit comments