Skip to content

Commit 3cb59e1

Browse files
author
Anton
committed
fix: semaphore for different python versions
1 parent cffc702 commit 3cb59e1

File tree

3 files changed

+67
-34
lines changed

3 files changed

+67
-34
lines changed

taskiq/receiver/receiver.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
from taskiq.message import TaskiqMessage
1515
from taskiq.receiver.params_parser import parse_params
1616
from taskiq.result import TaskiqResult
17+
from taskiq.semaphore import DequeSemaphore
1718
from taskiq.state import TaskiqState
18-
from taskiq.utils import DequeQueue, DequeSemaphore, maybe_awaitable
19+
from taskiq.utils import DequeQueue, maybe_awaitable
1920

2021
logger = getLogger(__name__)
2122
QUEUE_DONE = b"-1"

taskiq/semaphore.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import asyncio
2+
import sys
3+
from typing import TYPE_CHECKING, Any, Deque, Literal
4+
5+
6+
class DequeSemaphore(asyncio.Semaphore):
7+
"""Deque based Semaphore."""
8+
9+
if TYPE_CHECKING: # noqa: WPS604
10+
_loop: asyncio.BaseEventLoop
11+
_waiters: Deque[Any]
12+
_wakeup_scheduled: bool
13+
_get_loop: Any
14+
15+
if sys.version_info[1] < 10: # noqa: C901, WPS604
16+
17+
async def acquire_first(self) -> Literal[True]:
18+
"""
19+
Acquire as soon as possible. LIFO style.
20+
21+
:param self: DequeSemaphore
22+
:raises BaseException: exception
23+
:return: true
24+
"""
25+
while self._value < 0:
26+
fut = self._loop.create_future()
27+
self._waiters.appendleft(fut)
28+
29+
try:
30+
await fut
31+
except BaseException: # noqa: WPS424
32+
self._value += 1
33+
34+
fut.cancel()
35+
if not self.locked() and not fut.cancelled():
36+
self._wake_up_next()
37+
raise
38+
39+
return True
40+
41+
else:
42+
43+
async def acquire_first(self) -> Literal[True]:
44+
"""
45+
Acquire as soon as possible. LIFO style.
46+
47+
:param self: DequeSemaphore
48+
:raises asyncio.exceptions.CancelledError: exception
49+
:return: true
50+
"""
51+
# _wakeup_scheduled is set if *another* task is scheduled to wakeup
52+
# but its acquire() is not resumed yet
53+
while self._wakeup_scheduled or self._value <= 0:
54+
fut = self._get_loop().create_future()
55+
self._waiters.appendleft(fut)
56+
try:
57+
await fut
58+
# reset _wakeup_scheduled *after* waiting for a future
59+
self._wakeup_scheduled = False
60+
except asyncio.exceptions.CancelledError:
61+
self._wake_up_next()
62+
raise
63+
64+
self._value -= 1
65+
return True

taskiq/utils.py

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
import inspect
33
from typing import TYPE_CHECKING, Any, Coroutine, Deque, Generic, TypeVar, Union
44

5-
from typing_extensions import Literal
6-
75
_T = TypeVar("_T") # noqa: WPS111
86

97

@@ -40,37 +38,6 @@ def remove_suffix(text: str, suffix: str) -> str:
4038
return text
4139

4240

43-
class DequeSemaphore(asyncio.Semaphore):
44-
"""Deque based Semaphore."""
45-
46-
if TYPE_CHECKING: # noqa: WPS604
47-
_loop: asyncio.BaseEventLoop
48-
49-
async def acquire_first(self) -> Literal[True]:
50-
"""
51-
Acquire as soon as possible. LIFO style.
52-
53-
:raises BaseException: exception
54-
:return: true
55-
"""
56-
self._value -= 1
57-
58-
while self._value < 0:
59-
fut: asyncio.Future[Any] = self._loop.create_future()
60-
self._waiters.appendleft(fut)
61-
try:
62-
await fut
63-
except BaseException: # noqa: WPS424
64-
self._value += 1
65-
66-
fut.cancel()
67-
if not self.locked() and not fut.cancelled():
68-
self._wake_up_next()
69-
raise
70-
71-
return True
72-
73-
7441
class DequeQueue(
7542
asyncio.Queue, # type: ignore
7643
Generic[_T],

0 commit comments

Comments
 (0)