From 2d28aae4dd2949d18c77493b65238fde34d797d4 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Thu, 9 Apr 2026 17:02:35 +0000 Subject: [PATCH 1/4] fix potential deadlocks in `SitemapRequestLoader` and `RequestManagerTandem` --- .../_request_manager_tandem.py | 3 +++ .../_sitemap_request_loader.py | 22 ++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/crawlee/request_loaders/_request_manager_tandem.py b/src/crawlee/request_loaders/_request_manager_tandem.py index 4f33d41cf3..3798e2e49a 100644 --- a/src/crawlee/request_loaders/_request_manager_tandem.py +++ b/src/crawlee/request_loaders/_request_manager_tandem.py @@ -89,6 +89,9 @@ async def fetch_next_request(self) -> Request | None: 'Adding request from the RequestLoader to the RequestManager failed, the request has been dropped', extra={'url': request.url, 'unique_key': request.unique_key}, ) + # Mark it as processed so that the `request` doesn't get stuck in the `in_progress` status + # in `RequestLoader` + await self._read_only_loader.mark_request_as_handled(request) return None await self._read_only_loader.mark_request_as_handled(request) diff --git a/src/crawlee/request_loaders/_sitemap_request_loader.py b/src/crawlee/request_loaders/_sitemap_request_loader.py index 06f2c29111..4b6f0a3c8e 100644 --- a/src/crawlee/request_loaders/_sitemap_request_loader.py +++ b/src/crawlee/request_loaders/_sitemap_request_loader.py @@ -160,7 +160,11 @@ def __init__( async def _get_state(self) -> SitemapRequestLoaderState: """Initialize and return the current state.""" + if self._state.is_initialized: + return self._state.current_value + async with self._queue_lock: + # Re-check if state got initialized while waiting for the lock if self._state.is_initialized: return self._state.current_value @@ -260,7 +264,6 @@ async def _load_sitemaps(self) -> None: # Check if we have capacity in the queue await self._queue_has_capacity.wait() - state = await self._get_state() async with self._queue_lock: state.url_queue.append(url) state.current_sitemap_processed_urls.add(url) @@ -305,12 +308,18 @@ async def is_empty(self) -> bool: @override async def is_finished(self) -> bool: """Check if all URLs have been processed.""" + if self._loading_task is None: + raise RuntimeError('SitemapRequestLoader has not been started.') + state = await self._get_state() return not state.url_queue and len(state.in_progress) == 0 and self._loading_task.done() @override async def fetch_next_request(self) -> Request | None: """Fetch the next request to process.""" + if self._loading_task is None: + raise RuntimeError('SitemapRequestLoader has not been started.') + while not (await self.is_finished()): state = await self._get_state() if not state.url_queue: @@ -318,8 +327,16 @@ async def fetch_next_request(self) -> Request | None: continue async with self._queue_lock: + # Double-check if the queue is still empty after acquiring the lock + if not state.url_queue: + continue + url = state.url_queue.popleft() request_option = RequestOptions(url=url) + + if len(state.url_queue) < self._max_buffer_size: + self._queue_has_capacity.set() + if self._transform_request_function: transform_request_option = self._transform_request_function(request_option) if transform_request_option == 'skip': @@ -327,10 +344,9 @@ async def fetch_next_request(self) -> Request | None: continue if transform_request_option != 'unchanged': request_option = transform_request_option + request = Request.from_url(**request_option) state.in_progress.add(request.url) - if len(state.url_queue) < self._max_buffer_size: - self._queue_has_capacity.set() return request From 7442320ef8920bc914465db4a8845631e754e171 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Thu, 9 Apr 2026 17:09:42 +0000 Subject: [PATCH 2/4] fix comment --- src/crawlee/request_loaders/_sitemap_request_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/request_loaders/_sitemap_request_loader.py b/src/crawlee/request_loaders/_sitemap_request_loader.py index 4b6f0a3c8e..334fd815b5 100644 --- a/src/crawlee/request_loaders/_sitemap_request_loader.py +++ b/src/crawlee/request_loaders/_sitemap_request_loader.py @@ -327,7 +327,7 @@ async def fetch_next_request(self) -> Request | None: continue async with self._queue_lock: - # Double-check if the queue is still empty after acquiring the lock + # Double-check if the queue is still not empty after acquiring the lock if not state.url_queue: continue From 5eb3a76adf4c571230f6d4bd464f7d67621dd63c Mon Sep 17 00:00:00 2001 From: Max Bohomolov <34358312+Mantisus@users.noreply.github.com> Date: Thu, 9 Apr 2026 20:23:03 +0300 Subject: [PATCH 3/4] Update src/crawlee/request_loaders/_sitemap_request_loader.py Okay, I agree, those are unnecessary checks. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/crawlee/request_loaders/_sitemap_request_loader.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/crawlee/request_loaders/_sitemap_request_loader.py b/src/crawlee/request_loaders/_sitemap_request_loader.py index 334fd815b5..e1c4c4d2e1 100644 --- a/src/crawlee/request_loaders/_sitemap_request_loader.py +++ b/src/crawlee/request_loaders/_sitemap_request_loader.py @@ -308,18 +308,12 @@ async def is_empty(self) -> bool: @override async def is_finished(self) -> bool: """Check if all URLs have been processed.""" - if self._loading_task is None: - raise RuntimeError('SitemapRequestLoader has not been started.') - state = await self._get_state() return not state.url_queue and len(state.in_progress) == 0 and self._loading_task.done() @override async def fetch_next_request(self) -> Request | None: """Fetch the next request to process.""" - if self._loading_task is None: - raise RuntimeError('SitemapRequestLoader has not been started.') - while not (await self.is_finished()): state = await self._get_state() if not state.url_queue: From c4b12159469817ca11ecef7a9ffae7629c65cea2 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Thu, 9 Apr 2026 17:50:06 +0000 Subject: [PATCH 4/4] add tests --- .../test_sitemap_request_loader.py | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/tests/unit/request_loaders/test_sitemap_request_loader.py b/tests/unit/request_loaders/test_sitemap_request_loader.py index a2e765ac36..8033211157 100644 --- a/tests/unit/request_loaders/test_sitemap_request_loader.py +++ b/tests/unit/request_loaders/test_sitemap_request_loader.py @@ -2,6 +2,7 @@ import base64 import gzip from typing import TYPE_CHECKING +from unittest.mock import patch from yarl import URL @@ -216,3 +217,79 @@ def transform_request(request_options: RequestOptions) -> RequestOptions | Reque 'http://not-exists.com/catalog?item=74&desc=vacation_newfoundland', 'http://not-exists.com/catalog?item=83&desc=vacation_usa', } + + +async def test_transform_request_function_with_skip(server_url: URL, http_client: HttpClient) -> None: + sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode())) + + def transform_request(_request_options: RequestOptions) -> RequestOptions | RequestTransformAction: + return 'skip' + + sitemap_loader = SitemapRequestLoader( + [str(sitemap_url)], + http_client=http_client, + transform_request_function=transform_request, + ) + + while not await sitemap_loader.is_finished(): + request = await sitemap_loader.fetch_next_request() + + if request: + await sitemap_loader.mark_request_as_handled(request) + + # Even though the sitemap had URLs, all were skipped, so the loader should be empty and finished with + # 0 handled requests. + assert await sitemap_loader.is_empty() + assert await sitemap_loader.is_finished() + assert await sitemap_loader.get_total_count() == 0 + assert await sitemap_loader.get_handled_count() == 0 + + +async def test_sitemap_loader_to_tandem( + server_url: URL, + http_client: HttpClient, +) -> None: + sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode())) + + sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client) + request_manager = await sitemap_loader.to_tandem() + + while not await sitemap_loader.is_finished(): + request = await request_manager.fetch_next_request() + + if request: + await request_manager.mark_request_as_handled(request) + + assert await sitemap_loader.is_empty() + assert await sitemap_loader.is_finished() + + assert await request_manager.is_empty() + assert await request_manager.is_finished() + + +async def test_sitemap_loader_to_tandem_with_request_dropped( + server_url: URL, + http_client: HttpClient, +) -> None: + sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode())) + + sitemap_loader = SitemapRequestLoader( + [str(sitemap_url)], + http_client=http_client, + ) + request_manager = await sitemap_loader.to_tandem() + + with patch.object( + request_manager._read_write_manager, 'add_request', side_effect=Exception('Failed to add request') + ): + while not await sitemap_loader.is_finished(): + request = await request_manager.fetch_next_request() + + if request: + await request_manager.mark_request_as_handled(request) + + assert await sitemap_loader.is_empty() + assert await sitemap_loader.is_finished() + + assert await request_manager.is_empty() + assert await request_manager.is_finished()