diff --git a/server/demo.conf b/server/demo.conf index 1a3da16..3dea772 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -34,6 +34,13 @@ # Time interval for sending recceiver advertisments #announceInterval = 15.0 +# Interval in seconds between periodic status log lines (0 to disable) +#statusInterval = 60.0 + +# TCP port to expose Prometheus metrics on (0 or absent to disable). +# Requires prometheus_client: pip install recceiver[metrics] +#metricsPort = 0 + # Idle Timeout for TCP connections. #tcptimeout = 15.0 @@ -126,5 +133,6 @@ # Number of times to retry polling before giving up. Default is 10. #pushMaxRetries = 10 -# Whether to retry polling indefinitely until success. Default is True. -#pushAlwaysRetry = True +# Whether to retry polling indefinitely until success. Default is False. +# Enabling this holds the global CF commit lock until CF recovers; use with caution. +#pushAlwaysRetry = False diff --git a/server/pyproject.toml b/server/pyproject.toml index b604b6c..c3406c3 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "twisted>=22.10,<23; python_version<'3.8'", "twisted>=24.11,<24.12; python_version>='3.8'", ] +optional-dependencies.metrics = [ "prometheus-client" ] optional-dependencies.test = [ "pytest>=8.3,<8.4", "pytest-cov>=6,<7", "testcontainers>=4.8.2,<4.9" ] urls.Repository = "https://github.com/ChannelFinder/recsync" diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 33007a1..6fded6b 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -4,13 +4,14 @@ import random from twisted.application import service -from twisted.internet import defer, pollreactor +from twisted.internet import defer, pollreactor, task from twisted.internet.error import CannotListenError from twisted.python import log, usage from zope.interface import implementer from twisted import plugin +from . 
import metrics from .announce import Announcer from .processors import ProcessorController from .recast import CastFactory @@ -43,6 +44,7 @@ def __init__(self, config): self.reactor = reactor service.MultiService.__init__(self) + self._statusLoop = None self.annperiod = float(config.get("announceInterval", "15.0")) self.tcptimeout = float(config.get("tcptimeout", "15.0")) self.commitperiod = float(config.get("commitInterval", "5.0")) @@ -52,6 +54,8 @@ def __init__(self, config): self.addrlist = [] self.port = int(portn or "0") + self.statusInterval = float(config.get("statusInterval", "60.0")) + self.metricsPort = int(config.get("metricsPort", "0")) for addr in config.get("addrlist", "").split(","): if not addr: @@ -111,9 +115,35 @@ def privilegedStartService(self): # This will start up plugin Processors service.MultiService.privilegedStartService(self) + if self.metricsPort > 0: + if metrics.available: + self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind) + _log.info("Prometheus metrics available on port %d", self.metricsPort) + else: + _log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") + + metrics.connections_limit.set(self.tcpFactory.maxActive) + + if self.statusInterval > 0: + self._statusLoop = task.LoopingCall(self._logStatus) + self._statusLoop.start(self.statusInterval, now=False) + + def _logStatus(self): + metrics.connections_active.set(self.tcpFactory.NActive) + metrics.connections_waiting.set(len(self.tcpFactory.Wait)) + _log.info( + "status: connections active=%d/%d queued=%d", + self.tcpFactory.NActive, + self.tcpFactory.maxActive, + len(self.tcpFactory.Wait), + ) + def stopService(self): _log.info("Stopping RecService") + if self._statusLoop is not None and self._statusLoop.running: + self._statusLoop.stop() + # This will stop plugin Processors D2 = defer.maybeDeferred(service.MultiService.stopService, self) diff --git a/server/recceiver/cf/config.py 
b/server/recceiver/cf/config.py index ab6c332..36293ee 100644 --- a/server/recceiver/cf/config.py +++ b/server/recceiver/cf/config.py @@ -30,7 +30,8 @@ class CFConfig: cf_password: Optional[str] = None verify_ssl: Optional[bool] = None push_max_retries: int = 10 - push_always_retry: bool = True + push_always_retry: bool = False + status_interval: float = 60.0 @classmethod def loads(cls, conf: ConfigAdapter) -> "CFConfig": @@ -53,7 +54,8 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig": cf_password=conf.get("cfPassword"), verify_ssl=conf.getboolean("verifySSL"), push_max_retries=conf.getint("pushMaxRetries", 10), - push_always_retry=conf.getboolean("pushAlwaysRetry", True), + push_always_retry=conf.getboolean("pushAlwaysRetry", False), + status_interval=float(conf.get("statusInterval", "60.0")), ) def __repr__(self) -> str: diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 1bf565d..94046f7 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -7,12 +7,12 @@ from channelfinder import ChannelFinderClient from requests import ConnectionError, RequestException from twisted.application import service -from twisted.internet import defer +from twisted.internet import defer, task from twisted.internet.defer import DeferredLock from twisted.internet.threads import deferToThread from zope.interface import implementer -from recceiver import interfaces +from recceiver import interfaces, metrics from recceiver.cf.adapter import ChannelFinderAdapter, PyCFClientAdapter from recceiver.cf.config import CFConfig from recceiver.cf.model import ( @@ -46,6 +46,7 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter): self.client: Optional[ChannelFinderAdapter] = None self.current_time: Callable[[Optional[str]], str] = get_current_time self.lock: DeferredLock = DeferredLock() + self._statusLoop = None def startService(self): service.Service.startService(self) @@ -65,6 +66,15 @@ def startService(self): 
finally: self.lock.release() + if self.cf_config.status_interval > 0: + self._statusLoop = task.LoopingCall(self._logStatus) + self._statusLoop.start(self.cf_config.status_interval, now=False) + + def _logStatus(self): + metrics.known_iocs.set(len(self.iocs)) + metrics.tracked_channels.set(len(self.channel_ioc_ids)) + _log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) + def _start_service_with_lock(self): _log.info("CF_START with configuration: %s", self.cf_config) @@ -86,7 +96,10 @@ def _start_service_with_lock(self): raise else: if self.cf_config.clean_on_start: - self.clean_service() + _log.info("CF Clean: scheduling background startup sweep") + from twisted.internet import reactor + + reactor.callLater(0, self._start_background_clean) def _setup_cf_properties(self, cf_properties: Set[str]) -> None: """Compute required CF properties, register any missing ones, and cache state. @@ -137,13 +150,27 @@ def _setup_cf_properties(self, cf_properties: Set[str]) -> None: def stopService(self): _log.info("CF_STOP") + if self._statusLoop is not None and self._statusLoop.running: + self._statusLoop.stop() service.Service.stopService(self) return self.lock.run(self._stop_service_with_lock) def _stop_service_with_lock(self): - if self.cf_config.clean_on_stop: - self.clean_service() + """Stop the CFProcessor service with lock held. + + If clean_on_stop is enabled, mark all channels as inactive. + The sweep runs in a background thread to avoid blocking the reactor. + The lock is held throughout, preventing new commits from interleaving. 
+ """ _log.info("CF_STOP with lock") + if self.cf_config.clean_on_stop: + return deferToThread(self.clean_service) + + def _start_background_clean(self): + _log.info("CF Clean: background startup sweep beginning") + deferToThread(self.clean_service).addErrback( + lambda err: _log.error("CF Clean background sweep failed: %s", err) + ) # @defer.inlineCallbacks # Twisted v16 does not support cancellation! def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: @@ -308,7 +335,15 @@ def _commit_with_thread(self, transaction: CommitTransaction): ioc_name = transaction.client_infos.get("IOCNAME") if not ioc_name: ioc_name = str(port) - _log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port) + _log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) + if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535: + _log.warning( + "IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — " + "iocid will change on every reconnect, causing stale channels in CF; " + "configure a stable iocName via reccaster", + host, + ioc_name, + ) owner = ( transaction.client_infos.get(self.cf_config.env_owner_variable) @@ -339,6 +374,13 @@ def _commit_with_thread(self, transaction: CommitTransaction): _log.debug("Delete records: %s", records_to_delete) record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + if not transaction.connected and ioc_info.id not in self.iocs: + _log.warning( + "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", + host, + port, + ) + return self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info) if not poll_success: @@ -378,8 +420,8 @@ def clean_service(self) -> None: channels = self.get_active_channels(recceiverid) _log.info("CF Clean Completed") return - except RequestException as e: - _log.exception("Clean 
service failed: %s", e) + except RequestException: + _log.exception("Clean service failed") retry_seconds = min(60, sleep) _log.info("Clean service retry in %s seconds", retry_seconds) time.sleep(retry_seconds) @@ -405,21 +447,31 @@ def _push_to_cf( records_to_delete: List[str], ioc_info: IOCInfo, ) -> bool: - _log.info("Pushing updates for %s begins...", ioc_info) + _log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) count = 0 sleep = 1.0 while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: + if not self.running: + _log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + return False count += 1 + t0 = time.monotonic() try: self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) + elapsed = time.monotonic() - t0 + metrics.cf_commit_duration_seconds.observe(elapsed) + metrics.cf_commits_total.labels(result="success").inc() + _log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) return True - except RequestException as e: - _log.exception("ChannelFinder update failed: %s", e) + except RequestException: + elapsed = time.monotonic() - t0 + _log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) retry_seconds = min(60, sleep) - _log.info("ChannelFinder update retry in %s seconds", retry_seconds) + _log.info("CF push retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 - _log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count) + metrics.cf_commits_total.labels(result="cancelled").inc() + _log.error("CF push gave up after %d attempts: %s", count, ioc_info) return False def _update_channelfinder( @@ -434,9 +486,10 @@ def _update_channelfinder( new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id - if iocid not in self.iocs: + if iocid not in self.iocs and record_info_by_name: + # Disconnect-before-upload is 
already logged in _commit_with_thread. _log.warning( - "IOC %s did not send an initial transaction to join IOC list (%d IOCs known)", + "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, len(self.iocs), ) diff --git a/server/recceiver/metrics.py b/server/recceiver/metrics.py new file mode 100644 index 0000000..ca5f2a3 --- /dev/null +++ b/server/recceiver/metrics.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- + +import logging + +_log = logging.getLogger(__name__) + +try: + from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Counter, Gauge, Histogram, generate_latest + from twisted.web.resource import Resource + from twisted.web.server import Site + + _registry = CollectorRegistry(auto_describe=True) + + connections_active = Gauge( + "recceiver_connections_active", + "Active uploading IOC connections", + registry=_registry, + ) + connections_waiting = Gauge( + "recceiver_connections_waiting", + "IOC connections waiting for an upload slot", + registry=_registry, + ) + connections_limit = Gauge( + "recceiver_connections_limit", + "Maximum concurrent active connections (maxActive)", + registry=_registry, + ) + known_iocs = Gauge( + "recceiver_known_iocs", + "IOCs with channels currently registered in CF", + registry=_registry, + ) + tracked_channels = Gauge( + "recceiver_tracked_channels", + "Unique channel names tracked by the CF processor", + registry=_registry, + ) + cf_commits_total = Counter( + "recceiver_cf_commits_total", + "CF push attempts by result", + ["result"], + registry=_registry, + ) + cf_commit_duration_seconds = Histogram( + "recceiver_cf_commit_duration_seconds", + "CF push duration in seconds", + buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0], + registry=_registry, + ) + + class _MetricsResource(Resource): + isLeaf = True + + def render_GET(self, request): + request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode()) + return generate_latest(_registry) + + def make_site(): + return 
Site(_MetricsResource()) + + available = True + +except ImportError: + available = False + + class _Noop: + def set(self, v=None): + pass # no-op: prometheus_client not installed + + def inc(self, v=1): + pass # no-op: prometheus_client not installed + + def labels(self, **_kw): + return self + + def observe(self, v): + pass # no-op: prometheus_client not installed + + connections_active = _Noop() + connections_waiting = _Noop() + connections_limit = _Noop() + known_iocs = _Noop() + tracked_channels = _Noop() + cf_commits_total = _Noop() + cf_commit_duration_seconds = _Noop() + + def make_site(): + raise RuntimeError("prometheus_client is not installed") diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index b12601d..af6f0a0 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -278,6 +278,7 @@ def flush(self): return transaction, self.transaction = self.transaction, Transaction(self.ep, id(self)) + self.transaction.client_infos = dict(transaction.client_infos) self.dirty = False def commit(_ignored): diff --git a/server/recceiver_full.conf b/server/recceiver_full.conf index fbaf187..5c62f47 100644 --- a/server/recceiver_full.conf +++ b/server/recceiver_full.conf @@ -31,6 +31,9 @@ procs = cf, show, db:lite # Time interval for sending recceiver advertisments announceInterval = 15.0 +# Interval in seconds between periodic status log lines (0 to disable) +#statusInterval = 60.0 + # Idle Timeout for TCP connections. tcptimeout = 15.0 @@ -42,6 +45,10 @@ commitInterval = 5.0 # to allow. maxActive = 20 +# TCP port to expose Prometheus metrics on (0 or absent to disable). +# Requires prometheus_client: pip install recceiver[metrics] +#metricsPort = 0 + [lite] # example of "db" plugin config # Database access module @@ -123,3 +130,6 @@ pushMaxRetries = 10 -# Whether to retry polling indefinitely until success. Default is True. +# Whether to retry polling indefinitely until success. Default is False.
pushAlwaysRetry = False + +# Interval in seconds between periodic CF status log lines (0 to disable) +#statusInterval = 60.0 diff --git a/server/tests/integration/docker_compose.py b/server/tests/integration/docker_compose.py index 3a38d1e..05aafbd 100644 --- a/server/tests/integration/docker_compose.py +++ b/server/tests/integration/docker_compose.py @@ -66,6 +66,14 @@ def shutdown_container(compose: DockerCompose, host_name: str) -> str: return container.ID +def kill_container(compose: DockerCompose, host_name: str) -> str: + """Send SIGKILL to a container, bypassing graceful shutdown hooks.""" + container = compose.get_container(host_name) + docker_client = DockerClient() + docker_client.containers.get(container.ID).kill() + return container.ID + + def start_container( compose: DockerCompose, host_name: Optional[str] = None, container_id: Optional[str] = None ) -> None: diff --git a/server/tests/integration/test_single_ioc.py b/server/tests/integration/test_single_ioc.py index 49e5514..5dacf81 100644 --- a/server/tests/integration/test_single_ioc.py +++ b/server/tests/integration/test_single_ioc.py @@ -19,6 +19,7 @@ from .docker_compose import ( ComposeFixtureFactory, clone_container, + kill_container, restart_container, shutdown_container, start_container, @@ -122,6 +123,37 @@ def test_status_property_works_between_cf_down( assert all(INACTIVE_PROPERTY in ch["properties"] for ch in channels_inactive) +class TestCleanStopRecceiver: + def test_clean_stop_marks_channels_inactive( + self, setup_compose: DockerCompose, cf_client: ChannelFinderClient + ) -> None: # noqa: F811 + shutdown_container(setup_compose, "recc1") + assert wait_for_sync( + cf_client, + lambda client: check_channel_property(client, DEFAULT_CHANNEL_NAME, INACTIVE_PROPERTY), + ) + channels_inactive = cf_client.find(property=[("iocName", "IOC1-1")]) + assert all(INACTIVE_PROPERTY in ch["properties"] for ch in channels_inactive) + + +class TestCleanStartRecceiver: + def 
test_startup_sweep_marks_stale_channels_inactive( + self, setup_compose: DockerCompose, cf_client: ChannelFinderClient + ) -> None: # noqa: F811 + # Kill recceiver hard — cleanOnStop does NOT run, channels stay Active in CF + recc1_id = kill_container(setup_compose, "recc1") + # Stop IOC so it cannot reconnect when the recceiver comes back + shutdown_container(setup_compose, "ioc1-1") + # Start recceiver — cleanOnStart sweep should mark the stale channels Inactive + start_container(setup_compose, container_id=recc1_id) + assert wait_for_sync( + cf_client, + lambda client: check_channel_property(client, DEFAULT_CHANNEL_NAME, INACTIVE_PROPERTY), + ) + channels_inactive = cf_client.find(property=[("iocName", "IOC1-1")]) + assert all(INACTIVE_PROPERTY in ch["properties"] for ch in channels_inactive) + + class TestMoveIocHost: def test_move_ioc_host( self, diff --git a/server/tests/unit/cf/test_config.py b/server/tests/unit/cf/test_config.py index af2da40..1ae3b3b 100644 --- a/server/tests/unit/cf/test_config.py +++ b/server/tests/unit/cf/test_config.py @@ -1,3 +1,5 @@ +import pytest + from recceiver.cf.config import CFConfig from tests.unit.conftest import make_adapter @@ -26,7 +28,7 @@ def test_push_max_retries_from_env(self): def test_default_push_always_retry(self): adapter = make_adapter() config = CFConfig.loads(adapter) - assert config.push_always_retry is True + assert config.push_always_retry is False def test_alias_disabled_by_default(self): adapter = make_adapter() @@ -37,3 +39,13 @@ def test_alias_enabled_from_config(self): adapter = make_adapter(values={"alias": "true"}) config = CFConfig.loads(adapter) assert config.alias_enabled is True + + def test_default_status_interval(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.status_interval == pytest.approx(60.0) + + def test_status_interval_from_config(self): + adapter = make_adapter(values={"statusinterval": "120.0"}) + config = CFConfig.loads(adapter) + assert 
config.status_interval == pytest.approx(120.0) diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index 209f89f..7e3643f 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -1,3 +1,7 @@ +import time + +from requests import RequestException + from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo from recceiver.cf.processor import CFProcessor from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc @@ -115,3 +119,26 @@ def test_orphans_channel_absent_from_local_state(self): status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) assert status.value == PVStatus.INACTIVE.value + + +class TestPushToCF: + def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): + monkeypatch.setattr(time, "sleep", lambda _: None) + + processor = make_processor() + processor.running = True + processor.cf_config.push_always_retry = True + + call_count = 0 + + def failing_update(record_info_by_name, records_to_delete, ioc_info): + nonlocal call_count + call_count += 1 + processor.running = False + raise RequestException("CF unreachable") + + monkeypatch.setattr(processor, "_update_channelfinder", failing_update) + result = processor._push_to_cf({}, [], make_ioc()) + + assert result is False + assert call_count == 1 diff --git a/server/tests/unit/test_metrics.py b/server/tests/unit/test_metrics.py new file mode 100644 index 0000000..73cfb18 --- /dev/null +++ b/server/tests/unit/test_metrics.py @@ -0,0 +1,40 @@ +import pytest + +pytest.importorskip("prometheus_client") + +from prometheus_client import CONTENT_TYPE_LATEST # noqa: E402 +from twisted.web.test.requesthelper import DummyRequest # noqa: E402 + +from recceiver import metrics # noqa: E402 + + +class TestMetricsAvailable: + def test_available_flag_is_true(self): + assert metrics.available is True + + def 
test_make_site_returns_twisted_site(self): + from twisted.web.server import Site + + assert isinstance(metrics.make_site(), Site) + + +class TestMetricsEndpoint: + def test_render_get_sets_prometheus_content_type(self): + request = DummyRequest([b"/metrics"]) + metrics._MetricsResource().render_GET(request) + assert request.responseHeaders.getRawHeaders(b"Content-Type") == [CONTENT_TYPE_LATEST.encode()] + + def test_render_get_returns_expected_metric_names(self): + request = DummyRequest([b"/metrics"]) + body = metrics._MetricsResource().render_GET(request) + assert isinstance(body, bytes) + for name in ( + b"recceiver_connections_active", + b"recceiver_connections_waiting", + b"recceiver_connections_limit", + b"recceiver_known_iocs", + b"recceiver_tracked_channels", + b"recceiver_cf_commits_total", + b"recceiver_cf_commit_duration_seconds", + ): + assert name in body, f"{name!r} not found in metrics output" diff --git a/server/tests/unit/test_recast.py b/server/tests/unit/test_recast.py new file mode 100644 index 0000000..026a323 --- /dev/null +++ b/server/tests/unit/test_recast.py @@ -0,0 +1,44 @@ +from unittest.mock import MagicMock + +from twisted.internet.address import IPv4Address + +from recceiver.recast import CollectionSession + + +def _make_session() -> CollectionSession: + ep = IPv4Address("TCP", "1.2.3.4", 5678) + session = CollectionSession(MagicMock(), ep) + session.factory = MagicMock() + session.factory.commit.return_value = None + return session + + +class TestCollectionSessionFlush: + def test_client_infos_carried_forward_after_intermediate_flush(self): + session = _make_session() + session.ioc_info("IOCNAME", "MY-IOC") + session.ioc_info("HOSTNAME", "myhost") + + session.flush() + + assert session.transaction.client_infos == {"IOCNAME": "MY-IOC", "HOSTNAME": "myhost"} + + def test_replacement_transaction_is_not_initial(self): + session = _make_session() + session.ioc_info("IOCNAME", "MY-IOC") + assert session.transaction.initial is True + + 
session.flush() + + assert session.transaction.initial is False + + def test_flush_triggered_by_trlimit_carries_forward_client_infos(self): + session = _make_session() + session.ioc_info("IOCNAME", "MY-IOC") + session.trlimit = 2 + + # add_record calls flush_safely, which triggers flush once trlimit is reached + session.add_record(1, "ai", "PV:1") + session.add_record(2, "ai", "PV:2") + + assert session.transaction.client_infos.get("IOCNAME") == "MY-IOC"