Commit 4462772

Update utilities for NAN codes:

* update export utility to export, validate, and test the missing cols
* add deletion coding to the archiver, make it expect missing cols, and let it handle comparisons between missing and non-missing CSVs
1 parent 33537f0 commit 4462772
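
For reference, both utilities in this commit import missingness codes from the new delphi_utils.nancodes module. A minimal sketch of what that Nans enum might look like, assuming an IntEnum of Delphi missingness codes (only NOT_MISSING and DELETED are confirmed by this diff; the other members are assumptions):

# Hypothetical sketch of delphi_utils/nancodes.py; only NOT_MISSING and
# DELETED appear in this commit, the remaining members are assumed.
from enum import IntEnum

class Nans(IntEnum):
    """Codes stored in the missing_val / missing_se / missing_sample_size columns."""
    NOT_MISSING = 0       # the value is present
    NOT_APPLICABLE = 1
    REGION_EXCEPTION = 2
    CENSORED = 3
    DELETED = 4           # the value existed in a prior issue but was removed
    OTHER = 5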

File tree

4 files changed: +230 -38 lines changed


_delphi_utils_python/delphi_utils/archive.py

Lines changed: 22 additions & 8 deletions
@@ -40,9 +40,11 @@
 from git import Repo
 from git.refs.head import Head
 import pandas as pd
+import numpy as np

 from .utils import read_params
 from .logger import get_structured_logger
+from .nancodes import Nans

 Files = List[str]
 FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
     changed_df is the pd.DataFrame of common rows from after_csv with changed values.
     added_df is the pd.DataFrame of added rows from after_csv.
     """
-    export_csv_dtypes = {"geo_id": str, "val": float,
-                         "se": float, "sample_size": float}
+    export_csv_dtypes = {
+        "geo_id": str, "val": float, "se": float, "sample_size": float,
+        "missing_val": int, "missing_se": int, "missing_sample_size": int
+    }

     before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
     before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@ def diff_export_csv(
     before_df_cmn = before_df.reindex(common_idx)
     after_df_cmn = after_df.reindex(common_idx)

-    # Exact comparisons, treating NA == NA as True
-    same_mask = before_df_cmn == after_df_cmn
-    same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
+    # If CSVs have different columns (no missingness), mark all values as new
+    if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
+        same_mask = after_df_cmn.copy()
+        same_mask.loc[:] = False
+    else:
+        # Exact comparisons, treating NA == NA as True
+        same_mask = before_df_cmn == after_df_cmn
+        same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
+
+    # Code deleted entries as nans with the deleted missing code
+    deleted_df = before_df.loc[deleted_idx, :].copy()
+    deleted_df[["val", "se", "sample_size"]] = np.nan
+    deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

     return (
-        before_df.loc[deleted_idx, :],
+        deleted_df,
         after_df_cmn.loc[~(same_mask.all(axis=1)), :],
         after_df.loc[added_idx, :])

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

             deleted_df, changed_df, added_df = diff_export_csv(
                 before_file, after_file)
-            new_issues_df = pd.concat([changed_df, added_df], axis=0)
+            new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

             if len(deleted_df) > 0:
                 print(
-                    f"Warning, diff has deleted indices in {after_file} that will be ignored")
+                    f"Diff has deleted indices in {after_file} that have been coded as nans.")

             # Write the diffs to diff_file, if applicable
             if len(new_issues_df) > 0:

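To make the new archiver behavior concrete, here is a minimal sketch of the deletion coding (file names and frames below are illustrative, not from the commit): when a geo_id present in the cached CSV disappears from the fresh export, diff_export_csv now returns it in deleted_df with nan values and Nans.DELETED codes, and diff_exports concatenates it into the written diff instead of dropping it.

import pandas as pd
from delphi_utils import Nans
from delphi_utils.archive import diff_export_csv

before = pd.DataFrame({
    "geo_id": ["1", "2"], "val": [1.0, 2.0], "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})
after = before.iloc[[0]]  # geo_id "2" disappears in the new export

before.to_csv("before.csv", index=False)
after.to_csv("after.csv", index=False)

deleted_df, changed_df, added_df = diff_export_csv("before.csv", "after.csv")
# deleted_df carries geo_id "2" with val/se/sample_size all NaN and the
# three missing_* columns set to Nans.DELETED, so the deletion propagates
# into the diff written by diff_exports rather than being ignored.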
_delphi_utils_python/delphi_utils/export.py

Lines changed: 40 additions & 2 deletions
@@ -3,10 +3,32 @@
 from datetime import datetime
 from os.path import join
 from typing import Optional
+import logging

 import numpy as np
 import pandas as pd

+from .nancodes import Nans
+
+def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
+    """Find values with contradictory missingness codes, filter them, and log."""
+    columns = ["val", "se", "sample_size"]
+    # Get indices where the XNOR is true (i.e. both are true or both are false).
+    masks = [
+        ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
+        for column in columns
+    ]
+    for mask in masks:
+        if not logger is None and df.loc[mask].size > 0:
+            logger.info(
+                "Filtering contradictory missing code in " +
+                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
+            )
+            df = df.loc[~mask]
+        elif logger is None and df.loc[mask].size > 0:
+            df = df.loc[~mask]
+    return df
+
 def create_export_csv(
     df: pd.DataFrame,
     export_dir: str,
@@ -15,7 +37,8 @@ def create_export_csv(
     metric: Optional[str] = None,
     start_date: Optional[datetime] = None,
     end_date: Optional[datetime] = None,
-    remove_null_samples: Optional[bool] = False
+    remove_null_samples: Optional[bool] = False,
+    logger: Optional[logging.Logger] = None
 ):
     """Export data in the format expected by the Delphi API.

@@ -39,6 +62,8 @@ def create_export_csv(
         Latest date to export or None if no maximum date restrictions should be applied.
     remove_null_samples: Optional[bool]
         Whether to remove entries whose sample sizes are null.
+    logger: Optional[logging.Logger]
+        Pass a logger object here to log information about contradictory missing codes.

     Returns
     ---------
@@ -64,7 +89,20 @@ def create_export_csv(
         else:
             export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
         export_file = join(export_dir, export_filename)
-        export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
+        expected_columns = [
+            "geo_id",
+            "val",
+            "se",
+            "sample_size",
+            "missing_val",
+            "missing_se",
+            "missing_sample_size"
+        ]
+        export_df = df[df["timestamp"] == date].filter(items=expected_columns)
+        if "missing_val" in export_df.columns:
+            export_df = filter_contradicting_missing_codes(
+                export_df, sensor, metric, date, logger=logger
+            )
         if remove_null_samples:
            export_df = export_df[export_df["sample_size"].notnull()]
         export_df = export_df.round({"val": 7, "se": 7})

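The XNOR mask in filter_contradicting_missing_codes flags rows where the value and its missing code disagree: a NaN value still coded NOT_MISSING, or a present value coded as missing. A small invented example (a sketch, not from the commit) showing one contradictory row being dropped:

from datetime import datetime
import numpy as np
import pandas as pd
from delphi_utils import Nans
from delphi_utils.export import filter_contradicting_missing_codes

df = pd.DataFrame({
    "geo_id": ["a", "b"],
    "val": [np.nan, np.nan],
    "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    # Row "a" is contradictory: val is NaN yet coded NOT_MISSING.
    # Row "b" is consistent: val is NaN and coded DELETED.
    "missing_val": [Nans.NOT_MISSING, Nans.DELETED],
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})

filtered = filter_contradicting_missing_codes(df, "sensor", "metric", datetime(2021, 1, 1))
# The mask ~(isna(val) ^ (missing_val == NOT_MISSING)) is True when both are
# True (value is NaN yet coded NOT_MISSING) or both are False (value present
# yet coded as missing), i.e. exactly when value and code contradict.
assert list(filtered["geo_id"]) == ["b"]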
_delphi_utils_python/tests/test_archive.py

Lines changed: 93 additions & 27 deletions
@@ -14,29 +14,53 @@

 from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
     archiver_from_params
+from delphi_utils import Nans

-CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
+CSV_DTYPES = {
+    "geo_id": str, "val": float, "se": float, "sample_size": float,
+    "missing_val": int, "missing_se": int, "missing_sample_size": int
+}

 CSVS_BEFORE = {
     # Common
     "csv0": pd.DataFrame({
         "geo_id": ["1", "2", "3"],
         "val": [1.000000001, 2.00000002, 3.00000003],
         "se": [0.1, 0.2, 0.3],
-        "sample_size": [10.0, 20.0, 30.0]}),
+        "sample_size": [10.0, 20.0, 30.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3,
+    }),

     "csv1": pd.DataFrame({
         "geo_id": ["1", "2", "3"],
         "val": [1.0, 2.0, 3.0],
         "se": [np.nan, 0.20000002, 0.30000003],
-        "sample_size": [10.0, 20.0, 30.0]}),
+        "sample_size": [10.0, 20.0, 30.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3,
+    }),

     # Deleted
     "csv2": pd.DataFrame({
         "geo_id": ["1"],
         "val": [1.0],
         "se": [0.1],
-        "sample_size": [10.0]}),
+        "sample_size": [10.0],
+        "missing_val": [Nans.NOT_MISSING],
+        "missing_se": [Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.NOT_MISSING],
+    }),
+
+    # Common, but updated with missing columns
+    "csv4": pd.DataFrame({
+        "geo_id": ["1"],
+        "val": [1.0],
+        "se": [0.1],
+        "sample_size": [10.0]
+    }),
 }

 CSVS_AFTER = {
@@ -45,23 +69,45 @@
         "geo_id": ["1", "2", "3"],
         "val": [1.0, 2.0, 3.0],
         "se": [0.10000001, 0.20000002, 0.30000003],
-        "sample_size": [10.0, 20.0, 30.0]}),
+        "sample_size": [10.0, 20.0, 30.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3,
+    }),

     "csv1": pd.DataFrame({
         "geo_id": ["1", "2", "4"],
         "val": [1.0, 2.1, 4.0],
         "se": [np.nan, 0.21, np.nan],
-        "sample_size": [10.0, 21.0, 40.0]}),
+        "sample_size": [10.0, 21.0, 40.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3,
+    }),

     # Added
     "csv3": pd.DataFrame({
         "geo_id": ["2"],
         "val": [2.0000002],
         "se": [0.2],
-        "sample_size": [20.0]}),
+        "sample_size": [20.0],
+        "missing_val": [Nans.NOT_MISSING],
+        "missing_se": [Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.NOT_MISSING],
+    }),
+
+    # Common, but updated with missing columns
+    "csv4": pd.DataFrame({
+        "geo_id": ["1"],
+        "val": [1.0],
+        "se": [0.1],
+        "sample_size": [10.0],
+        "missing_val": [Nans.NOT_MISSING],
+        "missing_se": [Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.NOT_MISSING],
+    }),
 }

-
 class TestArchiveDiffer:

     def test_stubs(self):
@@ -80,10 +126,14 @@ def test_diff_and_filter_exports(self, tmp_path):
         mkdir(export_dir)

         csv1_diff = pd.DataFrame({
-            "geo_id": ["2", "4"],
-            "val": [2.1, 4.0],
-            "se": [0.21, np.nan],
-            "sample_size": [21.0, 40.0]})
+            "geo_id": ["3", "2", "4"],
+            "val": [np.nan, 2.1, 4.0],
+            "se": [np.nan, 0.21, np.nan],
+            "sample_size": [np.nan, 21.0, 40.0],
+            "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+        })

         arch_diff = ArchiveDiffer(cache_dir, export_dir)

@@ -106,15 +156,15 @@ def test_diff_and_filter_exports(self, tmp_path):
         # Check return values
         assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
         assert set(common_diffs.keys()) == {
-            join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]}
+            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]}
         assert set(new_files) == {join(export_dir, "csv3.csv")}
         assert common_diffs[join(export_dir, "csv0.csv")] is None
         assert common_diffs[join(export_dir, "csv1.csv")] == join(
             export_dir, "csv1.csv.diff")

         # Check filesystem for actual files
         assert set(listdir(export_dir)) == {
-            "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"}
+            "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv", "csv4.csv", "csv4.csv.diff"}
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
             csv1_diff)
@@ -132,7 +182,7 @@ def test_diff_and_filter_exports(self, tmp_path):
         arch_diff.filter_exports(common_diffs)

         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             csv1_diff)
@@ -259,12 +309,16 @@ def test_run(self, tmp_path, s3_client):
         assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)

         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
         csv1_diff = pd.DataFrame({
-            "geo_id": ["2", "4"],
-            "val": [2.1, 4.0],
-            "se": [0.21, np.nan],
-            "sample_size": [21.0, 40.0]})
+            "geo_id": ["3", "2", "4"],
+            "val": [np.nan, 2.1, 4.0],
+            "se": [np.nan, 0.21, np.nan],
+            "sample_size": [np.nan, 21.0, 40.0],
+            "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+        })
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             csv1_diff)
@@ -346,7 +400,11 @@ def test_diff_exports(self, tmp_path):
             "geo_id": ["1", "2", "3"],
             "val": [1.0, 2.0, 3.0],
             "se": [0.1, 0.2, 0.3],
-            "sample_size": [10.0, 20.0, 30.0]})
+            "sample_size": [10.0, 20.0, 30.0],
+            "missing_val": [Nans.NOT_MISSING] * 3,
+            "missing_se": [Nans.NOT_MISSING] * 3,
+            "missing_sample_size": [Nans.NOT_MISSING] * 3,
+        })

         # Write exact same CSV into cache and export, so no diffs expected
         csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +441,11 @@ def test_archive_exports(self, tmp_path):
             "geo_id": ["1", "2", "3"],
             "val": [1.0, 2.0, 3.0],
             "se": [0.1, 0.2, 0.3],
-            "sample_size": [10.0, 20.0, 30.0]})
+            "sample_size": [10.0, 20.0, 30.0],
+            "missing_val": [Nans.NOT_MISSING] * 3,
+            "missing_se": [Nans.NOT_MISSING] * 3,
+            "missing_sample_size": [Nans.NOT_MISSING] * 3,
+        })

         # csv1.csv is now a dirty edit in the repo, and to be exported too
         csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -460,12 +522,16 @@ def test_run(self, tmp_path):
         original_branch.checkout()

         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
         csv1_diff = pd.DataFrame({
-            "geo_id": ["2", "4"],
-            "val": [2.1, 4.0],
-            "se": [0.21, np.nan],
-            "sample_size": [21.0, 40.0]})
+            "geo_id": ["3", "2", "4"],
+            "val": [np.nan, 2.1, 4.0],
+            "se": [np.nan, 0.21, np.nan],
+            "sample_size": [np.nan, 21.0, 40.0],
+            "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+        })
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             csv1_diff)

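The new csv4 fixture exercises the mixed-schema branch of diff_export_csv: the cached file lacks the missing_* columns while the fresh export carries them, so the XOR column check forces same_mask to all-False and every common row is reported as changed, yielding a csv4.csv.diff. A hedged sketch of that path in isolation (file names invented for illustration):

import pandas as pd
from delphi_utils import Nans
from delphi_utils.archive import diff_export_csv

old = pd.DataFrame({"geo_id": ["1"], "val": [1.0], "se": [0.1],
                    "sample_size": [10.0]})       # no missing_* columns
new = old.assign(missing_val=[Nans.NOT_MISSING],
                 missing_se=[Nans.NOT_MISSING],
                 missing_sample_size=[Nans.NOT_MISSING])

old.to_csv("csv4_before.csv", index=False)
new.to_csv("csv4_after.csv", index=False)

_, changed_df, _ = diff_export_csv("csv4_before.csv", "csv4_after.csv")
# Exactly one side has missing_val, so the common row is treated as changed
# even though val/se/sample_size are identical on both sides.
assert len(changed_df) == 1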