diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml index c43a8e7b..b7e1289e 100644 --- a/.github/workflows/server.yml +++ b/.github/workflows/server.yml @@ -68,7 +68,7 @@ jobs: - name: Test unit tests run: | set -o pipefail - pytest tests/unit -v 2>&1 | tee pytest-unit.log + pytest tests/unit -v --cov=recceiver --cov-report=xml:coverage.xml 2>&1 | tee pytest-unit.log - name: Upload test log if: always() uses: actions/upload-artifact@v4 @@ -76,6 +76,13 @@ jobs: name: pytest-unit-log path: server/pytest-unit.log retention-days: 14 + - name: Upload coverage report + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-xml + path: server/coverage.xml + retention-days: 14 test-integration: runs-on: ubuntu-latest diff --git a/server/pyproject.toml b/server/pyproject.toml index a478e4c1..b604b6c9 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -4,7 +4,7 @@ requires = [ "setuptools" ] [project] name = "recceiver" -version = "1.9.3" +version = "1.9.5" description = """\ recCeiver is a server component of the recsync protocol. It receives record updates from recsync clients (e.g., \ recCasters) and forwards them to a configurable backend such as ChannelFinder.\ @@ -36,11 +36,11 @@ dependencies = [ "twisted>=22.10,<23; python_version<'3.8'", "twisted>=24.11,<24.12; python_version>='3.8'", ] -optional-dependencies.test = [ "pytest>=8.3,<8.4", "testcontainers>=4.8.2,<4.9" ] +optional-dependencies.test = [ "pytest>=8.3,<8.4", "pytest-cov>=6,<7", "testcontainers>=4.8.2,<4.9" ] urls.Repository = "https://github.com/ChannelFinder/recsync" [tool.setuptools] -packages = [ "recceiver", "recceiver.protocol", "twisted.plugins" ] +packages = [ "recceiver", "recceiver.cf", "recceiver.protocol", "twisted.plugins" ] include-package-data = true package-data.twisted = [ "plugins/recceiver_plugin.py" ] diff --git a/server/recceiver/cf/__init__.py b/server/recceiver/cf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/recceiver/cf/adapter.py b/server/recceiver/cf/adapter.py new file mode 100644 index 00000000..3a1f8e23 --- /dev/null +++ b/server/recceiver/cf/adapter.py @@ -0,0 +1,102 @@ +from typing import List + +try: + from typing import Protocol +except ImportError: + from typing_extensions import Protocol # type: ignore[assignment] + +from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus + +# CF query URLs break above this length; names are pipe-joined and chunked to stay under it. +_CF_NAME_QUERY_LIMIT = 600 + + +class ChannelFinderAdapter(Protocol): + """Typed boundary between CFProcessor and the ChannelFinder HTTP client. + + All methods accept and return domain objects (CFChannel, CFProperty). + Dict serialisation is handled inside the implementation, not at callsites. + """ + + def find_by_ioc_id(self, iocid: str) -> List[CFChannel]: + """Return all channels registered under the given IOC ID.""" + ... + + def find_by_names(self, names: List[str]) -> List[CFChannel]: + """Return channels whose names are in the given list.""" + ... + + def find_active_for_recceiver(self, recceiverid: str) -> List[CFChannel]: + """Return all channels marked Active for the given recceiver.""" + ... + + def set_channels(self, channels: List[CFChannel]) -> None: + """Create or overwrite channels.""" + ... + + def update_property(self, prop: CFProperty, channel_names: List[str]) -> None: + """Update a single property value across the named channels.""" + ... + + def get_property_names(self) -> List[str]: + """Return the names of all property definitions registered in ChannelFinder.""" + ... + + def set_property(self, name: str, owner: str) -> None: + """Register a property definition if it does not already exist.""" + ... + + +class PyCFClientAdapter: + """Wraps pyCFClient's ChannelFinderClient to implement ChannelFinderAdapter.""" + + def __init__(self, client, size_limit: int = 0): + self._client = client + self._size_limit = size_limit + + def _find(self, args: List) -> List[CFChannel]: + if self._size_limit > 0: + args = args + [("~size", self._size_limit)] + return [CFChannel.from_dict(ch) for ch in self._client.findByArgs(args)] + + def find_by_ioc_id(self, iocid: str) -> List[CFChannel]: + return self._find([(CFPropertyName.IOC_ID.value, iocid)]) + + def find_by_names(self, names: List[str]) -> List[CFChannel]: + if not names: + return [] + chunks, buf = [], "" + for name in names: + if not buf: + buf = name + elif len(buf) + len(name) < _CF_NAME_QUERY_LIMIT: + buf = buf + "|" + name + else: + chunks.append(buf) + buf = name + if buf: + chunks.append(buf) + results = [] + for chunk in chunks: + results.extend(self._find([("~name", chunk)])) + return results + + def find_active_for_recceiver(self, recceiverid: str) -> List[CFChannel]: + return self._find( + [ + (CFPropertyName.PV_STATUS.value, PVStatus.ACTIVE.value), + (CFPropertyName.RECCEIVER_ID.value, recceiverid), + ] + ) + + def set_channels(self, channels: List[CFChannel]) -> None: + self._client.set(channels=[ch.as_dict() for ch in channels]) + + def update_property(self, prop: CFProperty, channel_names: List[str]) -> None: + self._client.update(property=prop.as_dict(), channelNames=channel_names) + + def get_property_names(self) -> List[str]: + return [p["name"] for p in self._client.getAllProperties()] + + def set_property(self, name: str, owner: str) -> None: + self._client.set(property={"name": name, "owner": owner}) diff --git a/server/recceiver/cf/config.py b/server/recceiver/cf/config.py new file mode 100644 index 00000000..ab6c3321 --- /dev/null +++ b/server/recceiver/cf/config.py @@ -0,0 +1,66 @@ +import socket +from dataclasses import dataclass, fields +from typing import Optional + +from recceiver.processors import ConfigAdapter + +RECCEIVERID_DEFAULT = socket.gethostname() +DEFAULT_QUERY_LIMIT = 10_000 + + +@dataclass +class CFConfig: + """Configuration options for the CF Processor.""" + + alias_enabled: bool = False + record_type_enabled: bool = False + environment_variables: str = "" + info_tags: str = "" + ioc_connection_info: bool = True + record_description_enabled: bool = False + clean_on_start: bool = True + clean_on_stop: bool = True + username: str = "cfstore" + env_owner_variable: str = "ENGINEER" + recceiver_id: str = RECCEIVERID_DEFAULT + timezone: Optional[str] = None + cf_query_limit: int = DEFAULT_QUERY_LIMIT + base_url: Optional[str] = None + cf_username: Optional[str] = None + cf_password: Optional[str] = None + verify_ssl: Optional[bool] = None + push_max_retries: int = 10 + push_always_retry: bool = True + + @classmethod + def loads(cls, conf: ConfigAdapter) -> "CFConfig": + """Load configuration from a ConfigAdapter instance.""" + return CFConfig( + alias_enabled=conf.getboolean("alias", False), + record_type_enabled=conf.getboolean("recordType", False), + environment_variables=conf.get("environment_vars", ""), + info_tags=conf.get("infotags", ""), + ioc_connection_info=conf.getboolean("iocConnectionInfo", True), + record_description_enabled=conf.getboolean("recordDesc", False), + clean_on_start=conf.getboolean("cleanOnStart", True), + clean_on_stop=conf.getboolean("cleanOnStop", True), + username=conf.get("username", "cfstore"), + recceiver_id=conf.get("recceiverId", RECCEIVERID_DEFAULT), + timezone=conf.get("timezone", ""), + cf_query_limit=conf.get("findSizeLimit", DEFAULT_QUERY_LIMIT), + base_url=conf.get("baseUrl"), + cf_username=conf.get("cfUsername"), + cf_password=conf.get("cfPassword"), + verify_ssl=conf.getboolean("verifySSL"), + push_max_retries=conf.getint("pushMaxRetries", 10), + push_always_retry=conf.getboolean("pushAlwaysRetry", True), + ) + + def __repr__(self) -> str: + parts = [] + for f in fields(self): + value = getattr(self, f.name) + if f.name == "cf_password": + value = "***" if value else None + parts.append(f"{f.name}={value!r}") + return f"CFConfig({', '.join(parts)})" diff --git a/server/recceiver/cf/model.py b/server/recceiver/cf/model.py new file mode 100644 index 00000000..8dff6d26 --- /dev/null +++ b/server/recceiver/cf/model.py @@ -0,0 +1,111 @@ +import enum +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +class PVStatus(enum.Enum): + """Active/Inactive status values as used in the pvStatus CF property.""" + + ACTIVE = "Active" + INACTIVE = "Inactive" + + +class CFPropertyName(enum.Enum): + """Canonical property names registered and managed in Channelfinder.""" + + HOSTNAME = "hostName" + IOC_NAME = "iocName" + IOC_ID = "iocid" + IOC_IP = "iocIP" + PV_STATUS = "pvStatus" + TIME = "time" + RECCEIVER_ID = "recceiverID" + ALIAS = "alias" + RECORD_TYPE = "recordType" + RECORD_DESC = "recordDesc" + CA_PORT = "caPort" + PVA_PORT = "pvaPort" + + +@dataclass +class CFProperty: + """A single named property attached to a Channelfinder channel.""" + + name: str + owner: str + value: Optional[str] = None + + def as_dict(self) -> Dict[str, str]: + """Serialise to the dict shape expected by pyCFClient.""" + return {"name": self.name, "owner": self.owner, "value": self.value or ""} + + @classmethod + def from_dict(cls, prop_dict: Dict[str, str]) -> "CFProperty": + """Deserialise from the dict shape returned by pyCFClient.""" + return cls( + name=prop_dict.get("name", ""), + owner=prop_dict.get("owner", ""), + value=prop_dict.get("value"), + ) + + +@dataclass +class CFChannel: + """A Channelfinder channel with its associated properties.""" + + name: str + owner: str + properties: List[CFProperty] + + def as_dict(self) -> Dict[str, Any]: + """Serialise to the dict shape expected by pyCFClient.""" + return { + "name": self.name, + "owner": self.owner, + "properties": [p.as_dict() for p in self.properties], + } + + @classmethod + def from_dict(cls, channel_dict: Dict[str, Any]) -> "CFChannel": + """Deserialise from the dict shape returned by pyCFClient.""" + return cls( + name=channel_dict.get("name", ""), + owner=channel_dict.get("owner", ""), + properties=[CFProperty.from_dict(p) for p in channel_dict.get("properties", [])], + ) + + +@dataclass +class IOCInfo: + """Runtime state for a connected IOC. The .id property is the primary key.""" + + host: str + hostname: str + ioc_name: str + ioc_ip: str + owner: str + time: str + port: int + channelcount: int = 0 + + @property + def id(self) -> str: + return f"{self.host}:{self.port}" + + +@dataclass +class RecordInfo: + """Per-record data extracted from a transaction before pushing to CF.""" + + pv_name: str + record_type: Optional[str] = None + info_properties: List[CFProperty] = field(default_factory=list) + aliases: List[str] = field(default_factory=list) + + +class IOCMissingInfoError(Exception): + """Raised when an IOC is missing required information.""" + + def __init__(self, ioc_info: IOCInfo): + super().__init__(f"Missing hostName {ioc_info.hostname} or iocName {ioc_info.ioc_name}") + self.ioc_info = ioc_info diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py new file mode 100755 index 00000000..1bf565d5 --- /dev/null +++ b/server/recceiver/cf/processor.py @@ -0,0 +1,781 @@ +import datetime +import logging +import time +from collections import defaultdict +from typing import Callable, Dict, List, Optional, Set + +from channelfinder import ChannelFinderClient +from requests import ConnectionError, RequestException +from twisted.application import service +from twisted.internet import defer +from twisted.internet.defer import DeferredLock +from twisted.internet.threads import deferToThread +from zope.interface import implementer + +from recceiver import interfaces +from recceiver.cf.adapter import ChannelFinderAdapter, PyCFClientAdapter +from recceiver.cf.config import CFConfig +from recceiver.cf.model import ( + CFChannel, + CFProperty, + CFPropertyName, + IOCInfo, + IOCMissingInfoError, + PVStatus, + RecordInfo, +) +from recceiver.interfaces import CommitTransaction +from recceiver.processors import ConfigAdapter + +_log = logging.getLogger(__name__) + + +@implementer(interfaces.IProcessor) +class CFProcessor(service.Service): + """IProcessor plugin that synchronises IOC record data to Channelfinder. + + Maintains in-memory state (channel_ioc_ids, iocs) to reconcile the current + snapshot with what CF holds, then pushes the minimal diff on each commit. + """ + + def __init__(self, name: Optional[str], conf: ConfigAdapter): + self.cf_config = CFConfig.loads(conf) + self.name = name # Override name from service.Service + self.channel_ioc_ids: Dict[str, List[str]] = defaultdict(list) + self.iocs: Dict[str, IOCInfo] = {} + self.client: Optional[ChannelFinderAdapter] = None + self.current_time: Callable[[Optional[str]], str] = get_current_time + self.lock: DeferredLock = DeferredLock() + + def startService(self): + service.Service.startService(self) + # Returning a Deferred is not supported by startService(), + # so instead attempt to acquire the lock synchonously! + d = self.lock.acquire() + if not d.called: + d.cancel() + service.Service.stopService(self) + raise RuntimeError("Failed to acquired CF Processor lock for service start") + + try: + self._start_service_with_lock() + except: + service.Service.stopService(self) + raise + finally: + self.lock.release() + + def _start_service_with_lock(self): + _log.info("CF_START with configuration: %s", self.cf_config) + + if self.client is None: # For setting up mock test client + self.client = PyCFClientAdapter( + ChannelFinderClient( + BaseURL=self.cf_config.base_url, + username=self.cf_config.cf_username, + password=self.cf_config.cf_password, + verify_ssl=self.cf_config.verify_ssl, + ), + size_limit=int(self.cf_config.cf_query_limit), + ) + try: + cf_properties = set(self.client.get_property_names()) + self._setup_cf_properties(cf_properties) + except ConnectionError: + _log.exception("Cannot connect to Channelfinder service") + raise + else: + if self.cf_config.clean_on_start: + self.clean_service() + + def _setup_cf_properties(self, cf_properties: Set[str]) -> None: + """Compute required CF properties, register any missing ones, and cache state. + + Sets self.env_vars, self.record_property_names_list, and self.managed_properties. + """ + required_properties = { + CFPropertyName.HOSTNAME.value, + CFPropertyName.IOC_NAME.value, + CFPropertyName.IOC_ID.value, + CFPropertyName.IOC_IP.value, + CFPropertyName.PV_STATUS.value, + CFPropertyName.TIME.value, + CFPropertyName.RECCEIVER_ID.value, + } + if self.cf_config.alias_enabled: + required_properties.add(CFPropertyName.ALIAS.value) + if self.cf_config.record_type_enabled: + required_properties.add(CFPropertyName.RECORD_TYPE.value) + + env_vars_setting = self.cf_config.environment_variables + self.env_vars = {} + if env_vars_setting: + self.env_vars = { + k.strip(): v.strip() for item in env_vars_setting.split(",") for k, v in [item.split(":", 1)] + } + required_properties.update(self.env_vars.values()) + + # CA/PVA port properties are sourced from reccaster env vars + if self.cf_config.ioc_connection_info: + self.env_vars["RSRV_SERVER_PORT"] = "caPort" + self.env_vars["PVAS_SERVER_PORT"] = "pvaPort" + required_properties.add(CFPropertyName.CA_PORT.value) + required_properties.add(CFPropertyName.PVA_PORT.value) + + # Space or comma and space separated strings + record_property_names_list = {s.strip(", ") for s in self.cf_config.info_tags.split()} + if self.cf_config.record_description_enabled: + record_property_names_list.add(CFPropertyName.RECORD_DESC.value) + + owner = self.cf_config.username + for prop_name in (required_properties | record_property_names_list) - cf_properties: + self.client.set_property(prop_name, owner) + + self.record_property_names_list = record_property_names_list + self.managed_properties = required_properties | record_property_names_list + _log.debug("record_property_names_list = %s", self.record_property_names_list) + + def stopService(self): + _log.info("CF_STOP") + service.Service.stopService(self) + return self.lock.run(self._stop_service_with_lock) + + def _stop_service_with_lock(self): + if self.cf_config.clean_on_stop: + self.clean_service() + _log.info("CF_STOP with lock") + + # @defer.inlineCallbacks # Twisted v16 does not support cancellation! + def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: + """Commit a transaction to Channelfinder.""" + return self.lock.run(self._commit_with_lock, transaction_record) + + def _commit_with_lock(self, transaction: interfaces.ITransaction) -> defer.Deferred: + self.cancelled = False + + t = deferToThread(self._commit_with_thread, transaction) + + def cancel_commit(d: defer.Deferred): + self.cancelled = True + d.callback(None) + + d: defer.Deferred = defer.Deferred(cancel_commit) + + def wait_for_thread(_ignored): + if self.cancelled: + return t + + d.addCallback(wait_for_thread) + + def chain_error(err): + """Handle errors from the commit thread. + + Note this is not foolproof as the thread may still be running. + """ + if not err.check(defer.CancelledError): + _log.error("CF_COMMIT FAILURE: %s", err) + if self.cancelled: + if not err.check(defer.CancelledError): + raise defer.CancelledError() + return err + else: + d.callback(None) + + def chain_result(result): + if self.cancelled: + raise defer.CancelledError(f"CF Processor is cancelled, due to {result}") + else: + d.callback(None) + + t.addCallbacks(chain_result, chain_error) + return d + + def transaction_to_record_infos(self, ioc_info: IOCInfo, transaction: CommitTransaction) -> Dict[str, RecordInfo]: + """Build a RecordInfo dict keyed by record_id from a transaction. + + Merges record types, info-tag properties, aliases, and mapped EPICS + environment variables into each record. Only info tags on the + record_property_names_list whitelist are included. + """ + record_infos: Dict[str, RecordInfo] = {} + for record_id, (record_name, record_type) in transaction.records_to_add.items(): + record_infos[record_id] = RecordInfo(pv_name=record_name, record_type=None, info_properties=[], aliases=[]) + if self.cf_config.record_type_enabled: + record_infos[record_id].record_type = record_type + + for record_id, (record_infos_to_add) in transaction.record_infos_to_add.items(): + # find intersection of these sets + if record_id not in record_infos: + _log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id) + continue + recinfo_wl = [p for p in self.record_property_names_list if p in record_infos_to_add.keys()] + if recinfo_wl: + for infotag in recinfo_wl: + record_infos[record_id].info_properties.append( + CFProperty(infotag, ioc_info.owner, record_infos_to_add[infotag]) + ) + + for record_id, record_aliases in transaction.aliases.items(): + if record_id not in record_infos: + _log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) + continue + record_infos[record_id].aliases = record_aliases + + self._apply_env_vars(record_infos, ioc_info, transaction) + return record_infos + + def _apply_env_vars( + self, + record_infos: Dict[str, RecordInfo], + ioc_info: IOCInfo, + transaction: CommitTransaction, + ) -> None: + """Append mapped EPICS environment variable properties to every record.""" + for record_id in record_infos: + for epics_env_var_name, cf_prop_name in self.env_vars.items(): + value = transaction.client_infos.get(epics_env_var_name) + if value is not None: + record_infos[record_id].info_properties.append(CFProperty(cf_prop_name, ioc_info.owner, value)) + else: + _log.debug( + "EPICS environment var %s not found in IOC: %s", + epics_env_var_name, + ioc_info, + ) + + @staticmethod + def record_info_by_name(record_infos: Dict[str, RecordInfo], ioc_info: IOCInfo) -> Dict[str, RecordInfo]: + """Re-key a record_id-to-RecordInfo dict by pv_name instead. + + Logs and skips duplicate PV names within the same commit. + """ + record_info_by_name = {} + for info in record_infos.values(): + if info.pv_name in record_info_by_name: + _log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) + continue + record_info_by_name[info.pv_name] = info + return record_info_by_name + + def update_ioc_infos( + self, + transaction: CommitTransaction, + ioc_info: IOCInfo, + records_to_delete: List[str], + record_info_by_name: Dict[str, RecordInfo], + ) -> None: + """Reconcile channel_ioc_ids and iocs against the transaction. + + On initial transaction, registers the IOC. On disconnect, queues all + its channels for deletion. Adds or removes channel-to-ioc mappings and + updates channelcount, including aliases when enabled. + """ + iocid = ioc_info.id + if transaction.initial: + self.iocs[iocid] = ioc_info + if not transaction.connected: + records_to_delete.extend(self.channel_ioc_ids.keys()) + for record_name in record_info_by_name: + self.channel_ioc_ids[record_name].append(iocid) + self.iocs[iocid].channelcount += 1 + if self.cf_config.alias_enabled: + self._register_aliases(record_info_by_name[record_name].aliases, iocid) + for record_name in records_to_delete: + if iocid in self.channel_ioc_ids[record_name]: + self.remove_channel(record_name, iocid) + if self.cf_config.alias_enabled and record_name in record_info_by_name: + self._remove_aliases(record_info_by_name[record_name].aliases, iocid) + + def _register_aliases(self, aliases: List[str], iocid: str) -> None: + for alias in aliases: + self.channel_ioc_ids[alias].append(iocid) + self.iocs[iocid].channelcount += 1 + + def _remove_aliases(self, aliases: List[str], iocid: str) -> None: + for alias in aliases: + self.remove_channel(alias, iocid) + + def _commit_with_thread(self, transaction: CommitTransaction): + host = transaction.source_address.host + port = transaction.source_address.port + + if not self.running: + raise defer.CancelledError(f"CF Processor is not running (transaction: {host}:{port})") + + _log.info("CF_COMMIT: %s", transaction) + _log.debug("CF_COMMIT: transaction: %s", repr(transaction)) + + ioc_name = transaction.client_infos.get("IOCNAME") + if not ioc_name: + ioc_name = str(port) + _log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port) + + owner = ( + transaction.client_infos.get(self.cf_config.env_owner_variable) + or transaction.client_infos.get("CF_USERNAME") + or self.cf_config.username + ) + if owner == self.cf_config.username: + _log.debug( + "IOC at %s:%d did not send %s or CF_USERNAME; using service account as owner", + host, + port, + self.cf_config.env_owner_variable, + ) + + ioc_info = IOCInfo( + host=host, + hostname=transaction.client_infos.get("HOSTNAME") or host, + ioc_name=ioc_name, + ioc_ip=host, + owner=owner, + time=self.current_time(self.cf_config.timezone), + port=port, + ) + + record_infos = self.transaction_to_record_infos(ioc_info, transaction) + + records_to_delete = list(transaction.records_to_delete) + _log.debug("Delete records: %s", records_to_delete) + + record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) + poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info) + if not poll_success: + raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}") + + def remove_channel(self, record_name: str, iocid: str) -> None: + """Unlink a channel from an IOC in channel_ioc_ids and decrement channelcount. + + Deletes the channel entry when the last IOC reference is removed, + and deletes the IOC entry when its channelcount reaches zero. + """ + self.channel_ioc_ids[record_name].remove(iocid) + if iocid not in self.iocs: + if len(self.channel_ioc_ids[record_name]) == 0: + del self.channel_ioc_ids[record_name] + return + self.iocs[iocid].channelcount -= 1 + if self.iocs[iocid].channelcount <= 0: + if self.iocs[iocid].channelcount < 0: + _log.error("Channel count negative: %s", iocid) + self.iocs.pop(iocid) + if len(self.channel_ioc_ids[record_name]) == 0: + del self.channel_ioc_ids[record_name] + + def clean_service(self) -> None: + """Mark all channels belonging to this recceiver as 'Inactive'.""" + sleep = 1 + retry_limit = 5 + owner = self.cf_config.username + recceiverid = self.cf_config.recceiver_id + while 1: + try: + _log.info("CF Clean Started") + channels = self.get_active_channels(recceiverid) + while channels: + self.clean_channels(owner, channels) + channels = self.get_active_channels(recceiverid) + _log.info("CF Clean Completed") + return + except RequestException as e: + _log.exception("Clean service failed: %s", e) + retry_seconds = min(60, sleep) + _log.info("Clean service retry in %s seconds", retry_seconds) + time.sleep(retry_seconds) + sleep *= 1.5 + if self.running == 0 and sleep >= retry_limit: + _log.info("Abandoning clean after %s seconds", retry_limit) + return + + def get_active_channels(self, recceiverid: str) -> List[CFChannel]: + """Return all CF channels currently marked Active for this recceiver.""" + return self.client.find_active_for_recceiver(recceiverid) + + def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: + """Mark the given channels Inactive in CF.""" + names = [ch.name for ch in channels or []] + _log.info("Cleaning %s channels.", len(names)) + _log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) + self.client.update_property(CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.INACTIVE.value), names) + + def _push_to_cf( + self, + record_info_by_name: Dict[str, RecordInfo], + records_to_delete: List[str], + ioc_info: IOCInfo, + ) -> bool: + _log.info("Pushing updates for %s begins...", ioc_info) + count = 0 + sleep = 1.0 + while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: + count += 1 + try: + self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) + return True + except RequestException as e: + _log.exception("ChannelFinder update failed: %s", e) + retry_seconds = min(60, sleep) + _log.info("ChannelFinder update retry in %s seconds", retry_seconds) + time.sleep(retry_seconds) + sleep *= 1.5 + _log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count) + return False + + def _update_channelfinder( + self, + record_info_by_name: Dict[str, RecordInfo], + records_to_delete: List[str], + ioc_info: IOCInfo, + ) -> None: + _log.info("CF Update IOC: %s", ioc_info) + _log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) + recceiverid = self.cf_config.recceiver_id + new_channels = set(record_info_by_name.keys()) + iocid = ioc_info.id + + if iocid not in self.iocs: + _log.warning( + "IOC %s did not send an initial transaction to join IOC list (%d IOCs known)", + ioc_info, + len(self.iocs), + ) + + if ioc_info.hostname is None or ioc_info.ioc_name is None: + raise IOCMissingInfoError(ioc_info) + + if self.cancelled: + raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + + channels: List[CFChannel] = [] + _log.debug("Find existing channels by IOCID: %s", ioc_info) + old_channels: List[CFChannel] = self.client.find_by_ioc_id(iocid) + + if old_channels: + self._handle_channels( + old_channels, + new_channels, + records_to_delete, + ioc_info, + recceiverid, + channels, + record_info_by_name, + iocid, + ) + # now pvNames contains a list of pv's new on this host/ioc + existing_channels = self._get_existing_channels(new_channels) + + if self.cancelled: + raise defer.CancelledError(f"CF Processor is cancelled, after fetching existing channels for {ioc_info}") + + self._process_new_channels( + new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid + ) + _log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) + + if len(channels) != 0: + self._cf_set_chunked(channels) + else: + if old_channels and len(old_channels) != 0: + self._cf_set_chunked(channels) + if self.cancelled: + raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + + def _process_new_channels( + self, + new_channels: Set[str], + record_info_by_name: Dict[str, RecordInfo], + ioc_info: IOCInfo, + recceiverid: str, + existing_channels: Dict[str, CFChannel], + channels: List[CFChannel], + iocid: str, + ) -> None: + for channel_name in new_channels: + new_properties = create_ioc_properties( + ioc_info.owner, + ioc_info.time, + recceiverid, + ioc_info.hostname, + ioc_info.ioc_name, + ioc_info.ioc_ip, + ioc_info.id, + ) + record_info = record_info_by_name.get(channel_name) + if record_info: + if self.cf_config.record_type_enabled and record_info.record_type: + new_properties.append( + CFProperty(CFPropertyName.RECORD_TYPE.value, ioc_info.owner, record_info.record_type) + ) + new_properties = new_properties + record_info.info_properties + if channel_name in existing_channels: + _log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) + self._update_existing_channel_diff_iocid( + existing_channels, channel_name, new_properties, channels, record_info_by_name, ioc_info, iocid + ) + else: + self._create_new_channel(channels, channel_name, ioc_info, new_properties, record_info_by_name) + + def _cf_set_chunked(self, channels: List[CFChannel]) -> None: + chunk_size = int(self.cf_config.cf_query_limit) + for i in range(0, len(channels), chunk_size): + self.client.set_channels(channels[i : i + chunk_size]) + + def _handle_channels( + self, + old_channels: List[CFChannel], + new_channels: Set[str], + records_to_delete: List[str], + ioc_info: IOCInfo, + recceiverid: str, + channels: List[CFChannel], + record_info_by_name: Dict[str, RecordInfo], + iocid: str, + ) -> None: + """Handle channels already present in Channelfinder for this IOC. + + For each old channel: if it is not in new_channels or is being deleted, + re-assign it to its last known IOC or orphan it; if it is in both old + and new, update its properties in place. + """ + for cf_channel in old_channels: + if not new_channels or cf_channel.name in records_to_delete: + _log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) + if cf_channel.name in self.channel_ioc_ids: + self._handle_channel_is_old(cf_channel, ioc_info, recceiverid, channels, record_info_by_name) + else: + self._orphan_channel(cf_channel, ioc_info, channels, record_info_by_name) + else: + if cf_channel.name in new_channels: + self._handle_channel_old_and_new( + cf_channel, iocid, ioc_info, channels, new_channels, record_info_by_name, old_channels + ) + + def _handle_channel_is_old( + self, + cf_channel: CFChannel, + ioc_info: IOCInfo, + recceiverid: str, + channels: List[CFChannel], + record_info_by_name: Dict[str, RecordInfo], + ) -> None: + """Channel exists in CF but not in this commit — re-assign to its last known IOC.""" + last_ioc_id = self.channel_ioc_ids[cf_channel.name][-1] + cf_channel.owner = self.iocs[last_ioc_id].owner + cf_channel.properties = _merge_property_lists( + create_default_properties(ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel), + cf_channel, + self.managed_properties, + ) + channels.append(cf_channel) + _log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) + if self.cf_config.alias_enabled: + if cf_channel.name in record_info_by_name: + for alias_name in record_info_by_name[cf_channel.name].aliases: + # Legacy alias handling retained to avoid changing runtime behavior. + alias_channel = CFChannel(alias_name, "", []) + if alias_name in self.channel_ioc_ids: + last_alias_ioc_id = self.channel_ioc_ids[alias_name][-1] + alias_channel.owner = self.iocs[last_alias_ioc_id].owner + alias_channel.properties = _merge_property_lists( + create_default_properties( + ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel + ), + alias_channel, + self.managed_properties, + ) + channels.append(alias_channel) + _log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) + + def _orphan_channel( + self, + cf_channel: CFChannel, + ioc_info: IOCInfo, + channels: List[CFChannel], + record_info_by_name: Dict[str, RecordInfo], + ) -> None: + """Channel exists in CF but has no known IOC — mark inactive.""" + cf_channel.properties = _merge_property_lists( + [ + CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.INACTIVE.value), + CFProperty(CFPropertyName.TIME.value, ioc_info.owner, ioc_info.time), + ], + cf_channel, + ) + channels.append(cf_channel) + _log.debug("Add orphaned channel %s with no IOC: %s", cf_channel, ioc_info) + if self.cf_config.alias_enabled: + if cf_channel.name in record_info_by_name: + for alias_name in record_info_by_name[cf_channel.name].aliases: + alias_channel = CFChannel(alias_name, "", []) + alias_channel.properties = _merge_property_lists( + [ + CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.INACTIVE.value), + CFProperty(CFPropertyName.TIME.value, ioc_info.owner, ioc_info.time), + ], + alias_channel, + ) + channels.append(alias_channel) + _log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) + + def _handle_channel_old_and_new( + self, + cf_channel: CFChannel, + iocid: str, + ioc_info: IOCInfo, + channels: List[CFChannel], + new_channels: Set[str], + record_info_by_name: Dict[str, RecordInfo], + old_channels: List[CFChannel], + ) -> None: + """Channel exists in CF with the same iocid — mark active and update time.""" + _log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) + cf_channel.properties = _merge_property_lists( + [ + CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.ACTIVE.value), + CFProperty(CFPropertyName.TIME.value, ioc_info.owner, ioc_info.time), + ], + cf_channel, + self.managed_properties, + ) + channels.append(cf_channel) + _log.debug("Add existing channel with same IOC: %s", cf_channel) + new_channels.remove(cf_channel.name) + + if self.cf_config.alias_enabled: + if cf_channel.name in record_info_by_name: + for alias_name in record_info_by_name[cf_channel.name].aliases: + if alias_name in old_channels: + alias_channel = CFChannel(alias_name, "", []) + alias_channel.properties = _merge_property_lists( + [ + CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.ACTIVE.value), + CFProperty(CFPropertyName.TIME.value, ioc_info.owner, ioc_info.time), + ], + alias_channel, + self.managed_properties, + ) + channels.append(alias_channel) + new_channels.remove(alias_name) + else: + aprops = _merge_property_lists( + [ + CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.ACTIVE.value), + CFProperty(CFPropertyName.TIME.value, ioc_info.owner, ioc_info.time), + CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, cf_channel.name), + ], + cf_channel, + self.managed_properties, + ) + channels.append(CFChannel(alias_name, ioc_info.owner, aprops)) + new_channels.remove(alias_name) + _log.debug("Add existing alias with same IOC: %s", cf_channel) + + def _get_existing_channels(self, new_channels: Set[str]) -> Dict[str, CFChannel]: + """Query CF for channels in new_channels that already exist there.""" + return {ch.name: ch for ch in self.client.find_by_names(list(new_channels))} + + def _update_existing_channel_diff_iocid( + self, + existing_channels: Dict[str, CFChannel], + channel_name: str, + new_properties: List[CFProperty], + channels: List[CFChannel], + record_info_by_name: Dict[str, RecordInfo], + ioc_info: IOCInfo, + iocid: str, + ) -> None: + """Update a channel that exists in CF but is moving to a new IOC.""" + existing_channel = existing_channels[channel_name] + existing_channel.properties = _merge_property_lists( + new_properties, + existing_channel, + self.managed_properties, + ) + channels.append(existing_channel) + _log.debug("Add existing channel with different IOC: %s", existing_channel) + if self.cf_config.alias_enabled and channel_name in record_info_by_name: + alias_properties = [CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, channel_name)] + new_properties + for alias_name in record_info_by_name[channel_name].aliases: + if alias_name in existing_channels: + ach = existing_channels[alias_name] + ach.properties = _merge_property_lists(alias_properties, ach, self.managed_properties) + channels.append(ach) + else: + channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties)) + _log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) + + def _create_new_channel( + self, + channels: List[CFChannel], + channel_name: str, + ioc_info: IOCInfo, + new_properties: List[CFProperty], + record_info_by_name: Dict[str, RecordInfo], + ) -> None: + channels.append(CFChannel(channel_name, ioc_info.owner, new_properties)) + _log.debug("Add new channel: %s", channel_name) + if self.cf_config.alias_enabled and channel_name in record_info_by_name: + alias_properties = [CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, channel_name)] + new_properties + for alias in record_info_by_name[channel_name].aliases: + channels.append(CFChannel(alias, ioc_info.owner, alias_properties)) + _log.debug("Add new alias: %s from %s", alias, channel_name) + + +def create_ioc_properties( + owner: str, ioc_time: str, recceiverid: str, host_name: str, ioc_name: str, ioc_ip: str, iocid: str +) -> List[CFProperty]: + """Build the standard set of IOC-level CF properties for a channel.""" + return [ + CFProperty(CFPropertyName.HOSTNAME.value, owner, host_name), + CFProperty(CFPropertyName.IOC_NAME.value, owner, ioc_name), + CFProperty(CFPropertyName.IOC_ID.value, owner, iocid), + CFProperty(CFPropertyName.IOC_IP.value, owner, ioc_ip), + CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.ACTIVE.value), + CFProperty(CFPropertyName.TIME.value, owner, ioc_time), + CFProperty(CFPropertyName.RECCEIVER_ID.value, owner, recceiverid), + ] + + +def create_default_properties( + ioc_info: IOCInfo, + recceiverid: str, + channels_iocs: Dict[str, List[str]], + iocs: Dict[str, IOCInfo], + cf_channel: CFChannel, +) -> List[CFProperty]: + """Build IOC properties using the last known IOC for a channel.""" + channel_name = cf_channel.name + last_ioc_info = iocs[channels_iocs[channel_name][-1]] + return create_ioc_properties( + ioc_info.owner, + ioc_info.time, + recceiverid, + last_ioc_info.hostname, + last_ioc_info.ioc_name, + last_ioc_info.ioc_ip, + last_ioc_info.id, + ) + + +def _merge_property_lists( + new_properties: List[CFProperty], channel: CFChannel, managed_properties: Optional[Set[str]] = None +) -> List[CFProperty]: + """Merge two property lists; new_properties wins on name collision. + + Properties in channel not in new_properties are kept unless they are + managed by this recceiver (in which case the absence is intentional). + """ + managed = managed_properties or set() + new_property_names = [p.name for p in new_properties] + for old_property in channel.properties: + if old_property.name not in new_property_names and old_property.name not in managed: + new_properties = new_properties + [old_property] + return new_properties + + +def get_current_time(timezone: Optional[str] = None) -> str: + """Return the current time as a string, localised if a timezone is given.""" + if timezone: + return str(datetime.datetime.now().astimezone()) + return str(datetime.datetime.now()) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py deleted file mode 100755 index 553df3d9..00000000 --- a/server/recceiver/cfstore.py +++ /dev/null @@ -1,1347 +0,0 @@ -# -*- coding: utf-8 -*- - -import datetime -import enum -import logging -import socket -import time -from collections import defaultdict -from dataclasses import dataclass, field, fields -from typing import Any, Callable, Dict, List, Optional, Set, Tuple - -from channelfinder import ChannelFinderClient -from requests import ConnectionError, RequestException -from twisted.application import service -from twisted.internet import defer -from twisted.internet.defer import DeferredLock -from twisted.internet.threads import deferToThread -from zope.interface import implementer - -from . import interfaces -from .interfaces import CommitTransaction -from .processors import ConfigAdapter - -_log = logging.getLogger(__name__) - -__all__ = ["CFProcessor"] - -RECCEIVERID_DEFAULT = socket.gethostname() -DEFAULT_MAX_CHANNEL_NAME_QUERY_LENGTH = 600 -DEFAULT_QUERY_LIMIT = 10_000 - - -class PVStatus(str, enum.Enum): - """PV Status values.""" - - ACTIVE = "Active" - INACTIVE = "Inactive" - - -@dataclass -class CFConfig: - """Configuration options for the CF Processor""" - - alias_enabled: bool = False - record_type_enabled: bool = False - environment_variables: str = "" - info_tags: str = "" - ioc_connection_info: bool = True - record_description_enabled: bool = False - clean_on_start: bool = True - clean_on_stop: bool = True - username: str = "cfstore" - env_owner_variable: str = "ENGINEER" - recceiver_id: str = RECCEIVERID_DEFAULT - timezone: Optional[str] = None - cf_query_limit: int = DEFAULT_QUERY_LIMIT - base_url: Optional[str] = None - cf_username: Optional[str] = None - cf_password: Optional[str] = None - verify_ssl: Optional[bool] = None - push_max_retries: int = 10 - push_always_retry: bool = True - - @classmethod - def loads(cls, conf: ConfigAdapter) -> "CFConfig": - """Load configuration from a ConfigAdapter instance. - - Args: - conf: ConfigAdapter instance containing configuration data. - """ - return CFConfig( - alias_enabled=conf.getboolean("alias", False), - record_type_enabled=conf.getboolean("recordType", False), - environment_variables=conf.get("environment_vars", ""), - info_tags=conf.get("infotags", ""), - ioc_connection_info=conf.getboolean("iocConnectionInfo", True), - record_description_enabled=conf.getboolean("recordDesc", False), - clean_on_start=conf.getboolean("cleanOnStart", True), - clean_on_stop=conf.getboolean("cleanOnStop", True), - username=conf.get("username", "cfstore"), - recceiver_id=conf.get("recceiverId", RECCEIVERID_DEFAULT), - timezone=conf.get("timezone", ""), - cf_query_limit=conf.get("findSizeLimit", DEFAULT_QUERY_LIMIT), - base_url=conf.get("baseUrl"), - cf_username=conf.get("cfUsername"), - cf_password=conf.get("cfPassword"), - verify_ssl=conf.getboolean("verifySSL"), - push_max_retries=conf.getint("pushMaxRetries", 10), - push_always_retry=conf.getboolean("pushAlwaysRetry", True), - ) - - def __repr__(self) -> str: - parts = [] - for f in fields(self): - value = getattr(self, f.name) - if f.name == "cf_password": - value = "***" if value else None - parts.append(f"{f.name}={value!r}") - return f"CFConfig({', '.join(parts)})" - - -@dataclass -class CFProperty: - name: str - owner: str - value: Optional[str] = None - - def as_dict(self) -> Dict[str, str]: - """Convert to dictionary for Channelfinder API.""" - return {"name": self.name, "owner": self.owner, "value": self.value or ""} - - @classmethod - def from_dict(cls, prop_dict: Dict[str, str]) -> "CFProperty": - """Create CFProperty from Channelfinder json output. - - Args: - prop_dict: Dictionary representing a property from Channelfinder. - """ - return cls( - name=prop_dict.get("name", ""), - owner=prop_dict.get("owner", ""), - value=prop_dict.get("value"), - ) - - @classmethod - def record_type(cls, owner: str, record_type: str) -> "CFProperty": - """Create a Channelfinder recordType property. - - Args: - owner: The owner of the property. - recordType: The recordType of the property. - """ - return cls(CFPropertyName.RECORD_TYPE.value, owner, record_type) - - @classmethod - def alias(cls, owner: str, alias: str) -> "CFProperty": - """Create a Channelfinder alias property. - - Args: - owner: The owner of the property. - alias: The alias of the property. - """ - return cls(CFPropertyName.ALIAS.value, owner, alias) - - @classmethod - def pv_status(cls, owner: str, pv_status: PVStatus) -> "CFProperty": - """Create a Channelfinder pvStatus property. - - Args: - owner: The owner of the property. - pvStatus: The pvStatus of the property. - """ - return cls(CFPropertyName.PV_STATUS.value, owner, pv_status.value) - - @classmethod - def active(cls, owner: str) -> "CFProperty": - """Create a Channelfinder active property. - - Args: - owner: The owner of the property. - """ - return cls.pv_status(owner, PVStatus.ACTIVE) - - @classmethod - def inactive(cls, owner: str) -> "CFProperty": - """Create a Channelfinder inactive property. - - Args: - owner: The owner of the property. - """ - return cls.pv_status(owner, PVStatus.INACTIVE) - - @classmethod - def time(cls, owner: str, time: str) -> "CFProperty": - """Create a Channelfinder time property. - - Args: - owner: The owner of the property. - time: The time of the property. - """ - return cls(CFPropertyName.TIME.value, owner, time) - - -@dataclass -class RecordInfo: - """Information about a record to be stored in Channelfinder.""" - - pv_name: str - record_type: Optional[str] = None - info_properties: List[CFProperty] = field(default_factory=list) - aliases: List[str] = field(default_factory=list) - - -class CFPropertyName(str, enum.Enum): - """Standard property names used in Channelfinder.""" - - HOSTNAME = "hostName" - IOC_NAME = "iocName" - IOC_ID = "iocid" - IOC_IP = "iocIP" - PV_STATUS = "pvStatus" - TIME = "time" - RECCEIVER_ID = "recceiverID" - ALIAS = "alias" - RECORD_TYPE = "recordType" - RECORD_DESC = "recordDesc" - CA_PORT = "caPort" - PVA_PORT = "pvaPort" - - -@dataclass -class IocInfo: - """Information about an IOC instance.""" - - host: str - hostname: str - ioc_name: str - ioc_ip: str - owner: str - time: str - port: int - channelcount: int = 0 - - @property - def ioc_id(self): - """Generate a unique IOC ID based on hostname and port.""" - return self.host + ":" + str(self.port) - - -@dataclass -class CFChannel: - """Representation of a Channelfinder channel.""" - - name: str - owner: str - properties: List[CFProperty] - - def as_dict(self) -> Dict[str, Any]: - """Convert to dictionary for conversion to json in Channelfinder API.""" - return { - "name": self.name, - "owner": self.owner, - "properties": [p.as_dict() for p in self.properties], - } - - @classmethod - def from_dict(cls, channel_dict: Dict[str, Any]) -> "CFChannel": - """Create CFChannel from Channelfinder json output. - - Args: - channel_dict: Dictionary representing a channel from Channelfinder. - """ - return cls( - name=channel_dict.get("name", ""), - owner=channel_dict.get("owner", ""), - properties=[CFProperty.from_dict(p) for p in channel_dict.get("properties", [])], - ) - - -@implementer(interfaces.IProcessor) -class CFProcessor(service.Service): - """Processor for committing IOC and Record information to Channelfinder.""" - - def __init__(self, name: Optional[str], conf: ConfigAdapter): - """Initialize the CFProcessor with configuration. - - Args: - name: The name of the processor. - conf: The configuration for the processor. - """ - self.cf_config = CFConfig.loads(conf) - self.name = name # Override name from service.Service - self.channel_ioc_ids: Dict[str, List[str]] = defaultdict(list) - self.iocs: Dict[str, IocInfo] = {} - self.client: Optional[ChannelFinderClient] = None - self.current_time: Callable[[Optional[str]], str] = get_current_time - self.lock: DeferredLock = DeferredLock() - - def startService(self): - """Start the CFProcessor service. - - Overridden method of service.Service.startService() - """ - service.Service.startService(self) - # Returning a Deferred is not supported by startService(), - # so instead attempt to acquire the lock synchonously! - d = self.lock.acquire() - if not d.called: - d.cancel() - service.Service.stopService(self) - raise RuntimeError("Failed to acquired CF Processor lock for service start") - - try: - self._start_service_with_lock() - except: - service.Service.stopService(self) - raise - finally: - self.lock.release() - - def _start_service_with_lock(self): - """Start the CFProcessor service with lock held. - - Using the default python cf-client. The url, username, and - password are provided by the channelfinder._conf module. - """ - _log.info("CF_START with configuration: %s", self.cf_config) - - if self.client is None: # For setting up mock test client - self.client = ChannelFinderClient( - BaseURL=self.cf_config.base_url, - username=self.cf_config.cf_username, - password=self.cf_config.cf_password, - verify_ssl=self.cf_config.verify_ssl, - ) - try: - cf_properties = {cf_property["name"] for cf_property in self.client.getAllProperties()} - required_properties = { - CFPropertyName.HOSTNAME.value, - CFPropertyName.IOC_NAME.value, - CFPropertyName.IOC_ID.value, - CFPropertyName.IOC_IP.value, - CFPropertyName.PV_STATUS.value, - CFPropertyName.TIME.value, - CFPropertyName.RECCEIVER_ID.value, - } - - if self.cf_config.alias_enabled: - required_properties.add(CFPropertyName.ALIAS.value) - if self.cf_config.record_type_enabled: - required_properties.add(CFPropertyName.RECORD_TYPE.value) - env_vars_setting = self.cf_config.environment_variables - self.env_vars = {} - if env_vars_setting != "" and env_vars_setting is not None: - env_vars_dict = dict(item.strip().split(":") for item in env_vars_setting.split(",")) - self.env_vars = {k.strip(): v.strip() for k, v in env_vars_dict.items()} - for epics_env_var_name, cf_prop_name in self.env_vars.items(): - required_properties.add(cf_prop_name) - # Standard property names for CA/PVA name server connections. These are - # environment variables from reccaster so take advantage of env_vars - # iocConnectionInfo enabled by default - if self.cf_config.ioc_connection_info: - self.env_vars["RSRV_SERVER_PORT"] = "caPort" - self.env_vars["PVAS_SERVER_PORT"] = "pvaPort" - required_properties.add(CFPropertyName.CA_PORT.value) - required_properties.add(CFPropertyName.PVA_PORT.value) - - # Space or comma and space separated strings - record_property_names_list = {s.strip(", ") for s in self.cf_config.info_tags.split()} - if self.cf_config.record_description_enabled: - record_property_names_list.add(CFPropertyName.RECORD_DESC.value) - # Are any required properties not already present on CF? - properties = required_properties - cf_properties - # Are any whitelisted properties not already present on CF? - # If so, add them too. - properties.update(record_property_names_list - cf_properties) - - owner = self.cf_config.username - for cf_property_name in properties: - self.client.set(property={"name": cf_property_name, "owner": owner}) - - self.record_property_names_list = record_property_names_list - self.managed_properties = required_properties.union(record_property_names_list) - _log.debug("record_property_names_list = %s", self.record_property_names_list) - except ConnectionError: - _log.exception("Cannot connect to Channelfinder service") - raise - else: - if self.cf_config.clean_on_start: - self.clean_service() - - def stopService(self): - """Stop the CFProcessor service. - - Overridden method of service.Service.stopService() - """ - _log.info("CF_STOP") - service.Service.stopService(self) - return self.lock.run(self._stop_service_with_lock) - - def _stop_service_with_lock(self): - """Stop the CFProcessor service with lock held. - - If clean_on_stop is enabled, mark all channels as inactive. - """ - if self.cf_config.clean_on_stop: - self.clean_service() - _log.info("CF_STOP with lock") - - # @defer.inlineCallbacks # Twisted v16 does not support cancellation! - def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: - """Commit a transaction to Channelfinder. - - Args: - transaction_record: The transaction to commit. - """ - return self.lock.run(self._commit_with_lock, transaction_record) - - def _commit_with_lock(self, transaction: interfaces.ITransaction) -> defer.Deferred: - """Commit a transaction to Channelfinder with lock held. - - Args: - transaction: The transaction to commit. - """ - self.cancelled = False - - t = deferToThread(self._commit_with_thread, transaction) - - def cancel_commit(d: defer.Deferred): - """Cancel the commit operation.""" - self.cancelled = True - d.callback(None) - - d: defer.Deferred = defer.Deferred(cancel_commit) - - def wait_for_thread(_ignored): - """Wait for the commit thread to finish.""" - if self.cancelled: - return t - - d.addCallback(wait_for_thread) - - def chain_error(err): - """Handle errors from the commit thread. - - Note this is not foolproof as the thread may still be running. - """ - if not err.check(defer.CancelledError): - _log.error("CF_COMMIT FAILURE: %s", err) - if self.cancelled: - if not err.check(defer.CancelledError): - raise defer.CancelledError() - return err - else: - d.callback(None) - - def chain_result(result): - """Handle successful completion of the commit thread. - - If the commit was cancelled, raise CancelledError. - """ - if self.cancelled: - raise defer.CancelledError(f"CF Processor is cancelled, due to {result}") - else: - d.callback(None) - - t.addCallbacks(chain_result, chain_error) - return d - - def transaction_to_record_infos(self, ioc_info: IocInfo, transaction: CommitTransaction) -> Dict[str, RecordInfo]: - """Convert a CommitTransaction and IocInfo to a dictionary of RecordInfo objects. - - Combines record additions, info tags, aliases, and environment variables. - - Args: - ioc_info: Information from the IOC - transaction: transaction from reccaster - """ - record_infos: Dict[str, RecordInfo] = {} - for record_id, (record_name, record_type) in transaction.records_to_add.items(): - record_infos[record_id] = RecordInfo(pv_name=record_name, record_type=None, info_properties=[], aliases=[]) - if self.cf_config.record_type_enabled: - record_infos[record_id].record_type = record_type - - for record_id, (record_infos_to_add) in transaction.record_infos_to_add.items(): - # find intersection of these sets - if record_id not in record_infos: - _log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id) - continue - recinfo_wl = [p for p in self.record_property_names_list if p in record_infos_to_add.keys()] - if recinfo_wl: - for infotag in recinfo_wl: - record_infos[record_id].info_properties.append( - CFProperty(infotag, ioc_info.owner, record_infos_to_add[infotag]) - ) - - for record_id, record_aliases in transaction.aliases.items(): - if record_id not in record_infos: - _log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) - continue - record_infos[record_id].aliases = record_aliases - - for record_id in record_infos: - for epics_env_var_name, cf_prop_name in self.env_vars.items(): - if transaction.client_infos.get(epics_env_var_name) is not None: - record_infos[record_id].info_properties.append( - CFProperty(cf_prop_name, ioc_info.owner, transaction.client_infos.get(epics_env_var_name)) - ) - else: - _log.debug( - "EPICS environment var %s listed in environment_vars setting list not found in this IOC: %s", - epics_env_var_name, - ioc_info, - ) - return record_infos - - @staticmethod - def record_info_by_name(record_infos: Dict[str, RecordInfo], ioc_info: IocInfo) -> Dict[str, RecordInfo]: - """Create a dictionary of RecordInfo objects keyed by pvName. - - Args: - record_infos: Dictionary of RecordInfo objects keyed by record_id. - ioc_info: Information from the IOC. - """ - record_info_by_name = {} - for record_id, (info) in record_infos.items(): - if info.pv_name in record_info_by_name: - _log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) - continue - record_info_by_name[info.pv_name] = info - return record_info_by_name - - def update_ioc_infos( - self, - transaction: CommitTransaction, - ioc_info: IocInfo, - records_to_delete: List[str], - record_info_by_name: Dict[str, RecordInfo], - ) -> None: - """Update the internal IOC information based on the transaction. - - Makes changed to self.iocs and self.channel_ioc_ids and records_to_delete. - - Args: - transaction: The CommitTransaction being processed. - ioc_info: The IocInfo for the IOC in the transaction. - records_to_delete: List of record names to delete. - record_info_by_name: Dictionary of RecordInfo objects keyed by pvName. - """ - iocid = ioc_info.ioc_id - if transaction.initial: - # Add IOC to source list - self.iocs[iocid] = ioc_info - if not transaction.connected: - records_to_delete.extend(self.channel_ioc_ids.keys()) - for record_name in record_info_by_name.keys(): - self.channel_ioc_ids[record_name].append(iocid) - self.iocs[iocid].channelcount += 1 - # In case, alias exists - if self.cf_config.alias_enabled: - if record_name in record_info_by_name: - for record_aliases in record_info_by_name[record_name].aliases: - self.channel_ioc_ids[record_aliases].append(iocid) # add iocname to pvName in dict - self.iocs[iocid].channelcount += 1 - for record_name in records_to_delete: - if iocid in self.channel_ioc_ids[record_name]: - self.remove_channel(record_name, iocid) - # In case, alias exists - if self.cf_config.alias_enabled: - if record_name in record_info_by_name: - for record_aliases in record_info_by_name[record_name].aliases: - self.remove_channel(record_aliases, iocid) - - def _commit_with_thread(self, transaction: CommitTransaction): - """Commit the transaction to Channelfinder. - - Collects the ioc info from the transaction. - Collects the record infos from the transaction. - Collects the records to delete from the transaction. - Calculates the records by names. - Updates the local IOC information. - Polls Channelfinder with the required updates until it passes. - - Args: - transaction: The transaction to commit. - """ - host = transaction.source_address.host - port = transaction.source_address.port - - if not self.running: - raise defer.CancelledError(f"CF Processor is not running (transaction: {host}:{port})") - - _log.info("CF_COMMIT: %s", transaction) - _log.debug("CF_COMMIT: transaction: %s", repr(transaction)) - - ioc_name = transaction.client_infos.get("IOCNAME") - if not ioc_name: - ioc_name = str(port) - _log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port) - - owner = ( - transaction.client_infos.get(self.cf_config.env_owner_variable) - or transaction.client_infos.get("CF_USERNAME") - or self.cf_config.username - ) - if owner == self.cf_config.username: - _log.debug( - "IOC at %s:%d did not send %s or CF_USERNAME; using service account as owner", - host, - port, - self.cf_config.env_owner_variable, - ) - - ioc_info = IocInfo( - host=host, - hostname=transaction.client_infos.get("HOSTNAME") or host, - ioc_name=ioc_name, - ioc_ip=host, - owner=owner, - time=self.current_time(self.cf_config.timezone), - port=port, - ) - - record_infos = self.transaction_to_record_infos(ioc_info, transaction) - - records_to_delete = list(transaction.records_to_delete) - _log.debug("Delete records: %s", records_to_delete) - - record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) - self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) - poll_success = push_to_cf(_update_channelfinder, self, record_info_by_name, records_to_delete, ioc_info) - if not poll_success: - raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}") - - def remove_channel(self, record_name: str, iocid: str) -> None: - """Remove channel from self.iocs and self.channel_ioc_ids. - - Args: - record_name: The name of the record to remove. - iocid: The IOC ID of the record to remove from. - """ - self.channel_ioc_ids[record_name].remove(iocid) - if iocid not in self.iocs: - if len(self.channel_ioc_ids[record_name]) == 0: - del self.channel_ioc_ids[record_name] - return - self.iocs[iocid].channelcount -= 1 - if self.iocs[iocid].channelcount <= 0: - if self.iocs[iocid].channelcount < 0: - _log.error("Channel count negative: %s", iocid) - self.iocs.pop(iocid) - if len(self.channel_ioc_ids[record_name]) == 0: - del self.channel_ioc_ids[record_name] - - def clean_service(self) -> None: - """Marks all channels belonging to this recceiver (as found by the recceiver id) as 'Inactive'.""" - sleep = 1 - retry_limit = 5 - owner = self.cf_config.username - recceiverid = self.cf_config.recceiver_id - while 1: - try: - _log.info("CF Clean Started") - channels = self.get_active_channels(recceiverid) - if channels is not None: - while channels is not None and len(channels) > 0: - self.clean_channels(owner, channels) - channels = self.get_active_channels(recceiverid) - _log.info("CF Clean Completed") - return - else: - _log.info("CF Clean Completed") - return - except RequestException as e: - _log.error("Clean service failed: %s", e) - retry_seconds = min(60, sleep) - _log.info("Clean service retry in %s seconds", retry_seconds) - time.sleep(retry_seconds) - sleep *= 1.5 - if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after %s seconds", retry_limit) - return - - def get_active_channels(self, recceiverid: str) -> List[CFChannel]: - """Gets all the channels which are active for the given recceiver id. - - Args: - recceiverid: The current recceiver id. - """ - return [ - CFChannel.from_dict(ch) - for ch in self.client.findByArgs( - prepare_find_args( - cf_config=self.cf_config, - args=[ - (CFPropertyName.PV_STATUS.value, PVStatus.ACTIVE.value), - (CFPropertyName.RECCEIVER_ID.value, recceiverid), - ], - ) - ) - ] - - def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: - """Set the pvStatus property to 'Inactive' for the given channels. - - Args: - owner: The owner of the channels. - channels: The channels to set to 'Inactive'. - """ - new_channels = [] - for cf_channel in channels or []: - new_channels.append(cf_channel.name) - _log.info("Cleaning %s channels.", len(new_channels)) - _log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(new_channels)) - self.client.update( - property=CFProperty.inactive(owner).as_dict(), - channelNames=new_channels, - ) - - -def handle_channel_is_old( - channel_ioc_ids: Dict[str, List[str]], - cf_channel: CFChannel, - iocs: Dict[str, IocInfo], - ioc_info: IocInfo, - recceiverid: str, - managed_properties: Set[str], - cf_config: CFConfig, - channels: List[CFChannel], - record_info_by_name: Dict[str, RecordInfo], -) -> None: - """Handle the case when the channel exists in channelfinder but not in the recceiver. - - Modifies: - channels - - Args: - channel_ioc_ids: mapping of channels to ioc ids - cf_channel: The channel that is old - iocs: List of all known iocs - ioc_info: Current ioc - recceiverid: id of current recceiver - managed_properties: List of managed properties - cf_config: Configuration used for processor - channels: list of the current channel changes - record_info_by_name: Input information from the transaction - """ - last_ioc_id = channel_ioc_ids[cf_channel.name][-1] - cf_channel.owner = iocs[last_ioc_id].owner - cf_channel.properties = __merge_property_lists( - create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), - cf_channel, - managed_properties, - ) - channels.append(cf_channel) - _log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) - # In case alias exist, also delete them - if cf_config.alias_enabled: - if cf_channel.name in record_info_by_name: - for alias_name in record_info_by_name[cf_channel.name].aliases: - # Legacy alias handling retained to avoid changing runtime behavior. - alias_channel = CFChannel(alias_name, "", []) - if alias_name in channel_ioc_ids: - last_alias_ioc_id = channel_ioc_ids[alias_name][-1] - alias_channel.owner = iocs[last_alias_ioc_id].owner - alias_channel.properties = __merge_property_lists( - create_default_properties( - ioc_info, - recceiverid, - channel_ioc_ids, - iocs, - cf_channel, - ), - alias_channel, - managed_properties, - ) - channels.append(alias_channel) - _log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) - - -def orphan_channel( - cf_channel: CFChannel, - ioc_info: IocInfo, - channels: List[CFChannel], - cf_config: CFConfig, - record_info_by_name: Dict[str, RecordInfo], -) -> None: - """Handle a channel that exists in channelfinder but not on this recceiver. - - Modifies: - channels - - Args: - cf_channel: The channel to orphan - ioc_info: Info of the current ioc - channels: The current list of channel changes - cf_config: Configuration of the proccessor - record_info_by_name: information from the transaction - """ - cf_channel.properties = __merge_property_lists( - [ - CFProperty.inactive(ioc_info.owner), - CFProperty.time(ioc_info.owner, ioc_info.time), - ], - cf_channel, - ) - channels.append(cf_channel) - _log.debug("Add orphaned channel %s with no IOC: %s", cf_channel, ioc_info) - # Also orphan any alias - if cf_config.alias_enabled: - if cf_channel.name in record_info_by_name: - for alias_name in record_info_by_name[cf_channel.name].aliases: - alias_channel = CFChannel(alias_name, "", []) - alias_channel.properties = __merge_property_lists( - [ - CFProperty.inactive(ioc_info.owner), - CFProperty.time(ioc_info.owner, ioc_info.time), - ], - alias_channel, - ) - channels.append(alias_channel) - _log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) - - -def handle_channel_old_and_new( - cf_channel: CFChannel, - iocid: str, - ioc_info: IocInfo, - managed_properties: Set[str], - channels: List[CFChannel], - new_channels: Set[str], - cf_config: CFConfig, - record_info_by_name: Dict[str, RecordInfo], - old_channels: List[CFChannel], -) -> None: - """ - Channel exists in Channelfinder with same iocid. - Update the status to ensure it is marked active and update the time. - - Modifies: - channels - new_channels - - Args: - cf_channel: The channel to update - iocid: The IOC ID of the channel - ioc_info: Info of the current ioc - managed_properties: List of managed properties - channels: The current list of channel changes - new_channels: The list of new channels - cf_config: Configuration of the processor - record_info_by_name: information from the transaction - old_channels: The list of old channels - """ - _log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) - cf_channel.properties = __merge_property_lists( - [ - CFProperty.active(ioc_info.owner), - CFProperty.time(ioc_info.owner, ioc_info.time), - ], - cf_channel, - managed_properties, - ) - channels.append(cf_channel) - _log.debug("Add existing channel with same IOC: %s", cf_channel) - new_channels.remove(cf_channel.name) - - # In case, alias exist - if cf_config.alias_enabled: - if cf_channel.name in record_info_by_name: - for alias_name in record_info_by_name[cf_channel.name].aliases: - if alias_name in old_channels: - # alias exists in old list - alias_channel = CFChannel(alias_name, "", []) - alias_channel.properties = __merge_property_lists( - [ - CFProperty.active(ioc_info.owner), - CFProperty.time(ioc_info.owner, ioc_info.time), - ], - alias_channel, - managed_properties, - ) - channels.append(alias_channel) - new_channels.remove(alias_name) - else: - # alias exists but not part of old list - aprops = __merge_property_lists( - [ - CFProperty.active(ioc_info.owner), - CFProperty.time(ioc_info.owner, ioc_info.time), - CFProperty.alias( - ioc_info.owner, - cf_channel.name, - ), - ], - cf_channel, - managed_properties, - ) - channels.append( - CFChannel( - alias_name, - ioc_info.owner, - aprops, - ) - ) - new_channels.remove(alias_name) - _log.debug("Add existing alias with same IOC: %s", cf_channel) - - -def get_existing_channels( - new_channels: Set[str], client: ChannelFinderClient, cf_config: CFConfig -) -> Dict[str, CFChannel]: - """Get the channels existing in channelfinder from the list of new channels. - - Args: - new_channels: The list of new channels. - client: The client to contact channelfinder - cf_config: The configuration for the processor. - """ - existing_channels: Dict[str, CFChannel] = {} - - # The list of pv's is searched keeping in mind the limitations on the URL length - search_strings = [] - search_string = "" - for channel_name in new_channels: - if not search_string: - search_string = channel_name - elif len(search_string) + len(channel_name) < 600: - search_string = search_string + "|" + channel_name - else: - search_strings.append(search_string) - search_string = channel_name - if search_string: - search_strings.append(search_string) - - for each_search_string in search_strings: - _log.debug("Find existing channels by name: %s", each_search_string) - for found_channel in client.findByArgs( - prepare_find_args(cf_config=cf_config, args=[("~name", each_search_string)]) - ): - existing_channels[found_channel["name"]] = CFChannel.from_dict(found_channel) - return existing_channels - - -def handle_channels( - old_channels: List[CFChannel], - new_channels: Set[str], - records_to_delete: List[str], - channel_ioc_ids: Dict[str, List[str]], - iocs: Dict[str, IocInfo], - ioc_info: IocInfo, - recceiverid: str, - managed_properties: Set[str], - cf_config: CFConfig, - channels: List[CFChannel], - record_info_by_name: Dict[str, RecordInfo], - iocid: str, -) -> None: - """Handle channels already present in Channelfinder for this IOC. - - Loops through all the old_channels, - if it is on another ioc clean up reference to old ioc - if it is not on another ioc set as Inactive - if it is on current ioc update the properties - - Modifies: - channels: The list of channels. - iocs: The dictionary of IOCs. - channel_ioc_ids: The dictionary of channel names to IOC IDs. - new_channels: The list of new channels. - - Args: - old_channels: The list of old channels. - new_channels: The list of new channels. - channel_ioc_ids: The dictionary of channel names to IOC IDs. - recceiver_id: The recceiver ID. - iocs: The dictionary of IOCs. - records_to_delete: The list of records to delete. - ioc_info: The IOC information. - managed_properties: The properites managed by this recceiver. - channels: The list of channels. - alias_enabled: Whether aliases are enabled. - record_type_enabled: Whether record types are enabled. - record_info_by_name: The dictionary of record names to information. - iocid: The IOC ID. - cf_config: The configuration for the processor. - """ - for cf_channel in old_channels: - if ( - not new_channels or cf_channel.name in records_to_delete - ): # case: empty commit/del, remove all reference to ioc - _log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) - if cf_channel.name in channel_ioc_ids: - handle_channel_is_old( - channel_ioc_ids, - cf_channel, - iocs, - ioc_info, - recceiverid, - managed_properties, - cf_config, - channels, - record_info_by_name, - ) - else: - orphan_channel(cf_channel, ioc_info, channels, cf_config, record_info_by_name) - else: - if cf_channel.name in new_channels: # case: channel in old and new - handle_channel_old_and_new( - cf_channel, - iocid, - ioc_info, - managed_properties, - channels, - new_channels, - cf_config, - record_info_by_name, - old_channels, - ) - - -def update_existing_channel_diff_iocid( - existing_channels: Dict[str, CFChannel], - channel_name: str, - new_properties: List[CFProperty], - managed_properties: Set[str], - channels: List[CFChannel], - cf_config: CFConfig, - record_info_by_name: Dict[str, RecordInfo], - ioc_info: IocInfo, - iocid: str, -) -> None: - """Update existing channel with the changed properties. - - Modifies: - channels - - Args: - existing_channels: The dictionary of existing channels. - channel_name: The name of the channel. - new_properties: The new properties. - managed_properties: The managed properties. - channels: The list of channels. - cf_config: configuration of processor - record_info_by_name: The dictionary of record names to information. - ioc_info: The IOC information. - iocid: The IOC ID. - """ - existing_channel = existing_channels[channel_name] - existing_channel.properties = __merge_property_lists( - new_properties, - existing_channel, - managed_properties, - ) - channels.append(existing_channel) - _log.debug("Add existing channel with different IOC: %s", existing_channel) - # in case, alias exists, update their properties too - if cf_config.alias_enabled and channel_name in record_info_by_name: - alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)] - for p in new_properties: - alias_properties.append(p) - for alias_name in record_info_by_name[channel_name].aliases: - if alias_name in existing_channels: - ach = existing_channels[alias_name] - ach.properties = __merge_property_lists( - alias_properties, - ach, - managed_properties, - ) - channels.append(ach) - else: - channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties)) - _log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) - - -def create_new_channel( - channels: List[CFChannel], - channel_name: str, - ioc_info: IocInfo, - new_properties: List[CFProperty], - cf_config: CFConfig, - record_info_by_name: Dict[str, RecordInfo], -) -> None: - """Create a new channel. - - Modifies: - channels - - Args: - channels: The list of channels. - channel_name: The name of the channel. - ioc_info: The IOC information. - new_properties: The new properties. - cf_config: configuration of processor - record_info_by_name: The dictionary of record names to information. - """ - - channels.append(CFChannel(channel_name, ioc_info.owner, new_properties)) - _log.debug("Add new channel: %s", channel_name) - if cf_config.alias_enabled and channel_name in record_info_by_name: - alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)] - for p in new_properties: - alias_properties.append(p) - for alias in record_info_by_name[channel_name].aliases: - channels.append(CFChannel(alias, ioc_info.owner, alias_properties)) - _log.debug("Add new alias: %s from %s", alias, channel_name) - - -class IOCMissingInfoError(Exception): - """Raised when an IOC is missing required information.""" - - def __init__(self, ioc_info: IocInfo): - super().__init__(f"Missing hostName {ioc_info.hostname} or iocName {ioc_info.ioc_name}") - self.ioc_info = ioc_info - - -def _update_channelfinder( - processor: CFProcessor, record_info_by_name: Dict[str, RecordInfo], records_to_delete, ioc_info: IocInfo -) -> None: - """Update Channelfinder with the provided IOC and Record information. - - Calculates the changes required to the channels list and pushes the update the channelfinder. - - Args: - processor: The processor. - record_info_by_name: The dictionary of record names to information. - records_to_delete: The list of records to delete. - ioc_info: The IOC information. - """ - _log.info("CF Update IOC: %s", ioc_info) - _log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) - # Consider making this function a class methed then 'processor' simply becomes 'self' - client = processor.client - channel_ioc_ids = processor.channel_ioc_ids - iocs = processor.iocs - cf_config = processor.cf_config - recceiverid = processor.cf_config.recceiver_id - new_channels = set(record_info_by_name.keys()) - iocid = ioc_info.ioc_id - - if iocid not in iocs: - _log.warning( - "IOC %s did not send an initial transaction to join IOC list (%d IOCs known)", - ioc_info, - len(iocs), - ) - - if ioc_info.hostname is None or ioc_info.ioc_name is None: - raise IOCMissingInfoError(ioc_info) - - if processor.cancelled: - raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") - - channels: List[CFChannel] = [] - # A list of channels in channelfinder with the associated hostName and iocName - _log.debug("Find existing channels by IOCID: %s", ioc_info) - old_channels: List[CFChannel] = [ - CFChannel.from_dict(ch) - for ch in client.findByArgs(prepare_find_args(cf_config=cf_config, args=[("iocid", iocid)])) - ] - - if old_channels: - handle_channels( - old_channels, - new_channels, - records_to_delete, - channel_ioc_ids, - iocs, - ioc_info, - recceiverid, - processor.managed_properties, - cf_config, - channels, - record_info_by_name, - iocid, - ) - # now pvNames contains a list of pv's new on this host/ioc - existing_channels = get_existing_channels(new_channels, client, cf_config) - - if processor.cancelled: - raise defer.CancelledError(f"CF Processor is cancelled, after fetching existing channels for {ioc_info}") - - for channel_name in new_channels: - new_properties = create_ioc_properties( - ioc_info.owner, - ioc_info.time, - recceiverid, - ioc_info.hostname, - ioc_info.ioc_name, - ioc_info.ioc_ip, - ioc_info.ioc_id, - ) - if ( - cf_config.record_type_enabled - and channel_name in record_info_by_name - and record_info_by_name[channel_name].record_type - ): - new_properties.append(CFProperty.record_type(ioc_info.owner, record_info_by_name[channel_name].record_type)) - if channel_name in record_info_by_name: - new_properties = new_properties + record_info_by_name[channel_name].info_properties - - if channel_name in existing_channels: - _log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) - update_existing_channel_diff_iocid( - existing_channels, - channel_name, - new_properties, - processor.managed_properties, - channels, - cf_config, - record_info_by_name, - ioc_info, - iocid, - ) - else: - create_new_channel(channels, channel_name, ioc_info, new_properties, cf_config, record_info_by_name) - _log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) - - if len(channels) != 0: - cf_set_chunked(client, channels, cf_config.cf_query_limit) - else: - if old_channels and len(old_channels) != 0: - cf_set_chunked(client, channels, cf_config.cf_query_limit) - if processor.cancelled: - raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") - - -def cf_set_chunked(client: ChannelFinderClient, channels: List[CFChannel], chunk_size=DEFAULT_QUERY_LIMIT) -> None: - """Submit a list of channels to channelfinder in a chunked way. - - Args: - client: The channelfinder client. - channels: The list of channels. - chunk_size: The chunk size. - """ - for i in range(0, len(channels), chunk_size): - chunk = [ch.as_dict() for ch in channels[i : i + chunk_size]] - client.set(channels=chunk) - - -def create_ioc_properties( - owner: str, ioc_time: str, recceiverid: str, host_name: str, ioc_name: str, ioc_ip: str, iocid: str -) -> List[CFProperty]: - """Create the properties from an IOC. - - Args: - owner: The owner of the properties. - ioc_time: The time of the properties. - recceiverid: The recceiver ID of the properties. - host_name: The host name of the properties. - ioc_name: The IOC name of the properties. - ioc_ip: The IOC IP of the properties. - iocid: The IOC ID of the properties. - """ - return [ - CFProperty(CFPropertyName.HOSTNAME.value, owner, host_name), - CFProperty(CFPropertyName.IOC_NAME.value, owner, ioc_name), - CFProperty(CFPropertyName.IOC_ID.value, owner, iocid), - CFProperty(CFPropertyName.IOC_IP.value, owner, ioc_ip), - CFProperty.active(owner), - CFProperty.time(owner, ioc_time), - CFProperty(CFPropertyName.RECCEIVER_ID.value, owner, recceiverid), - ] - - -def create_default_properties( - ioc_info: IocInfo, recceiverid: str, channels_iocs: Dict[str, List[str]], iocs: Dict[str, IocInfo], cf_channel -) -> List[CFProperty]: - """Create the default properties for an IOC. - - Args: - ioc_info: The IOC information. - recceiverid: The recceiver ID of the properties. - channels_iocs: The dictionary of channel names to IOC IDs. - iocs: The dictionary of IOCs. - cf_channel: The Channelfinder channel. - """ - channel_name = cf_channel.name - last_ioc_info = iocs[channels_iocs[channel_name][-1]] - return create_ioc_properties( - ioc_info.owner, - ioc_info.time, - recceiverid, - last_ioc_info.hostname, - last_ioc_info.ioc_name, - last_ioc_info.ioc_ip, - last_ioc_info.ioc_id, - ) - - -def __merge_property_lists( - new_properties: List[CFProperty], channel: CFChannel, managed_properties: Set[str] = set() -) -> List[CFProperty]: - """Merges two lists of properties. - - Ensures that there are no 2 properties with - the same name In case of overlap between the new and old property lists the - new property list wins out. - - Args: - new_properties: The new properties. - channel: The channel. - managed_properties: The managed properties - """ - new_property_names = [p.name for p in new_properties] - for old_property in channel.properties: - if old_property.name not in new_property_names and (old_property.name not in managed_properties): - new_properties = new_properties + [old_property] - return new_properties - - -def get_current_time(timezone: Optional[str] = None) -> str: - """Get the current time. - - Args: - timezone: The timezone. - """ - if timezone: - return str(datetime.datetime.now().astimezone()) - return str(datetime.datetime.now()) - - -def prepare_find_args(cf_config: CFConfig, args) -> List[Tuple[str, str]]: - """Prepare the find arguments. - - Args: - cf_config: The configuration. - args: The arguments. - """ - size_limit = int(cf_config.cf_query_limit) - if size_limit > 0: - args.append(("~size", size_limit)) - return args - - -def push_to_cf( - update_method: Callable[[CFProcessor, Dict[str, RecordInfo], List[str], IocInfo], None], - processor: CFProcessor, - record_info_by_name: Dict[str, RecordInfo], - records_to_delete, - ioc_info: IocInfo, -) -> bool: - """Push updates for an IOC to channelfinder until it passes. - - Args: - update_method: The update method. - processor: The processor. - record_info_by_name: The record information by name. - records_to_delete: The records to delete. - ioc_info: The IOC information. - """ - _log.info("Pushing updates for %s begins...", ioc_info) - count = 0 - sleep = 1.0 - while processor.cf_config.push_always_retry or count < processor.cf_config.push_max_retries: - count += 1 - try: - update_method(processor, record_info_by_name, records_to_delete, ioc_info) - return True - except RequestException as e: - _log.error("ChannelFinder update failed: %s", e) - retry_seconds = min(60, sleep) - _log.info("ChannelFinder update retry in %s seconds", retry_seconds) - time.sleep(retry_seconds) - sleep *= 1.5 - _log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count) - return False diff --git a/server/recceiver/mock_client.py b/server/recceiver/mock_client.py deleted file mode 100644 index 3d77d58b..00000000 --- a/server/recceiver/mock_client.py +++ /dev/null @@ -1,94 +0,0 @@ -from requests import HTTPError -from twisted.internet.address import IPv4Address - -from recceiver.cfstore import CFPropertyName, PVStatus - -MOCK_CF_HTTP_ERROR = "Mock Channelfinder Client HTTPError" - - -class MockCFClient: - def __init__(self): - self.cf = {} - self.connected = True - self.fail_find = False - self.fail_set = False - - def _find_by_iocid(self, key, value): - return [ - channel - for channel in self.cf.values() - if any(prop["name"] == key and prop["value"] == value for prop in channel["properties"]) - ] - - def _find_by_names(self, names): - return [self.cf[name] for name in names if name in self.cf] - - def _find_active(self): - return [ - channel - for channel in self.cf.values() - if any( - prop["name"] == CFPropertyName.PV_STATUS and prop["value"] == PVStatus.ACTIVE - for prop in channel["properties"] - ) - ] - - def findByArgs(self, args): # NOSONAR - mirrors pyCFClient API. - if not self.connected: - raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) - - key, value = args[0] - if key == CFPropertyName.IOC_ID: - return self._find_by_iocid(key, value) - if key == "~name": - return self._find_by_names(str(value).split("|")) - if key == CFPropertyName.PV_STATUS and value == PVStatus.ACTIVE: - return self._find_active() - return [] - - def findProperty(self, prop_name): # NOSONAR - mirrors pyCFClient API. - if not self.connected: - raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) - if prop_name in ("hostName", "iocName", "pvStatus", "time", "iocid"): - return prop_name - - def set(self, channels): - if not self.connected or self.fail_set: - raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) - for channel in channels: - self.addChannel(channel) - - def update(self, property, channelNames): # NOSONAR - mirrors pyCFClient API. - if not self.connected or self.fail_find: - raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) - for channel in channelNames: - self.__updateChannelWithProp(property, channel) - - def addChannel(self, channel): # NOSONAR - mirrors pyCFClient API. - self.cf[channel["name"]] = channel - - def __updateChannelWithProp(self, property, channel): # NOSONAR - legacy helper kept for compatibility. - if channel in self.cf: - for prop in self.cf[channel]["properties"]: - if prop["name"] == property["name"]: - prop["value"] = property["value"] - prop["owner"] = property["owner"] # also overwriting owner because that's what CF does - return - - -class MockConfig: - def get(self, _name, _target): - return "cf-engi" - - -class MockTransaction: - def __init__(self): - self.addrec = {} - self.src = IPv4Address("TCP", "testhosta", 1111) - self.delrec = () - self.infos = {"CF_USERNAME": "cf-update", "ENGINEER": "cf-engi"} - self.initial = True - self.connected = True - self.fail_set = False - self.fail_find = False - self.recinfos = {} diff --git a/server/requirements-ci-py39.txt b/server/requirements-ci-py39.txt index a8cf1c8c..fb6d198a 100644 --- a/server/requirements-ci-py39.txt +++ b/server/requirements-ci-py39.txt @@ -2,4 +2,5 @@ channelfinder @ https://github.com/ChannelFinder/pyCFClient/archive/refs/tags/v3 requests==2.32.3 twisted==24.11.0 pytest==8.3.5 +pytest-cov==6.0.0 testcontainers==4.8.2 diff --git a/server/tests/unit/cf/__init__.py b/server/tests/unit/cf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/tests/unit/cf/conftest.py b/server/tests/unit/cf/conftest.py new file mode 100644 index 00000000..00c62679 --- /dev/null +++ b/server/tests/unit/cf/conftest.py @@ -0,0 +1,28 @@ +from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, IOCInfo, PVStatus + +DEFAULT_RECCEIVER_ID = "test-recceiver" + + +def make_ioc(channelcount: int = 1) -> IOCInfo: + return IOCInfo( + host="1.2.3.4", # NOSONAR + hostname="ioc1.example.com", + ioc_name="IOC1", + ioc_ip="1.2.3.4", # NOSONAR + owner="engineer", + time="2026-01-01T00:00:00", + port=5064, + channelcount=channelcount, + ) + + +def make_channel(name: str, recceiver_id: str = DEFAULT_RECCEIVER_ID, active: bool = True) -> CFChannel: + status = PVStatus.ACTIVE if active else PVStatus.INACTIVE + return CFChannel( + name=name, + owner="admin", + properties=[ + CFProperty(CFPropertyName.PV_STATUS.value, "admin", status.value), + CFProperty(CFPropertyName.RECCEIVER_ID.value, "admin", recceiver_id), + ], + ) diff --git a/server/tests/unit/cf/mock_adapter.py b/server/tests/unit/cf/mock_adapter.py new file mode 100644 index 00000000..188187bf --- /dev/null +++ b/server/tests/unit/cf/mock_adapter.py @@ -0,0 +1,71 @@ +from typing import Dict, List + +from requests import HTTPError + +from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus + +MOCK_CF_HTTP_ERROR = "Mock Channelfinder Client HTTPError" + + +class MockCFAdapter: + """In-memory ChannelFinderAdapter for unit tests.""" + + def __init__(self): + self._channels: Dict[str, CFChannel] = {} + self.connected = True + self.fail_find = False + self.fail_set = False + + def find_by_ioc_id(self, iocid: str) -> List[CFChannel]: + if not self.connected or self.fail_find: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + return [ + ch + for ch in self._channels.values() + if any(p.name == CFPropertyName.IOC_ID.value and p.value == iocid for p in ch.properties) + ] + + def find_by_names(self, names: List[str]) -> List[CFChannel]: + if not self.connected or self.fail_find: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + return [self._channels[n] for n in names if n in self._channels] + + def find_active_for_recceiver(self, recceiverid: str) -> List[CFChannel]: + if not self.connected or self.fail_find: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + return [ + ch + for ch in self._channels.values() + if any(p.name == CFPropertyName.PV_STATUS.value and p.value == PVStatus.ACTIVE.value for p in ch.properties) + and any(p.name == CFPropertyName.RECCEIVER_ID.value and p.value == recceiverid for p in ch.properties) + ] + + def set_channels(self, channels: List[CFChannel]) -> None: + if not self.connected or self.fail_set: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + for channel in channels: + self._channels[channel.name] = channel + + def update_property(self, prop: CFProperty, channel_names: List[str]) -> None: + if not self.connected or self.fail_find: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + for name in channel_names: + self._update_channel_with_prop(prop, name) + + def get_property_names(self) -> List[str]: + if not self.connected: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + return ["hostName", "iocName", "pvStatus", "time", "iocid", "iocIP", "recceiverID"] + + def set_property(self, _name: str, _owner: str) -> None: + if not self.connected: + raise HTTPError(MOCK_CF_HTTP_ERROR, response=self) + + def _update_channel_with_prop(self, prop: CFProperty, channel_name: str) -> None: + if channel_name not in self._channels: + return + for p in self._channels[channel_name].properties: + if p.name == prop.name: + p.value = prop.value + p.owner = prop.owner + return diff --git a/server/tests/unit/cf/test_config.py b/server/tests/unit/cf/test_config.py new file mode 100644 index 00000000..af2da40b --- /dev/null +++ b/server/tests/unit/cf/test_config.py @@ -0,0 +1,39 @@ +from recceiver.cf.config import CFConfig +from tests.unit.conftest import make_adapter + + +class TestCFConfigLoads: + def test_loads_defaults_without_error(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert isinstance(config, CFConfig) + + def test_default_push_max_retries(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.push_max_retries == 10 + + def test_push_max_retries_from_config(self): + adapter = make_adapter(values={"pushmaxretries": "3"}) + config = CFConfig.loads(adapter) + assert config.push_max_retries == 3 + + def test_push_max_retries_from_env(self): + adapter = make_adapter(env={"pushmaxretries": "7"}) + config = CFConfig.loads(adapter) + assert config.push_max_retries == 7 + + def test_default_push_always_retry(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.push_always_retry is True + + def test_alias_disabled_by_default(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.alias_enabled is False + + def test_alias_enabled_from_config(self): + adapter = make_adapter(values={"alias": "true"}) + config = CFConfig.loads(adapter) + assert config.alias_enabled is True diff --git a/server/tests/unit/cf/test_model.py b/server/tests/unit/cf/test_model.py new file mode 100644 index 00000000..5a917107 --- /dev/null +++ b/server/tests/unit/cf/test_model.py @@ -0,0 +1,52 @@ +from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, IOCInfo, PVStatus + + +class TestIOCInfo: + def test_id_combines_host_and_port(self): + ioc = IOCInfo(host="1.2.3.4", hostname="h", ioc_name="n", ioc_ip="1.2.3.4", owner="o", time="t", port=5064) + assert ioc.id == "1.2.3.4:5064" + + +class TestPVStatus: + def test_active_value(self): + assert PVStatus.ACTIVE.value == "Active" + + def test_inactive_value(self): + assert PVStatus.INACTIVE.value == "Inactive" + + +class TestCFPropertyName: + def test_ioc_id_value(self): + assert CFPropertyName.IOC_ID.value == "iocid" + + def test_pv_status_value(self): + assert CFPropertyName.PV_STATUS.value == "pvStatus" + + +class TestCFProperty: + def test_as_dict_includes_all_fields(self): + p = CFProperty(name="hostName", owner="admin", value="ioc1") + d = p.as_dict() + assert d == {"name": "hostName", "owner": "admin", "value": "ioc1"} + + def test_as_dict_empty_value_becomes_empty_string(self): + p = CFProperty(name="hostName", owner="admin", value=None) + assert p.as_dict()["value"] == "" + + def test_from_dict_roundtrip(self): + original = CFProperty(name="pvStatus", owner="cf", value="Active") + assert CFProperty.from_dict(original.as_dict()) == original + + +class TestCFChannel: + def test_from_dict_roundtrip(self): + ch = CFChannel( + name="PV:1", + owner="admin", + properties=[CFProperty(CFPropertyName.PV_STATUS.value, "admin", PVStatus.ACTIVE.value)], + ) + assert CFChannel.from_dict(ch.as_dict()) == ch + + def test_from_dict_missing_properties_defaults_to_empty(self): + ch = CFChannel.from_dict({"name": "PV:1", "owner": "admin"}) + assert ch.properties == [] diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py new file mode 100644 index 00000000..209f89fd --- /dev/null +++ b/server/tests/unit/cf/test_processor.py @@ -0,0 +1,117 @@ +from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo +from recceiver.cf.processor import CFProcessor +from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc +from tests.unit.cf.mock_adapter import MockCFAdapter +from tests.unit.conftest import make_adapter + + +def make_processor() -> CFProcessor: + return CFProcessor("test", make_adapter()) + + +def make_processor_with_mock(): + proc = CFProcessor("test", make_adapter(values={"recceiverid": DEFAULT_RECCEIVER_ID})) + adapter = MockCFAdapter() + proc.client = adapter + return proc, adapter + + +class TestRemoveChannel: + def test_missing_iocid_does_not_raise(self): + proc = make_processor() + iocid = make_ioc().id + proc.channel_ioc_ids["CHAN:1"].append(iocid) + # iocid deliberately absent from proc.iocs + proc.remove_channel("CHAN:1", iocid) + assert "CHAN:1" not in proc.channel_ioc_ids + + def test_missing_iocid_preserves_channel_when_other_iocs_remain(self): + proc = make_processor() + iocid = make_ioc().id + other_iocid = "9.9.9.9:5064" # NOSONAR + proc.channel_ioc_ids["CHAN:1"].append(iocid) + proc.channel_ioc_ids["CHAN:1"].append(other_iocid) + proc.remove_channel("CHAN:1", iocid) + assert "CHAN:1" in proc.channel_ioc_ids + assert other_iocid in proc.channel_ioc_ids["CHAN:1"] + + def test_removes_ioc_when_channelcount_reaches_zero(self): + proc = make_processor() + ioc = make_ioc(channelcount=1) + iocid = ioc.id + proc.iocs[iocid] = ioc + proc.channel_ioc_ids["CHAN:1"].append(iocid) + proc.remove_channel("CHAN:1", iocid) + assert iocid not in proc.iocs + assert "CHAN:1" not in proc.channel_ioc_ids + + def test_keeps_ioc_when_channelcount_still_positive(self): + proc = make_processor() + ioc = make_ioc(channelcount=2) + iocid = ioc.id + proc.iocs[iocid] = ioc + proc.channel_ioc_ids["CHAN:1"].append(iocid) + proc.channel_ioc_ids["CHAN:2"].append(iocid) + proc.remove_channel("CHAN:1", iocid) + assert iocid in proc.iocs + assert proc.iocs[iocid].channelcount == 1 + + +class TestCleanService: + def test_marks_active_channels_inactive(self): + proc, adapter = make_processor_with_mock() + adapter.set_channels([make_channel("PV:1"), make_channel("PV:2")]) + proc.clean_service() + for name in ("PV:1", "PV:2"): + status = next(p for p in adapter._channels[name].properties if p.name == CFPropertyName.PV_STATUS.value) + assert status.value == PVStatus.INACTIVE.value + + def test_is_no_op_when_no_active_channels(self): + proc, _ = make_processor_with_mock() + proc.clean_service() + + +class TestUpdateChannelFinder: + def _make_proc(self): + proc, adapter = make_processor_with_mock() + proc.cancelled = False + proc.managed_properties = set() + proc.record_property_names_list = set() + proc.env_vars = {} + return proc, adapter + + def test_registers_new_channel_as_active(self): + proc, adapter = self._make_proc() + ioc = make_ioc() + proc.iocs[ioc.id] = ioc + + proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc) + + assert "PV:1" in adapter._channels + status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) + assert status.value == PVStatus.ACTIVE.value + + def test_orphans_channel_absent_from_local_state(self): + proc, adapter = self._make_proc() + ioc = make_ioc() + iocid = ioc.id + proc.iocs[iocid] = ioc + # Channel is in CF under this IOC but has no entry in channel_ioc_ids — + # processor has no record of it, so it should be marked inactive. + adapter.set_channels( + [ + CFChannel( + "PV:1", + "admin", + [ + CFProperty(CFPropertyName.PV_STATUS.value, "admin", PVStatus.ACTIVE.value), + CFProperty(CFPropertyName.IOC_ID.value, "admin", iocid), + ], + ) + ] + ) + + proc._update_channelfinder({}, [], ioc) + + status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) + assert status.value == PVStatus.INACTIVE.value diff --git a/server/tests/unit/conftest.py b/server/tests/unit/conftest.py index e69de29b..f2102be8 100644 --- a/server/tests/unit/conftest.py +++ b/server/tests/unit/conftest.py @@ -0,0 +1,14 @@ +from configparser import ConfigParser + +from recceiver.processors import ConfigAdapter + + +def make_adapter(section: str = "cf", values: dict = None, env: dict = None) -> ConfigAdapter: + parser = ConfigParser() + parser.add_section(section) + for key, value in (values or {}).items(): + parser.set(section, key, str(value)) + adapter = ConfigAdapter(parser, section) + if env: + adapter.env_vars = env + return adapter diff --git a/server/tests/unit/test_cfstore.py b/server/tests/unit/test_cfstore.py deleted file mode 100644 index 9748efb1..00000000 --- a/server/tests/unit/test_cfstore.py +++ /dev/null @@ -1,109 +0,0 @@ -from configparser import ConfigParser - -from recceiver.cfstore import CFConfig, CFProcessor, IocInfo -from recceiver.processors import ConfigAdapter - - -def make_adapter(section: str = "cf", values: dict = None, env: dict = None) -> ConfigAdapter: - parser = ConfigParser() - parser.add_section(section) - for key, value in (values or {}).items(): - parser.set(section, key, str(value)) - adapter = ConfigAdapter(parser, section) - if env: - adapter.env_vars = env - return adapter - - -class TestCFConfigLoads: - def test_loads_defaults_without_error(self): - adapter = make_adapter() - config = CFConfig.loads(adapter) - assert isinstance(config, CFConfig) - - def test_default_push_max_retries(self): - adapter = make_adapter() - config = CFConfig.loads(adapter) - assert config.push_max_retries == 10 - - def test_push_max_retries_from_config(self): - adapter = make_adapter(values={"pushmaxretries": "3"}) - config = CFConfig.loads(adapter) - assert config.push_max_retries == 3 - - def test_push_max_retries_from_env(self): - adapter = make_adapter(env={"pushmaxretries": "7"}) - config = CFConfig.loads(adapter) - assert config.push_max_retries == 7 - - def test_default_push_always_retry(self): - adapter = make_adapter() - config = CFConfig.loads(adapter) - assert config.push_always_retry is True - - def test_alias_disabled_by_default(self): - adapter = make_adapter() - config = CFConfig.loads(adapter) - assert config.alias_enabled is False - - def test_alias_enabled_from_config(self): - adapter = make_adapter(values={"alias": "true"}) - config = CFConfig.loads(adapter) - assert config.alias_enabled is True - - -def make_processor() -> CFProcessor: - return CFProcessor("test", make_adapter()) - - -def make_ioc(channelcount: int = 1) -> IocInfo: - return IocInfo( - host="1.2.3.4", - hostname="ioc1.example.com", - ioc_name="IOC1", - ioc_ip="1.2.3.4", - owner="engineer", - time="2026-01-01T00:00:00", - port=5064, - channelcount=channelcount, - ) - - -class TestRemoveChannel: - def test_missing_iocid_does_not_raise(self): - proc = make_processor() - iocid = "1.2.3.4:5064" - proc.channel_ioc_ids["CHAN:1"].append(iocid) - # iocid deliberately absent from proc.iocs - proc.remove_channel("CHAN:1", iocid) - assert "CHAN:1" not in proc.channel_ioc_ids - - def test_missing_iocid_preserves_channel_when_other_iocs_remain(self): - proc = make_processor() - iocid = "1.2.3.4:5064" - proc.channel_ioc_ids["CHAN:1"].append(iocid) - proc.channel_ioc_ids["CHAN:1"].append("9.9.9.9:5064") - proc.remove_channel("CHAN:1", iocid) - assert "CHAN:1" in proc.channel_ioc_ids - assert "9.9.9.9:5064" in proc.channel_ioc_ids["CHAN:1"] - - def test_removes_ioc_when_channelcount_reaches_zero(self): - proc = make_processor() - ioc = make_ioc(channelcount=1) - iocid = ioc.ioc_id - proc.iocs[iocid] = ioc - proc.channel_ioc_ids["CHAN:1"].append(iocid) - proc.remove_channel("CHAN:1", iocid) - assert iocid not in proc.iocs - assert "CHAN:1" not in proc.channel_ioc_ids - - def test_keeps_ioc_when_channelcount_still_positive(self): - proc = make_processor() - ioc = make_ioc(channelcount=2) - iocid = ioc.ioc_id - proc.iocs[iocid] = ioc - proc.channel_ioc_ids["CHAN:1"].append(iocid) - proc.channel_ioc_ids["CHAN:2"].append(iocid) - proc.remove_channel("CHAN:1", iocid) - assert iocid in proc.iocs - assert proc.iocs[iocid].channelcount == 1 diff --git a/server/tests/unit/test_processors.py b/server/tests/unit/test_processors.py index 16bc2a64..066fa7c2 100644 --- a/server/tests/unit/test_processors.py +++ b/server/tests/unit/test_processors.py @@ -1,20 +1,9 @@ import textwrap -from configparser import ConfigParser from pathlib import Path -from recceiver.cfstore import CFProcessor -from recceiver.processors import ConfigAdapter, ProcessorController - - -def make_adapter(section: str = "cf", values: dict = None, env: dict = None) -> ConfigAdapter: - parser = ConfigParser() - parser.add_section(section) - for key, value in (values or {}).items(): - parser.set(section, key, str(value)) - adapter = ConfigAdapter(parser, section) - if env: - adapter.env_vars = env - return adapter +from recceiver.cf.processor import CFProcessor +from recceiver.processors import ProcessorController +from tests.unit.conftest import make_adapter class TestConfigAdapterGet: diff --git a/server/twisted/plugins/recceiver_plugin.py b/server/twisted/plugins/recceiver_plugin.py index 87e5825e..6b43f8b5 100644 --- a/server/twisted/plugins/recceiver_plugin.py +++ b/server/twisted/plugins/recceiver_plugin.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- -from recceiver import cfstore, dbstore, processors +from recceiver import dbstore, processors from recceiver.application import Maker +from recceiver.cf.processor import CFProcessor serviceMaker = Maker() showfactory = processors.ProcessorFactory("show", processors.ShowProcessor) dbfactory = processors.ProcessorFactory("db", dbstore.DBProcessor) -cffactory = processors.ProcessorFactory("cf", cfstore.CFProcessor) +cffactory = processors.ProcessorFactory("cf", CFProcessor)