49 changes: 21 additions & 28 deletions modal_app/data_build.py
@@ -10,28 +10,7 @@
image = (
modal.Image.debian_slim(python_version="3.13")
.apt_install("git")
.pip_install(
"policyengine-us>=1.353.0",
"policyengine-core>=3.19.0",
"pandas>=2.3.1",
"requests>=2.25.0",
"tqdm>=4.60.0",
"microdf_python>=1.0.0",
"microimpute>=1.1.4",
"google-cloud-storage>=2.0.0",
"google-auth>=2.0.0",
"scipy>=1.15.3",
"statsmodels>=0.14.5",
"openpyxl>=3.1.5",
"tables>=3.10.2",
"torch>=2.7.1",
"us>=2.0.0",
"sqlalchemy>=2.0.41",
"sqlmodel>=0.0.24",
"xlrd>=2.0.2",
"huggingface_hub",
"pytest",
)
.pip_install("uv")
)

REPO_URL = "https://github.com/PolicyEngine/policyengine-us-data.git"
@@ -66,7 +45,8 @@ def build_datasets(
os.chdir("/root")
subprocess.run(["git", "clone", "-b", branch, REPO_URL], check=True)
os.chdir("policyengine-us-data")
subprocess.run(["pip", "install", "-e", ".[dev]"], check=True)
# Use uv sync to install exact versions from uv.lock
subprocess.run(["uv", "sync", "--locked"], check=True)

env = os.environ.copy()
if test_lite:
@@ -75,6 +55,8 @@
# Download prerequisites
subprocess.run(
[
"uv",
"run",
"python",
"policyengine_us_data/storage/download_private_prerequisites.py",
],
@@ -95,7 +77,7 @@
]
for script in scripts:
print(f"Running {script}...")
subprocess.run(["python", script], check=True, env=env)
subprocess.run(["uv", "run", "python", script], check=True, env=env)

os.rename(
"policyengine_us_data/storage/enhanced_cps_2024.h5",
@@ -116,22 +98,29 @@ def build_datasets(
local_area_env["LOCAL_AREA_CALIBRATION"] = "true"

subprocess.run(
["python", "policyengine_us_data/datasets/cps/cps.py"],
["uv", "run", "python", "policyengine_us_data/datasets/cps/cps.py"],
check=True,
env=local_area_env,
)
subprocess.run(
["python", "policyengine_us_data/datasets/puf/puf.py"],
["uv", "run", "python", "policyengine_us_data/datasets/puf/puf.py"],
check=True,
env=local_area_env,
)
subprocess.run(
["python", "policyengine_us_data/datasets/cps/extended_cps.py"],
[
"uv",
"run",
"python",
"policyengine_us_data/datasets/cps/extended_cps.py",
],
check=True,
env=local_area_env,
)
subprocess.run(
[
"uv",
"run",
"python",
"policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py",
"10500",
@@ -144,6 +133,8 @@ def build_datasets(
print("Running local area calibration tests...")
subprocess.run(
[
"uv",
"run",
"pytest",
"policyengine_us_data/tests/test_local_area_calibration/",
"-v",
@@ -154,12 +145,14 @@ def build_datasets(

# Run main test suite
print("Running main test suite...")
subprocess.run(["pytest"], check=True, env=env)
subprocess.run(["uv", "run", "pytest"], check=True, env=env)

# Upload if requested
if upload:
subprocess.run(
[
"uv",
"run",
"python",
"policyengine_us_data/storage/upload_completed_datasets.py",
],
27 changes: 5 additions & 22 deletions modal_app/local_area.py
@@ -10,27 +10,7 @@
image = (
modal.Image.debian_slim(python_version="3.13")
.apt_install("git")
.pip_install(
"policyengine-us>=1.353.0",
"policyengine-core>=3.19.0",
"pandas>=2.3.1",
"requests>=2.25.0",
"tqdm>=4.60.0",
"microdf_python>=1.0.0",
"microimpute>=1.1.4",
"google-cloud-storage>=2.0.0",
"google-auth>=2.0.0",
"scipy>=1.15.3",
"statsmodels>=0.14.5",
"openpyxl>=3.1.5",
"tables>=3.10.2",
"torch>=2.7.1",
"us>=2.0.0",
"sqlalchemy>=2.0.41",
"sqlmodel>=0.0.24",
"xlrd>=2.0.2",
"huggingface_hub",
)
.pip_install("uv")
)

REPO_URL = "https://github.com/PolicyEngine/policyengine-us-data.git"
@@ -61,10 +41,13 @@ def publish_all_local_areas(branch: str = "main"):
os.chdir("/root")
subprocess.run(["git", "clone", "-b", branch, REPO_URL], check=True)
os.chdir("policyengine-us-data")
subprocess.run(["pip", "install", "-e", "."], check=True)
# Use uv sync to install exact versions from uv.lock
subprocess.run(["uv", "sync", "--locked"], check=True)

subprocess.run(
[
"uv",
"run",
"python",
"policyengine_us_data/datasets/cps/local_area_calibration/publish_local_area.py",
],
policyengine_us_data/datasets/cps/local_area_calibration/county_assignment.py
@@ -123,3 +123,66 @@ def assign_counties_for_cd(
weights = list(dist.values())
selected = random.choices(counties, weights=weights, k=n_households)
return np.array([get_county_index(c) for c in selected], dtype=np.int32)


def get_county_filter_probability(
cd_geoid: str,
county_filter: set,
) -> float:
"""
Calculate P(county in filter | CD).

Returns the probability that a household in this CD would be in the
target area (e.g., NYC). Used for weight scaling when building
city-level datasets.

Args:
cd_geoid: Congressional district geoid (e.g., "3610")
county_filter: Set of county names that define the target area

Returns:
Probability between 0 and 1
"""
cd_key = str(int(cd_geoid))

if cd_key in _CD_COUNTY_DISTRIBUTIONS:
dist = _CD_COUNTY_DISTRIBUTIONS[cd_key]
else:
dist = _generate_uniform_distribution(cd_key)

return sum(
prob for county, prob in dist.items() if county in county_filter
)
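
# A minimal usage sketch (hypothetical geoid and county names; real callers
# pass a filter such as NYC_COUNTIES from the stacking script):
#
#     p = get_county_filter_probability("3610", {"Kings County", "Queens County"})
#     # p is the expected share of this CD's households inside the filter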


def get_filtered_county_distribution(
cd_geoid: str,
county_filter: set,
) -> Dict[str, float]:
"""
Get normalized distribution over target counties only.

Used when building city-level datasets to assign only valid counties
while maintaining relative proportions within the target area.

Args:
cd_geoid: Congressional district geoid (e.g., "3610")
county_filter: Set of county names that define the target area

Returns:
Dictionary mapping county names to normalized probabilities.
Empty dict if CD has no overlap with target area.
"""
cd_key = str(int(cd_geoid))

if cd_key in _CD_COUNTY_DISTRIBUTIONS:
dist = _CD_COUNTY_DISTRIBUTIONS[cd_key]
else:
dist = _generate_uniform_distribution(cd_key)

filtered = {c: p for c, p in dist.items() if c in county_filter}
total = sum(filtered.values())

if total > 0:
return {c: p / total for c, p in filtered.items()}
return {}
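
# Sketch of the contract (illustrative values): the result maps each
# overlapping county to its renormalized share, summing to 1.0, or {} when
# the CD never overlaps the filter:
#
#     dist = get_filtered_county_distribution("3610", {"New York County"})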
@@ -26,6 +26,8 @@
)
from policyengine_us_data.datasets.cps.local_area_calibration.county_assignment import (
assign_counties_for_cd,
get_county_filter_probability,
get_filtered_county_distribution,
)

NYC_COUNTIES = {
@@ -65,6 +67,7 @@ def create_sparse_cd_stacked_dataset(
output_path=None,
dataset_path=None,
county_filter=None,
seed: int = 42,
):
"""
Create a SPARSE congressional district-stacked dataset using a DataFrame approach.
@@ -80,6 +83,8 @@ def create_sparse_cd_stacked_dataset(
dataset_path: Path to the base .h5 dataset used during calibration.
county_filter: Optional set of county names to filter to. Only households
assigned to these counties will be included. Used for city-level datasets.
seed: Base random seed for county assignment. Each CD gets seed + int(cd_geoid)
for deterministic, order-independent results. Default 42.

Returns:
output_path: Path to the saved .h5 file.
@@ -208,7 +213,16 @@ def create_sparse_cd_stacked_dataset(
# Get this CD's calibrated weights from the weight matrix
calibrated_weights_for_cd = W[
cd_idx, :
] # Get this CD's row from weight matrix
].copy() # Get this CD's row from weight matrix

# For city datasets: scale weights by P(target|CD)
# This preserves the representative sample while adjusting for the target population
if county_filter is not None:
p_target = get_county_filter_probability(cd_geoid, county_filter)
if p_target == 0:
# CD has no overlap with target area, skip entirely
continue
calibrated_weights_for_cd = calibrated_weights_for_cd * p_target
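
# Worked example (hypothetical numbers): if 40% of this CD's households
# fall in the target counties, p_target = 0.4 and every weight is scaled
# by 0.4, so the CD contributes 40% of its calibrated population to the
# city dataset while the full sample is retained.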

# Map the calibrated weights to household IDs
hh_weight_values = []
@@ -325,23 +339,31 @@
)

# Set county for this CD
county_indices = assign_counties_for_cd(
cd_geoid=cd_geoid, n_households=n_households_orig, seed=42 + idx
)
cd_sim.set_input("county", time_period, county_indices)

# Filter to only households assigned to specified counties (e.g., NYC)
# For city datasets: use only target counties (normalized distribution)
if county_filter is not None:
filtered_household_ids = set()
for hh_idx in active_household_indices:
county_name = get_county_name(county_indices[hh_idx])
if county_name in county_filter:
filtered_household_ids.add(household_ids[hh_idx])

active_household_ids = filtered_household_ids

if len(active_household_ids) == 0:
filtered_dist = get_filtered_county_distribution(
cd_geoid, county_filter
)
if not filtered_dist:
# Should not happen if we already checked p_target > 0
continue
county_indices = assign_counties_for_cd(
cd_geoid=cd_geoid,
n_households=n_households_orig,
seed=seed + int(cd_geoid),
distributions={cd_geoid: filtered_dist},
)
else:
county_indices = assign_counties_for_cd(
cd_geoid=cd_geoid,
n_households=n_households_orig,
seed=seed + int(cd_geoid),
)
cd_sim.set_input("county", time_period, county_indices)

# Note: We no longer use binary filtering for county_filter.
# Instead, weights are scaled by P(target|CD) and all households
# are included to avoid sample selection bias.
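# (Illustration of the bias avoided: under binary filtering, a CD with a
# small overlap contributed only the few households whose sampled county
# happened to land in the filter; weight scaling keeps every household
# and matches the target population in expectation instead.)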

geoadj = cd_geoadj_values[cd_geoid]
new_spm_thresholds = calculate_spm_thresholds_for_cd(