1- include " ../nmqtt.nim"
1+ #
2+ # Code for the broker
3+ # --------------------
4+ #
5+ # This file contains the code for the MQTT broker. Broker specific procs
6+ # and CLI are defined within this file, while all base MQTT-code is included
7+ # from the main library file (`../nmqtt.nim`).
8+ #
9+ # This file is named the same as the library file, so when the binaries are
10+ # generated, the brokers binary will be named `nmqtt`.
11+ #
12+
13+ include " ../nmqtt.nim"
14+
15+ proc keepAliveMonitor (ctx: MqttCtx ) {.async .} =
16+ ctx.lastAction = epochTime ()
17+ # The keep alive time is one and a half times the Keep Alive periode [MQTT-3.1.2-24]
18+ let keepAlive = toFloat (ctx.keepAlive.int ) * 1.5 * 1000
19+ while ctx.state in [Connecting , Connected ]:
20+ let saveLastAction = ctx.lastAction
21+ await sleepAsync (keepAlive)
22+ if ctx.lastAction <= saveLastAction:
23+ ctx.state = Error
24+ ctx.s.close ()
25+ if mqttbroker.verbosity >= 1 :
26+ verbose (" Connections >> " & ctx.clientid & " was disconnected. Keep alive time overdue." )
27+ break
28+
29+
30+ proc processClient (s: AsyncSocket ) {.async .} =
31+ # # Create new client
32+ let ctx = MqttCtx ()
33+ ctx.s = s
34+ ctx.state = Connecting
35+ ctx.sslOn = mqttbroker.sslOn
36+
37+ while ctx.state in [Connecting , Connected ]:
38+ try :
39+ var pkt = await ctx.recv ()
40+ if pkt.typ == Notype :
41+ ctx.state = Error
42+ break
43+ await ctx.handle (pkt)
44+ except :
45+ # We are closing the socket, when we are disabling or disconnecting
46+ # the client. Therefor in these situations, the `ctx.state` must not
47+ # be set to `Error`, since we are breaking the socket on purpose.
48+ if ctx.state notin [Disabled , Disconnecting ]:
49+ ctx.state = Error
50+ break
51+
52+ if not ctx.beenConnected:
53+ # When the client hasn't been connected to the broker, then we don't
54+ # want to check for LWT, subscriptions etc. - we just want to close
55+ # and remove the socket!
56+ try :
57+ ctx.s.close ()
58+ except :
59+ if mqttbroker.verbosity >= 3 and ctx.sslOn:
60+ wrn (" Someone is trying to connect, properly without SSL." )
61+ return
62+
63+ if ctx.state == Error and ctx.beenConnected:
64+ # This happens on a ungraceful disconnects from the client.
65+ asyncCheck sendWill (ctx)
66+
67+ if ctx.state in [Disconnected , Error ] and ctx.beenConnected:
68+ # Remove the client from the register for subscribers.
69+ asyncCheck removeSubscriber (ctx)
70+
71+ if ctx.state != Disabled and ctx.beenConnected:
72+ # The clients `state` is set to `Disabled`, if we cannot accept their
73+ # `Connect`-packet and respond with a `ConnAck`. Since we dont accept
74+ # the connection, the client is never added to `mqttbroker.connections`.
75+ if mqttbroker.connections.hasKey (ctx.clientid):
76+ mqttbroker.connections.del (ctx.clientid)
77+
78+ # Cleanup retained messages from client.
79+ for top in ctx.retained:
80+ if mqttbroker.retained[top].clientid == ctx.clientid:
81+ mqttbroker.retained.del (top)
82+
83+ if not ctx.s.isClosed () and ctx.beenConnected:
84+ ctx.s.close ()
85+ ctx.state = Disabled
86+
87+ if mqttbroker.verbosity >= 3 :
88+ verbose (ctx)
89+
90+
91+ proc serve (host: string , port: int ) {.async .} =
92+ var broker = newAsyncSocket ()
93+ broker.setSockOpt (OptReuseAddr , true )
94+ broker.bindAddr (Port (port), host)
95+ broker.listen ()
96+
97+ if mqttbroker.sslOn:
98+ if not fileExists (mqttbroker.sslCert) or not fileExists (mqttbroker.sslKey):
99+ echo " SSL cert or key does not exist. Check the path or generate them:\n " &
100+ " openssl req -x509 -nodes -days 365 -newkey rsa:4096 -keyout mykey.pem -out mycert.pem"
101+ var sslCtx = newContext (certFile = mqttbroker.sslCert, keyFile = mqttbroker.sslKey)
102+ wrapSocket (sslCtx, broker)
103+ while true :
104+ let client = await broker.accept ()
105+ wrapConnectedSocket (sslCtx, client, handshakeAsServer)
106+ asyncCheck processClient (client)
107+
108+ else :
109+ while true :
110+ let client = await broker.accept ()
111+ asyncCheck processClient (client)
112+
113+
114+ proc showConf (mb: MqttBroker , configfile: string ) =
115+ # # Show the config details
116+
117+ echo """
118+ Running nmqtt v$1
119+
120+ BROKER:
121+ Host: $2
122+ Port: $3
123+ SSL: $4
124+ Starting: $5
125+
126+ OPTIONS:
127+ Verbosity: $6
128+ Max connections: $7
129+ ClientID max lenght: $8
130+ ClientID allow spaces: $9
131+ ClientID allow empty: $10
132+ ClientID in payload: $11
133+ Client kick old: $12
134+ Number of passwords: $13
135+
136+ """ .format (nmqttVersion, mb.host, mb.port, mb.sslOn, now (), mb.verbosity,
137+ mb.maxConnections, mb.clientIdMaxLen, mb.spacesInClientId,
138+ mb.emptyClientId, mb.passClientId, mb.clientKickOld, mb.passwords.len ()
139+ )
140+
141+
142+ if configfile != " " :
143+ echo """
144+ CONFIG:
145+ Using configuration file:
146+ $1
147+
148+ """ .format (configfile)
149+
150+
151+ proc loadPasswords (passwordFile: string ) =
152+ # # Loads the usernames and passwords
153+ if passwordFile == " " :
154+ echo " \n Path to password file is missing..\n "
155+ quit ()
156+ elif not fileExists (passwordFile):
157+ echo " \n Password file does not exists..\n - " & passwordFile
158+ quit ()
159+
160+ for line in readFile (passwordFile).split (" \n " ):
161+ let pass = split (line, " :" , maxsplit= 1 )
162+ mqttbroker.passwords[pass[0 ]] = pass[1 ]
163+
164+
165+ proc loadConf (mb: MqttBroker , config: string ) =
166+ # # Parses the config file
167+
168+ if not fileExists (config):
169+ echo " \n Configuration files does not exists.."
170+ quit ()
171+
172+ let dict = loadConfig (config)
173+ mqttbroker.version = 4
174+ mqttbroker.host = dict.getSectionValue (" " ," host" )
175+ mqttbroker.port = Port (parseInt (dict.getSectionValue (" " ," port" )))
176+ mqttbroker.verbosity = parseInt (dict.getSectionValue (" " ," verbosity" ))
177+ mqttbroker.clientIdMaxLen = parseInt (dict.getSectionValue (" " ," clientid_maxlen" ))
178+ mqttbroker.spacesInClientId = parseBool (dict.getSectionValue (" " ," clientid_spaces" ))
179+ mqttbroker.emptyClientId = parseBool (dict.getSectionValue (" " ," clientid_empty" ))
180+ mqttbroker.passClientId = parseBool (dict.getSectionValue (" " ," clientid_pass" ))
181+ mqttbroker.clientKickOld = parseBool (dict.getSectionValue (" " ," client_kickold" ))
182+ mqttbroker.maxConnections = parseInt (dict.getSectionValue (" " ," max_conn" ))
183+ mqttbroker.sslCert = dict.getSectionValue (" " ," ssl_certificate" )
184+ mqttbroker.sslKey = dict.getSectionValue (" " ," ssl_key" )
185+
186+ # If both certificate and key are present use SSL.
187+ if mqttbroker.sslCert != " " and mqttbroker.sslKey != " " :
188+ mqttbroker.sslOn = true
189+
190+ # If anonymous login is allowed return
191+ if not parseBool (dict.getSectionValue (" " ," allow_anonymous" )):
192+
193+ # Is password is required parse the password file
194+ let passwordFile = dict.getSectionValue (" " ," password_file" )
195+ loadPasswords (passwordFile)
196+
197+
198+ proc handler () {.noconv .} =
199+ # # Catch ctrl+c from user
200+ echo " "
201+ if mqttbroker.verbosity >= 3 :
202+ verbose (mqttbroker)
203+ quit ()
204+
205+
206+ proc nmqttBroker (config= " " , host= " 127.0.0.1" , port= 1883 , verbosity= 0 , max_conn= 0 ,
207+ clientid_maxlen= 60 , clientid_spaces= false , clientid_empty= false ,
208+ client_kickold= false , clientid_pass= false , password_file= " " ,
209+ ssl= false , ssl_cert= " " , ssl_key= " "
210+ ) {.async .} =
211+ # # CLI tool for a MQTT broker
212+ if config != " " :
213+ loadConf (mqttbroker, config)
214+ else :
215+ mqttbroker.version = 4
216+ mqttbroker.host = host
217+ mqttbroker.port = Port (port)
218+ mqttbroker.verbosity = verbosity
219+ mqttbroker.clientIdMaxLen = clientid_maxlen
220+ mqttbroker.spacesInClientId = clientid_spaces
221+ mqttbroker.emptyClientId = clientid_empty
222+ mqttbroker.clientKickOld = client_kickold
223+ mqttbroker.passClientId = clientid_pass
224+ mqttbroker.maxConnections = max_conn
225+
226+ if password_file != " " :
227+ loadPasswords (password_file)
228+
229+ if ssl:
230+ mqttbroker.sslOn = true
231+ mqttbroker.sslCert = ssl_cert
232+ mqttbroker.sslKey = ssl_key
233+
234+ if mqttbroker.verbosity >= 1 :
235+ showConf (mqttbroker, config)
236+
237+ asyncCheck serve (host, port)
238+
239+ setControlCHook (handler)
240+
241+ runForever ()
242+
243+
244+
245+ when isMainModule :
246+
247+ let topLvlUse = """ nmqtt version """ & nmqttVersion & """
248+
249+
250+ nmqtt is a MQTT v3.1.1 broker
251+
252+ USAGE
253+ $command [options]
254+ $command [-c /path/to/config.conf]
255+ $command [-h hostIP -p port]
256+
257+ CONFIG
258+ Use the configuration file for detailed settings,
259+ such as SSL, adjusting keep alive timer, etc. or
260+ specify options at the command line.
261+
262+ To add and delete users from the password file
263+ please use nmqtt_password:
264+ - nmqtt_password -a|-b|-d [options]
265+
266+ OPTIONS
267+ $options
268+ """
269+ clCfg.hTabCols = @ [clOptKeys, clDescrip]
270+
271+ dispatchGen (nmqttBroker,
272+ cmdName= " nmqtt" ,
273+ help= {
274+ " config" : " absolute path to the config file. Overrides all other options." ,
275+ " host" : " IP-address to serve the broker on." ,
276+ " port" : " network port to accept connecting from." ,
277+ " verbosity" : " verbosity from 0-3." ,
278+ " max-conn" : " max simultaneous connections. Defaults to no limit." ,
279+ " clientid-maxlen" : " max lenght of clientid. Defaults to 65535." ,
280+ " clientid-spaces" : " allow spaces in clientid. Defaults to false." ,
281+ " clientid-empty" : " allow empty clientid and assign random id. Defaults to false." ,
282+ " client-kickold" : " kick old client, if new client has same clientid. Defaults to false." ,
283+ " clientid-pass" : " pass clientid in payload {clientid:payload}. Defaults to false." ,
284+ " password-file" : " absolute path to the password file" ,
285+ " ssl" : " activate ssl for the broker - requires --ssl-cert and --ssl-key." ,
286+ " ssl-cert" : " absolute path to the ssl certificate." ,
287+ " ssl-key" : " absolute path to the ssl key."
288+ },
289+ short= {
290+ " help" : '?' ,
291+ " max-conn" : '\0 ' ,
292+ " ssl" : '\0 ' ,
293+ " ssl-cert" : '\0 ' ,
294+ " ssl-key" : '\0 '
295+ },
296+ usage= topLvlUse,
297+ dispatchName= " brokerCli"
298+ )
299+
300+ cligenQuit brokerCli (skipHelp= true )
0 commit comments