Skip to content

Commit 2eac588

Browse files
authored
Merge pull request #36 from lpapier/plus_wildcard
Plus wildcard
2 parents 4791cba + 71642d9 commit 2eac588

2 files changed

Lines changed: 57 additions & 0 deletions

File tree

nmqtt.nim

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,10 +839,23 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
839839
if top == topic or top == "#":
840840
cb.cb(topic, message)
841841
if top.endsWith("/#"):
842+
# the multi-level wildcard can represent zero levels.
843+
if topic == top[0 .. ^3]:
844+
cb.cb(topic, message)
845+
continue
842846
var topicw = top
843847
topicw.removeSuffix("#")
844848
if topic.contains(topicw):
845849
cb.cb(topic, message)
850+
if top.contains("+"):
851+
var topelem = split(top, '/')
852+
if len(topelem) == count(topic, '/') + 1:
853+
var i = 0
854+
for e in split(topic, '/'):
855+
if topelem[i] != "+" and e != topelem[i]: break
856+
i = i+1
857+
if i == len(topelem):
858+
cb.cb(topic, message)
846859

847860
if qos == 1:
848861
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck)

tests/subscribe.nim

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,54 @@ suite "test suite for subscribe":
201201
await sleepAsync 500
202202
await ctxMain.publish("test/random1", msg, 0)
203203
await ctxMain.publish("second/random2", msg, 0)
204+
await ctxMain.publish("test", msg, 0)
204205
await ctxMain.publish("test/random3", msg, 0)
205206
await sleepAsync 500
206207
await ctxListen.unsubscribe("test/#")
207208
await sleepAsync 500
209+
check(msgCount == 3)
210+
211+
waitFor conn()
212+
213+
test "subscribe to test/+":
214+
let (_, msg) = tdata("subscribe to test/+")
215+
216+
proc conn() {.async.} =
217+
var msgCount: int
218+
proc on_data_sub_wild(topic: string, message: string) =
219+
msgCount += 1
220+
221+
await ctxListen.subscribe("test/+", 0, on_data_sub_wild)
222+
await sleepAsync 500
223+
await ctxMain.publish("test/random1", msg, 0)
224+
await ctxMain.publish("second/random2", msg, 0)
225+
await ctxMain.publish("test", msg, 0)
226+
await ctxMain.publish("test/random3", msg, 0)
227+
await sleepAsync 500
228+
await ctxListen.unsubscribe("test/+")
229+
await sleepAsync 500
230+
check(msgCount == 2)
231+
232+
waitFor conn()
233+
234+
test "subscribe to test/+/test":
235+
let (_, msg) = tdata("subscribe to test/+/test")
236+
237+
proc conn() {.async.} =
238+
var msgCount: int
239+
proc on_data_sub_wild(topic: string, message: string) =
240+
msgCount += 1
241+
242+
await ctxListen.subscribe("test/+/data", 0, on_data_sub_wild)
243+
await sleepAsync 500
244+
await ctxMain.publish("test/random1/data", msg, 0)
245+
await ctxMain.publish("second/random2/data", msg, 0)
246+
await ctxMain.publish("test/random3", msg, 0)
247+
await ctxMain.publish("test/random4/data", msg, 0)
248+
await ctxMain.publish("test/random5/data/random6", msg, 0)
249+
await sleepAsync 500
250+
await ctxListen.unsubscribe("test/+/data")
251+
await sleepAsync 500
208252
check(msgCount == 2)
209253

210254
waitFor conn()

0 commit comments

Comments
 (0)