@@ -207,9 +207,9 @@ when defined(broker):
207207 clientid: string
208208
209209when defined (broker):
210- var
211- mqttbroker = MqttBroker ()
212- r = initRand (toInt (epochTime ()))
210+ var
211+ mqttbroker = MqttBroker ()
212+ r = initRand (toInt (epochTime ()))
213213
214214
215215#
@@ -250,15 +250,15 @@ proc getstring(pkt: Pkt, offset: int, withLen: bool): (string, int) =
250250 result = (val, pkt.data.len)
251251
252252when defined (broker):
253- proc getstring (pkt: Pkt , offset: int , len: int ): (string , int ) =
254- var val: string
255- for i in offset..< len+ offset:
256- val.add pkt.data[i].char
257- result = (val, len+ offset)
253+ proc getstring (pkt: Pkt , offset: int , len: int ): (string , int ) =
254+ var val: string
255+ for i in offset..< len+ offset:
256+ val.add pkt.data[i].char
257+ result = (val, len+ offset)
258258
259259when defined (broker):
260- proc getbin (pkt: Pkt , b: int ): (string , int ) =
261- result = (toBin (parseBiggestInt ($ pkt.data[b]), 8 ), b+ 1 )
260+ proc getbin (pkt: Pkt , b: int ): (string , int ) =
261+ result = (toBin (parseBiggestInt ($ pkt.data[b]), 8 ), b+ 1 )
262262
263263proc `$` (pkt: Pkt ): string =
264264 result .add $ pkt.typ & " (" & $ pkt.flags.toHex & " ): "
@@ -276,7 +276,7 @@ proc newPkt(typ: PktType=NOTYPE, flags: uint8=0): Pkt =
276276proc dmp (ctx: MqttCtx , s: string ) =
277277 when not defined (broker):
278278 if defined (dev):
279- stderr.write " \e [1;30m" & s & " \e [0m\n "
279+ stderr.write " \e [1;30m" & s & " \e [0m\n "
280280 when defined (broker):
281281 if defined (dev) or mqttbroker.verbosity >= 2 :
282282 stderr.write " \e [1;30m" & s & " \e [0m\n "
@@ -331,48 +331,48 @@ proc wrn(s: string) =
331331#
332332
333333when defined (broker):
334- proc addSubscriber * (ctx: MqttCtx , topic: string ) {.async .} =
335- # # Adds a subscriber to MqttBroker
336- try :
337- if mqttbroker.subscribers.hasKey (topic):
338- mqttbroker.subscribers[topic].insert (ctx)
339- else :
340- mqttbroker.subscribers[topic] = @ [ctx]
341- except :
334+ proc addSubscriber * (ctx: MqttCtx , topic: string ) {.async .} =
335+ # # Adds a subscriber to MqttBroker
336+ try :
337+ if mqttbroker.subscribers.hasKey (topic):
338+ mqttbroker.subscribers[topic].insert (ctx)
339+ else :
340+ mqttbroker.subscribers[topic] = @ [ctx]
341+ except :
342342 wrn (" Crash when adding a new subcriber" )
343343
344344when defined (broker):
345- proc removeSubscriber * (ctx: MqttCtx , topic: string ) {.async .} =
346- # # Removes a subscriber from specific topic
347- try :
348- if mqttbroker.subscribers.hasKey (topic):
349- mqttbroker.subscribers[topic] = filter (mqttbroker.subscribers[topic], proc (x: MqttCtx ): bool = x != ctx)
350- except :
345+ proc removeSubscriber * (ctx: MqttCtx , topic: string ) {.async .} =
346+ # # Removes a subscriber from specific topic
347+ try :
348+ if mqttbroker.subscribers.hasKey (topic):
349+ mqttbroker.subscribers[topic] = filter (mqttbroker.subscribers[topic], proc (x: MqttCtx ): bool = x != ctx)
350+ except :
351351 wrn (" Crash when removing subscriber with specific topic" )
352352
353353when defined (broker):
354- proc removeSubscriber * (ctx: MqttCtx ) {.async .} =
355- # # Removes a subscriber without knowing the topics
354+ proc removeSubscriber * (ctx: MqttCtx ) {.async .} =
355+ # # Removes a subscriber without knowing the topics
356356 var delTop: seq [string ]
357- for t, c in mqttbroker.subscribers:
358- if ctx in c:
359- mqttbroker.subscribers[t] = filter (c, proc (x: MqttCtx ): bool = x != ctx)
357+ for t, c in mqttbroker.subscribers:
358+ if ctx in c:
359+ mqttbroker.subscribers[t] = filter (c, proc (x: MqttCtx ): bool = x != ctx)
360360
361- if mqttbroker.subscribers[t].len () == 0 :
361+ if mqttbroker.subscribers[t].len () == 0 :
362362 delTop.add (t)
363363
364364 for t in delTop:
365- mqttbroker.subscribers.del (t)
365+ mqttbroker.subscribers.del (t)
366366
367367when defined (broker):
368- proc qosAlign (qP, qS: uint8 ): uint8 =
368+ proc qosAlign (qP, qS: uint8 ): uint8 =
369369 # # Aligns the QOS for publisher and subscriber.
370- if qP == qS:
371- result = qP
372- elif qP > qS:
373- result = qS
370+ if qP == qS:
371+ result = qP
372+ elif qP > qS:
373+ result = qS
374374 else :
375- result = qP
375+ result = qP
376376
377377when defined (broker):
378378 proc keepAliveMonitor (ctx: MqttCtx ) {.async .}
@@ -671,31 +671,31 @@ proc work(ctx: MqttCtx) {.async.} =
671671 ctx.inWork = false
672672
673673when defined (broker):
674- proc sendWill (ctx: MqttCtx ) {.async .} =
675- # # Send the will
676- if ctx.willTopic != " " :
677- for c in mqttbroker.subscribers[ctx.willTopic]:
678- let msgId = c.nextMsgId ()
679- let qos = qosAlign (ctx.willQos, c.subscribed[ctx.willTopic])
680- c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: ctx.willTopic, qos: qos, message: ctx.willMsg, typ: Publish )
681- await c.work ()
674+ proc sendWill (ctx: MqttCtx ) {.async .} =
675+ # # Send the will
676+ if ctx.willTopic != " " :
677+ for c in mqttbroker.subscribers[ctx.willTopic]:
678+ let msgId = c.nextMsgId ()
679+ let qos = qosAlign (ctx.willQos, c.subscribed[ctx.willTopic])
680+ c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: ctx.willTopic, qos: qos, message: ctx.willMsg, typ: Publish )
681+ await c.work ()
682682
683683when defined (broker):
684684 proc publishToSubscribers (seqctx: seq [MqttCtx ], pkt: Pkt , topic, message: string , qos: uint8 , retain: bool , senderId: string ) {.async .} =
685- # # Publish async to clients
686- for c in seqctx:
687- if c.state != Connected :
688- asyncCheck removeSubscriber (c, topic)
689- continue
690- let
691- msgId = c.nextMsgId ()
692- qosSub = qosAlign (qos, c.subscribed[topic])
693-
694- if mqttbroker.passClientId:
685+ # # Publish async to clients
686+ for c in seqctx:
687+ if c.state != Connected :
688+ asyncCheck removeSubscriber (c, topic)
689+ continue
690+ let
691+ msgId = c.nextMsgId ()
692+ qosSub = qosAlign (qos, c.subscribed[topic])
693+
694+ if mqttbroker.passClientId:
695695 c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: topic, qos: qosSub, retain: retain, message: senderId & " :" & message, typ: Publish )
696- else :
696+ else :
697697 c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: topic, qos: qosSub, retain: retain, message: message, typ: Publish )
698- await c.work ()
698+ await c.work ()
699699
700700when defined (broker):
701701 proc denyConnect (ctx: MqttCtx , connFlag: ConnAckFlag ) {.async .} =
@@ -709,47 +709,47 @@ proc onConnect(ctx: MqttCtx, pkt: Pkt) {.async.} =
709709 when not defined (broker):
710710 ctx.wrn " Packet type only supported for broker: " & $ pkt.typ
711711 else :
712- var
713- offset: int
714- nextLen: uint16
715-
716- # Main data
717- (ctx.proto, offset) = pkt.getstring (0 , true )
718- (ctx.version, offset) = pkt.getu8 (offset)
719- (ctx.connFlags, offset) = getbin (pkt, offset)
720- (ctx.keepAlive, offset) = pkt.getu16 (offset)
721- (nextLen, offset) = pkt.getu16 (offset)
722- (ctx.clientId, offset) = pkt.getstring (offset, parseInt ($ nextLen))
723- if not mqttbroker.spacesInClientId:
724- ctx.clientid = ctx.clientid.replace (" " , " " )
725-
726- # Will Topic
727- if ctx.connFlags[5 ] == '1' :
728- (nextLen, offset) = pkt.getu16 (offset)
729- (ctx.willTopic, offset) = pkt.getstring (offset, parseInt ($ nextLen))
730- (nextLen, offset) = pkt.getu16 (offset)
731- (ctx.willMsg, offset) = pkt.getstring (offset, parseInt ($ nextLen))
732-
733- # Will Retain
734- if ctx.connFlags[2 ] == '1' :
735- ctx.willRetain = true
736-
737- # Will qos=2
738- if ctx.connFlags[3 ] == '1' :
739- ctx.willQos = 2 .uint8
740- # Will qos=1
741- elif ctx.connFlags[4 ] == '1' :
742- ctx.willQos = 1 .uint8
743-
744- # Username
745- if ctx.connFlags[0 ] == '1' :
712+ var
713+ offset: int
714+ nextLen: uint16
715+
716+ # Main data
717+ (ctx.proto, offset) = pkt.getstring (0 , true )
718+ (ctx.version, offset) = pkt.getu8 (offset)
719+ (ctx.connFlags, offset) = getbin (pkt, offset)
720+ (ctx.keepAlive, offset) = pkt.getu16 (offset)
746721 (nextLen, offset) = pkt.getu16 (offset)
747- (ctx.username, offset) = pkt.getstring (offset, parseInt ($ nextLen))
748-
749- # Password
750- if ctx.connFlags[1 ] == '1' :
751- (nextLen, offset) = pkt.getu16 (offset)
752- (ctx.password, offset) = pkt.getstring (offset, parseInt ($ nextLen))
722+ (ctx.clientId, offset) = pkt.getstring (offset, parseInt ($ nextLen))
723+ if not mqttbroker.spacesInClientId:
724+ ctx.clientid = ctx.clientid.replace (" " , " " )
725+
726+ # Will Topic
727+ if ctx.connFlags[5 ] == '1' :
728+ (nextLen, offset) = pkt.getu16 (offset)
729+ (ctx.willTopic, offset) = pkt.getstring (offset, parseInt ($ nextLen))
730+ (nextLen, offset) = pkt.getu16 (offset)
731+ (ctx.willMsg, offset) = pkt.getstring (offset, parseInt ($ nextLen))
732+
733+ # Will Retain
734+ if ctx.connFlags[2 ] == '1' :
735+ ctx.willRetain = true
736+
737+ # Will qos=2
738+ if ctx.connFlags[3 ] == '1' :
739+ ctx.willQos = 2 .uint8
740+ # Will qos=1
741+ elif ctx.connFlags[4 ] == '1' :
742+ ctx.willQos = 1 .uint8
743+
744+ # Username
745+ if ctx.connFlags[0 ] == '1' :
746+ (nextLen, offset) = pkt.getu16 (offset)
747+ (ctx.username, offset) = pkt.getstring (offset, parseInt ($ nextLen))
748+
749+ # Password
750+ if ctx.connFlags[1 ] == '1' :
751+ (nextLen, offset) = pkt.getu16 (offset)
752+ (ctx.password, offset) = pkt.getstring (offset, parseInt ($ nextLen))
753753
754754 # Check password and username
755755 if mqttbroker.passwords.len () > 0 :
@@ -759,7 +759,7 @@ proc onConnect(ctx: MqttCtx, pkt: Pkt) {.async.} =
759759 return
760760
761761 # 3.1.2.2 Protocol Level
762- if ctx.proto != " MQTT" :
762+ if ctx.proto != " MQTT" :
763763 await denyConnect (ctx, ConnRefProtocol )
764764 return
765765
@@ -771,19 +771,19 @@ proc onConnect(ctx: MqttCtx, pkt: Pkt) {.async.} =
771771 # Check if clientid already exist as an connection.
772772 # Close connection if clientID is identical to existing clientID [MQTT-3.1.4-2]
773773 if mqttbroker.connections.hasKey (ctx.clientid):
774- if mqttbroker.clientKickOld:
775- mqttbroker.connections[ctx.clientid].state = Disabled
776- mqttbroker.connections[ctx.clientid].s.close ()
777- else :
774+ if mqttbroker.clientKickOld:
775+ mqttbroker.connections[ctx.clientid].state = Disabled
776+ mqttbroker.connections[ctx.clientid].s.close ()
777+ else :
778778 await denyConnect (ctx, ConnRefRejected )
779779 return
780780
781781 # Require a clientID [MQTT-3.1.3-5], [MQTT-3.1.3-8], [MQTT-3.1.3-9]
782782 if ctx.clientid.strip () == " " :
783783 # [MQTT-3.1.3-6] Assign random unique key
784- if mqttbroker.emptyClientId:
785- ctx.clientid = $ r.next ()
786- else :
784+ if mqttbroker.emptyClientId:
785+ ctx.clientid = $ r.next ()
786+ else :
787787 await denyConnect (ctx, ConnRefRejected )
788788 return
789789
@@ -859,7 +859,7 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
859859 # Check if client already has published a retained messaged on this topic. In that
860860 # case do not add it, since the QOS, msg and time is preserved in the MqttBroker.retained.
861861 if topic notin ctx.retained:
862- ctx.retained.add (topic)
862+ ctx.retained.add (topic)
863863
864864 if mqttbroker.verbosity >= 1 :
865865 verbose (" Retained " , mqttbroker.retained)
@@ -914,12 +914,12 @@ proc onSubscribe(ctx: MqttCtx, pkt: Pkt) {.async.} =
914914 when not defined (broker):
915915 ctx.wrn " Packet type only supported for broker: " & $ pkt.typ
916916 else :
917- var
918- offset: int
919- msgId: MsgId
920- topic: string
921- qos: uint8
922- nextLen: uint16
917+ var
918+ offset: int
919+ msgId: MsgId
920+ topic: string
921+ qos: uint8
922+ nextLen: uint16
923923
924924 (msgId, offset) = pkt.getu16 (0 )
925925 ctx.msgIdSeq = msgId
@@ -966,11 +966,11 @@ proc onUnsubscribe(ctx: MqttCtx, pkt: Pkt) {.async.} =
966966 when not defined (broker):
967967 ctx.wrn " Packet type only supported for broker: " & $ pkt.typ
968968 else :
969- var
970- offset: int
971- msgId: MsgId
972- topic: string
973- nextLen: uint16
969+ var
970+ offset: int
971+ msgId: MsgId
972+ topic: string
973+ nextLen: uint16
974974
975975 (msgId, offset) = pkt.getu16 (0 )
976976 ctx.msgIdSeq = msgId
@@ -1001,9 +1001,9 @@ proc onDisconnect(ctx: MqttCtx, pkt: Pkt) {.async.} =
10011001 when not defined (broker):
10021002 ctx.wrn " Packet type only supported for broker: " & $ pkt.typ
10031003 else :
1004- # await removeSubscriber(ctx)
1005- # await sendWill(ctx)
1006- ctx.state = Disconnected
1004+ # await removeSubscriber(ctx)
1005+ # await sendWill(ctx)
1006+ ctx.state = Disconnected
10071007 if mqttbroker.verbosity >= 1 :
10081008 verbose (" Connections >> " & ctx.clientid & " has disconnected" )
10091009
0 commit comments