Skip to content

Commit 02320d6

Browse files
authored
PYTHON-5767 - Finalize client backpressure implementation for phase 1… (#2742)
1 parent 3f64de3 commit 02320d6

22 files changed

Lines changed: 422 additions & 1894 deletions

doc/changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ PyMongo 4.17 brings a number of changes including:
99
- Added the :meth:`~pymongo.asynchronous.client_session.AsyncClientSession.bind` and :meth:`~pymongo.client_session.ClientSession.bind` methods
1010
that allow users to bind a session to all database operations within the scope of a context manager instead of having to explicitly pass the session to each individual operation.
1111
See <PLACEHOLDER> for examples and more information.
12+
- Added support for MongoDB's Intelligent Workload Management (IWM) and ingress connection rate limiting features.
13+
The driver now gracefully handles write-blocking scenarios and optimizes connection establishment during high-load conditions to maintain application availability.
14+
See <DOCSP-55426> and <DOCSP-57078> for more information.
1215

1316
Changes in Version 4.16.0 (2026/01/07)
1417
--------------------------------------

pymongo/asynchronous/helpers.py

Lines changed: 5 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
)
3131

3232
from pymongo import _csot
33+
from pymongo.common import MAX_ADAPTIVE_RETRIES
3334
from pymongo.errors import (
3435
OperationFailure,
3536
)
3637
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
37-
from pymongo.lock import _async_create_lock
3838

3939
_IS_SYNC = False
4040

@@ -76,11 +76,8 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7676
return cast(F, inner)
7777

7878

79-
_MAX_RETRIES = 5
8079
_BACKOFF_INITIAL = 0.1
8180
_BACKOFF_MAX = 10
82-
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
83-
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8481

8582

8683
def _backoff(
@@ -90,79 +87,32 @@ def _backoff(
9087
return jitter * min(initial_delay * (2**attempt), max_delay)
9188

9289

93-
class _TokenBucket:
94-
"""A token bucket implementation for rate limiting."""
95-
96-
def __init__(
97-
self,
98-
capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
99-
return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
100-
):
101-
self.lock = _async_create_lock()
102-
self.capacity = capacity
103-
self.tokens = capacity
104-
self.return_rate = return_rate
105-
106-
async def consume(self) -> bool:
107-
"""Consume a token from the bucket if available."""
108-
async with self.lock:
109-
if self.tokens >= 1:
110-
self.tokens -= 1
111-
return True
112-
return False
113-
114-
async def deposit(self, retry: bool = False) -> None:
115-
"""Deposit a token back into the bucket."""
116-
retry_token = 1 if retry else 0
117-
async with self.lock:
118-
self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate)
119-
120-
12190
class _RetryPolicy:
122-
"""A retry limiter that performs exponential backoff with jitter.
123-
124-
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
125-
a prolonged outage or high load.
126-
"""
91+
"""A retry limiter that performs exponential backoff with jitter."""
12792

12893
def __init__(
12994
self,
130-
token_bucket: _TokenBucket,
131-
attempts: int = _MAX_RETRIES,
95+
attempts: int = MAX_ADAPTIVE_RETRIES,
13296
backoff_initial: float = _BACKOFF_INITIAL,
13397
backoff_max: float = _BACKOFF_MAX,
134-
adaptive_retry: bool = False,
13598
):
136-
self.token_bucket = token_bucket
13799
self.attempts = attempts
138100
self.backoff_initial = backoff_initial
139101
self.backoff_max = backoff_max
140-
self.adaptive_retry = adaptive_retry
141-
142-
async def record_success(self, retry: bool) -> None:
143-
"""Record a successful operation."""
144-
if self.adaptive_retry:
145-
await self.token_bucket.deposit(retry)
146102

147103
def backoff(self, attempt: int) -> float:
148-
"""Return the backoff duration for the given ."""
104+
"""Return the backoff duration for the given attempt."""
149105
return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max)
150106

151107
async def should_retry(self, attempt: int, delay: float) -> bool:
152-
"""Return if we have budget to retry and how long to backoff."""
108+
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
153109
if attempt > self.attempts:
154110
return False
155111

156-
# If the delay would exceed the deadline, bail early before consuming a token.
157112
if _csot.get_timeout():
158113
if time.monotonic() + delay > _csot.get_deadline():
159114
return False
160115

161-
# Check token bucket last since we only want to consume a token if we actually retry.
162-
if self.adaptive_retry and not await self.token_bucket.consume():
163-
# DRIVERS-3246 Improve diagnostics when this case happens.
164-
# We could add info to the exception and log.
165-
return False
166116
return True
167117

168118

pymongo/asynchronous/mongo_client.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
7171
from pymongo.asynchronous.helpers import (
7272
_RetryPolicy,
73-
_TokenBucket,
7473
)
7574
from pymongo.asynchronous.settings import TopologySettings
7675
from pymongo.asynchronous.topology import Topology, _ErrorContext
@@ -615,17 +614,17 @@ def __init__(
615614
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
616615
details.
617616
618-
| **Adaptive retry options:**
619-
| (If not enabled explicitly, adaptive retries will not be enabled.)
617+
| **Overload retry options:**
620618
621-
- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
622-
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
619+
- `max_adaptive_retries`: (int) How many retries to allow for overload errors. Defaults to ``2``.
620+
- `enable_overload_retargeting`: (boolean) Whether overload retargeting is enabled for this client.
621+
If enabled, server overload errors will cause retry attempts to select a server that has not yet returned an overload error, if possible.
623622
Defaults to ``False``.
624623
625624
.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.
626625
627626
.. versionchanged:: 4.17
628-
Added the ``adaptive_retries`` URI and keyword argument.
627+
Added the ``max_adaptive_retries`` and ``enable_overload_retargeting`` URI and keyword arguments.
629628
630629
.. versionchanged:: 4.5
631630
Added the ``serverMonitoringMode`` keyword argument.
@@ -894,9 +893,7 @@ def __init__(
894893
self._options.read_concern,
895894
)
896895

897-
self._retry_policy = _RetryPolicy(
898-
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
899-
)
896+
self._retry_policy = _RetryPolicy(attempts=self._options.max_adaptive_retries)
900897

901898
self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name)
902899

@@ -2822,7 +2819,6 @@ async def run(self) -> T:
28222819
self._check_last_error(check_csot=True)
28232820
try:
28242821
res = await self._read() if self._is_read else await self._write()
2825-
await self._retry_policy.record_success(self._attempt_number > 0)
28262822
# Track whether the transaction has completed a command.
28272823
# If we need to apply backpressure to the first command,
28282824
# we will need to revert back to starting state.
@@ -2930,10 +2926,9 @@ async def run(self) -> T:
29302926
transaction.set_starting()
29312927
transaction.attempt = 0
29322928

2933-
if (
2934-
self._server is not None
2935-
and self._client.topology_description.topology_type_name == "Sharded"
2936-
or exc.has_error_label("SystemOverloadedError")
2929+
if self._server is not None and (
2930+
self._client.topology_description.topology_type_name == "Sharded"
2931+
or (overloaded and self._client.options.enable_overload_retargeting)
29372932
):
29382933
self._deprioritized_servers.append(self._server)
29392934

pymongo/client_options.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,15 @@ def __init__(
235235
self.__server_monitoring_mode = options.get(
236236
"servermonitoringmode", common.SERVER_MONITORING_MODE
237237
)
238-
self.__adaptive_retries = (
239-
options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
240-
if "adaptive_retries" in options
241-
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
238+
self.__max_adaptive_retries = (
239+
options.get("max_adaptive_retries", common.MAX_ADAPTIVE_RETRIES)
240+
if "max_adaptive_retries" in options
241+
else options.get("maxadaptiveretries", common.MAX_ADAPTIVE_RETRIES)
242+
)
243+
self.__enable_overload_retargeting = (
244+
options.get("enable_overload_retargeting", common.ENABLE_OVERLOAD_RETARGETING)
245+
if "enable_overload_retargeting" in options
246+
else options.get("enableoverloadretargeting", common.ENABLE_OVERLOAD_RETARGETING)
242247
)
243248

244249
@property
@@ -353,9 +358,17 @@ def server_monitoring_mode(self) -> str:
353358
return self.__server_monitoring_mode
354359

355360
@property
356-
def adaptive_retries(self) -> bool:
357-
"""The configured adaptiveRetries option.
361+
def max_adaptive_retries(self) -> int:
362+
"""The configured maxAdaptiveRetries option.
363+
364+
.. versionadded:: 4.17
365+
"""
366+
return self.__max_adaptive_retries
367+
368+
@property
369+
def enable_overload_retargeting(self) -> bool:
370+
"""The configured enableOverloadRetargeting option.
358371
359-
.. versionadded:: 4.XX
372+
.. versionadded:: 4.17
360373
"""
361-
return self.__adaptive_retries
374+
return self.__enable_overload_retargeting

pymongo/common.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,11 @@
140140
# Default value for serverMonitoringMode
141141
SERVER_MONITORING_MODE = "auto" # poll/stream/auto
142142

143-
# Default value for adaptiveRetries
144-
ADAPTIVE_RETRIES = False
143+
# Default value for max adaptive retries
144+
MAX_ADAPTIVE_RETRIES = 2
145+
146+
# Default value for enableOverloadRetargeting
147+
ENABLE_OVERLOAD_RETARGETING = False
145148

146149
# Auth mechanism properties that must raise an error instead of warning if they invalidate.
147150
_MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"]
@@ -741,7 +744,8 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
741744
"srvmaxhosts": validate_non_negative_integer,
742745
"timeoutms": validate_timeoutms,
743746
"servermonitoringmode": validate_server_monitoring_mode,
744-
"adaptiveretries": validate_boolean_or_string,
747+
"maxadaptiveretries": validate_non_negative_integer,
748+
"enableoverloadretargeting": validate_boolean_or_string,
745749
}
746750

747751
# Dictionary where keys are the names of URI options specific to pymongo,
@@ -775,7 +779,8 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
775779
"server_selector": validate_is_callable_or_none,
776780
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
777781
"authoidcallowedhosts": validate_list,
778-
"adaptive_retries": validate_boolean_or_string,
782+
"max_adaptive_retries": validate_non_negative_integer,
783+
"enable_overload_retargeting": validate_boolean_or_string,
779784
}
780785

781786
# Dictionary where keys are any URI option name, and values are the

pymongo/synchronous/helpers.py

Lines changed: 5 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
)
3131

3232
from pymongo import _csot
33+
from pymongo.common import MAX_ADAPTIVE_RETRIES
3334
from pymongo.errors import (
3435
OperationFailure,
3536
)
3637
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
37-
from pymongo.lock import _create_lock
3838

3939
_IS_SYNC = True
4040

@@ -76,11 +76,8 @@ def inner(*args: Any, **kwargs: Any) -> Any:
7676
return cast(F, inner)
7777

7878

79-
_MAX_RETRIES = 5
8079
_BACKOFF_INITIAL = 0.1
8180
_BACKOFF_MAX = 10
82-
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
83-
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8481

8582

8683
def _backoff(
@@ -90,79 +87,32 @@ def _backoff(
9087
return jitter * min(initial_delay * (2**attempt), max_delay)
9188

9289

93-
class _TokenBucket:
94-
"""A token bucket implementation for rate limiting."""
95-
96-
def __init__(
97-
self,
98-
capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
99-
return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
100-
):
101-
self.lock = _create_lock()
102-
self.capacity = capacity
103-
self.tokens = capacity
104-
self.return_rate = return_rate
105-
106-
def consume(self) -> bool:
107-
"""Consume a token from the bucket if available."""
108-
with self.lock:
109-
if self.tokens >= 1:
110-
self.tokens -= 1
111-
return True
112-
return False
113-
114-
def deposit(self, retry: bool = False) -> None:
115-
"""Deposit a token back into the bucket."""
116-
retry_token = 1 if retry else 0
117-
with self.lock:
118-
self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate)
119-
120-
12190
class _RetryPolicy:
122-
"""A retry limiter that performs exponential backoff with jitter.
123-
124-
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
125-
a prolonged outage or high load.
126-
"""
91+
"""A retry limiter that performs exponential backoff with jitter."""
12792

12893
def __init__(
12994
self,
130-
token_bucket: _TokenBucket,
131-
attempts: int = _MAX_RETRIES,
95+
attempts: int = MAX_ADAPTIVE_RETRIES,
13296
backoff_initial: float = _BACKOFF_INITIAL,
13397
backoff_max: float = _BACKOFF_MAX,
134-
adaptive_retry: bool = False,
13598
):
136-
self.token_bucket = token_bucket
13799
self.attempts = attempts
138100
self.backoff_initial = backoff_initial
139101
self.backoff_max = backoff_max
140-
self.adaptive_retry = adaptive_retry
141-
142-
def record_success(self, retry: bool) -> None:
143-
"""Record a successful operation."""
144-
if self.adaptive_retry:
145-
self.token_bucket.deposit(retry)
146102

147103
def backoff(self, attempt: int) -> float:
148-
"""Return the backoff duration for the given ."""
104+
"""Return the backoff duration for the given attempt."""
149105
return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max)
150106

151107
def should_retry(self, attempt: int, delay: float) -> bool:
152-
"""Return if we have budget to retry and how long to backoff."""
108+
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
153109
if attempt > self.attempts:
154110
return False
155111

156-
# If the delay would exceed the deadline, bail early before consuming a token.
157112
if _csot.get_timeout():
158113
if time.monotonic() + delay > _csot.get_deadline():
159114
return False
160115

161-
# Check token bucket last since we only want to consume a token if we actually retry.
162-
if self.adaptive_retry and not self.token_bucket.consume():
163-
# DRIVERS-3246 Improve diagnostics when this case happens.
164-
# We could add info to the exception and log.
165-
return False
166116
return True
167117

168118

0 commit comments

Comments
 (0)