diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index a23f92516..c805434bf 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -49,6 +49,10 @@
 Files = List[str]
 FileDiffMap = Dict[str, Optional[str]]
 
+EXPORT_CSV_DTYPES = {
+    "geo_id": str, "val": float, "se": float, "sample_size": float,
+    "missing_val": "Int64", "missing_se": "Int64", "missing_sample_size": "Int64"
+}
 
 def diff_export_csv(
         before_csv: str,
@@ -75,15 +79,10 @@ def diff_export_csv(
     changed_df is the pd.DataFrame of common rows from after_csv with changed values.
     added_df is the pd.DataFrame of added rows from after_csv.
     """
-    export_csv_dtypes = {
-        "geo_id": str, "val": float, "se": float, "sample_size": float,
-        "missing_val": int, "missing_se": int, "missing_sample_size": int
-    }
-
-    before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
+    before_df = pd.read_csv(before_csv, dtype=EXPORT_CSV_DTYPES)
     before_df.set_index("geo_id", inplace=True)
     before_df = before_df.round({"val": 7, "se": 7})
-    after_df = pd.read_csv(after_csv, dtype=export_csv_dtypes)
+    after_df = pd.read_csv(after_csv, dtype=EXPORT_CSV_DTYPES)
     after_df.set_index("geo_id", inplace=True)
     after_df = after_df.round({"val": 7, "se": 7})
     deleted_idx = before_df.index.difference(after_df.index)
@@ -93,8 +92,8 @@ def diff_export_csv(
     before_df_cmn = before_df.reindex(common_idx)
     after_df_cmn = after_df.reindex(common_idx)
 
-    # If CSVs have different columns (no missingness), mark all values as new
-    if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
+    # If the new CSV has missingness columns but the old one doesn't, mark all values as new
+    if ("missing_val" not in before_df_cmn.columns) & ("missing_val" in after_df_cmn.columns):
         same_mask = after_df_cmn.copy()
         same_mask.loc[:] = False
     else:
@@ -102,11 +101,12 @@ def diff_export_csv(
         same_mask = before_df_cmn == after_df_cmn
         same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
 
-    # Code deleted entries as nans with the deleted missing code
+    # Any deleted entries become rows with nans and the deleted missing code
     deleted_df = before_df.loc[deleted_idx, :].copy()
     deleted_df[["val", "se", "sample_size"]] = np.nan
-    if "missing_val" in after_df_cmn.columns:
-        deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
+    # If the new file doesn't have missing columns, then when the deleted, changed, and added
+    # rows are concatenated (in diff_exports), they will default to NA
+    deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
 
     return (
         deleted_df,
diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py
index 3e30a3264..68f05454d 100644
--- a/_delphi_utils_python/tests/test_archive.py
+++ b/_delphi_utils_python/tests/test_archive.py
@@ -2,6 +2,7 @@
 from io import StringIO, BytesIO
 from os import listdir, mkdir
 from os.path import join
+from typing import Any, Dict, List
 
 from boto3 import Session
 from git import Repo, exc
@@ -18,8 +19,8 @@
 
 CSV_DTYPES = {
     "geo_id": str, "val": float, "se": float, "sample_size": float,
-    "missing_val": int, "missing_se": int, "missing_sample_size": int
-    }
+    "missing_val": "Int64", "missing_se": "Int64", "missing_sample_size": "Int64"
+}
 
 CSVS_BEFORE = {
     # All rows unchanged
@@ -31,7 +32,7 @@
         "missing_val": [Nans.NOT_MISSING] * 3,
         "missing_se": [Nans.NOT_MISSING] * 3,
         "missing_sample_size": [Nans.NOT_MISSING] * 3,
-        }),
+    }),
 
     # One row deleted and one row added
     "csv1": pd.DataFrame({
@@ -40,9 +41,9 @@
         "se": [np.nan, 0.20000002, 0.30000003],
         "sample_size": [10.0, 20.0, 30.0],
         "missing_val": [Nans.NOT_MISSING] * 3,
-        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.CENSORED] + [Nans.NOT_MISSING] * 2,
         "missing_sample_size": [Nans.NOT_MISSING] * 3,
-        }),
+    }),
 
     # File deleted
     "csv2": pd.DataFrame({
@@ -53,7 +54,7 @@
         "missing_val": [Nans.NOT_MISSING],
         "missing_se": [Nans.NOT_MISSING],
         "missing_sample_size": [Nans.NOT_MISSING],
-        }),
+    }),
 
     # All rows common, but missing columns added
     "csv4": pd.DataFrame({
@@ -61,42 +62,15 @@
         "val": [1.0],
         "se": [0.1],
         "sample_size": [10.0]
-        }),
+    }),
 
-    # All rows common, but missing columns removed
+    # Same as 1, but no missing columns ("old-style" file)
     "csv5": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0],
-        "missing_val": [Nans.NOT_MISSING],
-        "missing_se": [Nans.NOT_MISSING],
-        "missing_sample_size": [Nans.NOT_MISSING],
-        }),
-
-    # All rows common, but no missing columns
-    "csv6": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-        }),
-
-    # Row deleted and row added, but no missing columns (will not be uploaded)
-    "csv7": pd.DataFrame({
-        "geo_id": ["1", "2"],
-        "val": [1.0, 2.0],
-        "se": [0.1, 0.2],
-        "sample_size": [10.0, 20.0]
-        }),
-
-    # Row deleted and row added, but no missing columns
-    "csv8": pd.DataFrame({
-        "geo_id": ["1", "2"],
-        "val": [1.0, 2.0],
-        "se": [0.1, 0.2],
-        "sample_size": [10.0, 20.0]
-        }),
+        "geo_id": ["1", "2", "3"],
+        "val": [1.0, 2.0, 3.0],
+        "se": [np.nan, 0.20000002, 0.30000003],
+        "sample_size": [10.0, 20.0, 30.0]
+    })
 }
 
 CSVS_AFTER = {
@@ -109,7 +83,7 @@
         "missing_val": [Nans.NOT_MISSING] * 3,
         "missing_se": [Nans.NOT_MISSING] * 3,
         "missing_sample_size": [Nans.NOT_MISSING] * 3,
-        }),
+    }),
 
     # One row deleted and one row added
     "csv1": pd.DataFrame({
@@ -118,9 +92,9 @@
         "se": [np.nan, 0.21, np.nan],
         "sample_size": [10.0, 21.0, 40.0],
         "missing_val": [Nans.NOT_MISSING] * 3,
-        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.CENSORED] + [Nans.NOT_MISSING] * 2,
         "missing_sample_size": [Nans.NOT_MISSING] * 3,
-        }),
+    }),
 
     # File added
     "csv3": pd.DataFrame({
@@ -131,7 +105,7 @@
         "missing_val": [Nans.NOT_MISSING],
         "missing_se": [Nans.NOT_MISSING],
         "missing_sample_size": [Nans.NOT_MISSING],
-        }),
+    }),
 
     # All rows common, but missing columns added
     "csv4": pd.DataFrame({
@@ -142,40 +116,29 @@
         "missing_val": [Nans.NOT_MISSING],
         "missing_se": [Nans.NOT_MISSING],
         "missing_sample_size": [Nans.NOT_MISSING],
-        }),
+    }),
 
-    # All rows common, but missing columns removed
+    # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
+    # and present again as nan, row 5 deleted previously and absent from file
+    # (no missing columns)
     "csv5": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-        }),
+        "geo_id": ["1", "2", "4"],
+        "val": [1.0, 2.1, 4.0],
+        "se": [np.nan, 0.21, np.nan],
+        "sample_size": [10.0, 21.0, 40.0]
+    })
+}
 
-    # All rows common, but no missing columns
-    "csv6": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-        }),
+
+def _assert_frames_equal_ignore_row_order(df1, df2, index_cols: List[str] = None):
+    return assert_frame_equal(df1.set_index(index_cols).sort_index(), df2.set_index(index_cols).sort_index())
+
+def _set_df_datatypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
+    df = df.copy()
+    for k, v in dtypes.items():
+        if k in df.columns:
+            df[k] = df[k].astype(v)
+    return df
-
-    # Row deleted and row added, but no missing columns (will not be uploaded)
-    "csv7": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-        }),
-
-    # Row deleted and row added, but no missing columns
-    "csv8": pd.DataFrame({
-        "geo_id": ["1", "3"],
-        "val": [1.0, 3.0],
-        "se": [0.1, 0.3],
-        "sample_size": [10.0, 30.0]
-        }),
-}
 
 
 class TestArchiveDiffer:
@@ -194,15 +157,25 @@ def test_diff_and_filter_exports(self, tmp_path):
         mkdir(cache_dir)
         mkdir(export_dir)
 
-        csv1_diff = pd.DataFrame({
-            "geo_id": ["3", "2", "4"],
-            "val": [np.nan, 2.1, 4.0],
-            "se": [np.nan, 0.21, np.nan],
-            "sample_size": [np.nan, 21.0, 40.0],
-            "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-            "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-            "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-        })
+        expected_csv1_diff = _set_df_datatypes(pd.DataFrame({
+            "geo_id": ["2", "3", "4"],
+            "val": [2.1, np.nan, 4.0],
+            "se": [0.21, np.nan, np.nan],
+            "sample_size": [21.0, np.nan, 40.0],
+            "missing_val": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
+            "missing_se": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
+            "missing_sample_size": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
+        }), dtypes=CSV_DTYPES)
+        expected_csv4_diff = _set_df_datatypes(CSVS_AFTER["csv4"], dtypes=CSV_DTYPES)
+        expected_csv5_diff = _set_df_datatypes(pd.DataFrame({
+            "geo_id": ["2", "3", "4"],
+            "val": [2.1, np.nan, 4.0],
+            "se": [0.21, np.nan, np.nan],
+            "sample_size": [21.0, np.nan, 40.0],
+            "missing_val": [np.nan, Nans.DELETED, np.nan],
+            "missing_se": [np.nan, Nans.DELETED, np.nan],
+            "missing_sample_size": [np.nan, Nans.DELETED, np.nan],
+        }), dtypes=CSV_DTYPES)
 
         arch_diff = ArchiveDiffer(cache_dir, export_dir)
 
@@ -225,7 +198,7 @@
         # Check return values
         assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
         assert set(common_diffs.keys()) == {
-            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv", "csv8.csv"]}
+            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv"]}
         assert set(new_files) == {join(export_dir, "csv3.csv")}
         assert common_diffs[join(export_dir, "csv0.csv")] is None
         assert common_diffs[join(export_dir, "csv1.csv")] == join(
@@ -238,13 +211,24 @@
             "csv3.csv",
             "csv4.csv", "csv4.csv.diff",
             "csv5.csv", "csv5.csv.diff",
-            "csv6.csv",
-            "csv7.csv", "csv7.csv.diff",
-            "csv8.csv", "csv8.csv.diff"
         }
 
-        assert_frame_equal(
+        # Check that the files look as expected
+        _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
-            csv1_diff)
+            expected_csv1_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv4.csv.diff"), dtype=CSV_DTYPES),
+            expected_csv4_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv5.csv.diff"), dtype=CSV_DTYPES),
+            expected_csv5_diff,
+            index_cols=["geo_id"]
+        )
+
         # Test filter_exports
         # ===================
@@ -259,10 +243,22 @@
         arch_diff.filter_exports(common_diffs)
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv7.csv", "csv8.csv"}
-        assert_frame_equal(
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
+        _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
-            csv1_diff)
+            expected_csv1_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv4.csv"), dtype=CSV_DTYPES),
+            expected_csv4_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv5.csv"), dtype=CSV_DTYPES),
+            expected_csv5_diff,
+            index_cols=["geo_id"]
+        )
 
 
 AWS_CREDENTIALS = {
@@ -326,7 +322,7 @@ def test_archive_exports(self, tmp_path, s3_client):
         mkdir(cache_dir)
         mkdir(export_dir)
 
-        csv1 = CSVS_BEFORE["csv1"]
+        csv1 = _set_df_datatypes(CSVS_BEFORE["csv1"], dtypes=CSV_DTYPES)
         csv1.to_csv(join(export_dir, "csv1.csv"), index=False)
 
         s3_client.create_bucket(Bucket=self.bucket_name)
@@ -381,11 +377,11 @@
         # Check that the buckets now contain the exported files.
         for csv_name, df in CSVS_AFTER.items():
             body = s3_client.get_object(Bucket=self.bucket_name, Key=f"{self.indicator_prefix}/{csv_name}.csv")["Body"]
-            assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)
+            assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), _set_df_datatypes(df, dtypes=CSV_DTYPES))
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv7.csv", "csv8.csv"}
-        csv1_diff = pd.DataFrame({
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
+        csv1_diff = _set_df_datatypes(pd.DataFrame({
             "geo_id": ["3", "2", "4"],
             "val": [np.nan, 2.1, 4.0],
             "se": [np.nan, 0.21, np.nan],
@@ -393,10 +389,12 @@
             "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-        })
-        assert_frame_equal(
+        }), dtypes=CSV_DTYPES)
+        _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
-            csv1_diff)
+            csv1_diff,
+            index_cols=["geo_id"]
+        )
 
 
 class TestGitArchiveDiffer:
@@ -479,7 +477,7 @@ def test_diff_exports(self, tmp_path):
             "missing_val": [Nans.NOT_MISSING] * 3,
             "missing_se": [Nans.NOT_MISSING] * 3,
             "missing_sample_size": [Nans.NOT_MISSING] * 3,
-            })
+        })
 
         # Write exact same CSV into cache and export, so no diffs expected
         csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -520,7 +518,7 @@ def test_archive_exports(self, tmp_path):
             "missing_val": [Nans.NOT_MISSING] * 3,
             "missing_se": [Nans.NOT_MISSING] * 3,
             "missing_sample_size": [Nans.NOT_MISSING] * 3,
-            })
+        })
 
         # csv1.csv is now a dirty edit in the repo, and to be exported too
         csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -591,12 +589,11 @@
         # Check that the archive branch contains `CSVS_AFTER`.
         arch_diff.get_branch(branch_name).checkout()
         for csv_name, df in CSVS_AFTER.items():
-            assert_frame_equal(
-                pd.read_csv(join(cache_dir, f"{csv_name}.csv"), dtype=CSV_DTYPES), df)
+            assert_frame_equal(pd.read_csv(join(cache_dir, f"{csv_name}.csv"), dtype=CSV_DTYPES), _set_df_datatypes(df, dtypes=CSV_DTYPES))
         original_branch.checkout()
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv7.csv", "csv8.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
         csv1_diff = pd.DataFrame({
             "geo_id": ["3", "2", "4"],
             "val": [np.nan, 2.1, 4.0],
             "se": [np.nan, 0.21, np.nan],
@@ -605,10 +602,12 @@
             "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-            })
-        assert_frame_equal(
+        })
+        _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
-            csv1_diff)
+            _set_df_datatypes(csv1_diff, dtypes=CSV_DTYPES),
+            index_cols=["geo_id"]
+        )
 
 
 class TestFromParams:
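Side note on the dtype change (not part of the patch): the move from `int` to pandas' nullable `"Int64"` dtype is what lets the missingness columns hold NAs at all, and the `deleted_df` comment in `diff_export_csv` relies on `pd.concat` filling absent columns with NA. A minimal sketch of both behaviors, with made-up file contents and missingness codes:

```python
import io

import pandas as pd

# 1) dtype=int makes read_csv raise on an empty cell, while the nullable
#    "Int64" extension dtype reads it as <NA>, so "old-style" CSVs whose
#    missingness cells are blank can still be loaded.
csv_data = "geo_id,val,missing_val\n1,1.0,0\n2,2.0,\n"
df = pd.read_csv(io.StringIO(csv_data),
                 dtype={"geo_id": str, "val": float, "missing_val": "Int64"})
print(df["missing_val"].dtype)   # Int64
print(df["missing_val"].isna())  # geo_id "2" is <NA>, not a garbage int

# 2) Concatenating frames with mismatched columns fills the gap with NA;
#    this is the default the deleted_df comment points to when the new
#    file has no missingness columns. The code 4 here is illustrative.
deleted = pd.DataFrame({"geo_id": ["3"], "val": [float("nan")], "missing_val": [4]})
added = pd.DataFrame({"geo_id": ["4"], "val": [4.0]})  # no missing_val column
print(pd.concat([deleted, added]))  # missing_val is NaN for geo_id "4"
```

This also explains why the tests wrap expected frames in `_set_df_datatypes(..., dtypes=CSV_DTYPES)`: a frame built from Python ints defaults to `int64`, which would not compare equal to the `Int64` columns produced by `read_csv`.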