diff --git a/actionpack/action.py b/actionpack/action.py index 83b757e..2d5601e 100644 --- a/actionpack/action.py +++ b/actionpack/action.py @@ -1,4 +1,5 @@ from __future__ import annotations +import asyncio from functools import partialmethod from oslash import Left from oslash import Right @@ -110,6 +111,13 @@ def perform( ) -> Result[Outcome]: return self._perform(should_raise, timestamp_provider) + async def aperform( + self, + should_raise: bool = False, + timestamp_provider: Callable[[], int] = microsecond_timestamp + ) -> Result[Outcome]: + return self._perform(should_raise, timestamp_provider) + def validate(self): return self diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 6294801..531a86f 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -1,3 +1,6 @@ +import asyncio +import functools +import logging from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from functools import reduce @@ -13,6 +16,8 @@ from actionpack.action import Result from actionpack import Action +logger = logging.getLogger(__name__) + class Procedure(Generic[Name, Outcome]): @@ -31,6 +36,25 @@ def validate(self): raise Procedure.NotAnAction(msg) return self + async def aio_gen( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + actions = [] + # We only create coroutines here -- we're not running them until the `gather`. + for action in self.actions: + actions.append(action.aperform(should_raise=should_raise)) + + return await asyncio.gather(*actions, return_exceptions=not(should_raise)) + + async def aio_execute( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + val = await self.aio_gen(should_raise) + logger.debug(f"aio_execute {val}") + return val + def execute( self, max_workers: int = 5, @@ -42,6 +66,10 @@ def execute( if synchronously: for action in actions: yield action.perform(should_raise=should_raise) if should_raise else action.perform() + elif max_workers <= 0: + logger.debug("running asyncio for Procedure") + for t in asyncio.run(self.aio_execute(should_raise)): + yield t else: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(action._perform, should_raise=should_raise): str(action) for action in actions} @@ -99,6 +127,22 @@ def validate(self): raise KeyedProcedure.UnnamedAction(msg) return self + async def aio_gen( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + for action in self.actions: + ret = await action.aperform(should_raise=should_raise) + yield (action.name, ret) + + async def aio_execute( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + val = [a async for a in self.aio_gen(should_raise)] + logger.debug(f"aio_execute {val}") + return val + def execute( self, max_workers: int = 5, @@ -109,6 +153,10 @@ def execute( for action in self: yield (action.name, action.perform(should_raise=should_raise)) \ if should_raise else (action.name, action.perform()) + elif max_workers <= 0: + logger.debug("running asyncio for KeyedProcedure") + for t in asyncio.run(self.aio_execute(should_raise)): + yield t else: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(action._perform, should_raise=should_raise): action for action in self} diff --git a/noxfile.py b/noxfile.py index 801868c..ca5240a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -24,6 +24,7 @@ '3.8', '3.9', '3.10', + '3.11', ] nox.options.default_venv_backend = 'none' if not USEVENV else USEVENV diff --git a/requirements.txt b/requirements.txt index b675cee..5bbae2a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ coverage==5.5 marshmallow==3.10.0 requests==2.25.1 +asyncio==3.4.3 + diff --git a/tests/actionpack/test_procedure.py b/tests/actionpack/test_procedure.py index 37afb82..65804f0 100644 --- a/tests/actionpack/test_procedure.py +++ b/tests/actionpack/test_procedure.py @@ -113,6 +113,25 @@ def test_can_execute_Procedure_asynchronously(self): # NOTE: the wellwish precedes since the question took longer self.assertEqual(file.read(), wellwish + question) + def test_can_execute_Procedure_asyncio(self): + file = FakeFile() + + question = b' How are you?' + wellwish = b' I hope you\'re well.' + + action1 = FakeWrite[str, int](file, question, delay=0.2) + action2 = FakeWrite[str, int](file, wellwish, delay=0.1) + + procedure = Procedure[str, int]((action1, action2)) + results = procedure.execute(max_workers=0, should_raise=True, synchronously=False) + + assertIsIterable(results) + self.assertIsInstance(next(results), Result) + self.assertIsInstance(next(results), Result) + + # NOTE: when running with asyncio the question preceeds the wellwish despite the question taking longer + self.assertEqual(file.read(), question + wellwish) + class KeyedProcedureTest(TestCase): @@ -149,6 +168,14 @@ def test_can_execute_asynchronously(self): self.assertIn('success', results.keys()) self.assertIn('failure', results.keys()) + def test_can_execute_asyncio(self): + results = KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False) + + assertIsIterable(results) + results = dict(results) + self.assertIn('success', results.keys()) + self.assertIn('failure', results.keys()) + def test_can_create_KeyedProcedure_from_Actions_named_using_any_scriptable_type(self): action1 = FakeAction[int, str]() action2 = FakeAction[bool, str](instruction_provider=raise_failure)