@@ -24,21 +24,22 @@ def __init__(self, max_workers: int = DEFAULT_MAX_WORKERS):
2424
2525 self ._max_workers = max_workers
2626 self ._pending_workers : Queue = None
27- self ._running_workers : List [Worker ] = []
2827 self ._running_threads : Dict [Worker , StoppableThread ] = {}
2928
3029 def start (self ) -> None :
3130 """
3231 Main entry for the executor.
3332 """
33+
3434 self ._initialize ()
3535 self .run_jobs ()
3636
3737 def stop (self ) -> None :
3838 """
3939 Function that will stop all running workers.
4040 """
41- for worker in self ._running_workers :
41+
42+ for worker in self ._running_threads :
4243 self .stop_running_worker (worker )
4344
4445 self .clean_up ()
@@ -47,14 +48,15 @@ def clean_up(self):
4748 """
4849 Clean ups the resources.
4950 """
51+
5052 self ._pending_workers : Queue = None
51- self ._running_workers : List [Worker ] = []
5253 self ._running_threads : Dict [Worker , StoppableThread ] = {}
5354
5455 def _initialize (self ):
5556 """
5657 Functions that initializes the pending workers.
5758 """
59+
5860 workers = self .create_workers ()
5961
6062 if not workers or len (workers ) == 0 :
@@ -74,17 +76,17 @@ def create_workers(self) -> List[Worker]:
7476
7577 def run_jobs (self ) -> None :
7678 """
77- Will start all the workers.
79+ Function that will start all the workers.
7880 """
79- worker_iteration = self ._max_workers - len (self ._running_workers )
81+
82+ worker_iteration = self ._max_workers - len (self ._running_threads .keys ())
8083
8184 while worker_iteration > 0 and not self ._pending_workers .empty ():
8285 worker = self ._pending_workers .get ()
8386 worker_iteration -= 1
8487 thread = StoppableThread (target = worker .start )
8588 worker .add_observer (self )
8689 self ._running_threads [worker ] = thread
87- self ._running_workers .append (worker )
8890 thread .start ()
8991
9092 @synchronized
@@ -93,8 +95,8 @@ def update(self, observable, **kwargs) -> None:
9395 Observer implementation.
9496 """
9597
96- if observable in self ._running_workers :
97- self ._running_workers . remove ( observable )
98+ if observable in self ._running_threads :
99+ del self ._running_threads [ observable ]
98100
99101 if not self .processing :
100102 self .notify_observers ()
@@ -105,6 +107,7 @@ def stop_running_worker(self, worker: Worker) -> None:
105107 """
106108 Function that will stop a running worker.
107109 """
110+
108111 thread = self ._running_threads [worker ]
109112 thread .kill ()
110113
@@ -121,4 +124,4 @@ def processing(self) -> bool:
121124 """
122125
123126 return (self ._pending_workers is not None and not self ._pending_workers .empty ()) or \
124- (self ._running_workers is not None and len (self ._running_workers ) > 0 )
127+ (self ._running_threads is not None and len (self ._running_threads . keys () ) > 0 )
0 commit comments