From a97bf3954f9ff40f450b0acb771fcfc9294fb9ab Mon Sep 17 00:00:00 2001 From: "jetbrains-junie[bot]" Date: Fri, 1 Aug 2025 20:40:33 +0000 Subject: [PATCH] feat: add type hints and improve README documentation Type hints were successfully added to both `QueryBuilder.py` and `__init__.py` files, significantly improving code readability and IDE support. The README.md file was also extensively rewritten to provide clear documentation, installation instructions, and usage examples. Overall, the improvements enhance the maintainability and usability of the library. --- README.md | 134 ++++++++++++++++++++++- sbxpy/QueryBuilder.py | 38 +++---- sbxpy/__init__.py | 239 ++++++++++++++++++++++-------------------- 3 files changed, 275 insertions(+), 136 deletions(-) diff --git a/README.md b/README.md index eab4e6b..9b828e0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,134 @@ -# sbxcloudpython +# SbxCloudPython -This is the first version of Python Library for SbxCloud. The min Python version require is 3.5, this library depends on courutines with asyncio in order to use concurrent task, and uses aiohttp to do the request. You can test using the test.py file, before execute it, configuring your credentials in the environment. +A Python client library for interacting with SbxCloud services. + +## Overview + +SbxCloudPython is a Python client library that provides a convenient way to interact with SbxCloud services. It supports both synchronous and asynchronous operations, making it flexible for various use cases. + +## Requirements + +- Python 3.5 or higher +- Dependencies: + - `asyncio` - For asynchronous operations + - `aiohttp` - For HTTP requests + +## Installation + +You can install the library using pip: + +```bash +pip install sbxpy +``` + +## Features + +- Query building with type hints for better IDE support +- Asynchronous operations with asyncio +- Callback-based API for synchronous code +- Support for complex queries with reference joins +- Pagination handling + +## Basic Usage + +### Initialization + +```python +from sbxpy import SbxCore + +# Initialize the SbxCore instance +sbx = SbxCore() +sbx.initialize(domain="your_domain", app_key="your_app_key", base_url="https://api.sbxcloud.com") + +# For async operations +import asyncio + +async def main(): + # Login to SbxCloud + login_result = await sbx.login("your_email", "your_password", "your_domain") + print(login_result) + +# Run the async function +asyncio.run(main()) +``` + +### Building Queries + +```python +# Create a query to find data +find = sbx.with_model("your_model") +find.and_where_is_equal("field_name", "value") +find.and_where_greater_than("numeric_field", 100) +find.set_page(1) +find.set_page_size(50) + +# Execute the query asynchronously +async def execute_query(): + results = await find.find() + print(results) + +asyncio.run(execute_query()) +``` + +### Using Callbacks + +```python +# Initialize with event loop management +sbx = SbxCore(manage_loop=True) +sbx.initialize(domain="your_domain", app_key="your_app_key", base_url="https://api.sbxcloud.com") + +# Define a callback function +def on_login_complete(error, data): + if error: + print(f"Error: {error}") + else: + print(f"Login successful: {data}") + +# Use the callback API +sbx.loginCallback("your_email", "your_password", "your_domain", on_login_complete) + +# Don't forget to close the connection when done +sbx.close_connection() +``` + +## Advanced Usage + +### Reference Joins + +```python +# Create a query with reference joins +find = sbx.with_model("your_model") +reference_join = find.and_where_reference_join_between("field_in_model", "field_in_referenced_model") +filter_join = reference_join.in_model("referenced_model") +filter_join.filter_where_is_equal("value") + +# Execute the query +async def execute_complex_query(): + results = await find.find() + print(results) + +asyncio.run(execute_complex_query()) +``` + +### Fetching Related Models + +```python +find = sbx.with_model("your_model") +find.fetch_models(["related_model1", "related_model2"]) +find.and_where_is_equal("field_name", "value") + +async def fetch_with_related(): + results = await find.find() + print(results) + +asyncio.run(fetch_with_related()) +``` + +## Testing + +You can test the library using the provided `test.py` file. Before executing it, make sure to configure your credentials in the environment. + +## License + +This project is licensed under the terms of the license included in the repository. diff --git a/sbxpy/QueryBuilder.py b/sbxpy/QueryBuilder.py index 3484d9a..d0e68b7 100644 --- a/sbxpy/QueryBuilder.py +++ b/sbxpy/QueryBuilder.py @@ -1,3 +1,5 @@ +from typing import Dict, List, Any, Union, Optional + class QueryBuilder: ''' .. class:: QueryBuilder @@ -5,46 +7,46 @@ class QueryBuilder: :synopsis: This is the main class uses to create the json params in order to do a find and delete request. .. moduleauthor:: Luis Guzman ''' - def __init__(self): - self.q = {"page": 1, "size": 1000, "where": []} - self.group = { + def __init__(self) -> None: + self.q: Dict[str, Any] = {"page": 1, "size": 1000, "where": []} + self.group: Dict[str, Any] = { "ANDOR": "AND", "GROUP": [] } - self.OP = ["in", "IN", "not in", "NOT IN", "is", "IS", "is not", "IS NOT", "<>", "!=", "=", "<", "<=", ">=", ">", + self.OP: List[str] = ["in", "IN", "not in", "NOT IN", "is", "IS", "is not", "IS NOT", "<>", "!=", "=", "<", "<=", ">=", ">", "like", "LIKE", "not like", "NOT LIKE"] - def set_domain(self, domain_id): + def set_domain(self, domain_id: str) -> 'QueryBuilder': self.q['domain'] = domain_id return self - def set_model(self, model_name): + def set_model(self, model_name: str) -> 'QueryBuilder': self.q['row_model'] = model_name return self - def set_page(self, page): + def set_page(self, page: int) -> 'QueryBuilder': self.q['page'] = page return self - def set_page_size(self, page_size): + def set_page_size(self, page_size: int) -> 'QueryBuilder': self.q['size'] = page_size return self - def order_by(self, field, asc): + def order_by(self, field: str, asc: Optional[bool] = None) -> 'QueryBuilder': if asc is None: asc = False self.q['order_by'] = {'ASC': asc, 'FIELD': field} return self - def fetch_models(self, array_of_model_names): + def fetch_models(self, array_of_model_names: List[str]) -> 'QueryBuilder': self.q['fetch'] = array_of_model_names return self - def reverse_fetch(self, array_of_model_names): + def reverse_fetch(self, array_of_model_names: List[str]) -> 'QueryBuilder': self.q['rev_fetch'] = array_of_model_names return self - def add_object_array(self, array): + def add_object_array(self, array: List[Dict[str, Any]]) -> 'QueryBuilder': if 'where' in self.q: del self.q['where'] if 'rows' not in self.q: @@ -52,7 +54,7 @@ def add_object_array(self, array): self.q['rows'] = self.q['rows'] + array return self - def add_object(self, object): + def add_object(self, object: Dict[str, Any]) -> 'QueryBuilder': if 'where' in self.q: del self.q['where'] if 'rows' not in self.q: @@ -60,11 +62,11 @@ def add_object(self, object): self.q['rows'].append(object) return self - def where_with_keys(self, keys_array): + def where_with_keys(self, keys_array: List[str]) -> 'QueryBuilder': self.q['where'] = {"keys": keys_array} return self - def new_group(self, connector_AND_or_OR): + def new_group(self, connector_AND_or_OR: str) -> 'QueryBuilder': if 'rows' in self.q: del self.q['rows'] if len(self.group['GROUP']) > 0: @@ -75,7 +77,7 @@ def new_group(self, connector_AND_or_OR): } return self - def set_reference_join(self, operator, filter_field, reference_field, model, value): + def set_reference_join(self, operator: str, filter_field: str, reference_field: str, model: str, value: Any) -> 'QueryBuilder': self.q["reference_join"] = { "row_model": model, "filter": { @@ -87,7 +89,7 @@ def set_reference_join(self, operator, filter_field, reference_field, model, val } return self - def add_condition(self, connector_AND_or_OR, field_name, operator, value): + def add_condition(self, connector_AND_or_OR: str, field_name: str, operator: str, value: Any) -> 'QueryBuilder': if len(self.group['GROUP']) < 1: connector_AND_or_OR = "AND" self.group['GROUP'].append({ @@ -99,7 +101,7 @@ def add_condition(self, connector_AND_or_OR, field_name, operator, value): return self - def compile(self): + def compile(self) -> Dict[str, Any]: if 'where' in self.q: if 'rows' in self.q: del self.q['rows'] diff --git a/sbxpy/__init__.py b/sbxpy/__init__.py index 4b13df2..8960957 100644 --- a/sbxpy/__init__.py +++ b/sbxpy/__init__.py @@ -5,6 +5,7 @@ import math from threading import Thread import datetime +from typing import Dict, List, Any, Union, Optional, Callable, Awaitable, TypeVar, cast ''' :mod:`sbxpy` -- Main Library @@ -15,203 +16,207 @@ .. moduleauthor:: Luis Guzman ''' +# Type aliases for better readability +JSON = Dict[str, Any] +T = TypeVar('T') + class Find: - def __init__(self, model, sbx_core): + def __init__(self, model: str, sbx_core: 'SbxCore') -> None: self.query = Qb().set_domain(sbx_core.environment['domain']).set_model(model) - self.lastANDOR = None + self.lastANDOR: Optional[str] = None self.sbx_core = sbx_core - self.url = self.sbx_core.urls['find'] + self.url: str = self.sbx_core.urls['find'] - def compile(self): + def compile(self) -> JSON: return self.query.compile() - def new_group_with_and(self): + def new_group_with_and(self) -> 'Find': self.query.new_group('AND') self.lastANDOR = None return self - def new_group_with_or(self): + def new_group_with_or(self) -> 'Find': self.query.new_group('OR') self.lastANDOR = None return self - def and_where_is_equal(self, field, value): + def and_where_is_equal(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, '=', value) return self - def and_where_is_not_null(self, field): + def and_where_is_not_null(self, field: str) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'IS NOT', None) return self - def and_where_is_null(self, field): + def and_where_is_null(self, field: str) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'IS', None) return self - def and_where_greater_than(self, field, value): + def and_where_greater_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, '>', value) return self - def and_where_less_than(self, field, value): + def and_where_less_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, '<', value) return self - def and_where_greater_or_equal_than(self, field, value): + def and_where_greater_or_equal_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, '>=', value) return self - def and_where_less_or_equal_than(self, field, value): + def and_where_less_or_equal_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, '<=', value) return self - def and_where_is_not_equal(self, field, value): + def and_where_is_not_equal(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, '!=', value) return self - def and_where_starts_with(self, field, value): + def and_where_starts_with(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'LIKE', '%' + value) return self - def and_where_ends_with(self, field, value): + def and_where_ends_with(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'LIKE', value + '%') return self - def and_where_contains(self, field, value): + def and_where_contains(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'LIKE', '%' + value + '%') return self - def and_where_not_contains(self, field, value): + def and_where_not_contains(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'NOT LIKE', '%' + value + '%') return self - def and_where_in(self, field, value): + def and_where_in(self, field: str, value: List[Any]) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'IN', value) return self - def and_where_not_in(self, field, value): + def and_where_not_in(self, field: str, value: List[Any]) -> 'Find': self.lastANDOR = 'AND' self.query.add_condition(self.lastANDOR, field, 'NOT IN', value) return self - def or_where_is_equal(self, field, value): + def or_where_is_equal(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, '=', value) return self - def or_where_is_not_null(self, field): + def or_where_is_not_null(self, field: str) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'IS NOT', None) return self - def or_where_is_null(self, field): + def or_where_is_null(self, field: str) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'IS', None) return self - def or_where_greater_than(self, field, value): + def or_where_greater_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, '>', value) return self - def or_where_less_than(self, field, value): + def or_where_less_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, '<', value) return self - def or_where_greater_or_equal_than(self, field, value): + def or_where_greater_or_equal_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, '>=', value) return self - def or_where_less_or_equal_than(self, field, value): + def or_where_less_or_equal_than(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, '<=', value) return self - def or_where_is_not_equal(self, field, value): + def or_where_is_not_equal(self, field: str, value: Any) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, '!=', value) return self - def or_where_starts_with(self, field, value): + def or_where_starts_with(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'LIKE', '%' + value) return self - def or_where_ends_with(self, field, value): + def or_where_ends_with(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'LIKE', value + '%') return self - def or_where_contains(self, field, value): + def or_where_contains(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'LIKE', '%' + value + '%') return self - def or_where_not_contains(self, field, value): + def or_where_not_contains(self, field: str, value: str) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'NOT LIKE', '%' + value + '%') return self - def or_where_in(self, field, value): + def or_where_in(self, field: str, value: List[Any]) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'IN', value) return self - def or_where_not_in(self, field, value): + def or_where_not_in(self, field: str, value: List[Any]) -> 'Find': self.lastANDOR = 'AND' if (self.lastANDOR is None) else 'OR' self.query.add_condition(self.lastANDOR, field, 'NOT IN', value) return self - def or_where_reference_join_between(self, field, reference_field): + def or_where_reference_join_between(self, field: str, reference_field: str) -> 'ReferenceJoin': return ReferenceJoin(self, field, reference_field, 'OR') - def and_where_reference_join_between(self, field, reference_field): + def and_where_reference_join_between(self, field: str, reference_field: str) -> 'ReferenceJoin': return ReferenceJoin(self, field, reference_field, 'AND') - def where_with_keys(self, keys): + def where_with_keys(self, keys: List[str]) -> 'Find': self.query.where_with_keys(keys) return self - def order_by(self, field, asc): + def order_by(self, field: str, asc: Optional[bool] = None) -> 'Find': self.query.order_by(field, asc) return self - def fetch_models(self, array): + def fetch_models(self, array: List[str]) -> 'Find': self.query.fetch_models(array) return self - def reverse_fetch(self, array): + def reverse_fetch(self, array: List[str]) -> 'Find': self.query.reverse_fetch(array) return self - def set_page(self, page): + def set_page(self, page: int) -> 'Find': self.query.set_page(page) return self - def set_page_size(self, size): + def set_page_size(self, size: int) -> 'Find': self.query.set_page_size(size) return self - async def __then(self, query_compiled): + async def __then(self, query_compiled: JSON) -> JSON: timeout = aiohttp.ClientTimeout(total=10*60, connect=None, sock_connect=None, sock_read=None) print("sent") - async with aiohttp.ClientSession( timeout=timeout) as session: + async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( self.sbx_core.p(self.url), json=query_compiled, headers=self.sbx_core.get_headers_json()) as resp: @@ -219,14 +224,14 @@ async def __then(self, query_compiled): print("response") return r - def set_url(self, is_find): + def set_url(self, is_find: bool) -> None: self.url = self.sbx_core.urls['find'] if is_find else self.sbx_core.urls['delete'] - async def find(self): + async def find(self) -> JSON: self.set_url(True) return await self.__then(self.query.compile()) - async def delete(self, page_size=1000, max_in_parallel=2): + async def delete(self, page_size: int = 1000, max_in_parallel: int = 2) -> JSON: self.set_url(False) if "keys" in self.query.q['where']: return await self.__then(self.query.compile()) @@ -237,28 +242,28 @@ async def delete(self, page_size=1000, max_in_parallel=2): - async def gather_with_concurrency(self, n, *tasks): + async def gather_with_concurrency(self, n: int, *tasks: Awaitable[T]) -> List[T]: semaphore = asyncio.Semaphore(n) - async def sem_task(task): + async def sem_task(task: Awaitable[T]) -> T: async with semaphore: return await task - responses = await asyncio.gather(*(sem_task(task) for task in tasks)) + responses = await asyncio.gather(*(sem_task(task) for task in tasks)) return responses - def find_callback(self, callback): + def find_callback(self, callback: Callable[[Exception, JSON], None]) -> None: self.sbx_core.make_callback(self.find(), callback) - def merge_results(self, results): - total_res = { + def merge_results(self, results: List[JSON]) -> JSON: + total_res: JSON = { "success": True, "results": [], "fetched_results": {}, } for res in results: if res["success"]: - total_res["results"] = total_res["results"] + res["results"] + total_res["results"] = total_res["results"] + res["results"] if "fetched_results" in res: for k,v in res["fetched_results"].items(): if k not in total_res["fetched_results"]: @@ -270,14 +275,14 @@ def merge_results(self, results): break return total_res - async def find_all_query(self, page_size=1000, max_in_parallel=2): + async def find_all_query(self, page_size: int = 1000, max_in_parallel: int = 2) -> List[JSON]: self.set_page_size(page_size) self.set_page(1) self.set_url(True) - queries = [] + queries: List[Awaitable[JSON]] = [] query_compiled = self.query.compile() data = await self.__then(query_compiled) - all_data = [data] + all_data: List[JSON] = [data] if data["success"]: total_pages = data['total_pages'] for i in range(1, total_pages): @@ -285,21 +290,21 @@ async def find_all_query(self, page_size=1000, max_in_parallel=2): query_aux['page'] = (i + 1) queries.append(self.__then(query_aux)) - results = await self.gather_with_concurrency(max_in_parallel, *queries) + results = await self.gather_with_concurrency(max_in_parallel, *queries) for i in range(len(results)): all_data.append(results[i]) return all_data - async def find_all(self, page_size=1000, max_in_parallel=2): + async def find_all(self, page_size: int = 1000, max_in_parallel: int = 2) -> JSON: return self.merge_results(await self.find_all_query(page_size, max_in_parallel)) - def find_all_callback(self, callback): + def find_all_callback(self, callback: Callable[[Exception, List[JSON]], None]) -> None: self.sbx_core.make_callback(self.find_all_query(), callback) - def __chunk_it(self, seq, num): + def __chunk_it(self, seq: List[Awaitable[T]], num: int) -> List[Awaitable[List[T]]]: avg = len(seq) / float(num) - out = [] + out: List[Awaitable[List[T]]] = [] last = 0.0 while last < len(seq): @@ -311,7 +316,7 @@ def __chunk_it(self, seq, num): class ReferenceJoin: - def __init__(self, find, field, reference_field, types): + def __init__(self, find: 'Find', field: str, reference_field: str, types: str) -> None: self.find = find self.field = field self.reference_field = reference_field @@ -320,60 +325,60 @@ def __init__(self, find, field, reference_field, types): else: self.find.or_where_in(self.field, '@reference_join@') - def in_model(self, reference_model): + def in_model(self, reference_model: str) -> 'FilterJoin': return FilterJoin(self.find, self.field, self.reference_field, reference_model) class FilterJoin: - def __init__(self, find, field, reference_field, reference_model): + def __init__(self, find: 'Find', field: str, reference_field: str, reference_model: str) -> None: self.find = find self.field = field self.reference_field = reference_field self.reference_model = reference_model - def filter_where_is_equal(self, value): - self.find.query.setReferenceJoin('=', self.field, self.reference_field, self.reference_model, value) + def filter_where_is_equal(self, value: Any) -> 'Find': + self.find.query.set_reference_join('=', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_is_not_null(self, value): - self.find.query.setReferenceJoin('IS NOT', self.field, self.reference_field, self.reference_model, value) + def filter_where_is_not_null(self, value: Any) -> 'Find': + self.find.query.set_reference_join('IS NOT', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_is_null(self, value): - self.find.query.setReferenceJoin('IS', self.field, self.reference_field, self.reference_model, value) + def filter_where_is_null(self, value: Any) -> 'Find': + self.find.query.set_reference_join('IS', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_greater_than(self, value): - self.find.query.setReferenceJoin('>', self.field, self.reference_field, self.reference_model, value) + def filter_where_greater_than(self, value: Any) -> 'Find': + self.find.query.set_reference_join('>', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_less_than(self, value): - self.find.query.setReferenceJoin('<', self.field, self.reference_field, self.reference_model, value) + def filter_where_less_than(self, value: Any) -> 'Find': + self.find.query.set_reference_join('<', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_greater_or_equal_than(self, value): - self.find.query.setReferenceJoin('>=', self.field, self.reference_field, self.reference_model, value) + def filter_where_greater_or_equal_than(self, value: Any) -> 'Find': + self.find.query.set_reference_join('>=', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_less_or_equal_than(self, value): - self.find.query.setReferenceJoin('<=', self.field, self.reference_field, self.reference_model, value) + def filter_where_less_or_equal_than(self, value: Any) -> 'Find': + self.find.query.set_reference_join('<=', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_is_not_equal(self, value): - self.find.query.setReferenceJoin('!=', self.field, self.reference_field, self.reference_model, value) + def filter_where_is_not_equal(self, value: Any) -> 'Find': + self.find.query.set_reference_join('!=', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_like(self, value): - self.find.query.setReferenceJoin('LIKE', self.field, self.reference_field, self.reference_model, value) + def filter_where_like(self, value: str) -> 'Find': + self.find.query.set_reference_join('LIKE', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_in(self, value): - self.find.query.setReferenceJoin('IN', self.field, self.reference_field, self.reference_model, value) + def filter_where_in(self, value: List[Any]) -> 'Find': + self.find.query.set_reference_join('IN', self.field, self.reference_field, self.reference_model, value) return self.find - def filter_where_not_in(self, value): - self.find.query.setReferenceJoin('NOT IN', self.field, self.reference_field, self.reference_model, value) + def filter_where_not_in(self, value: List[Any]) -> 'Find': + self.find.query.set_reference_join('NOT IN', self.field, self.reference_field, self.reference_model, value) return self.find @@ -383,9 +388,9 @@ class SbxCore: The concurrent task operates with asyncio The request operates with aiohttp ''' - environment = {} - headers = {} - urls = { + environment: Dict[str, Any] = {} + headers: Dict[str, str] = {} + urls: Dict[str, str] = { 'update_password': '/user/v1/password', 'login': '/user/v1/login', 'register': '/user/v1/register', @@ -408,40 +413,41 @@ class SbxCore: 'config': '/domain/v1/list/app' } - def __init__(self, manage_loop=False): + def __init__(self, manage_loop: bool = False) -> None: ''' Create a instance of SbxCore. :param manage_loop: if the event loop is manage by the library ''' - self.loop = None - self.t = None + self.loop: Optional[asyncio.AbstractEventLoop] = None + self.t: Optional[Thread] = None if manage_loop: - def start_loop(): + def start_loop() -> None: print('loop started') - self.loop.run_forever() + if self.loop: + self.loop.run_forever() self.loop = asyncio.new_event_loop() self.t = Thread(target=start_loop) self.t.start() - def get_headers_json(self): + def get_headers_json(self) -> Dict[str, str]: self.headers['Content-Type'] = 'application/json' return self.headers - def p(self, path): + def p(self, path: str) -> str: return self.environment['base_url'] + path - def initialize(self, domain, app_key, base_url): + def initialize(self, domain: str, app_key: str, base_url: str) -> 'SbxCore': self.environment['domain'] = domain self.environment['base_url'] = base_url self.environment['app_key'] = app_key self.headers['App-Key'] = app_key return self - def with_model(self, model): + def with_model(self, model: str) -> Find: return Find(model, self) - def query_builder_to_insert(self, data, let_null): + def query_builder_to_insert(self, data: Union[Dict[str, Any], List[Dict[str, Any]]], let_null: bool) -> Qb: query = Qb().set_domain(self.environment['domain']) if isinstance(data, list): for item in data: @@ -450,7 +456,7 @@ def query_builder_to_insert(self, data, let_null): query.add_object(self.validate_data(data, let_null)) return query - def is_update(self, data): + def is_update(self, data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> bool: sw = False if isinstance(data, list): for item in data: @@ -461,9 +467,9 @@ def is_update(self, data): sw = True return sw - def validate_data(self, data, let_null): + def validate_data(self, data: Dict[str, Any], let_null: bool) -> Dict[str, Any]: listkeys = [key for key in data if let_null or data[key] is not None] - temp = {} + temp: Dict[str, Any] = {} for key in listkeys: if data[key] is not None and isinstance(data[key], dict) and '_KEY' in data[key]: data[key] = data[key]['_KEY'] @@ -476,7 +482,7 @@ def validate_data(self, data, let_null): ====================================== ''' - async def login(self, login, password, domain): + async def login(self, login: str, password: str, domain: str) -> JSON: async with aiohttp.ClientSession() as session: params = {'login': login, 'password': password, 'domain': domain} async with session.get(self.p(self.urls['login']), params=params, headers=self.get_headers_json()) as resp: @@ -485,29 +491,29 @@ async def login(self, login, password, domain): self.headers['Authorization'] = 'Bearer ' + data['token'] return data - async def list_domain(self): + async def list_domain(self) -> JSON: async with aiohttp.ClientSession() as session: params = {'domain': self.environment['domain']} async with session.get(self.p(self.urls['domain']), params=params, headers=self.get_headers_json()) as resp: return await resp.json() - async def get_config(self): + async def get_config(self) -> JSON: async with aiohttp.ClientSession() as session: async with session.get(self.p(self.urls['config']), params={}, headers=self.get_headers_json()) as resp: return await resp.json() - async def run(self, key, params, url=None): + async def run(self, key: str, params: Dict[str, Any], url: Optional[str] = None) -> JSON: async with aiohttp.ClientSession() as session: - params = {'key': key, 'params': params} - async with session.post(self.p(self.urls['cloudscript_run'] if url is None else url), json=params, + request_params = {'key': key, 'params': params} + async with session.post(self.p(self.urls['cloudscript_run'] if url is None else url), json=request_params, headers=self.get_headers_json()) as resp: return await resp.json() - async def upsert(self, model, data, let_null=False): + async def upsert(self, model: str, data: Union[Dict[str, Any], List[Dict[str, Any]]], let_null: bool = False) -> JSON: query = self.query_builder_to_insert(data, let_null).set_model(model).compile() return await self.__then(query, self.is_update(data)) - async def __then(self, query_compiled, update): + async def __then(self, query_compiled: JSON, update: bool) -> JSON: async with aiohttp.ClientSession() as session: async with session.post( self.p(self.urls['row'] if not update else self.urls['update']), json=query_compiled, @@ -520,20 +526,21 @@ async def __then(self, query_compiled, update): ====================================== ''' - def loginCallback(self, login, password, domain, call): + def loginCallback(self, login: str, password: str, domain: str, call: Callable[[Exception, JSON], None]) -> None: self.make_callback(self.login(login, password, domain), call) - def upsertCallback(self, model, data, call, let_null=False): + def upsertCallback(self, model: str, data: Union[Dict[str, Any], List[Dict[str, Any]]], + call: Callable[[Exception, JSON], None], let_null: bool = False) -> None: self.make_callback(self.upsert(model, data, let_null), call) - def make_callback(self, coroutine, call): + def make_callback(self, coroutine: Awaitable[T], call: Callable[[Exception, T], None]) -> None: try: if self.loop is None: raise Exception('SbxCore must initialize with manage_loop True') else: # future = asyncio.ensure_future( # coroutine, loop=self.loop) - def callback(fut): + def callback(fut: asyncio.Future) -> None: call(None, fut.result()) # future.add_done_callback(callback) @@ -543,7 +550,7 @@ def callback(fut): except Exception as inst: call(inst, None) - def close_connection(self): + def close_connection(self) -> None: if self.loop is not None: if self.loop.is_running(): asyncio.gather(*asyncio.all_tasks()).cancel()