Update archiver and export utils for nancodes and deletion-handling #1252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 13 commits · Sep 28, 2021
42 changes: 30 additions & 12 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
from git import Repo
from git.refs.head import Head
import pandas as pd
import numpy as np

from .utils import read_params
from .logger import get_structured_logger
from .nancodes import Nans

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
added_df is the pd.DataFrame of added rows from after_csv.
"""
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se": int, "missing_sample_size": int
}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@ def diff_export_csv(
before_df_cmn = before_df.reindex(common_idx)
after_df_cmn = after_df.reindex(common_idx)

# Exact comparisons, treating NA == NA as True
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
# If exactly one of the two CSVs has missingness columns, mark all common rows as changed
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
same_mask = after_df_cmn.copy()
same_mask.loc[:] = False
else:
# Exact comparisons, treating NA == NA as True
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)

# Code deleted entries as nans with the deleted missing code
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
before_df.loc[deleted_idx, :],
deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
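
A minimal usage sketch of the new deletion handling (the to_buffer helper and the fixture values are illustrative, not part of this PR): a row present in the cached CSV but absent from the new export now comes back in deleted_df with NaN values and Nans.DELETED codes.

```python
import io

import pandas as pd

from delphi_utils.archive import diff_export_csv
from delphi_utils.nancodes import Nans

def to_buffer(df):
    # diff_export_csv reads its arguments with pd.read_csv, so in-memory
    # CSV buffers work just as well as file paths
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    buf.seek(0)
    return buf

before = pd.DataFrame({
    "geo_id": ["1", "2"],
    "val": [1.0, 2.0],
    "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})
after = before.iloc[[0]]  # geo_id "2" was dropped upstream

deleted_df, changed_df, added_df = diff_export_csv(
    to_buffer(before), to_buffer(after))

# the dropped row is reported with NaN values and the DELETED code
assert deleted_df["val"].isna().all()
assert (deleted_df["missing_val"] == Nans.DELETED).all()
```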

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

deleted_df, changed_df, added_df = diff_export_csv(
before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")
f"Diff has deleted indices in {after_file} that have been coded as nans.")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
@@ -266,9 +280,10 @@ def filter_exports(self, common_diffs: FileDiffMap):
Filter export directory to only contain relevant files.

Filters down the export_dir to only contain:
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only.
Should be called after archive_exports() so we archive the raw exports before
potentially modifying them.
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows
only, and 3) Deleted files replaced with empty CSVs with the same name. Should
be called after archive_exports() so we archive the raw exports before potentially
modifying them.

Parameters
----------
@@ -299,7 +314,7 @@ def run(self):
# Diff exports, and make incremental versions
_, common_diffs, new_files = self.diff_exports()

# Archive changed and new files only
# Archive changed, new, and emptied deleted files
to_archive = [f for f, diff in common_diffs.items()
if diff is not None]
to_archive += new_files
@@ -414,6 +429,9 @@ def archive_exports(self, # pylint: disable=arguments-differ
archive_success.append(exported_file)
except FileNotFoundError:
archive_fail.append(exported_file)
except shutil.SameFileError:
# no need to copy if the cached file is the same
archive_success.append(exported_file)

self._exports_archived = True

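
The new except branch can be understood in isolation: shutil.copyfile raises SameFileError when source and destination resolve to the same file, so the archiver now counts that case as a success rather than a failure. A standalone sketch (the path and contents are illustrative):

```python
import os
import shutil
import tempfile

export_dir = tempfile.mkdtemp()
path = os.path.join(export_dir, "20210928_state_sensor.csv")
with open(path, "w") as f:
    f.write("geo_id,val,se,sample_size\n01,1.0,0.1,10.0\n")

try:
    # copying a file onto itself raises instead of silently succeeding
    shutil.copyfile(path, path)
except shutil.SameFileError:
    # nothing to do: the "cached" copy is already identical
    pass
```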
42 changes: 40 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
@@ -3,10 +3,32 @@
from datetime import datetime
from os.path import join
from typing import Optional
import logging

import numpy as np
import pandas as pd

from .nancodes import Nans

def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
"""Find values with contradictory missingness codes, filter them, and log."""
columns = ["val", "se", "sample_size"]
# Get indices where the XNOR is true (i.e. both are true or both are false).
masks = [
~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
for column in columns
]
for mask in masks:
if logger is not None and df.loc[mask].size > 0:
logger.info(
"Filtering contradictory missing code in " +
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
)
df = df.loc[~mask]
elif logger is None and df.loc[mask].size > 0:
df = df.loc[~mask]
return df
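
A usage sketch of filter_contradicting_missing_codes (fixture values are illustrative): the second row claims NOT_MISSING for val while val is actually NaN, so the row is filtered out and one log line is emitted for the offending column.

```python
import logging
from datetime import datetime

import numpy as np
import pandas as pd

from delphi_utils.export import filter_contradicting_missing_codes
from delphi_utils.nancodes import Nans

logging.basicConfig(level=logging.INFO)

df = pd.DataFrame({
    "geo_id": ["1", "2"],
    "val": [1.0, np.nan],            # row "2" carries no value...
    "se": [0.1, np.nan],
    "sample_size": [10.0, np.nan],
    "missing_val": [Nans.NOT_MISSING] * 2,   # ...yet claims NOT_MISSING
    "missing_se": [Nans.NOT_MISSING, Nans.DELETED],
    "missing_sample_size": [Nans.NOT_MISSING, Nans.DELETED],
})

filtered = filter_contradicting_missing_codes(
    df, "sensor", "metric", datetime(2021, 9, 28),
    logger=logging.getLogger(__name__))

assert list(filtered["geo_id"]) == ["1"]  # the contradictory row is gone
```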

def create_export_csv(
df: pd.DataFrame,
export_dir: str,
@@ -16,7 +38,8 @@ def create_export_csv(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
remove_null_samples: Optional[bool] = False,
write_empty_days: Optional[bool] = False
write_empty_days: Optional[bool] = False,
logger: Optional[logging.Logger] = None
):
"""Export data in the format expected by the Delphi API.

@@ -43,6 +66,8 @@
write_empty_days: Optional[bool]
If true, every day in between start_date and end_date will have a CSV file written
even if there is no data for the day. If false, only the days present are written.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.

Returns
---------
@@ -70,7 +95,20 @@
else:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
export_df = filter_contradicting_missing_codes(
export_df, sensor, metric, date, logger=logger
)
if remove_null_samples:
export_df = export_df[export_df["sample_size"].notnull()]
export_df = export_df.round({"val": 7, "se": 7})
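
End to end, the new columns flow through create_export_csv unchanged; a minimal sketch (directory, geo_res, and sensor names are illustrative, and the expected filename assumes the metric-less format):

```python
import logging
import tempfile
from datetime import datetime

import pandas as pd

from delphi_utils.export import create_export_csv
from delphi_utils.nancodes import Nans

logging.basicConfig(level=logging.INFO)

df = pd.DataFrame({
    "geo_id": ["ca", "ny"],
    "timestamp": [datetime(2021, 9, 28)] * 2,
    "val": [1.5, 2.5],
    "se": [0.1, 0.2],
    "sample_size": [100.0, 200.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})

export_dir = tempfile.mkdtemp()
create_export_csv(
    df, export_dir, geo_res="state", sensor="sensor_name",
    logger=logging.getLogger(__name__))
# expected output: 20210928_state_sensor_name.csv with all seven columns
```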
122 changes: 95 additions & 27 deletions _delphi_utils_python/tests/test_archive.py
@@ -14,29 +14,53 @@

from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
archiver_from_params
from delphi_utils.nancodes import Nans

CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
CSV_DTYPES = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

CSVS_BEFORE = {
# Common
"csv0": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.000000001, 2.00000002, 3.00000003],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [np.nan, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Deleted
"csv2": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]}),
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

# Common, but updated with missing columns
"csv4": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]
}),
}

CSVS_AFTER = {
@@ -45,23 +69,45 @@
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.10000001, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "4"],
"val": [1.0, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [10.0, 21.0, 40.0]}),
"sample_size": [10.0, 21.0, 40.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Added
"csv3": pd.DataFrame({
"geo_id": ["2"],
"val": [2.0000002],
"se": [0.2],
"sample_size": [20.0]}),
"sample_size": [20.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

# Common, but updated with missing columns
"csv4": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),
}
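
Why csv4 above is diffed even though its values did not change: the cached file has no missingness columns while the new export does, so the XOR guard in diff_export_csv marks every common row as changed and re-issues it with codes attached. A self-contained sketch (the to_buffer helper is illustrative):

```python
import io

import pandas as pd

from delphi_utils.archive import diff_export_csv
from delphi_utils.nancodes import Nans

def to_buffer(df):
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    buf.seek(0)
    return buf

before = pd.DataFrame({
    "geo_id": ["1"], "val": [1.0], "se": [0.1], "sample_size": [10.0]})
after = before.assign(
    missing_val=Nans.NOT_MISSING,
    missing_se=Nans.NOT_MISSING,
    missing_sample_size=Nans.NOT_MISSING)

_, changed_df, _ = diff_export_csv(to_buffer(before), to_buffer(after))
# the row's values are identical, but it is still re-issued with its codes
assert len(changed_df) == 1
```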


class TestArchiveDiffer:

def test_stubs(self):
@@ -80,10 +126,14 @@ def test_diff_and_filter_exports(self, tmp_path):
mkdir(export_dir)

csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})

arch_diff = ArchiveDiffer(cache_dir, export_dir)

@@ -106,15 +156,17 @@ def test_diff_and_filter_exports(self, tmp_path):
# Check return values
assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
assert set(common_diffs.keys()) == {
join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]}
join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]}
assert set(new_files) == {join(export_dir, "csv3.csv")}
assert common_diffs[join(export_dir, "csv0.csv")] is None
assert common_diffs[join(export_dir, "csv1.csv")] == join(
export_dir, "csv1.csv.diff")

# Check filesystem for actual files
assert set(listdir(export_dir)) == {
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"}
"csv0.csv", "csv1.csv", "csv1.csv.diff",
"csv3.csv", "csv4.csv", "csv4.csv.diff"
}
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
csv1_diff)
Expand All @@ -132,7 +184,7 @@ def test_diff_and_filter_exports(self, tmp_path):
arch_diff.filter_exports(common_diffs)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -259,12 +311,16 @@ def test_run(self, tmp_path, s3_client):
assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -346,7 +402,11 @@ def test_diff_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# Write exact same CSV into cache and export, so no diffs expected
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +443,11 @@ def test_archive_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# csv1.csv is now a dirty edit in the repo, and to be exported too
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -460,12 +524,16 @@ def test_run(self, tmp_path):
original_branch.checkout()

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)