Skip to content

Commit 974a67c

Browse files
committed
Change executor to thread pool executor
1 parent 5af19fc commit 974a67c

File tree

23 files changed

+838
-514
lines changed

23 files changed

+838
-514
lines changed
Lines changed: 26 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,117 +1,49 @@
1-
from queue import Queue
2-
from typing import List, Dict
3-
from wrapt import synchronized
4-
from abc import abstractmethod, ABC
1+
from typing import List
2+
from abc import ABC
3+
from concurrent.futures import ThreadPoolExecutor
54

65
from investing_algorithm_framework.core.workers import Worker
7-
from investing_algorithm_framework.core.exceptions import OperationalException
8-
from investing_algorithm_framework.core.utils import StoppableThread
96
from investing_algorithm_framework.core.events.observer import Observer
107
from investing_algorithm_framework.core.events.observable import Observable
118
from investing_algorithm_framework.configuration.config_constants import \
129
DEFAULT_MAX_WORKERS
1310

1411

15-
class Executor(Observable, Observer, ABC):
12+
class Executor(Observable, ABC):
1613
"""
17-
Executor class: functions as an abstract class that will handle the
14+
Executor class: functions as a thread executor that will handle the
1815
executions of workers in asynchronous order.
16+
17+
It will make use of the concurrent library for execution of the workers.
18+
Also the executor functions as an observable instance.
1919
"""
2020

21-
def __init__(self, max_workers: int = DEFAULT_MAX_WORKERS):
21+
def __init__(
22+
self,
23+
workers: List[Worker],
24+
max_concurrent_workers: int = DEFAULT_MAX_WORKERS
25+
) -> None:
2226
super(Executor, self).__init__()
2327

24-
self._max_workers = max_workers
25-
self._pending_workers: Queue = None
26-
self._running_threads: Dict[Worker, StoppableThread] = {}
28+
self._workers = workers
29+
self._max_concurrent_workers = max_concurrent_workers
2730

2831
def start(self) -> None:
2932
"""
30-
Main entry for the executor.
31-
"""
32-
33-
self._initialize()
34-
self.run_jobs()
35-
36-
def stop(self) -> None:
37-
"""
38-
Function that will stop all running workers.
39-
"""
40-
41-
for worker in self._running_threads:
42-
self.stop_running_worker(worker)
43-
44-
self.clean_up()
45-
46-
def clean_up(self):
47-
"""
48-
Clean ups the resources.
49-
"""
50-
51-
self._pending_workers: Queue = None
52-
self._running_threads: Dict[Worker, StoppableThread] = {}
53-
54-
def _initialize(self):
55-
"""
56-
Functions that initializes the pending workers.
57-
"""
58-
59-
workers = self.create_workers()
60-
61-
if not workers or len(workers) == 0:
62-
raise OperationalException("There where no workers initialized "
63-
"for the executor instance")
64-
65-
self._pending_workers = Queue()
66-
67-
for worker in workers:
68-
self._pending_workers.put(worker)
69-
70-
@abstractmethod
71-
def create_workers(self) -> List[Worker]:
72-
"""
73-
Abstract function that will create the workers.
74-
"""
75-
pass
33+
Main entry for the executor. The executor creates a ThreadPoolExecutor
34+
with the given amount of max_workers.
7635
77-
def run_jobs(self) -> None:
36+
It will then pass all the workers to the ThreadPoolExecutor. When
37+
finished it will update all its observers
7838
"""
79-
Function that will start all the workers.
80-
"""
81-
82-
worker_iteration = self._max_workers - len(
83-
self._running_threads.keys()
84-
)
8539

86-
while worker_iteration > 0 and not self._pending_workers.empty():
87-
worker = self._pending_workers.get()
88-
worker_iteration -= 1
89-
thread = StoppableThread(target=worker.start)
90-
worker.add_observer(self)
91-
self._running_threads[worker] = thread
92-
thread.start()
40+
with ThreadPoolExecutor(max_workers=self._max_concurrent_workers) as \
41+
executor:
9342

94-
@synchronized
95-
def update(self, observable, **kwargs) -> None:
96-
"""
97-
Observer implementation.
98-
"""
43+
for worker in self.workers:
44+
executor.submit(worker.start)
9945

100-
if observable in self._running_threads:
101-
del self._running_threads[observable]
102-
103-
if not self.processing:
104-
self.notify_observers()
105-
else:
106-
self.run_jobs()
107-
108-
def stop_running_worker(self, worker: Worker) -> None:
109-
"""
110-
Function that will stop a running worker.
111-
"""
112-
113-
thread = self._running_threads[worker]
114-
thread.kill()
46+
self.notify_observers()
11547

11648
def add_observer(self, observer: Observer) -> None:
11749
super(Executor, self).add_observer(observer)
@@ -120,12 +52,5 @@ def remove_observer(self, observer: Observer) -> None:
12052
super(Executor, self).remove_observer(observer)
12153

12254
@property
123-
def processing(self) -> bool:
124-
"""
125-
Property that will show if the executor is running.
126-
"""
127-
128-
return (self._pending_workers is not None
129-
and not self._pending_workers.empty()) or \
130-
(self._running_threads is not None
131-
and len(self._running_threads.keys()) > 0)
55+
def workers(self) -> List[Worker]:
56+
return self._workers

investing_algorithm_framework/core/state/state.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66

77
class State(ABC):
88
"""
9-
Represents a state of the Bot, these state are use by the BotContext.
10-
Each implemented state represents a work mode for the
11-
investing_algorithm_framework.
9+
Represents a state of the context, these state are use by the Context.
10+
Each implemented state represents a work mode for a application created
11+
with investing_algorithm_framework.
1212
"""
1313

1414
# Transition state for the next BotState
@@ -24,7 +24,7 @@ def __init__(self, context) -> None:
2424
def start(self):
2525

2626
# Will stop the state if pre-conditions are not met
27-
if not self.validate_state():
27+
if not self.validate_state(pre_state=True):
2828
return
2929

3030
while True:
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from investing_algorithm_framework.core.workers.worker import Worker
22
from investing_algorithm_framework.core.workers.scheduled_worker \
33
import ScheduledWorker
4+
from investing_algorithm_framework.core.workers.relational_worker import \
5+
RelationalWorker
46

5-
__all__ = ['Worker', 'ScheduledWorker']
7+
__all__ = ['Worker', 'ScheduledWorker', 'RelationalWorker']
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from abc import ABC
2+
from typing import Dict, Any
3+
4+
from investing_algorithm_framework.core.workers.worker import Worker
5+
from investing_algorithm_framework.core.exceptions import OperationalException
6+
7+
8+
class RelationalWorker(Worker, ABC):
9+
"""
10+
RelationalWorker will start after it's relational worker has run.
11+
12+
It will check if the related worked had run, and if this is
13+
true it will start itself. Use this worker if you want to create
14+
chains of workers that are depended on each other.
15+
"""
16+
run_after: Worker
17+
18+
def start(self, **kwargs: Dict[str, Any]) -> None:
19+
20+
# Only run if the last time this worker stared is before
21+
# the last time the 'run_after' worker had finished.
22+
if not self.run_after:
23+
raise OperationalException(
24+
'The run_after worker is not set, make sure you set this '
25+
'attribute to let the RelationalWorker run properly.'
26+
)
27+
28+
if self.last_run is not None:
29+
30+
if self.run_after.last_run is None:
31+
raise OperationalException(
32+
"Relational Worker has run before 'run_after' worker."
33+
)
34+
35+
if self.run_after.last_run > self.last_run:
36+
super(RelationalWorker, self).start()
37+
38+
elif self.run_after.last_run is not None:
39+
super(RelationalWorker, self).start()
Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,58 @@
11
from abc import ABC
2+
from datetime import datetime
3+
from typing import Dict, Any
24

35
from investing_algorithm_framework.core.utils import TimeUnit
46
from investing_algorithm_framework.core.workers.worker import Worker
57

68

79
class ScheduledWorker(Worker, ABC):
10+
time_unit: TimeUnit = None
11+
time_interval: int = None
12+
13+
def start(self, **kwargs: Dict[str, Any]) -> None:
14+
15+
# If the worker has never run, run it
16+
if self.last_run is None:
17+
super(ScheduledWorker, self).start()
18+
19+
else:
20+
# Get the current time
21+
elapsed_time = datetime.now() - self.last_run
22+
23+
# Second evaluation
24+
if self.get_time_unit() is TimeUnit.SECOND:
25+
seconds = elapsed_time.total_seconds()
26+
27+
if seconds > self.get_time_interval():
28+
super(ScheduledWorker, self).start()
29+
30+
# Minute evaluation
31+
elif self.get_time_unit() is TimeUnit.MINUTE:
32+
minutes = divmod(elapsed_time.total_seconds(), 60)
33+
34+
if minutes > self.get_time_interval():
35+
super(ScheduledWorker, self).start()
36+
37+
# Hour evaluation
38+
elif self.get_time_unit() is TimeUnit.HOUR:
39+
hours = divmod(elapsed_time.total_seconds(), 3600)
40+
41+
if hours > self.get_time_interval():
42+
super(ScheduledWorker, self).start()
843

944
def get_time_unit(self) -> TimeUnit:
1045
assert getattr(self, 'time_unit', None) is not None, (
1146
"{} should either include a time_unit attribute, or override the "
12-
"`get_time_unit()`, method.".format(self.__class__.__name__)
47+
"`get_time_unit()` method.".format(self.__class__.__name__)
1348
)
1449

1550
return getattr(self, 'time_unit')
1651

1752
def get_time_interval(self) -> int:
1853
assert getattr(self, 'time_interval', None) is not None, (
19-
"{} should either include a time_interval attribute, or "
20-
"override the `get_time_interval()`, "
21-
"method.".format(self.__class__.__name__)
54+
"{} should either include a time_interval attribute, or override "
55+
"the `get_time_interval()` method.".format(self.__class__.__name__)
2256
)
2357

2458
return getattr(self, 'time_interval')
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from investing_algorithm_framework.templates.data_providers.data_provider \
2-
import DataProvider
2+
import DataProvider, RelationalDataProvider, ScheduledDataProvider
33

4-
__all__ = ['DataProvider']
4+
__all__ = ['DataProvider', 'RelationalDataProvider', 'ScheduledDataProvider']
Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,60 @@
1-
import logging
21
from typing import Dict, Any
3-
from abc import abstractmethod
2+
from abc import abstractmethod, ABC
43

5-
from investing_algorithm_framework.core.workers import ScheduledWorker
4+
from investing_algorithm_framework.core.workers import ScheduledWorker, \
5+
Worker, RelationalWorker
66

7-
logger = logging.getLogger(__name__)
7+
8+
class DataProviderInterface:
9+
"""
10+
Class DataProviderInterface: interface for data provider implementation,
11+
this interface can be used to implement a data provider. A client then
12+
knows which method to call when presented with a 'data provider'
13+
"""
14+
@abstractmethod
15+
def provide_data(self) -> None:
16+
pass
817

918

10-
class DataProviderException(Exception):
19+
class DataProvider(DataProviderInterface, Worker, ABC):
1120
"""
12-
Should be raised when an data_provider related error occurs, for
13-
example if an authorization for an API fails,
14-
i.e.: raise DataProviderException('Provided api token is false')
21+
Class DataProvider: makes use of the abstract Worker class and inherits the
22+
interface of of the DataProviderInterface.
23+
24+
This is a Worker instance.
1525
"""
1626

17-
def __init__(self, message: str) -> None:
18-
super().__init__(self)
19-
self.message = message
27+
def work(self, **kwargs: Dict[str, Any]) -> None:
2028

21-
def __str__(self) -> str:
22-
return self.message
29+
# Call DataProviderInterface
30+
self.provide_data()
2331

2432

25-
class DataProvider(ScheduledWorker):
33+
class ScheduledDataProvider(DataProviderInterface, ScheduledWorker, ABC):
2634
"""
27-
Class DataProvider: An entity which responsibility is to provide data
28-
from an external source. Where a data_providers source is defined as
29-
any third party service that provides data e.g cloud storage,
30-
REST API, or website.
35+
Class ScheduledDataProvider: makes use of the abstract ScheduledWorker
36+
class and inherits the interface of of the DataProviderInterface.
3137
32-
A data_provider must always be run with the start function from it´s
33-
super class. Otherwise depended
34-
observers will not be updated.
38+
This is a ScheduledWorker instance, and therefore you must set the
39+
'time_unit' class attribute and the 'time_interval' class attribute.
3540
"""
3641

37-
@abstractmethod
38-
def provide_data(self) -> None:
39-
pass
42+
def work(self, **kwargs: Dict[str, Any]) -> None:
43+
44+
# Call DataProviderInterface
45+
self.provide_data()
46+
47+
48+
class RelationalDataProvider(RelationalWorker, DataProviderInterface, ABC):
49+
"""
50+
Class RelationalDataProvider: makes use of the abstract RelationalWorker
51+
class and inherits the interface of of the DataProviderInterface.
52+
53+
This is a RelationalWorker instance, and therefore you must link it to
54+
another worker instance, by setting the 'run_after' class attribute.
55+
"""
4056

4157
def work(self, **kwargs: Dict[str, Any]) -> None:
42-
logger.info("Starting data provider {}".format(self.get_id()))
58+
59+
# Call DataProviderInterface
4360
self.provide_data()

0 commit comments

Comments
 (0)