22import sys
33import traceback
44from datetime import datetime
5- from functools import total_ordering
65from typing import Dict , List , Optional , Tuple , Union , Self , Any
76
87from redis import WatchError
@@ -56,7 +55,6 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
5655_job_stack = []
5756
5857
59- @total_ordering
6058class Queue :
6159 REGISTRIES = dict (
6260 finished = "finished_job_registry" ,
@@ -146,17 +144,6 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
146144 def first_queued_job_name (self ) -> Optional [str ]:
147145 return self .queued_job_registry .get_first ()
148146
149- def empty (self ):
150- """Removes all queued jobs from the queue."""
151- queued_jobs_count = self .queued_job_registry .count (connection = self .connection )
152- with self .connection .pipeline () as pipe :
153- for offset in range (0 , queued_jobs_count , 1000 ):
154- job_names = self .queued_job_registry .all (offset , 1000 )
155- for job_name in job_names :
156- self .queued_job_registry .delete (connection = pipe , job_name = job_name )
157- JobModel .delete_many (job_names , connection = pipe )
158- pipe .execute ()
159-
160147 @property
161148 def count (self ) -> int :
162149 """Returns a count of all messages in the queue."""
@@ -186,24 +173,24 @@ def get_all_jobs(self) -> List[JobModel]:
186173 return JobModel .get_many (job_names , connection = self .connection )
187174
188175 def create_and_enqueue_job (
189- self ,
190- func : FunctionReferenceType ,
191- args : Union [Tuple , List , None ] = None ,
192- kwargs : Optional [Dict ] = None ,
193- when : Optional [datetime ] = None ,
194- timeout : Optional [int ] = None ,
195- result_ttl : Optional [int ] = None ,
196- job_info_ttl : Optional [int ] = None ,
197- description : Optional [str ] = None ,
198- name : Optional [str ] = None ,
199- at_front : bool = False ,
200- meta : Optional [Dict ] = None ,
201- on_success : Optional [Callback ] = None ,
202- on_failure : Optional [Callback ] = None ,
203- on_stopped : Optional [Callback ] = None ,
204- task_type : Optional [str ] = None ,
205- scheduled_task_id : Optional [int ] = None ,
206- pipeline : Optional [ConnectionType ] = None ,
176+ self ,
177+ func : FunctionReferenceType ,
178+ args : Union [Tuple , List , None ] = None ,
179+ kwargs : Optional [Dict ] = None ,
180+ when : Optional [datetime ] = None ,
181+ timeout : Optional [int ] = None ,
182+ result_ttl : Optional [int ] = None ,
183+ job_info_ttl : Optional [int ] = None ,
184+ description : Optional [str ] = None ,
185+ name : Optional [str ] = None ,
186+ at_front : bool = False ,
187+ meta : Optional [Dict ] = None ,
188+ on_success : Optional [Callback ] = None ,
189+ on_failure : Optional [Callback ] = None ,
190+ on_stopped : Optional [Callback ] = None ,
191+ task_type : Optional [str ] = None ,
192+ scheduled_task_id : Optional [int ] = None ,
193+ pipeline : Optional [ConnectionType ] = None ,
207194 ) -> JobModel :
208195 """Creates a job to represent the delayed function call and enqueues it.
209196 :param when: When to schedule the job (None to enqueue immediately)
@@ -309,43 +296,6 @@ def run_job(self, job: JobModel) -> JobModel:
309296 pipeline .execute ()
310297 return job
311298
312- def enqueue_job (
313- self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
314- ) -> JobModel :
315- """Enqueues a job for delayed execution without checking dependencies.
316-
317- If Queue is instantiated with is_async=False, job is executed immediately.
318- :param job_model: The job redis model
319- :param connection: The Redis Pipeline
320- :param at_front: Whether to enqueue the job at the front
321-
322- :returns: The enqueued JobModel
323- """
324-
325- pipe = connection if connection is not None else self .connection .pipeline ()
326-
327- # Add Queue key set
328- job_model .status = JobStatus .QUEUED
329- job_model .enqueued_at = utcnow ()
330- job_model .save (connection = pipe )
331-
332- if self ._is_async :
333- if at_front :
334- score = current_timestamp ()
335- else :
336- score = self .queued_job_registry .get_last_timestamp () or current_timestamp ()
337- self .scheduled_job_registry .delete (connection = pipe , job_name = job_model .name )
338- self .queued_job_registry .add (connection = pipe , score = score , job_name = job_model .name )
339- pipe .execute ()
340- logger .debug (f"Pushed job { job_model .name } into { self .name } queued-jobs registry" )
341- else : # sync mode
342- pipe .execute ()
343- job_model = self .run_sync (job_model )
344- job_model .expire (ttl = job_model .job_info_ttl , connection = pipe )
345- pipe .execute ()
346-
347- return job_model
348-
349299 def run_sync (self , job : JobModel ) -> JobModel :
350300 """Run a job synchronously, meaning on the same process the method was called."""
351301 job .prepare_for_execution ("sync" , self .active_job_registry , self .connection )
@@ -361,10 +311,10 @@ def run_sync(self, job: JobModel) -> JobModel:
361311
362312 @classmethod
363313 def dequeue_any (
364- cls ,
365- queues : List [Self ],
366- timeout : Optional [int ],
367- connection : Optional [ConnectionType ] = None ,
314+ cls ,
315+ queues : List [Self ],
316+ timeout : Optional [int ],
317+ connection : Optional [ConnectionType ] = None ,
368318 ) -> Tuple [Optional [JobModel ], Optional [Self ]]:
369319 """Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
370320 is important.
@@ -398,19 +348,6 @@ def dequeue_any(
398348 return job , queue
399349 return None , None
400350
401- def __eq__ (self , other : Self ) -> bool :
402- if not isinstance (other , Queue ):
403- raise TypeError ("Cannot compare queues to other objects" )
404- return self .name == other .name
405-
406- def __lt__ (self , other : Self ) -> bool :
407- if not isinstance (other , Queue ):
408- raise TypeError ("Cannot compare queues to other objects" )
409- return self .name < other .name
410-
411- def __hash__ (self ) -> int :
412- return hash (self .name )
413-
414351 def __repr__ (self ) -> str :
415352 return f"{ self .__class__ .__name__ } ({ self .name !r} )"
416353
@@ -479,17 +416,39 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
479416 except WatchError :
480417 pass
481418
482- def requeue_jobs (self , * job_names : str , at_front : bool = False ) -> int :
483- jobs = JobModel .get_many (job_names , connection = self .connection )
484- jobs_requeued = 0
485- with self .connection .pipeline () as pipe :
486- for job in jobs :
487- if job is None :
488- continue
489- job .started_at = None
490- job .ended_at = None
491- job .save (connection = pipe )
492- self .enqueue_job (job , connection = pipe , at_front = at_front )
493- jobs_requeued += 1
419+ def enqueue_job (
420+ self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
421+ ) -> JobModel :
422+ """Enqueues a job for delayed execution without checking dependencies.
423+
424+ If Queue is instantiated with is_async=False, job is executed immediately.
425+ :param job_model: The job redis model
426+ :param connection: The Redis Pipeline
427+ :param at_front: Whether to enqueue the job at the front
428+
429+ :returns: The enqueued JobModel
430+ """
431+
432+ pipe = connection if connection is not None else self .connection .pipeline ()
433+ job_model .started_at = None
434+ job_model .ended_at = None
435+ job_model .status = JobStatus .QUEUED
436+ job_model .enqueued_at = utcnow ()
437+ job_model .save (connection = pipe )
438+
439+ if self ._is_async :
440+ if at_front :
441+ score = current_timestamp ()
442+ else :
443+ score = self .queued_job_registry .get_last_timestamp () or current_timestamp ()
444+ self .scheduled_job_registry .delete (connection = pipe , job_name = job_model .name )
445+ self .queued_job_registry .add (connection = pipe , score = score , job_name = job_model .name )
494446 pipe .execute ()
495- return jobs_requeued
447+ logger .debug (f"Pushed job { job_model .name } into { self .name } queued-jobs registry" )
448+ else : # sync mode
449+ pipe .execute ()
450+ job_model = self .run_sync (job_model )
451+ job_model .expire (ttl = job_model .job_info_ttl , connection = pipe )
452+ pipe .execute ()
453+
454+ return job_model
0 commit comments