Skip to content

Commit 93c7c2f

Browse files
committed
Track and join socket threads on shutdown
Improve BaseProcessorSocket shutdown and restart behavior by tracking and joining threads. - Import multiprocessing.connection.Client to wake a blocking accept() during stop. - Add _accept_thread, _rx_thread and _rx_threads to track the accept thread and per-connection receiver threads. - Start and store the accept thread instead of launching it anonymously; store and start rx threads when clients connect. - On stop(), set _stop, attempt a Client connect to unblock accept(), close connections/listener, then join the accept thread and best-effort join tracked rx threads before clearing them. - Clear _stop in start_server if previously set to allow restart, and minor whitespace/newline cleanup. These changes reduce race conditions on restart, make shutdowns cleaner (especially on Windows), and help avoid lingering daemon threads.
1 parent 98bcac5 commit 93c7c2f

1 file changed

Lines changed: 37 additions & 3 deletions

File tree

dlclivegui/processors/dlc_processor_socket.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import time
99
from collections import deque
1010
from math import acos, atan2, copysign, degrees, pi, sqrt
11-
from multiprocessing.connection import Listener
11+
from multiprocessing.connection import Client, Listener
1212
from pathlib import Path
1313
from threading import Event, Thread
1414

@@ -140,6 +140,9 @@ def __init__(
140140
self.address = bind
141141
self.authkey = authkey if authkey is not None else (b"secret password" if bind is not None else None)
142142
self.listener = None
143+
self._accept_thread = None
144+
self._rx_thread = None
145+
self._rx_threads = set()
143146
self._socket_timeout = float(socket_timeout)
144147

145148
self._stop = Event()
@@ -180,6 +183,9 @@ def start_server(self, bind, authkey=b"secret password", *, timeout: float = 1.0
180183
if self.listener is not None:
181184
return
182185

186+
if self._stop.is_set():
187+
self._stop.clear()
188+
183189
self.address = bind
184190
self.authkey = authkey
185191

@@ -190,7 +196,8 @@ def start_server(self, bind, authkey=b"secret password", *, timeout: float = 1.0
190196
except Exception:
191197
pass
192198

193-
Thread(target=self._accept_loop, daemon=True).start()
199+
self._accept_thread = Thread(target=self._accept_loop, daemon=True)
200+
self._accept_thread.start()
194201
logger.info(f"Processor server started on {bind[0]}:{bind[1]}")
195202

196203
# --------------------------------------------------------------------------------------
@@ -215,7 +222,9 @@ def _accept_loop(self):
215222
logger.debug(f"Client connected from {self.listener.last_accepted}")
216223
self.conns.add(conn)
217224

218-
Thread(target=self._rx_loop, args=(conn,), daemon=True).start()
225+
self._rx_thread = Thread(target=self._rx_loop, args=(conn,), daemon=True)
226+
self._rx_threads.add(self._rx_thread)
227+
self._rx_thread.start()
219228

220229
except (TimeoutError, OSError, EOFError):
221230
if self._stop.is_set():
@@ -323,11 +332,36 @@ def stop(self):
323332
logger.info("Stopping processor...")
324333
self._stop.set()
325334

335+
# Wake accept() so the accept loop exits quickly (especially helpful on Windows)
336+
# This is safe even if no clients are connected.
337+
try:
338+
if self.address is not None and self.authkey is not None:
339+
c = Client(self.address, authkey=self.authkey)
340+
c.close()
341+
except Exception:
342+
pass
343+
326344
for conn in list(self.conns):
327345
self._close_conn(conn)
328346

329347
self._close_listener()
330348

349+
# Join accept thread to avoid race conditions on restart
350+
if self._accept_thread is not None:
351+
self._accept_thread.join(timeout=2.0)
352+
if self._accept_thread.is_alive():
353+
logger.warning("Accept thread did not terminate cleanly")
354+
self._accept_thread = None
355+
356+
# Join rx threads briefly (best effort)
357+
for t in list(self._rx_threads):
358+
try:
359+
t.join(timeout=1.0)
360+
except Exception:
361+
pass
362+
self._rx_threads.clear()
363+
self._rx_thread = None
364+
331365
# Small Windows delay to help TIME_WAIT cleanup
332366
if sys.platform.startswith("win"):
333367
if hasattr(socket, "SO_EXCLUSIVEADDRUSE"):

0 commit comments

Comments
 (0)