Skip to content

Commit 2ea630b

Browse files
committed
Update utilities for NAN codes:
* update export utility to export, validate, and test the missing cols * add deletion coding to the archiver, make it expect missing cols, and let it handle comparisons between missing and non-missing CSVs
1 parent badf367 commit 2ea630b

File tree

4 files changed

+45
-49
lines changed

4 files changed

+45
-49
lines changed

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def diff_export_csv(
7777
"""
7878
export_csv_dtypes = {
7979
"geo_id": str, "val": float, "se": float, "sample_size": float,
80-
"missing_val": int, "missing_se":int, "missing_sample_size": int
80+
"missing_val": int, "missing_se": int, "missing_sample_size": int
8181
}
8282

8383
before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
@@ -93,9 +93,14 @@ def diff_export_csv(
9393
before_df_cmn = before_df.reindex(common_idx)
9494
after_df_cmn = after_df.reindex(common_idx)
9595

96-
# Exact comparisons, treating NA == NA as True
97-
same_mask = before_df_cmn == after_df_cmn
98-
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
96+
# If only one of the two CSVs has the missingness columns, mark all values as new
97+
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
98+
same_mask = after_df_cmn.copy()
99+
same_mask.loc[:] = False
100+
else:
101+
# Exact comparisons, treating NA == NA as True
102+
same_mask = before_df_cmn == after_df_cmn
103+
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
99104

100105
# Code deleted entries as nans with the deleted missing code
101106
deleted_df = before_df.loc[deleted_idx, :].copy()

_delphi_utils_python/delphi_utils/export.py

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,42 +12,21 @@
1212

1313
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
1414
"""Find values with contradictory missingness codes, filter them, and log."""
15-
val_contradictory_missing_mask = (
16-
(df["val"].isna() & df["missing_val"].eq(Nans.NOT_MISSING))
17-
|
18-
(df["val"].notna() & df["missing_val"].ne(Nans.NOT_MISSING))
19-
)
20-
se_contradictory_missing_mask = (
21-
(df["se"].isna() & df["missing_se"].eq(Nans.NOT_MISSING))
22-
|
23-
(df["se"].notna() & df["missing_se"].ne(Nans.NOT_MISSING))
24-
)
25-
sample_size_contradictory_missing_mask = (
26-
(df["sample_size"].isna() & df["missing_sample_size"].eq(Nans.NOT_MISSING))
27-
|
28-
(df["sample_size"].notna() & df["missing_sample_size"].ne(Nans.NOT_MISSING))
29-
)
30-
if df.loc[val_contradictory_missing_mask].size > 0:
31-
if not logger is None:
15+
columns = ["val", "se", "sample_size"]
16+
# Get indices where the XNOR is true (i.e. both are true or both are false).
17+
masks = [
18+
~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
19+
for column in columns
20+
]
21+
for mask in masks:
22+
if not logger is None and df.loc[mask].size > 0:
3223
logger.info(
3324
"Filtering contradictory missing code in " +
3425
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
3526
)
36-
df = df.loc[~val_contradictory_missing_mask]
37-
if df.loc[se_contradictory_missing_mask].size > 0:
38-
if not logger is None:
39-
logger.info(
40-
"Filtering contradictory missing code in " +
41-
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
42-
)
43-
df = df.loc[~se_contradictory_missing_mask]
44-
if df.loc[sample_size_contradictory_missing_mask].size > 0:
45-
if not logger is None:
46-
logger.info(
47-
"Filtering contradictory missing code in " +
48-
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
49-
)
50-
df = df.loc[~se_contradictory_missing_mask]
27+
df = df.loc[~mask]
28+
elif logger is None and df.loc[mask].size > 0:
29+
df = df.loc[~mask]
5130
return df
5231

5332
def create_export_csv(

_delphi_utils_python/delphi_utils/nancodes.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
"""Provides unified not-a-number codes for the indicators.
2-
3-
Currently requires a manual sync between the covidcast-indicators
4-
and the delphi-epidata repo.
5-
* in covidcast-indicators: _delphi_utils_python/delphi_utils
6-
* in delphi-epidata: src/acquisition/covidcast
7-
"""
1+
"""Unified not-a-number codes for CMU Delphi codebase."""
82

93
from enum import IntEnum
104

_delphi_utils_python/tests/test_archive.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@
5252
"missing_se": [Nans.NOT_MISSING],
5353
"missing_sample_size": [Nans.NOT_MISSING],
5454
}),
55+
56+
# Common, but updated with missing columns
57+
"csv4": pd.DataFrame({
58+
"geo_id": ["1"],
59+
"val": [1.0],
60+
"se": [0.1],
61+
"sample_size": [10.0]
62+
}),
5563
}
5664

5765
CSVS_AFTER = {
@@ -86,8 +94,18 @@
8694
"missing_se": [Nans.NOT_MISSING],
8795
"missing_sample_size": [Nans.NOT_MISSING],
8896
}),
89-
}
9097

98+
# Common, but updated with missing columns
99+
"csv4": pd.DataFrame({
100+
"geo_id": ["1"],
101+
"val": [1.0],
102+
"se": [0.1],
103+
"sample_size": [10.0],
104+
"missing_val": [Nans.NOT_MISSING],
105+
"missing_se": [Nans.NOT_MISSING],
106+
"missing_sample_size": [Nans.NOT_MISSING],
107+
}),
108+
}
91109

92110
class TestArchiveDiffer:
93111

@@ -137,15 +155,15 @@ def test_diff_and_filter_exports(self, tmp_path):
137155
# Check return values
138156
assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
139157
assert set(common_diffs.keys()) == {
140-
join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]}
158+
join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]}
141159
assert set(new_files) == {join(export_dir, "csv3.csv")}
142160
assert common_diffs[join(export_dir, "csv0.csv")] is None
143161
assert common_diffs[join(export_dir, "csv1.csv")] == join(
144162
export_dir, "csv1.csv.diff")
145163

146164
# Check filesystem for actual files
147165
assert set(listdir(export_dir)) == {
148-
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"}
166+
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv", "csv4.csv", "csv4.csv.diff"}
149167
assert_frame_equal(
150168
pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
151169
csv1_diff)
@@ -163,7 +181,7 @@ def test_diff_and_filter_exports(self, tmp_path):
163181
arch_diff.filter_exports(common_diffs)
164182

165183
# Check exports directory just has incremental changes
166-
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
184+
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
167185
assert_frame_equal(
168186
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
169187
csv1_diff)
@@ -290,7 +308,7 @@ def test_run(self, tmp_path, s3_client):
290308
assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)
291309

292310
# Check exports directory just has incremental changes
293-
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
311+
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
294312
csv1_diff = pd.DataFrame({
295313
"geo_id": ["3", "2", "4"],
296314
"val": [np.nan, 2.1, 4.0],
@@ -503,7 +521,7 @@ def test_run(self, tmp_path):
503521
original_branch.checkout()
504522

505523
# Check exports directory just has incremental changes
506-
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
524+
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
507525
csv1_diff = pd.DataFrame({
508526
"geo_id": ["3", "2", "4"],
509527
"val": [np.nan, 2.1, 4.0],

0 commit comments

Comments
 (0)