From 715c0bea5b14836ce2ed29993d05f376e79609e1 Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Wed, 23 Nov 2022 15:47:37 -0800 Subject: [PATCH 01/11] feat: optional keep 'created' time column --- materializationengine/config.py | 2 +- materializationengine/shared_tasks.py | 1 + materializationengine/workflows/create_frozen_database.py | 7 ++++++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/materializationengine/config.py b/materializationengine/config.py index e18bbcb2..df741a2e 100644 --- a/materializationengine/config.py +++ b/materializationengine/config.py @@ -48,7 +48,7 @@ class BaseConfig: AUTH_TOKEN = json.load(f)["token"] else: AUTH_TOKEN = "" - + KEEP_CREATED_COLUMN = False BEAT_SCHEDULES = [ { "name": "Materialized Database Daily (2 Days)", diff --git a/materializationengine/shared_tasks.py b/materializationengine/shared_tasks.py index 3a1529fc..6b8ff323 100644 --- a/materializationengine/shared_tasks.py +++ b/materializationengine/shared_tasks.py @@ -196,6 +196,7 @@ def get_materialization_info( "reference_table": reference_table, "materialization_time_stamp": str(materialization_time_stamp), "table_count": len(annotation_tables), + "keep_created_column": get_config_param("KEEP_CREATED_COLUMN") } has_segmentation_table = db.schema.is_segmentation_table_required(schema) if has_segmentation_table: diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py index a5718b82..29125613 100644 --- a/materializationengine/workflows/create_frozen_database.py +++ b/materializationengine/workflows/create_frozen_database.py @@ -651,7 +651,12 @@ def merge_tables(self, mat_metadata: dict): mat_metadata, with_crud_columns=True ) SegmentationModel = create_segmentation_model(mat_metadata) - crud_columns = ["created", "deleted", "superceded_id"] + + keep_created = mat_metadata["keep_created_column"] + crud_columns = ["deleted", "superceded_id"] + if keep_created: + crud_columns.append("created") + query_columns = { col.name: col for col in AnnotationModel.__table__.columns From aee0e57a5200d6efafb3d58f4e9af3371129fd8d Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Wed, 23 Nov 2022 15:52:09 -0800 Subject: [PATCH 02/11] fix: pop 'created' if False --- materializationengine/workflows/create_frozen_database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py index 29125613..637b8bc5 100644 --- a/materializationengine/workflows/create_frozen_database.py +++ b/materializationengine/workflows/create_frozen_database.py @@ -653,9 +653,9 @@ def merge_tables(self, mat_metadata: dict): SegmentationModel = create_segmentation_model(mat_metadata) keep_created = mat_metadata["keep_created_column"] - crud_columns = ["deleted", "superceded_id"] + crud_columns = ["created", "deleted", "superceded_id"] # crud cols to drop if keep_created: - crud_columns.append("created") + crud_columns.pop(0) # we want to keep the 'created' col query_columns = { col.name: col From 65f1d40f5e67bb2cac36abdfcc78d1263d9516ab Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Wed, 23 Nov 2022 15:56:47 -0800 Subject: [PATCH 03/11] test: add 'keep_created_column' to assert --- tests/test_shared_tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_shared_tasks.py b/tests/test_shared_tasks.py index 8464364c..c33dba6f 100644 --- a/tests/test_shared_tasks.py +++ b/tests/test_shared_tasks.py @@ -63,6 +63,7 @@ def test_get_materialization_info(self): "chunk_size": 2, "table_count": 1, "find_all_expired_roots": False, + 'keep_created_column': False, "analysis_version": 1, "analysis_database": "test_datastack__mat1", "queue_length_limit": 10000, From c6d7c95a95a12c7f3de6a6b2991f5a0bfb835c9c Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Wed, 23 Nov 2022 16:00:14 -0800 Subject: [PATCH 04/11] fix: return None if not set --- materializationengine/workflows/create_frozen_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py index 637b8bc5..478026cd 100644 --- a/materializationengine/workflows/create_frozen_database.py +++ b/materializationengine/workflows/create_frozen_database.py @@ -652,7 +652,7 @@ def merge_tables(self, mat_metadata: dict): ) SegmentationModel = create_segmentation_model(mat_metadata) - keep_created = mat_metadata["keep_created_column"] + keep_created = mat_metadata.get("keep_created_column") crud_columns = ["created", "deleted", "superceded_id"] # crud cols to drop if keep_created: crud_columns.pop(0) # we want to keep the 'created' col From 09c599be38a03e7f595ba43529bdc699af64caef Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 3 Jan 2023 11:30:25 -0800 Subject: [PATCH 05/11] feat: add seg source and is merged cols --- materializationengine/models.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/materializationengine/models.py b/materializationengine/models.py index f2807df7..3c1cb66c 100644 --- a/materializationengine/models.py +++ b/materializationengine/models.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, Integer, DateTime +from sqlalchemy import Column, String, Integer, DateTime, Boolean from sqlalchemy.ext.declarative import declarative_base MatBase = declarative_base() @@ -11,3 +11,6 @@ class MaterializedMetadata(MatBase): table_name = Column(String(100), nullable=False) row_count = Column(Integer, nullable=False) materialized_timestamp = Column(DateTime, nullable=False) + segmentation_source = Column(String(255), nullable=True) + is_merged = Column(Boolean, nullable=True) + From a7e2eb469fa67d2a0146cac79a978188d73ccf56 Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 3 Jan 2023 11:30:47 -0800 Subject: [PATCH 06/11] feat: add has_created_ts col --- materializationengine/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/materializationengine/models.py b/materializationengine/models.py index 3c1cb66c..8aad9669 100644 --- a/materializationengine/models.py +++ b/materializationengine/models.py @@ -13,4 +13,4 @@ class MaterializedMetadata(MatBase): materialized_timestamp = Column(DateTime, nullable=False) segmentation_source = Column(String(255), nullable=True) is_merged = Column(Boolean, nullable=True) - + has_created_ts = Column(Boolean, nullable=True) From 0184b603a204309866f24bf0812a2fe7b96dbd6d Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 3 Jan 2023 11:31:05 -0800 Subject: [PATCH 07/11] fix: use model from mat engine --- materializationengine/views.py | 2 +- materializationengine/workflows/create_frozen_database.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/materializationengine/views.py b/materializationengine/views.py index 5146e167..6e8a7dfe 100644 --- a/materializationengine/views.py +++ b/materializationengine/views.py @@ -11,7 +11,6 @@ AnalysisVersion, VersionErrorTable, AnnoMetadata, - MaterializedMetadata, ) from dynamicannotationdb.schema import DynamicSchemaClient from flask import ( @@ -32,6 +31,7 @@ from materializationengine.celery_init import celery from materializationengine.blueprints.client.query import specific_query from materializationengine.database import sqlalchemy_cache, dynamic_annotation_cache +from materializationengine.models import MaterializedMetadata from materializationengine.info_client import ( get_datastack_info, get_datastacks, diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py index 478026cd..634782e5 100644 --- a/materializationengine/workflows/create_frozen_database.py +++ b/materializationengine/workflows/create_frozen_database.py @@ -12,7 +12,6 @@ AnalysisVersion, AnnoMetadata, Base, - MaterializedMetadata, ) from emannotationschemas import get_schema from emannotationschemas.flatten import create_flattened_schema @@ -28,6 +27,7 @@ dynamic_annotation_cache, sqlalchemy_cache, ) +from materializationengine.models import MaterializedMetadata from materializationengine.errors import IndexMatchError from materializationengine.index_manager import index_cache from materializationengine.shared_tasks import ( From da3d67ee9d34e92e764fcccc9274fc402871fa57 Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 3 Jan 2023 11:43:59 -0800 Subject: [PATCH 08/11] feat: add migrations --- migration/__init__.py | 0 migration/alembic.ini | 102 ++++++++++++++++++ migration/env.py | 75 +++++++++++++ migration/script.py.mako | 24 +++++ .../4006a7c94f25_add_has_timestamp_col.py | 27 +++++ .../8ff84a0bb8f8_base_mat_metadata.py | 46 ++++++++ 6 files changed, 274 insertions(+) create mode 100644 migration/__init__.py create mode 100644 migration/alembic.ini create mode 100644 migration/env.py create mode 100644 migration/script.py.mako create mode 100644 migration/versions/4006a7c94f25_add_has_timestamp_col.py create mode 100644 migration/versions/8ff84a0bb8f8_base_mat_metadata.py diff --git a/migration/__init__.py b/migration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/migration/alembic.ini b/migration/alembic.ini new file mode 100644 index 00000000..d4e9917a --- /dev/null +++ b/migration/alembic.ini @@ -0,0 +1,102 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = . + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to migration/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:migration/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# sqlalchemy.url = + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/migration/env.py b/migration/env.py new file mode 100644 index 00000000..1126cab2 --- /dev/null +++ b/migration/env.py @@ -0,0 +1,75 @@ +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) + +# add your model's MetaData object here +from materializationengine.models import MatBase +target_metadata = MatBase.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migration/script.py.mako b/migration/script.py.mako new file mode 100644 index 00000000..2c015630 --- /dev/null +++ b/migration/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/migration/versions/4006a7c94f25_add_has_timestamp_col.py b/migration/versions/4006a7c94f25_add_has_timestamp_col.py new file mode 100644 index 00000000..d6634021 --- /dev/null +++ b/migration/versions/4006a7c94f25_add_has_timestamp_col.py @@ -0,0 +1,27 @@ +"""Add has timestamp col + +Revision ID: 4006a7c94f25 +Revises: 8ff84a0bb8f8 +Create Date: 2023-01-03 10:53:44.569973 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "4006a7c94f25" +down_revision = "8ff84a0bb8f8" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "materializedmetadata", sa.Column("has_created_ts", sa.Boolean(), nullable=True) + ) + op.execute("UPDATE materializedmetadata SET has_created_ts = False") + + +def downgrade(): + op.drop_column("materializedmetadata", "has_created_ts") diff --git a/migration/versions/8ff84a0bb8f8_base_mat_metadata.py b/migration/versions/8ff84a0bb8f8_base_mat_metadata.py new file mode 100644 index 00000000..58a7aa07 --- /dev/null +++ b/migration/versions/8ff84a0bb8f8_base_mat_metadata.py @@ -0,0 +1,46 @@ +"""Base mat metadata + +Revision ID: 8ff84a0bb8f8 +Revises: +Create Date: 2023-01-03 10:50:35.866055 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.engine import reflection +from sqlalchemy import engine_from_config + +# revision identifiers, used by Alembic. +revision = "8ff84a0bb8f8" +down_revision = None +branch_labels = None +depends_on = None + + +def get_tables(): + config = op.get_context().config + engine = engine_from_config( + config.get_section(config.config_ini_section), prefix="sqlalchemy." + ) + inspector = reflection.Inspector.from_engine(engine) + return inspector.get_table_names() + + +def upgrade(): + tables = get_tables() + if "materializedmetadata" not in tables: + op.create_table( + "materializedmetadata", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("schema", sa.String(length=100), nullable=False), + sa.Column("table_name", sa.String(length=100), nullable=False), + sa.Column("row_count", sa.Integer(), nullable=False), + sa.Column("materialized_timestamp", sa.DateTime(), nullable=False), + sa.Column("segmentation_source", sa.String(length=255), nullable=True), + sa.Column("is_merged", sa.Boolean(), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + +def downgrade(): + op.drop_table("materializedmetadata") From f0dd2ea62604660c8f473faf37ea09df1fded420 Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 3 Jan 2023 11:46:30 -0800 Subject: [PATCH 09/11] feat: add has_created_ts to metadata table --- materializationengine/workflows/create_frozen_database.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py index 634782e5..d779120b 100644 --- a/materializationengine/workflows/create_frozen_database.py +++ b/materializationengine/workflows/create_frozen_database.py @@ -403,7 +403,8 @@ def create_materialized_metadata( valid_row_count = mat_metadata["row_count"] segmentation_source = mat_metadata.get("segmentation_source") merge_table = mat_metadata.get("merge_table") - + has_created_ts = mat_metadata.get("keep_created_column", False) + celery_logger.info(f"Row count {valid_row_count}") if valid_row_count == 0: continue @@ -415,6 +416,7 @@ def create_materialized_metadata( materialized_timestamp=materialization_time_stamp, segmentation_source=segmentation_source, is_merged=merge_table, + has_created_ts=has_created_ts ) analysis_session.add(mat_metadata) analysis_session.commit() From 7e584bd2eb73a5a274cf0bcdc0942f56352b79b3 Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 17 Jan 2023 15:06:35 -0800 Subject: [PATCH 10/11] feat: use annotation_metadata to determine 'keep_created_ts_col' --- materializationengine/shared_tasks.py | 2 +- materializationengine/workflows/create_frozen_database.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/materializationengine/shared_tasks.py b/materializationengine/shared_tasks.py index 6b8ff323..eb7d108f 100644 --- a/materializationengine/shared_tasks.py +++ b/materializationengine/shared_tasks.py @@ -196,7 +196,7 @@ def get_materialization_info( "reference_table": reference_table, "materialization_time_stamp": str(materialization_time_stamp), "table_count": len(annotation_tables), - "keep_created_column": get_config_param("KEEP_CREATED_COLUMN") + "keep_created_ts_col": md.get("keep_created_ts_col", False) } has_segmentation_table = db.schema.is_segmentation_table_required(schema) if has_segmentation_table: diff --git a/materializationengine/workflows/create_frozen_database.py b/materializationengine/workflows/create_frozen_database.py index d779120b..f810ec09 100644 --- a/materializationengine/workflows/create_frozen_database.py +++ b/materializationengine/workflows/create_frozen_database.py @@ -403,7 +403,7 @@ def create_materialized_metadata( valid_row_count = mat_metadata["row_count"] segmentation_source = mat_metadata.get("segmentation_source") merge_table = mat_metadata.get("merge_table") - has_created_ts = mat_metadata.get("keep_created_column", False) + has_created_ts = mat_metadata.get("keep_created_ts_col") celery_logger.info(f"Row count {valid_row_count}") if valid_row_count == 0: @@ -654,7 +654,7 @@ def merge_tables(self, mat_metadata: dict): ) SegmentationModel = create_segmentation_model(mat_metadata) - keep_created = mat_metadata.get("keep_created_column") + keep_created = mat_metadata.get("keep_created_ts_col") crud_columns = ["created", "deleted", "superceded_id"] # crud cols to drop if keep_created: crud_columns.pop(0) # we want to keep the 'created' col From 2d6b7912ba58207636d313933011065a8425e079 Mon Sep 17 00:00:00 2001 From: Derrick Brittain Date: Tue, 17 Jan 2023 15:10:21 -0800 Subject: [PATCH 11/11] test: fix assert --- tests/test_shared_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_shared_tasks.py b/tests/test_shared_tasks.py index c33dba6f..34589dac 100644 --- a/tests/test_shared_tasks.py +++ b/tests/test_shared_tasks.py @@ -63,7 +63,7 @@ def test_get_materialization_info(self): "chunk_size": 2, "table_count": 1, "find_all_expired_roots": False, - 'keep_created_column': False, + 'keep_created_ts_col': False, "analysis_version": 1, "analysis_database": "test_datastack__mat1", "queue_length_limit": 10000,