From fd3af42d5c712282709e3287b3f8ec7a046b53b2 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 31 Oct 2025 09:32:44 -0700 Subject: [PATCH 1/5] adding direct_sql_query feature --- .../blueprints/client/api2.py | 69 ++- .../blueprints/client/common.py | 23 +- .../blueprints/client/new_query.py | 419 ------------------ .../blueprints/client/query.py | 233 +--------- .../blueprints/client/query_manager.py | 3 + materializationengine/views.py | 2 +- 6 files changed, 92 insertions(+), 657 deletions(-) diff --git a/materializationengine/blueprints/client/api2.py b/materializationengine/blueprints/client/api2.py index ed6b8be1..d64d4dc9 100644 --- a/materializationengine/blueprints/client/api2.py +++ b/materializationengine/blueprints/client/api2.py @@ -234,7 +234,17 @@ def __schema__(self): If True, accept-encoding will determine what \ internal compression is used", ) - +query_parser.add_argument( + "direct_sql_pandas", + type=inputs.boolean, + default=False, + required=False, + location="args", + help="whether to use direct SQL queries with pandas, \ + if False it will fall back to the csv streaming method. \ + which is prone to mangling types. \ + CAVEclient>=8.0.0 should set this to True", +) query_seg_prop_parser = reqparse.RequestParser() # add an argument for a string controlling the label format @@ -352,6 +362,7 @@ def execute_materialized_query( cg_client, random_sample: int = None, split_mode: bool = False, + direct_sql_pandas: bool = False, ) -> pd.DataFrame: """_summary_ @@ -359,6 +370,11 @@ def execute_materialized_query( datastack (str): datastack to query on mat_version (int): verison to query on user_data (dict): dictionary of query payload including filters + query_map (dict): mapping of model column names to dataframe column names + cg_client: chunkedgraph client to use for root id lookups + random_sample (int, optional): number of random samples to get using TABLESAMPLE. Defaults to None. + split_mode (bool, optional): whether to use split mode for the query. Defaults to False. + direct_sql_pandas (bool, optional): whether to use pandas for the query. Defaults to False. Returns: pd.DataFrame: a dataframe with the results of the query in the materialized version @@ -452,6 +468,7 @@ def execute_materialized_query( meta_db_name=aligned_volume, split_mode=split_mode, random_sample=use_random_sample, + direct_sql_pandas=direct_sql_pandas ) qm.configure_query(user_data) qm.apply_filter({user_data["table"]: {"valid": True}}, qm.apply_equal_filter) @@ -476,6 +493,11 @@ def execute_materialized_query( f"result has {len(df)} entries, which is equal or more \ than limit of {user_data['limit']} there may be more results which are not shown" ) + if not direct_sql_pandas: + warnings.append("query was executing using streaming via csv, which can mangle types. \ + Please upgrade to caveclient>8.0.0 to avoid type mangling. 
\ + because you may have been corrected for mangled types this change is breaking, \ + but should be an improved experience.") return df, column_names, warnings else: return None, {}, [] @@ -489,6 +511,7 @@ def execute_production_query( chosen_timestamp: datetime.datetime, cg_client, allow_missing_lookups: bool = False, + direct_sql_pandas: bool = False, ) -> pd.DataFrame: """_summary_ @@ -499,7 +522,9 @@ def execute_production_query( timestamp_end (datetime.datetime): _description_ Returns: - pd.DataFrame: _description_ + pd.DataFrame: dataframe of query + dict: _map of table name to column name mappings + dict: list of warnings """ user_timestamp = user_data["timestamp"] if chosen_timestamp < user_timestamp: @@ -515,7 +540,11 @@ def execute_production_query( # setup a query manager on production database with split tables qm = QueryManager( - aligned_volume_name, segmentation_source, split_mode=True, split_mode_outer=True + aligned_volume_name, + segmentation_source, + split_mode=True, + split_mode_outer=True, + direct_sql_pandas=direct_sql_pandas ) user_data_modified = strip_root_id_filters(user_data) @@ -1387,7 +1416,7 @@ def get( version, target_datastack, target_version, - {}, + {"direct_sql_pandas": True}, data, convert_desired_resolution=True, ) @@ -1399,7 +1428,7 @@ def get( table_name, target_datastack, target_version, - {}, + {"direct_sql_pandas": True}, {"desired_resolution": [1, 1, 1]}, convert_desired_resolution=True, ) @@ -1471,7 +1500,7 @@ def get( user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]] return_vals = assemble_live_query_dataframe( - user_data, datastack_name=datastack_name, args={} + user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True} ) df, column_names, _, _, _ = return_vals @@ -1716,7 +1745,7 @@ def assemble_live_query_dataframe(user_data, datastack_name, args): cg_client, allow_invalid_root_ids, ) - + direct_sql_pandas = args.get("direct_sql_pandas", False) mat_df, column_names, mat_warnings = execute_materialized_query( effective_datastack_name_for_mat_query, aligned_volume_for_mat_query, @@ -1727,6 +1756,7 @@ def assemble_live_query_dataframe(user_data, datastack_name, args): cg_client, random_sample=args.get("random_sample", None), split_mode=not loc_cv_is_merged, + direct_sql_pandas=direct_sql_pandas, ) prod_df = None @@ -1748,6 +1778,7 @@ def assemble_live_query_dataframe(user_data, datastack_name, args): chosen_timestamp_utc, cg_client_for_prod, args.get("allow_missing_lookups", True), + direct_sql_pandas=direct_sql_pandas, ) if mat_df is None and prod_df is not None: column_names = column_names_prod @@ -1765,7 +1796,11 @@ def __init__(self, naive_timestamp): final_remap_warnings = remap_warnings if isinstance(remap_warnings, list) else ([remap_warnings] if remap_warnings else []) final_mat_warnings = mat_warnings if isinstance(mat_warnings, list) else ([mat_warnings] if mat_warnings else []) final_prod_warnings = prod_warnings if isinstance(prod_warnings, list) else ([prod_warnings] if prod_warnings else []) - + if not direct_sql_pandas: + final_mat_warnings.append("query was executed using streaming via csv, which was mangling types. \ + Please upgrade to caveclient>8.0.0 to avoid type mangling. 
\ + because you may have been corrected for mangled types this change is breaking, \ + but should be an improved experience.") # we want to drop columns that the user didn't ask for (mostly supervoxel columns) filter_column_names = copy.deepcopy(column_names) if user_data.get('select_columns', None) is not None: @@ -1970,7 +2005,7 @@ def get_precomputed_properties_and_relationships(datastack_name, table_name): user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]] return_vals = assemble_live_query_dataframe( - user_data, datastack_name=datastack_name, args={} + user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True} ) df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals @@ -2417,7 +2452,7 @@ def select_best_spatial_column(fields, table_name, metadata): return_vals = assemble_live_query_dataframe( - user_data, datastack_name=datastack_name, args={} + user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True} ) df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals @@ -2465,7 +2500,7 @@ def query_by_id( user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]] return_vals = assemble_live_query_dataframe( - user_data, datastack_name=datastack_name, args={} + user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True} ) df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals @@ -2525,7 +2560,7 @@ def live_query_by_relationship( user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]] return_vals = assemble_live_query_dataframe( - user_data, datastack_name=datastack_name, args={} + user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True} ) df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals @@ -3190,7 +3225,7 @@ def assemble_view_dataframe(datastack_name, version, view_name, data, args): md["voxel_resolution_z"], ] data["desired_resolution"] = des_res - + direct_sql_pandas = args.get("direct_sql_pandas", False) qm = QueryManager( mat_db_name, segmentation_source=pcg_table_name, @@ -3198,6 +3233,7 @@ def assemble_view_dataframe(datastack_name, version, view_name, data, args): limit=limit, offset=data.get("offset", 0), get_count=get_count, + direct_sql_pandas=direct_sql_pandas ) qm.add_view(datastack_name, view_name) qm.apply_filter(data.get("filter_in_dict", None), qm.apply_isin_filter) @@ -3223,6 +3259,11 @@ def assemble_view_dataframe(datastack_name, version, view_name, data, args): df, column_names = qm.execute_query(desired_resolution=data["desired_resolution"]) df.drop(columns=["deleted", "superceded"], inplace=True, errors="ignore") warnings = [] + if not direct_sql_pandas: + warnings.append("query was executing using streaming via csv, which can mangle types. \ + Please upgrade to caveclient>8.0.0 to avoid type mangling. 
\ + because you may have been corrected for mangled types this change is breaking, \ + but should be an improved experience.") current_app.logger.info("query: {}".format(data)) current_app.logger.info("args: {}".format(args)) user_id = str(g.auth_user["id"]) @@ -3318,7 +3359,7 @@ def get( mat_db_name = f"{aligned_volume_name}" df, column_names, warnings = assemble_view_dataframe( - datastack_name, version, view_name, {}, {} + datastack_name, version, view_name, {}, {"direct_sql_pandas": True} ) df, tags, bool_tags, numerical, root_id_col = preprocess_view_dataframe( diff --git a/materializationengine/blueprints/client/common.py b/materializationengine/blueprints/client/common.py index 64a20d32..99b54cbd 100644 --- a/materializationengine/blueprints/client/common.py +++ b/materializationengine/blueprints/client/common.py @@ -295,7 +295,7 @@ def generate_simple_query_dataframe( random_sample = None else: random_sample = (100.0 * random_sample) / mat_row_count - + direct_sql_pandas = args.get("direct_sql_pandas", False) qm = QueryManager( mat_db_name, segmentation_source=pcg_table_name, @@ -305,6 +305,7 @@ def generate_simple_query_dataframe( offset=data.get("offset", 0), get_count=get_count, random_sample=random_sample, + direct_sql_pandas=direct_sql_pandas ) qm.add_table(table_name, random_sample=True) qm.apply_filter(data.get("filter_in_dict", None), qm.apply_isin_filter) @@ -339,7 +340,11 @@ def generate_simple_query_dataframe( if len(df) == limit: warnings.append(f'201 - "Limited query to {limit} rows') warnings = update_notice_text_warnings(ann_md, warnings, table_name) - + if not direct_sql_pandas: + warnings.append("query was executing using streaming via csv, which can mangle types. \ + Please upgrade to caveclient>8.0.0 to avoid type mangling. 
\ + because you may have been corrected for mangled types this change is breaking, \ + but should be an improved experience.") return df, warnings, column_names @@ -381,7 +386,7 @@ def generate_complex_query_dataframe( target_version, args, data, - convert_desired_resolution=False, + convert_desired_resolution=False ): aligned_volume_name, pcg_table_name = get_relevant_datastack_info(datastack_name) db = dynamic_annotation_cache.get_db(aligned_volume_name) @@ -430,7 +435,16 @@ def generate_complex_query_dataframe( suffixes = {t: s for t, s in zip(uniq_tables, suffixes)} else: suffixes = data.get("suffix_map") - + direct_sql_pandas = args.get("direct_sql_pandas", False) + if not direct_sql_pandas: + warn_text = textwrap.dedent( + """\ + Using non-pandas query execution is deprecated + as it can mangle types, + please upgrade caveclient to >=8.0.0 to use pandas + for improved type handling.""" + ) + warnings.append(warn_text) random_sample = args.get("random_sample", None) if random_sample is not None: with db_manager.session_scope(db_name) as session: @@ -454,6 +468,7 @@ def generate_complex_query_dataframe( offset=data.get("offset", 0), get_count=False, random_sample=random_sample, + direct_sql_pandas=direct_sql_pandas, ) if convert_desired_resolution: if not data.get("desired_resolution", None): diff --git a/materializationengine/blueprints/client/new_query.py b/materializationengine/blueprints/client/new_query.py index 56764ce9..c4537ace 100644 --- a/materializationengine/blueprints/client/new_query.py +++ b/materializationengine/blueprints/client/new_query.py @@ -284,422 +284,3 @@ def map_filters( else: new_filters.append(None) return new_filters, id_mapping[query_map_str], warnings - - -# @cached(cache=TTLCache(maxsize=64, ttl=600)) -# def get_relevant_datastack_info(datastack_name): -# ds_info = get_datastack_info(datastack_name=datastack_name) -# seg_source = ds_info["segmentation_source"] -# pcg_table_name = seg_source.split("/")[-1] -# aligned_volume_name = ds_info["aligned_volume"]["name"] -# return aligned_volume_name, pcg_table_name - - -# aligned_volume_name, pcg_table_name = get_relevant_datastack_info( -# datastack_name -# ) -# args = query_parser.parse_args() -# Session = sqlalchemy_cache.get(aligned_volume_name) -# time_d = {} -# now = time.time() - -# timestamp = args["timestamp"] - -# engine = sqlalchemy_cache.get_engine(aligned_volume_name) -# time_d["get engine"] = time.time() - now -# now = time.time() -# max_limit = current_app.config.get("QUERY_LIMIT_SIZE", 200000) - -# data = request.parsed_obj -# time_d["get data"] = time.time() - now -# now = time.time() -# limit = data.get("limit", max_limit) - -# if limit > max_limit: -# limit = max_limit - -# get_count = args.get("count", False) - -# if get_count: -# limit = None - -# logging.info("query {}".format(data)) -# logging.info("args - {}".format(args)) -# Session = sqlalchemy_cache.get(aligned_volume_name) - -# # find the analysis version that we use to support this timestamp -# base_analysis_version = ( -# Session.query(AnalysisVersion) -# .filter(AnalysisVersion.datastack == datastack_name) -# .filter(AnalysisVersion.valid == True) -# .filter(AnalysisVersion.time_stamp < str(timestamp)) -# .order_by(AnalysisVersion.time_stamp.desc()) -# .first() -# ) -# if base_analysis_version is None: -# return "No version available to support that timestamp", 404 - -# timestamp_start = base_analysis_version.time_stamp -# # check if the table is in this timestamp -# analysis_table = ( -# Session.query(AnalysisTable) -# 
.filter(AnalysisTable.analysisversion_id == base_analysis_version.id) -# .filter(AnalysisTable.valid == True) -# .filter(AnalysisTable.table_name == table_name) -# .first() -# ) -# filter_in_dict = data.get("filter_in_dict", None) -# filter_out_dict = data.get("filter_notin_dict", None) -# filter_equal_dict = data.get("filter_equal_dict", None) -# filter_greater_dict = data.get("filter_greater_dict", None) -# filter_less_dict = data.get("filter_less_dict", None) -# filter_greater_equal_dict = data.get("filter_greater_equal_dict", None) -# filter_less_equal_dict = data.get("filter_less_equal_dict", None) - -# db = dynamic_annotation_cache.get_db(aligned_volume_name) -# table_metadata = db.database.get_table_metadata(table_name) -# cg_client = chunkedgraph_cache.init_pcg(pcg_table_name) -# if analysis_table is None: -# df_mat = None -# else: -# # if it is translate the query parameters to that moment in time - -# version = base_analysis_version.version -# mat_engine = sqlalchemy_cache.get_engine( -# "{}__mat{}".format(datastack_name, version) -# ) -# Model = get_flat_model(datastack_name, table_name, version, Session) -# time_d["get Model"] = time.time() - now -# now = time.time() - -# MatSession = sqlalchemy_cache.get( -# "{}__mat{}".format(datastack_name, version) -# ) - -# past_filters, future_map = map_filters( -# [filter_in_dict, filter_out_dict, filter_equal_dict, filter_greater_dict, filter_less_dict, filter_greater_equal_dict, filter_less_equal_dict], -# timestamp, -# timestamp_start, -# cg_client, -# ) -# past_filter_in_dict, past_filter_out_dict, past_equal_dict = past_filters -# if past_equal_dict is not None: -# # when doing a filter equal in the past -# # we translate it to a filter_in, as 1 ID might -# # be multiple IDs in the past. -# # so we want to update the filter_in dict -# cols = [col for col in past_equal_dict.keys()] -# for col in cols: -# if col.endswith("root_id"): -# if past_filter_in_dict is None: -# past_filter_in_dict = {} -# past_filter_in_dict[col] = past_equal_dict.pop(col) -# if len(past_equal_dict) == 0: -# past_equal_dict = None - -# time_d["get MatSession"] = time.time() - now -# now = time.time() - -# # todo fix reference annotations -# # tables, suffixes = self._resolve_merge_reference( -# # True, table_name, datastack_name, version -# # ) -# # model_dict = {} -# # for table_desc in data["tables"]: -# # table_name = table_desc[0] -# # Model = get_flat_model(datastack_name, table_name, version, Session) -# # model_dict[table_name] = Model -# if table_metadata["reference_table"] is None: -# model_dict = {table_name: Model} -# tables = [table_name] -# suffixes = None -# else: -# ref_table = table_metadata["reference_table"] -# RefModel = get_flat_model(datastack_name, ref_table, version, Session) -# model_dict = {table_name: Model, ref_table: RefModel} -# tables = [[table_name, "target_id"], [ref_table, "id"]] -# suffixes = ["", "_target"] -# print("doing mat part") -# df_mat = specific_query( -# MatSession, -# mat_engine, -# model_dict=model_dict, -# tables=tables, -# filter_in_dict=past_filter_in_dict, -# filter_notin_dict=past_filter_out_dict, -# filter_equal_dict=None, -# filter_greater_dict=None, -# filter_less_dict=None, -# filter_greater_equal_dict=None, -# filter_less_equal_dict=None, -# filter_spatial=data.get("filter_spatial_dict", None), -# select_columns=data.get("select_columns", None), -# consolidate_positions=False, -# offset=data.get("offset", None), -# limit=limit, -# get_count=get_count, -# suffixes=suffixes, -# use_pandas=True, -# ) -# 
df_mat = _update_rootids(df_mat, timestamp, future_map, cg_client) -# print("done mat part") -# print("doing live query part") -# AnnModel, AnnSegModel, seg_model_valid = get_split_models( -# datastack_name, -# table_name, -# with_crud_columns=True, -# ) - -# time_d["get Model"] = time.time() - now -# now = time.time() -# if table_metadata["reference_table"] is None: -# # get the production database with a timestamp interval query -# f1 = AnnModel.created.between(str(timestamp_start), str(timestamp)) -# f2 = AnnModel.deleted.between(str(timestamp_start), str(timestamp)) -# f = (or_(f1, f2),) -# time_filter_args = [f] -# seg_table = f"{table_name}__{datastack_name}" -# model_dict = {table_name: AnnModel, seg_table: AnnSegModel} -# tables = [[table_name, "id"], [seg_table, "id"]] -# suffixes = ["", "_seg"] -# else: -# ref_table = table_metadata["reference_table"] -# RefModel, RefSegModel, ref_seg_model_valid = get_split_models( -# datastack_name, -# ref_table, -# with_crud_columns=True, -# ) -# ref_seg_table = f"{ref_table}__{datastack_name}" -# seg_table = f"{table_name}__{datastack_name}" -# model_dict = {table_name: AnnModel, ref_table: RefModel} - -# if ref_seg_model_valid: -# model_dict[ref_seg_table] = RefSegModel -# tables.append([ref_seg_table, "id"]) -# if seg_model_valid: -# model_dict[seg_table] = AnnSegModel - -# ann_table.id -# ann_seg_table.id - -# ann_table.target_id -# referenced_table.id -# referenced_seg_table.id - -# suffixes = ["", "_target", "_target_seg"] -# f1 = RefModel.created.between(str(timestamp_start), str(timestamp)) -# f2 = RefModel.deleted.between(str(timestamp_start), str(timestamp)) -# f = (or_(f1, f2),) -# time_filter_args = [f] - -# time_d["setup query"] = time.time() - now -# now = time.time() - -# df_new = specific_query( -# Session, -# engine, -# model_dict=model_dict, -# tables=tables, -# filter_in_dict=_format_filter(data.get("filter_in_dict", None), table_name), -# filter_notin_dict=_format_filter( -# data.get("filter_notin_dict", None), table_name -# ), -# filter_equal_dict=_format_filter(filter_equal_dict, table_name), -# filter_greater_dict=_format_filter(filter_greater_dict, table_name), -# filter_less_dict=_format_filter(filter_less_dict, table_name), -# filter_greater_equal_dict=_format_filter(filter_greater_equal_dict, table_name), -# filter_less_equal_dict=_format_filter(filter_less_equal_dict, table_name), -# filter_spatial=data.get("filter_spatial_dict", None), -# select_columns=data.get("select_columns", None), -# consolidate_positions=not args["split_positions"], -# offset=data.get("offset", None), -# limit=limit, -# get_count=get_count, -# outer_join=False, -# use_pandas=True, -# suffixes=suffixes, -# extra_filter_args=time_filter_args, -# ) -# time_d["execute query"] = time.time() - now -# now = time.time() - -# # update the root_ids for the supervoxel_ids from production -# is_delete = df_new["deleted"] < timestamp -# is_create = (df_new["created"] > timestamp_start.replace(tzinfo=None)) & ( -# (df_new["deleted"] > timestamp) | df_new["deleted"].isna() -# ) - -# root_cols = [c for c in df_new.columns if c.endswith("pt_root_id")] - -# df_create = df_new[is_create] -# for c in root_cols: -# sv_col = c.replace("root_id", "supervoxel_id") -# svids = df_create[sv_col].values -# # not_null = ~svids.isna() -# new_roots = cg_client.get_roots(svids, timestamp=timestamp).astype(np.int64) -# df_create[c] = new_roots -# df_create = df_create.drop(["created", "deleted", "superceded_id"], axis=1) -# time_d["update new roots"] = time.time() - now -# 
now = time.time() - -# # merge the dataframes -# if df_mat is not None: -# print(df_mat.columns) -# df = df_mat[~df_mat.id.isin(df_new[is_delete].id)] -# if len(df_create) > 0: -# df = pd.concat([df, df_create], ignore_index=True) -# else: -# df = df_create - -# # apply the filters post -# # apply the original filters to remove rows -# # from this result which are not relevant - -# if filter_in_dict is not None: -# for col, val in filter_in_dict.items(): -# if col.endswith("root_id"): -# df = df[df[col].isin(val)] -# if filter_out_dict is not None: -# for col, val in filter_out_dict.items(): -# if col.endswith("root_id"): -# df = df[~df[col].isin(val)] -# if filter_equal_dict is not None: -# for col, val in filter_equal_dict.items(): -# if col.endswith("root_id"): -# df = df[df[col] == val] -# if filter_greater_dict is not None: -# for col, val in filter_greater_dict.items(): -# if col.endswith("root_id"): -# df = df[df[col] > val] -# if filter_less_dict is not None: -# for col, val in filter_less_dict.items(): -# if col.endswith("root_id"): -# df = df[df[col] < val] -# if filter_greater_equal_dict is not None: -# for col, val in filter_greater_equal_dict.items(): -# if col.endswith("root_id"): -# df = df[df[col] >= val] -# if filter_less_equal_dict is not None: -# for col, val in filter_less_equal_dict.items(): -# if col.endswith("root_id"): -# df = df[df[col] <= val] - -# now = time.time() -# headers = None -# warnings = [] -# if len(df) >= max_limit: -# warnings.append(f'201 - "Limited query to {max_limit} rows') -# # if args["return_pandas"] and args["return_pyarrow"]: -# # warnings.append( -# # "return_pandas=true is deprecated and may convert columns with nulls to floats, Please upgrade CAVEclient to >XXX with pip install -U caveclient" -# # ) -# if len(warnings) > 0: -# headers = {"Warning": ". 
".join(warnings)} -# if args["return_pyarrow"]: -# context = pa.default_serialization_context() -# serialized = context.serialize(df).to_buffer().to_pybytes() -# time_d["serialize"] = time.time() - now -# logging.info(time_d) -# return Response( -# serialized, headers=headers, mimetype="x-application/pyarrow" -# ) -# else: -# dfjson = df.to_json(orient="records") -# time_d["serialize"] = time.time() - now -# logging.info(time_d) -# response = Response(dfjson, headers=headers, mimetype="application/json") -# return after_request(response) - -# def map_filters( -# [filter_in_dict, filter_out_dict, filter_equal_dict, filter_greater_dict, filter_less_dict, filter_greater_equal_dict, filter_less_equal_dict], -# timestamp, -# timestamp_start, -# cg_client, -# ) -# past_filter_in_dict, past_filter_out_dict, past_equal_dict = past_filters -# def make_materialized_query_manager( -# datastack_name, -# mat_version, -# timestamp=datetime.datetime.utcnow(), -# table=None, -# join_tables=None, -# select_columns=None, -# filter_in_dict=None, -# filter_equal_dict=None, -# filter_greater_dict=None, -# filter_less_dict=None, -# filter_greater_equal_dict=None, -# filter_less_equal_dict=None, -# filter_out_dict=None, -# filter_spatial_dict=None, -# offset=0, -# limit=None, -# suffixes=None, -# joins=None, -# ): -# if table & join_tables: -# raise ValueError("Cannot specify tables and join statement") - -# db_name = "{}__mat{}".format(datastack_name, mat_version.version) - -# qm = QueryManager(db_name=db_name, split_mode=False) -# if table: -# tables = [table] -# else: -# tables = [jt[0][0] for jt in join_tables] -# qm.add_tables(tables) -# if join_tables: -# for join_table in join_tables: -# qm.join_table(*join_table[0]) - -# if select_columns: -# for table_name, columns in select_columns.items(): -# qm.select_column( -# table_name, -# ) -# qm.validate_joins() - - -# def execute_query_manager(qm): -# query = qm.make_query() -# df = _execute_query( -# qm.session, -# qm.engine, -# query=query, -# fix_wkb=False, -# index_col=None, -# get_count=False, -# ) -# return df - - -# def execute_materialized_query(datastack, mat_version, timestamp, user_data): -# qm = make_materialized_query_manager(datastack, mat_version, **user_data) -# df = execute_query_manager(qm) -# df = update_root_ids(df, start_time=mat_version.timestamp, end_time=timestamp) - -# return df - - -# def execute_production_query(datastack, user_data, mat_timestamp, timestamp, matdf): - -# if timestamp > mat_timestamp: -# qm = make_production_query_manager( -# datastack, **user_data, start_time=mat_timestamp, end_time=timestamp -# ) -# else: -# qm = make_production_query_manager( -# datastack, **user_data, start_time=timestamp, end_time=mat_timestamp -# ) -# prod_df = execute_query_manager(qm) -# prod_df = lookup_root_ids(prod_df, timestamp) - - -# def combine_queries(mat_df, prod_df, user_data): -# prod_df = remove_deleted_items(prod_df, prod_df) -# matdf = remove_deleted_items(mat_df, prod_df) -# prod_df = remove_crud_columns(prod_df) - -# comb_df = pd.concat([mat_df, prod_df]) -# comb_df = apply_filters(comb_df, user_data) -# return comb_df diff --git a/materializationengine/blueprints/client/query.py b/materializationengine/blueprints/client/query.py index 31b54525..ef946937 100644 --- a/materializationengine/blueprints/client/query.py +++ b/materializationengine/blueprints/client/query.py @@ -245,218 +245,6 @@ def render_literal_value(self, value, type_): return LiteralCompiler(dialect, statement).process(statement) -def specific_query( - 
sqlalchemy_session, - engine, - model_dict, - tables, - filter_in_dict=None, - filter_notin_dict=None, - filter_equal_dict=None, - filter_greater_dict=None, - filter_less_dict=None, - filter_greater_equal_dict=None, - filter_less_equal_dict=None, - filter_spatial=None, - select_columns=None, - consolidate_positions=True, - return_wkb=False, - offset=None, - limit=None, - get_count=False, - suffixes=None, -): - """Allows a more narrow query without requiring knowledge about the - underlying data structures - - Parameters - ---------- - tables: list of lists - standard: list of one entry: table_name of table that one wants to - query - join: list of two lists: first entries are table names, second - entries are the columns used for the join - filter_in_dict: dict of dicts - outer layer: keys are table names - inner layer: keys are column names, values are entries to filter by - filter_notin_dict: dict of dicts - inverse to filter_in_dict - filter_equal_dict: dict of dicts - outer layer: keys are table names - inner layer: keys are column names, values are entries to be equal - filter_greater_dict: dict of dicts - outer layer: keys are table names - inner layer: keys are column names, values are entries to be exclusive upper-bound - filter_less_dict: dict of dicts - outer layer: keys are table names - inner layer: keys are column names, values are entries to be exclusive lower-bound - filter_greater_equal_dict: dict of dicts - outer layer: keys are table names - inner layer: keys are column names, values are entries to be inclusive upper-bound - filter_less_equal_dict: dict of dicts - outer layer: keys are table names - inner layer: keys are column names, values are entries to be inclusive lower-bound - filter_spatial: dict of dicts - outer layer: keys are table_namess - inner layer: keys are column names, values are [min,max] as list of lists - e.g. 
[[0,0,0], [1,1,1]] - select_columns: list of str - consolidate_positions: whether to make the position columns arrays of x,y,z - offset: int - limit: int or None - get_count: bool - suffixes: list of str or None - - Returns - ------- - sqlalchemy query object: - """ - tables = [[table] if not isinstance(table, list) else table for table in tables] - models = [model_dict[table[0]] for table in tables] - - column_lists = [[m.key for m in model.__table__.columns] for model in models] - - col_names, col_counts = np.unique(np.concatenate(column_lists), return_counts=True) - dup_cols = col_names[col_counts > 1] - # if there are duplicate columns we need to redname - if suffixes is None: - suffixes = [DEFAULT_SUFFIX_LIST[i] for i in range(len(models))] - else: - assert len(suffixes) == len(models) - query_args = [] - for model, suffix in zip(models, suffixes): - for column in model.__table__.columns: - if isinstance(column.type, Geometry) and ~return_wkb: - if column.key in dup_cols: - column_args = [ - column.ST_X() - .cast(Integer) - .label(column.key + "_{}_x".format(suffix)), - column.ST_Y() - .cast(Integer) - .label(column.key + "_{}_y".format(suffix)), - column.ST_Z() - .cast(Integer) - .label(column.key + "_{}_z".format(suffix)), - ] - - else: - column_args = [ - column.ST_X().cast(Integer).label(column.key + "_x"), - column.ST_Y().cast(Integer).label(column.key + "_y"), - column.ST_Z().cast(Integer).label(column.key + "_z"), - ] - query_args += column_args - if select_columns is not None and column.key in select_columns: - column_index = select_columns.index(column.key) - select_columns.pop(column_index) - select_columns += column_args - - elif column.key in dup_cols: - if len(suffix) > 0: - suffix = f"_{suffix}" - else: - suffix = "" - query_args.append(column.label(column.key + suffix)) - else: - query_args.append(column) - - if len(tables) == 2: - join_args = ( - model_dict[tables[1][0]], - model_dict[tables[1][0]].__dict__[tables[1][1]] - == model_dict[tables[0][0]].__dict__[tables[0][1]], - ) - elif len(tables) > 2: - raise Exception("Currently, only single joins are supported") - else: - join_args = None - - filter_args = [] - if filter_in_dict is not None: - for filter_table, filter_table_dict in filter_in_dict.items(): - for column_name in filter_table_dict.keys(): - filter_values = filter_table_dict[column_name] - filter_values = np.array(filter_values, dtype="O") - - filter_args.append( - (model_dict[filter_table].__dict__[column_name].in_(filter_values),) - ) - if filter_notin_dict is not None: - for filter_table, filter_table_dict in filter_notin_dict.items(): - for column_name in filter_table_dict.keys(): - filter_values = filter_table_dict[column_name] - filter_values = np.array(filter_values, dtype="O") - filter_args.append( - ( - not_( - model_dict[filter_table] - .__dict__[column_name] - .in_(filter_values) - ), - ) - ) - if filter_equal_dict is not None: - for filter_table, filter_table_dict in filter_equal_dict.items(): - for column_name in filter_table_dict.keys(): - filter_value = filter_table_dict[column_name] - filter_args.append( - (model_dict[filter_table].__dict__[column_name] == filter_value,) - ) - if filter_greater_dict is not None: - for filter_table, filter_table_dict in filter_greater_dict.items(): - for column_name in filter_table_dict.keys(): - filter_value = filter_table_dict[column_name] - filter_args.append( - (model_dict[filter_table].__dict__[column_name] > filter_value,) - ) - if filter_less_dict is not None: - for filter_table, filter_table_dict in 
filter_less_dict.items(): - for column_name in filter_table_dict.keys(): - filter_value = filter_table_dict[column_name] - filter_args.append( - (model_dict[filter_table].__dict__[column_name] < filter_value,) - ) - if filter_greater_equal_dict is not None: - for filter_table, filter_table_dict in filter_greater_equal_dict.items(): - for column_name in filter_table_dict.keys(): - filter_value = filter_table_dict[column_name] - filter_args.append( - (model_dict[filter_table].__dict__[column_name] >= filter_value,) - ) - if filter_less_equal_dict is not None: - for filter_table, filter_table_dict in filter_less_equal_dict.items(): - for column_name in filter_table_dict.keys(): - filter_value = filter_table_dict[column_name] - filter_args.append( - (model_dict[filter_table].__dict__[column_name] <= filter_value,) - ) - - if filter_spatial is not None: - for filter_table, filter_table_dict in filter_spatial.items(): - for column_name in filter_table_dict.keys(): - bounding_box = filter_table_dict[column_name] - filter = make_spatial_filter(model, column_name, bounding_box) - filter_args.append((filter,)) - - df = _query( - sqlalchemy_session, - engine, - query_args=query_args, - filter_args=filter_args, - join_args=join_args, - select_columns=select_columns, - fix_wkb=~return_wkb, - offset=offset, - limit=limit, - get_count=get_count, - ) - if consolidate_positions: - return concatenate_position_columns(df) - else: - return df - - def read_sql_tmpfile(query, db_engine): with tempfile.TemporaryFile() as tmpfile: copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format( @@ -521,6 +309,7 @@ def _execute_query( n_threads=None, index_col=None, get_count=False, + direct_sql_pandas=False ): """Query the database and make a dataframe out of the results @@ -539,14 +328,18 @@ def _execute_query( count = query.count() df = pd.DataFrame({"count": [count]}) else: + # print(query.statement.compile(engine, compile_kwargs={"literal_binds": True})) - df = read_sql_tmpfile( - query.statement.compile(engine, compile_kwargs={"literal_binds": True}), - engine, - ) - # df = pd.read_sql(query.statement, engine, - # coerce_float=False, index_col=index_col) - + + if direct_sql_pandas: + with engine.connect() as connection: + statement = str(query.statement.compile(engine, compile_kwargs={"literal_binds": True})) + df = pd.read_sql(statement, connection.connection, coerce_float=False, index_col=index_col, dtype_backend='pyarrow') + else: + df = read_sql_tmpfile( + query.statement.compile(engine, compile_kwargs={"literal_binds": True}), + engine, + ) df = fix_columns_with_query( df, query, fix_wkb=fix_wkb, fix_decimal=fix_decimal, n_threads=n_threads ) @@ -566,6 +359,7 @@ def _query( offset=None, limit=None, get_count=False, + direct_sql_pandas=False ): """Wraps make_query and execute_query in one function @@ -604,6 +398,7 @@ def _query( fix_wkb=fix_wkb, index_col=index_col, get_count=get_count, + direct_sql_pandas=direct_sql_pandas ) return df diff --git a/materializationengine/blueprints/client/query_manager.py b/materializationengine/blueprints/client/query_manager.py index 716dda03..5a1d731e 100644 --- a/materializationengine/blueprints/client/query_manager.py +++ b/materializationengine/blueprints/client/query_manager.py @@ -34,6 +34,7 @@ def __init__( get_count: bool = False, split_mode_outer: bool = False, random_sample: float = None, + direct_sql_pandas: bool = False ): self._db = dynamic_annotation_cache.get_db(db_name) if meta_db_name is None: @@ -56,6 +57,7 @@ def __init__( self.limit = limit self.offset 
= offset self.get_count = get_count + self.direct_sql_pandas = direct_sql_pandas if suffixes is None: suffixes = defaultdict(lambda: None) else: @@ -662,6 +664,7 @@ def execute_query(self, desired_resolution=None): fix_wkb=False, index_col=None, get_count=self.get_count, + direct_sql_pandas=self.direct_sql_pandas ) return df, column_names diff --git a/materializationengine/views.py b/materializationengine/views.py index 44419c51..c17bfd7a 100644 --- a/materializationengine/views.py +++ b/materializationengine/views.py @@ -38,7 +38,6 @@ from materializationengine.blueprints.client.schemas import AnalysisViewSchema from materializationengine.blueprints.reset_auth import reset_auth from materializationengine.celery_init import celery -from materializationengine.blueprints.client.query import specific_query from materializationengine.database import db_manager, dynamic_annotation_cache from materializationengine.blueprints.client.query_manager import QueryManager from materializationengine.request_db import request_db_session @@ -664,6 +663,7 @@ def generic_report(datastack_name, id): segmentation_source=pcg_table_name, meta_db_name=aligned_volume_name, split_mode=not is_merged, + direct_sql_pandas=True ) qm.add_table(table_name) qm.select_all_columns(table_name) From b4ee304f1ed7cb2e696f12c9f806838a8c3179f6 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 31 Oct 2025 16:14:44 -0700 Subject: [PATCH 2/5] Update materializationengine/blueprints/client/common.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- materializationengine/blueprints/client/common.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/materializationengine/blueprints/client/common.py b/materializationengine/blueprints/client/common.py index 99b54cbd..7be7b6cb 100644 --- a/materializationengine/blueprints/client/common.py +++ b/materializationengine/blueprints/client/common.py @@ -341,10 +341,11 @@ def generate_simple_query_dataframe( warnings.append(f'201 - "Limited query to {limit} rows') warnings = update_notice_text_warnings(ann_md, warnings, table_name) if not direct_sql_pandas: - warnings.append("query was executing using streaming via csv, which can mangle types. \ - Please upgrade to caveclient>8.0.0 to avoid type mangling. \ - because you may have been corrected for mangled types this change is breaking, \ - but should be an improved experience.") + warnings.append( + """Query was executed using streaming via CSV, which can mangle types. +Please upgrade to caveclient > 8.0.0 to avoid type mangling. 
+Because you may have been affected by mangled types, this change is breaking, but it should provide an improved experience.""" + ) return df, warnings, column_names From f890d1555ebc16b78f04206a180b8fd1ae34134d Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 31 Oct 2025 16:20:28 -0700 Subject: [PATCH 3/5] fixing repeated string --- .../blueprints/client/api.py | 26 ++++++----- .../blueprints/client/api2.py | 46 ++++--------------- .../blueprints/client/common.py | 35 +++++++------- .../blueprints/client/query.py | 8 +--- 4 files changed, 43 insertions(+), 72 deletions(-) diff --git a/materializationengine/blueprints/client/api.py b/materializationengine/blueprints/client/api.py index 15ec072f..65255668 100644 --- a/materializationengine/blueprints/client/api.py +++ b/materializationengine/blueprints/client/api.py @@ -2,6 +2,19 @@ from flask import abort, request from flask_accepts import accepts from flask_restx import Namespace, Resource, inputs, reqparse +from middle_auth_client import auth_requires_permission + +from materializationengine.blueprints.client.common import ( + get_analysis_version_and_table, + get_flat_model, + handle_complex_query, + handle_simple_query, + validate_table_args, +) +from materializationengine.blueprints.client.common import ( + unhandled_exception as common_unhandled_exception, +) +from materializationengine.blueprints.client.datastack import validate_datastack from materializationengine.blueprints.client.schemas import ( ComplexQuerySchema, SimpleQuerySchema, @@ -11,24 +24,13 @@ update_notice_text_warnings, ) from materializationengine.blueprints.reset_auth import reset_auth -from materializationengine.database import dynamic_annotation_cache, db_manager +from materializationengine.database import db_manager, dynamic_annotation_cache from materializationengine.info_client import ( get_aligned_volumes, get_relevant_datastack_info, ) -from materializationengine.blueprints.client.common import ( - handle_complex_query, - handle_simple_query, - validate_table_args, - get_analysis_version_and_table, - get_flat_model, - unhandled_exception as common_unhandled_exception, -) from materializationengine.models import MaterializedMetadata from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema -from middle_auth_client import auth_requires_permission -from materializationengine.blueprints.client.datastack import validate_datastack - __version__ = "5.12.1" diff --git a/materializationengine/blueprints/client/api2.py b/materializationengine/blueprints/client/api2.py index d64d4dc9..30d3e6d8 100644 --- a/materializationengine/blueprints/client/api2.py +++ b/materializationengine/blueprints/client/api2.py @@ -1,17 +1,15 @@ +import copy import datetime +import io import json -import pytz -from dynamicannotationdb.models import AnalysisTable, AnalysisVersion - -from cachetools import TTLCache, cached, LRUCache from functools import wraps from typing import List +import cloudvolume import nglui import numpy as np import pandas as pd import pytz -import copy import werkzeug from cachetools import LRUCache, TTLCache, cached from cachetools.keys import hashkey @@ -25,6 +23,7 @@ from middle_auth_client import ( auth_requires_permission, ) +from neuroglancer import viewer_state from sqlalchemy.sql.sqltypes import Boolean, DateTime, Float, Integer, Numeric, String from materializationengine.blueprints.client.common import ( @@ -34,32 +33,19 @@ get_analysis_version_and_tables, handle_complex_query, handle_simple_query, + sql_query_warning, 
validate_table_args, ) from materializationengine.blueprints.client.common import ( unhandled_exception as common_unhandled_exception, ) -from materializationengine.request_db import request_db_session -import pandas as pd -import numpy as np -from marshmallow import fields as mm_fields -from emannotationschemas.schemas.base import PostGISField -import datetime -from typing import List -import werkzeug -from sqlalchemy.sql.sqltypes import String, Integer, Float, DateTime, Boolean, Numeric -import io -from geoalchemy2.types import Geometry -import nglui -from neuroglancer import viewer_state -import cloudvolume - from materializationengine.blueprints.client.datastack import validate_datastack from materializationengine.blueprints.client.new_query import ( remap_query, strip_root_id_filters, update_rootids, ) +from materializationengine.blueprints.client.precomputed import AnnotationWriter from materializationengine.blueprints.client.query_manager import QueryManager from materializationengine.blueprints.client.schemas import ( AnalysisViewSchema, @@ -76,19 +62,13 @@ ) from materializationengine.blueprints.reset_auth import reset_auth from materializationengine.chunkedgraph_gateway import chunkedgraph_cache -from materializationengine.database import ( - dynamic_annotation_cache, - db_manager -) +from materializationengine.database import db_manager, dynamic_annotation_cache from materializationengine.info_client import get_aligned_volumes, get_datastack_info from materializationengine.limiter import limit_by_category from materializationengine.models import MaterializedMetadata +from materializationengine.request_db import request_db_session from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema from materializationengine.utils import check_read_permission -from materializationengine.blueprints.client.utils import update_notice_text_warnings -from materializationengine.blueprints.client.utils import after_request -from materializationengine.blueprints.client.precomputed import AnnotationWriter - __version__ = "5.12.1" @@ -494,10 +474,7 @@ def execute_materialized_query( than limit of {user_data['limit']} there may be more results which are not shown" ) if not direct_sql_pandas: - warnings.append("query was executing using streaming via csv, which can mangle types. \ - Please upgrade to caveclient>8.0.0 to avoid type mangling. \ - because you may have been corrected for mangled types this change is breaking, \ - but should be an improved experience.") + warnings.append(sql_query_warning) return df, column_names, warnings else: return None, {}, [] @@ -1797,10 +1774,7 @@ def __init__(self, naive_timestamp): final_mat_warnings = mat_warnings if isinstance(mat_warnings, list) else ([mat_warnings] if mat_warnings else []) final_prod_warnings = prod_warnings if isinstance(prod_warnings, list) else ([prod_warnings] if prod_warnings else []) if not direct_sql_pandas: - final_mat_warnings.append("query was executed using streaming via csv, which was mangling types. \ - Please upgrade to caveclient>8.0.0 to avoid type mangling. 
\ - because you may have been corrected for mangled types this change is breaking, \ - but should be an improved experience.") + final_mat_warnings.append(sql_query_warning) # we want to drop columns that the user didn't ask for (mostly supervoxel columns) filter_column_names = copy.deepcopy(column_names) if user_data.get('select_columns', None) is not None: diff --git a/materializationengine/blueprints/client/common.py b/materializationengine/blueprints/client/common.py index 7be7b6cb..dac9ca14 100644 --- a/materializationengine/blueprints/client/common.py +++ b/materializationengine/blueprints/client/common.py @@ -1,26 +1,29 @@ -from dynamicannotationdb.models import AnalysisVersion, AnalysisTable +import textwrap +import traceback + +import numpy as np from cachetools import LRUCache, cached from cachetools.keys import hashkey -from flask import abort, current_app +from dynamicannotationdb.models import AnalysisTable, AnalysisVersion +from flask import abort, current_app, g, request + from materializationengine.blueprints.client.query_manager import QueryManager from materializationengine.blueprints.client.utils import ( - update_notice_text_warnings, - create_query_response, collect_crud_columns, + create_query_response, + update_notice_text_warnings, ) -from materializationengine.database import dynamic_annotation_cache, db_manager -from materializationengine.models import MaterializedMetadata -from materializationengine.utils import check_read_permission +from materializationengine.database import db_manager, dynamic_annotation_cache from materializationengine.info_client import ( get_relevant_datastack_info, ) -import numpy as np -import textwrap -from flask import g, request -import traceback -from materializationengine.schemas import AnalysisVersionSchema, AnalysisTableSchema - +from materializationengine.models import MaterializedMetadata +from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema +from materializationengine.utils import check_read_permission +sql_query_warning = """Query was executed using streaming via CSV, which can mangle types. +Please upgrade to caveclient > 8.0.0 to avoid type mangling. +Because you may have been affected by mangled types, this change is breaking, but it should provide an improved experience.""" def unhandled_exception(e): status_code = 500 user_ip = str(request.remote_addr) @@ -341,11 +344,7 @@ def generate_simple_query_dataframe( warnings.append(f'201 - "Limited query to {limit} rows') warnings = update_notice_text_warnings(ann_md, warnings, table_name) if not direct_sql_pandas: - warnings.append( - """Query was executed using streaming via CSV, which can mangle types. -Please upgrade to caveclient > 8.0.0 to avoid type mangling. 
-Because you may have been affected by mangled types, this change is breaking, but it should provide an improved experience.""" - ) + warnings.append(sql_query_warning) return df, warnings, column_names diff --git a/materializationengine/blueprints/client/query.py b/materializationengine/blueprints/client/query.py index ef946937..23f32424 100644 --- a/materializationengine/blueprints/client/query.py +++ b/materializationengine/blueprints/client/query.py @@ -8,16 +8,15 @@ import pandas as pd import shapely from geoalchemy2.elements import WKBElement - from geoalchemy2.shape import to_shape from geoalchemy2.types import Geometry from multiwrapper import multiprocessing_utils as mu from sqlalchemy import func, not_ from sqlalchemy.orm import Query -from sqlalchemy.sql.sqltypes import Boolean, Integer, DateTime -from sqlalchemy.sql.selectable import Alias from sqlalchemy.orm.util import AliasedClass from sqlalchemy.sql.schema import Table +from sqlalchemy.sql.selectable import Alias +from sqlalchemy.sql.sqltypes import Boolean, DateTime, Integer DEFAULT_SUFFIX_LIST = ["x", "y", "z", "xx", "yy", "zz", "xxx", "yyy", "zzz"] @@ -328,9 +327,6 @@ def _execute_query( count = query.count() df = pd.DataFrame({"count": [count]}) else: - - # print(query.statement.compile(engine, compile_kwargs={"literal_binds": True})) - if direct_sql_pandas: with engine.connect() as connection: statement = str(query.statement.compile(engine, compile_kwargs={"literal_binds": True})) From cf7b63c780151ef91ed979ca1c91b8f77d09a049 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Sat, 1 Nov 2025 09:47:06 -0700 Subject: [PATCH 4/5] missed one query string --- materializationengine/blueprints/client/api2.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/materializationengine/blueprints/client/api2.py b/materializationengine/blueprints/client/api2.py index 30d3e6d8..4ca1868a 100644 --- a/materializationengine/blueprints/client/api2.py +++ b/materializationengine/blueprints/client/api2.py @@ -3234,10 +3234,7 @@ def assemble_view_dataframe(datastack_name, version, view_name, data, args): df.drop(columns=["deleted", "superceded"], inplace=True, errors="ignore") warnings = [] if not direct_sql_pandas: - warnings.append("query was executing using streaming via csv, which can mangle types. \ - Please upgrade to caveclient>8.0.0 to avoid type mangling. 
\ - because you may have been corrected for mangled types this change is breaking, \ - but should be an improved experience.") + warnings.append(sql_query_warning) current_app.logger.info("query: {}".format(data)) current_app.logger.info("args: {}".format(args)) user_id = str(g.auth_user["id"]) From 8c8fb7c3d5ef0b28b339afeb612a96b9242e65f2 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Sat, 1 Nov 2025 09:48:57 -0700 Subject: [PATCH 5/5] ruff formatting fixes --- materializationengine/admin.py | 2 +- materializationengine/app.py | 12 +++--- .../blueprints/client/datastack.py | 4 +- .../blueprints/client/new_query.py | 11 ++--- .../blueprints/client/precomputed.py | 7 ++-- .../blueprints/client/query_manager.py | 26 ++++++------ .../blueprints/client/schemas.py | 8 ++-- .../blueprints/client/utils.py | 9 ++-- .../blueprints/materialize/api.py | 42 ++++++++++--------- .../blueprints/materialize/schemas.py | 2 +- .../blueprints/reset_auth.py | 1 + .../blueprints/upload/api.py | 19 ++++----- .../blueprints/upload/checkpoint_manager.py | 1 - .../blueprints/upload/gcs_processor.py | 9 ++-- .../blueprints/upload/models.py | 10 +++-- .../blueprints/upload/processor.py | 2 +- .../blueprints/upload/schemas.py | 7 ++-- .../blueprints/upload/storage.py | 14 ++++--- .../blueprints/upload/tasks.py | 23 +++++----- materializationengine/celery_init.py | 1 - materializationengine/celery_slack.py | 3 +- materializationengine/chunkedgraph_gateway.py | 5 +-- materializationengine/cloudvolume_gateway.py | 3 +- materializationengine/index_manager.py | 4 +- materializationengine/info_client.py | 2 +- materializationengine/limiter.py | 7 ++-- materializationengine/models.py | 2 +- materializationengine/request_db.py | 4 +- materializationengine/schemas.py | 11 ++++- materializationengine/upsert.py | 3 +- materializationengine/utils.py | 10 +++-- materializationengine/views.py | 6 +-- .../workflows/bulk_upload.py | 11 +++-- materializationengine/workflows/chunking.py | 1 - .../workflows/complete_workflow.py | 9 ++-- .../workflows/create_frozen_database.py | 13 +++--- .../workflows/dummy_workflow.py | 1 + .../workflows/ingest_new_annotations.py | 20 ++++----- .../workflows/periodic_database_removal.py | 5 ++- .../workflows/periodic_materialization.py | 3 +- .../workflows/spatial_lookup.py | 4 +- .../workflows/update_database_workflow.py | 5 ++- .../workflows/update_root_ids.py | 7 ++-- tests/test_bulk_upload.py | 7 ++-- tests/test_create_frozen_database.py | 2 +- tests/test_database.py | 14 ++++--- tests/test_database_removal.py | 7 ++-- tests/test_ingest_new_annotations.py | 6 +-- tests/test_models.py | 1 + tests/test_shared_tasks.py | 10 +++-- tests/test_spatial_lookup.py | 1 + tests/test_update_root_ids.py | 6 +-- 52 files changed, 220 insertions(+), 183 deletions(-) diff --git a/materializationengine/admin.py b/materializationengine/admin.py index b78bccd3..b27392ca 100644 --- a/materializationengine/admin.py +++ b/materializationengine/admin.py @@ -1,6 +1,6 @@ +from dynamicannotationdb.models import AnalysisTable, AnalysisVersion from flask_admin import Admin from flask_admin.contrib.sqla import ModelView -from dynamicannotationdb.models import AnalysisVersion, AnalysisTable def setup_admin(app, db): diff --git a/materializationengine/app.py b/materializationengine/app.py index 05de9b07..0e2ccf32 100644 --- a/materializationengine/app.py +++ b/materializationengine/app.py @@ -3,7 +3,7 @@ from datetime import date, datetime, timedelta import numpy as np -from dynamicannotationdb.models import Base, 
AnalysisVersion +from dynamicannotationdb.models import AnalysisVersion, Base from flask import Blueprint, Flask, current_app, jsonify, redirect, url_for from flask_cors import CORS from flask_restx import Api @@ -14,17 +14,17 @@ from materializationengine.blueprints.client.api import client_bp from materializationengine.blueprints.client.api2 import client_bp as client_bp2 from materializationengine.blueprints.materialize.api import mat_bp -from materializationengine.blueprints.upload.api import upload_bp, spatial_lookup_bp -from materializationengine.blueprints.upload.storage import StorageService +from materializationengine.blueprints.upload.api import spatial_lookup_bp, upload_bp from materializationengine.blueprints.upload.models import init_staging_database +from materializationengine.blueprints.upload.storage import StorageService from materializationengine.config import config, configure_app from materializationengine.database import db_manager -from materializationengine.schemas import ma -from materializationengine.utils import get_instance_folder_path -from materializationengine.views import views_bp from materializationengine.limiter import limiter from materializationengine.migrate import migrator from materializationengine.request_db import init_request_db_cleanup +from materializationengine.schemas import ma +from materializationengine.utils import get_instance_folder_path +from materializationengine.views import views_bp db = SQLAlchemy(model_class=Base) diff --git a/materializationengine/blueprints/client/datastack.py b/materializationengine/blueprints/client/datastack.py index de1ffc1d..fadb4077 100644 --- a/materializationengine/blueprints/client/datastack.py +++ b/materializationengine/blueprints/client/datastack.py @@ -6,10 +6,10 @@ AnalysisTable, AnalysisVersion, ) -from materializationengine.database import db_manager +from flask import abort +from materializationengine.database import db_manager from materializationengine.info_client import get_relevant_datastack_info -from flask import abort def parse_args(f, *args, **kwargs): diff --git a/materializationengine/blueprints/client/new_query.py b/materializationengine/blueprints/client/new_query.py index c4537ace..611a2009 100644 --- a/materializationengine/blueprints/client/new_query.py +++ b/materializationengine/blueprints/client/new_query.py @@ -1,11 +1,12 @@ import datetime -import numpy as np -from flask import abort -from copy import deepcopy -import pandas as pd import logging -from typing import Iterable from collections import defaultdict +from copy import deepcopy +from typing import Iterable + +import numpy as np +import pandas as pd +from flask import abort def update_rootids( diff --git a/materializationengine/blueprints/client/precomputed.py b/materializationengine/blueprints/client/precomputed.py index afbf97d7..967ec1f9 100644 --- a/materializationengine/blueprints/client/precomputed.py +++ b/materializationengine/blueprints/client/precomputed.py @@ -1,9 +1,10 @@ import numbers -from typing import Literal, NamedTuple, Optional, Union, cast -from collections.abc import Sequence import struct -from neuroglancer import coordinate_space, viewer_state +from collections.abc import Sequence +from typing import Literal, NamedTuple, Optional, Union, cast + import numpy as np +from neuroglancer import coordinate_space, viewer_state class Annotation(NamedTuple): diff --git a/materializationengine/blueprints/client/query_manager.py b/materializationengine/blueprints/client/query_manager.py index 
diff --git a/materializationengine/admin.py b/materializationengine/admin.py
index b78bccd3..b27392ca 100644
--- a/materializationengine/admin.py
+++ b/materializationengine/admin.py
@@ -1,6 +1,6 @@
+from dynamicannotationdb.models import AnalysisTable, AnalysisVersion
 from flask_admin import Admin
 from flask_admin.contrib.sqla import ModelView
-from dynamicannotationdb.models import AnalysisVersion, AnalysisTable


 def setup_admin(app, db):
diff --git a/materializationengine/app.py b/materializationengine/app.py
index 05de9b07..0e2ccf32 100644
--- a/materializationengine/app.py
+++ b/materializationengine/app.py
@@ -3,7 +3,7 @@ from datetime import date, datetime, timedelta

 import numpy as np
-from dynamicannotationdb.models import Base, AnalysisVersion
+from dynamicannotationdb.models import AnalysisVersion, Base
 from flask import Blueprint, Flask, current_app, jsonify, redirect, url_for
 from flask_cors import CORS
 from flask_restx import Api
@@ -14,17 +14,17 @@ from materializationengine.blueprints.client.api import client_bp
 from materializationengine.blueprints.client.api2 import client_bp as client_bp2
 from materializationengine.blueprints.materialize.api import mat_bp
-from materializationengine.blueprints.upload.api import upload_bp, spatial_lookup_bp
-from materializationengine.blueprints.upload.storage import StorageService
+from materializationengine.blueprints.upload.api import spatial_lookup_bp, upload_bp
 from materializationengine.blueprints.upload.models import init_staging_database
+from materializationengine.blueprints.upload.storage import StorageService
 from materializationengine.config import config, configure_app
 from materializationengine.database import db_manager
-from materializationengine.schemas import ma
-from materializationengine.utils import get_instance_folder_path
-from materializationengine.views import views_bp
 from materializationengine.limiter import limiter
 from materializationengine.migrate import migrator
 from materializationengine.request_db import init_request_db_cleanup
+from materializationengine.schemas import ma
+from materializationengine.utils import get_instance_folder_path
+from materializationengine.views import views_bp

 db = SQLAlchemy(model_class=Base)
diff --git a/materializationengine/blueprints/client/datastack.py b/materializationengine/blueprints/client/datastack.py
index de1ffc1d..fadb4077 100644
--- a/materializationengine/blueprints/client/datastack.py
+++ b/materializationengine/blueprints/client/datastack.py
@@ -6,10 +6,10 @@
     AnalysisTable,
     AnalysisVersion,
 )
-from materializationengine.database import db_manager
+from flask import abort

+from materializationengine.database import db_manager
 from materializationengine.info_client import get_relevant_datastack_info
-from flask import abort


 def parse_args(f, *args, **kwargs):
diff --git a/materializationengine/blueprints/client/new_query.py b/materializationengine/blueprints/client/new_query.py
index c4537ace..611a2009 100644
--- a/materializationengine/blueprints/client/new_query.py
+++ b/materializationengine/blueprints/client/new_query.py
@@ -1,11 +1,12 @@
 import datetime
-import numpy as np
-from flask import abort
-from copy import deepcopy
-import pandas as pd
 import logging
-from typing import Iterable
 from collections import defaultdict
+from copy import deepcopy
+from typing import Iterable
+
+import numpy as np
+import pandas as pd
+from flask import abort


 def update_rootids(
diff --git a/materializationengine/blueprints/client/precomputed.py b/materializationengine/blueprints/client/precomputed.py
index afbf97d7..967ec1f9 100644
--- a/materializationengine/blueprints/client/precomputed.py
+++ b/materializationengine/blueprints/client/precomputed.py
@@ -1,9 +1,10 @@
 import numbers
-from typing import Literal, NamedTuple, Optional, Union, cast
-from collections.abc import Sequence
 import struct
-from neuroglancer import coordinate_space, viewer_state
+from collections.abc import Sequence
+from typing import Literal, NamedTuple, Optional, Union, cast
+
 import numpy as np
+from neuroglancer import coordinate_space, viewer_state


 class Annotation(NamedTuple):
diff --git a/materializationengine/blueprints/client/query_manager.py b/materializationengine/blueprints/client/query_manager.py
index 5a1d731e..9ee9e3cf 100644
--- a/materializationengine/blueprints/client/query_manager.py
+++ b/materializationengine/blueprints/client/query_manager.py
@@ -1,21 +1,23 @@
+import datetime
 from collections import defaultdict
-from materializationengine.database import dynamic_annotation_cache
+
+import numpy as np
 from flask import abort, current_app
+from geoalchemy2.types import Geometry
+from sqlalchemy import func, or_, text
+from sqlalchemy.ext.declarative.api import DeclarativeMeta
+from sqlalchemy.orm import aliased
+from sqlalchemy.sql.expression import tablesample
+from sqlalchemy.sql.schema import Table
+from sqlalchemy.sql.selectable import Alias
+from sqlalchemy.sql.sqltypes import Integer, String
+
 from materializationengine.blueprints.client.query import (
-    make_spatial_filter,
     _execute_query,
     get_column,
+    make_spatial_filter,
 )
-import numpy as np
-from geoalchemy2.types import Geometry
-from sqlalchemy.sql.sqltypes import Integer, String
-from sqlalchemy import or_, func, text
-from sqlalchemy.orm import aliased
-from sqlalchemy.sql.selectable import Alias
-from sqlalchemy.sql.schema import Table
-from sqlalchemy.sql.expression import tablesample
-from sqlalchemy.ext.declarative.api import DeclarativeMeta
-import datetime
+from materializationengine.database import dynamic_annotation_cache

 DEFAULT_SUFFIX_LIST = ["x", "y", "z", "xx", "yy", "zz", "xxx", "yyy", "zzz"]
 DEFAULT_LIMIT = 250000
diff --git a/materializationengine/blueprints/client/schemas.py b/materializationengine/blueprints/client/schemas.py
index c63a9ed0..bf1ab08d 100644
--- a/materializationengine/blueprints/client/schemas.py
+++ b/materializationengine/blueprints/client/schemas.py
@@ -1,8 +1,10 @@
-from marshmallow import fields, Schema
-from marshmallow.validate import Length
 import datetime
-from flask_marshmallow import Marshmallow
+
 from dynamicannotationdb.models import AnalysisView
+from flask_marshmallow import Marshmallow
+from marshmallow import Schema, fields
+from marshmallow.validate import Length
+
 ma = Marshmallow()

 class Metadata(Schema):
diff --git a/materializationengine/blueprints/client/utils.py b/materializationengine/blueprints/client/utils.py
index 070195dc..6ffd7e67 100644
--- a/materializationengine/blueprints/client/utils.py
+++ b/materializationengine/blueprints/client/utils.py
@@ -1,11 +1,12 @@
+from io import BytesIO
+
 import pyarrow as pa
-from flask import Response, request, send_file
 from cloudfiles import compression
-from io import BytesIO
+from dynamicannotationdb.models import AnalysisVersion
+from flask import Response, request, send_file

-from materializationengine.info_client import get_datastack_info
 from materializationengine.database import db_manager
-from dynamicannotationdb.models import AnalysisVersion
+from materializationengine.info_client import get_datastack_info


 def collect_crud_columns(column_names):
diff --git a/materializationengine/blueprints/materialize/api.py b/materializationengine/blueprints/materialize/api.py
index ee2f35a0..259b869d 100644
--- a/materializationengine/blueprints/materialize/api.py
+++ b/materializationengine/blueprints/materialize/api.py
@@ -1,39 +1,41 @@
 import datetime
 import logging
+import os
+import subprocess
+
+import cloudfiles
 import redis
-from dynamicannotationdb.models import AnalysisTable, Base
-from flask import abort, current_app, request, jsonify
+from dynamicannotationdb.models import AnalysisTable, AnalysisVersion, Base
+from flask import abort, current_app, jsonify, request
 from flask_accepts import accepts
-from flask_restx import Namespace, Resource, inputs, reqparse, fields
+from flask_restx import Namespace, Resource, fields, inputs, reqparse
+from middle_auth_client import (
+    auth_requires_admin,
+    auth_requires_dataset_admin,
+    auth_requires_permission,
+)
+from sqlalchemy import MetaData, Table
+from sqlalchemy.engine.url import make_url
+from sqlalchemy.exc import NoSuchTableError
+
 from materializationengine.blueprints.client.utils import get_latest_version
+from materializationengine.blueprints.materialize.schemas import (
+    AnnotationIDListSchema,
+    BadRootsSchema,
+    VirtualVersionSchema,
+)
 from materializationengine.blueprints.reset_auth import reset_auth
 from materializationengine.database import (
-    dynamic_annotation_cache,
     db_manager,
+    dynamic_annotation_cache,
 )
 from materializationengine.info_client import (
     get_aligned_volumes,
     get_datastack_info,
     get_relevant_datastack_info,
 )
-from dynamicannotationdb.models import AnalysisVersion
 from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
-from materializationengine.blueprints.materialize.schemas import BadRootsSchema
-from middle_auth_client import auth_requires_admin, auth_requires_permission, auth_requires_dataset_admin
-from sqlalchemy import MetaData, Table
-from sqlalchemy.engine.url import make_url
-from sqlalchemy.exc import NoSuchTableError
 from materializationengine.utils import check_write_permission
-import os
-import subprocess
-import cloudfiles
-
-
-from materializationengine.blueprints.materialize.schemas import (
-    VirtualVersionSchema,
-    AnnotationIDListSchema,
-)
-
 __version__ = "5.12.1"
diff --git a/materializationengine/blueprints/materialize/schemas.py b/materializationengine/blueprints/materialize/schemas.py
index 74c412e8..b3c03e82 100644
--- a/materializationengine/blueprints/materialize/schemas.py
+++ b/materializationengine/blueprints/materialize/schemas.py
@@ -1,4 +1,4 @@
-from marshmallow import fields, Schema
+from marshmallow import Schema, fields


 class AnnotationIDListSchema(Schema):
diff --git a/materializationengine/blueprints/reset_auth.py b/materializationengine/blueprints/reset_auth.py
index e61eb8bf..98801082 100644
--- a/materializationengine/blueprints/reset_auth.py
+++ b/materializationengine/blueprints/reset_auth.py
@@ -1,4 +1,5 @@
 from functools import wraps
+
 import flask


diff --git a/materializationengine/blueprints/upload/api.py b/materializationengine/blueprints/upload/api.py
index 1a0c8d76..8988269a 100644
--- a/materializationengine/blueprints/upload/api.py
+++ b/materializationengine/blueprints/upload/api.py
@@ -1,9 +1,9 @@
 import datetime
 import functools
-import os
-from typing import Any, Dict
 import json
+import os
 from dataclasses import asdict
+from typing import Any, Dict

 from dynamicannotationdb.models import AnalysisVersion
 from dynamicannotationdb.schema import DynamicSchemaClient
@@ -19,23 +19,24 @@
     url_for,
 )
 from flask_restx import Namespace, Resource, inputs, reqparse
-from google.cloud import storage
 from google.api_core import exceptions as google_exceptions
+from google.cloud import storage
 from middle_auth_client import (
+    auth_required,
     auth_requires_admin,
     auth_requires_permission,
-    auth_required,
 )
 from redis import StrictRedis

+from materializationengine import __version__
 from materializationengine.blueprints.reset_auth import reset_auth
 from materializationengine.blueprints.upload.checkpoint_manager import (
-    RedisCheckpointManager,
-    CHUNK_STATUS_PENDING,
-    CHUNK_STATUS_PROCESSING,
     CHUNK_STATUS_COMPLETED,
-    CHUNK_STATUS_FAILED_RETRYABLE,
     CHUNK_STATUS_FAILED_PERMANENT,
+    CHUNK_STATUS_FAILED_RETRYABLE,
+    CHUNK_STATUS_PENDING,
+    CHUNK_STATUS_PROCESSING,
+    RedisCheckpointManager,
 )
 from materializationengine.blueprints.upload.schema_helper import get_schema_types
 from materializationengine.blueprints.upload.schemas import UploadRequestSchema
@@ -51,8 +52,6 @@
 from materializationengine.database import db_manager
 from materializationengine.info_client import get_datastack_info, get_datastacks
 from materializationengine.utils import get_config_param
-from materializationengine import __version__
-

 authorizations = {
     "apikey": {"type": "apiKey", "in": "query", "name": "middle_auth_token"}
diff --git a/materializationengine/blueprints/upload/checkpoint_manager.py b/materializationengine/blueprints/upload/checkpoint_manager.py
index c8326cc1..9a684d58 100644
--- a/materializationengine/blueprints/upload/checkpoint_manager.py
+++ b/materializationengine/blueprints/upload/checkpoint_manager.py
@@ -12,7 +12,6 @@
 from materializationengine.utils import get_config_param

-
 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/blueprints/upload/gcs_processor.py b/materializationengine/blueprints/upload/gcs_processor.py
index ec2937c6..78721169 100644
--- a/materializationengine/blueprints/upload/gcs_processor.py
+++ b/materializationengine/blueprints/upload/gcs_processor.py
@@ -1,9 +1,10 @@
-from google.cloud import storage
-import pandas as pd
 import io
-from typing import Iterator, Optional, Callable, Dict, Any
+import time
 from dataclasses import dataclass
-import time
+from typing import Any, Callable, Dict, Iterator, Optional
+
+import pandas as pd
+from google.cloud import storage

 # @dataclass
 # class UploadConfig:
diff --git a/materializationengine/blueprints/upload/models.py b/materializationengine/blueprints/upload/models.py
index e6409d03..2043f6a6 100644
--- a/materializationengine/blueprints/upload/models.py
+++ b/materializationengine/blueprints/upload/models.py
@@ -1,10 +1,12 @@
-from sqlalchemy import Column, Integer, String, DateTime, JSON, Text, Enum, ARRAY
-from sqlalchemy.ext.declarative import declarative_base
+import enum
 from datetime import datetime, timezone
+
+from flask import Flask
+from sqlalchemy import ARRAY, JSON, Column, DateTime, Enum, Integer, String, Text
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.ext.declarative import declarative_base
+
 from materializationengine.database import dynamic_annotation_cache
-import enum
-from flask import Flask

 Base = declarative_base()
diff --git a/materializationengine/blueprints/upload/processor.py b/materializationengine/blueprints/upload/processor.py
index b1fd86b4..54f5acdc 100644
--- a/materializationengine/blueprints/upload/processor.py
+++ b/materializationengine/blueprints/upload/processor.py
@@ -3,8 +3,8 @@
 from typing import Dict, List, Optional, Set, Tuple

 import marshmallow as mm
-import pandas as pd
 import numpy as np
+import pandas as pd
 from emannotationschemas import get_schema
 from emannotationschemas.models import make_model_from_schema
 from emannotationschemas.schemas.base import ReferenceAnnotation, SpatialPoint
diff --git a/materializationengine/blueprints/upload/schemas.py b/materializationengine/blueprints/upload/schemas.py
index be30cf85..3984d8ae 100644
--- a/materializationengine/blueprints/upload/schemas.py
+++ b/materializationengine/blueprints/upload/schemas.py
@@ -1,12 +1,13 @@
+from typing import Any, Dict
+
 from marshmallow import (
+    EXCLUDE,
     Schema,
+    ValidationError,
     fields,
     validates,
-    ValidationError,
-    EXCLUDE,
     validates_schema,
 )
-from typing import Dict, Any


 class MetadataSchema(Schema):
diff --git a/materializationengine/blueprints/upload/storage.py b/materializationengine/blueprints/upload/storage.py
index ed9f7ea4..6a55696f 100644
--- a/materializationengine/blueprints/upload/storage.py
+++ b/materializationengine/blueprints/upload/storage.py
@@ -1,12 +1,14 @@
-from typing import Dict, Any, Optional, Tuple, Generator, BinaryIO
+import json
+import time
 from dataclasses import dataclass
-from google.cloud import storage
+from datetime import datetime
+from typing import Any, BinaryIO, Dict, Generator, Optional, Tuple
+
+import pandas as pd
 from google.auth.transport.requests import AuthorizedSession
+from google.cloud import storage
 from google.resumable_media.requests import ResumableUpload
-import pandas as pd
-import time
-import json
-from datetime import datetime
+

 @dataclass
 class StorageConfig:
diff --git a/materializationengine/blueprints/upload/tasks.py b/materializationengine/blueprints/upload/tasks.py
index 34f5c85c..fc82f9d4 100644
--- a/materializationengine/blueprints/upload/tasks.py
+++ b/materializationengine/blueprints/upload/tasks.py
@@ -1,19 +1,28 @@
 import json
 import os
+import shlex
 import subprocess
 from datetime import datetime, timezone
 from typing import Any, Dict, List
-import shlex

 import pandas as pd
 from celery import chain
 from celery.result import AsyncResult
 from celery.utils.log import get_task_logger
+from dynamicannotationdb.key_utils import build_segmentation_table_name
 from flask import current_app
 from redis import Redis
 from sqlalchemy import text

-from dynamicannotationdb.key_utils import build_segmentation_table_name
+from materializationengine.blueprints.upload.checkpoint_manager import (
+    CHUNK_STATUS_COMPLETED,
+    CHUNK_STATUS_ERROR,
+    CHUNK_STATUS_FAILED_PERMANENT,
+    CHUNK_STATUS_FAILED_RETRYABLE,
+    CHUNK_STATUS_PROCESSING,
+    CHUNK_STATUS_PROCESSING_SUBTASKS,
+    RedisCheckpointManager,
+)
 from materializationengine.blueprints.upload.gcs_processor import GCSCsvProcessor
 from materializationengine.blueprints.upload.processor import SchemaProcessor
 from materializationengine.celery_init import celery
@@ -26,15 +35,7 @@
     create_segmentation_model,
 )
 from materializationengine.workflows.spatial_lookup import run_spatial_lookup_workflow
-from materializationengine.blueprints.upload.checkpoint_manager import (
-    CHUNK_STATUS_COMPLETED,
-    CHUNK_STATUS_FAILED_PERMANENT,
-    CHUNK_STATUS_FAILED_RETRYABLE,
-    CHUNK_STATUS_PROCESSING,
-    CHUNK_STATUS_PROCESSING_SUBTASKS,
-    CHUNK_STATUS_ERROR,
-    RedisCheckpointManager,
-)
+

 celery_logger = get_task_logger(__name__)
 # Redis client for storing job status
diff --git a/materializationengine/celery_init.py b/materializationengine/celery_init.py
index e33a4b12..a3798404 100644
--- a/materializationengine/celery_init.py
+++ b/materializationengine/celery_init.py
@@ -1,6 +1,5 @@
 from celery import Celery

-
 celery = Celery(
     include=[
         "materializationengine.workflows.ingest_new_annotations",
diff --git a/materializationengine/celery_slack.py b/materializationengine/celery_slack.py
index 2e23f0ca..ca893cc3 100644
--- a/materializationengine/celery_slack.py
+++ b/materializationengine/celery_slack.py
@@ -1,6 +1,7 @@
+import os
+
 import requests
 from celery.utils.log import get_task_logger
-import os

 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/chunkedgraph_gateway.py b/materializationengine/chunkedgraph_gateway.py
index dc677afc..352d4d85 100644
--- a/materializationengine/chunkedgraph_gateway.py
+++ b/materializationengine/chunkedgraph_gateway.py
@@ -1,8 +1,7 @@
-from caveclient.chunkedgraph import ChunkedGraphClient
-from caveclient.auth import AuthClient
-from caveclient.auth import default_global_server_address
 import os

+from caveclient.auth import AuthClient, default_global_server_address
+from caveclient.chunkedgraph import ChunkedGraphClient

 default_server_address = os.environ.get(
     "GLOBAL_SERVER_URL", default_global_server_address
diff --git a/materializationengine/cloudvolume_gateway.py b/materializationengine/cloudvolume_gateway.py
index 56455674..73dc8a48 100644
--- a/materializationengine/cloudvolume_gateway.py
+++ b/materializationengine/cloudvolume_gateway.py
@@ -1,6 +1,7 @@
-import cloudvolume
 import os

+import cloudvolume
+

 class CloudVolumeGateway:
     """A class to manage cloudvolume clients and cache them for reuse."""
diff --git a/materializationengine/index_manager.py b/materializationengine/index_manager.py
index b986320d..f3b3a6e4 100644
--- a/materializationengine/index_manager.py
+++ b/materializationengine/index_manager.py
@@ -1,6 +1,6 @@
 from geoalchemy2.types import Geometry
-from sqlalchemy import engine, MetaData
-from sqlalchemy import inspect
+from sqlalchemy import MetaData, engine, inspect
+

 class IndexCache:
     def get_table_indices(self, table_name: str, engine: engine):
diff --git a/materializationengine/info_client.py b/materializationengine/info_client.py
index 889be3aa..00f64d29 100644
--- a/materializationengine/info_client.py
+++ b/materializationengine/info_client.py
@@ -5,7 +5,7 @@
 from cachetools import LRUCache, TTLCache, cached
 from caveclient.auth import AuthClient
 from caveclient.infoservice import InfoServiceClient
-from flask import current_app, abort
+from flask import abort, current_app

 from materializationengine.errors import (
     AlignedVolumeNotFoundException,
diff --git a/materializationengine/limiter.py b/materializationengine/limiter.py
index 304c89e1..011ef8ec 100644
--- a/materializationengine/limiter.py
+++ b/materializationengine/limiter.py
@@ -1,8 +1,9 @@
+import json
+import os
+
+from flask import g
 from flask_limiter import Limiter
 from flask_limiter.util import get_remote_address
-from flask import g
-import os
-import json


 def limit_by_category(category):
diff --git a/materializationengine/models.py b/materializationengine/models.py
index f2807df7..4b4e7d9d 100644
--- a/materializationengine/models.py
+++ b/materializationengine/models.py
@@ -1,4 +1,4 @@
-from sqlalchemy import Column, String, Integer, DateTime
+from sqlalchemy import Column, DateTime, Integer, String
 from sqlalchemy.ext.declarative import declarative_base

 MatBase = declarative_base()
diff --git a/materializationengine/request_db.py b/materializationengine/request_db.py
index 8024e624..8f2d7878 100644
--- a/materializationengine/request_db.py
+++ b/materializationengine/request_db.py
@@ -8,7 +8,9 @@
 import logging
 from contextlib import contextmanager
-from flask import g, current_app
+
+from flask import current_app, g
+
 from materializationengine.database import dynamic_annotation_cache

 logger = logging.getLogger(__name__)
diff --git a/materializationengine/schemas.py b/materializationengine/schemas.py
index df51c64f..7a843ee6 100644
--- a/materializationengine/schemas.py
+++ b/materializationengine/schemas.py
@@ -1,11 +1,18 @@
 from dynamicannotationdb.models import (
     AnalysisTable,
     AnalysisVersion,
-    VersionErrorTable,
     AnalysisView,
+    VersionErrorTable,
 )
 from flask_marshmallow import Marshmallow
-from marshmallow import Schema, fields, validate, ValidationError, validates_schema, post_load
+from marshmallow import (
+    Schema,
+    ValidationError,
+    fields,
+    post_load,
+    validate,
+    validates_schema,
+)
 from marshmallow_sqlalchemy import SQLAlchemyAutoSchema

 ma = Marshmallow()
diff --git a/materializationengine/upsert.py b/materializationengine/upsert.py
index 205b277e..9369d455 100644
--- a/materializationengine/upsert.py
+++ b/materializationengine/upsert.py
@@ -1,6 +1,7 @@
+from functools import partial
 from itertools import groupby, islice, repeat, takewhile
 from operator import itemgetter
-from functools import partial
+
 from sqlalchemy.exc import IntegrityError, SQLAlchemyError


diff --git a/materializationengine/utils.py b/materializationengine/utils.py
index c2e448a6..1186d432 100644
--- a/materializationengine/utils.py
+++ b/materializationengine/utils.py
@@ -1,11 +1,13 @@
 import os
+from typing import Any
+
+from cachetools import LRUCache, TTLCache, cached
+from celery.utils.log import get_task_logger
 from dynamicannotationdb.schema import DynamicSchemaClient
+from flask import abort, current_app, g
 from geoalchemy2.shape import to_shape
-from flask import current_app, abort, g
 from middle_auth_client.decorators import users_share_common_group
-from celery.utils.log import get_task_logger
-from typing import Any
-from cachetools import TTLCache, cached, LRUCache
+

 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/views.py b/materializationengine/views.py
index c17bfd7a..243cae79 100644
--- a/materializationengine/views.py
+++ b/materializationengine/views.py
@@ -39,15 +39,12 @@
 from materializationengine.blueprints.reset_auth import reset_auth
 from materializationengine.celery_init import celery
 from materializationengine.database import db_manager, dynamic_annotation_cache
-from materializationengine.blueprints.client.query_manager import QueryManager
-from materializationengine.request_db import request_db_session
-from materializationengine.blueprints.client.datastack import validate_datastack
-
 from materializationengine.info_client import (
     get_datastack_info,
     get_datastacks,
     get_relevant_datastack_info,
 )
+from materializationengine.request_db import request_db_session
 from materializationengine.schemas import (
     AnalysisTableSchema,
     AnalysisVersionSchema,
@@ -56,7 +53,6 @@
 )
 from materializationengine.utils import check_read_permission, get_config_param

-
 __version__ = "5.12.1"

 views_bp = Blueprint("views", __name__, url_prefix="/materialize/views")
diff --git a/materializationengine/workflows/bulk_upload.py b/materializationengine/workflows/bulk_upload.py
index 6a987bcd..a7713336 100644
--- a/materializationengine/workflows/bulk_upload.py
+++ b/materializationengine/workflows/bulk_upload.py
@@ -1,26 +1,25 @@
 import datetime
-from typing import List
-import time
 import json
+import time
+from typing import List

 import gcsfs
 import numpy as np
 import pandas as pd
-from celery import chain, group, chord
+from celery import chain, chord, group
 from celery.utils.log import get_task_logger
 from dynamicannotationdb.models import AnnoMetadata, SegmentationMetadata
-
 from dynamicannotationdb.schema import DynamicSchemaClient
+
 from materializationengine.celery_init import celery
 from materializationengine.database import db_manager
 from materializationengine.index_manager import index_cache
-from materializationengine.shared_tasks import fin, add_index
+from materializationengine.shared_tasks import add_index, fin
 from materializationengine.utils import (
     create_annotation_model,
     create_segmentation_model,
 )
-

 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/workflows/chunking.py b/materializationengine/workflows/chunking.py
index e5aef510..fef08a7c 100644
--- a/materializationengine/workflows/chunking.py
+++ b/materializationengine/workflows/chunking.py
@@ -7,7 +7,6 @@
     create_annotation_model,
 )

-
 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/workflows/complete_workflow.py b/materializationengine/workflows/complete_workflow.py
index 055f6ac8..7ecb8ff2 100644
--- a/materializationengine/workflows/complete_workflow.py
+++ b/materializationengine/workflows/complete_workflow.py
@@ -2,28 +2,29 @@

 from celery import chain, chord
 from celery.utils.log import get_task_logger
+
 from materializationengine.celery_init import celery
 from materializationengine.shared_tasks import (
     fin,
     get_materialization_info,
-    workflow_complete,
     monitor_workflow_state,
+    workflow_complete,
     workflow_failed,
 )
+from materializationengine.task import LockedTask
 from materializationengine.workflows.create_frozen_database import (
     check_tables,
+    clean_split_table_workflow,
     create_materialized_database_workflow,
     create_new_version,
     format_materialization_database_workflow,
     rebuild_reference_tables,
     set_version_status,
-    clean_split_table_workflow,
 )
 from materializationengine.workflows.ingest_new_annotations import (
+    find_missing_root_ids_workflow,
     ingest_new_annotations_workflow,
-    find_missing_root_ids_workflow
 )
-from materializationengine.task import LockedTask
 from materializationengine.workflows.update_root_ids import (
     update_root_ids_workflow,
 )
diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py
index 837b7075..37869e3f 100644
--- a/materializationengine/workflows/create_frozen_database.py
+++ b/materializationengine/workflows/create_frozen_database.py
@@ -21,11 +21,17 @@
     make_flat_model,
     make_reference_annotation_model,
 )
+from psycopg2 import sql
+from sqlalchemy import MetaData, create_engine, func
+from sqlalchemy.engine import reflection
+from sqlalchemy.engine.url import make_url
+from sqlalchemy.exc import OperationalError
+
 from materializationengine.blueprints.materialize.api import get_datastack_info
 from materializationengine.celery_init import celery
 from materializationengine.database import (
-    dynamic_annotation_cache,
     db_manager,
+    dynamic_annotation_cache,
 )
 from materializationengine.errors import IndexMatchError
 from materializationengine.index_manager import index_cache
@@ -40,11 +46,6 @@
     create_segmentation_model,
     get_config_param,
 )
-from psycopg2 import sql
-from sqlalchemy import MetaData, create_engine, func
-from sqlalchemy.engine import reflection
-from sqlalchemy.engine.url import make_url
-from sqlalchemy.exc import OperationalError

 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/workflows/dummy_workflow.py b/materializationengine/workflows/dummy_workflow.py
index f7459bfa..9fbf0683 100644
--- a/materializationengine/workflows/dummy_workflow.py
+++ b/materializationengine/workflows/dummy_workflow.py
@@ -2,6 +2,7 @@

 from celery import chain, chord
 from celery.utils.log import get_task_logger
+
 from materializationengine.celery_init import celery
 from materializationengine.shared_tasks import fin
diff --git a/materializationengine/workflows/ingest_new_annotations.py b/materializationengine/workflows/ingest_new_annotations.py
index e45ad4ad..fcfdc07b 100644
--- a/materializationengine/workflows/ingest_new_annotations.py
+++ b/materializationengine/workflows/ingest_new_annotations.py
@@ -8,30 +8,30 @@
 from celery import chain, chord, group
 from celery.utils.log import get_task_logger
 from dynamicannotationdb.models import SegmentationMetadata
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.sql import func, or_, text
+
 from materializationengine.celery_init import celery
 from materializationengine.chunkedgraph_gateway import chunkedgraph_cache
-from materializationengine.database import dynamic_annotation_cache, db_manager
-from materializationengine.throttle import throttle_celery
+from materializationengine.database import db_manager, dynamic_annotation_cache
 from materializationengine.shared_tasks import (
-    generate_chunked_model_ids,
-    fin,
-    query_id_range,
     create_chunks,
-    update_metadata,
+    fin,
+    generate_chunked_model_ids,
     get_materialization_info,
-    monitor_workflow_state,
     monitor_task_states,
+    monitor_workflow_state,
+    query_id_range,
+    update_metadata,
     workflow_complete,
 )
+from materializationengine.throttle import throttle_celery
 from materializationengine.utils import (
     create_annotation_model,
     create_segmentation_model,
     get_geom_from_wkb,
     get_query_columns_by_suffix,
 )
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.sql import or_
-from sqlalchemy.sql import func, text

 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/workflows/periodic_database_removal.py b/materializationengine/workflows/periodic_database_removal.py
index 8e5af9f4..e0b9df16 100644
--- a/materializationengine/workflows/periodic_database_removal.py
+++ b/materializationengine/workflows/periodic_database_removal.py
@@ -7,12 +7,13 @@

 from celery.utils.log import get_task_logger
 from dynamicannotationdb.models import AnalysisVersion
+from sqlalchemy import create_engine
+from sqlalchemy.engine.url import make_url
+
 from materializationengine.celery_init import celery
 from materializationengine.database import db_manager
 from materializationengine.info_client import get_aligned_volumes, get_datastack_info
 from materializationengine.utils import get_config_param
-from sqlalchemy import create_engine
-from sqlalchemy.engine.url import make_url

 celery_logger = get_task_logger(__name__)
diff --git a/materializationengine/workflows/periodic_materialization.py b/materializationengine/workflows/periodic_materialization.py
index ffc14c92..0a919025 100644
--- a/materializationengine/workflows/periodic_materialization.py
+++ b/materializationengine/workflows/periodic_materialization.py
@@ -6,10 +6,11 @@
 from typing import List

 from celery.utils.log import get_task_logger
+from dynamicannotationdb.models import AnalysisVersion
+
 from materializationengine.blueprints.materialize.api import get_datastack_info
 from materializationengine.celery_init import celery
 from materializationengine.database import db_manager
-from dynamicannotationdb.models import AnalysisVersion
 from materializationengine.shared_tasks import check_if_task_is_running
 from materializationengine.utils import get_config_param
 from materializationengine.workflows.complete_workflow import run_complete_workflow
diff --git a/materializationengine/workflows/spatial_lookup.py b/materializationengine/workflows/spatial_lookup.py
index 0b3015d4..e6ef7143 100644
--- a/materializationengine/workflows/spatial_lookup.py
+++ b/materializationengine/workflows/spatial_lookup.py
@@ -1,6 +1,6 @@
 import datetime
 import time
-from typing import Dict, List, Any
+from typing import Any, Dict, List

 import numpy as np
 import pandas as pd
@@ -22,11 +22,11 @@
 from materializationengine.blueprints.upload.checkpoint_manager import (
     CHUNK_STATUS_COMPLETED,
+    CHUNK_STATUS_ERROR,
     CHUNK_STATUS_FAILED_PERMANENT,
     CHUNK_STATUS_FAILED_RETRYABLE,
     CHUNK_STATUS_PROCESSING,
     CHUNK_STATUS_PROCESSING_SUBTASKS,
-    CHUNK_STATUS_ERROR,
     RedisCheckpointManager,
 )
 from materializationengine.celery_init import celery
diff --git a/materializationengine/workflows/update_database_workflow.py b/materializationengine/workflows/update_database_workflow.py
index 19eba84a..759057bc 100644
--- a/materializationengine/workflows/update_database_workflow.py
+++ b/materializationengine/workflows/update_database_workflow.py
@@ -4,19 +4,20 @@

 from celery import chain
 from celery.utils.log import get_task_logger
+
 from materializationengine.blueprints.materialize.api import get_datastack_info
 from materializationengine.celery_init import celery
 from materializationengine.shared_tasks import (
+    fin,
     get_materialization_info,
     monitor_workflow_state,
     workflow_complete,
-    fin,
 )
 from materializationengine.task import LockedTask
 from materializationengine.utils import get_config_param
 from materializationengine.workflows.ingest_new_annotations import (
-    ingest_new_annotations_workflow,
     find_missing_root_ids_workflow,
+    ingest_new_annotations_workflow,
 )
 from materializationengine.workflows.update_root_ids import (
     update_root_ids_workflow,
diff --git a/materializationengine/workflows/update_root_ids.py b/materializationengine/workflows/update_root_ids.py
index aa497278..b610bd9e 100644
--- a/materializationengine/workflows/update_root_ids.py
+++ b/materializationengine/workflows/update_root_ids.py
@@ -5,22 +5,23 @@

 import pandas as pd
 from celery import chain, chord, group
 from celery.utils.log import get_task_logger
+from requests import HTTPError
+from sqlalchemy.sql import or_, text
+
 from materializationengine.celery_init import celery
 from materializationengine.chunkedgraph_gateway import chunkedgraph_cache
 from materializationengine.database import db_manager
 from materializationengine.shared_tasks import (
     fin,
+    generate_chunked_model_ids,
     get_materialization_info,
     monitor_task_states,
     monitor_workflow_state,
     update_metadata,
     workflow_complete,
-    generate_chunked_model_ids,
 )
 from materializationengine.throttle import throttle_celery
 from materializationengine.utils import create_segmentation_model
-from requests import HTTPError
-from sqlalchemy.sql import or_, text

 celery_logger = get_task_logger(__name__)
diff --git a/tests/test_bulk_upload.py b/tests/test_bulk_upload.py
index 29d9eb33..b5c6ff44 100644
--- a/tests/test_bulk_upload.py
+++ b/tests/test_bulk_upload.py
@@ -1,8 +1,9 @@
-from materializationengine.workflows.bulk_upload import get_gcs_file_info
-from unittest import mock
-import logging
 import datetime
+import logging
 import time
+from unittest import mock
+
+from materializationengine.workflows.bulk_upload import get_gcs_file_info


 class TestBulkUpload:
diff --git a/tests/test_create_frozen_database.py b/tests/test_create_frozen_database.py
index 49736d2c..0753a5fe 100644
--- a/tests/test_create_frozen_database.py
+++ b/tests/test_create_frozen_database.py
@@ -1,6 +1,7 @@
 import datetime
 import logging

+from materializationengine.shared_tasks import get_materialization_info
 from materializationengine.workflows.create_frozen_database import (
     add_indices,
     check_tables,
@@ -11,7 +12,6 @@
     merge_tables,
     update_table_metadata,
 )
-from materializationengine.shared_tasks import get_materialization_info

 datastack_info = {
     "datastack": "test_aligned_volume",
diff --git a/tests/test_database.py b/tests/test_database.py
index 60cf0074..d3f332e1 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -1,15 +1,17 @@
+import logging
+
+import pytest
+from dynamicannotationdb import DynamicAnnotationInterface
+from sqlalchemy.engine.base import Engine
+from sqlalchemy.orm import Session
+
 from materializationengine.database import (
+    db_manager,
     dynamic_annotation_cache,
     get_sql_url_params,
     ping_connection,
     reflect_tables,
-    db_manager,
 )
-from sqlalchemy.orm import Session
-from sqlalchemy.engine.base import Engine
-from dynamicannotationdb import DynamicAnnotationInterface
-import logging
-import pytest


 class TestDatabaseUtils:
diff --git a/tests/test_database_removal.py b/tests/test_database_removal.py
index 8ca86042..84a3f314 100644
--- a/tests/test_database_removal.py
+++ b/tests/test_database_removal.py
@@ -1,12 +1,13 @@
 import datetime
-import pytest
 from unittest import mock

+import pytest
+from dynamicannotationdb.models import AnalysisVersion
+
+from materializationengine.database import db_manager
 from materializationengine.workflows.periodic_database_removal import (
     remove_expired_databases,
 )
-from materializationengine.database import db_manager
-from dynamicannotationdb.models import AnalysisVersion


 class TestPeriodicDatabaseRemoval:
diff --git a/tests/test_ingest_new_annotations.py b/tests/test_ingest_new_annotations.py
index 7a1478aa..bb27c77d 100644
--- a/tests/test_ingest_new_annotations.py
+++ b/tests/test_ingest_new_annotations.py
@@ -1,13 +1,15 @@
 # Mock pychunkgraph imports and cloudvolume before importing
 # the tasks
-import sys
 import logging
+import sys
 from unittest import mock

 sys.modules["materializationengine.chunkedgraph_gateway"] = mock.MagicMock()
 sys.modules["cloudvolume"] = mock.MagicMock()

 import numpy as np
+from numpy import nan
+
 from materializationengine.workflows.ingest_new_annotations import (
     create_missing_segmentation_table,
     get_annotations_with_missing_supervoxel_ids,
@@ -16,8 +18,6 @@
     get_sql_supervoxel_ids_chunks,
     insert_segmentation_data,
 )
-from numpy import nan
-

 missing_segmentation_data = {
     "post_pt_supervoxel_id": [nan],
diff --git a/tests/test_models.py b/tests/test_models.py
index 1eae3d68..7583c515 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -1,5 +1,6 @@
 from dynamicannotationdb.models import AnalysisTable, AnalysisVersion

+
 class TestModels:
     def test_analysis_version(self, mat_metadata):
         analysisversion = AnalysisVersion(
diff --git a/tests/test_shared_tasks.py b/tests/test_shared_tasks.py
index df2aef46..7a42183a 100644
--- a/tests/test_shared_tasks.py
+++ b/tests/test_shared_tasks.py
@@ -1,18 +1,20 @@
 import datetime
 import logging
+
 from dynamicannotationdb.models import AnalysisVersion
+from emannotationschemas.models import make_annotation_model
+
+from materializationengine.index_manager import IndexCache
 from materializationengine.shared_tasks import (
-    generate_chunked_model_ids,
+    add_index,
     chunk_ids,
     collect_data,
     fin,
+    generate_chunked_model_ids,
     get_materialization_info,
     query_id_range,
-    add_index,
     update_metadata,
 )
-from materializationengine.index_manager import IndexCache
-from emannotationschemas.models import make_annotation_model

 index_client = IndexCache()
diff --git a/tests/test_spatial_lookup.py b/tests/test_spatial_lookup.py
index 8da3b493..606fdf4b 100644
--- a/tests/test_spatial_lookup.py
+++ b/tests/test_spatial_lookup.py
@@ -1,6 +1,7 @@
 import numpy as np
 import pandas as pd

+
 class TestDataFrameMerging:
     """Test class for dataframe merging operations"""
diff --git a/tests/test_update_root_ids.py b/tests/test_update_root_ids.py
index 371d7788..0845641e 100644
--- a/tests/test_update_root_ids.py
+++ b/tests/test_update_root_ids.py
@@ -1,14 +1,14 @@
-from unittest.mock import MagicMock
 import sys
+from unittest.mock import MagicMock

 sys.modules["materializationengine.chunkedgraph_gateway"] = MagicMock()

+import logging
+
 from materializationengine.workflows.update_root_ids import (
     get_expired_root_ids_from_pcg,
     get_new_root_ids,
     get_supervoxel_id_queries,
 )
-import logging
-

 mocked_expired_root_id_data = [
     [20000000, 20000001],