Skip to content

Commit ec84d0e

Browse files
committed
Send MQTT packets atomically in a single socket write
Previously the fixed header and payload were sent in separate async socket writes. Since each write awaits, other tasks could interleave writes on the same socket, potentially corrupting packet framing. This change builds a single contiguous buffer and sends it in one async write call to ensure packet atomicity. Signed-off-by: Takeyoshi Kikuchi <kikuchi@centurysys.co.jp>
1 parent 93bc8e5 commit ec84d0e

1 file changed

Lines changed: 7 additions & 6 deletions

File tree

nmqtt.nim

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,24 +355,25 @@ proc send(ctx: MqttCtx, pkt: Pkt): Future[bool] {.async.} =
355355
if ctx.state notin {Connecting, Connected, Disconnecting}:
356356
return false
357357

358-
var hdr: seq[uint8]
359-
hdr.add (pkt.typ.int shl 4).uint8 or pkt.flags
358+
var buf: seq[uint8] = newSeqOfCap[uint8](1 + 4 + pkt.data.len)
359+
buf.add (pkt.typ.int shl 4).uint8 or pkt.flags
360360

361361
var len = pkt.data.len
362362
while true:
363363
var b = len mod 128
364364
len = len div 128
365365
if len > 0:
366366
b = b or 128
367-
hdr.add b.uint8
367+
buf.add b.uint8
368368
if len == 0:
369369
break
370370

371371
ctx.dmp "tx> " & $pkt
372-
await ctx.s.send(hdr[0].unsafeAddr, hdr.len)
373-
374372
if pkt.data.len > 0:
375-
await ctx.s.send(pkt.data[0].unsafeAddr, pkt.data.len)
373+
let hdrlen = buf.len
374+
buf.setLen(hdrlen + pkt.data.len)
375+
copyMem(buf[hdrlen].addr, pkt.data[0].unsafeAddr, pkt.data.len)
376+
await ctx.s.send(buf[0].unsafeAddr, buf.len)
376377

377378
return true
378379

0 commit comments

Comments
 (0)