Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repos:
hooks:
- id: mypy
args: [--ignore-missing-imports]
additional_dependencies: [numpy>=1.26.4, types-PyYAML]
additional_dependencies: [types-PyYAML]
- repo: https://github.com/google/yamlfmt
rev: v0.10.0
hooks:
Expand Down
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.12
3.11
14 changes: 8 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ readme = "README.md"
authors = [
{ name = "Gabriel Schubiner", email = "gabriel.schubiner@childmind.org" },
]
requires-python = ">=3.12"
requires-python = ">=3.11"
dependencies = [
"lark>=1.2.2",
"packaging>=24.2",
"pandas>=2.2.3",
"polars>=1.17.1",
"polars>=1.31.0",
"tyro>=0.9.5",
"xlsxwriter>=3.2.5",
]

[project.scripts]
Expand All @@ -25,6 +25,7 @@ build-backend = "hatchling.build"
[dependency-groups]
dev = [
"jupyter>=1.1.1",
"marimo[lsp]>=0.13.15",
"mypy>=1.15.0",
"pyarrow>=19.0.0",
"pytest>=8.3.4",
Expand All @@ -34,7 +35,6 @@ dev = [

[tool.mypy]
ignore_missing_imports = true
plugins = ["numpy.typing.mypy_plugin"]

[tool.ruff.lint]
extend-select = [
Expand Down Expand Up @@ -72,18 +72,20 @@ extend-select = [
extend-ignore = [
"COM812",
"COM819",
"D107",
"D203",
"D206",
"D213",
"D300",
"ISC001",
"ISC002",
"TRY003",
"ERA001",
]

[tool.ruff.lint.extend-per-file-ignores]
"tests/**/*.py" = ["S101", "SLF001", "INP001", "D103"]
"scripts/*.py" = ["T20"]
"tests/**/*.py" = ["S101", "SLF001", "INP001", "D103", "ERA001"]
"scripts/*.py" = ["T20", "D100", "N803", "B018", "ERA001"]
"scripts/*.ipynb" = [
"C901",
"D103",
Expand Down
7 changes: 1 addition & 6 deletions src/mindlogger_data_export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,16 @@
NamedOutput,
Output,
)
from .processors import (
PandasReportProcessor,
ReportProcessor,
)
from .processors import ReportProcessor

__all__ = [
"cli",
"main",
"MindloggerData",
"MindloggerItem",
"MindloggerResponseOption",
"MindloggerUser",
"NamedOutput",
"Output",
"PandasReportProcessor",
"ReportProcessor",
"UserType",
]
11 changes: 9 additions & 2 deletions src/mindlogger_data_export/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class OutputConfig:
output_dir: Annotated[Path | None, arg(aliases=["-o"])] = None
"""Path to output directory, where processed data will be written. Defaults to input_dir."""

output_format: Annotated[Literal["csv", "parquet"], arg(aliases=["-f"])] = "csv"
output_format: Annotated[
Literal["csv", "parquet", "excel"], arg(aliases=["-f"])
] = "csv"

outputs: Annotated[
UseAppendAction[list[str]],
Expand All @@ -51,6 +53,11 @@ class OutputConfig:
)
"""Logging level for the tool."""

drop_null_columns: Annotated[bool, arg(aliases=["-d"])] = False

extra: Annotated[dict[str, str], arg(aliases=["-e"])] = field(default_factory=dict)
"""Additional parameters to be used for output-specific side-inputs, etc."""

timezone: str = "America/New_York"
"""Timezone to which datetimes will be converted."""

Expand All @@ -62,4 +69,4 @@ def output_dir_or_default(self) -> Path:
@property
def output_types_or_all(self) -> list[str]:
"""Get output types."""
return self.outputs or list(Output.TYPES.keys())
return self.outputs or [t[0] for t in Output.TYPES.items() if t[1].DEFAULT]
36 changes: 26 additions & 10 deletions src/mindlogger_data_export/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from tyro.conf import OmitArgPrefixes
from tyro.extras import SubcommandApp

from .config import OutputConfig
from .config import LogLevel, OutputConfig
from .mindlogger import MindloggerData
from .outputs import Output
from .writers import OutputWriter
Expand All @@ -32,17 +32,33 @@ def output_types_info() -> None:
@app.command(name="run")
def main(config: OutputConfig) -> None:
"""Run data export transformations to produce outputs."""
logging.basicConfig(level=config.log_level.upper())
logging.debug("Starting MindLogger data export tool with config: %s.", config)
try:
logging.basicConfig(level=config.log_level.upper())
logging.debug("Starting MindLogger data export tool with config: %s.", config)

ml_data = MindloggerData.create(config.input_dir)
writer = OutputWriter.create(config.output_format)
ml_data = MindloggerData.create(config.input_dir)
writer = OutputWriter.create(config.output_format)

for output_types in config.output_types_or_all:
output_producer = Output.TYPES[output_types]()
outputs = output_producer.produce(ml_data)
for output in outputs:
writer.write(output, config.output_dir_or_default)
for output_type in config.output_types_or_all:
if output_type not in Output.TYPES:
raise ValueError(f"Unknown output type argument: {output_type}") # noqa: TRY301
logging.debug("Producing output type [%s]", output_type)
output_producer = Output.TYPES[output_type](config.extra)
outputs = output_producer.produce(ml_data)
logging.debug(
"Output type (%s) produced (%d) outputs", output_type, len(outputs)
)
for output in outputs:
writer.write(
output,
config.output_dir_or_default,
drop_null_columns=config.drop_null_columns,
)
except Exception as e:
if config.log_level == LogLevel.DEBUG:
raise
logging.info(e)
logging.info("Exiting...")


def cli() -> None:
Expand Down
100 changes: 43 additions & 57 deletions src/mindlogger_data_export/mindlogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,46 @@
from functools import cached_property
from pathlib import Path

import pandas as pd
import polars as pl

from .models import MindloggerUser, UserType
from .processors import ReportProcessor

LOG = logging.getLogger(__name__)

MINDLOGGER_REPORT_PATTERN = "report*.csv"
MINDLOGGER_REPORT_PATTERN = "*responses*.csv"
ACTIVITY_FLOW_PATTERN = "activity_flow*.csv"
FLOW_ITEM_HISTORY_PATTERN = "flow_item_history*.csv"
SCHEDULE_HISTORY_PATTERN = "schedule_history*.csv"


class MindloggerData:
"""Data model of Mindlogger export."""

def __init__(self, report: pl.DataFrame):
def __init__(self, response_data: pl.DataFrame):
"""Initialize MindloggerData object."""
self._report_frame = report
LOG.debug("Response Data Columns: %s", response_data.columns)
self._response_data = response_data

@property
@cached_property
def report(self) -> pl.DataFrame:
"""Get report DataFrame."""
return pl.DataFrame(self._report_frame)

@property
def report_pd(self) -> pd.DataFrame:
"""Get report DataFrame in Pandas format."""
return self._report_frame.to_pandas()
return pl.DataFrame(self._response_data)

@property
@cached_property
def long_options_report(self) -> pl.DataFrame:
"""Get report DataFrame with one option value per row, e.g. exploded data dictionary format."""
return MindloggerData.expand_options(self.report)

@property
@cached_property
def long_response_report(self) -> pl.DataFrame:
"""Get report DataFrame with one response value per row."""
return MindloggerData.expand_responses(self.report)

@property
@cached_property
def long_report(self) -> pl.DataFrame:
"""Get report DataFrame with one response value per row."""
return MindloggerData.expand_options(
MindloggerData.expand_responses(self.report)
)
return MindloggerData.expand_options(self.long_response_report)

@cached_property
def input_users(self) -> list[MindloggerUser]:
Expand Down Expand Up @@ -81,90 +77,80 @@ def users(self) -> dict[UserType, list[MindloggerUser]]:
UserType.ACCOUNT: self.account_users,
}

@property
@cached_property
def data_dictionary(self) -> pl.DataFrame:
"""Return unique items in report."""
return pl.DataFrame(
self._report_frame.select(
"version",
"activity_flow_id",
"activity_flow_name",
"activity_id",
"activity_name",
"item_id",
"item",
"prompt",
"options",
self.report.select(
"applet_version",
pl.col("activity_flow").struct.unnest().name.prefix("activity_flow_"),
pl.col("activity").struct.unnest().name.prefix("activity_"),
pl.col("item").struct.unnest().name.prefix("item_"),
).unique()
)

@property
def data_dictionary_pd(self) -> pd.DataFrame:
"""Return unique items in report in Pandas format."""
return self.data_dictionary.to_pandas()

@staticmethod
def expand_options(df: pl.DataFrame) -> pl.DataFrame:
"""Expand options struct to columns."""
return (
df.explode(pl.col("parsed_options"))
.with_columns(
pl.col("parsed_options").struct.unnest().name.prefix("option_")
)
.unique()
)
return df.with_columns(
item_response_options=pl.col("item").struct.field("response_options")
).explode("item_response_options")

@staticmethod
def expand_responses(df: pl.DataFrame) -> pl.DataFrame:
"""Expand responses struct to columns/rows."""
return (
df.with_columns(
pl.col("parsed_response").struct.unnest().name.prefix("response_")
# Unnest response struct
pl.col("response").struct.unnest().name.prefix("response_")
)
.with_columns(
pl.col("response_value").struct.unnest().name.prefix("response_value_")
)
# Expand value list to rows.
.with_columns(
response_value_index=pl.int_ranges(pl.col("response_value").list.len())
response_value_value_index=pl.int_ranges(
pl.col("response_value_value").list.len()
)
)
.explode("response_value", "response_value_index")
.explode("response_value_value", "response_value_value_index")
# Expand geo struct to lat/long columns.
.with_columns(
pl.col("response_geo").struct.unnest().name.prefix("response_geo_")
pl.col("response_value_geo")
.struct.unnest()
.name.prefix("response_value_geo_")
)
# Expand matrix list to rows.
.explode("response_matrix")
.explode("response_value_matrix")
# Unnest matrix struct to columns.
.with_columns(
pl.col("response_matrix")
pl.col("response_value_matrix")
.struct.unnest()
.name.prefix("response_matrix_")
.name.prefix("response_value_matrix_")
)
# Expand matrix value list to rows.
.with_columns(
response_matrix_value_index=pl.int_ranges(
pl.col("response_matrix_value").list.len()
response_value_matrix_value_index=pl.int_ranges(
pl.col("response_value_matrix_value").list.len()
)
)
.explode("response_matrix_value", "response_matrix_value_index")
.explode("response_value_matrix_value", "response_value_matrix_value_index")
# Exclude temporary struct columns.
.select(pl.exclude("response_matrix", "response_geo"))
.select(pl.exclude("response_value_matrix", "response_value_geo"))
)

def _users(self, user_type: UserType) -> list[MindloggerUser]:
"""Get users for specific type."""
return list(
map(
MindloggerUser.from_struct_factory(user_type),
self._report_frame.select(
user_info=pl.struct(*UserType.columns(user_type))
)
.get_column("user_info")
.unique(),
self.report.get_column(user_type.value).unique(),
)
)

def __str__(self):
"""Return string representation of MindloggerData object reporting column names and report head."""
return f"MindloggerData: {self._report_frame.columns}\n\n{self._report_frame.head()}"
return f"MindloggerData: {self._response_data.columns}\n\n{self._response_data.head()}"

@classmethod
def load(cls, input_dir: Path) -> pl.DataFrame:
Expand Down
Loading
Loading