Skip to content

Commit c2e3760

Browse files
manan164claude
andauthored
Fix backward compat: fall back to v1 update endpoint (#387)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e11e88c commit c2e3760

3 files changed

Lines changed: 249 additions & 13 deletions

File tree

src/conductor/client/automator/async_task_runner.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from conductor.client.http.models.task_result import TaskResult
2525
from conductor.client.http.models.task_result_status import TaskResultStatus
2626
from conductor.client.http.models.schema_def import SchemaDef, SchemaType
27-
from conductor.client.http.rest import AuthorizationException
27+
from conductor.client.http.rest import AuthorizationException, ApiException
2828
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
2929
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
3030
from conductor.client.telemetry.metrics_collector import MetricsCollector
@@ -111,6 +111,7 @@ def __init__(
111111
# Semaphore will be created in run() within the event loop
112112
self._semaphore = None
113113
self._shutdown = False # Flag to indicate graceful shutdown
114+
self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint
114115

115116
async def run(self) -> None:
116117
"""Main async loop - runs continuously in single event loop."""
@@ -583,6 +584,11 @@ async def __async_execute_and_update_task(self, task: Task) -> None:
583584
return
584585
# Update task and get next task from v2 response
585586
task = await self.__async_update_task(task_result)
587+
# v2 returns the next task; if v1 was used (returns None), immediately
588+
# poll for the next task to preserve tight-loop behaviour on older servers
589+
if task is None and not self._use_update_v2 and not self._shutdown:
590+
tasks = await self.__async_batch_poll(1)
591+
task = tasks[0] if tasks else None
586592
except Exception as e:
587593
logger.error(
588594
"Error executing/updating task %s: %s",
@@ -815,15 +821,55 @@ async def __async_update_task(self, task_result: TaskResult):
815821
# Exponential backoff: [10s, 20s, 30s] before retry
816822
await asyncio.sleep(attempt * 10)
817823
try:
818-
next_task = await self.async_task_client.update_task_v2(body=task_result)
819-
logger.debug(
820-
"Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
824+
if self._use_update_v2:
825+
next_task = await self.async_task_client.update_task_v2(body=task_result)
826+
logger.debug(
827+
"Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
828+
task_result.task_id,
829+
task_result.workflow_instance_id,
830+
task_definition_name,
831+
next_task.task_id if next_task else None
832+
)
833+
return next_task
834+
else:
835+
await self.async_task_client.update_task(body=task_result)
836+
logger.debug(
837+
"Updated async task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s",
838+
task_result.task_id,
839+
task_result.workflow_instance_id,
840+
task_definition_name,
841+
)
842+
return None
843+
except ApiException as e:
844+
if e.status == 404 and self._use_update_v2:
845+
logger.warning(
846+
"Server does not support update-task-v2 endpoint (HTTP 404). "
847+
"Falling back to v1 update endpoint. "
848+
"Upgrade your Conductor instance to v5+ to enable the v2 endpoint."
849+
)
850+
self._use_update_v2 = False
851+
# Retry immediately with v1
852+
try:
853+
await self.async_task_client.update_task(body=task_result)
854+
return None
855+
except Exception as fallback_e:
856+
last_exception = fallback_e
857+
continue
858+
last_exception = e
859+
if self.metrics_collector is not None:
860+
self.metrics_collector.increment_task_update_error(
861+
task_definition_name, type(e)
862+
)
863+
logger.error(
864+
"Failed to update async task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
865+
attempt + 1,
866+
retry_count,
821867
task_result.task_id,
822868
task_result.workflow_instance_id,
823869
task_definition_name,
824-
next_task.task_id if next_task else None
870+
traceback.format_exc()
825871
)
826-
return next_task
872+
continue
827873
except Exception as e:
828874
last_exception = e
829875
if self.metrics_collector is not None:

src/conductor/client/automator/task_runner.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from conductor.client.http.models.task_result import TaskResult
2626
from conductor.client.http.models.task_result_status import TaskResultStatus
2727
from conductor.client.http.models.schema_def import SchemaDef, SchemaType
28-
from conductor.client.http.rest import AuthorizationException
28+
from conductor.client.http.rest import AuthorizationException, ApiException
2929
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
3030
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
3131
from conductor.client.telemetry.metrics_collector import MetricsCollector
@@ -92,6 +92,7 @@ def __init__(
9292
self._last_poll_time = 0 # Track last poll to avoid excessive polling when queue is empty
9393
self._consecutive_empty_polls = 0 # Track empty polls to implement backoff
9494
self._shutdown = False # Flag to indicate graceful shutdown
95+
self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint
9596

9697
def run(self) -> None:
9798
if self.configuration is not None:
@@ -523,6 +524,11 @@ def __execute_and_update_task(self, task: Task) -> None:
523524
return
524525
# Update task and get next task from v2 response
525526
task = self.__update_task(task_result)
527+
# v2 returns the next task; if v1 was used (returns None), immediately
528+
# poll for the next task to preserve tight-loop behaviour on older servers
529+
if task is None and not self._use_update_v2 and not self._shutdown:
530+
tasks = self.__batch_poll_tasks(1)
531+
task = tasks[0] if tasks else None
526532
except Exception as e:
527533
logger.error(
528534
"Error executing/updating task %s: %s",
@@ -845,15 +851,55 @@ def __update_task(self, task_result: TaskResult):
845851
# Exponential backoff: [10s, 20s, 30s] before retry
846852
time.sleep(attempt * 10)
847853
try:
848-
next_task = self.task_client.update_task_v2(body=task_result)
849-
logger.debug(
850-
"Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
854+
if self._use_update_v2:
855+
next_task = self.task_client.update_task_v2(body=task_result)
856+
logger.debug(
857+
"Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
858+
task_result.task_id,
859+
task_result.workflow_instance_id,
860+
task_definition_name,
861+
next_task.task_id if next_task else None
862+
)
863+
return next_task
864+
else:
865+
self.task_client.update_task(body=task_result)
866+
logger.debug(
867+
"Updated task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s",
868+
task_result.task_id,
869+
task_result.workflow_instance_id,
870+
task_definition_name,
871+
)
872+
return None
873+
except ApiException as e:
874+
if e.status == 404 and self._use_update_v2:
875+
logger.warning(
876+
"Server does not support update-task-v2 endpoint (HTTP 404). "
877+
"Falling back to v1 update endpoint. "
878+
"Upgrade your Orkes instance to v5+ to enable the v2 endpoint."
879+
)
880+
self._use_update_v2 = False
881+
# Retry immediately with v1
882+
try:
883+
self.task_client.update_task(body=task_result)
884+
return None
885+
except Exception as fallback_e:
886+
last_exception = fallback_e
887+
continue
888+
last_exception = e
889+
if self.metrics_collector is not None:
890+
self.metrics_collector.increment_task_update_error(
891+
task_definition_name, type(e)
892+
)
893+
logger.error(
894+
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
895+
attempt + 1,
896+
retry_count,
851897
task_result.task_id,
852898
task_result.workflow_instance_id,
853899
task_definition_name,
854-
next_task.task_id if next_task else None
900+
traceback.format_exc()
855901
)
856-
return next_task
902+
continue
857903
except Exception as e:
858904
last_exception = e
859905
if self.metrics_collector is not None:

tests/unit/automator/test_task_runner_coverage.py

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from conductor.client.http.models.task import Task
2424
from conductor.client.http.models.task_result import TaskResult
2525
from conductor.client.http.models.task_result_status import TaskResultStatus
26-
from conductor.client.http.rest import AuthorizationException
26+
from conductor.client.http.rest import AuthorizationException, ApiException
2727
from conductor.client.worker.worker_interface import WorkerInterface
2828

2929

@@ -803,6 +803,150 @@ def test_update_task_with_metrics_on_error(self):
803803
4
804804
)
805805

806+
# ========================================
807+
# v1 Fallback Tests (backward compat with Orkes Conductor < v5)
808+
# ========================================
809+
810+
@patch('time.sleep', Mock(return_value=None))
811+
def test_update_task_v2_404_falls_back_to_v1(self):
812+
"""When server returns 404 for v2 endpoint, should fall back to v1 and return None."""
813+
worker = MockWorker('test_task')
814+
task_runner = TaskRunner(worker=worker)
815+
816+
task_result = TaskResult(
817+
task_id='test_id',
818+
workflow_instance_id='wf_id',
819+
worker_id=worker.get_identity(),
820+
status=TaskResultStatus.COMPLETED
821+
)
822+
823+
with patch.object(TaskResourceApi, 'update_task_v2',
824+
side_effect=ApiException(status=404)) as mock_v2, \
825+
patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1:
826+
result = task_runner._TaskRunner__update_task(task_result)
827+
828+
mock_v2.assert_called_once()
829+
mock_v1.assert_called_once()
830+
self.assertIsNone(result)
831+
832+
@patch('time.sleep', Mock(return_value=None))
833+
def test_update_task_v2_404_sets_v1_flag(self):
834+
"""After a 404 on v2, _use_update_v2 flag must be False."""
835+
worker = MockWorker('test_task')
836+
task_runner = TaskRunner(worker=worker)
837+
self.assertTrue(task_runner._use_update_v2)
838+
839+
task_result = TaskResult(
840+
task_id='test_id',
841+
workflow_instance_id='wf_id',
842+
worker_id=worker.get_identity(),
843+
status=TaskResultStatus.COMPLETED
844+
)
845+
846+
with patch.object(TaskResourceApi, 'update_task_v2',
847+
side_effect=ApiException(status=404)), \
848+
patch.object(TaskResourceApi, 'update_task', return_value='ok'):
849+
task_runner._TaskRunner__update_task(task_result)
850+
851+
self.assertFalse(task_runner._use_update_v2)
852+
853+
@patch('time.sleep', Mock(return_value=None))
854+
def test_update_task_uses_v1_only_after_flag_set(self):
855+
"""Once _use_update_v2 is False, v2 is never called again."""
856+
worker = MockWorker('test_task')
857+
task_runner = TaskRunner(worker=worker)
858+
task_runner._use_update_v2 = False # pre-set as if fallback already happened
859+
860+
task_result = TaskResult(
861+
task_id='test_id',
862+
workflow_instance_id='wf_id',
863+
worker_id=worker.get_identity(),
864+
status=TaskResultStatus.COMPLETED
865+
)
866+
867+
with patch.object(TaskResourceApi, 'update_task_v2') as mock_v2, \
868+
patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1:
869+
result = task_runner._TaskRunner__update_task(task_result)
870+
871+
mock_v2.assert_not_called()
872+
mock_v1.assert_called_once()
873+
self.assertIsNone(result)
874+
875+
@patch('time.sleep', Mock(return_value=None))
876+
def test_update_task_non_404_api_exception_does_not_fallback(self):
877+
"""A non-404 ApiException (e.g. 500) should not trigger v1 fallback."""
878+
worker = MockWorker('test_task')
879+
task_runner = TaskRunner(worker=worker)
880+
881+
task_result = TaskResult(
882+
task_id='test_id',
883+
workflow_instance_id='wf_id',
884+
worker_id=worker.get_identity(),
885+
status=TaskResultStatus.COMPLETED
886+
)
887+
888+
with patch.object(TaskResourceApi, 'update_task_v2',
889+
side_effect=ApiException(status=500)) as mock_v2, \
890+
patch.object(TaskResourceApi, 'update_task') as mock_v1:
891+
result = task_runner._TaskRunner__update_task(task_result)
892+
893+
# v2 called 4 times (all retries), v1 never called, flag unchanged
894+
self.assertEqual(mock_v2.call_count, 4)
895+
mock_v1.assert_not_called()
896+
self.assertTrue(task_runner._use_update_v2)
897+
self.assertIsNone(result)
898+
899+
@patch('time.sleep', Mock(return_value=None))
900+
def test_execute_and_update_task_tight_loop_with_v1_polls_for_next(self):
901+
"""When v1 is used, the tight loop should poll immediately for the next task."""
902+
worker = MockWorker('test_task')
903+
task_runner = TaskRunner(worker=worker)
904+
task_runner._use_update_v2 = False # simulate post-fallback state
905+
906+
first_task = Task(task_id='task_1', workflow_instance_id='wf_1')
907+
second_task = Task(task_id='task_2', workflow_instance_id='wf_1')
908+
909+
# Execute returns a result, update v1 returns None, poll returns second task then empty
910+
with patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1, \
911+
patch.object(TaskResourceApi, 'batch_poll',
912+
side_effect=[[second_task], []]) as mock_poll:
913+
task_runner._TaskRunner__execute_and_update_task(first_task)
914+
915+
# update_task called twice (once per task), poll called twice (second_task then empty)
916+
self.assertEqual(mock_v1.call_count, 2)
917+
self.assertEqual(mock_poll.call_count, 2)
918+
919+
@patch('time.sleep', Mock(return_value=None))
920+
def test_execute_and_update_task_tight_loop_stops_when_queue_empty_on_v1(self):
921+
"""With v1, if poll returns nothing the tight loop exits cleanly."""
922+
worker = MockWorker('test_task')
923+
task_runner = TaskRunner(worker=worker)
924+
task_runner._use_update_v2 = False
925+
926+
task = Task(task_id='task_1', workflow_instance_id='wf_1')
927+
928+
with patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1, \
929+
patch.object(TaskResourceApi, 'batch_poll', return_value=[]) as mock_poll:
930+
task_runner._TaskRunner__execute_and_update_task(task)
931+
932+
mock_v1.assert_called_once()
933+
mock_poll.assert_called_once()
934+
935+
@patch('time.sleep', Mock(return_value=None))
936+
def test_execute_and_update_task_tight_loop_not_pollled_when_v2(self):
937+
"""With v2, poll is NOT called inside the tight loop (v2 returns next task directly)."""
938+
worker = MockWorker('test_task')
939+
task_runner = TaskRunner(worker=worker)
940+
941+
first_task = Task(task_id='task_1', workflow_instance_id='wf_1')
942+
943+
with patch.object(TaskResourceApi, 'update_task_v2', return_value=None) as mock_v2, \
944+
patch.object(TaskResourceApi, 'batch_poll') as mock_poll:
945+
task_runner._TaskRunner__execute_and_update_task(first_task)
946+
947+
mock_v2.assert_called_once()
948+
mock_poll.assert_not_called()
949+
806950
# ========================================
807951
# Property and Environment Tests
808952
# ========================================

0 commit comments

Comments
 (0)