From 1f81521362c3f1dd931c15ddb133481eeaaba2f7 Mon Sep 17 00:00:00 2001 From: Bruce Pannaman Date: Fri, 10 Apr 2026 18:58:33 +0100 Subject: [PATCH] fix(temporal): allowing-ACP-temporal-telemetry --- .../lib/core/temporal/workers/worker.py | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/src/agentex/lib/core/temporal/workers/worker.py b/src/agentex/lib/core/temporal/workers/worker.py index 52ad6bb2f..1e1b9c35b 100644 --- a/src/agentex/lib/core/temporal/workers/worker.py +++ b/src/agentex/lib/core/temporal/workers/worker.py @@ -89,16 +89,17 @@ def _validate_interceptors(interceptors: list) -> None: ) -async def get_temporal_client(temporal_address: str, metrics_url: str | None = None, plugins: list = []) -> Client: +async def get_temporal_client( + temporal_address: str, metrics_url: str | None = None, plugins: list = [] +) -> Client: if plugins != []: # We don't need to validate the plugins if they are empty _validate_plugins(plugins) # Check if OpenAI plugin is present - it needs to configure its own data converter # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin - has_openai_plugin = any( - isinstance(p, OpenAIAgentsPlugin) for p in (plugins or []) - ) + + has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) # Build connection kwargs connect_kwargs = { @@ -113,7 +114,9 @@ async def get_temporal_client(temporal_address: str, metrics_url: str | None = N if not metrics_url: client = await Client.connect(**connect_kwargs) else: - runtime = Runtime(telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url))) + runtime = Runtime( + telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url)) + ) connect_kwargs["runtime"] = runtime client = await Client.connect(**connect_kwargs) return client @@ -128,6 +131,7 @@ def __init__( health_check_port: int | None = None, plugins: list = [], interceptors: list = [], + metrics_url: str | None = None, ): self.task_queue = task_queue self.activity_handles = [] @@ -135,9 +139,14 @@ def __init__( self.max_concurrent_activities = max_concurrent_activities self.health_check_server_running = False self.healthy = False - self.health_check_port = health_check_port if health_check_port is not None else EnvironmentVariables.refresh().HEALTH_CHECK_PORT + self.health_check_port = ( + health_check_port + if health_check_port is not None + else EnvironmentVariables.refresh().HEALTH_CHECK_PORT + ) self.plugins = plugins self.interceptors = interceptors + self.metrics_url = metrics_url @overload async def run( @@ -172,12 +181,17 @@ async def run( temporal_client = await get_temporal_client( temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), plugins=self.plugins, + metrics_url=self.metrics_url, ) # Enable debug mode if AgentEx debug is enabled (disables deadlock detection) - debug_enabled = os.environ.get("AGENTEX_DEBUG_ENABLED", "false").lower() == "true" + debug_enabled = ( + os.environ.get("AGENTEX_DEBUG_ENABLED", "false").lower() == "true" + ) if debug_enabled: - logger.info("🐛 [WORKER] Temporal debug mode enabled - deadlock detection disabled") + logger.info( + "🐛 [WORKER] Temporal debug mode enabled - deadlock detection disabled" + ) if workflow is None and workflows is None: raise ValueError("Either workflow or workflows must be provided") @@ -207,7 +221,9 @@ async def _health_check(self): async def start_health_check_server(self): if not self.health_check_server_running: app = web.Application() - app.router.add_get("/readyz", lambda request: self._health_check()) # noqa: ARG005 + app.router.add_get( + "/readyz", lambda request: self._health_check() + ) # noqa: ARG005 # Disable access logging runner = web.AppRunner(app, access_log=None) @@ -216,19 +232,27 @@ async def start_health_check_server(self): try: site = web.TCPSite(runner, "0.0.0.0", self.health_check_port) await site.start() - logger.info(f"Health check server running on http://0.0.0.0:{self.health_check_port}/readyz") + logger.info( + f"Health check server running on http://0.0.0.0:{self.health_check_port}/readyz" + ) self.health_check_server_running = True except OSError as e: - logger.error(f"Failed to start health check server on port {self.health_check_port}: {e}") + logger.error( + f"Failed to start health check server on port {self.health_check_port}: {e}" + ) # Try alternative port if default fails try: alt_port = self.health_check_port + 1 site = web.TCPSite(runner, "0.0.0.0", alt_port) await site.start() - logger.info(f"Health check server running on alternative port http://0.0.0.0:{alt_port}/readyz") + logger.info( + f"Health check server running on alternative port http://0.0.0.0:{alt_port}/readyz" + ) self.health_check_server_running = True except OSError as e: - logger.error(f"Failed to start health check server on alternative port {alt_port}: {e}") + logger.error( + f"Failed to start health check server on alternative port {alt_port}: {e}" + ) raise """