Skip to content

Commit 5b76994

Browse files
authored
Merge pull request #34 from lmn/master
Wildcard Support
2 parents a8e94a2 + 7553844 commit 5b76994

2 files changed

Lines changed: 27 additions & 3 deletions

File tree

nmqtt.nim

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,13 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
836836

837837
when not defined(broker):
838838
for top, cb in ctx.pubCallbacks:
839-
if top == topic or top == "#": cb.cb(topic, message)
839+
if top == topic or top == "#":
840+
cb.cb(topic, message)
841+
if top.endsWith("/#"):
842+
var topicw = top
843+
topicw.removeSuffix("#")
844+
if topic.contains(topicw):
845+
cb.cb(topic, message)
840846

841847
if qos == 1:
842848
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck)

tests/subscribe.nim

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,13 @@ suite "test suite for subscribe":
169169

170170
waitFor conn()
171171

172-
173172
test "subscribe to #":
174173
let (_, msg) = tdata("subscribe to #")
175174

176175
proc conn() {.async.} =
177176
var msgCount: int
178177
proc on_data_sub_all(topic: string, message: string) =
179-
msgCount += 1
178+
msgCount += 1
180179

181180
await ctxListen.subscribe("#", 0, on_data_sub_all)
182181
await sleepAsync 500
@@ -190,6 +189,25 @@ suite "test suite for subscribe":
190189

191190
waitFor conn()
192191

192+
test "subscribe to test/#":
193+
let (_, msg) = tdata("subscribe to test/#")
194+
195+
proc conn() {.async.} =
196+
var msgCount: int
197+
proc on_data_sub_wild(topic: string, message: string) =
198+
msgCount += 1
199+
200+
await ctxListen.subscribe("test/#", 0, on_data_sub_wild)
201+
await sleepAsync 500
202+
await ctxMain.publish("test/random1", msg, 0)
203+
await ctxMain.publish("second/random2", msg, 0)
204+
await ctxMain.publish("test/random3", msg, 0)
205+
await sleepAsync 500
206+
await ctxListen.unsubscribe("test/#")
207+
await sleepAsync 500
208+
check(msgCount == 2)
209+
210+
waitFor conn()
193211

194212
test "stay subscribed after disconnect with reconnect":
195213
let (tpc, msg) = tdata("stay subscribed after disconnect with reconnect")

0 commit comments

Comments
 (0)