2 changes: 1 addition & 1 deletion materializationengine/admin.py
@@ -1,6 +1,6 @@
from dynamicannotationdb.models import AnalysisTable, AnalysisVersion
from flask_admin import Admin
from flask_admin.contrib.sqla import ModelView
from dynamicannotationdb.models import AnalysisVersion, AnalysisTable


def setup_admin(app, db):
12 changes: 6 additions & 6 deletions materializationengine/app.py
@@ -3,7 +3,7 @@
from datetime import date, datetime, timedelta

import numpy as np
from dynamicannotationdb.models import Base, AnalysisVersion
from dynamicannotationdb.models import AnalysisVersion, Base
from flask import Blueprint, Flask, current_app, jsonify, redirect, url_for
from flask_cors import CORS
from flask_restx import Api
@@ -14,17 +14,17 @@
from materializationengine.blueprints.client.api import client_bp
from materializationengine.blueprints.client.api2 import client_bp as client_bp2
from materializationengine.blueprints.materialize.api import mat_bp
from materializationengine.blueprints.upload.api import upload_bp, spatial_lookup_bp
from materializationengine.blueprints.upload.storage import StorageService
from materializationengine.blueprints.upload.api import spatial_lookup_bp, upload_bp
from materializationengine.blueprints.upload.models import init_staging_database
from materializationengine.blueprints.upload.storage import StorageService
from materializationengine.config import config, configure_app
from materializationengine.database import db_manager
from materializationengine.schemas import ma
from materializationengine.utils import get_instance_folder_path
from materializationengine.views import views_bp
from materializationengine.limiter import limiter
from materializationengine.migrate import migrator
from materializationengine.request_db import init_request_db_cleanup
from materializationengine.schemas import ma
from materializationengine.utils import get_instance_folder_path
from materializationengine.views import views_bp

db = SQLAlchemy(model_class=Base)

26 changes: 14 additions & 12 deletions materializationengine/blueprints/client/api.py
@@ -2,6 +2,19 @@
from flask import abort, request
from flask_accepts import accepts
from flask_restx import Namespace, Resource, inputs, reqparse
from middle_auth_client import auth_requires_permission

from materializationengine.blueprints.client.common import (
get_analysis_version_and_table,
get_flat_model,
handle_complex_query,
handle_simple_query,
validate_table_args,
)
from materializationengine.blueprints.client.common import (
unhandled_exception as common_unhandled_exception,
)
from materializationengine.blueprints.client.datastack import validate_datastack
from materializationengine.blueprints.client.schemas import (
ComplexQuerySchema,
SimpleQuerySchema,
@@ -11,24 +24,13 @@
update_notice_text_warnings,
)
from materializationengine.blueprints.reset_auth import reset_auth
from materializationengine.database import dynamic_annotation_cache, db_manager
from materializationengine.database import db_manager, dynamic_annotation_cache
from materializationengine.info_client import (
get_aligned_volumes,
get_relevant_datastack_info,
)
from materializationengine.blueprints.client.common import (
handle_complex_query,
handle_simple_query,
validate_table_args,
get_analysis_version_and_table,
get_flat_model,
unhandled_exception as common_unhandled_exception,
)
from materializationengine.models import MaterializedMetadata
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
from middle_auth_client import auth_requires_permission
from materializationengine.blueprints.client.datastack import validate_datastack


__version__ = "5.12.1"

96 changes: 54 additions & 42 deletions materializationengine/blueprints/client/api2.py
@@ -1,17 +1,15 @@
import copy
import datetime
import io
import json
import pytz
from dynamicannotationdb.models import AnalysisTable, AnalysisVersion

from cachetools import TTLCache, cached, LRUCache
from functools import wraps
from typing import List

import cloudvolume
import nglui
import numpy as np
import pandas as pd
import pytz
import copy
import werkzeug
from cachetools import LRUCache, TTLCache, cached
from cachetools.keys import hashkey
@@ -25,6 +23,7 @@
from middle_auth_client import (
auth_requires_permission,
)
from neuroglancer import viewer_state
from sqlalchemy.sql.sqltypes import Boolean, DateTime, Float, Integer, Numeric, String

from materializationengine.blueprints.client.common import (
@@ -34,32 +33,19 @@
get_analysis_version_and_tables,
handle_complex_query,
handle_simple_query,
sql_query_warning,
validate_table_args,
)
from materializationengine.blueprints.client.common import (
unhandled_exception as common_unhandled_exception,
)
from materializationengine.request_db import request_db_session
import pandas as pd
import numpy as np
from marshmallow import fields as mm_fields
from emannotationschemas.schemas.base import PostGISField
import datetime
from typing import List
import werkzeug
from sqlalchemy.sql.sqltypes import String, Integer, Float, DateTime, Boolean, Numeric
import io
from geoalchemy2.types import Geometry
import nglui
from neuroglancer import viewer_state
import cloudvolume

from materializationengine.blueprints.client.datastack import validate_datastack
from materializationengine.blueprints.client.new_query import (
remap_query,
strip_root_id_filters,
update_rootids,
)
from materializationengine.blueprints.client.precomputed import AnnotationWriter
from materializationengine.blueprints.client.query_manager import QueryManager
from materializationengine.blueprints.client.schemas import (
AnalysisViewSchema,
@@ -76,19 +62,13 @@
)
from materializationengine.blueprints.reset_auth import reset_auth
from materializationengine.chunkedgraph_gateway import chunkedgraph_cache
from materializationengine.database import (
dynamic_annotation_cache,
db_manager
)
from materializationengine.database import db_manager, dynamic_annotation_cache
from materializationengine.info_client import get_aligned_volumes, get_datastack_info
from materializationengine.limiter import limit_by_category
from materializationengine.models import MaterializedMetadata
from materializationengine.request_db import request_db_session
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
from materializationengine.utils import check_read_permission
from materializationengine.blueprints.client.utils import update_notice_text_warnings
from materializationengine.blueprints.client.utils import after_request
from materializationengine.blueprints.client.precomputed import AnnotationWriter


__version__ = "5.12.1"

@@ -234,7 +214,17 @@ def __schema__(self):
If True, accept-encoding will determine what \
internal compression is used",
)

query_parser.add_argument(
"direct_sql_pandas",
type=inputs.boolean,
default=False,
required=False,
location="args",
help="whether to use direct SQL queries with pandas, \
if False it will fall back to the csv streaming method. \
which is prone to mangling types. \
CAVEclient>=8.0.0 should set this to True",
)
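
# --- Illustrative usage note (editor's sketch, not part of this diff) ---
# A client opts in to the new behavior by passing the flag as a query-string
# argument; flask_restx's inputs.boolean parses "true"/"false". The host and
# endpoint path below are placeholders, not the service's actual routes.
import requests

resp = requests.post(
    "https://example.com/api/v2/datastack/my_datastack/version/1/query",  # hypothetical URL
    params={"direct_sql_pandas": "true"},  # parsed server-side by inputs.boolean
    json={"table": "my_table", "limit": 1000},
)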

query_seg_prop_parser = reqparse.RequestParser()
# add an argument for a string controlling the label format
@@ -352,13 +342,19 @@ def execute_materialized_query(
cg_client,
random_sample: int = None,
split_mode: bool = False,
direct_sql_pandas: bool = False,
) -> pd.DataFrame:
"""_summary_

Args:
datastack (str): datastack to query on
mat_version (int): version to query on
user_data (dict): dictionary of query payload including filters
query_map (dict): mapping of model column names to dataframe column names
cg_client: chunkedgraph client to use for root id lookups
random_sample (int, optional): number of random samples to get using TABLESAMPLE. Defaults to None.
split_mode (bool, optional): whether to use split mode for the query. Defaults to False.
direct_sql_pandas (bool, optional): whether to use pandas for the query. Defaults to False.

Returns:
pd.DataFrame: a dataframe with the results of the query in the materialized version
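
# --- Illustrative note (editor's sketch, not part of this diff) ---
# The random_sample option refers to PostgreSQL's TABLESAMPLE clause, which
# samples at the page level rather than returning an exact row count. The
# exact SQL that QueryManager emits may differ; this shows the clause itself:
#
#   SELECT id, pt_root_id
#   FROM my_annotation_table TABLESAMPLE SYSTEM (1);  -- roughly 1% of pages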
@@ -452,6 +448,7 @@ def execute_materialized_query(
meta_db_name=aligned_volume,
split_mode=split_mode,
random_sample=use_random_sample,
direct_sql_pandas=direct_sql_pandas
)
qm.configure_query(user_data)
qm.apply_filter({user_data["table"]: {"valid": True}}, qm.apply_equal_filter)
@@ -476,6 +473,8 @@ def execute_materialized_query(
f"result has {len(df)} entries, which is equal or more \
than limit of {user_data['limit']} there may be more results which are not shown"
)
if not direct_sql_pandas:
warnings.append(sql_query_warning)
return df, column_names, warnings
else:
return None, {}, []
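
# --- Illustrative sketch (editor's note, not part of this diff) ---
# Why direct_sql_pandas matters: reading query results straight into a
# DataFrame preserves dtypes, while the CSV streaming fallback re-infers
# every column from text. With 64-bit root ids plus a missing value, the
# CSV round trip silently promotes the column to float64 and loses precision.
import io

import pandas as pd

direct_df = pd.DataFrame({
    "id": [1, 2],
    "pt_root_id": pd.array([648518346349539896, None], dtype="UInt64"),
})

csv_df = pd.read_csv(io.StringIO(direct_df.to_csv(index=False)))

print(direct_df["pt_root_id"].dtype)  # UInt64 -- ids kept exactly
print(csv_df["pt_root_id"].dtype)     # float64 -- 648518346349539896 is no longer exact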
@@ -489,6 +488,7 @@ def execute_production_query(
chosen_timestamp: datetime.datetime,
cg_client,
allow_missing_lookups: bool = False,
direct_sql_pandas: bool = False,
) -> pd.DataFrame:
"""_summary_

@@ -499,7 +499,9 @@
timestamp_end (datetime.datetime): _description_

Returns:
pd.DataFrame: _description_
pd.DataFrame: dataframe of query
dict: map of table name to column name mappings
list: list of warnings
"""
user_timestamp = user_data["timestamp"]
if chosen_timestamp < user_timestamp:
@@ -515,7 +517,11 @@

# setup a query manager on production database with split tables
qm = QueryManager(
aligned_volume_name, segmentation_source, split_mode=True, split_mode_outer=True
aligned_volume_name,
segmentation_source,
split_mode=True,
split_mode_outer=True,
direct_sql_pandas=direct_sql_pandas
)
user_data_modified = strip_root_id_filters(user_data)

@@ -1387,7 +1393,7 @@ def get(
version,
target_datastack,
target_version,
{},
{"direct_sql_pandas": True},
data,
convert_desired_resolution=True,
)
@@ -1399,7 +1405,7 @@ def get(
table_name,
target_datastack,
target_version,
{},
{"direct_sql_pandas": True},
{"desired_resolution": [1, 1, 1]},
convert_desired_resolution=True,
)
Expand Down Expand Up @@ -1471,7 +1477,7 @@ def get(
user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]]

return_vals = assemble_live_query_dataframe(
user_data, datastack_name=datastack_name, args={}
user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True}
)
df, column_names, _, _, _ = return_vals

@@ -1716,7 +1722,7 @@ def assemble_live_query_dataframe(user_data, datastack_name, args):
cg_client,
allow_invalid_root_ids,
)

direct_sql_pandas = args.get("direct_sql_pandas", False)
mat_df, column_names, mat_warnings = execute_materialized_query(
effective_datastack_name_for_mat_query,
aligned_volume_for_mat_query,
@@ -1727,6 +1733,7 @@ def assemble_live_query_dataframe(user_data, datastack_name, args):
cg_client,
random_sample=args.get("random_sample", None),
split_mode=not loc_cv_is_merged,
direct_sql_pandas=direct_sql_pandas,
)

prod_df = None
@@ -1748,6 +1755,7 @@ def assemble_live_query_dataframe(user_data, datastack_name, args):
chosen_timestamp_utc,
cg_client_for_prod,
args.get("allow_missing_lookups", True),
direct_sql_pandas=direct_sql_pandas,
)
if mat_df is None and prod_df is not None:
column_names = column_names_prod
@@ -1765,7 +1773,8 @@ def __init__(self, naive_timestamp):
final_remap_warnings = remap_warnings if isinstance(remap_warnings, list) else ([remap_warnings] if remap_warnings else [])
final_mat_warnings = mat_warnings if isinstance(mat_warnings, list) else ([mat_warnings] if mat_warnings else [])
final_prod_warnings = prod_warnings if isinstance(prod_warnings, list) else ([prod_warnings] if prod_warnings else [])

if not direct_sql_pandas:
final_mat_warnings.append(sql_query_warning)
# we want to drop columns that the user didn't ask for (mostly supervoxel columns)
filter_column_names = copy.deepcopy(column_names)
if user_data.get('select_columns', None) is not None:
@@ -1970,7 +1979,7 @@ def get_precomputed_properties_and_relationships(datastack_name, table_name):
user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]]

return_vals = assemble_live_query_dataframe(
user_data, datastack_name=datastack_name, args={}
user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True}
)
df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals

@@ -2417,7 +2426,7 @@ def select_best_spatial_column(fields, table_name, metadata):


return_vals = assemble_live_query_dataframe(
user_data, datastack_name=datastack_name, args={}
user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True}
)
df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals

@@ -2465,7 +2474,7 @@ def query_by_id(
user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]]

return_vals = assemble_live_query_dataframe(
user_data, datastack_name=datastack_name, args={}
user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True}
)
df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals

@@ -2525,7 +2534,7 @@ def live_query_by_relationship(
user_data["join_tables"] = [[table_name, "target_id", ref_table, "id"]]

return_vals = assemble_live_query_dataframe(
user_data, datastack_name=datastack_name, args={}
user_data, datastack_name=datastack_name, args={"direct_sql_pandas": True}
)
df, column_names, mat_warnings, prod_warnings, remap_warnings = return_vals

@@ -3190,14 +3199,15 @@ def assemble_view_dataframe(datastack_name, version, view_name, data, args):
md["voxel_resolution_z"],
]
data["desired_resolution"] = des_res

direct_sql_pandas = args.get("direct_sql_pandas", False)
qm = QueryManager(
mat_db_name,
segmentation_source=pcg_table_name,
split_mode=False,
limit=limit,
offset=data.get("offset", 0),
get_count=get_count,
direct_sql_pandas=direct_sql_pandas
)
qm.add_view(datastack_name, view_name)
qm.apply_filter(data.get("filter_in_dict", None), qm.apply_isin_filter)
Expand All @@ -3223,6 +3233,8 @@ def assemble_view_dataframe(datastack_name, version, view_name, data, args):
df, column_names = qm.execute_query(desired_resolution=data["desired_resolution"])
df.drop(columns=["deleted", "superceded"], inplace=True, errors="ignore")
warnings = []
if not direct_sql_pandas:
warnings.append(sql_query_warning)
current_app.logger.info("query: {}".format(data))
current_app.logger.info("args: {}".format(args))
user_id = str(g.auth_user["id"])
@@ -3318,7 +3330,7 @@ def get(
mat_db_name = f"{aligned_volume_name}"

df, column_names, warnings = assemble_view_dataframe(
datastack_name, version, view_name, {}, {}
datastack_name, version, view_name, {}, {"direct_sql_pandas": True}
)

df, tags, bool_tags, numerical, root_id_col = preprocess_view_dataframe(