Skip to content

Commit 623fe0a

Browse files
authored
Merge pull request #994 from dshemetov/nans_nchs
Add NAN code support to NCHS Mortality
2 parents 2897308 + 7e43d70 commit 623fe0a

File tree

5 files changed

+236
-33
lines changed

5 files changed

+236
-33
lines changed

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
from git import Repo
4141
from git.refs.head import Head
4242
import pandas as pd
43+
import numpy as np
4344

4445
from .utils import read_params
4546
from .logger import get_structured_logger
47+
from .nancodes import Nans
4648

4749
Files = List[str]
4850
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
7375
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
7476
added_df is the pd.DataFrame of added rows from after_csv.
7577
"""
76-
export_csv_dtypes = {"geo_id": str, "val": float,
77-
"se": float, "sample_size": float}
78+
export_csv_dtypes = {
79+
"geo_id": str, "val": float, "se": float, "sample_size": float,
80+
"missing_val": int, "missing_se": int, "missing_sample_size": int
81+
}
7882

7983
before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
8084
before_df.set_index("geo_id", inplace=True)
@@ -93,8 +97,13 @@ def diff_export_csv(
9397
same_mask = before_df_cmn == after_df_cmn
9498
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
9599

100+
# Code deleted entries as nans with the deleted missing code
101+
deleted_df = before_df.loc[deleted_idx, :].copy()
102+
deleted_df[["val", "se", "sample_size"]] = np.nan
103+
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
104+
96105
return (
97-
before_df.loc[deleted_idx, :],
106+
deleted_df,
98107
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
99108
after_df.loc[added_idx, :])
100109

@@ -227,11 +236,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
227236

228237
deleted_df, changed_df, added_df = diff_export_csv(
229238
before_file, after_file)
230-
new_issues_df = pd.concat([changed_df, added_df], axis=0)
239+
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)
231240

232241
if len(deleted_df) > 0:
233242
print(
234-
f"Warning, diff has deleted indices in {after_file} that will be ignored")
243+
f"Diff has deleted indices in {after_file} that have been coded as nans.")
235244

236245
# Write the diffs to diff_file, if applicable
237246
if len(new_issues_df) > 0:

_delphi_utils_python/delphi_utils/export.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,53 @@
33
from datetime import datetime
44
from os.path import join
55
from typing import Optional
6+
import logging
67

78
import numpy as np
89
import pandas as pd
910

11+
from .nancodes import Nans
12+
13+
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them, and log.

    A row is contradictory for a given column when the value's NA-ness
    disagrees with its missing code: the value is NaN but coded
    ``Nans.NOT_MISSING``, or the value is present but coded as missing.

    Parameters
    ----------
    df: pd.DataFrame
        Frame with "val", "se", "sample_size" columns and the matching
        "missing_val", "missing_se", "missing_sample_size" code columns.
    sensor, metric:
        Identifiers used only to build the log message.
    date:
        Date used only to build the log message (formatted YYYY-MM-DD).
    logger: Optional[logging.Logger]
        When None, contradictions are filtered silently.

    Returns
    -------
    pd.DataFrame with contradictory rows removed.
    """
    # Compute all three masks against the incoming frame up front, exactly as
    # the per-column checks were originally evaluated.
    contradiction_masks = [
        (df[col].isna() & df["missing_" + col].eq(Nans.NOT_MISSING))
        | (df[col].notna() & df["missing_" + col].ne(Nans.NOT_MISSING))
        for col in ("val", "se", "sample_size")
    ]

    for mask in contradiction_masks:
        if df.loc[mask].size > 0:
            if logger is not None:
                logger.info(
                    "Filtering contradictory missing code in " +
                    "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
                )
            # BUG FIX: the original filtered with the "se" mask in the
            # "sample_size" branch, so contradictory sample_size rows were
            # never removed (and se rows were filtered twice). Each branch
            # now uses its own mask.
            df = df.loc[~mask]
    return df
52+
1053
def create_export_csv(
1154
df: pd.DataFrame,
1255
export_dir: str,
@@ -15,7 +58,8 @@ def create_export_csv(
1558
metric: Optional[str] = None,
1659
start_date: Optional[datetime] = None,
1760
end_date: Optional[datetime] = None,
18-
remove_null_samples: Optional[bool] = False
61+
remove_null_samples: Optional[bool] = False,
62+
logger: Optional[logging.Logger] = None
1963
):
2064
"""Export data in the format expected by the Delphi API.
2165
@@ -39,6 +83,8 @@ def create_export_csv(
3983
Latest date to export or None if no maximum date restrictions should be applied.
4084
remove_null_samples: Optional[bool]
4185
Whether to remove entries whose sample sizes are null.
86+
logger: Optional[logging.Logger]
87+
Pass a logger object here to log information about contradictory missing codes.
4288
4389
Returns
4490
---------
@@ -64,7 +110,20 @@ def create_export_csv(
64110
else:
65111
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
66112
export_file = join(export_dir, export_filename)
67-
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
113+
expected_columns = [
114+
"geo_id",
115+
"val",
116+
"se",
117+
"sample_size",
118+
"missing_val",
119+
"missing_se",
120+
"missing_sample_size"
121+
]
122+
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
123+
if "missing_val" in export_df.columns:
124+
export_df = filter_contradicting_missing_codes(
125+
export_df, sensor, metric, date, logger=logger
126+
)
68127
if remove_null_samples:
69128
export_df = export_df[export_df["sample_size"].notnull()]
70129
export_df = export_df.round({"val": 7, "se": 7})

_delphi_utils_python/tests/test_archive.py

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,45 @@
1313
import pytest
1414

1515
from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
16-
archiver_from_params
16+
archiver_from_params, Nans
1717

18-
CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
18+
CSV_DTYPES = {
19+
"geo_id": str, "val": float, "se": float, "sample_size": float,
20+
"missing_val": int, "missing_se": int, "missing_sample_size": int
21+
}
1922

2023
CSVS_BEFORE = {
2124
# Common
2225
"csv0": pd.DataFrame({
2326
"geo_id": ["1", "2", "3"],
2427
"val": [1.000000001, 2.00000002, 3.00000003],
2528
"se": [0.1, 0.2, 0.3],
26-
"sample_size": [10.0, 20.0, 30.0]}),
29+
"sample_size": [10.0, 20.0, 30.0],
30+
"missing_val": [Nans.NOT_MISSING] * 3,
31+
"missing_se": [Nans.NOT_MISSING] * 3,
32+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
33+
}),
2734

2835
"csv1": pd.DataFrame({
2936
"geo_id": ["1", "2", "3"],
3037
"val": [1.0, 2.0, 3.0],
3138
"se": [np.nan, 0.20000002, 0.30000003],
32-
"sample_size": [10.0, 20.0, 30.0]}),
39+
"sample_size": [10.0, 20.0, 30.0],
40+
"missing_val": [Nans.NOT_MISSING] * 3,
41+
"missing_se": [Nans.NOT_MISSING] * 3,
42+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
43+
}),
3344

3445
# Deleted
3546
"csv2": pd.DataFrame({
3647
"geo_id": ["1"],
3748
"val": [1.0],
3849
"se": [0.1],
39-
"sample_size": [10.0]}),
50+
"sample_size": [10.0],
51+
"missing_val": [Nans.NOT_MISSING],
52+
"missing_se": [Nans.NOT_MISSING],
53+
"missing_sample_size": [Nans.NOT_MISSING],
54+
}),
4055
}
4156

4257
CSVS_AFTER = {
@@ -45,20 +60,32 @@
4560
"geo_id": ["1", "2", "3"],
4661
"val": [1.0, 2.0, 3.0],
4762
"se": [0.10000001, 0.20000002, 0.30000003],
48-
"sample_size": [10.0, 20.0, 30.0]}),
63+
"sample_size": [10.0, 20.0, 30.0],
64+
"missing_val": [Nans.NOT_MISSING] * 3,
65+
"missing_se": [Nans.NOT_MISSING] * 3,
66+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
67+
}),
4968

5069
"csv1": pd.DataFrame({
5170
"geo_id": ["1", "2", "4"],
5271
"val": [1.0, 2.1, 4.0],
5372
"se": [np.nan, 0.21, np.nan],
54-
"sample_size": [10.0, 21.0, 40.0]}),
73+
"sample_size": [10.0, 21.0, 40.0],
74+
"missing_val": [Nans.NOT_MISSING] * 3,
75+
"missing_se": [Nans.NOT_MISSING] * 3,
76+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
77+
}),
5578

5679
# Added
5780
"csv3": pd.DataFrame({
5881
"geo_id": ["2"],
5982
"val": [2.0000002],
6083
"se": [0.2],
61-
"sample_size": [20.0]}),
84+
"sample_size": [20.0],
85+
"missing_val": [Nans.NOT_MISSING],
86+
"missing_se": [Nans.NOT_MISSING],
87+
"missing_sample_size": [Nans.NOT_MISSING],
88+
}),
6289
}
6390

6491

@@ -80,10 +107,14 @@ def test_diff_and_filter_exports(self, tmp_path):
80107
mkdir(export_dir)
81108

82109
csv1_diff = pd.DataFrame({
83-
"geo_id": ["2", "4"],
84-
"val": [2.1, 4.0],
85-
"se": [0.21, np.nan],
86-
"sample_size": [21.0, 40.0]})
110+
"geo_id": ["3", "2", "4"],
111+
"val": [np.nan, 2.1, 4.0],
112+
"se": [np.nan, 0.21, np.nan],
113+
"sample_size": [np.nan, 21.0, 40.0],
114+
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
115+
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
116+
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
117+
})
87118

88119
arch_diff = ArchiveDiffer(cache_dir, export_dir)
89120

@@ -261,10 +292,14 @@ def test_run(self, tmp_path, s3_client):
261292
# Check exports directory just has incremental changes
262293
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
263294
csv1_diff = pd.DataFrame({
264-
"geo_id": ["2", "4"],
265-
"val": [2.1, 4.0],
266-
"se": [0.21, np.nan],
267-
"sample_size": [21.0, 40.0]})
295+
"geo_id": ["3", "2", "4"],
296+
"val": [np.nan, 2.1, 4.0],
297+
"se": [np.nan, 0.21, np.nan],
298+
"sample_size": [np.nan, 21.0, 40.0],
299+
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
300+
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
301+
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
302+
})
268303
assert_frame_equal(
269304
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
270305
csv1_diff)
@@ -346,7 +381,11 @@ def test_diff_exports(self, tmp_path):
346381
"geo_id": ["1", "2", "3"],
347382
"val": [1.0, 2.0, 3.0],
348383
"se": [0.1, 0.2, 0.3],
349-
"sample_size": [10.0, 20.0, 30.0]})
384+
"sample_size": [10.0, 20.0, 30.0],
385+
"missing_val": [Nans.NOT_MISSING] * 3,
386+
"missing_se": [Nans.NOT_MISSING] * 3,
387+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
388+
})
350389

351390
# Write exact same CSV into cache and export, so no diffs expected
352391
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +422,11 @@ def test_archive_exports(self, tmp_path):
383422
"geo_id": ["1", "2", "3"],
384423
"val": [1.0, 2.0, 3.0],
385424
"se": [0.1, 0.2, 0.3],
386-
"sample_size": [10.0, 20.0, 30.0]})
425+
"sample_size": [10.0, 20.0, 30.0],
426+
"missing_val": [Nans.NOT_MISSING] * 3,
427+
"missing_se": [Nans.NOT_MISSING] * 3,
428+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
429+
})
387430

388431
# csv1.csv is now a dirty edit in the repo, and to be exported too
389432
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -462,10 +505,14 @@ def test_run(self, tmp_path):
462505
# Check exports directory just has incremental changes
463506
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
464507
csv1_diff = pd.DataFrame({
465-
"geo_id": ["2", "4"],
466-
"val": [2.1, 4.0],
467-
"se": [0.21, np.nan],
468-
"sample_size": [21.0, 40.0]})
508+
"geo_id": ["3", "2", "4"],
509+
"val": [np.nan, 2.1, 4.0],
510+
"se": [np.nan, 0.21, np.nan],
511+
"sample_size": [np.nan, 21.0, 40.0],
512+
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
513+
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
514+
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
515+
})
469516
assert_frame_equal(
470517
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
471518
csv1_diff)

0 commit comments

Comments
 (0)