Skip to content

Commit 71642d9

Browse files
author
Laurent Papier
committed
Single level '+' wildcard support
1 parent 13b3b1f commit 71642d9

2 files changed

Lines changed: 52 additions & 0 deletions

File tree

nmqtt.nim

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,15 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
847847
topicw.removeSuffix("#")
848848
if topic.contains(topicw):
849849
cb.cb(topic, message)
850+
if top.contains("+"):
851+
var topelem = split(top, '/')
852+
if len(topelem) == count(topic, '/') + 1:
853+
var i = 0
854+
for e in split(topic, '/'):
855+
if topelem[i] != "+" and e != topelem[i]: break
856+
i = i+1
857+
if i == len(topelem):
858+
cb.cb(topic, message)
850859

851860
if qos == 1:
852861
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck)

tests/subscribe.nim

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,49 @@ suite "test suite for subscribe":
210210

211211
waitFor conn()
212212

213+
test "subscribe to test/+":
214+
let (_, msg) = tdata("subscribe to test/+")
215+
216+
proc conn() {.async.} =
217+
var msgCount: int
218+
proc on_data_sub_wild(topic: string, message: string) =
219+
msgCount += 1
220+
221+
await ctxListen.subscribe("test/+", 0, on_data_sub_wild)
222+
await sleepAsync 500
223+
await ctxMain.publish("test/random1", msg, 0)
224+
await ctxMain.publish("second/random2", msg, 0)
225+
await ctxMain.publish("test", msg, 0)
226+
await ctxMain.publish("test/random3", msg, 0)
227+
await sleepAsync 500
228+
await ctxListen.unsubscribe("test/+")
229+
await sleepAsync 500
230+
check(msgCount == 2)
231+
232+
waitFor conn()
233+
234+
test "subscribe to test/+/test":
235+
let (_, msg) = tdata("subscribe to test/+/test")
236+
237+
proc conn() {.async.} =
238+
var msgCount: int
239+
proc on_data_sub_wild(topic: string, message: string) =
240+
msgCount += 1
241+
242+
await ctxListen.subscribe("test/+/data", 0, on_data_sub_wild)
243+
await sleepAsync 500
244+
await ctxMain.publish("test/random1/data", msg, 0)
245+
await ctxMain.publish("second/random2/data", msg, 0)
246+
await ctxMain.publish("test/random3", msg, 0)
247+
await ctxMain.publish("test/random4/data", msg, 0)
248+
await ctxMain.publish("test/random5/data/random6", msg, 0)
249+
await sleepAsync 500
250+
await ctxListen.unsubscribe("test/+/data")
251+
await sleepAsync 500
252+
check(msgCount == 2)
253+
254+
waitFor conn()
255+
213256
test "stay subscribed after disconnect with reconnect":
214257
let (tpc, msg) = tdata("stay subscribed after disconnect with reconnect")
215258

0 commit comments

Comments
 (0)