1+ from threading import active_count
2+ from typing import Dict , Any , List
3+ from unittest import TestCase
4+ from time import sleep
5+ from wrapt import synchronized
6+
7+ from investing_bot_framework .core .workers import Worker
8+ from investing_bot_framework .core .executors import Executor
9+ from investing_bot_framework .core .events .observer import Observer
10+
11+
12+ class TestObserver (Observer ):
13+
14+ def __init__ (self ) -> None :
15+ self .update_count = 0
16+
17+ @synchronized
18+ def update (self , observable , ** kwargs ) -> None :
19+ self .update_count += 1
20+
21+
22+ class TestWorkerOne (Worker ):
23+ id = 'TestWorkerOne'
24+
25+ def work (self , ** kwargs : Dict [str , Any ]) -> None :
26+ # Simulate some work
27+ sleep (1 )
28+
29+
30+ class TestWorkerTwo (Worker ):
31+ id = 'TestWorkerTwo'
32+
33+ def work (self , ** kwargs : Dict [str , Any ]) -> None :
34+ # Simulate some work
35+ sleep (1 )
36+
37+
38+ class TestWorkerThree (Worker ):
39+ id = 'TestWorkerThree'
40+
41+ def work (self , ** kwargs : Dict [str , Any ]) -> None :
42+ # Simulate some work
43+ sleep (1 )
44+
45+
46+ class TestExecutor (Executor ):
47+
48+ def __init__ (self , workers : List [Worker ] = None ):
49+ super (TestExecutor , self ).__init__ (max_workers = 2 )
50+
51+ self ._registered_workers = workers
52+
53+ def create_workers (self ) -> List [Worker ]:
54+ return self .registered_workers
55+
56+ @property
57+ def registered_workers (self ) -> List [Worker ]:
58+ return self ._registered_workers
59+
60+
61+ class TestStandardExecutor (TestCase ):
62+
63+ def test (self ) -> None :
64+ executor = TestExecutor (workers = [TestWorkerOne (), TestWorkerTwo ()])
65+ observer = TestObserver ()
66+ executor .add_observer (observer )
67+
68+ # Make sure the initialization is correct
69+ self .assertEqual (len (executor .registered_workers ), 2 )
70+ self .assertEqual (active_count (), 1 )
71+
72+ # Start the executor
73+ executor .start ()
74+
75+ # 3 Threads must be running
76+ self .assertTrue (executor .processing )
77+ self .assertEqual (active_count (), 3 )
78+
79+ sleep (2 )
80+
81+ # After finishing only 1 thread must be active
82+ self .assertEqual (active_count (), 1 )
83+ self .assertFalse (executor .processing )
84+
85+ # Observer must have been updated by the executor
86+ self .assertEqual (observer .update_count , 1 )
87+
88+ # Start the executor
89+ executor .start ()
90+
91+ # 3 Threads must be running
92+ self .assertTrue (executor .processing )
93+ self .assertEqual (active_count (), 3 )
94+
95+ sleep (2 )
96+
97+ # After finishing only 1 thread must be active
98+ self .assertEqual (active_count (), 1 )
99+ self .assertFalse (executor .processing )
100+
101+ # Observer must have been updated by the executor
102+ self .assertEqual (observer .update_count , 2 )
103+
104+ executor = TestExecutor (workers = [TestWorkerOne (), TestWorkerTwo (), TestWorkerThree ()])
105+ executor .add_observer (observer )
106+
107+ # Start the executor
108+ executor .start ()
109+
110+ # 3 Threads must be running
111+ self .assertTrue (executor .processing )
112+ self .assertEqual (active_count (), 3 )
113+
114+ sleep (2 )
115+
116+ # After finishing only two threads must be active (main + last worker, because max workers is 2)
117+ self .assertEqual (active_count (), 2 )
118+ self .assertTrue (executor .processing )
119+
120+ sleep (1 )
121+
122+ # After finishing only 1 thread must be active
123+ self .assertEqual (active_count (), 1 )
124+ self .assertFalse (executor .processing )
125+
126+ # Observer must have been updated by the executor
127+ self .assertEqual (observer .update_count , 3 )
128+
129+
130+
131+
132+
133+ # def test_execution_executor():
134+ # logger.info("TEST: test DataProviderExecutor execution")
135+ #
136+ # observer = DummyObserver()
137+ #
138+ # data_provider_one = DummyDataProviderWorker()
139+ # data_provider_three = DummyDataProviderWorker()
140+ #
141+ # executor = DataProviderExecutor(
142+ # [
143+ # data_provider_one,
144+ # data_provider_three
145+ # ]
146+ # )
147+ #
148+ # executor.add_observer(observer)
149+ #
150+ # assert active_count() == 1
151+ #
152+ #
153+ # assert active_count() == 3
154+ #
155+ # sleep(2)
156+ #
157+ # # Check if the observer is updated by the executor
158+ # assert observer.update_count == 1
159+ #
160+ # data_provider_one = DummyDataProviderWorker()
161+ #
162+ # executor = DataProviderExecutor(
163+ # [
164+ # data_provider_one,
165+ # ]
166+ # )
167+ #
168+ # executor.add_observer(observer)
169+ #
170+ # assert active_count() == 1
171+ #
172+ # executor.start()
173+ #
174+ # assert active_count() == 2
175+ #
176+ # sleep(2)
177+ #
178+ # # Check if the observer is updated by the executor
179+ # assert observer.update_count == 2
180+ #
181+ # executor.start()
182+ #
183+ # sleep(2)
184+ #
185+ # # Check if the observer is updated by the executor
186+ # assert observer.update_count == 3
187+ #
188+ # logger.info("TEST FINISHED")
0 commit comments