Skip to content
Merged
12 changes: 10 additions & 2 deletions server/demo.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
# Time interval for sending recceiver advertisements
#announceInterval = 15.0

# Interval in seconds between periodic status log lines (0 to disable)
Comment thread
anderslindho marked this conversation as resolved.
#statusInterval = 60.0

# TCP port to expose Prometheus metrics on (0 or absent to disable).
# Requires prometheus_client: pip install recceiver[metrics]
#metricsPort = 0
Comment thread
anderslindho marked this conversation as resolved.

# Idle Timeout for TCP connections.
#tcptimeout = 15.0

Expand Down Expand Up @@ -126,5 +133,6 @@
# Number of times to retry polling before giving up. Default is 10.
#pushMaxRetries = 10

# Whether to retry polling indefinitely until success. Default is True.
#pushAlwaysRetry = True
# Whether to retry polling indefinitely until success. Default is False.
# Enabling this holds the global CF commit lock until CF recovers; use with caution.
#pushAlwaysRetry = False
1 change: 1 addition & 0 deletions server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"twisted>=22.10,<23; python_version<'3.8'",
"twisted>=24.11,<24.12; python_version>='3.8'",
]
optional-dependencies.metrics = [ "prometheus-client" ]
Comment thread
jacomago marked this conversation as resolved.
optional-dependencies.test = [ "pytest>=8.3,<8.4", "pytest-cov>=6,<7", "testcontainers>=4.8.2,<4.9" ]
urls.Repository = "https://github.com/ChannelFinder/recsync"

Expand Down
32 changes: 31 additions & 1 deletion server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import random

from twisted.application import service
from twisted.internet import defer, pollreactor
from twisted.internet import defer, pollreactor, task
from twisted.internet.error import CannotListenError
from twisted.python import log, usage
from zope.interface import implementer

from twisted import plugin

from . import metrics
from .announce import Announcer
from .processors import ProcessorController
from .recast import CastFactory
Expand Down Expand Up @@ -43,6 +44,7 @@ def __init__(self, config):
self.reactor = reactor

service.MultiService.__init__(self)
self._statusLoop = None
self.annperiod = float(config.get("announceInterval", "15.0"))
self.tcptimeout = float(config.get("tcptimeout", "15.0"))
self.commitperiod = float(config.get("commitInterval", "5.0"))
Expand All @@ -52,6 +54,8 @@ def __init__(self, config):
self.addrlist = []

self.port = int(portn or "0")
self.statusInterval = float(config.get("statusInterval", "60.0"))
self.metricsPort = int(config.get("metricsPort", "0"))

for addr in config.get("addrlist", "").split(","):
if not addr:
Expand Down Expand Up @@ -111,9 +115,35 @@ def privilegedStartService(self):
# This will start up plugin Processors
service.MultiService.privilegedStartService(self)

if self.metricsPort > 0:
if metrics.available:
self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind)
_log.info("Prometheus metrics available on port %d", self.metricsPort)
else:
_log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled")

metrics.connections_limit.set(self.tcpFactory.maxActive)

if self.statusInterval > 0:
self._statusLoop = task.LoopingCall(self._logStatus)
self._statusLoop.start(self.statusInterval, now=False)

def _logStatus(self):
    """Periodic heartbeat: refresh the connection gauges and log one status line."""
    active = self.tcpFactory.NActive
    queued = len(self.tcpFactory.Wait)
    metrics.connections_active.set(active)
    metrics.connections_waiting.set(queued)
    _log.info(
        "status: connections active=%d/%d queued=%d",
        active,
        self.tcpFactory.maxActive,
        queued,
    )

def stopService(self):
_log.info("Stopping RecService")

if self._statusLoop is not None and self._statusLoop.running:
self._statusLoop.stop()

# This will stop plugin Processors
D2 = defer.maybeDeferred(service.MultiService.stopService, self)

Expand Down
6 changes: 4 additions & 2 deletions server/recceiver/cf/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class CFConfig:
cf_password: Optional[str] = None
verify_ssl: Optional[bool] = None
push_max_retries: int = 10
push_always_retry: bool = True
push_always_retry: bool = False
status_interval: float = 60.0

@classmethod
def loads(cls, conf: ConfigAdapter) -> "CFConfig":
Expand All @@ -53,7 +54,8 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig":
cf_password=conf.get("cfPassword"),
verify_ssl=conf.getboolean("verifySSL"),
push_max_retries=conf.getint("pushMaxRetries", 10),
push_always_retry=conf.getboolean("pushAlwaysRetry", True),
push_always_retry=conf.getboolean("pushAlwaysRetry", False),
status_interval=float(conf.get("statusInterval", "60.0")),
)

def __repr__(self) -> str:
Expand Down
83 changes: 68 additions & 15 deletions server/recceiver/cf/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from channelfinder import ChannelFinderClient
from requests import ConnectionError, RequestException
from twisted.application import service
from twisted.internet import defer
from twisted.internet import defer, task
from twisted.internet.defer import DeferredLock
from twisted.internet.threads import deferToThread
from zope.interface import implementer

from recceiver import interfaces
from recceiver import interfaces, metrics
from recceiver.cf.adapter import ChannelFinderAdapter, PyCFClientAdapter
from recceiver.cf.config import CFConfig
from recceiver.cf.model import (
Expand Down Expand Up @@ -46,6 +46,7 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter):
self.client: Optional[ChannelFinderAdapter] = None
self.current_time: Callable[[Optional[str]], str] = get_current_time
self.lock: DeferredLock = DeferredLock()
self._statusLoop = None

def startService(self):
service.Service.startService(self)
Expand All @@ -65,6 +66,15 @@ def startService(self):
finally:
self.lock.release()

if self.cf_config.status_interval > 0:
self._statusLoop = task.LoopingCall(self._logStatus)
self._statusLoop.start(self.cf_config.status_interval, now=False)

def _logStatus(self):
    """Periodic heartbeat: refresh the CF gauges and log the tracked totals."""
    n_iocs = len(self.iocs)
    n_channels = len(self.channel_ioc_ids)
    metrics.known_iocs.set(n_iocs)
    metrics.tracked_channels.set(n_channels)
    _log.info("CF status: known_iocs=%d tracked_channels=%d", n_iocs, n_channels)

def _start_service_with_lock(self):
_log.info("CF_START with configuration: %s", self.cf_config)

Expand All @@ -86,7 +96,10 @@ def _start_service_with_lock(self):
raise
else:
if self.cf_config.clean_on_start:
self.clean_service()
_log.info("CF Clean: scheduling background startup sweep")
from twisted.internet import reactor

reactor.callLater(0, self._start_background_clean)

def _setup_cf_properties(self, cf_properties: Set[str]) -> None:
"""Compute required CF properties, register any missing ones, and cache state.
Expand Down Expand Up @@ -137,13 +150,27 @@ def _setup_cf_properties(self, cf_properties: Set[str]) -> None:

def stopService(self):
_log.info("CF_STOP")
if self._statusLoop is not None and self._statusLoop.running:
self._statusLoop.stop()
service.Service.stopService(self)
return self.lock.run(self._stop_service_with_lock)

def _stop_service_with_lock(self):
    """Finish stopping the CFProcessor while the commit lock is held.

    When clean_on_stop is enabled, all channels are marked inactive by
    clean_service. The sweep runs on a worker thread so the reactor is
    not blocked; because the lock is held for the duration, no new
    commits can interleave with it.
    """
    _log.info("CF_STOP with lock")
    if not self.cf_config.clean_on_stop:
        return
    return deferToThread(self.clean_service)

def _start_background_clean(self):
    """Run the startup clean sweep on a worker thread, logging any failure."""
    _log.info("CF Clean: background startup sweep beginning")
    sweep = deferToThread(self.clean_service)
    # clean_service retries internally; if it ultimately raises, record it here.
    sweep.addErrback(lambda err: _log.error("CF Clean background sweep failed: %s", err))

# @defer.inlineCallbacks # Twisted v16 does not support cancellation!
def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred:
Expand Down Expand Up @@ -308,7 +335,15 @@ def _commit_with_thread(self, transaction: CommitTransaction):
ioc_name = transaction.client_infos.get("IOCNAME")
if not ioc_name:
ioc_name = str(port)
_log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port)
_log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port)
if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535:
_log.warning(
"IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — "
"iocid will change on every reconnect, causing stale channels in CF; "
"configure a stable iocName via reccaster",
host,
ioc_name,
)

owner = (
transaction.client_infos.get(self.cf_config.env_owner_variable)
Expand Down Expand Up @@ -339,6 +374,13 @@ def _commit_with_thread(self, transaction: CommitTransaction):
_log.debug("Delete records: %s", records_to_delete)

record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info)
if not transaction.connected and ioc_info.id not in self.iocs:
_log.warning(
"IOC at %s:%d disconnected before completing initial upload (0 channels registered)",
host,
port,
)
return
self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name)
poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info)
if not poll_success:
Expand Down Expand Up @@ -378,8 +420,8 @@ def clean_service(self) -> None:
channels = self.get_active_channels(recceiverid)
_log.info("CF Clean Completed")
return
except RequestException as e:
_log.exception("Clean service failed: %s", e)
except RequestException:
_log.exception("Clean service failed")
retry_seconds = min(60, sleep)
_log.info("Clean service retry in %s seconds", retry_seconds)
time.sleep(retry_seconds)
Expand All @@ -405,21 +447,31 @@ def _push_to_cf(
records_to_delete: List[str],
ioc_info: IOCInfo,
) -> bool:
_log.info("Pushing updates for %s begins...", ioc_info)
_log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name))
count = 0
sleep = 1.0
while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries:
if not self.running:
_log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count)
return False
count += 1
t0 = time.monotonic()
try:
self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info)
elapsed = time.monotonic() - t0
metrics.cf_commit_duration_seconds.observe(elapsed)
metrics.cf_commits_total.labels(result="success").inc()
_log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name))
return True
except RequestException as e:
_log.exception("ChannelFinder update failed: %s", e)
except RequestException:
elapsed = time.monotonic() - t0
_log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info)
retry_seconds = min(60, sleep)
_log.info("ChannelFinder update retry in %s seconds", retry_seconds)
_log.info("CF push retry in %s seconds", retry_seconds)
time.sleep(retry_seconds)
sleep *= 1.5
_log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count)
metrics.cf_commits_total.labels(result="cancelled").inc()
_log.error("CF push gave up after %d attempts: %s", count, ioc_info)
return False

def _update_channelfinder(
Expand All @@ -434,9 +486,10 @@ def _update_channelfinder(
new_channels = set(record_info_by_name.keys())
iocid = ioc_info.id

if iocid not in self.iocs:
if iocid not in self.iocs and record_info_by_name:
# Disconnect-before-upload is already logged in _commit_with_thread.
_log.warning(
"IOC %s did not send an initial transaction to join IOC list (%d IOCs known)",
"IOC %s committed update without prior initial transaction (%d IOCs known)",
ioc_info,
len(self.iocs),
)
Expand Down
89 changes: 89 additions & 0 deletions server/recceiver/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-

import logging

_log = logging.getLogger(__name__)

try:
from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Counter, Gauge, Histogram, generate_latest
from twisted.web.resource import Resource
from twisted.web.server import Site

_registry = CollectorRegistry(auto_describe=True)

connections_active = Gauge(
"recceiver_connections_active",
"Active uploading IOC connections",
registry=_registry,
)
connections_waiting = Gauge(
"recceiver_connections_waiting",
"IOC connections waiting for an upload slot",
registry=_registry,
)
connections_limit = Gauge(
"recceiver_connections_limit",
"Maximum concurrent active connections (maxActive)",
registry=_registry,
)
known_iocs = Gauge(
"recceiver_known_iocs",
"IOCs with channels currently registered in CF",
registry=_registry,
)
tracked_channels = Gauge(
"recceiver_tracked_channels",
"Unique channel names tracked by the CF processor",
registry=_registry,
)
cf_commits_total = Counter(
"recceiver_cf_commits_total",
"CF push attempts by result",
["result"],
registry=_registry,
)
cf_commit_duration_seconds = Histogram(
"recceiver_cf_commit_duration_seconds",
"CF push duration in seconds",
buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
registry=_registry,
)

Comment thread
jacomago marked this conversation as resolved.
class _MetricsResource(Resource):
    # Leaf resource: twisted dispatches every path to this single endpoint.
    isLeaf = True

    def render_GET(self, request):
        """Serialize the private registry in Prometheus text exposition format."""
        request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode())
        return generate_latest(_registry)

def make_site():
    """Return a twisted Site serving the Prometheus metrics endpoint."""
    return Site(_MetricsResource())

available = True

except ImportError:
available = False

class _Noop:
def set(self, v=None):
pass # no-op: prometheus_client not installed

def inc(self, v=1):
pass # no-op: prometheus_client not installed

def labels(self, **_kw):
return self

def observe(self, v):
pass # no-op: prometheus_client not installed

connections_active = _Noop()
connections_waiting = _Noop()
connections_limit = _Noop()
known_iocs = _Noop()
tracked_channels = _Noop()
cf_commits_total = _Noop()
cf_commit_duration_seconds = _Noop()

def make_site():
    # Callers are expected to check the module-level `available` flag first;
    # reaching this branch without prometheus_client is a programming error.
    raise RuntimeError("prometheus_client is not installed")
1 change: 1 addition & 0 deletions server/recceiver/recast.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ def flush(self):
return

transaction, self.transaction = self.transaction, Transaction(self.ep, id(self))
self.transaction.client_infos = dict(transaction.client_infos)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this really work with an IOC?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix is only applicable for large IOCs, with more than 5k (trlimit) records. The copy to subsequent transaction applies also to smaller IOCs but does not matter for those as they will flush all (incl client_infos) on first transaction. But for >5k, the subsequent transaction starts with empty client infos.

Keep in mind that reccaster proto is ordered: (all) AddInfo goes before AddRecord.

self.dirty = False

def commit(_ignored):
Expand Down
Loading
Loading