22from typing import List
33from wrapt import synchronized
44
5+ from investing_bot_framework .core .exceptions import ImproperlyConfigured , OperationalException
56from investing_bot_framework .core .events import Observer
67from investing_bot_framework .core .context .bot_context import BotContext
78from investing_bot_framework .core .context .states import BotState
9+ from investing_bot_framework .core .executors import ExecutionScheduler
810from investing_bot_framework .core .data .data_providers import DataProvider , DataProviderExecutor
9- from investing_bot_framework .core .resolvers import ClassCollector
10- from investing_bot_framework .core .exceptions import OperationalException
11+ from investing_bot_framework .core .utils import Singleton , TimeUnit
1112from investing_bot_framework .core .configuration .config_constants import DEFAULT_MAX_WORKERS , SETTINGS_MAX_WORKERS , \
1213 SETTINGS_DATA_PROVIDER_REGISTERED_APPS
1314
1415
16+ def import_class (cl ):
17+ d = cl .rfind ("." )
18+ classname = cl [d + 1 :len (cl )]
19+ m = __import__ (cl [0 :d ], globals (), locals (), [classname ])
20+ return getattr (m , classname )
21+
22+
23+ class DataProviderScheduler (ExecutionScheduler , metaclass = Singleton ):
24+ """
25+ Data Provider scheduler that will function as a scheduler to make sure it keeps it state across multiple states,
26+ it is defined as a Singleton.
27+ """
28+
29+ def __init__ (self ):
30+ self ._configured = False
31+
32+ super (DataProviderScheduler , self ).__init__ ()
33+
34+ def configure (self , data_providers : List [DataProvider ]) -> None :
35+ self ._planning = {}
36+
37+ for data_provider in data_providers :
38+ self .add_execution_task (execution_id = data_provider .get_id (), time_unit = TimeUnit .ALWAYS )
39+
40+ self ._configured = True
41+
42+ @property
43+ def configured (self ) -> bool :
44+ return self ._configured
45+
46+
1547class DataState (BotState , Observer ):
1648 """
1749 Represent the data state of a bot. This state will load all the defined data providers and will
@@ -24,7 +56,6 @@ class DataState(BotState, Observer):
2456 transition_state_class = StrategyState
2557
2658 data_providers : List [DataProvider ] = []
27-
2859 _data_provider_executor : DataProviderExecutor
2960
3061 def __init__ (self , context : BotContext ) -> None :
@@ -40,17 +71,17 @@ def _initialize(self) -> None:
4071 """
4172 self ._clean_up ()
4273
43- for data_provider_app_conf in self .context .settings [SETTINGS_DATA_PROVIDER_REGISTERED_APPS ]:
44- class_collector = ClassCollector (package_path = data_provider_app_conf , class_type = DataProvider )
45- self .data_providers += class_collector .instances
74+ for data_provider_app_class in self .context .settings [SETTINGS_DATA_PROVIDER_REGISTERED_APPS ]:
4675
47- self ._data_provider_executor = DataProviderExecutor (
48- data_providers = self .data_providers ,
49- max_workers = self .context .settings .get (SETTINGS_MAX_WORKERS , DEFAULT_MAX_WORKERS )
50- )
76+ instance = import_class (data_provider_app_class )()
77+
78+ if not isinstance (instance , DataProvider ):
79+ raise ImproperlyConfigured (
80+ "Specified data provider {} is not a instance of DataProvider" .format (data_provider_app_class )
81+ )
82+
83+ self .data_providers .append (instance )
5184
52- # Adding self as an observer
53- self ._data_provider_executor .add_observer (self )
5485 self ._configured = True
5586
5687 def _clean_up (self ) -> None :
@@ -61,25 +92,52 @@ def _clean_up(self) -> None:
6192 self ._data_provider_executor = None
6293 self .data_providers = []
6394
95+ def _schedule_data_providers (self ) -> List [DataProvider ]:
96+ data_provider_scheduler = DataProviderScheduler ()
97+
98+ if not data_provider_scheduler .configured :
99+ data_provider_scheduler .configure (self .data_providers )
100+
101+ planning = data_provider_scheduler .schedule_executions ()
102+ planned_data_providers = []
103+
104+ for data_provider in self .data_providers :
105+
106+ if data_provider .get_id () in planning :
107+ planned_data_providers .append (data_provider )
108+
109+ return planned_data_providers
110+
111+ def _start_data_providers (self , data_providers : List [DataProvider ]) -> None :
112+
113+ self ._data_provider_executor = DataProviderExecutor (
114+ data_providers = data_providers ,
115+ max_workers = self .context .settings .get (SETTINGS_MAX_WORKERS , DEFAULT_MAX_WORKERS )
116+ )
117+
118+ self ._data_provider_executor .add_observer (self )
119+ self ._data_provider_executor .start ()
120+
64121 def run (self ) -> None :
65122
66123 if self ._configured :
67- # Start the data providers
68- self ._data_provider_executor .start ()
124+ # Schedule the data providers
125+ planned_data_providers = self ._schedule_data_providers ()
126+
127+ # Execute all the data providers
128+ self ._start_data_providers (planned_data_providers )
69129
70130 # Sleep till updated
71131 while not self ._updated :
72132 time .sleep (1 )
73133
74134 # Collect all data from the data providers
75135 for data_provider in self ._data_provider_executor .registered_data_providers :
76- print (data_provider .get_id ())
136+ print ("Data provider: {} finished running" . format ( data_provider .get_id () ))
77137
78138 else :
79139 raise OperationalException ("Data state started without any configuration" )
80140
81- print ('stopped running' )
82-
83141 def stop (self ) -> None :
84142 """
85143 Stop all data providers
0 commit comments