Skip to content

Add NAN code support to NCHS Mortality [staging test] #994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
from git import Repo
from git.refs.head import Head
import pandas as pd
import numpy as np

from .utils import read_params
from .logger import get_structured_logger
from .nancodes import Nans

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
Expand Down Expand Up @@ -73,8 +75,10 @@ def diff_export_csv(
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
added_df is the pd.DataFrame of added rows from after_csv.
"""
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
Expand All @@ -93,8 +97,13 @@ def diff_export_csv(
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)

# Code deleted entries as nans with the deleted missing code
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
before_df.loc[deleted_idx, :],
deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])

Expand Down Expand Up @@ -227,11 +236,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

deleted_df, changed_df, added_df = diff_export_csv(
before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")
f"Diff has deleted indices in {after_file} that have been coded as nans.")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
Expand Down
63 changes: 61 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,53 @@
from datetime import datetime
from os.path import join
from typing import Optional
import logging

import numpy as np
import pandas as pd

from .nancodes import Nans

def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them, and log.

    A row is contradictory for a given column when either:
      * the value is NaN but its missing code claims ``Nans.NOT_MISSING``, or
      * the value is present but its missing code claims it is missing.

    Contradictory rows are dropped one column at a time, in the order
    val, se, sample_size. The mask is recomputed after each drop so it is
    always aligned with the current (possibly already-filtered) frame;
    reusing a mask built from the original frame would raise an
    "unalignable boolean Series" error in pandas once rows are removed.

    Parameters
    ----------
    df: pd.DataFrame
        Frame with "val", "se", "sample_size" columns and their
        corresponding "missing_*" code columns.
    sensor: str
        Sensor name, used only to build the log message.
    metric: str
        Metric name, used only to build the log message.
    date: datetime.date or datetime.datetime
        Date of the export, used only to build the log message.
    logger: Optional[logging.Logger]
        If provided, an info message is logged for each column that had
        contradictory rows filtered.

    Returns
    -------
    pd.DataFrame
        The input frame with contradictory rows removed.
    """
    for column in ["val", "se", "sample_size"]:
        missing_column = "missing_" + column
        contradictory_mask = (
            (df[column].isna() & df[missing_column].eq(Nans.NOT_MISSING))
            |
            (df[column].notna() & df[missing_column].ne(Nans.NOT_MISSING))
        )
        if contradictory_mask.any():
            if logger is not None:
                logger.info(
                    "Filtering contradictory missing code in " +
                    "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
                )
            # Keep only the rows that are consistent for this column before
            # moving on to the next one.
            df = df.loc[~contradictory_mask]
    return df

def create_export_csv(
df: pd.DataFrame,
export_dir: str,
Expand All @@ -15,7 +58,8 @@ def create_export_csv(
metric: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
remove_null_samples: Optional[bool] = False
remove_null_samples: Optional[bool] = False,
logger: Optional[logging.Logger] = None
):
"""Export data in the format expected by the Delphi API.
Expand All @@ -39,6 +83,8 @@ def create_export_csv(
Latest date to export or None if no maximum date restrictions should be applied.
remove_null_samples: Optional[bool]
Whether to remove entries whose sample sizes are null.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.
Returns
---------
Expand All @@ -64,7 +110,20 @@ def create_export_csv(
else:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
export_df = filter_contradicting_missing_codes(
export_df, sensor, metric, date, logger=logger
)
if remove_null_samples:
export_df = export_df[export_df["sample_size"].notnull()]
export_df = export_df.round({"val": 7, "se": 7})
Expand Down
91 changes: 69 additions & 22 deletions _delphi_utils_python/tests/test_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,45 @@
import pytest

from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
archiver_from_params
archiver_from_params, Nans

CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
CSV_DTYPES = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

CSVS_BEFORE = {
# Common
"csv0": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.000000001, 2.00000002, 3.00000003],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [np.nan, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Deleted
"csv2": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]}),
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),
}

CSVS_AFTER = {
Expand All @@ -45,20 +60,32 @@
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.10000001, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "4"],
"val": [1.0, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [10.0, 21.0, 40.0]}),
"sample_size": [10.0, 21.0, 40.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Added
"csv3": pd.DataFrame({
"geo_id": ["2"],
"val": [2.0000002],
"se": [0.2],
"sample_size": [20.0]}),
"sample_size": [20.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),
}


Expand All @@ -80,10 +107,14 @@ def test_diff_and_filter_exports(self, tmp_path):
mkdir(export_dir)

csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})

arch_diff = ArchiveDiffer(cache_dir, export_dir)

Expand Down Expand Up @@ -261,10 +292,14 @@ def test_run(self, tmp_path, s3_client):
# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
Expand Down Expand Up @@ -346,7 +381,11 @@ def test_diff_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# Write exact same CSV into cache and export, so no diffs expected
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
Expand Down Expand Up @@ -383,7 +422,11 @@ def test_archive_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# csv1.csv is now a dirty edit in the repo, and to be exported too
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
Expand Down Expand Up @@ -462,10 +505,14 @@ def test_run(self, tmp_path):
# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
Expand Down
Loading