From b5ec32769e51ceafc846d338fc513221c66195e2 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 8 May 2026 11:36:19 +0200 Subject: [PATCH 1/8] fix(server): default push_always_retry to False, fixing a 1.9.x regression Defaulting to True changed the failure mode from "exhaust retries and release the global lock" to "hold the lock indefinitely", which freezes all commit processing when CF is unreachable. False restores sensible behaviour: the commit drops and the IOC retries on its next reconnect. --- server/demo.conf | 5 +++-- server/recceiver/cf/config.py | 4 ++-- server/tests/unit/cf/test_config.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/demo.conf b/server/demo.conf index 1a3da16..821cbd4 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -126,5 +126,6 @@ # Number of times to retry polling before giving up. Default is 10. #pushMaxRetries = 10 -# Whether to retry polling indefinitely until success. Default is True. -#pushAlwaysRetry = True +# Whether to retry polling indefinitely until success. Default is False. +# Enabling this holds the global CF commit lock until CF recovers; use with caution. +#pushAlwaysRetry = False diff --git a/server/recceiver/cf/config.py b/server/recceiver/cf/config.py index ab6c332..3c19b8c 100644 --- a/server/recceiver/cf/config.py +++ b/server/recceiver/cf/config.py @@ -30,7 +30,7 @@ class CFConfig: cf_password: Optional[str] = None verify_ssl: Optional[bool] = None push_max_retries: int = 10 - push_always_retry: bool = True + push_always_retry: bool = False @classmethod def loads(cls, conf: ConfigAdapter) -> "CFConfig": @@ -53,7 +53,7 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig": cf_password=conf.get("cfPassword"), verify_ssl=conf.getboolean("verifySSL"), push_max_retries=conf.getint("pushMaxRetries", 10), - push_always_retry=conf.getboolean("pushAlwaysRetry", True), + push_always_retry=conf.getboolean("pushAlwaysRetry", False), ) def __repr__(self) -> str: diff --git a/server/tests/unit/cf/test_config.py b/server/tests/unit/cf/test_config.py index af2da40..df88c2f 100644 --- a/server/tests/unit/cf/test_config.py +++ b/server/tests/unit/cf/test_config.py @@ -26,7 +26,7 @@ def test_push_max_retries_from_env(self): def test_default_push_always_retry(self): adapter = make_adapter() config = CFConfig.loads(adapter) - assert config.push_always_retry is True + assert config.push_always_retry is False def test_alias_disabled_by_default(self): adapter = make_adapter() From 474a05b5b1bbc178bab75d87635bdd0ee4dd733b Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 8 May 2026 11:37:40 +0200 Subject: [PATCH 2/8] fix(server): run cleanOnStart and cleanOnStop sweeps off the reactor thread clean_service() blocks the reactor for the full duration of the sweep, holding the global lock and preventing all IOC commits. cleanOnStart now schedules the sweep as a background thread after startup, so commits are accepted immediately. cleanOnStop uses deferToThread so the reactor stays live during shutdown while the lock is still held. --- server/recceiver/cf/processor.py | 28 ++++++++++++++++-- server/tests/integration/docker_compose.py | 8 ++++++ server/tests/integration/test_single_ioc.py | 32 +++++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 1bf565d..f7af578 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -86,7 +86,10 @@ def _start_service_with_lock(self): raise else: if self.cf_config.clean_on_start: - self.clean_service() + _log.info("CF Clean: scheduling background startup sweep") + from twisted.internet import reactor + + reactor.callLater(0, self._start_background_clean) def _setup_cf_properties(self, cf_properties: Set[str]) -> None: """Compute required CF properties, register any missing ones, and cache state. @@ -141,9 +144,21 @@ def stopService(self): return self.lock.run(self._stop_service_with_lock) def _stop_service_with_lock(self): - if self.cf_config.clean_on_stop: - self.clean_service() + """Stop the CFProcessor service with lock held. + + If clean_on_stop is enabled, mark all channels as inactive. + The sweep runs in a background thread to avoid blocking the reactor. + The lock is held throughout, preventing new commits from interleaving. + """ _log.info("CF_STOP with lock") + if self.cf_config.clean_on_stop: + return deferToThread(self.clean_service) + + def _start_background_clean(self): + _log.info("CF Clean: background startup sweep beginning") + deferToThread(self.clean_service).addErrback( + lambda err: _log.error("CF Clean background sweep failed: %s", err) + ) # @defer.inlineCallbacks # Twisted v16 does not support cancellation! def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: @@ -339,6 +354,13 @@ def _commit_with_thread(self, transaction: CommitTransaction): _log.debug("Delete records: %s", records_to_delete) record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + if not transaction.connected and ioc_info.id not in self.iocs: + _log.warning( + "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", + host, + port, + ) + return self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info) if not poll_success: diff --git a/server/tests/integration/docker_compose.py b/server/tests/integration/docker_compose.py index 3a38d1e..05aafbd 100644 --- a/server/tests/integration/docker_compose.py +++ b/server/tests/integration/docker_compose.py @@ -66,6 +66,14 @@ def shutdown_container(compose: DockerCompose, host_name: str) -> str: return container.ID +def kill_container(compose: DockerCompose, host_name: str) -> str: + """Send SIGKILL to a container, bypassing graceful shutdown hooks.""" + container = compose.get_container(host_name) + docker_client = DockerClient() + docker_client.containers.get(container.ID).kill() + return container.ID + + def start_container( compose: DockerCompose, host_name: Optional[str] = None, container_id: Optional[str] = None ) -> None: diff --git a/server/tests/integration/test_single_ioc.py b/server/tests/integration/test_single_ioc.py index 49e5514..5dacf81 100644 --- a/server/tests/integration/test_single_ioc.py +++ b/server/tests/integration/test_single_ioc.py @@ -19,6 +19,7 @@ from .docker_compose import ( ComposeFixtureFactory, clone_container, + kill_container, restart_container, shutdown_container, start_container, @@ -122,6 +123,37 @@ def test_status_property_works_between_cf_down( assert all(INACTIVE_PROPERTY in ch["properties"] for ch in channels_inactive) +class TestCleanStopRecceiver: + def test_clean_stop_marks_channels_inactive( + self, setup_compose: DockerCompose, cf_client: ChannelFinderClient + ) -> None: # noqa: F811 + shutdown_container(setup_compose, "recc1") + assert wait_for_sync( + cf_client, + lambda client: check_channel_property(client, DEFAULT_CHANNEL_NAME, INACTIVE_PROPERTY), + ) + channels_inactive = cf_client.find(property=[("iocName", "IOC1-1")]) + assert all(INACTIVE_PROPERTY in ch["properties"] for ch in channels_inactive) + + +class TestCleanStartRecceiver: + def test_startup_sweep_marks_stale_channels_inactive( + self, setup_compose: DockerCompose, cf_client: ChannelFinderClient + ) -> None: # noqa: F811 + # Kill recceiver hard — cleanOnStop does NOT run, channels stay Active in CF + recc1_id = kill_container(setup_compose, "recc1") + # Stop IOC so it cannot reconnect when the recceiver comes back + shutdown_container(setup_compose, "ioc1-1") + # Start recceiver — cleanOnStart sweep should mark the stale channels Inactive + start_container(setup_compose, container_id=recc1_id) + assert wait_for_sync( + cf_client, + lambda client: check_channel_property(client, DEFAULT_CHANNEL_NAME, INACTIVE_PROPERTY), + ) + channels_inactive = cf_client.find(property=[("iocName", "IOC1-1")]) + assert all(INACTIVE_PROPERTY in ch["properties"] for ch in channels_inactive) + + class TestMoveIocHost: def test_move_ioc_host( self, From 90a4d4073c340636b66336984afd63a49c3ca929 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 8 May 2026 11:39:18 +0200 Subject: [PATCH 3/8] fix(server): improve warning messages for IOC connection problems A numeric iocName that matches the source port range means the iocid changes on every reconnect, silently accumulating stale channels in CF. Log a warning when this is detected so misconfigured reccasters can be found. Distinguishes disconnect-before-upload from update-without-initial to avoid the same warning firing twice for the same event. --- server/recceiver/cf/processor.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index f7af578..a2fbebd 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -323,7 +323,15 @@ def _commit_with_thread(self, transaction: CommitTransaction): ioc_name = transaction.client_infos.get("IOCNAME") if not ioc_name: ioc_name = str(port) - _log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port) + _log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) + if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535: + _log.warning( + "IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — " + "iocid will change on every reconnect, causing stale channels in CF; " + "configure a stable iocName via reccaster", + host, + ioc_name, + ) owner = ( transaction.client_infos.get(self.cf_config.env_owner_variable) @@ -456,9 +464,10 @@ def _update_channelfinder( new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id - if iocid not in self.iocs: + if iocid not in self.iocs and record_info_by_name: + # Disconnect-before-upload is already logged in _commit_with_thread. _log.warning( - "IOC %s did not send an initial transaction to join IOC list (%d IOCs known)", + "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, len(self.iocs), ) From fa14a127df723968203ab4dc10ade7c6346e79f2 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 8 May 2026 11:41:46 +0200 Subject: [PATCH 4/8] feat(server): add periodic status logging for connections and CF state There is currently no runtime visibility into whether maxActive is throttling connections or how many channels the CF processor is tracking. A LoopingCall in RecService logs active/queued connections against the limit every 60 s; CFProcessor logs known_iocs and tracked_channels on the same interval. Both are configurable via statusInterval (0 disables). --- server/demo.conf | 3 +++ server/recceiver/application.py | 19 ++++++++++++++++++- server/recceiver/cf/config.py | 2 ++ server/recceiver/cf/processor.py | 12 +++++++++++- server/recceiver_full.conf | 6 ++++++ server/tests/unit/cf/test_config.py | 12 ++++++++++++ 6 files changed, 52 insertions(+), 2 deletions(-) diff --git a/server/demo.conf b/server/demo.conf index 821cbd4..6fe448f 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -34,6 +34,9 @@ # Time interval for sending recceiver advertisments #announceInterval = 15.0 +# Interval in seconds between periodic status log lines (0 to disable) +#statusInterval = 60.0 + # Idle Timeout for TCP connections. #tcptimeout = 15.0 diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 33007a1..3969c3b 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -4,7 +4,7 @@ import random from twisted.application import service -from twisted.internet import defer, pollreactor +from twisted.internet import defer, pollreactor, task from twisted.internet.error import CannotListenError from twisted.python import log, usage from zope.interface import implementer @@ -43,6 +43,7 @@ def __init__(self, config): self.reactor = reactor service.MultiService.__init__(self) + self._statusLoop = None self.annperiod = float(config.get("announceInterval", "15.0")) self.tcptimeout = float(config.get("tcptimeout", "15.0")) self.commitperiod = float(config.get("commitInterval", "5.0")) @@ -52,6 +53,7 @@ def __init__(self, config): self.addrlist = [] self.port = int(portn or "0") + self.statusInterval = float(config.get("statusInterval", "60.0")) for addr in config.get("addrlist", "").split(","): if not addr: @@ -111,9 +113,24 @@ def privilegedStartService(self): # This will start up plugin Processors service.MultiService.privilegedStartService(self) + if self.statusInterval > 0: + self._statusLoop = task.LoopingCall(self._logStatus) + self._statusLoop.start(self.statusInterval, now=False) + + def _logStatus(self): + _log.info( + "status: connections active=%d/%d queued=%d", + self.tcpFactory.NActive, + self.tcpFactory.maxActive, + len(self.tcpFactory.Wait), + ) + def stopService(self): _log.info("Stopping RecService") + if self._statusLoop is not None and self._statusLoop.running: + self._statusLoop.stop() + # This will stop plugin Processors D2 = defer.maybeDeferred(service.MultiService.stopService, self) diff --git a/server/recceiver/cf/config.py b/server/recceiver/cf/config.py index 3c19b8c..36293ee 100644 --- a/server/recceiver/cf/config.py +++ b/server/recceiver/cf/config.py @@ -31,6 +31,7 @@ class CFConfig: verify_ssl: Optional[bool] = None push_max_retries: int = 10 push_always_retry: bool = False + status_interval: float = 60.0 @classmethod def loads(cls, conf: ConfigAdapter) -> "CFConfig": @@ -54,6 +55,7 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig": verify_ssl=conf.getboolean("verifySSL"), push_max_retries=conf.getint("pushMaxRetries", 10), push_always_retry=conf.getboolean("pushAlwaysRetry", False), + status_interval=float(conf.get("statusInterval", "60.0")), ) def __repr__(self) -> str: diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index a2fbebd..67cc5eb 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -7,7 +7,7 @@ from channelfinder import ChannelFinderClient from requests import ConnectionError, RequestException from twisted.application import service -from twisted.internet import defer +from twisted.internet import defer, task from twisted.internet.defer import DeferredLock from twisted.internet.threads import deferToThread from zope.interface import implementer @@ -46,6 +46,7 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter): self.client: Optional[ChannelFinderAdapter] = None self.current_time: Callable[[Optional[str]], str] = get_current_time self.lock: DeferredLock = DeferredLock() + self._statusLoop = None def startService(self): service.Service.startService(self) @@ -65,6 +66,13 @@ def startService(self): finally: self.lock.release() + if self.cf_config.status_interval > 0: + self._statusLoop = task.LoopingCall(self._logStatus) + self._statusLoop.start(self.cf_config.status_interval, now=False) + + def _logStatus(self): + _log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) + def _start_service_with_lock(self): _log.info("CF_START with configuration: %s", self.cf_config) @@ -140,6 +148,8 @@ def _setup_cf_properties(self, cf_properties: Set[str]) -> None: def stopService(self): _log.info("CF_STOP") + if self._statusLoop is not None and self._statusLoop.running: + self._statusLoop.stop() service.Service.stopService(self) return self.lock.run(self._stop_service_with_lock) diff --git a/server/recceiver_full.conf b/server/recceiver_full.conf index fbaf187..393be06 100644 --- a/server/recceiver_full.conf +++ b/server/recceiver_full.conf @@ -31,6 +31,9 @@ procs = cf, show, db:lite # Time interval for sending recceiver advertisments announceInterval = 15.0 +# Interval in seconds between periodic status log lines (0 to disable) +#statusInterval = 60.0 + # Idle Timeout for TCP connections. tcptimeout = 15.0 @@ -123,3 +126,6 @@ pushMaxRetries = 10 # Whether to retry polling indefinitely until success. Default is True. pushAlwaysRetry = False + +# Interval in seconds between periodic CF status log lines (0 to disable) +#statusInterval = 60.0 diff --git a/server/tests/unit/cf/test_config.py b/server/tests/unit/cf/test_config.py index df88c2f..1ae3b3b 100644 --- a/server/tests/unit/cf/test_config.py +++ b/server/tests/unit/cf/test_config.py @@ -1,3 +1,5 @@ +import pytest + from recceiver.cf.config import CFConfig from tests.unit.conftest import make_adapter @@ -37,3 +39,13 @@ def test_alias_enabled_from_config(self): adapter = make_adapter(values={"alias": "true"}) config = CFConfig.loads(adapter) assert config.alias_enabled is True + + def test_default_status_interval(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.status_interval == pytest.approx(60.0) + + def test_status_interval_from_config(self): + adapter = make_adapter(values={"statusinterval": "120.0"}) + config = CFConfig.loads(adapter) + assert config.status_interval == pytest.approx(120.0) From 56dfed948c421ca1fc12d95f958e5081fec53f46 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 8 May 2026 11:43:03 +0200 Subject: [PATCH 5/8] feat(server): log CF push duration and stop retrying after service stop Without timing, slow CF commits are invisible until they cause a backlog. Per-attempt duration is now logged so latency regressions show up in the log stream. push_to_cf also checks processor.running on each iteration so a service stop during a retry loop drains immediately rather than waiting up to 60 s per attempt. --- server/recceiver/cf/processor.py | 17 +++++++++++----- server/tests/unit/cf/test_processor.py | 28 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 67cc5eb..33c3abf 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -445,21 +445,28 @@ def _push_to_cf( records_to_delete: List[str], ioc_info: IOCInfo, ) -> bool: - _log.info("Pushing updates for %s begins...", ioc_info) + _log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) count = 0 sleep = 1.0 while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: + if not self.running: + _log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + return False count += 1 + t0 = time.monotonic() try: self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) + elapsed = time.monotonic() - t0 + _log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) return True - except RequestException as e: - _log.exception("ChannelFinder update failed: %s", e) + except RequestException: + elapsed = time.monotonic() - t0 + _log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) retry_seconds = min(60, sleep) - _log.info("ChannelFinder update retry in %s seconds", retry_seconds) + _log.info("CF push retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 - _log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count) + _log.error("CF push gave up after %d attempts: %s", count, ioc_info) return False def _update_channelfinder( diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index 209f89f..ee103ba 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -1,3 +1,8 @@ +import time + +import pytest +from requests import RequestException + from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo from recceiver.cf.processor import CFProcessor from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc @@ -115,3 +120,26 @@ def test_orphans_channel_absent_from_local_state(self): status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) assert status.value == PVStatus.INACTIVE.value + + +class TestPushToCF: + def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): + monkeypatch.setattr(time, "sleep", lambda _: None) + + processor = make_processor() + processor.running = True + processor.cf_config.push_always_retry = True + + call_count = 0 + + def failing_update(record_info_by_name, records_to_delete, ioc_info): + nonlocal call_count + call_count += 1 + processor.running = False + raise RequestException("CF unreachable") + + monkeypatch.setattr(processor, "_update_channelfinder", failing_update) + result = processor._push_to_cf({}, [], make_ioc()) + + assert result is False + assert call_count == 1 From b907b468387a00ec1e79b2d841c9cf49578a9e51 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Fri, 8 May 2026 11:52:56 +0200 Subject: [PATCH 6/8] feat(server): add optional Prometheus metrics endpoint Exposes connection state, CF processor state, and CF push performance as Prometheus metrics on a configurable HTTP port (metricsPort, disabled by default). Requires the optional prometheus-client dependency (pip install recceiver[metrics]). Gracefully degrades to no-ops if prometheus-client is not installed so the dependency is truly optional. --- server/demo.conf | 4 ++ server/pyproject.toml | 1 + server/recceiver/application.py | 13 ++++ server/recceiver/cf/processor.py | 11 +++- server/recceiver/metrics.py | 89 ++++++++++++++++++++++++++ server/recceiver_full.conf | 4 ++ server/tests/unit/cf/test_processor.py | 1 - server/tests/unit/test_metrics.py | 47 ++++++++++++++ 8 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 server/recceiver/metrics.py create mode 100644 server/tests/unit/test_metrics.py diff --git a/server/demo.conf b/server/demo.conf index 6fe448f..3dea772 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -37,6 +37,10 @@ # Interval in seconds between periodic status log lines (0 to disable) #statusInterval = 60.0 +# TCP port to expose Prometheus metrics on (0 or absent to disable). +# Requires prometheus_client: pip install recceiver[metrics] +#metricsPort = 0 + # Idle Timeout for TCP connections. #tcptimeout = 15.0 diff --git a/server/pyproject.toml b/server/pyproject.toml index b604b6c..c3406c3 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "twisted>=22.10,<23; python_version<'3.8'", "twisted>=24.11,<24.12; python_version>='3.8'", ] +optional-dependencies.metrics = [ "prometheus-client" ] optional-dependencies.test = [ "pytest>=8.3,<8.4", "pytest-cov>=6,<7", "testcontainers>=4.8.2,<4.9" ] urls.Repository = "https://github.com/ChannelFinder/recsync" diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 3969c3b..6fded6b 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -11,6 +11,7 @@ from twisted import plugin +from . import metrics from .announce import Announcer from .processors import ProcessorController from .recast import CastFactory @@ -54,6 +55,7 @@ def __init__(self, config): self.port = int(portn or "0") self.statusInterval = float(config.get("statusInterval", "60.0")) + self.metricsPort = int(config.get("metricsPort", "0")) for addr in config.get("addrlist", "").split(","): if not addr: @@ -113,11 +115,22 @@ def privilegedStartService(self): # This will start up plugin Processors service.MultiService.privilegedStartService(self) + if self.metricsPort > 0: + if metrics.available: + self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind) + _log.info("Prometheus metrics available on port %d", self.metricsPort) + else: + _log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") + + metrics.connections_limit.set(self.tcpFactory.maxActive) + if self.statusInterval > 0: self._statusLoop = task.LoopingCall(self._logStatus) self._statusLoop.start(self.statusInterval, now=False) def _logStatus(self): + metrics.connections_active.set(self.tcpFactory.NActive) + metrics.connections_waiting.set(len(self.tcpFactory.Wait)) _log.info( "status: connections active=%d/%d queued=%d", self.tcpFactory.NActive, diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 33c3abf..94046f7 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -12,7 +12,7 @@ from twisted.internet.threads import deferToThread from zope.interface import implementer -from recceiver import interfaces +from recceiver import interfaces, metrics from recceiver.cf.adapter import ChannelFinderAdapter, PyCFClientAdapter from recceiver.cf.config import CFConfig from recceiver.cf.model import ( @@ -71,6 +71,8 @@ def startService(self): self._statusLoop.start(self.cf_config.status_interval, now=False) def _logStatus(self): + metrics.known_iocs.set(len(self.iocs)) + metrics.tracked_channels.set(len(self.channel_ioc_ids)) _log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) def _start_service_with_lock(self): @@ -418,8 +420,8 @@ def clean_service(self) -> None: channels = self.get_active_channels(recceiverid) _log.info("CF Clean Completed") return - except RequestException as e: - _log.exception("Clean service failed: %s", e) + except RequestException: + _log.exception("Clean service failed") retry_seconds = min(60, sleep) _log.info("Clean service retry in %s seconds", retry_seconds) time.sleep(retry_seconds) @@ -457,6 +459,8 @@ def _push_to_cf( try: self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) elapsed = time.monotonic() - t0 + metrics.cf_commit_duration_seconds.observe(elapsed) + metrics.cf_commits_total.labels(result="success").inc() _log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) return True except RequestException: @@ -466,6 +470,7 @@ def _push_to_cf( _log.info("CF push retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 + metrics.cf_commits_total.labels(result="cancelled").inc() _log.error("CF push gave up after %d attempts: %s", count, ioc_info) return False diff --git a/server/recceiver/metrics.py b/server/recceiver/metrics.py new file mode 100644 index 0000000..ca5f2a3 --- /dev/null +++ b/server/recceiver/metrics.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- + +import logging + +_log = logging.getLogger(__name__) + +try: + from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Counter, Gauge, Histogram, generate_latest + from twisted.web.resource import Resource + from twisted.web.server import Site + + _registry = CollectorRegistry(auto_describe=True) + + connections_active = Gauge( + "recceiver_connections_active", + "Active uploading IOC connections", + registry=_registry, + ) + connections_waiting = Gauge( + "recceiver_connections_waiting", + "IOC connections waiting for an upload slot", + registry=_registry, + ) + connections_limit = Gauge( + "recceiver_connections_limit", + "Maximum concurrent active connections (maxActive)", + registry=_registry, + ) + known_iocs = Gauge( + "recceiver_known_iocs", + "IOCs with channels currently registered in CF", + registry=_registry, + ) + tracked_channels = Gauge( + "recceiver_tracked_channels", + "Unique channel names tracked by the CF processor", + registry=_registry, + ) + cf_commits_total = Counter( + "recceiver_cf_commits_total", + "CF push attempts by result", + ["result"], + registry=_registry, + ) + cf_commit_duration_seconds = Histogram( + "recceiver_cf_commit_duration_seconds", + "CF push duration in seconds", + buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0], + registry=_registry, + ) + + class _MetricsResource(Resource): + isLeaf = True + + def render_GET(self, request): + request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode()) + return generate_latest(_registry) + + def make_site(): + return Site(_MetricsResource()) + + available = True + +except ImportError: + available = False + + class _Noop: + def set(self, v=None): + pass # no-op: prometheus_client not installed + + def inc(self, v=1): + pass # no-op: prometheus_client not installed + + def labels(self, **_kw): + return self + + def observe(self, v): + pass # no-op: prometheus_client not installed + + connections_active = _Noop() + connections_waiting = _Noop() + connections_limit = _Noop() + known_iocs = _Noop() + tracked_channels = _Noop() + cf_commits_total = _Noop() + cf_commit_duration_seconds = _Noop() + + def make_site(): + raise RuntimeError("prometheus_client is not installed") diff --git a/server/recceiver_full.conf b/server/recceiver_full.conf index 393be06..5c62f47 100644 --- a/server/recceiver_full.conf +++ b/server/recceiver_full.conf @@ -45,6 +45,10 @@ commitInterval = 5.0 # to allow. maxActive = 20 +# TCP port to expose Prometheus metrics on (0 or absent to disable). +# Requires prometheus_client: pip install recceiver[metrics] +#metricsPort = 0 + [lite] # example of "db" plugin config # Database access module diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index ee103ba..7e3643f 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -1,6 +1,5 @@ import time -import pytest from requests import RequestException from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo diff --git a/server/tests/unit/test_metrics.py b/server/tests/unit/test_metrics.py new file mode 100644 index 0000000..76a9526 --- /dev/null +++ b/server/tests/unit/test_metrics.py @@ -0,0 +1,47 @@ +import pytest + +pytest.importorskip("prometheus_client") + +from prometheus_client import CONTENT_TYPE_LATEST # noqa: E402 + +from recceiver import metrics # noqa: E402 + + +class _MockRequest: + def __init__(self): + self.headers = {} + + def setHeader(self, name, value): + self.headers[name] = value + + +class TestMetricsAvailable: + def test_available_flag_is_true(self): + assert metrics.available is True + + def test_make_site_returns_twisted_site(self): + from twisted.web.server import Site + + assert isinstance(metrics.make_site(), Site) + + +class TestMetricsEndpoint: + def test_render_get_sets_prometheus_content_type(self): + request = _MockRequest() + metrics._MetricsResource().render_GET(request) + assert request.headers.get(b"Content-Type") == CONTENT_TYPE_LATEST.encode() + + def test_render_get_returns_expected_metric_names(self): + request = _MockRequest() + body = metrics._MetricsResource().render_GET(request) + assert isinstance(body, bytes) + for name in ( + b"recceiver_connections_active", + b"recceiver_connections_waiting", + b"recceiver_connections_limit", + b"recceiver_known_iocs", + b"recceiver_tracked_channels", + b"recceiver_cf_commits_total", + b"recceiver_cf_commit_duration_seconds", + ): + assert name in body, f"{name!r} not found in metrics output" From b7556a3a1232a0707c31507815e02406cc9505a6 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Tue, 12 May 2026 07:56:00 +0200 Subject: [PATCH 7/8] test(server): use DummyRequest in metrics test to fix naming convention --- server/tests/unit/test_metrics.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/server/tests/unit/test_metrics.py b/server/tests/unit/test_metrics.py index 76a9526..73cfb18 100644 --- a/server/tests/unit/test_metrics.py +++ b/server/tests/unit/test_metrics.py @@ -3,18 +3,11 @@ pytest.importorskip("prometheus_client") from prometheus_client import CONTENT_TYPE_LATEST # noqa: E402 +from twisted.web.test.requesthelper import DummyRequest # noqa: E402 from recceiver import metrics # noqa: E402 -class _MockRequest: - def __init__(self): - self.headers = {} - - def setHeader(self, name, value): - self.headers[name] = value - - class TestMetricsAvailable: def test_available_flag_is_true(self): assert metrics.available is True @@ -27,12 +20,12 @@ def test_make_site_returns_twisted_site(self): class TestMetricsEndpoint: def test_render_get_sets_prometheus_content_type(self): - request = _MockRequest() + request = DummyRequest([b"/metrics"]) metrics._MetricsResource().render_GET(request) - assert request.headers.get(b"Content-Type") == CONTENT_TYPE_LATEST.encode() + assert request.responseHeaders.getRawHeaders(b"Content-Type") == [CONTENT_TYPE_LATEST.encode()] def test_render_get_returns_expected_metric_names(self): - request = _MockRequest() + request = DummyRequest([b"/metrics"]) body = metrics._MetricsResource().render_GET(request) assert isinstance(body, bytes) for name in ( From afcd4ea6553819b65114445a056667f820a30242 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Tue, 12 May 2026 08:30:13 +0200 Subject: [PATCH 8/8] fix(server): preserve client_infos across intermediate transaction flushes When an IOC upload exceeds trlimit and flush() splits it mid-stream, the replacement transaction was created with empty client_infos. Subsequent batches then fell back to the source port as iocName, writing the wrong iocName property to CF for any channels not yet seen in that session. Copy client_infos forward so all transactions in a session share the same IOC-level environment variables. --- server/recceiver/recast.py | 1 + server/tests/unit/test_recast.py | 44 ++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 server/tests/unit/test_recast.py diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index b12601d..af6f0a0 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -278,6 +278,7 @@ def flush(self): return transaction, self.transaction = self.transaction, Transaction(self.ep, id(self)) + self.transaction.client_infos = dict(transaction.client_infos) self.dirty = False def commit(_ignored): diff --git a/server/tests/unit/test_recast.py b/server/tests/unit/test_recast.py new file mode 100644 index 0000000..026a323 --- /dev/null +++ b/server/tests/unit/test_recast.py @@ -0,0 +1,44 @@ +from unittest.mock import MagicMock + +from twisted.internet.address import IPv4Address + +from recceiver.recast import CollectionSession + + +def _make_session() -> CollectionSession: + ep = IPv4Address("TCP", "1.2.3.4", 5678) + session = CollectionSession(MagicMock(), ep) + session.factory = MagicMock() + session.factory.commit.return_value = None + return session + + +class TestCollectionSessionFlush: + def test_client_infos_carried_forward_after_intermediate_flush(self): + session = _make_session() + session.ioc_info("IOCNAME", "MY-IOC") + session.ioc_info("HOSTNAME", "myhost") + + session.flush() + + assert session.transaction.client_infos == {"IOCNAME": "MY-IOC", "HOSTNAME": "myhost"} + + def test_replacement_transaction_is_not_initial(self): + session = _make_session() + session.ioc_info("IOCNAME", "MY-IOC") + assert session.transaction.initial is True + + session.flush() + + assert session.transaction.initial is False + + def test_flush_triggered_by_trlimit_carries_forward_client_infos(self): + session = _make_session() + session.ioc_info("IOCNAME", "MY-IOC") + session.trlimit = 2 + + # add_record calls flush_safely, which triggers flush once trlimit is reached + session.add_record(1, "ai", "PV:1") + session.add_record(2, "ai", "PV:2") + + assert session.transaction.client_infos.get("IOCNAME") == "MY-IOC"