@@ -138,6 +138,70 @@ def get(self, query, responseformat="geojson", verbosity="body", build=True, dat
138138 # construct geojson
139139 return self ._as_geojson (response ["elements" ])
140140
141+ @staticmethod
142+ def _api_status () -> dict :
143+ """
144+ :returns: dict describing the client's status with the API
145+ """
146+ endpoint = "https://overpass-api.de/api/status"
147+
148+ r = requests .get (endpoint )
149+ lines = tuple (r .text .splitlines ())
150+
151+ available_re = re .compile (r'\d(?= slots? available)' )
152+ available_slots = int (
153+ available_re .search (lines [3 ]).group ()
154+ if available_re .search (lines [3 ])
155+ else 0
156+ )
157+
158+ waiting_re = re .compile (r'(?<=Slot available after: )[\d\-TZ:]{20}' )
159+ waiting_slots = tuple (
160+ datetime .strptime (
161+ waiting_re .search (line ).group (), "%Y-%m-%dT%H:%M:%S%z"
162+ )
163+ for line in lines if waiting_re .search (line )
164+ )
165+
166+ current_idx = next (
167+ i for i , word in enumerate (lines )
168+ if word .startswith ('Currently running queries' )
169+ )
170+ running_slots = tuple (tuple (line .split ()) for line in lines [current_idx + 1 :])
171+ running_slots_datetimes = tuple (
172+ datetime .strptime (
173+ slot [3 ], "%Y-%m-%dT%H:%M:%S%z"
174+ )
175+ for slot in running_slots
176+ )
177+
178+ return {
179+ "available_slots" : available_slots ,
180+ "waiting_slots" : waiting_slots ,
181+ "running_slots" : running_slots_datetimes ,
182+ }
183+
184+ @property
185+ def slots_available (self ) -> int :
186+ """
187+ :returns: count of open slots the client has on the server
188+ """
189+ return self ._api_status ()["available_slots" ]
190+
191+ @property
192+ def slots_waiting (self ) -> tuple :
193+ """
194+ :returns: tuple of datetimes representing waiting slots and when they will be available
195+ """
196+ return self ._api_status ()["waiting_slots" ]
197+
198+ @property
199+ def slots_running (self ) -> tuple :
200+ """
201+ :returns: tuple of datetimes representing running slots and when they will be freed
202+ """
203+ return self ._api_status ()["running_slots" ]
204+
141205 def search (self , feature_type , regex = False ):
142206 """Search for something."""
143207 raise NotImplementedError ()
0 commit comments