-
Notifications
You must be signed in to change notification settings - Fork 262
Fix: Handle dead consumer threads in Google Cloud Functions environment #1473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
7893a44
24638f7
9a3506c
c2180e0
2670c3f
d4dbf43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,5 +1,6 @@ | ||||||
| import logging | ||||||
| import threading | ||||||
| import time | ||||||
|
|
||||||
| from .media_manager import MediaManager | ||||||
|
|
||||||
|
|
@@ -25,6 +26,8 @@ def __init__( | |||||
| # run() *after* we set it to False in pause... and keep running | ||||||
| # forever. | ||||||
| self.running = True | ||||||
| # Track when thread last processed something | ||||||
| self.last_activity = time.time() | ||||||
| self._identifier = identifier | ||||||
| self._media_manager = media_manager | ||||||
|
|
||||||
|
|
@@ -34,11 +37,33 @@ def run(self) -> None: | |||||
| f"Thread: Media upload consumer thread #{self._identifier} started and actively processing queue items" | ||||||
| ) | ||||||
| while self.running: | ||||||
| self._media_manager.process_next_media_upload() | ||||||
| try: | ||||||
| # Update activity timestamp before processing | ||||||
| self.last_activity = time.time() | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updating
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If an upload takes long, I think we'd rather have a signature of when the task starts so that it serves the purpose of knowing the thread |
||||||
| self._media_manager.process_next_media_upload() | ||||||
| # Update after successful processing | ||||||
| self.last_activity = time.time() | ||||||
| except Exception as e: | ||||||
| self._log.error( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use
Suggested change
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done 9a3506c |
||||||
| f"Thread #{self._identifier}: Unexpected error in consumer loop: {e}" | ||||||
| ) | ||||||
| # Continue running despite errors | ||||||
| time.sleep(0.1) | ||||||
|
|
||||||
| def pause(self) -> None: | ||||||
| """Pause the media upload consumer.""" | ||||||
| self._log.debug( | ||||||
| f"Thread: Pausing media upload consumer thread #{self._identifier}" | ||||||
| ) | ||||||
| self.running = False | ||||||
|
|
||||||
| def is_healthy(self, timeout_seconds: float = 5.0) -> bool: | ||||||
| """ | ||||||
| Check if thread is healthy and recently active. | ||||||
| Returns False if thread hasn't processed anything in timeout_seconds. | ||||||
| """ | ||||||
| if not self.is_alive(): | ||||||
| return False | ||||||
|
|
||||||
| time_since_activity = time.time() - self.last_activity | ||||||
| return time_since_activity < timeout_seconds | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid magic numbers: consider extracting the 5.0s health check and 30s timeout into configurable constants.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done 9a3506c