Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a85cd81
Add Modal GPU calibration to speed up CI
nwoodruff-co Feb 19, 2026
37de245
Fix black formatting, dataset.copy(), and add changelog entry
nwoodruff-co Feb 20, 2026
c566265
Reduce peak memory in Modal path by building matrices sequentially
nwoodruff-co Feb 20, 2026
40199d3
ci: re-trigger CI after runner preemption
nwoodruff-co Feb 20, 2026
4eba135
fix: spawn constituency Modal job before building LA arrays to reduce…
nwoodruff-co Feb 20, 2026
97422c7
fix: interleave national/local matrix builds with explicit gc.collect()
nwoodruff-co Feb 20, 2026
a40a5cf
fix: build national matrix once before app.run() to halve peak memory
nwoodruff-co Feb 20, 2026
be83839
fix: pass frs.copy() to create_national_target_matrix to avoid int ti…
nwoodruff-co Feb 20, 2026
9aaa213
fix: use serialized=True on Modal function to avoid policyengine_uk i…
nwoodruff-co Feb 20, 2026
baa2290
feat: return per-epoch weight checkpoints from Modal and write calibr…
nwoodruff-co Feb 20, 2026
448e621
feat: offload imputation pipeline to high-CPU Modal container
nwoodruff-co Feb 20, 2026
42b82e3
Reduce peak memory by serialising matrices before building next one
nwoodruff-co Feb 20, 2026
e175559
Trigger CI
nwoodruff-co Feb 20, 2026
8fe4de3
Fix Modal CPU image: install system HDF5 deps and pin policyengine-uk…
nwoodruff-co Feb 20, 2026
4f66ac0
Enable Modal output for image build debugging
nwoodruff-co Feb 20, 2026
64fe032
Fix Modal CPU image: use Python 3.13 for policyengine-uk-data compati…
nwoodruff-co Feb 20, 2026
e029ee1
Use uv for CPU image installs; pre-install CPU-only torch to avoid CU…
nwoodruff-co Feb 20, 2026
d783bcb
Fix uv_pip_install index_url args; wrap app.run() with modal.enable_o…
nwoodruff-co Feb 20, 2026
e4c74b2
Fix CPU image: use run_commands with pip to ensure policyengine-uk in…
nwoodruff-co Feb 20, 2026
de9bae8
Add serialized=True to run_imputation to avoid container importing po…
nwoodruff-co Feb 20, 2026
445d763
Install deps via uv in CPU image for faster cached builds
nwoodruff-co Feb 20, 2026
a06508d
Copy local policyengine_uk_data source into CPU image instead of inst…
nwoodruff-co Feb 20, 2026
5e475bb
Fix: use add_local_dir not copy_local_dir
nwoodruff-co Feb 20, 2026
0c7bb8f
Upgrade Modal specs: 16 CPU/32GB for imputation, A10G GPU for calibra…
nwoodruff-co Feb 20, 2026
0edba1d
Revert imputation offloading; Modal GPU for calibration only
nwoodruff-co Feb 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/pull_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ jobs:
run: make data
env:
TESTING: "1"
MODAL_CALIBRATE: "1"
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
- name: Save calibration log (constituencies)
uses: actions/upload-artifact@v4
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ jobs:
HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }}
- name: Build datasets
run: make data
env:
MODAL_CALIBRATE: "1"
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
- name: Save calibration log (constituencies)
uses: actions/upload-artifact@v4
with:
Expand Down
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: minor
changes:
added:
- Modal GPU calibration support to speed up CI runs
279 changes: 227 additions & 52 deletions policyengine_uk_data/datasets/create_datasets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from policyengine_uk_data.datasets.frs import create_frs
from policyengine_uk_data.storage import STORAGE_FOLDER
import gc
import logging
import os
import io
import numpy as np
import h5py
from policyengine_uk_data.utils.uprating import uprate_dataset
from policyengine_uk_data.utils.progress import (
ProcessingProgress,
Expand All @@ -11,17 +15,159 @@

# Configure root logging at INFO level.
logging.basicConfig(level=logging.INFO)

# Opt-in switch, read once at import time: True only when the MODAL_CALIBRATE
# environment variable is exactly "1" (an unset variable defaults to "0").
USE_MODAL = os.environ.get("MODAL_CALIBRATE", "0") == "1"


def _dump(arr) -> bytes:
buf = io.BytesIO()
np.save(buf, arr)
return buf.getvalue()


def _build_weights_init(dataset, area_count, r):
areas_per_household = np.maximum(r.sum(axis=0), 1)
original_weights = np.log(
dataset.household.household_weight.values / areas_per_household
+ np.random.random(len(dataset.household.household_weight.values))
* 0.01
)
return np.ones((area_count, len(original_weights))) * original_weights


def _build_log(checkpoints, get_performance, m_c, y_c, m_n, y_n, log_csv):
import pandas as pd

performance = pd.DataFrame()
for epoch, w_bytes in checkpoints:
w = np.load(io.BytesIO(w_bytes))
perf = get_performance(w, m_c, y_c, m_n, y_n, [])
perf["epoch"] = epoch
perf["loss"] = perf.rel_abs_error**2
perf["target_name"] = [
f"{a}/{m}" for a, m in zip(perf.name, perf.metric)
]
performance = pd.concat([performance, perf], ignore_index=True)
performance.to_csv(log_csv, index=False)
final_epoch, final_bytes = checkpoints[-1]
return np.load(io.BytesIO(final_bytes))


def _as_array(x):
    """Return ``x.values`` when ``x`` has that attribute, else ``x`` itself."""
    return x.values if hasattr(x, "values") else x


def _prepare_area_payload(frs, matrix_fn, area_count):
    """Build one local-area target set and serialise its calibration inputs.

    Works on a private copy of ``frs`` so the caller's dataset is not
    touched by ``matrix_fn``.

    Returns:
        ``(labelled, payload)`` where ``labelled`` is
        ``(matrix_np, matrix_cols, y_np, y_cols)`` -- the raw arrays plus
        the column labels needed to rebuild DataFrames later -- and
        ``payload`` is a dict of ``.npy`` byte strings keyed ``"matrix"``,
        ``"y"``, ``"r"`` and ``"weights_init"``.
    """
    frs_copy = frs.copy()
    matrix, y, r = matrix_fn(frs_copy)
    labelled = (
        _as_array(matrix),
        list(matrix.columns),
        _as_array(y),
        list(y.columns),
    )
    payload = {
        "matrix": _dump(labelled[0]),
        "y": _dump(labelled[2]),
        "weights_init": _dump(_build_weights_init(frs_copy, area_count, r)),
        "r": _dump(r),
    }
    return labelled, payload


def _run_modal_calibrations(
    frs,
    epochs,
    create_constituency_target_matrix,
    create_local_authority_target_matrix,
    create_national_target_matrix,
    get_constituency_performance,
    get_la_performance,
    *,
    constituency_count=650,
    local_authority_count=360,
):
    """
    Dispatch both calibrations concurrently to Modal GPU containers.

    The national, constituency and local-authority inputs are built one
    after another, each serialised to ``.npy`` bytes straight away, so the
    intermediate DataFrames can be released before the next build starts.
    Both remote jobs are spawned before either result is awaited.

    Args:
        frs: Dataset to calibrate; every matrix builder receives its own
            ``frs.copy()``.
        epochs: Forwarded to the remote ``run_calibration`` function.
        create_constituency_target_matrix: Callable returning
            ``(matrix, y, r)`` for constituencies.
        create_local_authority_target_matrix: Callable returning
            ``(matrix, y, r)`` for local authorities.
        create_national_target_matrix: Callable returning ``(matrix, y)``
            for national targets.
        get_constituency_performance: Performance function used to build
            the constituency log.
        get_la_performance: Performance function used to build the local
            authority log.
        constituency_count: Number of rows in the initial constituency
            weight matrix.
        local_authority_count: Number of rows in the initial local
            authority weight matrix.

    Returns (constituency_weights, la_weights) as numpy arrays and
    writes constituency_calibration_log.csv / la_calibration_log.csv.
    """
    import modal
    import pandas as pd
    from policyengine_uk_data.utils.modal_calibrate import (
        app,
        run_calibration,
    )

    # Build matrices one at a time; serialise immediately and free the
    # DataFrames (keeping only column/index metadata for log reconstruction).
    m_nat, y_nat = create_national_target_matrix(frs.copy())
    m_nat_np = _as_array(m_nat)
    y_nat_np = _as_array(y_nat)
    m_nat_cols = list(m_nat.columns)
    y_nat_index = list(y_nat.index)
    b_m_nat = _dump(m_nat_np)
    b_y_nat = _dump(y_nat_np)
    del m_nat, y_nat
    gc.collect()

    labelled_c, payload_c = _prepare_area_payload(
        frs, create_constituency_target_matrix, constituency_count
    )
    gc.collect()

    labelled_la, payload_la = _prepare_area_payload(
        frs, create_local_authority_target_matrix, local_authority_count
    )
    gc.collect()

    with modal.enable_output(), app.run():
        fut_c = run_calibration.spawn(
            payload_c["matrix"],
            payload_c["y"],
            payload_c["r"],
            b_m_nat,
            b_y_nat,
            payload_c["weights_init"],
            epochs,
        )
        fut_la = run_calibration.spawn(
            payload_la["matrix"],
            payload_la["y"],
            payload_la["r"],
            b_m_nat,
            b_y_nat,
            payload_la["weights_init"],
            epochs,
        )
        # Every serialised buffer has been handed to Modal; drop the local
        # references so they do not sit in memory while the jobs run.
        del payload_c, payload_la, b_m_nat, b_y_nat
        gc.collect()

        # Results must be fetched while the app is still running.
        checkpoints_c = fut_c.get()
        checkpoints_la = fut_la.get()

    # Reconstruct DataFrames with correct labels for get_performance.
    # Only column labels (and the national target index) are restored;
    # row indexes fall back to pandas' default.
    matrix_c_np, matrix_c_cols, y_c_np, y_c_cols = labelled_c
    matrix_la_np, matrix_la_cols, y_la_np, y_la_cols = labelled_la
    m_nat_df = pd.DataFrame(m_nat_np, columns=m_nat_cols)
    y_nat_df = pd.Series(y_nat_np, index=y_nat_index)

    weights_c = _build_log(
        checkpoints_c,
        get_constituency_performance,
        pd.DataFrame(matrix_c_np, columns=matrix_c_cols),
        pd.DataFrame(y_c_np, columns=y_c_cols),
        m_nat_df,
        y_nat_df,
        "constituency_calibration_log.csv",
    )
    weights_la = _build_log(
        checkpoints_la,
        get_la_performance,
        pd.DataFrame(matrix_la_np, columns=matrix_la_cols),
        pd.DataFrame(y_la_np, columns=y_la_cols),
        m_nat_df,
        y_nat_df,
        "la_calibration_log.csv",
    )

    return weights_c, weights_la


def main():
"""Create enhanced FRS dataset with rich progress tracking."""
try:
# Use reduced epochs and fidelity for testing
is_testing = os.environ.get("TESTING", "0") == "1"
epochs = 32 if is_testing else 512

progress_tracker = ProcessingProgress()

# Define dataset creation steps
steps = [
"Create base FRS dataset",
"Impute consumption",
Expand All @@ -43,7 +189,6 @@ def main():
update_dataset,
nested_progress,
):
# Create base FRS dataset
update_dataset("Create base FRS dataset", "processing")
frs = create_frs(
raw_frs_folder=STORAGE_FOLDER / "frs_2023_24",
Expand All @@ -52,7 +197,6 @@ def main():
frs.save(STORAGE_FOLDER / "frs_2023_24.h5")
update_dataset("Create base FRS dataset", "completed")

# Import imputation functions
from policyengine_uk_data.datasets.imputations import (
impute_consumption,
impute_wealth,
Expand All @@ -64,9 +208,6 @@ def main():
impute_student_loan_plan,
)

# Apply imputations with progress tracking
# Wealth must be imputed before consumption because consumption
# uses num_vehicles as a predictor for fuel spending
update_dataset("Impute wealth", "processing")
frs = impute_wealth(frs)
update_dataset("Impute wealth", "completed")
Expand Down Expand Up @@ -99,19 +240,10 @@ def main():
frs = impute_student_loan_plan(frs, year=2025)
update_dataset("Impute student loan plan", "completed")

# Uprate dataset
update_dataset("Uprate to 2025", "processing")
frs = uprate_dataset(frs, 2025)
update_dataset("Uprate to 2025", "completed")

# Calibrate constituency weights with nested progress

update_dataset("Calibrate constituency weights", "processing")

# Use a separate progress tracker for calibration with nested display
from policyengine_uk_data.utils.calibrate import (
calibrate_local_areas,
)
from policyengine_uk_data.datasets.local_areas.constituencies.loss import (
create_constituency_target_matrix,
)
Expand All @@ -121,49 +253,92 @@ def main():
from policyengine_uk_data.datasets.local_areas.constituencies.calibrate import (
get_performance,
)

# Run calibration with verbose progress
frs_calibrated_constituencies = calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_constituency_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=650,
weight_file="parliamentary_constituency_weights.h5",
excluded_training_targets=[],
log_csv="constituency_calibration_log.csv",
verbose=True, # Enable nested progress display
area_name="Constituency",
get_performance=get_performance,
nested_progress=nested_progress, # Pass the nested progress manager
)

from policyengine_uk_data.datasets.local_areas.local_authorities.calibrate import (
get_performance as get_la_performance,
)
from policyengine_uk_data.datasets.local_areas.local_authorities.loss import (
create_local_authority_target_matrix,
)

# Run calibration with verbose progress
calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_local_authority_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=360,
weight_file="local_authority_weights.h5",
excluded_training_targets=[],
log_csv="la_calibration_log.csv",
verbose=True, # Enable nested progress display
area_name="Local Authority",
get_performance=get_la_performance,
nested_progress=nested_progress, # Pass the nested progress manager
)
if USE_MODAL:
update_dataset("Calibrate constituency weights", "processing")
update_dataset(
"Calibrate local authority weights", "processing"
)

weights_c, weights_la = _run_modal_calibrations(
frs,
epochs,
create_constituency_target_matrix,
create_local_authority_target_matrix,
create_national_target_matrix,
get_performance,
get_la_performance,
)

with h5py.File(
STORAGE_FOLDER / "parliamentary_constituency_weights.h5",
"w",
) as f:
f.create_dataset("2025", data=weights_c)

with h5py.File(
STORAGE_FOLDER / "local_authority_weights.h5", "w"
) as f:
f.create_dataset("2025", data=weights_la)

frs_calibrated_constituencies = frs.copy()
frs_calibrated_constituencies.household.household_weight = (
weights_c.sum(axis=0)
)

update_dataset("Calibrate constituency weights", "completed")
update_dataset(
"Calibrate local authority weights", "completed"
)
else:
from policyengine_uk_data.utils.calibrate import (
calibrate_local_areas,
)

update_dataset("Calibrate constituency weights", "processing")
frs_calibrated_constituencies = calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_constituency_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=650,
weight_file="parliamentary_constituency_weights.h5",
excluded_training_targets=[],
log_csv="constituency_calibration_log.csv",
verbose=True,
area_name="Constituency",
get_performance=get_performance,
nested_progress=nested_progress,
)
update_dataset("Calibrate constituency weights", "completed")

update_dataset("Calibrate dataset", "completed")
update_dataset(
"Calibrate local authority weights", "processing"
)
calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_local_authority_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=360,
weight_file="local_authority_weights.h5",
excluded_training_targets=[],
log_csv="la_calibration_log.csv",
verbose=True,
area_name="Local Authority",
get_performance=get_la_performance,
nested_progress=nested_progress,
)
update_dataset(
"Calibrate local authority weights", "completed"
)

# Downrate and save
update_dataset("Downrate to 2023", "processing")
frs_calibrated = uprate_dataset(
frs_calibrated_constituencies, 2023
Expand All @@ -174,14 +349,14 @@ def main():
frs_calibrated.save(STORAGE_FOLDER / "enhanced_frs_2023_24.h5")
update_dataset("Save final dataset", "completed")

# Display success message
display_success_panel(
"Dataset creation completed successfully",
details={
"base_dataset": "frs_2023_24.h5",
"enhanced_dataset": "enhanced_frs_2023_24.h5",
"imputations_applied": "consumption, wealth, VAT, services, income, capital_gains, salary_sacrifice, student_loan_plan",
"calibration": "national, LA and constituency targets",
"calibration": "national, LA and constituency targets",
"calibration_backend": "Modal GPU" if USE_MODAL else "CPU",
},
)

Expand Down
Loading