13 | 13 | from redis.asyncio.sentinel import Sentinel |
14 | 14 | from redis.exceptions import RedisError, WatchError |
15 | 15 |
16 | | -from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix |
| 16 | +from .constants import ( |
| 17 | + default_queue_name, |
| 18 | + expires_extra_ms, |
| 19 | + job_key_prefix, |
| 20 | + job_message_id_prefix, |
| 21 | + result_key_prefix, |
| 22 | + stream_key_suffix, |
| 23 | +) |
17 | 24 | from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job |
| 25 | +from .lua import publish_job_lua |
18 | 26 | from .utils import timestamp_ms, to_ms, to_unix_ms |
19 | 27 |
20 | 28 | logger = logging.getLogger('arq.connections') |
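The two new names pulled in from `.constants` are a suffix for the per-queue stream key and a prefix for the key that maps a job id to its stream message id. A minimal sketch of plausible definitions follows; the exact string values are assumptions, not taken from this diff — only the names are confirmed by the import above:

```python
# arq/constants.py (sketch) -- the string values here are assumptions.
stream_key_suffix = ':stream'              # queue name + suffix = the stream key
job_message_id_prefix = 'arq:message-id:'  # prefix + job_id = key holding the stream message id
```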
@@ -165,20 +173,63 @@ async def enqueue_job( |
165 | 173 | elif defer_by_ms: |
166 | 174 | score = enqueue_time_ms + defer_by_ms |
167 | 175 | else: |
168 | | - score = enqueue_time_ms |
| 176 | + score = None |
169 | 177 |
170 | | - expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms |
| 178 | + expires_ms = expires_ms or (score or enqueue_time_ms) - enqueue_time_ms + self.expires_extra_ms |
171 | 179 |
172 | | - job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) |
| 180 | + job = serialize_job( |
| 181 | + function, |
| 182 | + args, |
| 183 | + kwargs, |
| 184 | + _job_try, |
| 185 | + enqueue_time_ms, |
| 186 | + serializer=self.job_serializer, |
| 187 | + ) |
173 | 188 | pipe.multi() |
174 | 189 | pipe.psetex(job_key, expires_ms, job) |
175 | | - pipe.zadd(_queue_name, {job_id: score}) |
| 190 | + |
| 191 | + if score is not None: |
| 192 | + pipe.zadd(_queue_name, {job_id: score}) |
| 193 | + else: |
| 194 | + stream_key = _queue_name + stream_key_suffix |
| 195 | + job_message_id_key = job_message_id_prefix + job_id |
| 196 | + pipe.eval( |
| 197 | + publish_job_lua, |
| 198 | + 2, |
| 199 | + # keys |
| 200 | + stream_key, |
| 201 | + job_message_id_key, |
| 202 | + # args |
| 203 | + job_id, |
| 204 | + str(enqueue_time_ms), |
| 205 | + str(expires_ms), |
| 206 | + ) |
| 207 | + |
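The `pipe.eval` call above hands the publish off to `publish_job_lua`, imported from the new `.lua` module. The script body is not part of this diff; a plausible sketch, inferred from the keys and args passed at the call site and from the `job_id`/`score` fields that `queued_jobs` reads back below, would be:

```python
# arq/lua.py (sketch) -- inferred from the call site, not copied from the diff.
# The script appends the job to the stream and records the resulting message
# id under the job's message-id key, expiring alongside the job data itself.
publish_job_lua = """
local stream_key = KEYS[1]
local job_message_id_key = KEYS[2]
local job_id = ARGV[1]
local score = ARGV[2]
local expires_ms = ARGV[3]

-- field names must match what queued_jobs reads back: job_id and score
local message_id = redis.call('xadd', stream_key, '*', 'job_id', job_id, 'score', score)
-- remember which stream entry belongs to this job, same TTL as the job key
redis.call('set', job_message_id_key, message_id, 'px', expires_ms)
return message_id
"""
```

Running this through `pipe.eval` keeps the XADD and the SET atomic within the WATCHed transaction, so a job can never exist in the stream without its message-id key.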
176 | 208 | try: |
177 | 209 | await pipe.execute() |
178 | 210 | except WatchError: |
179 | 211 | # job got enqueued since we checked 'job_exists' |
180 | 212 | return None |
181 | | - return Job(job_id, redis=self, _queue_name=_queue_name, _deserializer=self.job_deserializer) |
| 213 | + return Job( |
| 214 | + job_id, |
| 215 | + redis=self, |
| 216 | + _queue_name=_queue_name, |
| 217 | + _deserializer=self.job_deserializer, |
| 218 | + ) |
| 219 | + |
| 220 | + async def get_queue_size(self, queue_name: Optional[str] = None, include_delayed_tasks: bool = True) -> int:
| 221 | + if queue_name is None: |
| 222 | + queue_name = self.default_queue_name |
| 223 | + |
| 224 | + async with self.pipeline(transaction=True) as pipe: |
| 225 | + pipe.xlen(queue_name + stream_key_suffix) |
| 226 | + pipe.zcount(queue_name, '-inf', '+inf') |
| 227 | + stream_size, delayed_queue_size = await pipe.execute() |
| 228 | + |
| 229 | + if not include_delayed_tasks: |
| 230 | + return stream_size |
| 231 | + |
| 232 | + return stream_size + delayed_queue_size |
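A short usage sketch of the new method, also showing how the `score is None` branch above routes immediate jobs to the stream while deferred jobs stay in the sorted set. The function name and settings are illustrative, and the counts assume an initially empty queue:

```python
# Usage sketch -- 'download' and the default RedisSettings are illustrative.
import asyncio

from arq.connections import RedisSettings, create_pool

async def main() -> None:
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('download')                # score is None -> stream
    await redis.enqueue_job('download', _defer_by=60)  # scored -> sorted set
    assert await redis.get_queue_size() == 2           # stream + delayed
    assert await redis.get_queue_size(include_delayed_tasks=False) == 1

asyncio.run(main())
```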
182 | 233 |
183 | 234 | async def _get_job_result(self, key: bytes) -> JobResult: |
184 | 235 | job_id = key[len(result_key_prefix) :].decode() |
@@ -213,7 +264,16 @@ async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef] |
213 | 264 | """ |
214 | 265 | if queue_name is None: |
215 | 266 | queue_name = self.default_queue_name |
216 | | - jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1) |
| 267 | + |
| 268 | + async with self.pipeline(transaction=True) as pipe: |
| 269 | + pipe.zrange(queue_name, withscores=True, start=0, end=-1) |
| 270 | + pipe.xrange(queue_name + stream_key_suffix, '-', '+') |
| 271 | + delayed_jobs, stream_jobs = await pipe.execute() |
| 272 | + |
| 273 | + jobs = [ |
| 274 | + *delayed_jobs, |
| 275 | + *[(j[b'job_id'], int(j[b'score'])) for _, j in stream_jobs], |
| 276 | + ] |
217 | 277 | return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs]) |
218 | 278 |
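For reference, the two shapes being merged above differ by source: `zrange(..., withscores=True)` already yields `(member, score)` pairs, while `xrange` yields `(message_id, fields)` pairs whose fields were written by `publish_job_lua`, so the score is recovered from the stream entry itself. Illustrative values only:

```python
# Illustrative shapes -- ids and timestamps are made up.
delayed_jobs = [(b'deferred-id', 1700000060000.0)]  # from zrange(..., withscores=True)
stream_jobs = [                                     # from xrange(stream_key, '-', '+')
    (b'1700000000000-0', {b'job_id': b'immediate-id', b'score': b'1700000000000'}),
]

jobs = [*delayed_jobs, *[(j[b'job_id'], int(j[b'score'])) for _, j in stream_jobs]]
# -> [(b'deferred-id', 1700000060000.0), (b'immediate-id', 1700000000000)]
```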
219 | 279 |