44
55from datetime import datetime , timedelta , timezone
66from logging import getLogger
7- from typing import TYPE_CHECKING , cast
7+ from operator import attrgetter
8+ from typing import TYPE_CHECKING , TypeVar , cast
89
910import psutil
10-
11- from crawlee ._autoscaling .types import ClientSnapshot , CpuSnapshot , EventLoopSnapshot , MemorySnapshot , Snapshot
11+ from sortedcontainers import SortedList
12+
13+ from crawlee ._autoscaling .types import (
14+ ClientSnapshot ,
15+ CpuSnapshot ,
16+ EventLoopSnapshot ,
17+ MemorySnapshot ,
18+ Snapshot ,
19+ )
1220from crawlee ._utils .byte_size import ByteSize
1321from crawlee ._utils .docs import docs_group
1422from crawlee ._utils .recurring_task import RecurringTask
2129
2230logger = getLogger (__name__ )
2331
32+ T = TypeVar ('T' )
33+
2434
2535@docs_group ('Classes' )
2636class Snapshotter :
@@ -94,16 +104,20 @@ def __init__(
94104 cast (float , available_memory_ratio )
95105 )
96106
97- self ._cpu_snapshots : list [CpuSnapshot ] = []
98- self ._event_loop_snapshots : list [EventLoopSnapshot ] = []
99- self ._memory_snapshots : list [MemorySnapshot ] = []
100- self ._client_snapshots : list [ClientSnapshot ] = []
107+ self ._cpu_snapshots = self . _get_sorted_list_by_created_at ( list [CpuSnapshot ]())
108+ self ._event_loop_snapshots = self . _get_sorted_list_by_created_at ( list [EventLoopSnapshot ]())
109+ self ._memory_snapshots = self . _get_sorted_list_by_created_at ( list [MemorySnapshot ]())
110+ self ._client_snapshots = self . _get_sorted_list_by_created_at ( list [ClientSnapshot ]())
101111
102112 self ._snapshot_event_loop_task = RecurringTask (self ._snapshot_event_loop , self ._event_loop_snapshot_interval )
103113 self ._snapshot_client_task = RecurringTask (self ._snapshot_client , self ._client_snapshot_interval )
104114
105115 self ._timestamp_of_last_memory_warning : datetime = datetime .now (timezone .utc ) - timedelta (hours = 1 )
106116
@staticmethod
def _get_sorted_list_by_created_at(input_list: list[T]) -> SortedList[T]:
    """Build a `SortedList` from the given items, ordered by their `created_at` attribute.

    Args:
        input_list: Items to place into the sorted container. Each item is keyed
            by reading its `created_at` attribute.

    Returns:
        A `SortedList` containing the items of `input_list`, keyed on `created_at`.
    """
    # The key function reads `created_at` from every stored item.
    by_creation_time = attrgetter('created_at')
    return SortedList(input_list, key=by_creation_time)
120+
107121 @staticmethod
108122 def _get_default_max_memory_size (available_memory_ratio : float ) -> ByteSize :
109123 """Default `memory_max_size` is 1/4 of the total system memory."""
@@ -194,7 +208,7 @@ def _get_sample(snapshots: list[Snapshot], duration: timedelta | None = None) ->
194208 return []
195209
196210 latest_time = snapshots [- 1 ].created_at
197- return [snapshot for snapshot in reversed ( snapshots ) if latest_time - snapshot .created_at <= duration ]
211+ return [snapshot for snapshot in snapshots if latest_time - snapshot .created_at <= duration ]
198212
199213 def _snapshot_cpu (self , event_data : EventSystemInfoData ) -> None :
200214 """Captures a snapshot of the current CPU usage.
@@ -213,7 +227,7 @@ def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
213227
214228 snapshots = cast (list [Snapshot ], self ._cpu_snapshots )
215229 self ._prune_snapshots (snapshots , event_data .cpu_info .created_at )
216- self ._cpu_snapshots .append (snapshot )
230+ self ._cpu_snapshots .add (snapshot )
217231
218232 def _snapshot_memory (self , event_data : EventSystemInfoData ) -> None :
219233 """Captures a snapshot of the current memory usage.
@@ -233,8 +247,7 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
233247
234248 snapshots = cast (list [Snapshot ], self ._memory_snapshots )
235249 self ._prune_snapshots (snapshots , snapshot .created_at )
236- self ._memory_snapshots .append (snapshot )
237-
250+ self ._memory_snapshots .add (snapshot )
238251 self ._evaluate_memory_load (event_data .memory_info .current_size , event_data .memory_info .created_at )
239252
240253 def _snapshot_event_loop (self ) -> None :
@@ -254,7 +267,7 @@ def _snapshot_event_loop(self) -> None:
254267
255268 snapshots = cast (list [Snapshot ], self ._event_loop_snapshots )
256269 self ._prune_snapshots (snapshots , snapshot .created_at )
257- self ._event_loop_snapshots .append (snapshot )
270+ self ._event_loop_snapshots .add (snapshot )
258271
259272 def _snapshot_client (self ) -> None :
260273 """Captures a snapshot of the current API state by checking for rate limit errors (HTTP 429).
@@ -271,7 +284,7 @@ def _snapshot_client(self) -> None:
271284
272285 snapshots = cast (list [Snapshot ], self ._client_snapshots )
273286 self ._prune_snapshots (snapshots , snapshot .created_at )
274- self ._client_snapshots .append (snapshot )
287+ self ._client_snapshots .add (snapshot )
275288
276289 def _prune_snapshots (self , snapshots : list [Snapshot ], now : datetime ) -> None :
277290 """Removes snapshots that are older than the `self._snapshot_history`.
0 commit comments