@@ -835,18 +835,19 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
835835 verbose (" Retained " , mqttbroker.retained)
836836
837837 when not defined (broker):
838+ var callbacks: seq [PubCallback ]
838839 for top, cb in ctx.pubCallbacks:
839840 if top == topic or top == " #" :
840- cb. cb (topic, message )
841+ callbacks. add (cb )
841842 if top.endsWith (" /#" ):
842843 # the multi-level wildcard can represent zero levels.
843844 if topic == top[0 .. ^ 3 ]:
844- cb. cb (topic, message )
845+ callbacks. add (cb )
845846 continue
846847 var topicw = top
847848 topicw.removeSuffix (" #" )
848849 if topic.contains (topicw):
849- cb. cb (topic, message )
850+ callbacks. add (cb )
850851 if top.contains (" +" ):
851852 var topelem = split (top, '/' )
852853 if len (topelem) == count (topic, '/' ) + 1 :
@@ -855,7 +856,9 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
855856 if topelem[i] != " +" and e != topelem[i]: break
856857 i = i+ 1
857858 if i == len (topelem):
858- cb.cb (topic, message)
859+ callbacks.add (cb)
860+ for cb in callbacks:
861+ cb.cb (topic, message)
859862
860863 if qos == 1 :
861864 ctx.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, state: WorkNew , qos: 1 , typ: PubAck )
@@ -1062,36 +1065,40 @@ proc connectBroker(ctx: MqttCtx) {.async.} =
10621065
10631066 if ctx.verbosity >= 1 :
10641067 ctx.dbg " Connecting to " & ctx.host & " :" & $ ctx.port
1065- try :
1066- ctx.s = await asyncnet.dial (ctx.host, ctx.port)
1067- if ctx.sslOn:
1068- when defined (ssl):
1069- ctx.ssl = newContext (protSSLv23, CVerifyNone , ctx.sslCert, ctx.sslKey)
1070- wrapConnectedSocket (ctx.ssl, ctx.s, handshakeAsClient)
1071- else :
1072- ctx.wrn " Requested SSL session but ssl is not enabled"
1073- await ctx.close (" SSL not enabled" )
1074- ctx.state = Error
1075- let ok = await ctx.sendConnect ()
1076- if ok:
1077- asyncCheck ctx.runRx ()
1078- asyncCheck ctx.runPing ()
1079- except OSError as e:
1080- if ctx.verbosity >= 1 or not ctx.beenConnected:
1081- ctx.dbg " Error connecting to " & ctx.host
1082- if ctx.verbosity >= 2 :
1083- echo e.msg
1084- ctx.state = Error
1068+
1069+ ctx.state = Error # set to Connecting by sendConnect
1070+
1071+ ctx.s = await asyncnet.dial (ctx.host, ctx.port)
1072+ if ctx.sslOn:
1073+ when defined (ssl):
1074+ ctx.ssl = newContext (protSSLv23, CVerifyNone , ctx.sslCert, ctx.sslKey)
1075+ wrapConnectedSocket (ctx.ssl, ctx.s, handshakeAsClient)
1076+ else :
1077+ ctx.wrn " Requested SSL session but ssl is not enabled"
1078+ await ctx.close (" SSL not enabled" )
1079+
1080+ let ok = await ctx.sendConnect ()
1081+ if ok:
1082+ asyncCheck ctx.runRx ()
1083+ asyncCheck ctx.runPing ()
1084+
10851085
10861086proc runConnect (ctx: MqttCtx ) {.async .} =
10871087 # # Auto-connect and reconnect to broker
1088- await ctx.connectBroker ()
10891088
10901089 while true :
10911090 if ctx.state == Disabled :
10921091 break
10931092 elif ctx.state in [Disconnected , Error ]:
1094- await ctx.connectBroker ()
1093+ try :
1094+ await ctx.connectBroker ()
1095+ except OSError as e:
1096+ if ctx.verbosity >= 1 or not ctx.beenConnected:
1097+ ctx.dbg " Error connecting to " & ctx.host
1098+ if ctx.verbosity >= 2 :
1099+ echo e.msg
1100+ ctx.state = Error
1101+
10951102 # If the client has been disconnect, it is necessary to tell the broker,
10961103 # that we still want to be Subscribed. PubCallbacks still holds the
10971104 # callbacks, but we need to re-Subscribe to the broker.
@@ -1111,7 +1118,7 @@ proc runConnect(ctx: MqttCtx) {.async.} =
11111118
11121119proc newMqttCtx * (clientId: string ): MqttCtx =
11131120 # # Initiate a new MQTT client
1114- MqttCtx (clientId: clientId)
1121+ MqttCtx (clientId: clientId, state: Disconnected )
11151122
11161123proc set_ping_interval * (ctx: MqttCtx , txInterval: int = 60 ) =
11171124 # # Set the clients ping interval in seconds. Default is 60 seconds.
0 commit comments