Skip to content

Commit 81175ff

Browse files
authored
feat(pubsub): allow explicit ack/nack on messages (talkiq#931)
Adds new ack() and nack() methods on SubscriberMessage which allow users to forcibly mark a messsage as getting acked or nacked, regardless of the success of the callback. In plain English: When `callback` terminates, if `.ack()` has been called it acks. Else, if `.nack()` has been called it nacks. Else, it acks if no exception was raised and nacks otherwise. Fixes talkiq#837
1 parent 6480a1d commit 81175ff

3 files changed

Lines changed: 46 additions & 10 deletions

File tree

pubsub/gcloud/aio/pubsub/subscriber.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import time
1111
from collections.abc import Awaitable
1212
from collections.abc import Callable
13-
from typing import Optional
1413
from typing import TYPE_CHECKING
14+
from typing import Optional
1515
from typing import TypeVar
1616

1717
from . import metrics
@@ -286,11 +286,26 @@ async def maybe_nack(ack_id: str) -> None:
286286

287287
ack_ids = []
288288

289+
async def ack_or_nack(
290+
message: SubscriberMessage,
291+
ack_queue: 'asyncio.Queue[str]',
292+
nack_queue: Optional['asyncio.Queue[str]'],
293+
ack: bool = False,
294+
) -> None:
295+
if message.force_ack_nack is None:
296+
# if we've not forced the ack status, set it here
297+
message.force_ack_nack = ack
298+
299+
if message.force_ack_nack:
300+
await ack_queue.put(message.ack_id)
301+
elif nack_queue:
302+
await nack_queue.put(message.ack_id)
303+
289304
async def _execute_callback(
290305
message: SubscriberMessage,
291306
callback: ApplicationHandler,
292307
ack_queue: 'asyncio.Queue[str]',
293-
nack_queue: 'Optional[asyncio.Queue[str]]',
308+
nack_queue: Optional['asyncio.Queue[str]'],
294309
insertion_time: float,
295310
) -> None:
296311
try:
@@ -300,18 +315,15 @@ async def _execute_callback(
300315
)
301316
with metrics.CONSUME_LATENCY.labels(phase='runtime').time():
302317
await callback(message)
303-
await ack_queue.put(message.ack_id)
318+
await ack_or_nack(message, ack_queue, nack_queue, ack=True)
304319
metrics.CONSUME.labels(outcome='succeeded').inc()
305-
306320
except asyncio.CancelledError:
307-
if nack_queue:
308-
await nack_queue.put(message.ack_id)
321+
await ack_or_nack(message, ack_queue, nack_queue, ack=False)
309322

310323
log.warning('application callback was cancelled')
311324
metrics.CONSUME.labels(outcome='cancelled').inc()
312325
except Exception as e:
313-
if nack_queue:
314-
await nack_queue.put(message.ack_id)
326+
await ack_or_nack(message, ack_queue, nack_queue, ack=False)
315327

316328
log.warning(
317329
'application callback raised an exception',
@@ -326,7 +338,7 @@ async def consumer( # pylint: disable=too-many-locals
326338
ack_queue: 'asyncio.Queue[str]',
327339
ack_deadline_cache: AckDeadlineCache,
328340
max_tasks: int,
329-
nack_queue: 'Optional[asyncio.Queue[str]]',
341+
nack_queue: Optional['asyncio.Queue[str]'],
330342
) -> None:
331343
try:
332344
semaphore = asyncio.Semaphore(max_tasks)
@@ -450,7 +462,7 @@ async def subscribe(
450462
ack_queue: 'asyncio.Queue[str]' = asyncio.Queue(
451463
maxsize=(max_messages_per_producer * num_producers),
452464
)
453-
nack_queue: 'Optional[asyncio.Queue[str]]' = None
465+
nack_queue: Optional['asyncio.Queue[str]'] = None
454466
ack_deadline_cache = AckDeadlineCache(
455467
subscriber_client,
456468
subscription,

pubsub/gcloud/aio/pubsub/subscriber_message.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ def __init__(
2929
self.attributes = attributes
3030
self.delivery_attempt = delivery_attempt
3131

32+
self.force_ack_nack: bool | None = None
33+
3234
@staticmethod
3335
def from_repr(
3436
received_message: dict[str, Any],
@@ -66,3 +68,24 @@ def to_repr(self) -> dict[str, Any]:
6668
if self.delivery_attempt is not None:
6769
r['deliveryAttempt'] = self.delivery_attempt
6870
return r
71+
72+
def ack(self) -> None:
73+
"""
74+
Forcibly mark a message as acked.
75+
76+
By default, we only ack a message if the callback returns without
77+
raising an exception. If this method has been called on the Message, we
78+
will instead ack it regardless of exception status.
79+
"""
80+
self.force_ack_nack = True
81+
82+
def nack(self) -> None:
83+
"""
84+
Forcibly mark a message as nacked.
85+
86+
By default, we only nack a message if the callback raises an exception.
87+
If this method has been called on the Message, we will instead nack it
88+
regardless of exception status, ie. including if it completes
89+
successfully.
90+
"""
91+
self.force_ack_nack = False

pubsub/tests/unit/subscriber_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def make_message_mock():
2929
mock = MagicMock()
3030
mock.ack_id = 'ack_id'
3131
mock.publish_time.timestamp = MagicMock(return_value=time.time())
32+
mock.force_ack_nack = None
3233
return mock
3334

3435
@pytest.fixture(scope='function')

0 commit comments

Comments
 (0)