diff --git a/server/pyproject.toml b/server/pyproject.toml index c3406c3..efb9889 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -48,10 +48,8 @@ package-data.twisted = [ "plugins/recceiver_plugin.py" ] [tool.ruff] target-version = "py39" line-length = 120 -# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. -# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or -# McCabe complexity (`C901`) by default. -lint.select = [ "E", "F", "I" ] +# E: pycodestyle errors, F: Pyflakes, G: flake8-logging-format (enforce lazy % in log calls), I: isort. +lint.select = [ "E", "F", "G", "I" ] [tool.pytest] ini_options.log_level = "DEBUG" diff --git a/server/recceiver/announcer.py b/server/recceiver/announcer.py index 1cad0fb..eaba18b 100644 --- a/server/recceiver/announcer.py +++ b/server/recceiver/announcer.py @@ -8,7 +8,7 @@ from .protocol.announce import ANNOUNCE_PORT, BROADCAST_ADDRESS, Announce -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) __all__ = ["Announcer", "SharedUDP"] @@ -53,14 +53,14 @@ def __init__( raise RuntimeError("Announce list is empty at start time...") def startProtocol(self): - _log.info("Setup Announcer") + log.info("Setup Announcer") self.D = self.reactor.callLater(0, self.sendOne) # we won't process any received traffic, so no reason to wake # up for it... self.transport.pauseProducing() def stopProtocol(self): - _log.info("Stop Announcer") + log.info("Stop Announcer") self.D.cancel() del self.D @@ -71,14 +71,14 @@ def sendOne(self): self.D = self.reactor.callLater(self.delay, self.sendOne) for A in self.udps: try: - _log.debug("announce to {s}".format(s=A)) + log.debug("announce to %s", A) self.transport.write(self.msg, A) try: self.udpErr.remove(A) - _log.warning("announce OK to {s}".format(s=A)) + log.warning("announce OK to %s", A) except KeyError: pass except MessageLengthError: if A not in self.udpErr: self.udpErr.add(A) - _log.exception("announce Error to {s}".format(s=A)) + log.exception("announce Error to %s", A) diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 4c1d82d..1a58455 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -6,7 +6,8 @@ from twisted.application import service from twisted.internet import defer, pollreactor, task from twisted.internet.error import CannotListenError -from twisted.python import log, usage +from twisted.python import log as twisted_log +from twisted.python import usage from zope.interface import implementer from twisted import plugin @@ -16,9 +17,7 @@ from .processors import ProcessorController from .recast import CastFactory -_log = logging.getLogger(__name__) - -pollreactor.install() +log = logging.getLogger(__name__) class Log2Twisted(logging.StreamHandler): @@ -29,7 +28,7 @@ def __init__(self): # The Twisted log publisher adds a newline, # so strip the newline added by the Python log handler. self.terminator = "" - self.write = log.msg + self.write = twisted_log.msg def flush(self): # Required by logging.StreamHandler; this handler writes directly to Twisted logs. @@ -74,7 +73,7 @@ def __init__(self, config): self.addrlist = [("", 5049)] def privilegedStartService(self): - _log.info("Starting RecService") + log.info("Starting RecService") # Start TCP server on random port self.tcpFactory = CastFactory() @@ -96,7 +95,7 @@ def privilegedStartService(self): # Find out which port is in use addr = self.tcp.getHost() - _log.info("RecService listening on {addr}".format(addr=addr)) + log.info("RecService listening on %s", addr) self.key = random.randint(0, 0xFFFFFFFF) @@ -117,9 +116,9 @@ def privilegedStartService(self): if self.metricsPort > 0: if metrics.available: self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind) - _log.info("Prometheus metrics available on port %d", self.metricsPort) + log.info("Prometheus metrics available on port %d", self.metricsPort) else: - _log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") + log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") metrics.connections_limit.set(self.tcpFactory.maxActive) @@ -130,7 +129,7 @@ def privilegedStartService(self): def _logStatus(self): metrics.connections_active.set(self.tcpFactory.NActive) metrics.connections_waiting.set(len(self.tcpFactory.Wait)) - _log.info( + log.info( "status: connections active=%d/%d queued=%d", self.tcpFactory.NActive, self.tcpFactory.maxActive, @@ -138,7 +137,7 @@ def _logStatus(self): ) def stopService(self): - _log.info("Stopping RecService") + log.info("Stopping RecService") if self._statusLoop is not None and self._statusLoop.running: self._statusLoop.stop() @@ -165,6 +164,7 @@ class Maker(object): options = Options def make_service(self, opts): + pollreactor.install() ctrl = ProcessorController(cfile=opts["config"]) conf = ctrl.config("recceiver") S = RecService(conf) diff --git a/server/recceiver/cf/config.py b/server/recceiver/cf/config.py index 36293ee..3006d5a 100644 --- a/server/recceiver/cf/config.py +++ b/server/recceiver/cf/config.py @@ -46,6 +46,7 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig": clean_on_start=conf.getboolean("cleanOnStart", True), clean_on_stop=conf.getboolean("cleanOnStop", True), username=conf.get("username", "cfstore"), + env_owner_variable=conf.get("envOwnerVariable", "ENGINEER"), recceiver_id=conf.get("recceiverId", RECCEIVERID_DEFAULT), timezone=conf.get("timezone", ""), cf_query_limit=conf.get("findSizeLimit", DEFAULT_QUERY_LIMIT), diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 94046f7..ec49445 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -24,10 +24,9 @@ PVStatus, RecordInfo, ) -from recceiver.interfaces import CommitTransaction from recceiver.processors import ConfigAdapter -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) @implementer(interfaces.IProcessor) @@ -73,10 +72,10 @@ def startService(self): def _logStatus(self): metrics.known_iocs.set(len(self.iocs)) metrics.tracked_channels.set(len(self.channel_ioc_ids)) - _log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) + log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) def _start_service_with_lock(self): - _log.info("CF_START with configuration: %s", self.cf_config) + log.info("CF_START with configuration: %s", self.cf_config) if self.client is None: # For setting up mock test client self.client = PyCFClientAdapter( @@ -92,11 +91,11 @@ def _start_service_with_lock(self): cf_properties = set(self.client.get_property_names()) self._setup_cf_properties(cf_properties) except ConnectionError: - _log.exception("Cannot connect to Channelfinder service") + log.exception("Cannot connect to Channelfinder service") raise else: if self.cf_config.clean_on_start: - _log.info("CF Clean: scheduling background startup sweep") + log.info("CF Clean: scheduling background startup sweep") from twisted.internet import reactor reactor.callLater(0, self._start_background_clean) @@ -146,10 +145,10 @@ def _setup_cf_properties(self, cf_properties: Set[str]) -> None: self.record_property_names_list = record_property_names_list self.managed_properties = required_properties | record_property_names_list - _log.debug("record_property_names_list = %s", self.record_property_names_list) + log.debug("record_property_names_list = %s", self.record_property_names_list) def stopService(self): - _log.info("CF_STOP") + log.info("CF_STOP") if self._statusLoop is not None and self._statusLoop.running: self._statusLoop.stop() service.Service.stopService(self) @@ -162,15 +161,13 @@ def _stop_service_with_lock(self): The sweep runs in a background thread to avoid blocking the reactor. The lock is held throughout, preventing new commits from interleaving. """ - _log.info("CF_STOP with lock") + log.info("CF_STOP with lock") if self.cf_config.clean_on_stop: return deferToThread(self.clean_service) def _start_background_clean(self): - _log.info("CF Clean: background startup sweep beginning") - deferToThread(self.clean_service).addErrback( - lambda err: _log.error("CF Clean background sweep failed: %s", err) - ) + log.info("CF Clean: background startup sweep beginning") + deferToThread(self.clean_service).addErrback(lambda err: log.error("CF Clean background sweep failed: %s", err)) # @defer.inlineCallbacks # Twisted v16 does not support cancellation! def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: @@ -200,7 +197,7 @@ def chain_error(err): Note this is not foolproof as the thread may still be running. """ if not err.check(defer.CancelledError): - _log.error("CF_COMMIT FAILURE: %s", err) + log.error("CF_COMMIT FAILURE: %s", err) if self.cancelled: if not err.check(defer.CancelledError): raise defer.CancelledError() @@ -217,7 +214,9 @@ def chain_result(result): t.addCallbacks(chain_result, chain_error) return d - def transaction_to_record_infos(self, ioc_info: IOCInfo, transaction: CommitTransaction) -> Dict[str, RecordInfo]: + def transaction_to_record_infos( + self, ioc_info: IOCInfo, transaction: interfaces.ITransaction + ) -> Dict[str, RecordInfo]: """Build a RecordInfo dict keyed by record_id from a transaction. Merges record types, info-tag properties, aliases, and mapped EPICS @@ -233,7 +232,7 @@ def transaction_to_record_infos(self, ioc_info: IOCInfo, transaction: CommitTran for record_id, (record_infos_to_add) in transaction.record_infos_to_add.items(): # find intersection of these sets if record_id not in record_infos: - _log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id) + log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id) continue recinfo_wl = [p for p in self.record_property_names_list if p in record_infos_to_add.keys()] if recinfo_wl: @@ -244,7 +243,7 @@ def transaction_to_record_infos(self, ioc_info: IOCInfo, transaction: CommitTran for record_id, record_aliases in transaction.aliases.items(): if record_id not in record_infos: - _log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) + log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) continue record_infos[record_id].aliases = record_aliases @@ -255,7 +254,7 @@ def _apply_env_vars( self, record_infos: Dict[str, RecordInfo], ioc_info: IOCInfo, - transaction: CommitTransaction, + transaction: interfaces.ITransaction, ) -> None: """Append mapped EPICS environment variable properties to every record.""" for record_id in record_infos: @@ -264,7 +263,7 @@ def _apply_env_vars( if value is not None: record_infos[record_id].info_properties.append(CFProperty(cf_prop_name, ioc_info.owner, value)) else: - _log.debug( + log.debug( "EPICS environment var %s not found in IOC: %s", epics_env_var_name, ioc_info, @@ -279,14 +278,14 @@ def record_info_by_name(record_infos: Dict[str, RecordInfo], ioc_info: IOCInfo) record_info_by_name = {} for info in record_infos.values(): if info.pv_name in record_info_by_name: - _log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) + log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) continue record_info_by_name[info.pv_name] = info return record_info_by_name def update_ioc_infos( self, - transaction: CommitTransaction, + transaction: interfaces.ITransaction, ioc_info: IOCInfo, records_to_delete: List[str], record_info_by_name: Dict[str, RecordInfo], @@ -322,22 +321,22 @@ def _remove_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: self.remove_channel(alias, iocid) - def _commit_with_thread(self, transaction: CommitTransaction): + def _commit_with_thread(self, transaction: interfaces.ITransaction): host = transaction.source_address.host port = transaction.source_address.port if not self.running: raise defer.CancelledError(f"CF Processor is not running (transaction: {host}:{port})") - _log.info("CF_COMMIT: %s", transaction) - _log.debug("CF_COMMIT: transaction: %s", repr(transaction)) + log.info("CF_COMMIT: %s", transaction) + log.debug("CF_COMMIT: transaction: %s", repr(transaction)) ioc_name = transaction.client_infos.get("IOCNAME") if not ioc_name: ioc_name = str(port) - _log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) + log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535: - _log.warning( + log.warning( "IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — " "iocid will change on every reconnect, causing stale channels in CF; " "configure a stable iocName via reccaster", @@ -351,7 +350,7 @@ def _commit_with_thread(self, transaction: CommitTransaction): or self.cf_config.username ) if owner == self.cf_config.username: - _log.debug( + log.debug( "IOC at %s:%d did not send %s or CF_USERNAME; using service account as owner", host, port, @@ -371,11 +370,11 @@ def _commit_with_thread(self, transaction: CommitTransaction): record_infos = self.transaction_to_record_infos(ioc_info, transaction) records_to_delete = list(transaction.records_to_delete) - _log.debug("Delete records: %s", records_to_delete) + log.debug("Delete records: %s", records_to_delete) record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) if not transaction.connected and ioc_info.id not in self.iocs: - _log.warning( + log.warning( "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", host, port, @@ -400,7 +399,7 @@ def remove_channel(self, record_name: str, iocid: str) -> None: self.iocs[iocid].channelcount -= 1 if self.iocs[iocid].channelcount <= 0: if self.iocs[iocid].channelcount < 0: - _log.error("Channel count negative: %s", iocid) + log.error("Channel count negative: %s", iocid) self.iocs.pop(iocid) if len(self.channel_ioc_ids[record_name]) == 0: del self.channel_ioc_ids[record_name] @@ -413,21 +412,21 @@ def clean_service(self) -> None: recceiverid = self.cf_config.recceiver_id while 1: try: - _log.info("CF Clean Started") + log.info("CF Clean Started") channels = self.get_active_channels(recceiverid) while channels: self.clean_channels(owner, channels) channels = self.get_active_channels(recceiverid) - _log.info("CF Clean Completed") + log.info("CF Clean Completed") return except RequestException: - _log.exception("Clean service failed") + log.exception("Clean service failed") retry_seconds = min(60, sleep) - _log.info("Clean service retry in %s seconds", retry_seconds) + log.info("Clean service retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after %s seconds", retry_limit) + log.info("Abandoning clean after %s seconds", retry_limit) return def get_active_channels(self, recceiverid: str) -> List[CFChannel]: @@ -437,8 +436,8 @@ def get_active_channels(self, recceiverid: str) -> List[CFChannel]: def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: """Mark the given channels Inactive in CF.""" names = [ch.name for ch in channels or []] - _log.info("Cleaning %s channels.", len(names)) - _log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) + log.info("Cleaning %s channels.", len(names)) + log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) self.client.update_property(CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.INACTIVE.value), names) def _push_to_cf( @@ -447,12 +446,12 @@ def _push_to_cf( records_to_delete: List[str], ioc_info: IOCInfo, ) -> bool: - _log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) + log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) count = 0 sleep = 1.0 while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: if not self.running: - _log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) return False count += 1 t0 = time.monotonic() @@ -461,34 +460,38 @@ def _push_to_cf( elapsed = time.monotonic() - t0 metrics.cf_commit_duration_seconds.observe(elapsed) metrics.cf_commits_total.labels(result="success").inc() - _log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) + log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) return True except RequestException: elapsed = time.monotonic() - t0 - _log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) + log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) retry_seconds = min(60, sleep) - _log.info("CF push retry in %s seconds", retry_seconds) + log.info("CF push retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 metrics.cf_commits_total.labels(result="cancelled").inc() - _log.error("CF push gave up after %d attempts: %s", count, ioc_info) + log.error("CF push gave up after %d attempts: %s", count, ioc_info) return False + def _assert_not_cancelled(self, context: str) -> None: + if self.cancelled: + raise defer.CancelledError(f"Processor cancelled: {context}") + def _update_channelfinder( self, record_info_by_name: Dict[str, RecordInfo], records_to_delete: List[str], ioc_info: IOCInfo, ) -> None: - _log.info("CF Update IOC: %s", ioc_info) - _log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) + log.info("CF Update IOC: %s", ioc_info) + log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) recceiverid = self.cf_config.recceiver_id new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id if iocid not in self.iocs and record_info_by_name: # Disconnect-before-upload is already logged in _commit_with_thread. - _log.warning( + log.warning( "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, len(self.iocs), @@ -497,11 +500,10 @@ def _update_channelfinder( if ioc_info.hostname is None or ioc_info.ioc_name is None: raise IOCMissingInfoError(ioc_info) - if self.cancelled: - raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + self._assert_not_cancelled(f"before fetching old channels for {ioc_info}") channels: List[CFChannel] = [] - _log.debug("Find existing channels by IOCID: %s", ioc_info) + log.debug("Find existing channels by IOCID: %s", ioc_info) old_channels: List[CFChannel] = self.client.find_by_ioc_id(iocid) if old_channels: @@ -518,21 +520,19 @@ def _update_channelfinder( # now pvNames contains a list of pv's new on this host/ioc existing_channels = self._get_existing_channels(new_channels) - if self.cancelled: - raise defer.CancelledError(f"CF Processor is cancelled, after fetching existing channels for {ioc_info}") + self._assert_not_cancelled(f"after fetching existing channels for {ioc_info}") self._process_new_channels( new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid ) - _log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) + log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) if len(channels) != 0: self._cf_set_chunked(channels) else: if old_channels and len(old_channels) != 0: self._cf_set_chunked(channels) - if self.cancelled: - raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + self._assert_not_cancelled(f"after setting channels for {ioc_info}") def _process_new_channels( self, @@ -562,7 +562,7 @@ def _process_new_channels( ) new_properties = new_properties + record_info.info_properties if channel_name in existing_channels: - _log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) + log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) self._update_existing_channel_diff_iocid( existing_channels, channel_name, new_properties, channels, record_info_by_name, ioc_info, iocid ) @@ -593,7 +593,7 @@ def _handle_channels( """ for cf_channel in old_channels: if not new_channels or cf_channel.name in records_to_delete: - _log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) + log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) if cf_channel.name in self.channel_ioc_ids: self._handle_channel_is_old(cf_channel, ioc_info, recceiverid, channels, record_info_by_name) else: @@ -621,7 +621,7 @@ def _handle_channel_is_old( self.managed_properties, ) channels.append(cf_channel) - _log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) + log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) if self.cf_config.alias_enabled: if cf_channel.name in record_info_by_name: for alias_name in record_info_by_name[cf_channel.name].aliases: @@ -638,7 +638,7 @@ def _handle_channel_is_old( self.managed_properties, ) channels.append(alias_channel) - _log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) + log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) def _orphan_channel( self, @@ -656,7 +656,7 @@ def _orphan_channel( cf_channel, ) channels.append(cf_channel) - _log.debug("Add orphaned channel %s with no IOC: %s", cf_channel, ioc_info) + log.debug("Add orphaned channel %s with no IOC: %s", cf_channel, ioc_info) if self.cf_config.alias_enabled: if cf_channel.name in record_info_by_name: for alias_name in record_info_by_name[cf_channel.name].aliases: @@ -669,7 +669,7 @@ def _orphan_channel( alias_channel, ) channels.append(alias_channel) - _log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) + log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) def _handle_channel_old_and_new( self, @@ -682,7 +682,7 @@ def _handle_channel_old_and_new( old_channels: List[CFChannel], ) -> None: """Channel exists in CF with the same iocid — mark active and update time.""" - _log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) + log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) cf_channel.properties = _merge_property_lists( [ CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.ACTIVE.value), @@ -692,7 +692,7 @@ def _handle_channel_old_and_new( self.managed_properties, ) channels.append(cf_channel) - _log.debug("Add existing channel with same IOC: %s", cf_channel) + log.debug("Add existing channel with same IOC: %s", cf_channel) new_channels.remove(cf_channel.name) if self.cf_config.alias_enabled: @@ -722,7 +722,7 @@ def _handle_channel_old_and_new( ) channels.append(CFChannel(alias_name, ioc_info.owner, aprops)) new_channels.remove(alias_name) - _log.debug("Add existing alias with same IOC: %s", cf_channel) + log.debug("Add existing alias with same IOC: %s", cf_channel) def _get_existing_channels(self, new_channels: Set[str]) -> Dict[str, CFChannel]: """Query CF for channels in new_channels that already exist there.""" @@ -746,7 +746,7 @@ def _update_existing_channel_diff_iocid( self.managed_properties, ) channels.append(existing_channel) - _log.debug("Add existing channel with different IOC: %s", existing_channel) + log.debug("Add existing channel with different IOC: %s", existing_channel) if self.cf_config.alias_enabled and channel_name in record_info_by_name: alias_properties = [CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, channel_name)] + new_properties for alias_name in record_info_by_name[channel_name].aliases: @@ -756,7 +756,7 @@ def _update_existing_channel_diff_iocid( channels.append(ach) else: channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties)) - _log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) + log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) def _create_new_channel( self, @@ -767,12 +767,12 @@ def _create_new_channel( record_info_by_name: Dict[str, RecordInfo], ) -> None: channels.append(CFChannel(channel_name, ioc_info.owner, new_properties)) - _log.debug("Add new channel: %s", channel_name) + log.debug("Add new channel: %s", channel_name) if self.cf_config.alias_enabled and channel_name in record_info_by_name: alias_properties = [CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, channel_name)] + new_properties for alias in record_info_by_name[channel_name].aliases: channels.append(CFChannel(alias, ioc_info.owner, alias_properties)) - _log.debug("Add new alias: %s from %s", alias, channel_name) + log.debug("Add new alias: %s from %s", alias, channel_name) def create_ioc_properties( diff --git a/server/recceiver/dbstore.py b/server/recceiver/dbstore.py index 9ca0ee7..a8bb80e 100644 --- a/server/recceiver/dbstore.py +++ b/server/recceiver/dbstore.py @@ -10,7 +10,7 @@ from . import interfaces -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) __all__ = ["DBProcessor"] @@ -40,7 +40,7 @@ def wait_for(self, deferred): return deferred def startService(self): - _log.info("Start DBService") + log.info("Start DBService") service.Service.startService(self) # map of source id# to server table id keys @@ -65,7 +65,7 @@ def startService(self): self.wait_for(self.pool.runInteraction(self.cleanupDB)) def stopService(self): - _log.info("Stop DBService") + log.info("Stop DBService") service.Service.stopService(self) @@ -76,7 +76,7 @@ def stopService(self): return defer.DeferredList(list(self.Ds), consumeErrors=True) def cleanupDB(self, cur): - _log.info("Cleanup DBService") + log.info("Cleanup DBService") assert self.mykey != 0 cur.execute("PRAGMA foreign_keys = ON;") diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index 26ae07e..b9e4e99 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -1,17 +1,14 @@ # -*- coding: utf-8 -*- -from dataclasses import dataclass -from typing import Dict, List, Set, Tuple - from twisted.application import service from zope.interface import Attribute, Interface class ITransaction(Interface): - source_address = Attribute("Source Address.") + source_address = Attribute("IAddress of the IOC connection (provides .host: str and .port: int)") records_to_add = Attribute("""Records being added - {recid: ('recname', 'rectype', {'key':'val'})} + {recid: ('recname', 'rectype')} """) records_to_delete = Attribute("A set() of recids which are being removed") @@ -22,23 +19,11 @@ class ITransaction(Interface): recid: {'key':'val'} """) + aliases = Attribute("A dict mapping record id to list of alias names") -@dataclass -class SourceAddress: - host: str - port: int - + initial = Attribute("True if this is the first transaction for this IOC connection") -@dataclass -class CommitTransaction: - source_address: SourceAddress - client_infos: Dict[str, str] - records_to_add: Dict[str, Tuple[str, str]] - records_to_delete: Set[str] - record_infos_to_add: Dict[str, Dict[str, str]] - aliases: Dict[str, List[str]] - initial: bool - connected: bool + connected = Attribute("False if the IOC has disconnected") class IProcessor(service.IService): diff --git a/server/recceiver/processors.py b/server/recceiver/processors.py index 2c79d02..d1c32a3 100644 --- a/server/recceiver/processors.py +++ b/server/recceiver/processors.py @@ -14,7 +14,7 @@ from . import interfaces -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) __all__ = [ "ShowProcessor", @@ -88,7 +88,7 @@ def __init__(self, cfile=None): plugs = {} for plug in plugin.getPlugins(interfaces.IProcessorFactory): - _log.debug("Available plugin: {name}".format(name=plug.name)) + log.debug("Available plugin: %s", plug.name) plugs[plug.name] = plug self.procs = [] @@ -119,13 +119,13 @@ def config(self, section): def commit(self, trans): def punish(err, processor): if err.check(defer.CancelledError): - _log.debug("Cancel processing: {name}: {trans}".format(name=processor.name, trans=trans)) + log.debug("Cancel processing: %s: %s", processor.name, trans) return err try: self.procs.remove(processor) - _log.error("Remove processor: {name}: {err}".format(name=processor.name, err=err)) + log.error("Remove processor: %s: %s", processor.name, err) except ValueError: - _log.debug("Remove processor: {name}: aleady removed".format(name=processor.name)) + log.debug("Remove processor: %s: already removed", processor.name) return err defers = [defer.maybeDeferred(P.commit, trans).addErrback(punish, P) for P in self.procs] @@ -146,7 +146,7 @@ def __init__(self, name, opts): def startService(self): service.Service.startService(self) - _log.info("Show processor '{processor}' starting".format(processor=self.name)) + log.info("Show processor '%s' starting", self.name) def commit(self, transaction): def with_lock(_ignored): @@ -170,27 +170,25 @@ def release_lock(result): return self.lock.acquire().addCallback(with_lock) def _commit(self, trans): - _log.debug("# Show processor '{name}' commit".format(name=self.name)) - _log.info("# From {host}:{port}".format(host=trans.source_address.host, port=trans.source_address.port)) + log.debug("# Show processor '%s' commit", self.name) + log.info("# From %s:%s", trans.source_address.host, trans.source_address.port) if not trans.connected: - _log.info("# connection lost") + log.info("# connection lost") for item in trans.client_infos.items(): - _log.info(" epicsEnvSet('{name}','{value}')".format(name=item[0], value=item[1])) + log.info(" epicsEnvSet('%s','%s')", item[0], item[1]) for record_id, (record_name, record_type) in trans.records_to_add.items(): - _log.info( - ' record({record_type}, "{record_name}") {{'.format(record_type=record_type, record_name=record_name) - ) + log.info(' record(%s, "%s") {', record_type, record_name) for alias in trans.aliases.get(record_id, []): - _log.info(' alias("{alias}")'.format(alias=alias)) + log.info(' alias("%s")', alias) for item in trans.record_infos_to_add.get(record_id, {}).items(): - _log.info(' info({name},"{value}")'.format(name=item[0], value=[1])) - _log.info(" }") + log.info(' info(%s,"%s")', item[0], item[1]) + log.info(" }") yield - _log.info("# End") + log.info("# End") def stopService(self): service.Service.stopService(self) - _log.info("Show processor '{name}' stopping".format(name=self.name)) + log.info("Show processor '%s' stopping", self.name) @implementer(plugin.IPlugin, interfaces.IProcessorFactory) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index af6f0a0..6cf5689 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -6,13 +6,16 @@ import time from twisted.internet import defer, protocol +from twisted.internet.interfaces import IAddress from twisted.protocols import stateful from zope.interface import implementer from .interfaces import ITransaction from .protocol import messages -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) + +_PROTOCOL_ERROR_MSG = "Protocol error! %s" class CastReceiver(stateful.StatefulProtocol): @@ -44,7 +47,7 @@ def connectionMade(self): if self.active: # Full speed ahead self.phase = 1 # 1: send ping, 2: receive pong - self.T = self.reactor.callLater(self.timeout, self.writePing) + self._ping_timer = self.reactor.callLater(self.timeout, self.writePing) self.transport.write(messages.ServerGreeting(self.version).frame()) self.uploadStart = time.time() else: @@ -53,28 +56,28 @@ def connectionMade(self): def connectionLost(self, reason=protocol.connectionDone): self.factory.isDone(self, self.active) - if self.T and self.T.active(): - self.T.cancel() - del self.T + if self._ping_timer and self._ping_timer.active(): + self._ping_timer.cancel() + del self._ping_timer if self.sess: self.sess.close() del self.sess def restartPingTimer(self): - T, self.T = self.T, self.reactor.callLater(self.timeout, self.writePing) - if T and T.active(): - T.cancel() + old, self._ping_timer = self._ping_timer, self.reactor.callLater(self.timeout, self.writePing) + if old and old.active(): + old.cancel() def writePing(self): if self.phase == 2: self.transport.loseConnection() - _log.debug("pong missed: close connection") + log.debug("pong missed: close connection") else: self.restartPingTimer() self.phase = 2 self.nonce = random.randint(0, 0xFFFFFFFF) self.transport.write(messages.Ping(self.nonce).frame()) - _log.debug("ping nonce: " + str(self.nonce)) + log.debug("ping nonce: %s", self.nonce) def getInitialState(self): return (self.recvHeader, messages.Header.payload.size) @@ -84,11 +87,11 @@ def recvHeader(self, data): try: header = messages.Header.decode(data) except messages.ProtocolError as exc: - _log.error(f"Protocol error! {exc}") + log.exception(_PROTOCOL_ERROR_MSG, exc) self.transport.loseConnection() return if header.body_length == 0: - _log.debug(f"Ignoring empty message {header.msg_id:#06x}") + log.debug("Ignoring empty message %#06x", header.msg_id) return self.getInitialState() self.msgid = header.msg_id fn, minlen = self.rxfn[self.msgid] @@ -102,11 +105,11 @@ def recvClientGreeting(self, body): try: greeting = messages.ClientGreeting.decode(body) except messages.ProtocolError as exc: - _log.error(f"Protocol error! {exc}") + log.exception(_PROTOCOL_ERROR_MSG, exc) self.transport.loseConnection() return if greeting.client_type != 0: - _log.error(f"unsupported client type {greeting.client_type}") + log.error("unsupported client type %s", greeting.client_type) self.transport.loseConnection() return self.version = min(self.version, greeting.version) @@ -119,14 +122,14 @@ def recvPong(self, body): try: pong = messages.Pong.decode(body) except messages.ProtocolError as exc: - _log.error(f"Protocol error! {exc}") + log.exception(_PROTOCOL_ERROR_MSG, exc) self.transport.loseConnection() return if pong.nonce != self.nonce: - _log.error(f"pong nonce does not match! {pong.nonce}!={self.nonce}") + log.error("pong nonce does not match! %s!=%s", pong.nonce, self.nonce) self.transport.loseConnection() else: - _log.debug("pong nonce match") + log.debug("pong nonce match") self.phase = 1 return self.getInitialState() @@ -135,7 +138,7 @@ def recvInfo(self, body): try: info = messages.AddInfo.decode(body) except messages.ProtocolError: - _log.error("Ignoring info update") + log.error("Ignoring info update") return self.getInitialState() if info.record_id: self.sess.rec_info(info.record_id, info.key, info.value) @@ -148,7 +151,7 @@ def recvAddRec(self, body): try: record = messages.AddRecord.decode(body) except messages.ProtocolError: - _log.error("Ignoring record update") + log.error("Ignoring record update") return self.getInitialState() if record.is_alias: self.sess.add_alias(record.record_id, record.record_name) @@ -162,7 +165,7 @@ def recvDelRec(self, body): try: record = messages.DelRecord.decode(body) except messages.ProtocolError: - _log.error("Ignoring delete record update") + log.error("Ignoring delete record update") return self.getInitialState() self.sess.del_record(record.record_id) return self.getInitialState() @@ -172,7 +175,7 @@ def recvDone(self, body): try: messages.UploadDone.decode(body) except messages.ProtocolError: - _log.error("Ignoring done update") + log.error("Ignoring done update") return self.getInitialState() self.factory.isDone(self, self.active) self.sess.done() @@ -182,14 +185,13 @@ def recvDone(self, body): elapsed_s = time.time() - self.uploadStart size_kb = self.uploadSize / 1024 rate_kbs = size_kb / elapsed_s - source_address = "{}:{}".format(self.sess.ep.host, self.sess.ep.port) - _log.info( - "Done message from {source_address}: uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)".format( - source_address=source_address, - size_kb=size_kb, - elapsed_s=elapsed_s, - rate_kbs=rate_kbs, - ) + log.info( + "Done message from %s:%s: uploaded %skB in %ss (%skB/s)", + self.sess.ep.host, + self.sess.ep.port, + size_kb, + elapsed_s, + rate_kbs, ) return self.getInitialState() @@ -203,8 +205,18 @@ def dfact(cls): @implementer(ITransaction) -class Transaction(object): - def __init__(self, ep, id): +class Transaction: + source_address: IAddress + connected: bool + initial: bool + srcid: int + records_to_add: dict[int, tuple[str, str]] + client_infos: dict[str, str] + record_infos_to_add: dict[int, dict[str, str]] + aliases: collections.defaultdict[int, list[str]] + records_to_delete: set[int] + + def __init__(self, ep: IAddress, id: int) -> None: self.connected = True self.initial = False self.source_address = ep @@ -214,57 +226,54 @@ def __init__(self, ep, id): self.records_to_delete = set() def show(self): - _log.info(str(self)) + log.info(str(self)) def __str__(self): - source_address = "{}:{}".format(self.source_address.host, self.source_address.port) - init = self.initial - conn = self.connected - nenv = len(self.client_infos) - nadd = len(self.records_to_add) - ndel = len(self.records_to_delete) - ninfo = len(self.record_infos_to_add) - nalias = len(self.aliases) - return "Transaction(Src:{}, Init:{}, Conn:{}, Env:{}, Rec:{}, Alias:{}, Info:{}, Del:{})".format( - source_address, init, conn, nenv, nadd, nalias, ninfo, ndel + src = f"{self.source_address.host}:{self.source_address.port}" + return ( + f"Transaction(Src:{src}, Init:{self.initial}, Conn:{self.connected}," + f" Env:{len(self.client_infos)}, Rec:{len(self.records_to_add)}," + f" Alias:{len(self.aliases)}, Info:{len(self.record_infos_to_add)}," + f" Del:{len(self.records_to_delete)})" ) def __repr__(self): - return f"""Transaction( - source_address={self.source_address}, - initial={self.initial}, - connected={self.connected}, - records_to_add={self.records_to_add}, - client_infos={self.client_infos}, - record_infos_to_add={self.record_infos_to_add}, - aliases={self.aliases}, - records_to_delete={self.records_to_delete}) - """ - - -class CollectionSession(object): + return ( + f"Transaction(" + f"source_address={self.source_address}, " + f"initial={self.initial}, " + f"connected={self.connected}, " + f"records_to_add={self.records_to_add}, " + f"client_infos={self.client_infos}, " + f"record_infos_to_add={self.record_infos_to_add}, " + f"aliases={self.aliases}, " + f"records_to_delete={self.records_to_delete})" + ) + + +class CollectionSession: timeout = 5.0 trlimit = 5000 def __init__(self, proto, endpoint): from twisted.internet import reactor - _log.info("Open session from {endpoint}".format(endpoint=endpoint)) + log.info("Open session from %s", endpoint) self.reactor = reactor self.proto, self.ep = proto, endpoint self.transaction = Transaction(self.ep, id(self)) self.transaction.initial = True - self.C = defer.succeed(None) - self.T = None + self._commit_chain = defer.succeed(None) + self._flush_deadline = None self.dirty = False def close(self): - _log.info("Close session from {ep}".format(ep=self.ep)) + log.info("Close session from %s", self.ep) - # Do not cancel self.C here. Any data commit that is still queued + # Do not cancel self._commit_chain here. Any data commit that is still queued # behind the global lock must be allowed to complete so that channels # are registered as active in CF before the disconnect is processed. - # The disconnect transaction is chained after self.C and will execute + # The disconnect transaction is chained after self._commit_chain and will execute # once all preceding commits have finished. self.transaction = Transaction(self.ep, id(self)) self.transaction.connected = False @@ -272,8 +281,8 @@ def close(self): self.flush() def flush(self): - _log.info("Flush session from {s}".format(s=self.ep)) - self.T = None + log.info("Flush session from %s", self.ep) + self._flush_deadline = None if not self.dirty: return @@ -282,36 +291,36 @@ def flush(self): self.dirty = False def commit(_ignored): - _log.info("Commit: {transaction}".format(transaction=transaction)) + log.info("Commit: %s", transaction) return defer.maybeDeferred(self.factory.commit, transaction) def abort(err): if err.check(defer.CancelledError): - _log.info("Commit cancelled: {transaction}".format(transaction=transaction)) + log.info("Commit cancelled: %s", transaction) return err else: - _log.error("Commit failure: {err}".format(err=err)) + log.error("Commit failure: %s", err) self.proto.transport.loseConnection() raise defer.CancelledError() - self.C.addCallback(commit).addErrback(abort) + self._commit_chain.addCallback(commit).addErrback(abort) # Flushes must NOT occur at arbitrary points in the data stream # because that can result in a PV and its record info or aliases being split # between transactions. Only flush after Add or Del or Done message received. def flush_safely(self): - if self.T and self.T <= time.time(): - _log.debug("flush_safely: timeout elapsed for %s", self.ep) + if self._flush_deadline and self._flush_deadline <= time.time(): + log.debug("flush_safely: timeout elapsed for %s", self.ep) self.flush() elif self.trlimit and self.trlimit <= ( len(self.transaction.records_to_add) + len(self.transaction.records_to_delete) ): - _log.debug("flush_safely: trlimit %d reached for %s", self.trlimit, self.ep) + log.debug("flush_safely: trlimit %d reached for %s", self.trlimit, self.ep) self.flush() def mark_dirty(self): - if not self.T: - self.T = time.time() + self.timeout + if not self._flush_deadline: + self._flush_deadline = time.time() + self.timeout self.dirty = True def done(self): diff --git a/server/tests/test_recast.py b/server/tests/test_recast.py index 039182b..ec098ce 100644 --- a/server/tests/test_recast.py +++ b/server/tests/test_recast.py @@ -51,6 +51,6 @@ def test_close_does_not_cancel_pending_commit(self): pending = defer.Deferred() cancelled_errors = [] pending.addErrback(lambda f: cancelled_errors.append(f.type) or f) - session.C = pending + session._commit_chain = pending session.close() assert cancelled_errors == [], "close() must not cancel a queued data commit" diff --git a/server/tests/unit/cf/test_config.py b/server/tests/unit/cf/test_config.py index 1ae3b3b..eb0cef0 100644 --- a/server/tests/unit/cf/test_config.py +++ b/server/tests/unit/cf/test_config.py @@ -30,6 +30,16 @@ def test_default_push_always_retry(self): config = CFConfig.loads(adapter) assert config.push_always_retry is False + def test_default_env_owner_variable(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.env_owner_variable == "ENGINEER" + + def test_env_owner_variable_from_config(self): + adapter = make_adapter(values={"envownervariable": "RESPONSIBLE_ENGINEER"}) + config = CFConfig.loads(adapter) + assert config.env_owner_variable == "RESPONSIBLE_ENGINEER" + def test_alias_disabled_by_default(self): adapter = make_adapter() config = CFConfig.loads(adapter)