Skip to content

Commit de3abd6

Browse files
committed
Update utilities for NAN codes:
* update export utility to export, validate, and test the missing cols * handle deleted rows: replaced with NaN values * handle deleted files: replace with an empty CSV file * handle comparisons between CSVs with/without missing cols
1 parent 33537f0 commit de3abd6

File tree

4 files changed

+296
-44
lines changed

4 files changed

+296
-44
lines changed

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
from git import Repo
4141
from git.refs.head import Head
4242
import pandas as pd
43+
import numpy as np
4344

4445
from .utils import read_params
4546
from .logger import get_structured_logger
47+
from .nancodes import Nans
4648

4749
Files = List[str]
4850
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
7375
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
7476
added_df is the pd.DataFrame of added rows from after_csv.
7577
"""
76-
export_csv_dtypes = {"geo_id": str, "val": float,
77-
"se": float, "sample_size": float}
78+
export_csv_dtypes = {
79+
"geo_id": str, "val": float, "se": float, "sample_size": float,
80+
"missing_val": int, "missing_se": int, "missing_sample_size": int
81+
}
7882

7983
before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
8084
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@ def diff_export_csv(
8993
before_df_cmn = before_df.reindex(common_idx)
9094
after_df_cmn = after_df.reindex(common_idx)
9195

92-
# Exact comparisons, treating NA == NA as True
93-
same_mask = before_df_cmn == after_df_cmn
94-
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
96+
# If CSVs have different columns (no missingness), mark all values as new
97+
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
98+
same_mask = after_df_cmn.copy()
99+
same_mask.loc[:] = False
100+
else:
101+
# Exact comparisons, treating NA == NA as True
102+
same_mask = before_df_cmn == after_df_cmn
103+
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
104+
105+
# Code deleted entries as nans with the deleted missing code
106+
deleted_df = before_df.loc[deleted_idx, :].copy()
107+
deleted_df[["val", "se", "sample_size"]] = np.nan
108+
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
95109

96110
return (
97-
before_df.loc[deleted_idx, :],
111+
deleted_df,
98112
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
99113
after_df.loc[added_idx, :])
100114

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
227241

228242
deleted_df, changed_df, added_df = diff_export_csv(
229243
before_file, after_file)
230-
new_issues_df = pd.concat([changed_df, added_df], axis=0)
244+
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)
231245

232246
if len(deleted_df) > 0:
233247
print(
234-
f"Warning, diff has deleted indices in {after_file} that will be ignored")
248+
f"Diff has deleted indices in {after_file} that have been coded as nans.")
235249

236250
# Write the diffs to diff_file, if applicable
237251
if len(new_issues_df) > 0:
@@ -240,6 +254,15 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
240254
new_issues_df.to_csv(diff_file, na_rep="NA")
241255
common_diffs[after_file] = diff_file
242256

257+
# Replace deleted files with empty versions, but only if the cached version is not
258+
# already empty
259+
for deleted_file in deleted_files:
260+
deleted_df = pd.read_csv(deleted_file)
261+
if not deleted_df.empty:
262+
empty_df = deleted_df[0:0]
263+
new_deleted_filename = join(self.export_dir, basename(deleted_file))
264+
empty_df.to_csv(new_deleted_filename, index=False)
265+
243266
return deleted_files, common_diffs, new_files
244267

245268
def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
@@ -266,9 +289,10 @@ def filter_exports(self, common_diffs: FileDiffMap):
266289
Filter export directory to only contain relevant files.
267290
268291
Filters down the export_dir to only contain:
269-
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only.
270-
Should be called after archive_exports() so we archive the raw exports before
271-
potentially modifying them.
292+
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows
293+
only, and 3) Deleted files replaced with empty CSVs with the same name. Should
294+
be called after archive_exports() so we archive the raw exports before potentially
295+
modifying them.
272296
273297
Parameters
274298
----------
@@ -297,9 +321,9 @@ def run(self):
297321
self.update_cache()
298322

299323
# Diff exports, and make incremental versions
300-
_, common_diffs, new_files = self.diff_exports()
324+
deleted_files, common_diffs, new_files = self.diff_exports()
301325

302-
# Archive changed and new files only
326+
# Archive changed, new, and emptied deleted files
303327
to_archive = [f for f, diff in common_diffs.items()
304328
if diff is not None]
305329
to_archive += new_files

_delphi_utils_python/delphi_utils/export.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,32 @@
33
from datetime import datetime
44
from os.path import join
55
from typing import Optional
6+
import logging
67

78
import numpy as np
89
import pandas as pd
910

11+
from .nancodes import Nans
12+
13+
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them, and log.

    A row is contradictory for a column when the value is NaN but its
    missingness code is NOT_MISSING, or the value is present but the code
    marks it as missing.

    Parameters
    ----------
    df: pd.DataFrame
        Export frame with "val", "se", "sample_size" columns and their
        matching "missing_*" code columns.
    sensor: str
        Sensor name, used only in the log message.
    metric: str
        Metric name, used only in the log message.
    date: datetime
        Export date, used only in the log message.
    logger: Optional[logging.Logger]
        If provided, an info message is emitted for each column that had
        contradictory rows filtered.

    Returns
    -------
    pd.DataFrame with the contradictory rows removed.
    """
    for column in ["val", "se", "sample_size"]:
        # Recompute the mask from the *current* df each iteration: masks
        # computed up-front would no longer align with df's index after a
        # previous column's rows were dropped, and pandas raises
        # IndexingError on unaligned boolean indexers.
        # XNOR of (value is NaN) and (code == NOT_MISSING): true exactly
        # when the two signals contradict each other.
        mask = ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
        if mask.any():
            if logger is not None:
                logger.info(
                    "Filtering contradictory missing code in " +
                    "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
                )
            df = df.loc[~mask]
    return df
31+
1032
def create_export_csv(
1133
df: pd.DataFrame,
1234
export_dir: str,
@@ -15,7 +37,8 @@ def create_export_csv(
1537
metric: Optional[str] = None,
1638
start_date: Optional[datetime] = None,
1739
end_date: Optional[datetime] = None,
18-
remove_null_samples: Optional[bool] = False
40+
remove_null_samples: Optional[bool] = False,
41+
logger: Optional[logging.Logger] = None
1942
):
2043
"""Export data in the format expected by the Delphi API.
2144
@@ -39,6 +62,8 @@ def create_export_csv(
3962
Latest date to export or None if no maximum date restrictions should be applied.
4063
remove_null_samples: Optional[bool]
4164
Whether to remove entries whose sample sizes are null.
65+
logger: Optional[logging.Logger]
66+
Pass a logger object here to log information about contradictory missing codes.
4267
4368
Returns
4469
---------
@@ -64,7 +89,20 @@ def create_export_csv(
6489
else:
6590
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
6691
export_file = join(export_dir, export_filename)
67-
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
92+
expected_columns = [
93+
"geo_id",
94+
"val",
95+
"se",
96+
"sample_size",
97+
"missing_val",
98+
"missing_se",
99+
"missing_sample_size"
100+
]
101+
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
102+
if "missing_val" in export_df.columns:
103+
export_df = filter_contradicting_missing_codes(
104+
export_df, sensor, metric, date, logger=logger
105+
)
68106
if remove_null_samples:
69107
export_df = export_df[export_df["sample_size"].notnull()]
70108
export_df = export_df.round({"val": 7, "se": 7})

0 commit comments

Comments
 (0)