From 171e560a07813ab6d176fe21f0cb284505799c78 Mon Sep 17 00:00:00 2001 From: agessaman Date: Sun, 26 Apr 2026 09:37:57 -0700 Subject: [PATCH 1/7] Enhance KissModemWrapper command handling and response management - Introduced a set of allowed responses for SetHardware commands to handle legitimate OK responses. - Implemented single-flight command execution to prevent race conditions during concurrent requests. - Updated methods to accept an optional timeout parameter for better control over response waiting. - Enhanced response queue management to handle late or out-of-order responses effectively. - Added tests to ensure correct behavior of command responses and timeout handling. --- src/pymc_core/hardware/kiss_modem_wrapper.py | 246 +++++++++++--- src/pymc_core/node/dispatcher.py | 2 +- tests/test_dispatcher.py | 25 +- tests/test_kiss_modem_wrapper.py | 335 ++++++++++++++++++- 4 files changed, 548 insertions(+), 60 deletions(-) diff --git a/src/pymc_core/hardware/kiss_modem_wrapper.py b/src/pymc_core/hardware/kiss_modem_wrapper.py index 17180c1..09eaaae 100644 --- a/src/pymc_core/hardware/kiss_modem_wrapper.py +++ b/src/pymc_core/hardware/kiss_modem_wrapper.py @@ -221,6 +221,15 @@ class KissModemWrapper(LoRaRadio): specific packet, avoiding race conditions with get_last_rssi/get_last_snr. """ + # Some SetHardware requests may legitimately respond with OK instead of the + # command|0x80 specific response code. + _SETHW_ALLOW_OK_FOR: set[int] = { + HW_CMD_SET_RADIO, + HW_CMD_SET_TX_POWER, + HW_CMD_SET_SIGNAL_REPORT, + HW_CMD_REBOOT, + } + def __init__( self, port: str, @@ -295,9 +304,14 @@ def __init__( self._callback_executor: Optional[ThreadPoolExecutor] = None # Response handling + # Single-flight SetHardware command execution (send -> wait -> return) + self._command_lock = threading.Lock() self._response_event = threading.Event() self._pending_response: Optional[tuple[int, bytes]] = None self._response_lock = threading.Lock() + self._expected_response_subcmds: Optional[set[int]] = None + self._active_request_subcmd: Optional[int] = None + self._response_queue: deque[tuple[int, bytes]] = deque(maxlen=32) # TX completion tracking self._tx_done_event = threading.Event() @@ -734,33 +748,67 @@ def _send_command( Returns: Tuple of (response_sub_cmd, response_data) or None on timeout """ - with self._response_lock: - self._response_event.clear() - self._pending_response = None + # Ensure SetHardware requests are single-flight. This prevents concurrent + # callers from clearing the shared waiter state or stealing responses. + with self._command_lock: + expected = sub_cmd | 0x80 + acceptable: set[int] = {expected, HW_RESP_ERROR} + if sub_cmd in self._SETHW_ALLOW_OK_FOR: + acceptable.add(HW_RESP_OK) - # SetHardware frame: type 0x06, payload = sub_cmd (1 byte) + data - kiss_frame = self._encode_kiss_frame(KISS_CMD_SETHARDWARE, bytes([sub_cmd]) + data) + # Check queued responses first (late/out-of-order arrivals). + with self._response_lock: + if self._response_queue: + n = len(self._response_queue) + matched: Optional[tuple[int, bytes]] = None + for _ in range(n): + resp_sub, resp_payload = self._response_queue.popleft() + if matched is None and resp_sub in acceptable: + matched = (resp_sub, resp_payload) + else: + self._response_queue.append((resp_sub, resp_payload)) + if matched is not None: + return matched - if not self._write_frame(kiss_frame): - logger.warning("SetHardware frame write failed") - return None + self._response_event.clear() + self._pending_response = None + self._expected_response_subcmds = acceptable + self._active_request_subcmd = sub_cmd - # Wait for response - if self._response_event.wait(timeout): - with self._response_lock: - return self._pending_response - else: - logger.warning(f"SetHardware sub_cmd 0x{sub_cmd:02X} timeout") - return None + try: + # SetHardware frame: type 0x06, payload = sub_cmd (1 byte) + data + kiss_frame = self._encode_kiss_frame(KISS_CMD_SETHARDWARE, bytes([sub_cmd]) + data) - def get_radio_config(self) -> Optional[Dict[str, Any]]: + if not self._write_frame(kiss_frame): + logger.warning("SetHardware frame write failed") + return None + + # Wait for response (RX thread will only signal on acceptable sub_cmd) + if self._response_event.wait(timeout): + with self._response_lock: + return self._pending_response + + logger.warning(f"SetHardware sub_cmd 0x{sub_cmd:02X} timeout") + return None + finally: + with self._response_lock: + self._expected_response_subcmds = None + self._active_request_subcmd = None + + def get_radio_config(self, timeout: Optional[float] = None) -> Optional[Dict[str, Any]]: """ - Get current radio configuration from modem + Get current radio configuration from modem. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. Returns: Dict with frequency, bandwidth, sf, cr, or None on error """ - resp = self._send_command(CMD_GET_RADIO) + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_GET_RADIO, timeout=t) if resp and resp[0] == RESP_RADIO and len(resp[1]) >= 10: freq, bw, sf, cr = struct.unpack(" bool: logger.error(f"Error setting TX power: {e}") return False - def get_tx_power(self) -> Optional[int]: - """Get current TX power in dBm""" - resp = self._send_command(CMD_GET_TX_POWER) + def get_tx_power(self, timeout: Optional[float] = None) -> Optional[int]: + """Get current TX power in dBm. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. + """ + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_GET_TX_POWER, timeout=t) if resp and resp[0] == RESP_TX_POWER and len(resp[1]) >= 1: return resp[1][0] return None - def get_current_rssi(self) -> int: - """Get current RSSI from modem""" - resp = self._send_command(CMD_GET_CURRENT_RSSI) + def get_current_rssi(self, timeout: Optional[float] = None) -> int: + """Get current RSSI from modem. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. + """ + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_GET_CURRENT_RSSI, timeout=t) if resp and resp[0] == RESP_CURRENT_RSSI and len(resp[1]) >= 1: # RSSI is signed byte rssi = resp[1][0] @@ -810,9 +872,16 @@ def get_current_rssi(self) -> int: return rssi return -999 - def is_channel_busy(self) -> bool: - """Check if channel is busy""" - resp = self._send_command(CMD_IS_CHANNEL_BUSY) + def is_channel_busy(self, timeout: Optional[float] = None) -> bool: + """Check if channel is busy. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. + """ + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_IS_CHANNEL_BUSY, timeout=t) if resp and resp[0] == RESP_CHANNEL_BUSY and len(resp[1]) >= 1: return resp[1][0] == 0x01 return False @@ -836,9 +905,16 @@ def get_airtime(self, packet_length: int, timeout: Optional[float] = None) -> Op return struct.unpack(" Optional[int]: - """Get noise floor in dBm""" - resp = self._send_command(CMD_GET_NOISE_FLOOR) + def get_noise_floor(self, timeout: Optional[float] = None) -> Optional[int]: + """Get noise floor in dBm. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. + """ + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_GET_NOISE_FLOOR, timeout=t) if resp and resp[0] == RESP_NOISE_FLOOR and len(resp[1]) >= 2: # Noise floor is signed 16-bit noise = struct.unpack(" Optional[int]: return noise return None - def get_modem_stats(self) -> Optional[Dict[str, int]]: + def get_modem_stats(self, timeout: Optional[float] = None) -> Optional[Dict[str, int]]: """ - Get modem statistics + Get modem statistics. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. Returns: Dict with rx, tx, errors counts or None on error """ - resp = self._send_command(CMD_GET_STATS) + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_GET_STATS, timeout=t) if resp and resp[0] == RESP_STATS and len(resp[1]) >= 12: rx, tx, errors = struct.unpack(" Optional[int]: - """Get battery voltage in millivolts""" - resp = self._send_command(CMD_GET_BATTERY) + def get_battery(self, timeout: Optional[float] = None) -> Optional[int]: + """Get battery voltage in millivolts. + + Blocks the caller thread for up to ``timeout`` seconds (default RESPONSE_TIMEOUT). + + Args: + timeout: SetHardware response wait in seconds, or None for RESPONSE_TIMEOUT. + """ + t = timeout if timeout is not None else RESPONSE_TIMEOUT + resp = self._send_command(CMD_GET_BATTERY, timeout=t) if resp and resp[0] == RESP_BATTERY and len(resp[1]) >= 2: return struct.unpack(" Optional[Dict[str, Any]]: # Use short timeout for GET_AIRTIME so TX path is not blocked if modem # is busy or unresponsive (avoids 5s stall and subsequent bad state). - airtime = self.get_airtime(len(data), timeout=1.0) + # Run off the event loop: get_airtime uses blocking SetHardware wait. + airtime = await asyncio.to_thread(self.get_airtime, len(data), 1.0) if airtime is None: airtime = int(PacketTimingUtils.estimate_airtime_ms(len(data), self.radio_config)) return { @@ -1226,10 +1316,10 @@ def get_stats(self) -> Dict[str, Any]: """Get interface statistics""" return self.stats.copy() - def get_status(self) -> Dict[str, Any]: - """Get radio status. Uses cached config/stats where possible.""" - cfg = self.get_radio_config() - tx_power = self.get_tx_power() + def _sync_get_status(self, timeout: Optional[float] = None) -> Dict[str, Any]: + """Build radio status dict (blocking SetHardware reads for config and TX power).""" + cfg = self.get_radio_config(timeout=timeout) + tx_power = self.get_tx_power(timeout=timeout) status: Dict[str, Any] = { "initialized": self.is_connected, "frequency": cfg["frequency"] if cfg else self.radio_config.get("frequency", 0), @@ -1248,6 +1338,54 @@ def get_status(self) -> Dict[str, Any]: } return status + def get_status(self, timeout: Optional[float] = None) -> Dict[str, Any]: + """Get radio status. Queries modem for config and TX power; blocks the caller thread. + + Args: + timeout: Per-query SetHardware timeout in seconds for each modem read + (default RESPONSE_TIMEOUT). + """ + return self._sync_get_status(timeout) + + async def get_status_async(self, timeout: Optional[float] = None) -> Dict[str, Any]: + """Get radio status without blocking the asyncio event loop. + + Runs blocking modem I/O in ``asyncio``'s default thread pool executor. + """ + return await asyncio.to_thread(self._sync_get_status, timeout) + + async def get_radio_config_async( + self, timeout: Optional[float] = None + ) -> Optional[Dict[str, Any]]: + """Async-safe :meth:`get_radio_config`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.get_radio_config, timeout) + + async def get_tx_power_async(self, timeout: Optional[float] = None) -> Optional[int]: + """Async-safe :meth:`get_tx_power`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.get_tx_power, timeout) + + async def get_current_rssi_async(self, timeout: Optional[float] = None) -> int: + """Async-safe :meth:`get_current_rssi`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.get_current_rssi, timeout) + + async def is_channel_busy_async(self, timeout: Optional[float] = None) -> bool: + """Async-safe :meth:`is_channel_busy`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.is_channel_busy, timeout) + + async def get_noise_floor_async(self, timeout: Optional[float] = None) -> Optional[int]: + """Async-safe :meth:`get_noise_floor`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.get_noise_floor, timeout) + + async def get_modem_stats_async( + self, timeout: Optional[float] = None + ) -> Optional[Dict[str, int]]: + """Async-safe :meth:`get_modem_stats`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.get_modem_stats, timeout) + + async def get_battery_async(self, timeout: Optional[float] = None) -> Optional[int]: + """Async-safe :meth:`get_battery`; runs blocking modem I/O in a worker thread.""" + return await asyncio.to_thread(self.get_battery, timeout) + # KISS frame encoding/decoding def _encode_kiss_frame(self, cmd: int, data: bytes) -> bytes: @@ -1428,14 +1566,32 @@ def _process_received_frame(self): self.stats["errors"] += 1 logger.warning(f"Modem error: 0x{payload[0]:02X}") with self._response_lock: - self._pending_response = (sub_cmd, payload) - self._response_event.set() + expected = self._expected_response_subcmds + if expected is not None and sub_cmd in expected: + self._pending_response = (sub_cmd, payload) + self._response_event.set() + else: + if len(self._response_queue) == self._response_queue.maxlen: + logger.debug( + "Dropping oldest SetHardware response (queue full); sub_cmd=0x%02X", + sub_cmd, + ) + self._response_queue.append((sub_cmd, payload)) else: # Other response sub-commands (Identity, Radio, OK, etc.) with self._response_lock: - self._pending_response = (sub_cmd, payload) - self._response_event.set() + expected = self._expected_response_subcmds + if expected is not None and sub_cmd in expected: + self._pending_response = (sub_cmd, payload) + self._response_event.set() + else: + if len(self._response_queue) == self._response_queue.maxlen: + logger.debug( + "Dropping oldest SetHardware response (queue full); sub_cmd=0x%02X", + sub_cmd, + ) + self._response_queue.append((sub_cmd, payload)) # cmd 0xFF (Return) has port=15 so is already discarded above def _rx_worker(self): diff --git a/src/pymc_core/node/dispatcher.py b/src/pymc_core/node/dispatcher.py index a0525b6..4948272 100644 --- a/src/pymc_core/node/dispatcher.py +++ b/src/pymc_core/node/dispatcher.py @@ -676,7 +676,7 @@ async def run_forever(self) -> None: if health_check_counter >= 60: health_check_counter = 0 if hasattr(self.radio, "check_radio_health"): - self.radio.check_radio_health() + await asyncio.to_thread(self.radio.check_radio_health) # With callback-based RX, just do maintenance tasks await asyncio.sleep(1.0) # Check every second for cleanup diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index 199b75f..536d260 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import AsyncMock, Mock +from unittest.mock import AsyncMock, Mock, patch import pytest @@ -652,6 +652,29 @@ async def test_packet_filter_cleanup(self, dispatcher): # Verify cleanup was called dispatcher.packet_filter.cleanup_old_hashes.assert_called_once() + @pytest.mark.asyncio + async def test_run_forever_health_check_uses_to_thread(self, dispatcher): + """Health checks should run via asyncio.to_thread to avoid loop blocking.""" + dispatcher.radio.check_radio_health = Mock(return_value=True) + + sleep_calls = {"count": 0} + + async def fake_sleep(_seconds): + sleep_calls["count"] += 1 + if sleep_calls["count"] >= 60: + raise asyncio.CancelledError() + + to_thread_mock = AsyncMock(return_value=True) + + with ( + patch("pymc_core.node.dispatcher.asyncio.sleep", side_effect=fake_sleep), + patch("pymc_core.node.dispatcher.asyncio.to_thread", to_thread_mock), + ): + with pytest.raises(asyncio.CancelledError): + await dispatcher.run_forever() + + to_thread_mock.assert_awaited_once_with(dispatcher.radio.check_radio_health) + class TestDispatcherErrorHandling: """Test error handling.""" diff --git a/tests/test_kiss_modem_wrapper.py b/tests/test_kiss_modem_wrapper.py index a7397a6..0d9c688 100644 --- a/tests/test_kiss_modem_wrapper.py +++ b/tests/test_kiss_modem_wrapper.py @@ -7,13 +7,14 @@ import struct import threading -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from pymc_core.hardware.kiss_modem_wrapper import ( CMD_DATA, CMD_GET_BATTERY, + CMD_GET_NOISE_FLOOR, CMD_GET_RADIO, CMD_GET_STATS, CMD_GET_VERSION, @@ -44,12 +45,14 @@ RESP_BATTERY, RESP_ERROR, RESP_IDENTITY, + RESP_NOISE_FLOOR, RESP_OK, RESP_PONG, RESP_RADIO, RESP_SIGNATURE, RESP_STATS, RESP_TX_DONE, + RESP_VERSION, KissModemWrapper, ) @@ -274,15 +277,13 @@ def test_response_parsing_identity(self): bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_IDENTITY]) + pubkey + bytes([KISS_FEND]) ) - modem._response_event = threading.Event() - modem._pending_response = None - for byte in raw_bytes: modem._decode_kiss_byte(byte) - assert modem._pending_response is not None - assert modem._pending_response[0] == RESP_IDENTITY - assert modem._pending_response[1] == pubkey + # Without an active waiter, SetHardware responses are queued for later consumption. + assert len(modem._response_queue) == 1 + assert modem._response_queue[0][0] == RESP_IDENTITY + assert modem._response_queue[0][1] == pubkey def test_response_parsing_error(self): """Test parsing SetHardware Error response (FEND + 0x06 + 0x2A + code + FEND)""" @@ -291,15 +292,259 @@ def test_response_parsing_error(self): raw_bytes = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_ERROR, 0x05, KISS_FEND]) - modem._response_event = threading.Event() - modem._pending_response = None - for byte in raw_bytes: modem._decode_kiss_byte(byte) - assert modem._pending_response is not None - assert modem._pending_response[0] == RESP_ERROR - assert modem._pending_response[1][0] == 0x05 + assert len(modem._response_queue) == 1 + assert modem._response_queue[0][0] == RESP_ERROR + assert modem._response_queue[0][1][0] == 0x05 + + def test_send_command_uses_queued_late_response(self): + """If a matching response is already queued, _send_command returns it without writing.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + # Queue a late PONG from a previous ping. + modem._response_queue.append((RESP_PONG, b"")) + + modem._write_frame = MagicMock(return_value=True) + + resp = modem._send_command(CMD_PING, timeout=0.1) + assert resp == (RESP_PONG, b"") + assert modem._write_frame.call_count == 0 + + def test_send_command_correlates_expected_response(self): + """Non-matching responses are queued; waiter completes only on expected sub_cmd.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + # Set up a serial conn so _send_command can write. + mock_serial = MagicMock() + mock_serial.is_open = True + mock_serial.write.side_effect = lambda b: len(b) + modem.serial_conn = mock_serial + + result_holder: dict[str, object] = {} + + def caller(): + result_holder["resp"] = modem._send_command(CMD_PING, timeout=0.5) + + t = threading.Thread(target=caller) + t.start() + + # Feed an unrelated identity response first; should be queued, not delivered. + pubkey = bytes(range(32)) + identity_bytes = ( + bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_IDENTITY]) + pubkey + bytes([KISS_FEND]) + ) + for b in identity_bytes: + modem._decode_kiss_byte(b) + + # Now feed the expected PONG. + pong_bytes = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_PONG, KISS_FEND]) + for b in pong_bytes: + modem._decode_kiss_byte(b) + + t.join(timeout=1.0) + assert result_holder.get("resp") == (RESP_PONG, b"") + + # The unrelated identity response should remain queued. + assert len(modem._response_queue) == 1 + assert modem._response_queue[0][0] == RESP_IDENTITY + + def test_send_command_is_single_flight(self): + """Concurrent _send_command calls must not interleave shared waiter state.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + wrote_first = threading.Event() + allow_first_write = threading.Event() + wrote_second = threading.Event() + + def mock_write_frame(frame: bytes) -> bool: + # sub_cmd is the first payload byte (frame[2]) in SetHardware frames. + if frame[2] == CMD_GET_VERSION: + wrote_first.set() + allow_first_write.wait(timeout=1.0) + elif frame[2] == CMD_PING: + wrote_second.set() + return True + + modem._write_frame = mock_write_frame + + results: dict[str, object] = {} + + def call_version(): + results["v"] = modem._send_command(CMD_GET_VERSION, timeout=0.5) + + def call_ping(): + results["p"] = modem._send_command(CMD_PING, timeout=0.5) + + t1 = threading.Thread(target=call_version) + t2 = threading.Thread(target=call_ping) + + t1.start() + assert wrote_first.wait(timeout=1.0) + + # Start second call while first is still holding the command lock in _write_frame. + t2.start() + assert not wrote_second.wait(timeout=0.1) + + # Let the first command proceed and respond. + allow_first_write.set() + version_bytes = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_VERSION, 0x01, KISS_FEND]) + for b in version_bytes: + modem._decode_kiss_byte(b) + + # Now second command can write and receive response. + assert wrote_second.wait(timeout=1.0) + pong_bytes = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_PONG, KISS_FEND]) + for b in pong_bytes: + modem._decode_kiss_byte(b) + + t1.join(timeout=1.0) + t2.join(timeout=1.0) + + assert results.get("v") is not None + assert results.get("p") == (RESP_PONG, b"") + + def test_send_command_timeout_clears_waiter_state(self): + """Timeout path must clear active waiter metadata for later commands.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + modem._write_frame = MagicMock(return_value=True) + + resp = modem._send_command(CMD_GET_VERSION, timeout=0.05) + assert resp is None + assert modem._expected_response_subcmds is None + assert modem._active_request_subcmd is None + + # Ensure no lock leak by issuing another command. + resp2 = modem._send_command(CMD_PING, timeout=0.05) + assert resp2 is None + assert modem._expected_response_subcmds is None + assert modem._active_request_subcmd is None + + def test_send_command_write_failure_clears_waiter_state(self): + """Write-failure path must clear active waiter metadata.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + modem._write_frame = MagicMock(return_value=False) + + resp = modem._send_command(CMD_GET_VERSION, timeout=0.1) + assert resp is None + assert modem._expected_response_subcmds is None + assert modem._active_request_subcmd is None + + def test_response_queue_drop_oldest_when_full(self): + """Unmatched SetHardware responses should drop oldest when queue is full.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + maxlen = modem._response_queue.maxlen or 0 + for i in range(maxlen): + modem._response_queue.append((0xA0 + (i % 10), bytes([i % 256]))) + oldest = modem._response_queue[0] + + # No active waiter; incoming response should be enqueued as unmatched. + frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_IDENTITY, 0x42, KISS_FEND]) + for b in frame: + modem._decode_kiss_byte(b) + + assert len(modem._response_queue) == maxlen + assert modem._response_queue[0] != oldest + assert modem._response_queue[-1] == (RESP_IDENTITY, b"\x42") + + def test_send_command_ok_policy_allowlisted_command(self): + """Allowlisted SetHardware commands may resolve with HW_RESP_OK.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + mock_serial = MagicMock() + mock_serial.is_open = True + mock_serial.write.side_effect = lambda b: len(b) + modem.serial_conn = mock_serial + + result_holder: dict[str, object] = {} + + def caller(): + result_holder["resp"] = modem._send_command(CMD_SET_TX_POWER, b"\x16", timeout=0.5) + + t = threading.Thread(target=caller) + t.start() + + ok_frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_OK, KISS_FEND]) + for b in ok_frame: + modem._decode_kiss_byte(b) + + t.join(timeout=1.0) + assert result_holder.get("resp") == (HW_RESP_OK, b"") + + def test_send_command_ok_policy_non_allowlisted_command(self): + """Non-allowlisted commands should not complete on HW_RESP_OK.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + mock_serial = MagicMock() + mock_serial.is_open = True + mock_serial.write.side_effect = lambda b: len(b) + modem.serial_conn = mock_serial + + result_holder: dict[str, object] = {} + + def caller(): + result_holder["resp"] = modem._send_command(CMD_PING, timeout=0.5) + + t = threading.Thread(target=caller) + t.start() + + ok_frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_OK, KISS_FEND]) + for b in ok_frame: + modem._decode_kiss_byte(b) + + pong_frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_PONG, KISS_FEND]) + for b in pong_frame: + modem._decode_kiss_byte(b) + + t.join(timeout=1.0) + assert result_holder.get("resp") == (RESP_PONG, b"") + assert len(modem._response_queue) == 1 + assert modem._response_queue[0] == (HW_RESP_OK, b"") + + def test_send_command_preserves_unrelated_response_order(self): + """Multiple unrelated responses remain queued in arrival order.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + mock_serial = MagicMock() + mock_serial.is_open = True + mock_serial.write.side_effect = lambda b: len(b) + modem.serial_conn = mock_serial + + result_holder: dict[str, object] = {} + + def caller(): + result_holder["resp"] = modem._send_command(CMD_PING, timeout=0.5) + + t = threading.Thread(target=caller) + t.start() + + identity = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_IDENTITY, 0xAA, KISS_FEND]) + version = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_VERSION, 0x01, KISS_FEND]) + stats = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_STATS, 0x02, KISS_FEND]) + for frame in (identity, version, stats): + for b in frame: + modem._decode_kiss_byte(b) + + pong = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_PONG, KISS_FEND]) + for b in pong: + modem._decode_kiss_byte(b) + + t.join(timeout=1.0) + assert result_holder.get("resp") == (RESP_PONG, b"") + assert [entry[0] for entry in modem._response_queue] == [ + RESP_IDENTITY, + RESP_VERSION, + RESP_STATS, + ] + assert [entry[1] for entry in modem._response_queue] == [b"\xAA", b"\x01", b"\x02"] def test_tx_done_response(self): """Test SetHardware TxDone (0xF8) response sets event""" @@ -315,6 +560,70 @@ def test_tx_done_response(self): assert modem._tx_done_event.is_set() assert modem._tx_done_result is True + @pytest.mark.asyncio + async def test_send_offloads_get_airtime_to_thread(self): + """send() must not call blocking get_airtime on the asyncio event loop.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.send_frame = MagicMock(return_value=True) + + to_thread_mock = AsyncMock(return_value=42) + with patch("pymc_core.hardware.kiss_modem_wrapper.asyncio.to_thread", to_thread_mock): + result = await modem.send(b"payload") + + assert result is not None + assert result["airtime_ms"] == 42 + to_thread_mock.assert_awaited_once_with(modem.get_airtime, len(b"payload"), 1.0) + + +class TestKissAsyncTelemetry: + """Async-safe telemetry entrypoints delegate blocking work via asyncio.to_thread.""" + + @pytest.mark.asyncio + async def test_get_status_async_delegates_to_thread(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + to_thread_mock = AsyncMock(return_value={"ok": True}) + with patch("pymc_core.hardware.kiss_modem_wrapper.asyncio.to_thread", to_thread_mock): + result = await modem.get_status_async(1.25) + assert result == {"ok": True} + to_thread_mock.assert_awaited_once_with(modem._sync_get_status, 1.25) + + @pytest.mark.asyncio + async def test_get_noise_floor_async_delegates_to_thread(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + to_thread_mock = AsyncMock(return_value=-95) + with patch("pymc_core.hardware.kiss_modem_wrapper.asyncio.to_thread", to_thread_mock): + result = await modem.get_noise_floor_async(0.75) + assert result == -95 + to_thread_mock.assert_awaited_once_with(modem.get_noise_floor, 0.75) + + @pytest.mark.asyncio + async def test_get_modem_stats_async_delegates_to_thread(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + stats = {"rx": 1, "tx": 2, "errors": 0} + to_thread_mock = AsyncMock(return_value=stats) + with patch("pymc_core.hardware.kiss_modem_wrapper.asyncio.to_thread", to_thread_mock): + result = await modem.get_modem_stats_async(None) + assert result == stats + to_thread_mock.assert_awaited_once_with(modem.get_modem_stats, None) + + def test_get_noise_floor_forwards_timeout_to_send_command(self): + """Optional timeout on sync getter must reach _send_command.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + calls: list[tuple] = [] + + def mock_send_command(cmd, data=b"", timeout=5.0): + calls.append((cmd, data, timeout)) + if cmd == CMD_GET_NOISE_FLOOR: + return (RESP_NOISE_FLOOR, struct.pack(" Date: Sun, 10 May 2026 09:07:32 -0700 Subject: [PATCH 2/7] Implement UART write serialization in KissModemWrapper - Added a serial write lock to ensure atomic writes across threads, preventing interleaving of frame bytes during concurrent operations. - Updated the _write_frame method to utilize the new lock, enhancing reliability in multi-threaded environments. - Introduced tests to verify that UART writes from different callers do not overlap, ensuring data integrity during transmission. --- src/pymc_core/hardware/kiss_modem_wrapper.py | 38 +-- tests/test_kiss_modem_wrapper.py | 274 +++++++++++++++++++ 2 files changed, 296 insertions(+), 16 deletions(-) diff --git a/src/pymc_core/hardware/kiss_modem_wrapper.py b/src/pymc_core/hardware/kiss_modem_wrapper.py index 09eaaae..9e77f34 100644 --- a/src/pymc_core/hardware/kiss_modem_wrapper.py +++ b/src/pymc_core/hardware/kiss_modem_wrapper.py @@ -306,6 +306,9 @@ def __init__( # Response handling # Single-flight SetHardware command execution (send -> wait -> return) self._command_lock = threading.Lock() + # Serialize all UART writes so frame bytes from different callers/threads + # (TX worker vs SetHardware/control paths) cannot interleave. + self._serial_write_lock = threading.Lock() self._response_event = threading.Event() self._pending_response: Optional[tuple[int, bytes]] = None self._response_lock = threading.Lock() @@ -453,29 +456,32 @@ def _write_frame(self, frame: bytes) -> bool: Ensures the entire frame (including trailing FEND) is written; retries on partial write so we never send a truncated frame. + This method is atomic across threads so frame bytes cannot interleave + on the UART when multiple callers write concurrently. Returns: True if all bytes written, False on error or incomplete write. """ - if not self.serial_conn or not self.serial_conn.is_open: - return False - offset = 0 - while offset < len(frame): - try: - n = self.serial_conn.write(frame[offset:]) - if n is None or n <= 0: - logger.error("Serial write returned %s", n) + with self._serial_write_lock: + if not self.serial_conn or not self.serial_conn.is_open: + return False + offset = 0 + while offset < len(frame): + try: + n = self.serial_conn.write(frame[offset:]) + if n is None or n <= 0: + logger.error("Serial write returned %s", n) + return False + offset += n + except Exception as e: + logger.error("Serial write error: %s", e) return False - offset += n + try: + self.serial_conn.flush() except Exception as e: - logger.error("Serial write error: %s", e) + logger.error("Serial flush error: %s", e) return False - try: - self.serial_conn.flush() - except Exception as e: - logger.error("Serial flush error: %s", e) - return False - return True + return True def _set_kiss_tx_delay(self, delay_ms: int) -> None: """ diff --git a/tests/test_kiss_modem_wrapper.py b/tests/test_kiss_modem_wrapper.py index 0d9c688..cbdad74 100644 --- a/tests/test_kiss_modem_wrapper.py +++ b/tests/test_kiss_modem_wrapper.py @@ -7,6 +7,7 @@ import struct import threading +import time from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -830,6 +831,279 @@ def test_send_frame_queues_to_buffer(self): assert frame[-1] == KISS_FEND +class TestSerialWriteSerialization: + """Test UART write serialization across concurrent callers.""" + + def test_write_frame_serializes_data_and_sethardware_callers(self): + """Data TX and SetHardware writes must not interleave at the UART layer.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + + class BlockingSerial: + def __init__(self): + self.is_open = True + self._active = 0 + self.max_active = 0 + self._state_lock = threading.Lock() + self.first_write_entered = threading.Event() + self.release_first_write = threading.Event() + self.first_seen = False + self.writes = [] + self.flush_count = 0 + + def write(self, data): + with self._state_lock: + self._active += 1 + self.max_active = max(self.max_active, self._active) + + if not self.first_seen: + self.first_seen = True + self.first_write_entered.set() + self.release_first_write.wait(timeout=1.0) + + self.writes.append(bytes(data)) + + with self._state_lock: + self._active -= 1 + return len(data) + + def flush(self): + self.flush_count += 1 + + serial_conn = BlockingSerial() + modem.serial_conn = serial_conn + modem.is_connected = True + + data_frame = modem._encode_kiss_frame(CMD_DATA, b"\x01\x02\x03") + sethw_frame = modem._encode_kiss_frame(KISS_CMD_SETHARDWARE, bytes([CMD_PING])) + + results: dict[str, bool] = {} + second_started = threading.Event() + second_done = threading.Event() + + def write_data(): + results["data"] = modem._write_frame(data_frame) + + def write_sethw(): + second_started.set() + results["sethw"] = modem._write_frame(sethw_frame) + second_done.set() + + t1 = threading.Thread(target=write_data) + t1.start() + assert serial_conn.first_write_entered.wait(timeout=1.0) + + t2 = threading.Thread(target=write_sethw) + t2.start() + assert second_started.wait(timeout=1.0) + + # While the first writer is blocked inside serial.write, a second caller + # should not enter serial.write concurrently. + assert not second_done.wait(timeout=0.05) + assert serial_conn.max_active == 1 + + serial_conn.release_first_write.set() + + t1.join(timeout=1.0) + t2.join(timeout=1.0) + + assert results.get("data") is True + assert results.get("sethw") is True + assert serial_conn.max_active == 1 + assert serial_conn.flush_count == 2 + assert serial_conn.writes == [data_frame, sethw_frame] + + def test_ping_and_noise_floor_under_concurrent_data_load(self): + """Concurrent data TX should not cause ping/noise-floor command timeouts.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + class RespondingSerial: + def __init__(self): + self.is_open = True + self._modem: KissModemWrapper | None = None + self.flush_count = 0 + + def set_modem(self, m: KissModemWrapper) -> None: + self._modem = m + + def write(self, data): + frame = bytes(data) + if ( + self._modem is not None + and len(frame) >= 4 + and frame[0] == KISS_FEND + and frame[-1] == KISS_FEND + and frame[1] == KISS_CMD_SETHARDWARE + ): + sub_cmd = frame[2] + if sub_cmd == CMD_PING: + response_sub = RESP_PONG + response_payload = b"" + elif sub_cmd == CMD_GET_NOISE_FLOOR: + response_sub = RESP_NOISE_FLOOR + response_payload = struct.pack(" None: + resp = ( + bytes([KISS_FEND, KISS_CMD_SETHARDWARE, response_sub]) + + response_payload + + bytes([KISS_FEND]) + ) + for b in resp: + self._modem._decode_kiss_byte(b) + + threading.Thread(target=emit, daemon=True).start() + return len(frame) + + def flush(self): + self.flush_count += 1 + + serial_conn = RespondingSerial() + serial_conn.set_modem(modem) + modem.serial_conn = serial_conn + + stop_event = threading.Event() + data_frame = modem._encode_kiss_frame(CMD_DATA, b"\xAA\xBB\xCC") + + def data_tx_worker() -> None: + for _ in range(200): + if stop_event.is_set(): + return + modem._write_frame(data_frame) + + tx_thread = threading.Thread(target=data_tx_worker) + tx_thread.start() + + try: + for _ in range(40): + ping_resp = modem._send_command(CMD_PING, timeout=0.2) + assert ping_resp is not None + assert ping_resp[0] == RESP_PONG + + noise = modem.get_noise_floor(timeout=0.2) + assert noise == -95 + finally: + stop_event.set() + tx_thread.join(timeout=1.0) + + def test_tx_worker_and_sethardware_queries_make_progress_together(self): + """Queued data TX should still make progress while periodic SetHardware queries run.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + class RespondingSerial: + def __init__(self): + self.is_open = True + self._modem: KissModemWrapper | None = None + self.flush_count = 0 + self.data_writes = 0 + + def set_modem(self, m: KissModemWrapper) -> None: + self._modem = m + + def write(self, data): + frame = bytes(data) + if ( + self._modem is not None + and len(frame) >= 4 + and frame[0] == KISS_FEND + and frame[-1] == KISS_FEND + and frame[1] == KISS_CMD_SETHARDWARE + ): + sub_cmd = frame[2] + if sub_cmd == CMD_PING: + response_sub = RESP_PONG + response_payload = b"" + elif sub_cmd == CMD_GET_NOISE_FLOOR: + response_sub = RESP_NOISE_FLOOR + response_payload = struct.pack(" None: + resp = ( + bytes([KISS_FEND, KISS_CMD_SETHARDWARE, response_sub]) + + response_payload + + bytes([KISS_FEND]) + ) + for b in resp: + self._modem._decode_kiss_byte(b) + + threading.Thread(target=emit, daemon=True).start() + elif len(frame) >= 2 and frame[0] == KISS_FEND and frame[1] == CMD_DATA: + self.data_writes += 1 + + return len(frame) + + def flush(self): + self.flush_count += 1 + + serial_conn = RespondingSerial() + serial_conn.set_modem(modem) + modem.serial_conn = serial_conn + + for _ in range(120): + assert modem.send_frame(b"\x01\x02\x03") + + modem.stop_event.clear() + tx_thread = threading.Thread(target=modem._tx_worker, daemon=True) + tx_thread.start() + + try: + for _ in range(25): + ping_resp = modem._send_command(CMD_PING, timeout=0.3) + assert ping_resp is not None + assert ping_resp[0] == RESP_PONG + + noise = modem.get_noise_floor(timeout=0.3) + assert noise == -92 + + deadline = time.time() + 2.0 + while modem.tx_buffer and time.time() < deadline: + time.sleep(0.01) + assert len(modem.tx_buffer) == 0 + assert serial_conn.data_writes > 0 + finally: + modem.stop_event.set() + tx_thread.join(timeout=1.0) + + def test_write_error_does_not_poison_future_writes(self): + """A serial write error should fail fast and allow subsequent writes to proceed.""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + class FlakySerial: + def __init__(self): + self.is_open = True + self.calls = 0 + self.flush_count = 0 + + def write(self, data): + self.calls += 1 + if self.calls == 1: + raise OSError("simulated serial failure") + return len(data) + + def flush(self): + self.flush_count += 1 + + serial_conn = FlakySerial() + modem.serial_conn = serial_conn + + frame = modem._encode_kiss_frame(CMD_DATA, b"\xAA\xBB") + assert modem._write_frame(frame) is False + assert modem._write_frame(frame) is True + assert serial_conn.flush_count == 1 + + class TestQueryMethods: """Test modem query methods""" From 81800cda08068ccf9a9ecc7e68291f8d507e627b Mon Sep 17 00:00:00 2001 From: agessaman Date: Sun, 10 May 2026 14:46:05 -0700 Subject: [PATCH 3/7] Update versioning to indicate development stage - Changed the version from "1.0.10" to "1.0.10-agdev" in both pyproject.toml and __init__.py to reflect the current development status of the project. --- pyproject.toml | 2 +- src/pymc_core/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b48fede..abf2d82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pymc_core" -version = "1.0.10" +version = "1.0.10-agdev" authors = [ {name = "Lloyd Newton", email = "lloyd@rightup.co.uk"}, ] diff --git a/src/pymc_core/__init__.py b/src/pymc_core/__init__.py index f9e4d57..66d9f52 100644 --- a/src/pymc_core/__init__.py +++ b/src/pymc_core/__init__.py @@ -3,7 +3,7 @@ Clean, simple API for building mesh network applications. """ -__version__ = "1.0.10" +__version__ = "1.0.10-agdev" # Core mesh functionality from .node.node import MeshNode From 1e1ba9b9cb5ed57a8226f8fef1a4c3966ee8b59a Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 15 May 2026 19:13:31 -0700 Subject: [PATCH 4/7] fix(kiss modem): improved connection handling and recovery - Added mechanisms for managing degraded connection states, including flags for degraded status and reasons. - Implemented a reconnect worker to handle automatic reconnection attempts after connection failures. - Updated the connect and disconnect methods to properly manage connection states and thread handling. - Enhanced error handling during serial communication to ensure robust operation in multi-threaded environments. - Introduced tests to validate the new reconnect behavior and degraded state management. --- src/pymc_core/hardware/kiss_modem_wrapper.py | 241 +++++++++++++++---- tests/test_kiss_modem_wrapper.py | 94 +++++++- 2 files changed, 281 insertions(+), 54 deletions(-) diff --git a/src/pymc_core/hardware/kiss_modem_wrapper.py b/src/pymc_core/hardware/kiss_modem_wrapper.py index 9e77f34..cbbc35a 100644 --- a/src/pymc_core/hardware/kiss_modem_wrapper.py +++ b/src/pymc_core/hardware/kiss_modem_wrapper.py @@ -14,6 +14,7 @@ import random import struct import threading +import time from collections import deque from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable, Dict, Optional, Union @@ -283,6 +284,8 @@ def __init__( self.serial_conn: Optional[serial.Serial] = None self.is_connected = False + self._degraded = False + self._degraded_reason: Optional[str] = None self.rx_buffer = deque(maxlen=RX_BUFFER_SIZE) self.tx_buffer = deque(maxlen=TX_BUFFER_SIZE) @@ -293,7 +296,22 @@ def __init__( self.rx_thread: Optional[threading.Thread] = None self.tx_thread: Optional[threading.Thread] = None + self.reconnect_thread: Optional[threading.Thread] = None self.stop_event = threading.Event() + self._reconnecting_event = threading.Event() + self._connection_lock = threading.RLock() + self._failure_log_lock = threading.Lock() + self._last_failure_log_ts = 0.0 + self._failure_log_interval_s = float( + self.radio_config.get("failure_log_interval_seconds", 10.0) + ) + self._reconnect_base_delay_s = float( + self.radio_config.get("reconnect_base_delay_seconds", 0.5) + ) + self._reconnect_max_delay_s = float( + self.radio_config.get("reconnect_max_delay_seconds", 15.0) + ) + self._reconnect_max_attempts = int(self.radio_config.get("reconnect_max_attempts", 0)) # Callbacks self.on_frame_received = on_frame_received @@ -378,6 +396,40 @@ def connect(self) -> bool: Returns: True if connection successful, False otherwise """ + with self._connection_lock: + self.stop_event.clear() + if not self._open_serial_and_start_threads(): + return False + if not self._run_post_connect_handshake(): + self._close_serial_connection() + self.is_connected = False + return False + self._reconnecting_event.clear() + self._degraded = False + self._degraded_reason = None + return True + + def disconnect(self): + """Disconnect from serial port and stop threads""" + with self._connection_lock: + self.stop_event.set() + self.is_connected = False + self._degraded = False + self._degraded_reason = None + self._reconnecting_event.clear() + self._close_serial_connection() + + self._stop_io_threads(join_timeout=2.0) + self._stop_reconnect_thread(join_timeout=2.0) + + if self._callback_executor is not None: + self._callback_executor.shutdown(wait=False) + self._callback_executor = None + + logger.info(f"KISS modem disconnected from {self.port}") + + def _open_serial_and_start_threads(self) -> bool: + """Open serial device and start RX/TX workers.""" try: self.serial_conn = serial.Serial( port=self.port, @@ -387,68 +439,70 @@ def connect(self) -> bool: parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, ) - self.is_connected = True - self.stop_event.clear() - # Start communication threads self.rx_thread = threading.Thread(target=self._rx_worker, daemon=True) self.tx_thread = threading.Thread(target=self._tx_worker, daemon=True) - self.rx_thread.start() self.tx_thread.start() - - logger.info(f"KISS modem connected to {self.port} at {self.baudrate} baud") - - # Auto-configure if requested - if self.auto_configure and self.radio_config: - if not self.configure_radio(): - logger.warning("Auto-configuration failed") - return False - - # Query modem info - self._query_modem_info() - - # Set KISS TXDELAY so key-up delay is not the firmware default 500ms (reduces - # round-trip latency for repeaters). Value in 10ms units; default 50ms. - tx_delay_ms = self.radio_config.get("tx_delay_ms", 50) - self._set_kiss_tx_delay(tx_delay_ms) - if "kiss_persistence" in self.radio_config: - self.set_kiss_persistence(self.radio_config["kiss_persistence"]) - if "kiss_slottime_ms" in self.radio_config: - self.set_kiss_slottime(self.radio_config["kiss_slottime_ms"]) - if "kiss_txtail_ms" in self.radio_config: - self.set_kiss_txtail(self.radio_config["kiss_txtail_ms"]) - if "kiss_full_duplex" in self.radio_config: - self.set_kiss_full_duplex(bool(self.radio_config["kiss_full_duplex"])) - + logger.info("KISS modem connected to %s at %s baud", self.port, self.baudrate) return True - except Exception as e: - logger.error(f"Failed to connect to {self.port}: {e}") + logger.error("Failed to connect to %s: %s", self.port, e) self.is_connected = False return False - def disconnect(self): - """Disconnect from serial port and stop threads""" - self.is_connected = False - self.stop_event.set() - - # Wait for threads to finish - if self.rx_thread and self.rx_thread.is_alive(): - self.rx_thread.join(timeout=2.0) - if self.tx_thread and self.tx_thread.is_alive(): - self.tx_thread.join(timeout=2.0) - - if self._callback_executor is not None: - self._callback_executor.shutdown(wait=False) - self._callback_executor = None - - # Close serial connection - if self.serial_conn and self.serial_conn.is_open: - self.serial_conn.close() + def _run_post_connect_handshake(self) -> bool: + """Run modem setup steps after serial open.""" + # Auto-configure if requested + if self.auto_configure and self.radio_config: + if not self.configure_radio(): + logger.warning("Auto-configuration failed") + return False - logger.info(f"KISS modem disconnected from {self.port}") + # Query modem info + self._query_modem_info() + + # Set KISS TXDELAY so key-up delay is not the firmware default 500ms. + tx_delay_ms = self.radio_config.get("tx_delay_ms", 50) + self._set_kiss_tx_delay(tx_delay_ms) + if "kiss_persistence" in self.radio_config: + self.set_kiss_persistence(self.radio_config["kiss_persistence"]) + if "kiss_slottime_ms" in self.radio_config: + self.set_kiss_slottime(self.radio_config["kiss_slottime_ms"]) + if "kiss_txtail_ms" in self.radio_config: + self.set_kiss_txtail(self.radio_config["kiss_txtail_ms"]) + if "kiss_full_duplex" in self.radio_config: + self.set_kiss_full_duplex(bool(self.radio_config["kiss_full_duplex"])) + return True + + def _close_serial_connection(self) -> None: + """Close serial handle without waiting for worker threads.""" + conn = self.serial_conn + self.serial_conn = None + if conn and conn.is_open: + try: + conn.close() + except Exception: + pass + + def _stop_io_threads(self, join_timeout: float = 2.0) -> None: + """Join RX/TX threads, skipping current thread to avoid deadlock.""" + current = threading.current_thread() + if self.rx_thread and self.rx_thread.is_alive() and self.rx_thread is not current: + self.rx_thread.join(timeout=join_timeout) + if self.tx_thread and self.tx_thread.is_alive() and self.tx_thread is not current: + self.tx_thread.join(timeout=join_timeout) + + def _stop_reconnect_thread(self, join_timeout: float = 2.0) -> None: + """Join reconnect thread if it is running.""" + current = threading.current_thread() + if ( + self.reconnect_thread + and self.reconnect_thread.is_alive() + and self.reconnect_thread is not current + ): + self.reconnect_thread.join(timeout=join_timeout) def _write_frame(self, frame: bytes) -> bool: """ @@ -464,6 +518,7 @@ def _write_frame(self, frame: bytes) -> bool: """ with self._serial_write_lock: if not self.serial_conn or not self.serial_conn.is_open: + self._mark_serial_failure("Serial connection closed during write") return False offset = 0 while offset < len(frame): @@ -471,18 +526,88 @@ def _write_frame(self, frame: bytes) -> bool: n = self.serial_conn.write(frame[offset:]) if n is None or n <= 0: logger.error("Serial write returned %s", n) + self._mark_serial_failure(f"Serial write returned {n}") return False offset += n except Exception as e: logger.error("Serial write error: %s", e) + self._mark_serial_failure(f"Serial write failed: {e}") return False try: self.serial_conn.flush() except Exception as e: logger.error("Serial flush error: %s", e) + self._mark_serial_failure(f"Serial flush failed: {e}") return False return True + def _mark_serial_failure(self, reason: str) -> None: + """Transition to degraded mode and trigger reconnect loop once.""" + if self.stop_event.is_set(): + return + + now = time.time() + with self._failure_log_lock: + should_log = (now - self._last_failure_log_ts) >= self._failure_log_interval_s + if should_log: + self._last_failure_log_ts = now + logger.warning("Marking KISS serial link degraded: %s", reason) + + with self._connection_lock: + self._degraded = True + self._degraded_reason = reason + self.is_connected = False + self._close_serial_connection() + + self._start_reconnect_worker() + + def _start_reconnect_worker(self) -> None: + """Start reconnect thread once.""" + if self.stop_event.is_set() or self._reconnecting_event.is_set(): + return + self._reconnecting_event.set() + self.reconnect_thread = threading.Thread(target=self._reconnect_worker, daemon=True) + self.reconnect_thread.start() + + def _reconnect_worker(self) -> None: + """Reconnect with exponential backoff and re-run modem handshake.""" + attempts = 0 + while not self.stop_event.is_set(): + attempts += 1 + if self._reconnect_max_attempts > 0 and attempts > self._reconnect_max_attempts: + logger.error( + "KISS modem reconnect exhausted after %s attempts (last reason: %s)", + self._reconnect_max_attempts, + self._degraded_reason or "unknown", + ) + break + + delay = min( + self._reconnect_base_delay_s * (2 ** max(0, attempts - 1)), + self._reconnect_max_delay_s, + ) + jitter = random.uniform(0.0, min(0.25, delay * 0.2)) + if attempts > 1: + time.sleep(delay + jitter) + + with self._connection_lock: + if self.stop_event.is_set(): + break + self._stop_io_threads(join_timeout=0.5) + if not self._open_serial_and_start_threads(): + continue + if not self._run_post_connect_handshake(): + self._close_serial_connection() + self.is_connected = False + continue + self._degraded = False + self._degraded_reason = None + logger.info("KISS modem serial reconnect successful on attempt %s", attempts) + self._reconnecting_event.clear() + return + + self._reconnecting_event.clear() + def _set_kiss_tx_delay(self, delay_ms: int) -> None: """ Send KISS TXDELAY command so modem key-up delay is not the default 500ms. @@ -756,6 +881,13 @@ def _send_command( """ # Ensure SetHardware requests are single-flight. This prevents concurrent # callers from clearing the shared waiter state or stealing responses. + in_reconnect_thread = threading.current_thread() is self.reconnect_thread + reconnecting_from_non_reconnect_thread = ( + self._reconnecting_event.is_set() and not in_reconnect_thread + ) + degraded_from_non_reconnect_thread = self._degraded and not in_reconnect_thread + if reconnecting_from_non_reconnect_thread or degraded_from_non_reconnect_thread: + return None with self._command_lock: expected = sub_cmd | 0x80 acceptable: set[int] = {expected, HW_RESP_ERROR} @@ -1184,9 +1316,13 @@ def check_radio_health(self) -> bool: if not self.is_connected: return False try: - return self.ping() + healthy = self.ping() + if not healthy: + self._mark_serial_failure("Health check ping failed") + return healthy except Exception as e: logger.debug(f"KISS modem health check failed: {e}") + self._mark_serial_failure(f"Health check exception: {e}") return False # Optional host-side LBT (only when lbt_enabled, e.g. full-duplex on half-duplex link) @@ -1616,6 +1752,7 @@ def _rx_worker(self): except Exception as e: if self.is_connected: logger.error(f"RX worker error: {e}") + self._mark_serial_failure(f"RX worker error: {e}") break def _tx_worker(self): @@ -1633,12 +1770,14 @@ def _tx_worker(self): logger.warning("TX frame write failed, dropping frame") else: logger.warning("Serial connection not open") + self._mark_serial_failure("Serial connection not open in TX worker") else: threading.Event().wait(0.01) except Exception as e: if self.is_connected: logger.error(f"TX worker error: {e}") + self._mark_serial_failure(f"TX worker error: {e}") break def __enter__(self): diff --git a/tests/test_kiss_modem_wrapper.py b/tests/test_kiss_modem_wrapper.py index cbdad74..0cf1218 100644 --- a/tests/test_kiss_modem_wrapper.py +++ b/tests/test_kiss_modem_wrapper.py @@ -1076,7 +1076,7 @@ def flush(self): tx_thread.join(timeout=1.0) def test_write_error_does_not_poison_future_writes(self): - """A serial write error should fail fast and allow subsequent writes to proceed.""" + """A serial write error should fail fast and transition to degraded mode.""" modem = KissModemWrapper(port="/dev/null", auto_configure=False) modem.is_connected = True @@ -1097,11 +1097,13 @@ def flush(self): serial_conn = FlakySerial() modem.serial_conn = serial_conn + modem._start_reconnect_worker = MagicMock() frame = modem._encode_kiss_frame(CMD_DATA, b"\xAA\xBB") assert modem._write_frame(frame) is False - assert modem._write_frame(frame) is True - assert serial_conn.flush_count == 1 + assert modem._degraded is True + assert modem.is_connected is False + modem._start_reconnect_worker.assert_called_once() class TestQueryMethods: @@ -1582,3 +1584,89 @@ def test_context_manager_calls_connect_disconnect(self): mock_connect.assert_called_once() mock_disconnect.assert_called_once() _ = modem # hold ref so __del__ runs after assert, not before + + +class TestSerialRecovery: + """Test serial degraded-state and reconnect behavior.""" + + def test_write_frame_marks_degraded_and_triggers_reconnect(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + class _FailingSerial: + is_open = True + + def write(self, _data): + raise OSError(5, "Input/output error") + + modem.serial_conn = _FailingSerial() + modem._start_reconnect_worker = MagicMock() + + frame = modem._encode_kiss_frame(CMD_DATA, b"\x01\x02") + assert modem._write_frame(frame) is False + assert modem._degraded is True + assert modem.is_connected is False + assert modem.serial_conn is None + modem._start_reconnect_worker.assert_called_once() + + def test_send_command_fails_fast_while_reconnecting(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + modem._reconnecting_event.set() + modem._write_frame = MagicMock(return_value=True) + + assert modem._send_command(CMD_PING, timeout=0.1) is None + modem._write_frame.assert_not_called() + + def test_send_command_allowed_from_reconnect_thread_during_reconnect(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + modem._reconnecting_event.set() + modem.reconnect_thread = threading.current_thread() + modem._response_queue.append((RESP_PONG, b"")) + + assert modem._send_command(CMD_PING, timeout=0.1) == (RESP_PONG, b"") + + def test_send_command_allowed_from_reconnect_thread_while_degraded(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + modem._degraded = True + modem.reconnect_thread = threading.current_thread() + modem._response_queue.append((RESP_PONG, b"")) + + assert modem._send_command(CMD_PING, timeout=0.1) == (RESP_PONG, b"") + + def test_reconnect_worker_recovers_after_open_failure(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem._reconnecting_event.set() + modem._degraded = True + modem._degraded_reason = "test failure" + modem._reconnect_base_delay_s = 0.0 + modem._reconnect_max_delay_s = 0.0 + + modem._open_serial_and_start_threads = MagicMock(side_effect=[False, True]) + modem._run_post_connect_handshake = MagicMock(return_value=True) + modem._stop_io_threads = MagicMock() + + with patch("pymc_core.hardware.kiss_modem_wrapper.time.sleep", return_value=None): + modem._reconnect_worker() + + assert modem._open_serial_and_start_threads.call_count == 2 + assert modem._run_post_connect_handshake.call_count == 1 + assert modem._degraded is False + assert modem._reconnecting_event.is_set() is False + + def test_start_reconnect_worker_guard_prevents_duplicate_thread(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem._reconnecting_event.set() + modem._start_reconnect_worker() + assert modem.reconnect_thread is None + + def test_connect_clears_reconnecting_gate_after_success(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem._reconnecting_event.set() + modem._open_serial_and_start_threads = MagicMock(return_value=True) + modem._run_post_connect_handshake = MagicMock(return_value=True) + + assert modem.connect() is True + assert modem._reconnecting_event.is_set() is False From f270faf8dbb1edac4fe57cfaf7678bfd8a169503 Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 15 May 2026 19:50:52 -0700 Subject: [PATCH 5/7] fix(kiss modem): refine connection state management and improve thread safety - Updated connection handling to set `is_connected` based on the success of the handshake process. - Enhanced thread conditions in the RX and TX workers to check for the serial connection's readiness. - Modified the `configure_radio` method to validate the serial connection state before proceeding. - Added tests to ensure correct behavior of connection states during connect and reconnect operations. --- src/pymc_core/hardware/kiss_modem_wrapper.py | 22 +++++++-- tests/test_kiss_modem_wrapper.py | 52 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/src/pymc_core/hardware/kiss_modem_wrapper.py b/src/pymc_core/hardware/kiss_modem_wrapper.py index cbbc35a..3214847 100644 --- a/src/pymc_core/hardware/kiss_modem_wrapper.py +++ b/src/pymc_core/hardware/kiss_modem_wrapper.py @@ -398,12 +398,14 @@ def connect(self) -> bool: """ with self._connection_lock: self.stop_event.clear() + self.is_connected = False if not self._open_serial_and_start_threads(): return False if not self._run_post_connect_handshake(): self._close_serial_connection() self.is_connected = False return False + self.is_connected = True self._reconnecting_event.clear() self._degraded = False self._degraded_reason = None @@ -439,7 +441,7 @@ def _open_serial_and_start_threads(self) -> bool: parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, ) - self.is_connected = True + self.is_connected = False self.rx_thread = threading.Thread(target=self._rx_worker, daemon=True) self.tx_thread = threading.Thread(target=self._tx_worker, daemon=True) @@ -593,6 +595,7 @@ def _reconnect_worker(self) -> None: with self._connection_lock: if self.stop_event.is_set(): break + self.is_connected = False self._stop_io_threads(join_timeout=0.5) if not self._open_serial_and_start_threads(): continue @@ -600,6 +603,7 @@ def _reconnect_worker(self) -> None: self._close_serial_connection() self.is_connected = False continue + self.is_connected = True self._degraded = False self._degraded_reason = None logger.info("KISS modem serial reconnect successful on attempt %s", attempts) @@ -744,8 +748,8 @@ def configure_radio( Returns: True if configuration successful, False otherwise """ - if not self.is_connected: - logger.error("Cannot configure radio: not connected") + if not self.serial_conn or not self.serial_conn.is_open: + logger.error("Cannot configure radio: serial link not ready") return False try: @@ -1738,7 +1742,11 @@ def _process_received_frame(self): def _rx_worker(self): """Background thread for receiving data""" - while not self.stop_event.is_set() and self.is_connected: + while ( + not self.stop_event.is_set() + and self.serial_conn is not None + and self.serial_conn.is_open + ): try: if self.serial_conn and self.serial_conn.in_waiting > 0: data = self.serial_conn.read(self.serial_conn.in_waiting) @@ -1757,7 +1765,11 @@ def _rx_worker(self): def _tx_worker(self): """Background thread for sending data""" - while not self.stop_event.is_set() and self.is_connected: + while ( + not self.stop_event.is_set() + and self.serial_conn is not None + and self.serial_conn.is_open + ): try: if self.tx_buffer: frame = self.tx_buffer.popleft() diff --git a/tests/test_kiss_modem_wrapper.py b/tests/test_kiss_modem_wrapper.py index 0cf1218..edfb244 100644 --- a/tests/test_kiss_modem_wrapper.py +++ b/tests/test_kiss_modem_wrapper.py @@ -669,6 +669,7 @@ def mock_send_command(cmd, data=b"", timeout=5.0): modem._send_command = mock_send_command modem.is_connected = True + modem.serial_conn = MagicMock(is_open=True) result = modem.configure_radio() @@ -1334,6 +1335,7 @@ def mock_send_command(cmd, data=b"", timeout=5.0): modem._send_command = mock_send_command modem.is_connected = True + modem.serial_conn = MagicMock(is_open=True) modem.configure_radio() @@ -1358,6 +1360,7 @@ def mock_send_command(cmd, data=b"", timeout=5.0): modem._send_command = mock_send_command modem.is_connected = True + modem.serial_conn = MagicMock(is_open=True) modem.configure_radio() @@ -1382,6 +1385,7 @@ def mock_send_command(cmd, data=b"", timeout=5.0): modem._send_command = mock_send_command modem.is_connected = True + modem.serial_conn = MagicMock(is_open=True) modem.configure_radio() @@ -1669,4 +1673,52 @@ def test_connect_clears_reconnecting_gate_after_success(self): modem._run_post_connect_handshake = MagicMock(return_value=True) assert modem.connect() is True + assert modem.is_connected is True + assert modem._reconnecting_event.is_set() is False + + def test_connect_sets_connected_only_after_handshake_success(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem._open_serial_and_start_threads = MagicMock(return_value=True) + + def handshake() -> bool: + # is_connected should stay false until handshake fully succeeds. + assert modem.is_connected is False + return True + + modem._run_post_connect_handshake = MagicMock(side_effect=handshake) + + assert modem.connect() is True + assert modem.is_connected is True + + def test_connect_handshake_failure_leaves_disconnected(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem._open_serial_and_start_threads = MagicMock(return_value=True) + modem._run_post_connect_handshake = MagicMock(return_value=False) + modem._close_serial_connection = MagicMock() + + assert modem.connect() is False + assert modem.is_connected is False + modem._close_serial_connection.assert_called_once() + + def test_reconnect_sets_connected_only_after_handshake_success(self): + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem._reconnecting_event.set() + modem._degraded = True + modem._degraded_reason = "test failure" + modem._reconnect_base_delay_s = 0.0 + modem._reconnect_max_delay_s = 0.0 + modem._open_serial_and_start_threads = MagicMock(return_value=True) + + def reconnect_handshake() -> bool: + assert modem.is_connected is False + return True + + modem._run_post_connect_handshake = MagicMock(side_effect=reconnect_handshake) + modem._stop_io_threads = MagicMock() + + with patch("pymc_core.hardware.kiss_modem_wrapper.time.sleep", return_value=None): + modem._reconnect_worker() + + assert modem.is_connected is True + assert modem._degraded is False assert modem._reconnecting_event.is_set() is False From 95f85d87a644334093296c637f7672f73400797f Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 15 May 2026 19:53:14 -0700 Subject: [PATCH 6/7] Update versioning to reflect development stage - Changed the version from "1.0.10-agdev" to "1.0.10.dev1" in both pyproject.toml and __init__.py to indicate a new development iteration. --- pyproject.toml | 2 +- src/pymc_core/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index abf2d82..8d70175 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pymc_core" -version = "1.0.10-agdev" +version = "1.0.10.dev1" authors = [ {name = "Lloyd Newton", email = "lloyd@rightup.co.uk"}, ] diff --git a/src/pymc_core/__init__.py b/src/pymc_core/__init__.py index 66d9f52..8021d95 100644 --- a/src/pymc_core/__init__.py +++ b/src/pymc_core/__init__.py @@ -3,7 +3,7 @@ Clean, simple API for building mesh network applications. """ -__version__ = "1.0.10-agdev" +__version__ = "1.0.10.dev1" # Core mesh functionality from .node.node import MeshNode From a2d70d438996a90824de5598a97eafedd1f41196 Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 15 May 2026 20:34:51 -0700 Subject: [PATCH 7/7] feat(kiss modem): enhance post-connect configuration and retry logic - Introduced new parameters for post-connect settling and configuration retries to improve connection robustness. - Implemented retry logic for the `configure_radio` method, allowing for transient failures during configuration attempts. - Added tests to validate the new retry behavior and ensure correct connection state management during configuration. --- src/pymc_core/hardware/kiss_modem_wrapper.py | 58 ++++++++++++++- tests/test_kiss_modem_wrapper.py | 77 ++++++++++++++++++++ 2 files changed, 132 insertions(+), 3 deletions(-) diff --git a/src/pymc_core/hardware/kiss_modem_wrapper.py b/src/pymc_core/hardware/kiss_modem_wrapper.py index 3214847..f1afcdd 100644 --- a/src/pymc_core/hardware/kiss_modem_wrapper.py +++ b/src/pymc_core/hardware/kiss_modem_wrapper.py @@ -193,6 +193,9 @@ def _invoke_rx_callback( DEFAULT_BAUDRATE = 115200 DEFAULT_TIMEOUT = 1.0 RESPONSE_TIMEOUT = 5.0 # Timeout for command responses +POST_CONNECT_SETTLE_SECONDS = 0.75 +POST_CONNECT_CONFIGURE_RETRIES = 2 +POST_CONNECT_CONFIGURE_RETRY_BACKOFF_SECONDS = 0.25 logger = logging.getLogger("KissModemWrapper") @@ -312,6 +315,33 @@ def __init__( self.radio_config.get("reconnect_max_delay_seconds", 15.0) ) self._reconnect_max_attempts = int(self.radio_config.get("reconnect_max_attempts", 0)) + self._post_connect_settle_s = max( + 0.0, + float( + self.radio_config.get( + "post_connect_settle_seconds", + POST_CONNECT_SETTLE_SECONDS, + ) + ), + ) + self._post_connect_configure_retries = max( + 0, + int( + self.radio_config.get( + "post_connect_configure_retries", + POST_CONNECT_CONFIGURE_RETRIES, + ) + ), + ) + self._post_connect_configure_retry_backoff_s = max( + 0.0, + float( + self.radio_config.get( + "post_connect_configure_retry_backoff_seconds", + POST_CONNECT_CONFIGURE_RETRY_BACKOFF_SECONDS, + ) + ), + ) # Callbacks self.on_frame_received = on_frame_received @@ -456,11 +486,33 @@ def _open_serial_and_start_threads(self) -> bool: def _run_post_connect_handshake(self) -> bool: """Run modem setup steps after serial open.""" + if self._post_connect_settle_s > 0: + logger.debug( + "Post-connect settle delay %.2fs before SetHardware handshake", + self._post_connect_settle_s, + ) + time.sleep(self._post_connect_settle_s) + # Auto-configure if requested if self.auto_configure and self.radio_config: - if not self.configure_radio(): - logger.warning("Auto-configuration failed") - return False + total_attempts = 1 + self._post_connect_configure_retries + for attempt in range(1, total_attempts + 1): + if self.configure_radio(): + break + + if attempt >= total_attempts: + logger.warning("Auto-configuration failed after %s attempts", total_attempts) + return False + + retry_delay = self._post_connect_configure_retry_backoff_s * attempt + logger.warning( + "Auto-configuration attempt %s/%s failed; retrying in %.2fs", + attempt, + total_attempts, + retry_delay, + ) + if retry_delay > 0: + time.sleep(retry_delay) # Query modem info self._query_modem_info() diff --git a/tests/test_kiss_modem_wrapper.py b/tests/test_kiss_modem_wrapper.py index edfb244..e438dea 100644 --- a/tests/test_kiss_modem_wrapper.py +++ b/tests/test_kiss_modem_wrapper.py @@ -1700,6 +1700,83 @@ def test_connect_handshake_failure_leaves_disconnected(self): assert modem.is_connected is False modem._close_serial_connection.assert_called_once() + def test_connect_retries_transient_configure_failure_then_succeeds(self): + modem = KissModemWrapper( + port="/dev/null", + auto_configure=True, + radio_config={"frequency": 869618000}, + ) + modem._open_serial_and_start_threads = MagicMock(return_value=True) + modem._close_serial_connection = MagicMock() + modem._query_modem_info = MagicMock() + modem._set_kiss_tx_delay = MagicMock() + + connected_states = [] + + def transient_configure_failure() -> bool: + connected_states.append(modem.is_connected) + return len(connected_states) > 1 + + modem.configure_radio = MagicMock(side_effect=transient_configure_failure) + + with patch( + "pymc_core.hardware.kiss_modem_wrapper.time.sleep", return_value=None + ) as sleep_mock: + assert modem.connect() is True + + assert modem.configure_radio.call_count == 2 + assert connected_states == [False, False] + assert modem.is_connected is True + assert modem._close_serial_connection.call_count == 0 + assert len(sleep_mock.call_args_list) == 2 # settle + one retry backoff + + def test_connect_persistent_configure_failures_still_fail(self): + modem = KissModemWrapper( + port="/dev/null", + auto_configure=True, + radio_config={"frequency": 869618000}, + ) + modem._open_serial_and_start_threads = MagicMock(return_value=True) + modem._close_serial_connection = MagicMock() + modem._query_modem_info = MagicMock() + modem._set_kiss_tx_delay = MagicMock() + modem.configure_radio = MagicMock(return_value=False) + + with patch( + "pymc_core.hardware.kiss_modem_wrapper.time.sleep", return_value=None + ) as sleep_mock: + assert modem.connect() is False + + assert modem.is_connected is False + assert modem.configure_radio.call_count == (1 + modem._post_connect_configure_retries) + assert len(sleep_mock.call_args_list) == (1 + modem._post_connect_configure_retries) + modem._close_serial_connection.assert_called_once() + + def test_connect_sets_connected_only_after_retrying_handshake(self): + modem = KissModemWrapper( + port="/dev/null", + auto_configure=True, + radio_config={"frequency": 869618000}, + ) + modem._open_serial_and_start_threads = MagicMock(return_value=True) + modem._close_serial_connection = MagicMock() + modem._query_modem_info = MagicMock() + modem._set_kiss_tx_delay = MagicMock() + + observed_states = [] + + def configure_with_one_retry() -> bool: + observed_states.append(modem.is_connected) + return len(observed_states) >= 2 + + modem.configure_radio = MagicMock(side_effect=configure_with_one_retry) + + with patch("pymc_core.hardware.kiss_modem_wrapper.time.sleep", return_value=None): + assert modem.connect() is True + + assert observed_states == [False, False] + assert modem.is_connected is True + def test_reconnect_sets_connected_only_after_handshake_success(self): modem = KissModemWrapper(port="/dev/null", auto_configure=False) modem._reconnecting_event.set()