diff --git a/scripts/build_us_candidate.py b/scripts/build_us_candidate.py index 40aca39..8eb72dd 100644 --- a/scripts/build_us_candidate.py +++ b/scripts/build_us_candidate.py @@ -47,6 +47,13 @@ "~/PolicyEngine/policyengine-us-data/policyengine_us_data/storage" ).expanduser() OLD_WORKTREE = Path("~/CosilicoAI/microplex-us").expanduser() +PINNED_PRODUCTION_ECPS_BLOB = Path( + "~/.cache/huggingface/hub/models--policyengine--policyengine-us-data/blobs/" + "7af7026224f84cb6a91743fd8fa7ac506bad8c78e011fa58b6901894db4b4290" +).expanduser() +LOCAL_HF_MAIN_BASELINE = ( + OLD_WORKTREE / "artifacts/baselines/enhanced_cps_2024_hf_main.h5" +) BLOCK_CROSSWALK = Path( "~/CosilicoAI/microplex/data/block_probabilities.parquet" ).expanduser() @@ -195,6 +202,12 @@ def _derive_person_columns(person: pd.DataFrame) -> pd.DataFrame: p["social_security_disability"] = (ss * disabled).astype(float) p["social_security_survivors"] = (ss * survivor).astype(float) p["social_security_dependents"] = (ss * dependent).astype(float) + p["meets_ssi_disability_criteria"] = ( + p["is_disabled"] + | (num("ssi") > 0) + | (p["disability_benefits"] > 0) + | (p["social_security_disability"] > 0) + ).astype(bool) return p @@ -265,6 +278,18 @@ def _derive_person_columns(person: pd.DataFrame) -> pd.DataFrame: SHARED_PREDICTORS = ("age", "is_joint", "n_people", "n_children") +def _validate_score_baseline(path: Path) -> Path: + """Reject local non-certified eCPS baselines for promotion scoring.""" + baseline = path.expanduser() + if baseline.name == LOCAL_HF_MAIN_BASELINE.name: + raise ValueError( + "Refusing to score against local enhanced_cps_2024_hf_main.h5; " + "use the certified pinned production eCPS blob at " + f"{PINNED_PRODUCTION_ECPS_BLOB}." + ) + return baseline + + def _build_imputation_steps(*, weighted: bool = True) -> list[dict]: """ASEC+PUF imputation graph over donor-available variables. @@ -456,6 +481,55 @@ def _allocate_to_persons( return person +def _attach_filing_status_inputs( + person: pd.DataFrame, + spine: pd.DataFrame, +) -> pd.DataFrame: + """Carry microunit filing-status outcomes into PE filing-status inputs.""" + required_person = {"new_tax_unit_id", "tax_unit_role_input"} + missing_person = sorted(required_person - set(person.columns)) + if missing_person: + raise ValueError( + "cannot attach filing-status inputs; person frame is missing " + f"{missing_person}" + ) + required_spine = {"tax_unit_id", "filing_status_input"} + missing_spine = sorted(required_spine - set(spine.columns)) + if missing_spine: + raise ValueError( + "cannot attach filing-status inputs; spine frame is missing " + f"{missing_spine}" + ) + + out = person.copy() + status_by_unit = ( + spine[["tax_unit_id", "filing_status_input"]] + .drop_duplicates("tax_unit_id") + .set_index("tax_unit_id")["filing_status_input"] + ) + status = out["new_tax_unit_id"].map(status_by_unit) + status = ( + status.astype("string") + .str.strip() + .str.upper() + .str.replace(" ", "_", regex=False) + .fillna("") + ) + is_head = ( + out["tax_unit_role_input"].astype("string").str.upper().fillna("").eq("HEAD") + ) + out["is_surviving_spouse"] = (is_head & status.eq("SURVIVING_SPOUSE")).astype(bool) + existing_separated = ( + out["is_separated"].astype(bool) + if "is_separated" in out.columns + else pd.Series(False, index=out.index) + ) + out["is_separated"] = ( + existing_separated | (is_head & status.eq("SEPARATE")) + ).astype(bool) + return out + + def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--mode", choices=["smoke", "full"], default="smoke") @@ -470,7 +544,11 @@ def main() -> int: ap.add_argument( "--baseline-h5", type=Path, - default=OLD_WORKTREE / "artifacts/baselines/enhanced_cps_2024_hf_main.h5", + default=PINNED_PRODUCTION_ECPS_BLOB, + help=( + "Baseline H5 for scoring. Defaults to the certified pinned " + "production eCPS blob; local hf_main baselines are rejected." + ), ) ap.add_argument( "--usdata-repo", @@ -478,6 +556,8 @@ def main() -> int: default=Path("~/.claude-worktrees/usdata-f7458313").expanduser(), ) args = ap.parse_args() + if args.score: + args.baseline_h5 = _validate_score_baseline(args.baseline_h5) smoke = args.mode == "smoke" max_units = args.max_tax_units or (4000 if smoke else None) @@ -637,6 +717,7 @@ def main() -> int: - person["qualified_dividend_income"] ).clip(lower=0.0) person = _derive_person_columns(person) + person = _attach_filing_status_inputs(person, spine) # Pre-response copies and aliases the contract requires alongside the # base variables. person["employment_income_before_lsr"] = person["employment_income"] diff --git a/tests/test_build_us_candidate.py b/tests/test_build_us_candidate.py new file mode 100644 index 0000000..3fb905a --- /dev/null +++ b/tests/test_build_us_candidate.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import importlib.util +from pathlib import Path + +import pandas as pd + +ROOT = Path(__file__).parents[1] + + +def _load_build_driver(): + spec = importlib.util.spec_from_file_location( + "build_us_candidate", + ROOT / "scripts" / "build_us_candidate.py", + ) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_derive_person_columns_sets_ssi_disability_criteria() -> None: + driver = _load_build_driver() + person = pd.DataFrame( + { + "CSP_VAL": [0, 0, 0, 0, 0], + "CHSP_VAL": [0, 0, 0, 0, 0], + "DIS_VAL1": [0, 0, 500, 0, 0], + "DIS_VAL2": [0, 0, 0, 0, 0], + "DIS_SC1": [1, 1, 2, 1, 1], + "DIS_SC2": [1, 1, 1, 1, 1], + "NOW_GRP": [0, 0, 0, 0, 0], + "NOW_MRK": [0, 0, 0, 0, 0], + "WICYN": [0, 0, 0, 0, 0], + "PHIP_VAL": [0, 0, 0, 0, 0], + "POTC_VAL": [0, 0, 0, 0, 0], + "PMED_VAL": [0, 0, 0, 0, 0], + "PEDISDRS": [1, 2, 2, 2, 2], + "PEDISEAR": [2, 2, 2, 2, 2], + "PEDISEYE": [2, 2, 2, 2, 2], + "PEDISOUT": [2, 2, 2, 2, 2], + "PEDISPHY": [2, 2, 2, 2, 2], + "PEDISREM": [2, 2, 2, 2, 2], + "race": [1, 1, 1, 1, 1], + "sex": [1, 1, 1, 1, 1], + "hispanic": [0, 0, 0, 0, 0], + "A_EXPRRP": [1, 1, 1, 1, 1], + "A_MARITL": [7, 7, 7, 7, 7], + "household_id": [1, 1, 1, 1, 1], + "A_LINENO": [1, 2, 3, 4, 5], + "PEPAR1": [0, 0, 0, 0, 0], + "PEPAR2": [0, 0, 0, 0, 0], + "A_AGE": [40, 40, 40, 40, 40], + "social_security": [0, 0, 0, 700, 0], + "ssi": [0, 600, 0, 0, 0], + "RESNSS1": [0, 0, 0, 2, 0], + "RESNSS2": [0, 0, 0, 0, 0], + } + ) + + result = driver._derive_person_columns(person) + + assert result["meets_ssi_disability_criteria"].tolist() == [ + True, + True, + True, + True, + False, + ] + + +def test_attach_filing_status_inputs_sets_policyengine_status_controls() -> None: + driver = _load_build_driver() + person = pd.DataFrame( + { + "new_tax_unit_id": [10, 10, 20, 30], + "tax_unit_role_input": ["HEAD", "DEPENDENT", "HEAD", "HEAD"], + "is_separated": [False, False, False, True], + } + ) + spine = pd.DataFrame( + { + "tax_unit_id": [10, 20, 30], + "filing_status_input": ["SURVIVING_SPOUSE", "SEPARATE", "JOINT"], + } + ) + + result = driver._attach_filing_status_inputs(person, spine) + + assert result["is_surviving_spouse"].tolist() == [True, False, False, False] + assert result["is_separated"].tolist() == [False, False, True, True] + + +def test_attach_filing_status_inputs_requires_status_surface() -> None: + driver = _load_build_driver() + person = pd.DataFrame( + { + "new_tax_unit_id": [10], + "tax_unit_role_input": ["HEAD"], + } + ) + + try: + driver._attach_filing_status_inputs( + person, + pd.DataFrame({"tax_unit_id": [10]}), + ) + except ValueError as exc: + assert "filing_status_input" in str(exc) + else: # pragma: no cover + raise AssertionError("missing filing_status_input should fail") + + +def test_score_baseline_rejects_local_hf_main_baseline() -> None: + driver = _load_build_driver() + + try: + driver._validate_score_baseline(driver.LOCAL_HF_MAIN_BASELINE) + except ValueError as exc: + message = str(exc) + assert "enhanced_cps_2024_hf_main.h5" in message + assert str(driver.PINNED_PRODUCTION_ECPS_BLOB) in message + else: # pragma: no cover + raise AssertionError("local hf_main baseline should be rejected") + + +def test_score_baseline_defaults_to_pinned_production_blob() -> None: + driver = _load_build_driver() + + assert driver.PINNED_PRODUCTION_ECPS_BLOB.name == ( + "7af7026224f84cb6a91743fd8fa7ac506bad8c78e011fa58b6901894db4b4290" + ) + assert ( + driver._validate_score_baseline(driver.PINNED_PRODUCTION_ECPS_BLOB) + == driver.PINNED_PRODUCTION_ECPS_BLOB + )