@@ -43,6 +43,24 @@ class AutoscaledPool:
4343 any of the tasks, it is propagated and the pool is stopped.
4444 """
4545
46+ _AUTOSCALE_INTERVAL = timedelta (seconds = 10 )
47+ """Interval at which the autoscaled pool adjusts the desired concurrency based on the latest system status."""
48+
49+ _LOGGING_INTERVAL = timedelta (minutes = 1 )
50+ """Interval at which the autoscaled pool logs its current state."""
51+
52+ _DESIRED_CONCURRENCY_RATIO = 0.9
53+ """Minimum fraction of the desired concurrency that the current concurrency must reach before further scale-up is allowed."""
54+
55+ _SCALE_UP_STEP_RATIO = 0.05
56+ """Fraction of desired concurrency to add during each scale-up operation."""
57+
58+ _SCALE_DOWN_STEP_RATIO = 0.05
59+ """Fraction of desired concurrency to remove during each scale-down operation."""
60+
61+ _TASK_TIMEOUT : timedelta | None = None
62+ """Timeout within which the `run_task_function` must complete. If `None`, no timeout is applied."""
63+
4664 def __init__ (
4765 self ,
4866 * ,
@@ -51,17 +69,12 @@ def __init__(
5169 run_task_function : Callable [[], Awaitable ],
5270 is_task_ready_function : Callable [[], Awaitable [bool ]],
5371 is_finished_function : Callable [[], Awaitable [bool ]],
54- task_timeout : timedelta | None = None ,
55- autoscale_interval : timedelta = timedelta (seconds = 10 ),
56- logging_interval : timedelta = timedelta (minutes = 1 ),
57- desired_concurrency_ratio : float = 0.9 ,
58- scale_up_step_ratio : float = 0.05 ,
59- scale_down_step_ratio : float = 0.05 ,
6072 ) -> None :
6173 """A default constructor.
6274
6375 Args:
6476 system_status: Provides data about system utilization (load).
77+ concurrency_settings: Settings of concurrency levels. If not provided, a default `ConcurrencySettings()` is used.
6578 run_task_function: A function that performs an asynchronous resource-intensive task.
6679 is_task_ready_function: A function that indicates whether `run_task_function` should be called. This
6780 function is called every time there is free capacity for a new task and it should indicate whether
@@ -71,44 +84,21 @@ def __init__(
7184 resolves to `True` then the pool's run finishes. Being called only when there are no tasks being
7285 processed means that as long as `is_task_ready_function` keeps resolving to `True`,
7386 `is_finished_function` will never be called. To abort a run, use the `abort` method.
74- task_timeout: Timeout in which the `run_task_function` needs to finish.
75- autoscale_interval: Defines how often the pool should attempt to adjust the desired concurrency based on
76- the latest system status. Setting it lower than 1 might have a severe impact on performance. We suggest
77- using a value from 5 to 20.
78- logging_interval: Specifies a period in which the instance logs its state, in seconds.
79- desired_concurrency_ratio: Minimum level of desired concurrency to reach before more scaling up is allowed.
80- scale_up_step_ratio: Defines the fractional amount of desired concurrency to be added with each scaling up.
81- scale_down_step_ratio: Defines the amount of desired concurrency to be subtracted with each scaling down.
82- concurrency_settings: Settings of concurrency levels.
8387 """
84- self . _system_status = system_status
88+ concurrency_settings = concurrency_settings or ConcurrencySettings ()
8589
90+ self ._system_status = system_status
8691 self ._run_task_function = run_task_function
8792 self ._is_task_ready_function = is_task_ready_function
8893 self ._is_finished_function = is_finished_function
89-
90- self ._task_timeout = task_timeout
91-
92- self ._logging_interval = logging_interval
93- self ._log_system_status_task = RecurringTask (self ._log_system_status , logging_interval )
94-
95- self ._autoscale_task = RecurringTask (self ._autoscale , autoscale_interval )
96-
97- if desired_concurrency_ratio < 0 or desired_concurrency_ratio > 1 :
98- raise ValueError ('desired_concurrency_ratio must be between 0 and 1 (non-inclusive)' )
99-
100- self ._desired_concurrency_ratio = desired_concurrency_ratio
101-
102- concurrency_settings = concurrency_settings or ConcurrencySettings ()
103-
10494 self ._desired_concurrency = concurrency_settings .desired_concurrency
10595 self ._max_concurrency = concurrency_settings .max_concurrency
10696 self ._min_concurrency = concurrency_settings .min_concurrency
97+ self ._max_tasks_per_minute = concurrency_settings .max_tasks_per_minute
10798
108- self ._scale_up_step_ratio = scale_up_step_ratio
109- self ._scale_down_step_ratio = scale_down_step_ratio
99+ self ._log_system_status_task = RecurringTask ( self . _log_system_status , self . _LOGGING_INTERVAL )
100+ self ._autoscale_task = RecurringTask ( self . _autoscale , self . _AUTOSCALE_INTERVAL )
110101
111- self ._max_tasks_per_minute = concurrency_settings .max_tasks_per_minute
112102 self ._is_paused = False
113103 self ._current_run : _AutoscaledPoolRun | None = None
114104
@@ -195,7 +185,7 @@ def _autoscale(self) -> None:
195185 """Inspect system load status and adjust desired concurrency if necessary. Do not call directly."""
196186 status = self ._system_status .get_historical_system_info ()
197187
198- min_current_concurrency = math .floor (self ._desired_concurrency_ratio * self .desired_concurrency )
188+ min_current_concurrency = math .floor (self ._DESIRED_CONCURRENCY_RATIO * self .desired_concurrency )
199189 should_scale_up = (
200190 status .is_system_idle
201191 and self ._desired_concurrency < self ._max_concurrency
@@ -205,10 +195,10 @@ def _autoscale(self) -> None:
205195 should_scale_down = not status .is_system_idle and self ._desired_concurrency > self ._min_concurrency
206196
207197 if should_scale_up :
208- step = math .ceil (self ._scale_up_step_ratio * self ._desired_concurrency )
198+ step = math .ceil (self ._SCALE_UP_STEP_RATIO * self ._desired_concurrency )
209199 self ._desired_concurrency = min (self ._max_concurrency , self ._desired_concurrency + step )
210200 elif should_scale_down :
211- step = math .ceil (self ._scale_down_step_ratio * self ._desired_concurrency )
201+ step = math .ceil (self ._SCALE_DOWN_STEP_RATIO * self ._desired_concurrency )
212202 self ._desired_concurrency = max (self ._min_concurrency , self ._desired_concurrency - step )
213203
214204 def _log_system_status (self ) -> None :
@@ -286,10 +276,10 @@ async def _worker_task(self) -> None:
286276 try :
287277 await asyncio .wait_for (
288278 self ._run_task_function (),
289- timeout = self ._task_timeout .total_seconds () if self ._task_timeout is not None else None ,
279+ timeout = self ._TASK_TIMEOUT .total_seconds () if self ._TASK_TIMEOUT is not None else None ,
290280 )
291281 except asyncio .TimeoutError :
292- timeout_str = self ._task_timeout .total_seconds () if self ._task_timeout is not None else '*not set*'
282+ timeout_str = self ._TASK_TIMEOUT .total_seconds () if self ._TASK_TIMEOUT is not None else '*not set*'
293283 logger .warning (f'Task timed out after { timeout_str } seconds' )
294284 finally :
295285 logger .debug ('Worker task finished' )
0 commit comments