1- from queue import Queue
2- from typing import List , Dict
3- from wrapt import synchronized
4- from abc import abstractmethod , ABC
1+ from typing import List
2+ from abc import ABC
3+ from concurrent .futures import ThreadPoolExecutor
54
65from investing_algorithm_framework .core .workers import Worker
7- from investing_algorithm_framework .core .exceptions import OperationalException
8- from investing_algorithm_framework .core .utils import StoppableThread
96from investing_algorithm_framework .core .events .observer import Observer
107from investing_algorithm_framework .core .events .observable import Observable
118from investing_algorithm_framework .configuration .config_constants import \
129 DEFAULT_MAX_WORKERS
1310
1411
15- class Executor (Observable , Observer , ABC ):
12+ class Executor (Observable , ABC ):
1613 """
17- Executor class: functions as an abstract class that will handle the
14+ Executor class: functions as a thread executor that will handle the
1815 executions of workers in asynchronous order.
16+
17+ It will make use of the concurrent library for execution of the workers.
18+ Also the executor functions as an observable instance.
1919 """
2020
21- def __init__ (self , max_workers : int = DEFAULT_MAX_WORKERS ):
21+ def __init__ (
22+ self ,
23+ workers : List [Worker ],
24+ max_concurrent_workers : int = DEFAULT_MAX_WORKERS
25+ ) -> None :
2226 super (Executor , self ).__init__ ()
2327
24- self ._max_workers = max_workers
25- self ._pending_workers : Queue = None
26- self ._running_threads : Dict [Worker , StoppableThread ] = {}
28+ self ._workers = workers
29+ self ._max_concurrent_workers = max_concurrent_workers
2730
2831 def start (self ) -> None :
2932 """
30- Main entry for the executor.
31- """
32-
33- self ._initialize ()
34- self .run_jobs ()
35-
36- def stop (self ) -> None :
37- """
38- Function that will stop all running workers.
39- """
40-
41- for worker in self ._running_threads :
42- self .stop_running_worker (worker )
43-
44- self .clean_up ()
45-
46- def clean_up (self ):
47- """
48- Clean ups the resources.
49- """
50-
51- self ._pending_workers : Queue = None
52- self ._running_threads : Dict [Worker , StoppableThread ] = {}
53-
54- def _initialize (self ):
55- """
56- Functions that initializes the pending workers.
57- """
58-
59- workers = self .create_workers ()
60-
61- if not workers or len (workers ) == 0 :
62- raise OperationalException ("There where no workers initialized "
63- "for the executor instance" )
64-
65- self ._pending_workers = Queue ()
66-
67- for worker in workers :
68- self ._pending_workers .put (worker )
69-
70- @abstractmethod
71- def create_workers (self ) -> List [Worker ]:
72- """
73- Abstract function that will create the workers.
74- """
75- pass
33+ Main entry for the executor. The executor creates a ThreadPoolExecutor
34+ with the given amount of max_workers.
7635
77- def run_jobs (self ) -> None :
36+ It will then pass all the workers to the ThreadPoolExecutor. When
37+ finished it will update all its observers
7838 """
79- Function that will start all the workers.
80- """
81-
82- worker_iteration = self ._max_workers - len (
83- self ._running_threads .keys ()
84- )
8539
86- while worker_iteration > 0 and not self ._pending_workers .empty ():
87- worker = self ._pending_workers .get ()
88- worker_iteration -= 1
89- thread = StoppableThread (target = worker .start )
90- worker .add_observer (self )
91- self ._running_threads [worker ] = thread
92- thread .start ()
40+ with ThreadPoolExecutor (max_workers = self ._max_concurrent_workers ) as \
41+ executor :
9342
94- @synchronized
95- def update (self , observable , ** kwargs ) -> None :
96- """
97- Observer implementation.
98- """
43+ for worker in self .workers :
44+ executor .submit (worker .start )
9945
100- if observable in self ._running_threads :
101- del self ._running_threads [observable ]
102-
103- if not self .processing :
104- self .notify_observers ()
105- else :
106- self .run_jobs ()
107-
108- def stop_running_worker (self , worker : Worker ) -> None :
109- """
110- Function that will stop a running worker.
111- """
112-
113- thread = self ._running_threads [worker ]
114- thread .kill ()
46+ self .notify_observers ()
11547
11648 def add_observer (self , observer : Observer ) -> None :
11749 super (Executor , self ).add_observer (observer )
@@ -120,12 +52,5 @@ def remove_observer(self, observer: Observer) -> None:
12052 super (Executor , self ).remove_observer (observer )
12153
12254 @property
123- def processing (self ) -> bool :
124- """
125- Property that will show if the executor is running.
126- """
127-
128- return (self ._pending_workers is not None
129- and not self ._pending_workers .empty ()) or \
130- (self ._running_threads is not None
131- and len (self ._running_threads .keys ()) > 0 )
55+ def workers (self ) -> List [Worker ]:
56+ return self ._workers
0 commit comments