diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index 5d1036bcd..994dfd7df 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
 from git import Repo
 from git.refs.head import Head
 import pandas as pd
+import numpy as np
 
 from .utils import read_params
 from .logger import get_structured_logger
+from .nancodes import Nans
 
 Files = List[str]
 FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
         changed_df is the pd.DataFrame of common rows from after_csv with changed values.
         added_df is the pd.DataFrame of added rows from after_csv.
     """
-    export_csv_dtypes = {"geo_id": str, "val": float,
-                         "se": float, "sample_size": float}
+    export_csv_dtypes = {
+        "geo_id": str, "val": float, "se": float, "sample_size": float,
+        "missing_val": int, "missing_se": int, "missing_sample_size": int
+    }
 
     before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
     before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@
     before_df_cmn = before_df.reindex(common_idx)
     after_df_cmn = after_df.reindex(common_idx)
 
-    # Exact comparisons, treating NA == NA as True
-    same_mask = before_df_cmn == after_df_cmn
-    same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
+    # If only one of the CSVs has the missingness columns, mark all common rows as changed
+    if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
+        same_mask = after_df_cmn.copy()
+        same_mask.loc[:] = False
+    else:
+        # Exact comparisons, treating NA == NA as True
+        same_mask = before_df_cmn == after_df_cmn
+        same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
+
+    # Code deleted entries as NaNs with the DELETED missing code
+    deleted_df = before_df.loc[deleted_idx, :].copy()
+    deleted_df[["val", "se", "sample_size"]] = np.nan
+    deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
 
     return (
-        before_df.loc[deleted_idx, :],
+        deleted_df,
         after_df_cmn.loc[~(same_mask.all(axis=1)), :],
         after_df.loc[added_idx, :])
@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
             deleted_df, changed_df, added_df = diff_export_csv(
                 before_file,
                 after_file)
-            new_issues_df = pd.concat([changed_df, added_df], axis=0)
+            new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)
 
             if len(deleted_df) > 0:
                 print(
-                    f"Warning, diff has deleted indices in {after_file} that will be ignored")
+                    f"Diff has deleted indices in {after_file} that have been coded as NaNs.")
 
             # Write the diffs to diff_file, if applicable
             if len(new_issues_df) > 0:
@@ -240,7 +254,26 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
                 new_issues_df.to_csv(diff_file, na_rep="NA")
                 common_diffs[after_file] = diff_file
 
-        return deleted_files, common_diffs, new_files
+        export_csv_dtypes = {
+            "geo_id": str, "val": float, "se": float, "sample_size": float,
+            "missing_val": int, "missing_se": int, "missing_sample_size": int
+        }
+
+        # Replace each deleted file with a NaN-filled version that records which
+        # rows were deleted
+        deleted_files_nanfilled = []
+        for deleted_file in deleted_files:
+            deleted_df = pd.read_csv(deleted_file, dtype=export_csv_dtypes)
+            print(
+                f"Diff has deleted {deleted_file}; generating a CSV with corresponding deleted rows."
+            )
+            deleted_df[["val", "se", "sample_size"]] = np.nan
+            deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
+            filename = join(self.export_dir, basename(deleted_file))
+            deleted_df.to_csv(filename, index=False)
+            deleted_files_nanfilled.append(filename)
+
+        return deleted_files_nanfilled, common_diffs, new_files
 
     def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
         """
@@ -266,9 +299,10 @@ def filter_exports(self, common_diffs: FileDiffMap):
         Filter export directory to only contain relevant files.
 
         Filters down the export_dir to only contain:
-        1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only.
-        Should be called after archive_exports() so we archive the raw exports before
-        potentially modifying them.
+        1) New files, 2) Changed files, filtered down to the ADDED and CHANGED rows
+        only, and 3) Deleted files, replaced by same-named CSVs whose rows are NaN-filled
+        and coded as DELETED. Should be called after archive_exports() so we archive
+        the raw exports before potentially modifying them.
 
         Parameters
         ----------
@@ -297,12 +331,13 @@ def run(self):
         self.update_cache()
 
         # Diff exports, and make incremental versions
-        _, common_diffs, new_files = self.diff_exports()
+        deleted_files, common_diffs, new_files = self.diff_exports()
 
-        # Archive changed and new files only
+        # Archive changed, new, and NaN-filled deleted files
        to_archive = [f for f, diff in common_diffs.items()
                      if diff is not None]
        to_archive += new_files
+        to_archive += deleted_files
        _, fails = self.archive_exports(to_archive)
 
        # Filter existing exports to exclude those that failed to archive
@@ -414,6 +449,9 @@ def archive_exports(self,  # pylint: disable=arguments-differ
                 archive_success.append(exported_file)
             except FileNotFoundError:
                 archive_fail.append(exported_file)
+            except shutil.SameFileError:
+                # No need to copy if the cached file is identical
+                archive_success.append(exported_file)
 
         self._exports_archived = True
 
diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 5a3b804b2..afc1a4c8a 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -3,10 +3,32 @@
 from datetime import datetime
 from os.path import join
 from typing import Optional
+import logging
 
 import numpy as np
 import pandas as pd
 
+from .nancodes import Nans
+
+def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
+    """Find values with contradictory missingness codes, filter them, and log."""
+    columns = ["val", "se", "sample_size"]
+    # Get indices where the XNOR is true (i.e. both are true or both are false).
+    masks = [
+        ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
+        for column in columns
+    ]
+    for mask in masks:
+        if logger is not None and df.loc[mask].size > 0:
+            logger.info(
+                "Filtering contradictory missing code in " +
+                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
+            )
+            df = df.loc[~mask]
+        elif logger is None and df.loc[mask].size > 0:
+            df = df.loc[~mask]
+    return df
+
 def create_export_csv(
     df: pd.DataFrame,
     export_dir: str,
@@ -16,7 +38,8 @@
     start_date: Optional[datetime] = None,
     end_date: Optional[datetime] = None,
     remove_null_samples: Optional[bool] = False,
-    write_empty_days: Optional[bool] = False
+    write_empty_days: Optional[bool] = False,
+    logger: Optional[logging.Logger] = None
 ):
     """Export data in the format expected by the Delphi API.
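To make the XNOR check in `filter_contradicting_missing_codes` concrete: a value and its missingness code contradict each other when the value is NaN but coded NOT_MISSING, or non-NaN but coded as missing. A minimal sketch, using a stand-in for `delphi_utils.nancodes.Nans` (the real enum's members and values may differ):

```python
from enum import IntEnum

import numpy as np
import pandas as pd

class Nans(IntEnum):
    """Stand-in for delphi_utils.nancodes.Nans; illustrative values only."""
    NOT_MISSING = 0
    OTHER = 5

df = pd.DataFrame({
    "val": [1.0, np.nan, np.nan],
    "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING],
})

# XNOR of "value is NaN" and "coded NOT_MISSING": True flags a contradiction,
# i.e. a NaN coded as present, or a real value coded as missing.
mask = ~(df["val"].isna() ^ df["missing_val"].eq(Nans.NOT_MISSING))
print(df.loc[~mask])  # rows 0 and 1 survive; row 2 is contradictory
```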
@@ -43,6 +66,8 @@ def create_export_csv( write_empty_days: Optional[bool] If true, every day in between start_date and end_date will have a CSV file written even if there is no data for the day. If false, only the days present are written. + logger: Optional[logging.Logger] + Pass a logger object here to log information about contradictory missing codes. Returns --------- @@ -70,7 +95,20 @@ def create_export_csv( else: export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..111acf92f 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -14,8 +14,12 @@ from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ archiver_from_params +from delphi_utils.nancodes import Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +27,40 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } CSVS_AFTER = { @@ -45,23 +69,45 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], 
"se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } - class TestArchiveDiffer: def test_stubs(self): @@ -80,10 +126,24 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) + + csv2_deleted = pd.DataFrame({ + "geo_id": ["1"], + "val": [np.nan], + "se": [np.nan], + "sample_size": [np.nan], + "missing_val": [Nans.DELETED], + "missing_se": [Nans.DELETED], + "missing_sample_size": [Nans.DELETED], + }) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -104,9 +164,9 @@ def test_diff_and_filter_exports(self, tmp_path): deleted_files, common_diffs, new_files = arch_diff.diff_exports() # Check return values - assert set(deleted_files) == {join(cache_dir, "csv2.csv")} + assert set(deleted_files) == {join(export_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +174,10 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", + "csv3.csv", "csv4.csv", "csv4.csv.diff", + "csv2.csv" + } assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -131,8 +194,11 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) - # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + # Check exports directory just has incremental and deleted changes + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -259,15 +325,31 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, 
+ "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame({ + "geo_id": ["1"], + "val": [np.nan], + "se": [np.nan], + "sample_size": [np.nan], + "missing_val": [Nans.DELETED], + "missing_se": [Nans.DELETED], + "missing_sample_size": [Nans.DELETED], + }) + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) class TestGitArchiveDiffer: @@ -346,7 +428,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +469,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -460,15 +550,32 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame({ + "geo_id": ["1"], + "val": [np.nan], + "se": [np.nan], + "sample_size": [np.nan], + "missing_val": [Nans.DELETED], + "missing_se": [Nans.DELETED], + "missing_sample_size": [Nans.DELETED], + }) + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) + class TestFromParams: diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..b22a710cd 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,11 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans def _clean_directory(directory): """Clean files out of a directory.""" @@ -43,6 +46,34 @@ class TestExport: } ) + # A sample data frame with missingness. 
+ DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER] + } + ) + + # A sample data frame with contradictory missing codes. + DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER] + } + ) + # Directory in which to store tests. TEST_DIR = "test_dir" @@ -235,3 +266,46 @@ def test_export_without_null_removal(self): ] ) assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0 + + def test_export_df_with_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF2.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + + @mock.patch("delphi_utils.logger") + def test_export_df_with_contradictory_missingness(self, mock_logger): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF3.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False, + logger=mock_logger + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + mock_logger.info.assert_called_once_with( + "Filtering contradictory missing code in test_None_2020-02-15." + ) diff --git a/quidel/delphi_quidel/data_tools.py b/quidel/delphi_quidel/data_tools.py index 5d67dd812..6c52fe939 100644 --- a/quidel/delphi_quidel/data_tools.py +++ b/quidel/delphi_quidel/data_tools.py @@ -3,6 +3,8 @@ import numpy as np import pandas as pd +from delphi_utils import Nans + def _prop_var(p, n): """ Calculate variance of proportion. @@ -117,7 +119,7 @@ def _geographical_pooling(tpooled_tests, tpooled_ptests, min_obs, max_borrow_obs return borrow_prop -def raw_positive_prop(positives, tests, min_obs): +def raw_positive_prop(positives, tests, min_obs, missing_val, missing_se, missing_sample_size): """ Calculate proportion of positive tests for a single location with no temporal smoothing. 
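For reference, `raw_positive_prop` computes its standard error in the next hunk as `se = np.sqrt(_prop_var(positive_prop, tests))`. Assuming `_prop_var(p, n)` is the binomial proportion variance `p * (1 - p) / n` (consistent with its "Calculate variance of proportion" docstring), the expected values in the updated tests below check out:

```python
import numpy as np

def _prop_var(p, n):
    """Binomial proportion variance p(1 - p)/n; assumed, but matches the tests."""
    return p * (1 - p) / n

p = np.array([1 / 2, 2 / 4, 3 / 6, 4 / 10])
n = np.array([2, 4, 6, 10])
se = np.sqrt(_prop_var(p, n))
# Matches expected_se for the min_obs=1 case:
# [sqrt(0.25/2), sqrt(0.25/4), sqrt(0.25/6), sqrt(0.24/10)]
print(se)
```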
@@ -166,10 +168,15 @@ def raw_positive_prop(positives, tests, min_obs): positive_prop = positives / tests se = np.sqrt(_prop_var(positive_prop, tests)) sample_size = tests - return positive_prop, se, sample_size + missing_val[np.isnan(tests) | (tests < min_obs) | np.isnan(positive_prop)] = Nans.CENSORED + missing_se[np.isnan(se)] = Nans.CENSORED + missing_sample_size[np.isnan(tests) | (tests < min_obs)] = Nans.CENSORED + + return positive_prop, se, sample_size, missing_val, missing_se, missing_sample_size def smoothed_positive_prop(positives, tests, min_obs, max_borrow_obs, pool_days, + missing_val, missing_se, missing_sample_size, parent_positives=None, parent_tests=None): """ Calculate the proportion of negative tests for a single location with temporal smoothing. @@ -259,10 +266,13 @@ def smoothed_positive_prop(positives, tests, min_obs, max_borrow_obs, pool_days, pooled_positives = tpooled_positives pooled_tests = tpooled_tests ## STEP 2: CALCULATE AS THOUGH THEY'RE RAW - return raw_positive_prop(pooled_positives, pooled_tests, min_obs) + return raw_positive_prop( + pooled_positives, pooled_tests, min_obs, + missing_val, missing_se, missing_sample_size + ) -def raw_tests_per_device(devices, tests, min_obs): +def raw_tests_per_device(devices, tests, min_obs, missing_val, missing_se, missing_sample_size): """ Calculate the tests per device for a single geographic location, without any temporal smoothing. @@ -297,14 +307,20 @@ def raw_tests_per_device(devices, tests, min_obs): 'with no np.nan') if min_obs <= 0: raise ValueError('min_obs should be positive') + tests[tests < min_obs] = np.nan tests_per_device = tests / devices se = np.repeat(np.nan, len(devices)) sample_size = tests - return tests_per_device, se, sample_size + missing_val[np.isnan(tests) | (tests < min_obs)] = Nans.CENSORED + missing_se = np.repeat(Nans.NOT_APPLICABLE, len(devices)) + missing_sample_size[np.isnan(tests) | (tests < min_obs)] = Nans.CENSORED + + return tests_per_device, se, sample_size, missing_val, missing_se, missing_sample_size def smoothed_tests_per_device(devices, tests, min_obs, max_borrow_obs, pool_days, + missing_val, missing_se, missing_sample_size, parent_devices=None, parent_tests=None): """ Calculate the ratio of tests per device for a single location with temporal smoothing. 
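The censoring convention shared by `raw_positive_prop` and `raw_tests_per_device` above: entries with fewer than `min_obs` tests are NaN-ed out and their missingness arrays set to `Nans.CENSORED`. A self-contained sketch with stand-in codes (the actual enum values live in `delphi_utils.nancodes`):

```python
import numpy as np

# Stand-in codes; the real values come from delphi_utils.nancodes.Nans.
NOT_MISSING, CENSORED = 0, 3

tests = np.array([2.0, 4.0, 6.0, 10.0])
min_obs = 3
missing_val = np.full(len(tests), NOT_MISSING)

# Entries with too few tests are NaN-ed out and coded CENSORED.
censored = np.isnan(tests) | (tests < min_obs)
tests[censored] = np.nan
missing_val[censored] = CENSORED
print(tests)        # [nan  4.  6. 10.]
print(missing_val)  # [3 0 0 0]
```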
@@ -383,4 +399,7 @@ def smoothed_tests_per_device(devices, tests, min_obs, max_borrow_obs, pool_days pooled_devices = tpooled_devices pooled_tests = tpooled_tests ## STEP 2: CALCULATE AS THOUGH THEY'RE RAW - return raw_tests_per_device(pooled_devices, pooled_tests, min_obs) + return raw_tests_per_device( + pooled_devices, pooled_tests, min_obs, + missing_val, missing_se, missing_sample_size + ) diff --git a/quidel/delphi_quidel/generate_sensor.py b/quidel/delphi_quidel/generate_sensor.py index 43778c9b4..d5fe9fa74 100644 --- a/quidel/delphi_quidel/generate_sensor.py +++ b/quidel/delphi_quidel/generate_sensor.py @@ -31,31 +31,51 @@ def generate_sensor_for_states(state_groups, smooth, device, first_date, last_da # smoothed test per device if device & smooth: - stat, se, sample_size = smoothed_tests_per_device( - devices=state_group["numUniqueDevices"].values, - tests=state_group['totalTest'].values, - min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, - pool_days=POOL_DAYS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + smoothed_tests_per_device( + devices=state_group["numUniqueDevices"].values, + tests=state_group['totalTest'].values, + missing_val=state_group['missing_val'].values, + missing_se=state_group['missing_se'].values, + missing_sample_size=state_group['missing_sample_size'].values, + min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, + pool_days=POOL_DAYS) + ) # raw test per device elif device & (not smooth): - stat, se, sample_size = raw_tests_per_device( - devices=state_group["numUniqueDevices"].values, - tests=state_group['totalTest'].values, - min_obs=MIN_OBS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + raw_tests_per_device( + devices=state_group["numUniqueDevices"].values, + tests=state_group['totalTest'].values, + missing_val=state_group['missing_val'].values, + missing_se=state_group['missing_se'].values, + missing_sample_size=state_group['missing_sample_size'].values, + min_obs=MIN_OBS) + ) # smoothed pct positive elif (not device) & smooth: - stat, se, sample_size = smoothed_positive_prop( - tests=state_group['totalTest'].values, - positives=state_group['positiveTest'].values, - min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, - pool_days=POOL_DAYS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + smoothed_positive_prop( + tests=state_group['totalTest'].values, + positives=state_group['positiveTest'].values, + missing_val=state_group['missing_val'].values, + missing_se=state_group['missing_se'].values, + missing_sample_size=state_group['missing_sample_size'].values, + min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, + pool_days=POOL_DAYS) + ) stat = stat * 100 # raw pct positive else: - stat, se, sample_size = raw_positive_prop( - tests=state_group['totalTest'].values, - positives=state_group['positiveTest'].values, - min_obs=MIN_OBS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + raw_positive_prop( + tests=state_group['totalTest'].values, + positives=state_group['positiveTest'].values, + missing_val=state_group['missing_val'].values, + missing_se=state_group['missing_se'].values, + missing_sample_size=state_group['missing_sample_size'].values, + min_obs=MIN_OBS) + ) stat = stat * 100 se = se * 100 @@ -63,7 +83,10 @@ def generate_sensor_for_states(state_groups, smooth, device, first_date, last_da "timestamp": state_group.index, "val": stat, "se": se, - "sample_size": sample_size})) + "sample_size": sample_size, + "missing_val": missing_val, + "missing_se": 
missing_se, + "missing_sample_size": missing_sample_size})) return state_df def generate_sensor_for_other_geores(state_groups, data, res_key, smooth, @@ -102,47 +125,77 @@ def generate_sensor_for_other_geores(state_groups, data, res_key, smooth, if smooth: if has_parent: if device: - stat, se, sample_size = smoothed_tests_per_device( - devices=res_group["numUniqueDevices"].values, - tests=res_group['totalTest'].values, - min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, - pool_days=POOL_DAYS, - parent_devices=res_group["numUniqueDevices_parent"].values, - parent_tests=res_group["totalTest_parent"].values) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + smoothed_tests_per_device( + devices=res_group["numUniqueDevices"].values, + tests=res_group['totalTest'].values, + missing_val=res_group['missing_val'].values, + missing_se=res_group['missing_se'].values, + missing_sample_size=res_group['missing_sample_size'].values, + min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, + pool_days=POOL_DAYS, + parent_devices=res_group["numUniqueDevices_parent"].values, + parent_tests=res_group["totalTest_parent"].values) + ) else: - stat, se, sample_size = smoothed_positive_prop( - tests=res_group['totalTest'].values, - positives=res_group['positiveTest'].values, - min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, - pool_days=POOL_DAYS, - parent_tests=res_group["totalTest_parent"].values, - parent_positives=res_group['positiveTest_parent'].values) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + smoothed_positive_prop( + tests=res_group['totalTest'].values, + positives=res_group['positiveTest'].values, + missing_val=res_group['missing_val'].values, + missing_se=res_group['missing_se'].values, + missing_sample_size=res_group['missing_sample_size'].values, + min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, + pool_days=POOL_DAYS, + parent_tests=res_group["totalTest_parent"].values, + parent_positives=res_group['positiveTest_parent'].values) + ) stat = stat * 100 else: if device: - stat, se, sample_size = smoothed_tests_per_device( - devices=res_group["numUniqueDevices"].values, - tests=res_group['totalTest'].values, - min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, - pool_days=POOL_DAYS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + smoothed_tests_per_device( + devices=res_group["numUniqueDevices"].values, + tests=res_group['totalTest'].values, + missing_val=res_group['missing_val'].values, + missing_se=res_group['missing_se'].values, + missing_sample_size=res_group['missing_sample_size'].values, + min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, + pool_days=POOL_DAYS) + ) else: - stat, se, sample_size = smoothed_positive_prop( - tests=res_group['totalTest'].values, - positives=res_group['positiveTest'].values, - min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, - pool_days=POOL_DAYS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + smoothed_positive_prop( + tests=res_group['totalTest'].values, + positives=res_group['positiveTest'].values, + missing_val=res_group['missing_val'].values, + missing_se=res_group['missing_se'].values, + missing_sample_size=res_group['missing_sample_size'].values, + min_obs=MIN_OBS, max_borrow_obs=MAX_BORROW_OBS, + pool_days=POOL_DAYS) + ) stat = stat * 100 else: if device: - stat, se, sample_size = raw_tests_per_device( - devices=res_group["numUniqueDevices"].values, - tests=res_group['totalTest'].values, - min_obs=MIN_OBS) + stat, se, sample_size, missing_val, missing_se, 
missing_sample_size = ( + raw_tests_per_device( + devices=res_group["numUniqueDevices"].values, + tests=res_group['totalTest'].values, + missing_val=res_group['missing_val'].values, + missing_se=res_group['missing_se'].values, + missing_sample_size=res_group['missing_sample_size'].values, + min_obs=MIN_OBS) + ) else: - stat, se, sample_size = raw_positive_prop( - tests=res_group['totalTest'].values, - positives=res_group['positiveTest'].values, - min_obs=MIN_OBS) + stat, se, sample_size, missing_val, missing_se, missing_sample_size = ( + raw_positive_prop( + tests=res_group['totalTest'].values, + positives=res_group['positiveTest'].values, + missing_val=res_group['missing_val'].values, + missing_se=res_group['missing_se'].values, + missing_sample_size=res_group['missing_sample_size'].values, + min_obs=MIN_OBS) + ) stat = stat * 100 se = se * 100 @@ -150,5 +203,8 @@ def generate_sensor_for_other_geores(state_groups, data, res_key, smooth, "timestamp": res_group.index, "val": stat, "se": se, - "sample_size": sample_size})) + "sample_size": sample_size, + "missing_val": missing_val, + "missing_se": missing_se, + "missing_sample_size": missing_sample_size})) return res_df diff --git a/quidel/delphi_quidel/run.py b/quidel/delphi_quidel/run.py index 49f6ec66b..d58c87a3f 100644 --- a/quidel/delphi_quidel/run.py +++ b/quidel/delphi_quidel/run.py @@ -12,7 +12,8 @@ from delphi_utils import ( add_prefix, create_export_csv, - get_structured_logger + get_structured_logger, + Nans ) from .constants import (END_FROM_TODAY_MINUS, EXPORT_DAY_RANGE, @@ -83,6 +84,12 @@ def run_module(params: Dict[str, Any]): test_type = "covid_ag" if "covid_ag" in sensor else "flu_ag" print("state", sensor) data = dfs[test_type].copy() + + # Default missingness values + data["missing_val"] = Nans.NOT_MISSING + data["missing_se"] = Nans.NOT_MISSING + data["missing_sample_size"] = Nans.NOT_MISSING + state_groups = geo_map("state", data, map_df).groupby("state_id") first_date, last_date = data["timestamp"].min(), data["timestamp"].max() diff --git a/quidel/tests/test_data_tools.py b/quidel/tests/test_data_tools.py index a2223a87b..fb8d2aca4 100644 --- a/quidel/tests/test_data_tools.py +++ b/quidel/tests/test_data_tools.py @@ -5,6 +5,7 @@ import pytest from delphi_quidel import data_tools +from delphi_utils import Nans class TestDataTools: @@ -93,36 +94,49 @@ def test__geographical_pooling(self, min_obs, max_borrow_obs, expected): with pytest.raises(ValueError): data_tools._geographical_pooling(np.array([np.nan]), np.array([1]), 1, 1) - @pytest.mark.parametrize("min_obs, expected_pos_prop, expected_se, expected_sample_sz", [ + @pytest.mark.parametrize("min_obs, expected_pos_prop, expected_se, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size", [ (3, # one case of tests < min_obs np.array([np.nan, 1/2, 1/2, 4/10]), np.array([np.nan, np.sqrt(0.25/4), np.sqrt(0.25/6), np.sqrt(0.24/10)]), - np.array([np.nan, 4, 6, 10])), + np.array([np.nan, 4, 6, 10]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]) + ), (1, # no cases of tests < min_obs np.array([1/2, 2/4, 3/6, 4/10]), np.array([np.sqrt(0.25/2), np.sqrt(0.25/4), np.sqrt(0.25/6), np.sqrt(0.24/10)]), - np.array([2, 4, 6, 10])), + np.array([2, 4, 6, 10]), + np.repeat(Nans.NOT_MISSING, 4), + np.repeat(Nans.NOT_MISSING, 4), + 
np.repeat(Nans.NOT_MISSING, 4)) ]) - def test_raw_positive_prop(self, min_obs, expected_pos_prop, expected_se, expected_sample_sz): + def test_raw_positive_prop(self, min_obs, expected_pos_prop, expected_se, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size): positives = np.array([1, 2, 3, 4]) tests = np.array([2, 4, 6, 10]) - output = data_tools.raw_positive_prop(positives, tests, min_obs) + missing_val = np.array([0, 0, 0, 0]) + missing_se = np.array([0, 0, 0, 0]) + missing_sample_size = np.array([0, 0, 0, 0]) + output = data_tools.raw_positive_prop(positives, tests, min_obs, missing_val, missing_se, missing_sample_size) # np.array_equal() doesn't compare nan's assert np.allclose(output[0], expected_pos_prop, equal_nan=True) assert np.allclose(output[1], expected_se, equal_nan=True) assert np.allclose(output[2], expected_sample_sz, equal_nan=True) + assert np.allclose(output[3], expected_missing_val, equal_nan=True) + assert np.allclose(output[4], expected_missing_se, equal_nan=True) + assert np.allclose(output[5], expected_missing_sample_size, equal_nan=True) # nan case with pytest.raises(ValueError): - data_tools.raw_positive_prop(np.array([np.nan]), np.array([1]), 3) + data_tools.raw_positive_prop(np.array([np.nan]), np.array([1]), 3, missing_val, missing_se, missing_sample_size) # positives > tests case with pytest.raises(ValueError): - data_tools.raw_positive_prop(np.array([3]), np.array([1]), 3) + data_tools.raw_positive_prop(np.array([3]), np.array([1]), 3, missing_val, missing_se, missing_sample_size) # min obs <= 0 case with pytest.raises(ValueError): - data_tools.raw_positive_prop(np.array([1]), np.array([1]), 0) + data_tools.raw_positive_prop(np.array([1]), np.array([1]), 0, missing_val, missing_se, missing_sample_size) @pytest.mark.parametrize("min_obs, max_borrow_obs, pool_days, parent_positives, parent_tests," - "expected_prop, expected_se, expected_sample_sz", [ + "expected_prop, expected_se, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size", [ (3, # no parents case 3, 2, @@ -131,6 +145,9 @@ def test_raw_positive_prop(self, min_obs, expected_pos_prop, expected_se, expect np.array([np.nan, 1/2, 1/2, 7/16]), np.array([np.nan, np.sqrt(0.25/6), np.sqrt(0.25/10), np.sqrt(63/256/16)]), np.array([np.nan, 6, 10, 16]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]) ), (3, # parents case 3, @@ -140,69 +157,93 @@ def test_raw_positive_prop(self, min_obs, expected_pos_prop, expected_se, expect np.array([1.6/3, 1/2, 1/2, 7/16]), np.array([np.sqrt(56/225/3), np.sqrt(0.25/6), np.sqrt(0.25/10), np.sqrt(63/256/16)]), np.array([3, 6, 10, 16]), + np.repeat(Nans.NOT_MISSING, 4), + np.repeat(Nans.NOT_MISSING, 4), + np.repeat(Nans.NOT_MISSING, 4) ), ]) def test_smoothed_positive_prop(self, min_obs, max_borrow_obs, pool_days, parent_positives, - parent_tests, expected_prop, expected_se, expected_sample_sz): + parent_tests, expected_prop, expected_se, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size): positives = np.array([1, 2, 3, 4]) tests = np.array([2, 4, 6, 10]) + missing_val = np.array([0, 0, 0, 0]) + missing_se = np.array([0, 0, 0, 0]) + missing_sample_size = np.array([0, 0, 0, 0]) output = data_tools.smoothed_positive_prop(positives, tests, min_obs, max_borrow_obs, 
- pool_days, parent_positives, parent_tests) + pool_days, missing_val, missing_se, missing_sample_size, parent_positives, parent_tests) assert np.allclose(output[0], expected_prop, equal_nan=True) assert np.allclose(output[1], expected_se, equal_nan=True) assert np.allclose(output[2], expected_sample_sz, equal_nan=True) + assert np.allclose(output[3], expected_missing_val, equal_nan=True) + assert np.allclose(output[4], expected_missing_se, equal_nan=True) + assert np.allclose(output[5], expected_missing_sample_size, equal_nan=True) # nan case with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([np.nan]), np.array([1]), 1, 1, 1) + data_tools.smoothed_positive_prop(np.array([np.nan]), np.array([1]), 1, 1, 1, np.array([0]), np.array([0]), np.array([0])) # positives > tests case with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([2]), np.array([1]), 1, 1, 1) + data_tools.smoothed_positive_prop(np.array([2]), np.array([1]), 1, 1, 1, np.array([0]), np.array([0]), np.array([0])) # nan case with parent with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 1, + data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 1, np.array([0]), np.array([0]), np.array([0]), np.array([np.nan]), np.array([np.nan])) # positives > tests case with parent with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 1, + data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 1, np.array([0]), np.array([0]), np.array([0]), np.array([3]), np.array([1])) # min obs <= 0 case with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 0, 1, 1) + data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 0, 1, 1, np.array([0]), np.array([0]), np.array([0])) # pool_days <= 0 case with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 0) + data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 0, np.array([0]), np.array([0]), np.array([0])) # pool_days non int case with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 1.5) + data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 1, 1.5, np.array([0]), np.array([0]), np.array([0])) # max_borrow_obs > min_obs case with pytest.raises(ValueError): - data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 2, 1.5) + data_tools.smoothed_positive_prop(np.array([1]), np.array([1]), 1, 2, 1.5, np.array([0]), np.array([0]), np.array([0])) - @pytest.mark.parametrize("min_obs, expected_tests_per_device, expected_sample_sz", [ + @pytest.mark.parametrize("min_obs, expected_tests_per_device, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size", [ (3, # one case of tests < min_obs np.array([np.nan, 2, 1/2, 10/4]), - np.array([np.nan, 4, 3, 10])), + np.array([np.nan, 4, 3, 10]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]), + np.repeat(Nans.NOT_APPLICABLE, 4), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]) + ), (1, # no cases of tests < min_obs np.array([2, 2, 1/2, 10/4]), - np.array([2, 4, 3, 10])), + np.array([2, 4, 3, 10]), + np.repeat(Nans.NOT_MISSING, 4), + np.repeat(Nans.NOT_APPLICABLE, 4), + np.repeat(Nans.NOT_MISSING, 4) + ), ]) - def test_raw_tests_per_device(self, min_obs, expected_tests_per_device, expected_sample_sz): + 
def test_raw_tests_per_device(self, min_obs, expected_tests_per_device, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size): devices = np.array([1, 2, 6, 4]) tests = np.array([2, 4, 3, 10]) - output = data_tools.raw_tests_per_device(devices, tests, min_obs) + missing_val = np.array([0, 0, 0, 0]) + missing_se = np.array([0, 0, 0, 0]) + missing_sample_size = np.array([0, 0, 0, 0]) + output = data_tools.raw_tests_per_device(devices, tests, min_obs, missing_val, missing_se, missing_sample_size) assert np.allclose(output[0], expected_tests_per_device, equal_nan=True) assert np.allclose(output[1], np.repeat(np.nan, len(devices)), equal_nan=True) assert np.allclose(output[2], expected_sample_sz, equal_nan=True) + assert np.allclose(output[3], expected_missing_val, equal_nan=True) + assert np.allclose(output[4], expected_missing_se, equal_nan=True) + assert np.allclose(output[5], expected_missing_sample_size, equal_nan=True) + # nan case with pytest.raises(ValueError): - data_tools.raw_tests_per_device(np.array([np.nan]), np.array([1]), 3) + data_tools.raw_tests_per_device(np.array([np.nan]), np.array([1]), 3, np.array([0]), np.array([0]), np.array([0])) # min obs <= 0 case with pytest.raises(ValueError): - data_tools.raw_tests_per_device(np.array([1]), np.array([1]), 0) + data_tools.raw_tests_per_device(np.array([1]), np.array([1]), 0, np.array([0]), np.array([0]), np.array([0])) @pytest.mark.parametrize("min_obs, max_borrow_obs, pool_days, parent_devices, parent_tests," - "expected_prop, expected_se, expected_sample_sz", [ + "expected_prop, expected_se, expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size", [ (3, # no parents case 3, 2, @@ -211,6 +252,9 @@ def test_raw_tests_per_device(self, min_obs, expected_tests_per_device, expected np.array([np.nan, 2, 5/6, 8/7]), np.repeat(np.nan, 4), np.array([np.nan, 6, 10, 16]), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]), + np.repeat(Nans.NOT_APPLICABLE, 4), + np.array([Nans.CENSORED, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING]) ), (3, # no parents case 3, @@ -220,35 +264,44 @@ def test_raw_tests_per_device(self, min_obs, expected_tests_per_device, expected np.array([3/1.6, 2, 5/6, 8/7]), np.repeat(np.nan, 4), np.array([3, 6, 10, 16]), + np.repeat(Nans.NOT_MISSING, 4), + np.repeat(Nans.NOT_APPLICABLE, 4), + np.repeat(Nans.NOT_MISSING, 4) ), ]) def test_smoothed_tests_per_device(self, min_obs, max_borrow_obs, pool_days, parent_devices, parent_tests, expected_prop, expected_se, - expected_sample_sz): + expected_sample_sz, expected_missing_val, expected_missing_se, expected_missing_sample_size): devices = np.array([1, 2, 10, 4]) tests = np.array([2, 4, 6, 10]) + missing_val = np.array([0, 0, 0, 0]) + missing_se = np.array([0, 0, 0, 0]) + missing_sample_size = np.array([0, 0, 0, 0]) output = data_tools.smoothed_tests_per_device(devices, tests, min_obs, max_borrow_obs, - pool_days, parent_devices, parent_tests) + pool_days, missing_val, missing_se, missing_sample_size, parent_devices, parent_tests) assert np.allclose(output[0], expected_prop, equal_nan=True) assert np.allclose(output[1], expected_se, equal_nan=True) assert np.allclose(output[2], expected_sample_sz, equal_nan=True) + assert np.allclose(output[3], expected_missing_val, equal_nan=True) + assert np.allclose(output[4], expected_missing_se, equal_nan=True) + assert np.allclose(output[5], expected_missing_sample_size, equal_nan=True) # nan case with 
pytest.raises(ValueError): - data_tools.smoothed_tests_per_device(np.array([np.nan]), np.array([1]), 1, 1, 1) + data_tools.smoothed_tests_per_device(np.array([np.nan]), np.array([1]), 1, 1, 1, np.array([0]), np.array([0]), np.array([0])) # nan case with parent with pytest.raises(ValueError): - data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 1, 1, + data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 1, 1, np.array([0]), np.array([0]), np.array([0]), np.array([np.nan]), np.array([np.nan])) # min obs <= 0 case with pytest.raises(ValueError): - data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 0, 0, 1) + data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 0, 0, 1, np.array([0]), np.array([0]), np.array([0])) # pool_days <= 0 case with pytest.raises(ValueError): - data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 1, 0) + data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 1, 0, np.array([0]), np.array([0]), np.array([0])) # pool_days non int case with pytest.raises(ValueError): - data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 1, 1.5) + data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 1, 1.5, np.array([0]), np.array([0]), np.array([0])) # max_borrow_obs > min_obs case with pytest.raises(ValueError): - data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 3, 1.5) + data_tools.smoothed_tests_per_device(np.array([1]), np.array([1]), 1, 3, 1.5, np.array([0]), np.array([0]), np.array([0])) diff --git a/quidel/tests/test_generate_sensor.py b/quidel/tests/test_generate_sensor.py index c9177a6cd..6e773e893 100644 --- a/quidel/tests/test_generate_sensor.py +++ b/quidel/tests/test_generate_sensor.py @@ -4,6 +4,7 @@ from delphi_quidel.generate_sensor import (MIN_OBS, MAX_BORROW_OBS, POOL_DAYS, generate_sensor_for_states, generate_sensor_for_other_geores) +from delphi_utils import Nans class TestGenerateSensor: def test_generate_sensor(self): @@ -17,8 +18,12 @@ def test_generate_sensor(self): assert isinstance(POOL_DAYS, int) # State Level - state_groups = pd.read_csv("./test_data/state_data.csv", sep = ",", - parse_dates=['timestamp']).groupby("state_id") + data = pd.read_csv("./test_data/state_data.csv", sep = ",", + parse_dates=['timestamp']) + data["missing_val"] = Nans.NOT_MISSING + data["missing_se"] = Nans.NOT_MISSING + data["missing_sample_size"] = Nans.NOT_MISSING + state_groups = data.groupby("state_id") # raw pct_positive state_pct_positive = generate_sensor_for_states( @@ -27,7 +32,7 @@ def test_generate_sensor(self): assert (state_pct_positive.dropna()["val"] < 100).all() assert set(state_pct_positive.columns) ==\ - set(["geo_id", "val", "se", "sample_size", "timestamp"]) + set(["geo_id", "val", "se", "sample_size", "timestamp", "missing_val", "missing_se", "missing_sample_size"]) assert len(state_pct_positive.groupby("geo_id").count()["timestamp"].unique()) == 1 # raw test_per_device @@ -37,7 +42,7 @@ def test_generate_sensor(self): assert state_test_per_device["se"].isnull().all() assert set(state_test_per_device.columns) ==\ - set(["geo_id", "val", "se", "sample_size", "timestamp"]) + set(["geo_id", "val", "se", "sample_size", "timestamp", "missing_val", "missing_se", "missing_sample_size"]) assert len(state_test_per_device.groupby("geo_id").count()["timestamp"].unique()) == 1 @@ -51,7 +56,7 @@ def test_generate_sensor(self): assert (msa_pct_positive.dropna()["val"] < 100).all() assert set(msa_pct_positive.columns) ==\ - 
set(["geo_id", "val", "se", "sample_size", "timestamp"]) + set(["geo_id", "val", "se", "sample_size", "timestamp", "missing_val", "missing_se", "missing_sample_size"]) assert len(msa_pct_positive.groupby("geo_id").count()["timestamp"].unique()) == 1 # smoothed test_per_device @@ -61,5 +66,5 @@ def test_generate_sensor(self): assert msa_test_per_device["se"].isnull().all() assert set(msa_test_per_device.columns) ==\ - set(["geo_id", "val", "se", "sample_size", "timestamp"]) + set(["geo_id", "val", "se", "sample_size", "timestamp", "missing_val", "missing_se", "missing_sample_size"]) assert len(msa_test_per_device.groupby("geo_id").count()["timestamp"].unique()) == 1 diff --git a/quidel/tests/test_run.py b/quidel/tests/test_run.py index 91a4acdd4..3c7371098 100644 --- a/quidel/tests/test_run.py +++ b/quidel/tests/test_run.py @@ -81,7 +81,7 @@ def test_output_files(self, clean_receiving_dir): df = pd.read_csv( join("./receiving", "20200709_state_covid_ag_raw_pct_positive.csv") ) - assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all() + assert (df.columns.values == ["geo_id", "val", "se", "sample_size", "missing_val", "missing_se", "missing_sample_size"]).all() # test_intermediate_file flag = None