From 07c66d5f3397908f3b67b3584e93ee84e1b51256 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 12:47:59 +0200 Subject: [PATCH 1/8] fix(server): allow env_owner_variable to be set via config The field existed on CFConfig but was never read from the config adapter, so it was permanently hardcoded to "ENGINEER" with no way to override it per-deployment. --- server/recceiver/cf/config.py | 1 + server/tests/unit/cf/test_config.py | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/server/recceiver/cf/config.py b/server/recceiver/cf/config.py index 36293ee..3006d5a 100644 --- a/server/recceiver/cf/config.py +++ b/server/recceiver/cf/config.py @@ -46,6 +46,7 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig": clean_on_start=conf.getboolean("cleanOnStart", True), clean_on_stop=conf.getboolean("cleanOnStop", True), username=conf.get("username", "cfstore"), + env_owner_variable=conf.get("envOwnerVariable", "ENGINEER"), recceiver_id=conf.get("recceiverId", RECCEIVERID_DEFAULT), timezone=conf.get("timezone", ""), cf_query_limit=conf.get("findSizeLimit", DEFAULT_QUERY_LIMIT), diff --git a/server/tests/unit/cf/test_config.py b/server/tests/unit/cf/test_config.py index 1ae3b3b..eb0cef0 100644 --- a/server/tests/unit/cf/test_config.py +++ b/server/tests/unit/cf/test_config.py @@ -30,6 +30,16 @@ def test_default_push_always_retry(self): config = CFConfig.loads(adapter) assert config.push_always_retry is False + def test_default_env_owner_variable(self): + adapter = make_adapter() + config = CFConfig.loads(adapter) + assert config.env_owner_variable == "ENGINEER" + + def test_env_owner_variable_from_config(self): + adapter = make_adapter(values={"envownervariable": "RESPONSIBLE_ENGINEER"}) + config = CFConfig.loads(adapter) + assert config.env_owner_variable == "RESPONSIBLE_ENGINEER" + def test_alias_disabled_by_default(self): adapter = make_adapter() config = CFConfig.loads(adapter) From a429f7c1648fd9864910bf00d53de5b9efcc742b Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 12:58:47 +0200 Subject: [PATCH 2/8] fix(server): avoid replacing reactor at import time pollreactor.install() was called at module level, replacing the global Twisted reactor as a side effect of any import of recceiver.application. This made import order significant in tests and prevented them from controlling which reactor is used. --- server/recceiver/application.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 4c1d82d..cf9e253 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -18,8 +18,6 @@ _log = logging.getLogger(__name__) -pollreactor.install() - class Log2Twisted(logging.StreamHandler): """Print logging module stream to the twisted log""" @@ -165,6 +163,7 @@ class Maker(object): options = Options def make_service(self, opts): + pollreactor.install() ctrl = ProcessorController(cfile=opts["config"]) conf = ctrl.config("recceiver") S = RecService(conf) From 3d42a335dc6c832a50548bdc6c816c0465c5372c Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 13:02:11 +0200 Subject: [PATCH 3/8] refactor(server): extract cancel guard into _assert_not_cancelled helper Three identical if-self.cancelled / raise CancelledError blocks were scattered through _update_channelfinder with slightly inconsistent messages. Collects the pattern into one place so the cancellation contract is visible and the checks read as one-liners. 
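
For illustration (taken from the diff below), the helper is:

    def _assert_not_cancelled(self, context: str) -> None:
        if self.cancelled:
            raise defer.CancelledError(f"Processor cancelled: {context}")

and each former three-line guard becomes a one-liner such as:

    self._assert_not_cancelled(f"after fetching existing channels for {ioc_info}")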
--- server/recceiver/cf/processor.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 94046f7..32a0e7c 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -474,6 +474,10 @@ def _push_to_cf( _log.error("CF push gave up after %d attempts: %s", count, ioc_info) return False + def _assert_not_cancelled(self, context: str) -> None: + if self.cancelled: + raise defer.CancelledError(f"Processor cancelled: {context}") + def _update_channelfinder( self, record_info_by_name: Dict[str, RecordInfo], @@ -497,8 +501,7 @@ def _update_channelfinder( if ioc_info.hostname is None or ioc_info.ioc_name is None: raise IOCMissingInfoError(ioc_info) - if self.cancelled: - raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + self._assert_not_cancelled(f"before fetching old channels for {ioc_info}") channels: List[CFChannel] = [] _log.debug("Find existing channels by IOCID: %s", ioc_info) @@ -518,8 +521,7 @@ def _update_channelfinder( # now pvNames contains a list of pv's new on this host/ioc existing_channels = self._get_existing_channels(new_channels) - if self.cancelled: - raise defer.CancelledError(f"CF Processor is cancelled, after fetching existing channels for {ioc_info}") + self._assert_not_cancelled(f"after fetching existing channels for {ioc_info}") self._process_new_channels( new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid @@ -531,8 +533,7 @@ def _update_channelfinder( else: if old_channels and len(old_channels) != 0: self._cf_set_chunked(channels) - if self.cancelled: - raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + self._assert_not_cancelled(f"after setting channels for {ioc_info}") def _process_new_channels( self, From 62e195b223b3cc2e84a9694dd9dcbad974abbfcc Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 13:02:45 +0200 Subject: [PATCH 4/8] refactor(server): remove CommitTransaction, annotate against ITransaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CommitTransaction was defined in interfaces.py and used as a type annotation in processor.py, but was never constructed anywhere — the actual runtime value passed is always a Transaction (duck-typed via ITransaction). The annotation was a dead end for any reader following the type. Remove CommitTransaction and SourceAddress (only used by CommitTransaction), extend ITransaction with the aliases/initial/connected attributes that were also defined on CommitTransaction, and annotate processor methods with interfaces.ITransaction to match the real call contract. 
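
For example (taken from the diff below), the commit entry point is now
annotated against the interface instead of the removed dataclass:

    def _commit_with_thread(self, transaction: interfaces.ITransaction):
        host = transaction.source_address.host
        port = transaction.source_address.port

The attribute accesses stay the same; only the declared contract changes.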
--- server/recceiver/cf/processor.py | 11 ++++++----- server/recceiver/interfaces.py | 21 +++------------------ 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 32a0e7c..3cb0824 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -24,7 +24,6 @@ PVStatus, RecordInfo, ) -from recceiver.interfaces import CommitTransaction from recceiver.processors import ConfigAdapter _log = logging.getLogger(__name__) @@ -217,7 +216,9 @@ def chain_result(result): t.addCallbacks(chain_result, chain_error) return d - def transaction_to_record_infos(self, ioc_info: IOCInfo, transaction: CommitTransaction) -> Dict[str, RecordInfo]: + def transaction_to_record_infos( + self, ioc_info: IOCInfo, transaction: interfaces.ITransaction + ) -> Dict[str, RecordInfo]: """Build a RecordInfo dict keyed by record_id from a transaction. Merges record types, info-tag properties, aliases, and mapped EPICS @@ -255,7 +256,7 @@ def _apply_env_vars( self, record_infos: Dict[str, RecordInfo], ioc_info: IOCInfo, - transaction: CommitTransaction, + transaction: interfaces.ITransaction, ) -> None: """Append mapped EPICS environment variable properties to every record.""" for record_id in record_infos: @@ -286,7 +287,7 @@ def record_info_by_name(record_infos: Dict[str, RecordInfo], ioc_info: IOCInfo) def update_ioc_infos( self, - transaction: CommitTransaction, + transaction: interfaces.ITransaction, ioc_info: IOCInfo, records_to_delete: List[str], record_info_by_name: Dict[str, RecordInfo], @@ -322,7 +323,7 @@ def _remove_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: self.remove_channel(alias, iocid) - def _commit_with_thread(self, transaction: CommitTransaction): + def _commit_with_thread(self, transaction: interfaces.ITransaction): host = transaction.source_address.host port = transaction.source_address.port diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index 26ae07e..76f6886 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -1,8 +1,5 @@ # -*- coding: utf-8 -*- -from dataclasses import dataclass -from typing import Dict, List, Set, Tuple - from twisted.application import service from zope.interface import Attribute, Interface @@ -22,23 +19,11 @@ class ITransaction(Interface): recid: {'key':'val'} """) + aliases = Attribute("A dict mapping record id to list of alias names") -@dataclass -class SourceAddress: - host: str - port: int - + initial = Attribute("True if this is the first transaction for this IOC connection") -@dataclass -class CommitTransaction: - source_address: SourceAddress - client_infos: Dict[str, str] - records_to_add: Dict[str, Tuple[str, str]] - records_to_delete: Set[str] - record_infos_to_add: Dict[str, Dict[str, str]] - aliases: Dict[str, List[str]] - initial: bool - connected: bool + connected = Attribute("False if the IOC has disconnected") class IProcessor(service.IService): From ace283ab22d0585181dbaaab22fdaebb3e62aa51 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 13:02:53 +0200 Subject: [PATCH 5/8] refactor(server): clean up old-style code in recast.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop (object) bases on Transaction and CollectionSession - Rename self.T/_ping_timer in CastReceiver and self.T/_flush_deadline, self.C/_commit_chain in CollectionSession — the same name T carried different types in the two 
classes, making the code harder to follow - Convert remaining .format() strings to f-strings throughout --- server/recceiver/recast.py | 100 +++++++++++++++++------------------- server/tests/test_recast.py | 2 +- 2 files changed, 47 insertions(+), 55 deletions(-) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index af6f0a0..a8b0256 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -44,7 +44,7 @@ def connectionMade(self): if self.active: # Full speed ahead self.phase = 1 # 1: send ping, 2: receive pong - self.T = self.reactor.callLater(self.timeout, self.writePing) + self._ping_timer = self.reactor.callLater(self.timeout, self.writePing) self.transport.write(messages.ServerGreeting(self.version).frame()) self.uploadStart = time.time() else: @@ -53,17 +53,17 @@ def connectionMade(self): def connectionLost(self, reason=protocol.connectionDone): self.factory.isDone(self, self.active) - if self.T and self.T.active(): - self.T.cancel() - del self.T + if self._ping_timer and self._ping_timer.active(): + self._ping_timer.cancel() + del self._ping_timer if self.sess: self.sess.close() del self.sess def restartPingTimer(self): - T, self.T = self.T, self.reactor.callLater(self.timeout, self.writePing) - if T and T.active(): - T.cancel() + old, self._ping_timer = self._ping_timer, self.reactor.callLater(self.timeout, self.writePing) + if old and old.active(): + old.cancel() def writePing(self): if self.phase == 2: @@ -74,7 +74,7 @@ def writePing(self): self.phase = 2 self.nonce = random.randint(0, 0xFFFFFFFF) self.transport.write(messages.Ping(self.nonce).frame()) - _log.debug("ping nonce: " + str(self.nonce)) + _log.debug(f"ping nonce: {self.nonce}") def getInitialState(self): return (self.recvHeader, messages.Header.payload.size) @@ -182,14 +182,9 @@ def recvDone(self, body): elapsed_s = time.time() - self.uploadStart size_kb = self.uploadSize / 1024 rate_kbs = size_kb / elapsed_s - source_address = "{}:{}".format(self.sess.ep.host, self.sess.ep.port) _log.info( - "Done message from {source_address}: uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)".format( - source_address=source_address, - size_kb=size_kb, - elapsed_s=elapsed_s, - rate_kbs=rate_kbs, - ) + f"Done message from {self.sess.ep.host}:{self.sess.ep.port}:" + f" uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)" ) return self.getInitialState() @@ -203,7 +198,7 @@ def dfact(cls): @implementer(ITransaction) -class Transaction(object): +class Transaction: def __init__(self, ep, id): self.connected = True self.initial = False @@ -217,54 +212,51 @@ def show(self): _log.info(str(self)) def __str__(self): - source_address = "{}:{}".format(self.source_address.host, self.source_address.port) - init = self.initial - conn = self.connected - nenv = len(self.client_infos) - nadd = len(self.records_to_add) - ndel = len(self.records_to_delete) - ninfo = len(self.record_infos_to_add) - nalias = len(self.aliases) - return "Transaction(Src:{}, Init:{}, Conn:{}, Env:{}, Rec:{}, Alias:{}, Info:{}, Del:{})".format( - source_address, init, conn, nenv, nadd, nalias, ninfo, ndel + src = f"{self.source_address.host}:{self.source_address.port}" + return ( + f"Transaction(Src:{src}, Init:{self.initial}, Conn:{self.connected}," + f" Env:{len(self.client_infos)}, Rec:{len(self.records_to_add)}," + f" Alias:{len(self.aliases)}, Info:{len(self.record_infos_to_add)}," + f" Del:{len(self.records_to_delete)})" ) def __repr__(self): - return f"""Transaction( - source_address={self.source_address}, - 
initial={self.initial}, - connected={self.connected}, - records_to_add={self.records_to_add}, - client_infos={self.client_infos}, - record_infos_to_add={self.record_infos_to_add}, - aliases={self.aliases}, - records_to_delete={self.records_to_delete}) - """ - - -class CollectionSession(object): + return ( + f"Transaction(" + f"source_address={self.source_address}, " + f"initial={self.initial}, " + f"connected={self.connected}, " + f"records_to_add={self.records_to_add}, " + f"client_infos={self.client_infos}, " + f"record_infos_to_add={self.record_infos_to_add}, " + f"aliases={self.aliases}, " + f"records_to_delete={self.records_to_delete})" + ) + + +class CollectionSession: timeout = 5.0 trlimit = 5000 def __init__(self, proto, endpoint): from twisted.internet import reactor - _log.info("Open session from {endpoint}".format(endpoint=endpoint)) + _log.info(f"Open session from {endpoint}") self.reactor = reactor self.proto, self.ep = proto, endpoint self.transaction = Transaction(self.ep, id(self)) self.transaction.initial = True - self.C = defer.succeed(None) - self.T = None + self._commit_chain = defer.succeed(None) + self._flush_deadline = None self.dirty = False def close(self): - _log.info("Close session from {ep}".format(ep=self.ep)) + _log.info(f"Close session from {self.ep}") - # Do not cancel self.C here. Any data commit that is still queued + # Do not cancel self._commit_chain here. Any data commit that is still queued # behind the global lock must be allowed to complete so that channels # are registered as active in CF before the disconnect is processed. - # The disconnect transaction is chained after self.C and will execute + # The disconnect transaction is chained after self._commit_chain and will execute # once all preceding commits have finished. self.transaction = Transaction(self.ep, id(self)) self.transaction.connected = False @@ -272,8 +264,8 @@ def close(self): self.flush() def flush(self): - _log.info("Flush session from {s}".format(s=self.ep)) - self.T = None + _log.info(f"Flush session from {self.ep}") + self._flush_deadline = None if not self.dirty: return @@ -282,25 +274,25 @@ def flush(self): self.dirty = False def commit(_ignored): - _log.info("Commit: {transaction}".format(transaction=transaction)) + _log.info(f"Commit: {transaction}") return defer.maybeDeferred(self.factory.commit, transaction) def abort(err): if err.check(defer.CancelledError): - _log.info("Commit cancelled: {transaction}".format(transaction=transaction)) + _log.info(f"Commit cancelled: {transaction}") return err else: - _log.error("Commit failure: {err}".format(err=err)) + _log.error(f"Commit failure: {err}") self.proto.transport.loseConnection() raise defer.CancelledError() - self.C.addCallback(commit).addErrback(abort) + self._commit_chain.addCallback(commit).addErrback(abort) # Flushes must NOT occur at arbitrary points in the data stream # because that can result in a PV and its record info or aliases being split # between transactions. Only flush after Add or Del or Done message received. 
def flush_safely(self): - if self.T and self.T <= time.time(): + if self._flush_deadline and self._flush_deadline <= time.time(): _log.debug("flush_safely: timeout elapsed for %s", self.ep) self.flush() elif self.trlimit and self.trlimit <= ( @@ -310,8 +302,8 @@ def flush_safely(self): self.flush() def mark_dirty(self): - if not self.T: - self.T = time.time() + self.timeout + if not self._flush_deadline: + self._flush_deadline = time.time() + self.timeout self.dirty = True def done(self): diff --git a/server/tests/test_recast.py b/server/tests/test_recast.py index 039182b..ec098ce 100644 --- a/server/tests/test_recast.py +++ b/server/tests/test_recast.py @@ -51,6 +51,6 @@ def test_close_does_not_cancel_pending_commit(self): pending = defer.Deferred() cancelled_errors = [] pending.addErrback(lambda f: cancelled_errors.append(f.type) or f) - session.C = pending + session._commit_chain = pending session.close() assert cancelled_errors == [], "close() must not cancel a queued data commit" From 55744e8975c0f97ada748bb6671d263101da41c9 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 13:14:51 +0200 Subject: [PATCH 6/8] refactor(server): rename _log to log and enforce lazy log formatting Rename the module-level logger from _log to log in all recceiver modules so that ruff's G (flake8-logging-format) rules can detect logging calls by variable name. In application.py, the existing twisted.python.log import is aliased to twisted_log to avoid the name collision. Convert all logging calls that used f-strings or str.format to use %-style lazy formatting, and enable the G ruleset in ruff so regressions are caught automatically. --- server/pyproject.toml | 6 +- server/recceiver/announcer.py | 12 ++-- server/recceiver/application.py | 19 +++--- server/recceiver/cf/processor.py | 108 +++++++++++++++---------------- server/recceiver/dbstore.py | 8 +-- server/recceiver/processors.py | 34 +++++----- server/recceiver/recast.py | 56 ++++++++-------- 7 files changed, 122 insertions(+), 121 deletions(-) diff --git a/server/pyproject.toml b/server/pyproject.toml index c3406c3..efb9889 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -48,10 +48,8 @@ package-data.twisted = [ "plugins/recceiver_plugin.py" ] [tool.ruff] target-version = "py39" line-length = 120 -# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. -# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or -# McCabe complexity (`C901`) by default. -lint.select = [ "E", "F", "I" ] +# E: pycodestyle errors, F: Pyflakes, G: flake8-logging-format (enforce lazy % in log calls), I: isort. +lint.select = [ "E", "F", "G", "I" ] [tool.pytest] ini_options.log_level = "DEBUG" diff --git a/server/recceiver/announcer.py b/server/recceiver/announcer.py index 1cad0fb..eaba18b 100644 --- a/server/recceiver/announcer.py +++ b/server/recceiver/announcer.py @@ -8,7 +8,7 @@ from .protocol.announce import ANNOUNCE_PORT, BROADCAST_ADDRESS, Announce -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) __all__ = ["Announcer", "SharedUDP"] @@ -53,14 +53,14 @@ def __init__( raise RuntimeError("Announce list is empty at start time...") def startProtocol(self): - _log.info("Setup Announcer") + log.info("Setup Announcer") self.D = self.reactor.callLater(0, self.sendOne) # we won't process any received traffic, so no reason to wake # up for it... 
self.transport.pauseProducing() def stopProtocol(self): - _log.info("Stop Announcer") + log.info("Stop Announcer") self.D.cancel() del self.D @@ -71,14 +71,14 @@ def sendOne(self): self.D = self.reactor.callLater(self.delay, self.sendOne) for A in self.udps: try: - _log.debug("announce to {s}".format(s=A)) + log.debug("announce to %s", A) self.transport.write(self.msg, A) try: self.udpErr.remove(A) - _log.warning("announce OK to {s}".format(s=A)) + log.warning("announce OK to %s", A) except KeyError: pass except MessageLengthError: if A not in self.udpErr: self.udpErr.add(A) - _log.exception("announce Error to {s}".format(s=A)) + log.exception("announce Error to %s", A) diff --git a/server/recceiver/application.py b/server/recceiver/application.py index cf9e253..1a58455 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -6,7 +6,8 @@ from twisted.application import service from twisted.internet import defer, pollreactor, task from twisted.internet.error import CannotListenError -from twisted.python import log, usage +from twisted.python import log as twisted_log +from twisted.python import usage from zope.interface import implementer from twisted import plugin @@ -16,7 +17,7 @@ from .processors import ProcessorController from .recast import CastFactory -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) class Log2Twisted(logging.StreamHandler): @@ -27,7 +28,7 @@ def __init__(self): # The Twisted log publisher adds a newline, # so strip the newline added by the Python log handler. self.terminator = "" - self.write = log.msg + self.write = twisted_log.msg def flush(self): # Required by logging.StreamHandler; this handler writes directly to Twisted logs. @@ -72,7 +73,7 @@ def __init__(self, config): self.addrlist = [("", 5049)] def privilegedStartService(self): - _log.info("Starting RecService") + log.info("Starting RecService") # Start TCP server on random port self.tcpFactory = CastFactory() @@ -94,7 +95,7 @@ def privilegedStartService(self): # Find out which port is in use addr = self.tcp.getHost() - _log.info("RecService listening on {addr}".format(addr=addr)) + log.info("RecService listening on %s", addr) self.key = random.randint(0, 0xFFFFFFFF) @@ -115,9 +116,9 @@ def privilegedStartService(self): if self.metricsPort > 0: if metrics.available: self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind) - _log.info("Prometheus metrics available on port %d", self.metricsPort) + log.info("Prometheus metrics available on port %d", self.metricsPort) else: - _log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") + log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") metrics.connections_limit.set(self.tcpFactory.maxActive) @@ -128,7 +129,7 @@ def privilegedStartService(self): def _logStatus(self): metrics.connections_active.set(self.tcpFactory.NActive) metrics.connections_waiting.set(len(self.tcpFactory.Wait)) - _log.info( + log.info( "status: connections active=%d/%d queued=%d", self.tcpFactory.NActive, self.tcpFactory.maxActive, @@ -136,7 +137,7 @@ def _logStatus(self): ) def stopService(self): - _log.info("Stopping RecService") + log.info("Stopping RecService") if self._statusLoop is not None and self._statusLoop.running: self._statusLoop.stop() diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index 3cb0824..cfb9ee1 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ 
-26,7 +26,7 @@ ) from recceiver.processors import ConfigAdapter -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) @implementer(interfaces.IProcessor) @@ -72,10 +72,10 @@ def startService(self): def _logStatus(self): metrics.known_iocs.set(len(self.iocs)) metrics.tracked_channels.set(len(self.channel_ioc_ids)) - _log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) + log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) def _start_service_with_lock(self): - _log.info("CF_START with configuration: %s", self.cf_config) + log.info("CF_START with configuration: %s", self.cf_config) if self.client is None: # For setting up mock test client self.client = PyCFClientAdapter( @@ -91,11 +91,11 @@ def _start_service_with_lock(self): cf_properties = set(self.client.get_property_names()) self._setup_cf_properties(cf_properties) except ConnectionError: - _log.exception("Cannot connect to Channelfinder service") + log.exception("Cannot connect to Channelfinder service") raise else: if self.cf_config.clean_on_start: - _log.info("CF Clean: scheduling background startup sweep") + log.info("CF Clean: scheduling background startup sweep") from twisted.internet import reactor reactor.callLater(0, self._start_background_clean) @@ -145,10 +145,10 @@ def _setup_cf_properties(self, cf_properties: Set[str]) -> None: self.record_property_names_list = record_property_names_list self.managed_properties = required_properties | record_property_names_list - _log.debug("record_property_names_list = %s", self.record_property_names_list) + log.debug("record_property_names_list = %s", self.record_property_names_list) def stopService(self): - _log.info("CF_STOP") + log.info("CF_STOP") if self._statusLoop is not None and self._statusLoop.running: self._statusLoop.stop() service.Service.stopService(self) @@ -161,14 +161,14 @@ def _stop_service_with_lock(self): The sweep runs in a background thread to avoid blocking the reactor. The lock is held throughout, preventing new commits from interleaving. """ - _log.info("CF_STOP with lock") + log.info("CF_STOP with lock") if self.cf_config.clean_on_stop: return deferToThread(self.clean_service) def _start_background_clean(self): - _log.info("CF Clean: background startup sweep beginning") + log.info("CF Clean: background startup sweep beginning") deferToThread(self.clean_service).addErrback( - lambda err: _log.error("CF Clean background sweep failed: %s", err) + lambda err: log.error("CF Clean background sweep failed: %s", err) ) # @defer.inlineCallbacks # Twisted v16 does not support cancellation! @@ -199,7 +199,7 @@ def chain_error(err): Note this is not foolproof as the thread may still be running. 
""" if not err.check(defer.CancelledError): - _log.error("CF_COMMIT FAILURE: %s", err) + log.error("CF_COMMIT FAILURE: %s", err) if self.cancelled: if not err.check(defer.CancelledError): raise defer.CancelledError() @@ -234,7 +234,7 @@ def transaction_to_record_infos( for record_id, (record_infos_to_add) in transaction.record_infos_to_add.items(): # find intersection of these sets if record_id not in record_infos: - _log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id) + log.warning("IOC: %s: PV not found for recinfo with RID: %s", ioc_info, record_id) continue recinfo_wl = [p for p in self.record_property_names_list if p in record_infos_to_add.keys()] if recinfo_wl: @@ -245,7 +245,7 @@ def transaction_to_record_infos( for record_id, record_aliases in transaction.aliases.items(): if record_id not in record_infos: - _log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) + log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) continue record_infos[record_id].aliases = record_aliases @@ -265,7 +265,7 @@ def _apply_env_vars( if value is not None: record_infos[record_id].info_properties.append(CFProperty(cf_prop_name, ioc_info.owner, value)) else: - _log.debug( + log.debug( "EPICS environment var %s not found in IOC: %s", epics_env_var_name, ioc_info, @@ -280,7 +280,7 @@ def record_info_by_name(record_infos: Dict[str, RecordInfo], ioc_info: IOCInfo) record_info_by_name = {} for info in record_infos.values(): if info.pv_name in record_info_by_name: - _log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) + log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) continue record_info_by_name[info.pv_name] = info return record_info_by_name @@ -330,15 +330,15 @@ def _commit_with_thread(self, transaction: interfaces.ITransaction): if not self.running: raise defer.CancelledError(f"CF Processor is not running (transaction: {host}:{port})") - _log.info("CF_COMMIT: %s", transaction) - _log.debug("CF_COMMIT: transaction: %s", repr(transaction)) + log.info("CF_COMMIT: %s", transaction) + log.debug("CF_COMMIT: transaction: %s", repr(transaction)) ioc_name = transaction.client_infos.get("IOCNAME") if not ioc_name: ioc_name = str(port) - _log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) + log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535: - _log.warning( + log.warning( "IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — " "iocid will change on every reconnect, causing stale channels in CF; " "configure a stable iocName via reccaster", @@ -352,7 +352,7 @@ def _commit_with_thread(self, transaction: interfaces.ITransaction): or self.cf_config.username ) if owner == self.cf_config.username: - _log.debug( + log.debug( "IOC at %s:%d did not send %s or CF_USERNAME; using service account as owner", host, port, @@ -372,11 +372,11 @@ def _commit_with_thread(self, transaction: interfaces.ITransaction): record_infos = self.transaction_to_record_infos(ioc_info, transaction) records_to_delete = list(transaction.records_to_delete) - _log.debug("Delete records: %s", records_to_delete) + log.debug("Delete records: %s", records_to_delete) record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) if not transaction.connected and ioc_info.id not in self.iocs: - _log.warning( + log.warning( "IOC at %s:%d 
disconnected before completing initial upload (0 channels registered)", host, port, @@ -401,7 +401,7 @@ def remove_channel(self, record_name: str, iocid: str) -> None: self.iocs[iocid].channelcount -= 1 if self.iocs[iocid].channelcount <= 0: if self.iocs[iocid].channelcount < 0: - _log.error("Channel count negative: %s", iocid) + log.error("Channel count negative: %s", iocid) self.iocs.pop(iocid) if len(self.channel_ioc_ids[record_name]) == 0: del self.channel_ioc_ids[record_name] @@ -414,21 +414,21 @@ def clean_service(self) -> None: recceiverid = self.cf_config.recceiver_id while 1: try: - _log.info("CF Clean Started") + log.info("CF Clean Started") channels = self.get_active_channels(recceiverid) while channels: self.clean_channels(owner, channels) channels = self.get_active_channels(recceiverid) - _log.info("CF Clean Completed") + log.info("CF Clean Completed") return except RequestException: - _log.exception("Clean service failed") + log.exception("Clean service failed") retry_seconds = min(60, sleep) - _log.info("Clean service retry in %s seconds", retry_seconds) + log.info("Clean service retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after %s seconds", retry_limit) + log.info("Abandoning clean after %s seconds", retry_limit) return def get_active_channels(self, recceiverid: str) -> List[CFChannel]: @@ -438,8 +438,8 @@ def get_active_channels(self, recceiverid: str) -> List[CFChannel]: def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: """Mark the given channels Inactive in CF.""" names = [ch.name for ch in channels or []] - _log.info("Cleaning %s channels.", len(names)) - _log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) + log.info("Cleaning %s channels.", len(names)) + log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) self.client.update_property(CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.INACTIVE.value), names) def _push_to_cf( @@ -448,12 +448,12 @@ def _push_to_cf( records_to_delete: List[str], ioc_info: IOCInfo, ) -> bool: - _log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) + log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) count = 0 sleep = 1.0 while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: if not self.running: - _log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) return False count += 1 t0 = time.monotonic() @@ -462,17 +462,17 @@ def _push_to_cf( elapsed = time.monotonic() - t0 metrics.cf_commit_duration_seconds.observe(elapsed) metrics.cf_commits_total.labels(result="success").inc() - _log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) + log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) return True except RequestException: elapsed = time.monotonic() - t0 - _log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) + log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) retry_seconds = min(60, sleep) - _log.info("CF push retry in %s seconds", retry_seconds) + log.info("CF push retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 
metrics.cf_commits_total.labels(result="cancelled").inc() - _log.error("CF push gave up after %d attempts: %s", count, ioc_info) + log.error("CF push gave up after %d attempts: %s", count, ioc_info) return False def _assert_not_cancelled(self, context: str) -> None: @@ -485,15 +485,15 @@ def _update_channelfinder( records_to_delete: List[str], ioc_info: IOCInfo, ) -> None: - _log.info("CF Update IOC: %s", ioc_info) - _log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) + log.info("CF Update IOC: %s", ioc_info) + log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) recceiverid = self.cf_config.recceiver_id new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id if iocid not in self.iocs and record_info_by_name: # Disconnect-before-upload is already logged in _commit_with_thread. - _log.warning( + log.warning( "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, len(self.iocs), @@ -505,7 +505,7 @@ def _update_channelfinder( self._assert_not_cancelled(f"before fetching old channels for {ioc_info}") channels: List[CFChannel] = [] - _log.debug("Find existing channels by IOCID: %s", ioc_info) + log.debug("Find existing channels by IOCID: %s", ioc_info) old_channels: List[CFChannel] = self.client.find_by_ioc_id(iocid) if old_channels: @@ -527,7 +527,7 @@ def _update_channelfinder( self._process_new_channels( new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid ) - _log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) + log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) if len(channels) != 0: self._cf_set_chunked(channels) @@ -564,7 +564,7 @@ def _process_new_channels( ) new_properties = new_properties + record_info.info_properties if channel_name in existing_channels: - _log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) + log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) self._update_existing_channel_diff_iocid( existing_channels, channel_name, new_properties, channels, record_info_by_name, ioc_info, iocid ) @@ -595,7 +595,7 @@ def _handle_channels( """ for cf_channel in old_channels: if not new_channels or cf_channel.name in records_to_delete: - _log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) + log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) if cf_channel.name in self.channel_ioc_ids: self._handle_channel_is_old(cf_channel, ioc_info, recceiverid, channels, record_info_by_name) else: @@ -623,7 +623,7 @@ def _handle_channel_is_old( self.managed_properties, ) channels.append(cf_channel) - _log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) + log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) if self.cf_config.alias_enabled: if cf_channel.name in record_info_by_name: for alias_name in record_info_by_name[cf_channel.name].aliases: @@ -640,7 +640,7 @@ def _handle_channel_is_old( self.managed_properties, ) channels.append(alias_channel) - _log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) + log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) def _orphan_channel( self, @@ -658,7 +658,7 @@ def _orphan_channel( cf_channel, ) channels.append(cf_channel) - _log.debug("Add orphaned channel %s with no IOC: 
%s", cf_channel, ioc_info) + log.debug("Add orphaned channel %s with no IOC: %s", cf_channel, ioc_info) if self.cf_config.alias_enabled: if cf_channel.name in record_info_by_name: for alias_name in record_info_by_name[cf_channel.name].aliases: @@ -671,7 +671,7 @@ def _orphan_channel( alias_channel, ) channels.append(alias_channel) - _log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) + log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) def _handle_channel_old_and_new( self, @@ -684,7 +684,7 @@ def _handle_channel_old_and_new( old_channels: List[CFChannel], ) -> None: """Channel exists in CF with the same iocid — mark active and update time.""" - _log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) + log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) cf_channel.properties = _merge_property_lists( [ CFProperty(CFPropertyName.PV_STATUS.value, ioc_info.owner, PVStatus.ACTIVE.value), @@ -694,7 +694,7 @@ def _handle_channel_old_and_new( self.managed_properties, ) channels.append(cf_channel) - _log.debug("Add existing channel with same IOC: %s", cf_channel) + log.debug("Add existing channel with same IOC: %s", cf_channel) new_channels.remove(cf_channel.name) if self.cf_config.alias_enabled: @@ -724,7 +724,7 @@ def _handle_channel_old_and_new( ) channels.append(CFChannel(alias_name, ioc_info.owner, aprops)) new_channels.remove(alias_name) - _log.debug("Add existing alias with same IOC: %s", cf_channel) + log.debug("Add existing alias with same IOC: %s", cf_channel) def _get_existing_channels(self, new_channels: Set[str]) -> Dict[str, CFChannel]: """Query CF for channels in new_channels that already exist there.""" @@ -748,7 +748,7 @@ def _update_existing_channel_diff_iocid( self.managed_properties, ) channels.append(existing_channel) - _log.debug("Add existing channel with different IOC: %s", existing_channel) + log.debug("Add existing channel with different IOC: %s", existing_channel) if self.cf_config.alias_enabled and channel_name in record_info_by_name: alias_properties = [CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, channel_name)] + new_properties for alias_name in record_info_by_name[channel_name].aliases: @@ -758,7 +758,7 @@ def _update_existing_channel_diff_iocid( channels.append(ach) else: channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties)) - _log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) + log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) def _create_new_channel( self, @@ -769,12 +769,12 @@ def _create_new_channel( record_info_by_name: Dict[str, RecordInfo], ) -> None: channels.append(CFChannel(channel_name, ioc_info.owner, new_properties)) - _log.debug("Add new channel: %s", channel_name) + log.debug("Add new channel: %s", channel_name) if self.cf_config.alias_enabled and channel_name in record_info_by_name: alias_properties = [CFProperty(CFPropertyName.ALIAS.value, ioc_info.owner, channel_name)] + new_properties for alias in record_info_by_name[channel_name].aliases: channels.append(CFChannel(alias, ioc_info.owner, alias_properties)) - _log.debug("Add new alias: %s from %s", alias, channel_name) + log.debug("Add new alias: %s from %s", alias, channel_name) def create_ioc_properties( diff --git a/server/recceiver/dbstore.py b/server/recceiver/dbstore.py index 9ca0ee7..a8bb80e 100644 --- a/server/recceiver/dbstore.py +++ 
b/server/recceiver/dbstore.py @@ -10,7 +10,7 @@ from . import interfaces -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) __all__ = ["DBProcessor"] @@ -40,7 +40,7 @@ def wait_for(self, deferred): return deferred def startService(self): - _log.info("Start DBService") + log.info("Start DBService") service.Service.startService(self) # map of source id# to server table id keys @@ -65,7 +65,7 @@ def startService(self): self.wait_for(self.pool.runInteraction(self.cleanupDB)) def stopService(self): - _log.info("Stop DBService") + log.info("Stop DBService") service.Service.stopService(self) @@ -76,7 +76,7 @@ def stopService(self): return defer.DeferredList(list(self.Ds), consumeErrors=True) def cleanupDB(self, cur): - _log.info("Cleanup DBService") + log.info("Cleanup DBService") assert self.mykey != 0 cur.execute("PRAGMA foreign_keys = ON;") diff --git a/server/recceiver/processors.py b/server/recceiver/processors.py index 2c79d02..d1c32a3 100644 --- a/server/recceiver/processors.py +++ b/server/recceiver/processors.py @@ -14,7 +14,7 @@ from . import interfaces -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) __all__ = [ "ShowProcessor", @@ -88,7 +88,7 @@ def __init__(self, cfile=None): plugs = {} for plug in plugin.getPlugins(interfaces.IProcessorFactory): - _log.debug("Available plugin: {name}".format(name=plug.name)) + log.debug("Available plugin: %s", plug.name) plugs[plug.name] = plug self.procs = [] @@ -119,13 +119,13 @@ def config(self, section): def commit(self, trans): def punish(err, processor): if err.check(defer.CancelledError): - _log.debug("Cancel processing: {name}: {trans}".format(name=processor.name, trans=trans)) + log.debug("Cancel processing: %s: %s", processor.name, trans) return err try: self.procs.remove(processor) - _log.error("Remove processor: {name}: {err}".format(name=processor.name, err=err)) + log.error("Remove processor: %s: %s", processor.name, err) except ValueError: - _log.debug("Remove processor: {name}: aleady removed".format(name=processor.name)) + log.debug("Remove processor: %s: already removed", processor.name) return err defers = [defer.maybeDeferred(P.commit, trans).addErrback(punish, P) for P in self.procs] @@ -146,7 +146,7 @@ def __init__(self, name, opts): def startService(self): service.Service.startService(self) - _log.info("Show processor '{processor}' starting".format(processor=self.name)) + log.info("Show processor '%s' starting", self.name) def commit(self, transaction): def with_lock(_ignored): @@ -170,27 +170,25 @@ def release_lock(result): return self.lock.acquire().addCallback(with_lock) def _commit(self, trans): - _log.debug("# Show processor '{name}' commit".format(name=self.name)) - _log.info("# From {host}:{port}".format(host=trans.source_address.host, port=trans.source_address.port)) + log.debug("# Show processor '%s' commit", self.name) + log.info("# From %s:%s", trans.source_address.host, trans.source_address.port) if not trans.connected: - _log.info("# connection lost") + log.info("# connection lost") for item in trans.client_infos.items(): - _log.info(" epicsEnvSet('{name}','{value}')".format(name=item[0], value=item[1])) + log.info(" epicsEnvSet('%s','%s')", item[0], item[1]) for record_id, (record_name, record_type) in trans.records_to_add.items(): - _log.info( - ' record({record_type}, "{record_name}") {{'.format(record_type=record_type, record_name=record_name) - ) + log.info(' record(%s, "%s") {', record_type, record_name) for alias in trans.aliases.get(record_id, []): - _log.info(' 
alias("{alias}")'.format(alias=alias)) + log.info(' alias("%s")', alias) for item in trans.record_infos_to_add.get(record_id, {}).items(): - _log.info(' info({name},"{value}")'.format(name=item[0], value=[1])) - _log.info(" }") + log.info(' info(%s,"%s")', item[0], item[1]) + log.info(" }") yield - _log.info("# End") + log.info("# End") def stopService(self): service.Service.stopService(self) - _log.info("Show processor '{name}' stopping".format(name=self.name)) + log.info("Show processor '%s' stopping", self.name) @implementer(plugin.IPlugin, interfaces.IProcessorFactory) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index a8b0256..424993e 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -12,7 +12,7 @@ from .interfaces import ITransaction from .protocol import messages -_log = logging.getLogger(__name__) +log = logging.getLogger(__name__) class CastReceiver(stateful.StatefulProtocol): @@ -68,13 +68,13 @@ def restartPingTimer(self): def writePing(self): if self.phase == 2: self.transport.loseConnection() - _log.debug("pong missed: close connection") + log.debug("pong missed: close connection") else: self.restartPingTimer() self.phase = 2 self.nonce = random.randint(0, 0xFFFFFFFF) self.transport.write(messages.Ping(self.nonce).frame()) - _log.debug(f"ping nonce: {self.nonce}") + log.debug("ping nonce: %s", self.nonce) def getInitialState(self): return (self.recvHeader, messages.Header.payload.size) @@ -84,11 +84,11 @@ def recvHeader(self, data): try: header = messages.Header.decode(data) except messages.ProtocolError as exc: - _log.error(f"Protocol error! {exc}") + log.error("Protocol error! %s", exc) self.transport.loseConnection() return if header.body_length == 0: - _log.debug(f"Ignoring empty message {header.msg_id:#06x}") + log.debug("Ignoring empty message %#06x", header.msg_id) return self.getInitialState() self.msgid = header.msg_id fn, minlen = self.rxfn[self.msgid] @@ -102,11 +102,11 @@ def recvClientGreeting(self, body): try: greeting = messages.ClientGreeting.decode(body) except messages.ProtocolError as exc: - _log.error(f"Protocol error! {exc}") + log.error("Protocol error! %s", exc) self.transport.loseConnection() return if greeting.client_type != 0: - _log.error(f"unsupported client type {greeting.client_type}") + log.error("unsupported client type %s", greeting.client_type) self.transport.loseConnection() return self.version = min(self.version, greeting.version) @@ -119,14 +119,14 @@ def recvPong(self, body): try: pong = messages.Pong.decode(body) except messages.ProtocolError as exc: - _log.error(f"Protocol error! {exc}") + log.error("Protocol error! %s", exc) self.transport.loseConnection() return if pong.nonce != self.nonce: - _log.error(f"pong nonce does not match! {pong.nonce}!={self.nonce}") + log.error("pong nonce does not match! 
%s!=%s", pong.nonce, self.nonce) self.transport.loseConnection() else: - _log.debug("pong nonce match") + log.debug("pong nonce match") self.phase = 1 return self.getInitialState() @@ -135,7 +135,7 @@ def recvInfo(self, body): try: info = messages.AddInfo.decode(body) except messages.ProtocolError: - _log.error("Ignoring info update") + log.error("Ignoring info update") return self.getInitialState() if info.record_id: self.sess.rec_info(info.record_id, info.key, info.value) @@ -148,7 +148,7 @@ def recvAddRec(self, body): try: record = messages.AddRecord.decode(body) except messages.ProtocolError: - _log.error("Ignoring record update") + log.error("Ignoring record update") return self.getInitialState() if record.is_alias: self.sess.add_alias(record.record_id, record.record_name) @@ -162,7 +162,7 @@ def recvDelRec(self, body): try: record = messages.DelRecord.decode(body) except messages.ProtocolError: - _log.error("Ignoring delete record update") + log.error("Ignoring delete record update") return self.getInitialState() self.sess.del_record(record.record_id) return self.getInitialState() @@ -172,7 +172,7 @@ def recvDone(self, body): try: messages.UploadDone.decode(body) except messages.ProtocolError: - _log.error("Ignoring done update") + log.error("Ignoring done update") return self.getInitialState() self.factory.isDone(self, self.active) self.sess.done() @@ -182,9 +182,13 @@ def recvDone(self, body): elapsed_s = time.time() - self.uploadStart size_kb = self.uploadSize / 1024 rate_kbs = size_kb / elapsed_s - _log.info( - f"Done message from {self.sess.ep.host}:{self.sess.ep.port}:" - f" uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)" + log.info( + "Done message from %s:%s: uploaded %skB in %ss (%skB/s)", + self.sess.ep.host, + self.sess.ep.port, + size_kb, + elapsed_s, + rate_kbs, ) return self.getInitialState() @@ -209,7 +213,7 @@ def __init__(self, ep, id): self.records_to_delete = set() def show(self): - _log.info(str(self)) + log.info(str(self)) def __str__(self): src = f"{self.source_address.host}:{self.source_address.port}" @@ -241,7 +245,7 @@ class CollectionSession: def __init__(self, proto, endpoint): from twisted.internet import reactor - _log.info(f"Open session from {endpoint}") + log.info("Open session from %s", endpoint) self.reactor = reactor self.proto, self.ep = proto, endpoint self.transaction = Transaction(self.ep, id(self)) @@ -251,7 +255,7 @@ def __init__(self, proto, endpoint): self.dirty = False def close(self): - _log.info(f"Close session from {self.ep}") + log.info("Close session from %s", self.ep) # Do not cancel self._commit_chain here. Any data commit that is still queued # behind the global lock must be allowed to complete so that channels @@ -264,7 +268,7 @@ def close(self): self.flush() def flush(self): - _log.info(f"Flush session from {self.ep}") + log.info("Flush session from %s", self.ep) self._flush_deadline = None if not self.dirty: return @@ -274,15 +278,15 @@ def flush(self): self.dirty = False def commit(_ignored): - _log.info(f"Commit: {transaction}") + log.info("Commit: %s", transaction) return defer.maybeDeferred(self.factory.commit, transaction) def abort(err): if err.check(defer.CancelledError): - _log.info(f"Commit cancelled: {transaction}") + log.info("Commit cancelled: %s", transaction) return err else: - _log.error(f"Commit failure: {err}") + log.error("Commit failure: %s", err) self.proto.transport.loseConnection() raise defer.CancelledError() @@ -293,12 +297,12 @@ def abort(err): # between transactions. 
Only flush after Add or Del or Done message received. def flush_safely(self): if self._flush_deadline and self._flush_deadline <= time.time(): - _log.debug("flush_safely: timeout elapsed for %s", self.ep) + log.debug("flush_safely: timeout elapsed for %s", self.ep) self.flush() elif self.trlimit and self.trlimit <= ( len(self.transaction.records_to_add) + len(self.transaction.records_to_delete) ): - _log.debug("flush_safely: trlimit %d reached for %s", self.trlimit, self.ep) + log.debug("flush_safely: trlimit %d reached for %s", self.trlimit, self.ep) self.flush() def mark_dirty(self): From a1a69a8fcf71fe6a94eda034a1b34c9d922bd34e Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 15:17:06 +0200 Subject: [PATCH 7/8] refactor(server): extract protocol error message into a constant Eliminates the duplicated "Protocol error! %s" literal that Sonar flagged as a high-priority code smell (three occurrences in recast.py). --- server/recceiver/cf/processor.py | 4 +--- server/recceiver/recast.py | 8 +++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index cfb9ee1..ec49445 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -167,9 +167,7 @@ def _stop_service_with_lock(self): def _start_background_clean(self): log.info("CF Clean: background startup sweep beginning") - deferToThread(self.clean_service).addErrback( - lambda err: log.error("CF Clean background sweep failed: %s", err) - ) + deferToThread(self.clean_service).addErrback(lambda err: log.error("CF Clean background sweep failed: %s", err)) # @defer.inlineCallbacks # Twisted v16 does not support cancellation! def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 424993e..b1acf80 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -14,6 +14,8 @@ log = logging.getLogger(__name__) +_PROTOCOL_ERROR_MSG = "Protocol error! %s" + class CastReceiver(stateful.StatefulProtocol): timeout = 3.0 @@ -84,7 +86,7 @@ def recvHeader(self, data): try: header = messages.Header.decode(data) except messages.ProtocolError as exc: - log.error("Protocol error! %s", exc) + log.exception(_PROTOCOL_ERROR_MSG, exc) self.transport.loseConnection() return if header.body_length == 0: @@ -102,7 +104,7 @@ def recvClientGreeting(self, body): try: greeting = messages.ClientGreeting.decode(body) except messages.ProtocolError as exc: - log.error("Protocol error! %s", exc) + log.exception(_PROTOCOL_ERROR_MSG, exc) self.transport.loseConnection() return if greeting.client_type != 0: @@ -119,7 +121,7 @@ def recvPong(self, body): try: pong = messages.Pong.decode(body) except messages.ProtocolError as exc: - log.error("Protocol error! %s", exc) + log.exception(_PROTOCOL_ERROR_MSG, exc) self.transport.loseConnection() return if pong.nonce != self.nonce: From 0e8023d81b372cf21a54d5fd900314a83b2ee314 Mon Sep 17 00:00:00 2001 From: Anders Lindh Olsson Date: Wed, 13 May 2026 15:30:23 +0200 Subject: [PATCH 8/8] refactor(server): add type annotations to Transaction and enrich ITransaction docs Annotates Transaction's instance variables and __init__ signature with concrete types so static analysis tools can check callers. Also corrects the stale ITransaction.records_to_add doc (2-tuple, not 3), and narrows the source_address description to the IAddress duck type it actually is. 
--- server/recceiver/interfaces.py | 4 ++-- server/recceiver/recast.py | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index 76f6886..b9e4e99 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -5,10 +5,10 @@ class ITransaction(Interface): - source_address = Attribute("Source Address.") + source_address = Attribute("IAddress of the IOC connection (provides .host: str and .port: int)") records_to_add = Attribute("""Records being added - {recid: ('recname', 'rectype', {'key':'val'})} + {recid: ('recname', 'rectype')} """) records_to_delete = Attribute("A set() of recids which are being removed") diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index b1acf80..6cf5689 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -6,6 +6,7 @@ import time from twisted.internet import defer, protocol +from twisted.internet.interfaces import IAddress from twisted.protocols import stateful from zope.interface import implementer @@ -205,7 +206,17 @@ def dfact(cls): @implementer(ITransaction) class Transaction: - def __init__(self, ep, id): + source_address: IAddress + connected: bool + initial: bool + srcid: int + records_to_add: dict[int, tuple[str, str]] + client_infos: dict[str, str] + record_infos_to_add: dict[int, dict[str, str]] + aliases: collections.defaultdict[int, list[str]] + records_to_delete: set[int] + + def __init__(self, ep: IAddress, id: int) -> None: self.connected = True self.initial = False self.source_address = ep