Skip to content

Commit 1e3fbe1

Browse files
committed
Support for SSL in broker
1 parent 88c0f52 commit 1e3fbe1

2 files changed

Lines changed: 100 additions & 42 deletions

File tree

src/nmqtt.nim

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ type
8080
MqttCtx* = ref object
8181
host: string
8282
port: Port
83-
doSsl: bool
83+
sslOn: bool
84+
verbosity: int
85+
beenConnected: bool
8486
username: string
8587
password: string
8688
state: State
@@ -187,7 +189,9 @@ when defined(broker):
187189
MqttBroker* = ref object
188190
host: string
189191
port: Port
190-
doSsl: bool
192+
sslOn: bool
193+
sslCert: string
194+
sslKey: string
191195
verbosity: int
192196
connections: Table[string, MqttCtx]
193197
retained: Table[string, RetainedMsg] # Topic, RetaindMsg
@@ -276,7 +280,7 @@ proc newPkt(typ: PktType=NOTYPE, flags: uint8=0): Pkt =
276280
#
277281
proc dmp(ctx: MqttCtx, s: string) =
278282
when not defined(broker):
279-
if defined(dev):
283+
if defined(dev) or ctx.verbosity >= 2:
280284
stderr.write "\e[1;30m" & s & "\e[0m\n"
281285
when defined(broker):
282286
if defined(dev) or mqttbroker.verbosity >= 2:
@@ -393,7 +397,8 @@ proc sendDisconnect(ctx: MqttCtx): Future[bool] {.async.}
393397
proc close(ctx: MqttCtx, reason: string) {.async.} =
394398
if ctx.state in {Connecting, Connected}:
395399
ctx.state = Disconnecting
396-
ctx.dbg "Closing: " & reason
400+
if ctx.verbosity >= 1:
401+
ctx.dbg "Closing: " & reason
397402
discard await ctx.sendDisconnect()
398403
ctx.s.close()
399404
ctx.state = Disconnected
@@ -433,7 +438,15 @@ proc recv(ctx: MqttCtx): Future[Pkt] {.async.} =
433438

434439
var r: int
435440
var b: uint8
436-
r = await ctx.s.recvInto(b.addr, b.sizeof)
441+
442+
# TODO:
443+
# When the broker is running in SSL-mode, we need the try/except, since
444+
# we will encounter a silent crash in recvInto, when the client is
445+
# not actually using SSL.
446+
try:
447+
r = await ctx.s.recvInto(b.addr, b.sizeof)
448+
except:
449+
return
437450
if r != 1:
438451
when not defined(broker):
439452
await ctx.close("remote closed connection")
@@ -800,6 +813,7 @@ proc onConnect(ctx: MqttCtx, pkt: Pkt) {.async.} =
800813

801814
mqttbroker.connections[ctx.clientid] = ctx
802815
ctx.state = Connected
816+
ctx.beenConnected = true
803817
asyncCheck keepAliveMonitor(ctx)
804818

805819
if mqttbroker.verbosity >= 1:
@@ -814,7 +828,9 @@ proc onConnAck(ctx: MqttCtx, pkt: Pkt): Future[void] =
814828
ctx.state = Connected
815829
let (code, _) = pkt.getu8(1)
816830
if code == 0:
817-
ctx.dbg "Connection established"
831+
ctx.beenConnected = true
832+
if ctx.verbosity >= 1:
833+
ctx.dbg "Connection established"
818834
else:
819835
ctx.wrn "Connect failed, code: " & $code
820836
result = ctx.work()
@@ -1056,7 +1072,8 @@ proc runRx(ctx: MqttCtx) {.async.} =
10561072
break
10571073
await ctx.handle(pkt)
10581074
except OsError:
1059-
ctx.wrn "Boom, sending packet to closed socket"
1075+
if ctx.verbosity >= 2:
1076+
ctx.wrn "Boom, socket is closed"
10601077

10611078
proc runPing(ctx: MqttCtx) {.async.} =
10621079
while true:
@@ -1071,10 +1088,11 @@ proc connectBroker(ctx: MqttCtx) {.async.} =
10711088
if ctx.keepAlive == 0:
10721089
ctx.keepAlive = 60
10731090

1074-
ctx.dbg "Connecting to " & ctx.host & ":" & $ctx.port
1091+
if ctx.verbosity >= 1:
1092+
ctx.dbg "Connecting to " & ctx.host & ":" & $ctx.port
10751093
try:
10761094
ctx.s = await asyncnet.dial(ctx.host, ctx.port)
1077-
if ctx.doSsl:
1095+
if ctx.sslOn:
10781096
when defined(ssl):
10791097
ctx.ssl = newContext(protSSLv23, CVerifyNone)
10801098
wrapConnectedSocket(ctx.ssl, ctx.s, handshakeAsClient)
@@ -1087,7 +1105,10 @@ proc connectBroker(ctx: MqttCtx) {.async.} =
10871105
asyncCheck ctx.runRx()
10881106
asyncCheck ctx.runPing()
10891107
except OSError as e:
1090-
ctx.dbg "Error connecting to " & ctx.host & " " & e.msg
1108+
if ctx.verbosity >= 1 or not ctx.beenConnected:
1109+
ctx.dbg "Error connecting to " & ctx.host
1110+
if ctx.verbosity >= 2:
1111+
echo e.msg
10911112
ctx.state = Error
10921113

10931114
proc runConnect(ctx: MqttCtx) {.async.} =
@@ -1125,11 +1146,11 @@ proc set_ping_interval*(ctx: MqttCtx, txInterval: int = 60) =
11251146
if txInterval > 0 and txInterval < 65535:
11261147
ctx.keepAlive = txInterval.uint16
11271148

1128-
proc set_host*(ctx: MqttCtx, host: string, port: int=1883, doSsl=false) =
1149+
proc set_host*(ctx: MqttCtx, host: string, port: int=1883, sslOn=false) =
11291150
## Set the MQTT host
11301151
ctx.host = host
11311152
ctx.port = Port(port)
1132-
ctx.doSsl = doSsl
1153+
ctx.sslOn = sslOn
11331154

11341155
proc set_auth*(ctx: MqttCtx, username: string, password: string) =
11351156
## Set the authentication for the host.
@@ -1144,6 +1165,10 @@ proc set_will*(ctx: MqttCtx, topic, msg: string, qos=0, retain=false) =
11441165
ctx.willQoS = qos.uint8
11451166
ctx.willRetain = retain
11461167

1168+
proc set_verbosity*(ctx: MqttCtx, verbosity: int) =
1169+
## Set the verbosity.
1170+
ctx.verbosity = verbosity
1171+
11471172
proc connect*(ctx: MqttCtx) {.async.} =
11481173
## Connect to the broker.
11491174
await ctx.connectBroker()
@@ -1158,7 +1183,7 @@ proc start*(ctx: MqttCtx) {.async.} =
11581183

11591184
proc disconnect*(ctx: MqttCtx) {.async.} =
11601185
## Disconnect from the broker.
1161-
await ctx.close("User request")
1186+
await ctx.close("disconnect")
11621187
ctx.state = Disabled
11631188

11641189
proc publish*(ctx: MqttCtx, topic: string, message: string, qos=0, retain=false) {.async.} =

src/utils/broker.nim

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ proc processClient(s: AsyncSocket) {.async.} =
1818
let ctx = MqttCtx()
1919
ctx.s = s
2020
ctx.state = Connecting
21+
ctx.sslOn = mqttbroker.sslOn
2122

2223
while ctx.state in [Connecting, Connected]:
2324
try:
@@ -34,15 +35,26 @@ proc processClient(s: AsyncSocket) {.async.} =
3435
ctx.state = Error
3536
break
3637

37-
if ctx.state == Error:
38+
if not ctx.beenConnected:
39+
# When the client hasn't been connected to the broker, then we don't
40+
# want to check for LWT, subscriptions etc. - we just want to close
41+
# and remove the socket!
42+
try:
43+
ctx.s.close()
44+
except:
45+
if mqttbroker.verbosity >= 3 and ctx.sslOn:
46+
wrn("Someone is trying to connect, properly without SSL.")
47+
return
48+
49+
if ctx.state == Error and ctx.beenConnected:
3850
# This happens on a ungraceful disconnects from the client.
3951
asyncCheck sendWill(ctx)
4052

41-
if ctx.state in [Disconnected, Error]:
53+
if ctx.state in [Disconnected, Error] and ctx.beenConnected:
4254
# Remove the client from the register for subscribers.
4355
asyncCheck removeSubscriber(ctx)
4456

45-
if ctx.state != Disabled:
57+
if ctx.state != Disabled and ctx.beenConnected:
4658
# The clients `state` is set to `Disabled`, if we cannot accept their
4759
# `Connect`-packet and respond with a `ConnAck`. Since we dont accept
4860
# the connection, the client is never added to `mqttbroker.connections`.
@@ -54,24 +66,35 @@ proc processClient(s: AsyncSocket) {.async.} =
5466
if mqttbroker.retained[top].clientid == ctx.clientid:
5567
mqttbroker.retained.del(top)
5668

57-
if not ctx.s.isClosed():
69+
if not ctx.s.isClosed() and ctx.beenConnected:
5870
ctx.s.close()
5971
ctx.state = Disabled
6072

6173
if mqttbroker.verbosity >= 3:
6274
verbose(ctx)
6375

6476

65-
6677
proc serve(host: string, port: int) {.async.} =
6778
var broker = newAsyncSocket()
6879
broker.setSockOpt(OptReuseAddr, true)
6980
broker.bindAddr(Port(port), host)
7081
broker.listen()
7182

72-
while true:
73-
let client = await broker.accept()
74-
asyncCheck processClient(client)
83+
if mqttbroker.sslOn:
84+
if not fileExists(mqttbroker.sslCert) or not fileExists(mqttbroker.sslKey):
85+
echo "SSL cert or key does not exist. Check the path or generate them:\n" &
86+
"openssl req -x509 -nodes -days 365 -newkey rsa:4096 -keyout mykey.pem -out mycert.pem"
87+
var sslCtx = newContext(certFile = mqttbroker.sslCert, keyFile = mqttbroker.sslKey)
88+
wrapSocket(sslCtx, broker)
89+
while true:
90+
let client = await broker.accept()
91+
wrapConnectedSocket(sslCtx, client, handshakeAsServer)
92+
asyncCheck processClient(client)
93+
94+
else:
95+
while true:
96+
let client = await broker.accept()
97+
asyncCheck processClient(client)
7598

7699

77100
proc showConf(mb: MqttBroker, configfile: string) =
@@ -143,6 +166,12 @@ proc loadConf(mb: MqttBroker, config: string) =
143166
mqttbroker.passClientId = parseBool(dict.getSectionValue("","clientid_pass"))
144167
mqttbroker.clientKickOld = parseBool(dict.getSectionValue("","client_kickold"))
145168
mqttbroker.maxConnections = parseInt(dict.getSectionValue("","max_conn"))
169+
mqttbroker.sslCert = dict.getSectionValue("","ssl_certificate")
170+
mqttbroker.sslKey = dict.getSectionValue("","ssl_key")
171+
172+
# If both certificate and key are present use SSL.
173+
if mqttbroker.sslCert != "" and mqttbroker.sslKey != "":
174+
mqttbroker.sslOn = true
146175

147176
# If anonymous login is allowed return
148177
if not parseBool(dict.getSectionValue("","allow_anonymous")):
@@ -154,20 +183,18 @@ proc loadConf(mb: MqttBroker, config: string) =
154183

155184
proc handler() {.noconv.} =
156185
## Catch ctrl+c from user
186+
echo " "
157187
if mqttbroker.verbosity >= 3:
158-
echo " "
159188
verbose(mqttbroker)
160-
echo "\nQuitting..\n"
161189
quit()
162-
setControlCHook(handler)
163190

164191

165192
proc nmqttBroker(config="", host="127.0.0.1", port=1883, verbosity=0, max_conn=0,
166193
clientid_maxlen=60, clientid_spaces=false, clientid_empty=false,
167-
client_kickold=false, clientid_pass=false, password_file=""
194+
client_kickold=false, clientid_pass=false, password_file="",
195+
ssl=false, ssl_cert="", ssl_key=""
168196
) {.async.} =
169197
## CLI tool for a MQTT broker
170-
171198
if config != "":
172199
loadConf(mqttbroker, config)
173200
else:
@@ -181,30 +208,33 @@ proc nmqttBroker(config="", host="127.0.0.1", port=1883, verbosity=0, max_conn=0
181208
mqttbroker.clientKickOld = client_kickold
182209
mqttbroker.passClientId = clientid_pass
183210
mqttbroker.maxConnections = max_conn
184-
#mqttbroker.retainExpire = 3600
185211

186212
if password_file != "":
187213
loadPasswords(password_file)
188-
echo mqttbroker.passwords
189214

190-
# if ssl:
191-
# newContext()
192-
# path to Key and Cert
193-
# if passwords:
194-
# password & username
215+
if ssl:
216+
mqttbroker.sslOn = true
217+
mqttbroker.sslCert = ssl_cert
218+
mqttbroker.sslKey = ssl_key
195219

196220
if mqttbroker.verbosity >= 1:
197221
showConf(mqttbroker, config)
198222

199-
#let broker = newAsyncSocket()
200-
201223
asyncCheck serve(host, port)
202224

225+
setControlCHook(handler)
226+
227+
runForever()
228+
203229

204230

205231
when isMainModule:
206232

207-
let topLvlUse = """${doc}
233+
let topLvlUse = """nmqtt version """ & nmqttVersion & """
234+
235+
236+
nmqtt is a MQTT v3.1.1 broker
237+
208238
USAGE
209239
$command [options]
210240
$command [-c /path/to/config.conf]
@@ -225,7 +255,6 @@ $options
225255
clCfg.hTabCols = @[clOptKeys, clDescrip]
226256

227257
dispatchGen(nmqttBroker,
228-
doc="nmqtt is a MQTT v3.1.1 broker",
229258
cmdName="nmqtt",
230259
help={
231260
"config": "absolute path to the config file. Overrides all other options.",
@@ -238,16 +267,20 @@ $options
238267
"clientid-empty": "allow empty clientid and assign random id. Defaults to false.",
239268
"client-kickold": "kick old client, if new client has same clientid. Defaults to false.",
240269
"clientid-pass": "pass clientid in payload {clientid:payload}. Defaults to false.",
241-
"password-file": "absolute path to the password file"
270+
"password-file": "absolute path to the password file",
271+
"ssl": "activate ssl for the broker - requires --ssl-cert and --ssl-key.",
272+
"ssl-cert": "absolute path to the ssl certificate.",
273+
"ssl-key": "absolute path to the ssl key."
242274
},
243275
short={
244276
"help": '?',
245-
"max-conn": '\0'
277+
"max-conn": '\0',
278+
"ssl": '\0',
279+
"ssl-cert": '\0',
280+
"ssl-key": '\0'
246281
},
247282
usage=topLvlUse,
248283
dispatchName="brokerCli"
249284
)
250285

251-
asyncCheck brokerCli(skipHelp=true)
252-
253-
runForever()
286+
cligenQuit brokerCli(skipHelp=true)

0 commit comments

Comments
 (0)