diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 5d1036bcd..630b581ee 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -40,9 +40,11 @@ from git import Repo from git.refs.head import Head import pandas as pd +import numpy as np from .utils import read_params from .logger import get_structured_logger +from .nancodes import Nans Files = List[str] FileDiffMap = Dict[str, Optional[str]] @@ -73,8 +75,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. """ - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -93,8 +97,13 @@ def diff_export_csv( same_mask = before_df_cmn == after_df_cmn same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED + return ( - before_df.loc[deleted_idx, :], + deleted_df, after_df_cmn.loc[~(same_mask.all(axis=1)), :], after_df.loc[added_idx, :]) @@ -227,11 +236,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: deleted_df, changed_df, added_df = diff_export_csv( before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) + new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0) if len(deleted_df) > 0: print( - f"Warning, diff has deleted indices in {after_file} that will be ignored") + f"Diff has deleted 
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them, and log.

    A row contradicts its missingness code for a column when either:
    * the value is NaN but the code claims ``NOT_MISSING``, or
    * the value is present but the code claims anything other than
      ``NOT_MISSING``.

    Contradictory rows are dropped one column at a time, in the order
    "val", "se", "sample_size".

    Parameters
    ----------
    df: pd.DataFrame
        Frame containing "val", "se", "sample_size" and the matching
        "missing_val", "missing_se", "missing_sample_size" code columns.
    sensor: str
        Sensor name; used only in the log message.
    metric: str
        Metric name; used only in the log message.
    date: datetime.date
        Export date; used only in the log message.
    logger: Optional[logging.Logger]
        If given, one info message is emitted per column that had
        contradictory rows filtered.

    Returns
    -------
    pd.DataFrame with the contradictory rows removed.
    """
    for column in ["val", "se", "sample_size"]:
        # Recompute the mask on the current frame so it stays aligned with
        # df's index even after earlier columns have dropped rows.  (The
        # previous implementation built all three masks up front on the
        # original index, which could yield an unalignable boolean indexer
        # once filtering began.)
        missing_code = df["missing_" + column]
        contradictory_mask = (
            (df[column].isna() & missing_code.eq(Nans.NOT_MISSING))
            | (df[column].notna() & missing_code.ne(Nans.NOT_MISSING))
        )
        if contradictory_mask.any():
            if logger is not None:
                # The message is deliberately identical for all three columns
                # to preserve the established log format.
                logger.info(
                    "Filtering contradictory missing code in " +
                    "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
                )
            # Bug fix: the original filtered the sample_size branch with the
            # *se* mask; each branch now drops rows using its own mask.
            df = df.loc[~contradictory_mask]
    return df
{"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +26,32 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } CSVS_AFTER = { @@ -45,20 +60,32 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": 
[Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } @@ -80,10 +107,14 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -261,10 +292,14 @@ def test_run(self, tmp_path, s3_client): # Check exports directory just has incremental changes assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -346,7 +381,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +422,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 
30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -462,10 +505,14 @@ def test_run(self, tmp_path): # Check exports directory just has incremental changes assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..aedc2933b 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,11 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans def _clean_directory(directory): """Clean files out of a directory.""" @@ -43,6 +46,34 @@ class TestExport: } ) + # A sample data frame with missingness. 
+ DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.UNKNOWN] + } + ) + + # A sample data frame with contradictory missing codes. + DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.UNKNOWN] + } + ) + # Directory in which to store tests. 
    def test_export_df_with_missingness(self):
        """Export DF2, which carries missing_* code columns, and expect one CSV per date."""
        # Start from an empty export directory so leftover files from other
        # tests cannot affect the filename assertions below.
        _clean_directory(self.TEST_DIR)

        create_export_csv(
            df=self.DF2.copy(),
            export_dir=self.TEST_DIR,
            geo_res="state",
            sensor="test",
            remove_null_samples=False
        )
        # Three distinct timestamps in DF2 -> three files; with no metric
        # given the filenames take the <date>_<geo>_<sensor>.csv form.
        assert _non_ignored_files_set(self.TEST_DIR) == set(
            [
                "20200215_state_test.csv",
                "20200301_state_test.csv",
                "20200315_state_test.csv",
            ]
        )
        # DF2's missingness codes are all self-consistent, so the exported
        # file should retain its rows rather than being filtered empty.
        assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0
def add_nancodes(df):
    """Attach the standard missingness-code columns to ``df``.

    ``missing_val`` defaults to ``NOT_MISSING``; ``missing_se`` and
    ``missing_sample_size`` default to ``NOT_APPLICABLE``, since this
    indicator fills ``se`` and ``sample_size`` with NaN.  Rows whose
    ``val`` is null are then re-coded as ``UNKNOWN``.  The frame is
    modified in place and also returned.
    """
    default_codes = {
        "missing_val": Nans.NOT_MISSING,
        "missing_se": Nans.NOT_APPLICABLE,
        "missing_sample_size": Nans.NOT_APPLICABLE,
    }
    for column, code in default_codes.items():
        df[column] = code

    # Anything still lacking a value gets the catch-all UNKNOWN code.
    df.loc[df["val"].isnull(), "missing_val"] = Nans.UNKNOWN
    return df