|
14 | 14 | from taskiq.message import TaskiqMessage |
15 | 15 | from taskiq.receiver.params_parser import parse_params |
16 | 16 | from taskiq.result import TaskiqResult |
17 | | -from taskiq.semaphore import DequeSemaphore |
18 | 17 | from taskiq.state import TaskiqState |
19 | 18 | from taskiq.utils import DequeQueue, maybe_awaitable |
20 | 19 |
|
@@ -59,15 +58,15 @@ def __init__( # noqa: WPS211 |
59 | 58 | self.task_signatures[task.task_name] = inspect.signature(task.original_func) |
60 | 59 | self.task_hints[task.task_name] = get_type_hints(task.original_func) |
61 | 60 | self.dependency_graphs[task.task_name] = DependencyGraph(task.original_func) |
62 | | - self.sem: "Optional[DequeSemaphore]" = None |
| 61 | + self.sem: "Optional[asyncio.Semaphore]" = None |
63 | 62 | if max_async_tasks is not None and max_async_tasks > 0: |
64 | | - self.sem = DequeSemaphore(max_async_tasks) |
| 63 | + self.sem = asyncio.Semaphore(max_async_tasks) |
65 | 64 | else: |
66 | 65 | logger.warning( |
67 | 66 | "Setting unlimited number of async tasks " |
68 | 67 | + "can result in undefined behavior", |
69 | 68 | ) |
70 | | - self.sem_prefetch = DequeSemaphore(max_prefetch) |
| 69 | + self.sem_prefetch = asyncio.Semaphore(max_prefetch) |
71 | 70 | self.queue: DequeQueue[bytes] = DequeQueue() |
72 | 71 |
|
73 | 72 | self.sem_idle: Optional[asyncio.Semaphore] = None |
@@ -310,7 +309,7 @@ def task_cb(task: "asyncio.Task[Any]") -> None: |
310 | 309 | break |
311 | 310 | if message is QUEUE_SKIP: |
312 | 311 | # Decrease max_prefetch |
313 | | - prefetch_dec = asyncio.create_task(self.sem_prefetch.acquire_first()) |
| 312 | + prefetch_dec = asyncio.create_task(self.sem_prefetch.acquire()) |
314 | 313 | prefetch_dec.add_done_callback(tasks.discard) |
315 | 314 | tasks.add(prefetch_dec) |
316 | 315 |
|
@@ -357,5 +356,5 @@ async def task_idler(self, wait: float) -> None: |
357 | 356 | # Decrease max_prefetch in runner |
358 | 357 | task = asyncio.create_task(self.queue.put_first(QUEUE_SKIP)) |
359 | 358 | # Decrease max_tasks |
360 | | - await self.sem.acquire_first() |
| 359 | + await self.sem.acquire() |
361 | 360 | await task |
0 commit comments