diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 97256893a..8c461bf8b 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.1.16
+current_version = 0.1.17
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False
diff --git a/.github/workflows/build-container-images.yml b/.github/workflows/build-container-images.yml
index e32b58093..c01460c87 100644
--- a/.github/workflows/build-container-images.yml
+++ b/.github/workflows/build-container-images.yml
@@ -9,7 +9,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        packages: [ "" ]
+        packages: [ facebook ]
     steps:
       - name: Checkout code
         uses: actions/checkout@v2
diff --git a/.github/workflows/r-ci.yml b/.github/workflows/r-ci.yml
index 116537194..9f81ca991 100644
--- a/.github/workflows/r-ci.yml
+++ b/.github/workflows/r-ci.yml
@@ -48,12 +48,15 @@ jobs:
             ${{ runner.os }}-r-facebook-survey-
       - name: Install R dependencies
         run: |
-          if ( packageVersion("readr") != "1.4.0" ) {
+          if ( !require("readr") || packageVersion("readr") != "1.4.0" ) {
             install.packages("devtools")
             devtools::install_version("readr", version = "1.4.0")
           }
-          install.packages("remotes")
-          remotes::update_packages(c("rcmdcheck", "mockr"), upgrade="always")
+
+          if ( !require("remotes") ) {
+            install.packages("remotes")
+          }
+          remotes::update_packages(c("rcmdcheck", "mockr", "remotes"), upgrade="always")
           dependency_list <- remotes::dev_package_deps(dependencies=TRUE)
           remotes::update_packages(dependency_list$package[dependency_list$package != "readr"], upgrade="always")
         shell: Rscript {0}
diff --git a/.gitignore b/.gitignore
index 94dfe3178..e664af2da 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,7 +4,7 @@ params.json
 
 # Do not commit output files
-receiving/*.csv
+**/receiving/*.csv
 
 # Do not commit hidden macOS files
 .DS_Store
diff --git a/_delphi_utils_python/.bumpversion.cfg b/_delphi_utils_python/.bumpversion.cfg
index 2d7554882..67f34ca69 100644
--- a/_delphi_utils_python/.bumpversion.cfg
+++ b/_delphi_utils_python/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.1.12
+current_version = 0.1.13
 commit = True
 message = chore: bump delphi_utils to {new_version}
 tag = False
diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py
index 6d0b523fd..99f35fc0a 100644
--- a/_delphi_utils_python/delphi_utils/__init__.py
+++ b/_delphi_utils_python/delphi_utils/__init__.py
@@ -14,4 +14,4 @@
 from .signal import add_prefix
 from .nancodes import Nans
 
-__version__ = "0.1.12"
+__version__ = "0.1.13"
diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index 5d1036bcd..eb8aac8d2 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
 from git import Repo
 from git.refs.head import Head
 import pandas as pd
+import numpy as np
 
 from .utils import read_params
 from .logger import get_structured_logger
+from .nancodes import Nans
 
 Files = List[str]
 FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
         changed_df is the pd.DataFrame of common rows from after_csv with changed values.
         added_df is the pd.DataFrame of added rows from after_csv.
""" - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -89,12 +93,22 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Exact comparisons, treating NA == NA as True - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # If CSVs have different columns (no missingness), mark all values as new + if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + same_mask = after_df_cmn.copy() + same_mask.loc[:] = False + else: + # Exact comparisons, treating NA == NA as True + same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( - before_df.loc[deleted_idx, :], + deleted_df, after_df_cmn.loc[~(same_mask.all(axis=1)), :], after_df.loc[added_idx, :]) @@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: deleted_df, changed_df, added_df = diff_export_csv( before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) + new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0) if len(deleted_df) > 0: print( - f"Warning, diff has deleted indices in {after_file} that will be ignored") + f"Diff has deleted indices in {after_file} that have been coded as nans.") # Write the diffs to diff_file, if applicable if len(new_issues_df) > 0: @@ -414,6 +428,9 @@ def archive_exports(self, # pylint: disable=arguments-differ archive_success.append(exported_file) except FileNotFoundError: archive_fail.append(exported_file) + except shutil.SameFileError: + # no need to copy if the cached file is the same + archive_success.append(exported_file) self._exports_archived = True diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 5a3b804b2..f62a1afcf 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -3,10 +3,33 @@ from datetime import datetime from os.path import join from typing import Optional +import logging +from epiweeks import Week import numpy as np import pandas as pd +from .nancodes import Nans + +def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): + """Find values with contradictory missingness codes, filter them, and log.""" + columns = ["val", "se", "sample_size"] + # Get indicies where the XNOR is true (i.e. both are true or both are false). 
+ masks = [ + ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING)) + for column in columns + ] + for mask in masks: + if not logger is None and df.loc[mask].size > 0: + logger.info( + "Filtering contradictory missing code in " + + "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d")) + ) + df = df.loc[~mask] + elif logger is None and df.loc[mask].size > 0: + df = df.loc[~mask] + return df + def create_export_csv( df: pd.DataFrame, export_dir: str, @@ -16,7 +39,9 @@ def create_export_csv( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, remove_null_samples: Optional[bool] = False, - write_empty_days: Optional[bool] = False + write_empty_days: Optional[bool] = False, + logger: Optional[logging.Logger] = None, + weekly_dates = False, ): """Export data in the format expected by the Delphi API. @@ -43,6 +68,8 @@ def create_export_csv( write_empty_days: Optional[bool] If true, every day in between start_date and end_date will have a CSV file written even if there is no data for the day. If false, only the days present are written. + logger: Optional[logging.Logger] + Pass a logger object here to log information about contradictory missing codes. Returns --------- @@ -65,12 +92,30 @@ def create_export_csv( dates = pd.date_range(start_date, end_date) for date in dates: + if weekly_dates: + t = Week.fromdate(pd.to_datetime(str(date))) + date_str = "weekly_" + str(t.year) + str(t.week).zfill(2) + else: + date_str = date.strftime('%Y%m%d') if metric is None: - export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{sensor}.csv" + export_filename = f"{date_str}_{geo_res}_{sensor}.csv" else: - export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv" + export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py index 906bd0a4e..35fe44fac 100644 --- a/_delphi_utils_python/setup.py +++ b/_delphi_utils_python/setup.py @@ -7,6 +7,7 @@ required = [ "boto3", "covidcast", + "epiweeks", "freezegun", "gitpython", "mock", @@ -24,7 +25,7 @@ setup( name="delphi_utils", - version="0.1.12", + version="0.1.13", description="Shared Utility Functions for Indicators", long_description=long_description, long_description_content_type="text/markdown", @@ -35,7 +36,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), package_data={'': ['data/*.csv']} diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..3050908f2 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -14,8 +14,12 @@ from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ archiver_from_params +from 
delphi_utils.nancodes import Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +27,51 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), + + # Common, but missing columns removed + "csv5": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } CSVS_AFTER = { @@ -45,23 +80,53 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but missing columns removed + "csv5": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } - class TestArchiveDiffer: def test_stubs(self): @@ -80,10 +145,14 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) arch_diff = 
ArchiveDiffer(cache_dir, export_dir) @@ -106,7 +175,7 @@ def test_diff_and_filter_exports(self, tmp_path): # Check return values assert set(deleted_files) == {join(cache_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +183,10 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", + "csv3.csv", "csv4.csv", "csv4.csv.diff", + "csv5.csv", "csv5.csv.diff" + } assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -132,7 +204,7 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -259,12 +331,16 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -346,7 +422,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +463,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -460,12 +544,16 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 
40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..d9906300d 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,12 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans + def _clean_directory(directory): """Clean files out of a directory.""" @@ -26,6 +30,7 @@ def _non_ignored_files_set(directory): class TestExport: """Tests for exporting CSVs.""" + # List of times for data points. TIMES = [ datetime.strptime(x, "%Y-%m-%d") @@ -43,6 +48,54 @@ class TestExport: } ) + # A sample data frame with missingness. + DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [ + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + Nans.NOT_MISSING, + ], + "missing_se": [ + Nans.NOT_MISSING, + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + ], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER], + } + ) + + # A sample data frame with contradictory missing codes. + DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [ + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + Nans.NOT_MISSING, + ], + "missing_se": [ + Nans.NOT_MISSING, + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + ], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER], + } + ) + # Directory in which to store tests. 
TEST_DIR = "test_dir" @@ -85,10 +138,14 @@ def test_export_rounding(self): ) pd.testing.assert_frame_equal( pd.read_csv(join(self.TEST_DIR, "20200215_county_deaths_test.csv")), - pd.DataFrame({"geo_id": [51093, 51175], - "val": [round(3.12345678910, 7), 2.1], - "se": [0.15, 0.22], - "sample_size": [100, 100]}) + pd.DataFrame( + { + "geo_id": [51093, 51175], + "val": [round(3.12345678910, 7), 2.1], + "se": [0.15, 0.22], + "sample_size": [100, 100], + } + ), ) def test_export_without_metric(self): @@ -180,13 +237,16 @@ def test_export_with_null_removal(self): """Test that `remove_null_samples = True` removes entries with null samples.""" _clean_directory(self.TEST_DIR) - df_with_nulls = self.DF.copy().append({ - "geo_id": "66666", - "timestamp": datetime(2020, 6, 6), - "val": 10, - "se": 0.2, - "sample_size": pd.NA}, - ignore_index=True) + df_with_nulls = self.DF.copy().append( + { + "geo_id": "66666", + "timestamp": datetime(2020, 6, 6), + "val": 10, + "se": 0.2, + "sample_size": pd.NA, + }, + ignore_index=True, + ) create_export_csv( df=df_with_nulls, @@ -210,13 +270,16 @@ def test_export_without_null_removal(self): """Test that `remove_null_samples = False` does not remove entries with null samples.""" _clean_directory(self.TEST_DIR) - df_with_nulls = self.DF.copy().append({ - "geo_id": "66666", - "timestamp": datetime(2020, 6, 6), - "val": 10, - "se": 0.2, - "sample_size": pd.NA}, - ignore_index=True) + df_with_nulls = self.DF.copy().append( + { + "geo_id": "66666", + "timestamp": datetime(2020, 6, 6), + "val": 10, + "se": 0.2, + "sample_size": pd.NA, + }, + ignore_index=True, + ) create_export_csv( df=df_with_nulls, @@ -235,3 +298,77 @@ def test_export_without_null_removal(self): ] ) assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0 + + def test_export_df_without_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF.copy(), export_dir=self.TEST_DIR, geo_res="county", sensor="test" + ) + df = pd.read_csv(join(self.TEST_DIR, "20200215_county_test.csv")).astype( + {"geo_id": str, "sample_size": int} + ) + expected_df = pd.DataFrame( + { + "geo_id": ["51093", "51175"], + "val": [3.12345678910, 2.1], + "se": [0.15, 0.22], + "sample_size": [100, 100], + } + ).astype({"geo_id": str, "sample_size": int}) + pd.testing.assert_frame_equal(df, expected_df) + + def test_export_df_with_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF2.copy(), + export_dir=self.TEST_DIR, + geo_res="county", + sensor="test", + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_county_test.csv", + "20200301_county_test.csv", + "20200315_county_test.csv", + ] + ) + df = pd.read_csv(join(self.TEST_DIR, "20200215_county_test.csv")).astype( + {"geo_id": str, "sample_size": int} + ) + expected_df = pd.DataFrame( + { + "geo_id": ["51093", "51175"], + "val": [3.12345678910, np.nan], + "se": [0.15, 0.22], + "sample_size": [100, 100], + "missing_val": [Nans.NOT_MISSING, Nans.OTHER], + "missing_se": [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.NOT_MISSING] * 2, + } + ).astype({"geo_id": str, "sample_size": int}) + pd.testing.assert_frame_equal(df, expected_df) + + @mock.patch("delphi_utils.logger") + def test_export_df_with_contradictory_missingness(self, mock_logger): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF3.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + logger=mock_logger + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + 
"20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + mock_logger.info.assert_called_once_with( + "Filtering contradictory missing code in test_None_2020-02-15." + ) diff --git a/ansible/inventory b/ansible/inventory index 424d05c3d..d67b775c6 100644 --- a/ansible/inventory +++ b/ansible/inventory @@ -1,5 +1,6 @@ [runtime_host] delphi-master-prod-01.delphi.cmu.edu +bigchunk-dev-02.delphi.cmu.edu [runtime_host_staging] app-mono-dev-01.delphi.cmu.edu diff --git a/ansible/templates/sir_complainsalot-params-prod.json.j2 b/ansible/templates/sir_complainsalot-params-prod.json.j2 index 0076bcc82..7a1f5dddd 100644 --- a/ansible/templates/sir_complainsalot-params-prod.json.j2 +++ b/ansible/templates/sir_complainsalot-params-prod.json.j2 @@ -37,7 +37,47 @@ "fb-survey": { "max_age": 3, "maintainers": ["U01069KCRS7"], - "retired-signals": ["smoothed_anxious_5d", "smoothed_wanxious_5d", "smoothed_depressed_5d", "smoothed_wdepressed_5d", "smoothed_felt_isolated_5d", "smoothed_wfelt_isolated_5d", "smoothed_large_event_1d", "smoothed_wlarge_event_1d", "smoothed_restaurant_1d", "smoothed_wrestaurant_1d", "smoothed_shop_1d", "smoothed_wshop_1d", "smoothed_spent_time_1d", "smoothed_wspent_time_1d", "smoothed_travel_outside_state_5d", "smoothed_wtravel_outside_state_5d", "smoothed_work_outside_home_1d", "smoothed_wwork_outside_home_1d", "smoothed_wearing_mask", "smoothed_wwearing_mask", "smoothed_vaccine_likely_local_health", "smoothed_wvaccine_likely_local_health", "smoothed_others_masked", "smoothed_wothers_masked", "smoothed_wanted_test_14d", "smoothed_wwanted_test_14d", "smoothed_covid_vaccinated_or_accept", "smoothed_wcovid_vaccinated_or_accept", "smoothed_accept_covid_vaccine", "smoothed_waccept_covid_vaccine", "smoothed_hesitancy_reason_allergic", "smoothed_whesitancy_reason_allergic", "smoothed_hesitancy_reason_not_recommended", "smoothed_whesitancy_reason_not_recommended", "smoothed_hesitancy_reason_distrust_vaccines", "smoothed_whesitancy_reason_distrust_vaccines", "smoothed_hesitancy_reason_health_condition", "smoothed_whesitancy_reason_health_condition", "smoothed_hesitancy_reason_pregnant", "smoothed_whesitancy_reason_pregnant", "smoothed_vaccine_likely_friends", "smoothed_wvaccine_likely_friends", "smoothed_vaccine_likely_who", "smoothed_wvaccine_likely_who", "smoothed_vaccine_likely_govt_health", "smoothed_wvaccine_likely_govt_health", "smoothed_vaccine_likely_politicians", "smoothed_wvaccine_likely_politicians", "smoothed_vaccine_likely_doctors", "smoothed_wvaccine_likely_doctors", "smoothed_felt_isolated_7d", "smoothed_wfelt_isolated_7d", "smoothed_worried_become_ill", "smoothed_wworried_become_ill", "smoothed_inperson_school_fulltime", "smoothed_winperson_school_fulltime", "smoothed_inperson_school_parttime", "smoothed_winperson_school_parttime", ["smoothed_vaccine_barrier_appointment_time_tried", "msa"], ["smoothed_vaccine_barrier_childcare_tried", "msa"], ["smoothed_vaccine_barrier_document_tried", "msa"], ["smoothed_vaccine_barrier_eligible_tried", "msa"], ["smoothed_vaccine_barrier_language_tried", "msa"], ["smoothed_vaccine_barrier_no_appointments_tried", "msa"], ["smoothed_vaccine_barrier_none_tried", "msa"], ["smoothed_vaccine_barrier_technical_difficulties_tried", "msa"], ["smoothed_vaccine_barrier_technology_access_tried", "msa"], ["smoothed_vaccine_barrier_time_tried", "msa"], ["smoothed_vaccine_barrier_travel_tried", "msa"], ["smoothed_vaccine_barrier_type_tried", 
"msa"], ["smoothed_wvaccine_barrier_appointment_time_tried", "msa"], ["smoothed_wvaccine_barrier_childcare_tried", "msa"], ["smoothed_wvaccine_barrier_document_tried", "msa"], ["smoothed_wvaccine_barrier_eligible_tried", "msa"], ["smoothed_wvaccine_barrier_language_tried", "msa"], ["smoothed_wvaccine_barrier_no_appointments_tried", "msa"], ["smoothed_wvaccine_barrier_none_tried", "msa"], ["smoothed_wvaccine_barrier_technical_difficulties_tried", "msa"], ["smoothed_wvaccine_barrier_technology_access_tried", "msa"], ["smoothed_wvaccine_barrier_time_tried", "msa"], ["smoothed_wvaccine_barrier_travel_tried", "msa"], ["smoothed_wvaccine_barrier_type_tried", "msa"]] + "retired-signals": [ + "smoothed_anxious_5d", "smoothed_wanxious_5d", + "smoothed_depressed_5d", "smoothed_wdepressed_5d", + "smoothed_felt_isolated_5d", "smoothed_wfelt_isolated_5d", + "smoothed_large_event_1d", "smoothed_wlarge_event_1d", + "smoothed_restaurant_1d", "smoothed_wrestaurant_1d", + "smoothed_shop_1d", "smoothed_wshop_1d", + "smoothed_spent_time_1d", "smoothed_wspent_time_1d", + "smoothed_travel_outside_state_5d", "smoothed_wtravel_outside_state_5d", + "smoothed_work_outside_home_1d", "smoothed_wwork_outside_home_1d", + "smoothed_wearing_mask", "smoothed_wwearing_mask", + "smoothed_vaccine_likely_local_health", "smoothed_wvaccine_likely_local_health", + "smoothed_others_masked", "smoothed_wothers_masked", + "smoothed_wanted_test_14d", "smoothed_wwanted_test_14d", + "smoothed_covid_vaccinated_or_accept", "smoothed_wcovid_vaccinated_or_accept", + "smoothed_accept_covid_vaccine", "smoothed_waccept_covid_vaccine", + "smoothed_hesitancy_reason_allergic", "smoothed_whesitancy_reason_allergic", + "smoothed_hesitancy_reason_not_recommended", "smoothed_whesitancy_reason_not_recommended", + "smoothed_hesitancy_reason_distrust_vaccines", "smoothed_whesitancy_reason_distrust_vaccines", + "smoothed_hesitancy_reason_health_condition", "smoothed_whesitancy_reason_health_condition", + "smoothed_hesitancy_reason_pregnant", "smoothed_whesitancy_reason_pregnant", + "smoothed_vaccine_likely_friends", "smoothed_wvaccine_likely_friends", + "smoothed_vaccine_likely_who", "smoothed_wvaccine_likely_who", + "smoothed_vaccine_likely_govt_health", "smoothed_wvaccine_likely_govt_health", + "smoothed_vaccine_likely_politicians", "smoothed_wvaccine_likely_politicians", + "smoothed_vaccine_likely_doctors", "smoothed_wvaccine_likely_doctors", + "smoothed_felt_isolated_7d", "smoothed_wfelt_isolated_7d", + "smoothed_worried_become_ill", "smoothed_wworried_become_ill", + ["smoothed_vaccine_barrier_appointment_time_tried", "msa"], ["smoothed_wvaccine_barrier_appointment_time_tried", "msa"], + ["smoothed_vaccine_barrier_childcare_tried", "msa"], ["smoothed_wvaccine_barrier_childcare_tried", "msa"], + ["smoothed_vaccine_barrier_document_tried", "msa"], ["smoothed_wvaccine_barrier_document_tried", "msa"], + ["smoothed_vaccine_barrier_eligible_tried", "msa"], ["smoothed_wvaccine_barrier_eligible_tried", "msa"], + ["smoothed_vaccine_barrier_language_tried", "msa"], ["smoothed_wvaccine_barrier_language_tried", "msa"], + ["smoothed_vaccine_barrier_no_appointments_tried", "msa"], ["smoothed_wvaccine_barrier_no_appointments_tried", "msa"], + ["smoothed_vaccine_barrier_none_tried", "msa"], ["smoothed_wvaccine_barrier_none_tried", "msa"], + ["smoothed_wvaccine_barrier_technical_difficulties_tried", "msa"], ["smoothed_vaccine_barrier_technical_difficulties_tried", "msa"], + ["smoothed_wvaccine_barrier_technology_access_tried", "msa"], 
["smoothed_wvaccine_barrier_technology_access_tried", "msa"], + ["smoothed_vaccine_barrier_time_tried", "msa"], ["smoothed_wvaccine_barrier_time_tried", "msa"], + ["smoothed_vaccine_barrier_travel_tried", "msa"], ["smoothed_wvaccine_barrier_travel_tried", "msa"], + ["smoothed_vaccine_barrier_type_tried", "msa"], ["smoothed_wvaccine_barrier_type_tried", "msa"] + ] }, "indicator-combination": { "max_age": 4, diff --git a/cdc_covidnet/delphi_cdc_covidnet/covidnet.py b/cdc_covidnet/delphi_cdc_covidnet/covidnet.py index 03c2b7775..b202e2fa3 100644 --- a/cdc_covidnet/delphi_cdc_covidnet/covidnet.py +++ b/cdc_covidnet/delphi_cdc_covidnet/covidnet.py @@ -6,7 +6,7 @@ """ import json -import logging +from logging import Logger import os from typing import Tuple, List from multiprocessing import cpu_count, Pool @@ -100,7 +100,7 @@ def download_hosp_data( @staticmethod def download_all_hosp_data( - mappings_file: str, cache_path: str, parallel: bool = False + mappings_file: str, cache_path: str, logger: Logger, parallel: bool = False ) -> List[str]: """ Download hospitalization data for all states listed in the mappings JSON file to disk. @@ -146,7 +146,7 @@ def download_all_hosp_data( else: for args in state_args: CovidNet.download_hosp_data(*args) - logging.debug("Downloading for nid=%s, cid=%s", args[0], args[1]) + logger.debug("Downloading for nid=%s, cid=%s", args[0], args[1]) return state_files diff --git a/cdc_covidnet/delphi_cdc_covidnet/run.py b/cdc_covidnet/delphi_cdc_covidnet/run.py index 87e1419ce..0214d52ae 100644 --- a/cdc_covidnet/delphi_cdc_covidnet/run.py +++ b/cdc_covidnet/delphi_cdc_covidnet/run.py @@ -4,12 +4,13 @@ This module should contain a function called `run_module`, that is executed when the module is run with `python -m delphi_cdc_covidnet`. """ -import logging from datetime import datetime from os import remove from os.path import join from typing import Dict, Any +from delphi_utils import get_structured_logger + from .covidnet import CovidNet from .update_sensor import update_sensor @@ -32,7 +33,9 @@ def run_module(params: Dict[str, Dict[str, Any]]): - "wip_signal": list of str or bool, to be passed to delphi_utils.add_prefix. - "input_cache_dir": str, directory to download source files. 
""" - logging.basicConfig(level=logging.DEBUG) + logger = get_structured_logger( + __name__, filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True)) start_date = datetime.strptime(params["indicator"]["start_date"], "%Y-%m-%d") @@ -42,15 +45,15 @@ def run_module(params: Dict[str, Dict[str, Any]]): else: end_date = datetime.strptime(params["indicator"]["end_date"], "%Y-%m-%d") - logging.info("start date:\t%s", start_date.date()) - logging.info("end date:\t%s", end_date.date()) + logger.info("start date:\t%s", start_date.date()) + logger.info("end date:\t%s", end_date.date()) - logging.info("outpath:\t%s", params["common"]["export_dir"]) - logging.info("parallel:\t%s", params["indicator"]["parallel"]) + logger.info("outpath:\t%s", params["common"]["export_dir"]) + logger.info("parallel:\t%s", params["indicator"]["parallel"]) # Only geo is state, and no weekday adjustment for now # COVID-NET data is by weeks anyway, not daily - logging.info("starting state, no adj") + logger.info("starting state, no adj") # Download latest COVID-NET files into the cache directory first mappings_file = join(params["indicator"]["input_cache_dir"], "init.json") @@ -58,7 +61,8 @@ def run_module(params: Dict[str, Dict[str, Any]]): _, mmwr_info, _ = CovidNet.read_mappings(mappings_file) state_files = CovidNet.download_all_hosp_data( mappings_file, params["indicator"]["input_cache_dir"], - parallel=params["indicator"]["parallel"]) + parallel=params["indicator"]["parallel"], + logger=logger) update_sensor( state_files, @@ -73,4 +77,4 @@ def run_module(params: Dict[str, Dict[str, Any]]): for state_file in state_files: remove(state_file) - logging.info("finished all") + logger.info("finished all") diff --git a/cdc_covidnet/tests/test_covidnet.py b/cdc_covidnet/tests/test_covidnet.py index 6846b9f5e..efe03fe29 100644 --- a/cdc_covidnet/tests/test_covidnet.py +++ b/cdc_covidnet/tests/test_covidnet.py @@ -1,4 +1,5 @@ import json +import logging from os.path import join, exists from tempfile import TemporaryDirectory @@ -7,6 +8,7 @@ from delphi_cdc_covidnet.api_config import APIConfig from delphi_cdc_covidnet.covidnet import CovidNet +TEST_LOGGER = logging.getLogger() class TestCovidNet: @@ -65,14 +67,14 @@ def test_hosp_data(self): # Non-parallel state_files = CovidNet.download_all_hosp_data( - init_file, temp_dir, parallel=False) + init_file, temp_dir, TEST_LOGGER, parallel=False) assert len(state_files) == num_states for state_file in state_files: assert exists(state_file) # Parallel state_files_par = CovidNet.download_all_hosp_data( - init_file, temp_dir, parallel=True) + init_file, temp_dir, TEST_LOGGER, parallel=True) assert set(state_files) == set(state_files_par) assert len(state_files_par) == num_states for state_file in state_files_par: diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index c9b340403..9580b5728 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -173,7 +173,8 @@ def run_module(params: Dict[str, Dict[str, Any]]): weekday, numtype, params["indicator"]["se"], - params["indicator"]["wip_signal"] + params["indicator"]["wip_signal"], + logger ) if numtype == "covid": data = load_combined_data(file_dict["denom"], diff --git a/changehc/delphi_changehc/sensor.py b/changehc/delphi_changehc/sensor.py index f4a8934ab..d1422567b 100644 --- a/changehc/delphi_changehc/sensor.py +++ b/changehc/delphi_changehc/sensor.py @@ -87,7 +87,7 @@ def backfill( return new_num, new_den @staticmethod - def 
fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"): + def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den"): """Fitting routine. Args: @@ -121,7 +121,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"): se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)') rate_data['se'] = se_valid - logging.debug("{0}: {1:.3f},[{2:.3f}]".format( + logger.debug("{0}: {1:.3f},[{2:.3f}]".format( geo_id, rate_data['rate'][-1], rate_data['se'][-1] )) return {"geo_id": geo_id, diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index a87ea853f..95de8fe21 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -20,7 +20,7 @@ from .weekday import Weekday -def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None): +def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None): """Write sensor values to csv. Args: @@ -47,7 +47,7 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", assert df[suspicious_se_mask].empty, " se contains suspiciously large values" assert not df["se"].isna().any(), " se contains nan values" if write_se: - logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name)) + logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name)) else: df.loc[:, "se"] = np.nan @@ -55,7 +55,7 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", suspicious_val_mask = df["val"].gt(90) if not df[suspicious_val_mask].empty: for geo in df.loc[suspicious_val_mask, "geo_id"]: - logging.warning("value suspiciously high, {0}: {1}".format( + logger.warning("value suspiciously high, {0}: {1}".format( geo, out_name )) @@ -68,10 +68,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", sensor=out_name, write_empty_days=True ) - logging.debug("wrote {0} rows for {1} {2}".format( + logger.debug("wrote {0} rows for {1} {2}".format( df.size, df["geo_id"].unique().size, geo_level )) - logging.debug("wrote files to {0}".format(output_path)) + logger.debug("wrote files to {0}".format(output_path)) return dates @@ -87,7 +87,8 @@ def __init__(self, weekday, numtype, se, - wip_signal): + wip_signal, + logger): """Init Sensor Updator. 
Args: @@ -100,7 +101,9 @@ def __init__(self, numtype: type of count data used, one of ["covid", "cli"] se: boolean to write out standard errors, if true, use an obfuscated name wip_signal: Prefix for WIP signals + logger: the structured logger """ + self.logger = logger self.startdate, self.enddate, self.dropdate = [ pd.to_datetime(t) for t in (startdate, enddate, dropdate)] # handle dates @@ -149,7 +152,7 @@ def geo_reindex(self, data): geo = self.geo gmpr = GeoMapper() if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}: - logging.error("{0} is invalid, pick one of 'county', " + self.logger.error("{0} is invalid, pick one of 'county', " "'state', 'msa', 'hrr', 'hss','nation'".format(geo)) return False if geo == "county": @@ -201,12 +204,12 @@ def update_sensor(self, sub_data.reset_index(level=0,inplace=True) if self.weekday: sub_data = Weekday.calc_adjustment(wd_params, sub_data) - res = CHCSensor.fit(sub_data, self.burnindate, geo_id) + res = CHCSensor.fit(sub_data, self.burnindate, geo_id, self.logger) res = pd.DataFrame(res).loc[final_sensor_idxs] dfs.append(res) else: n_cpu = min(10, cpu_count()) - logging.debug("starting pool with {0} workers".format(n_cpu)) + self.logger.debug("starting pool with {0} workers".format(n_cpu)) with Pool(n_cpu) as pool: pool_results = [] for geo_id, sub_data in data_frame.groupby(level=0,as_index=False): @@ -215,7 +218,7 @@ def update_sensor(self, sub_data = Weekday.calc_adjustment(wd_params, sub_data) pool_results.append( pool.apply_async( - CHCSensor.fit, args=(sub_data, self.burnindate, geo_id,), + CHCSensor.fit, args=(sub_data, self.burnindate, geo_id, self.logger), ) ) pool_results = [proc.get() for proc in pool_results] @@ -244,7 +247,8 @@ def update_sensor(self, write_se=self.se, day_shift=Config.DAY_SHIFT, out_name=signal, - output_path=output_path + output_path=output_path, + logger=self.logger ) if len(dates) > 0: stats.append((max(dates), len(dates))) diff --git a/changehc/setup.py b/changehc/setup.py index 6edef8528..d702874b3 100644 --- a/changehc/setup.py +++ b/changehc/setup.py @@ -27,7 +27,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/changehc/tests/test_sensor.py b/changehc/tests/test_sensor.py index 7c4aef01d..32afc3081 100644 --- a/changehc/tests/test_sensor.py +++ b/changehc/tests/test_sensor.py @@ -1,4 +1,5 @@ # standard +import logging import numpy as np import numpy.random as nr @@ -19,6 +20,7 @@ COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"] DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) +TEST_LOGGER = logging.getLogger() class TestLoadData: combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE, @@ -56,7 +58,7 @@ def test_fit_fips(self): for fips in all_fips: sub_data = self.combined_data.loc[fips] sub_data = sub_data.reindex(date_range, fill_value=0) - res0 = CHCSensor.fit(sub_data, date_range[0], fips) + res0 = CHCSensor.fit(sub_data, date_range[0], fips, TEST_LOGGER) if np.isnan(res0["rate"]).all(): assert res0["incl"].sum() == 0 diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py index 779960bf7..c202c4325 100644 --- a/changehc/tests/test_update_sensor.py +++ b/changehc/tests/test_update_sensor.py @@ -1,4 +1,5 @@ # standard +import logging from copy import deepcopy import os from os.path 
import join, exists @@ -27,6 +28,7 @@ DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) OUTPATH="test_data/" +TEST_LOGGER = logging.getLogger() class TestCHCSensorUpdator: """Tests for updating the sensors.""" @@ -53,7 +55,8 @@ def test_shift_dates(self): self.weekday, self.numtype, self.se, - "" + "", + TEST_LOGGER ) ## Test init assert su_inst.startdate.month == 2 @@ -77,7 +80,8 @@ def test_geo_reindex(self): self.weekday, self.numtype, self.se, - "" + "", + TEST_LOGGER ) su_inst.shift_dates() test_data = pd.DataFrame({ @@ -103,7 +107,8 @@ def test_update_sensor(self): self.weekday, self.numtype, self.se, - "" + "", + TEST_LOGGER ) # As of 3/3/21 (40c258a), this set of data has county outputting data, state and hhs not # outputting data, and nation outputting data, which is undesirable. Ideal behaviour @@ -149,7 +154,8 @@ def test_write_to_csv_results(self): write_se=False, day_shift=CONFIG.DAY_SHIFT, out_name="name_of_signal", - output_path=td.name + output_path=td.name, + logger=TEST_LOGGER ) # check outputs @@ -203,7 +209,8 @@ def test_write_to_csv_with_se_results(self): write_se=True, day_shift=CONFIG.DAY_SHIFT, out_name="name_of_signal", - output_path=td.name + output_path=td.name, + logger=TEST_LOGGER ) # check outputs @@ -243,7 +250,8 @@ def test_write_to_csv_wrong_results(self): write_se=False, day_shift=CONFIG.DAY_SHIFT, out_name="name_of_signal", - output_path=td.name + output_path=td.name, + logger=TEST_LOGGER ) # nan se for included loc-date @@ -258,7 +266,8 @@ def test_write_to_csv_wrong_results(self): write_se=True, day_shift=CONFIG.DAY_SHIFT, out_name="name_of_signal", - output_path=td.name + output_path=td.name, + logger=TEST_LOGGER ) # large se value @@ -273,7 +282,8 @@ def test_write_to_csv_wrong_results(self): write_se=True, day_shift=CONFIG.DAY_SHIFT, out_name="name_of_signal", - output_path=td.name + output_path=td.name, + logger=TEST_LOGGER ) td.cleanup() diff --git a/claims_hosp/setup.py b/claims_hosp/setup.py index c7527193b..6c88e4383 100644 --- a/claims_hosp/setup.py +++ b/claims_hosp/setup.py @@ -24,7 +24,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py b/combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py index c54f3f3be..7fbaa2898 100755 --- a/combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py +++ b/combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py @@ -33,14 +33,6 @@ covidcast.covidcast._ASYNC_CALL = True # pylint: disable=protected-access -def check_none_data_frame(data_frame, label, date_range): - """Log and return True when a data frame is None.""" - if data_frame is None: - print(f"{label} completely unavailable in range {date_range}") - return True - return False - - def maybe_append(usa_facts, jhu): """ Append dataframes if available, otherwise return USAFacts. 
@@ -133,7 +125,7 @@ def get_updated_dates(signal, geo, date_range, issue_range=None, fetcher=covidca issues=issue_range ) - if check_none_data_frame(usafacts_df, "USA-FACTS", date_range): + if usafacts_df is None: return None merged_df = merge_dfs_by_geos(usafacts_df, jhu_df, geo) @@ -142,7 +134,8 @@ def get_updated_dates(signal, geo, date_range, issue_range=None, fetcher=covidca return unique_dates -def combine_usafacts_and_jhu(signal, geo, date_range, issue_range=None, fetcher=covidcast.signal): +def combine_usafacts_and_jhu(signal, geo, date_range, logger, + issue_range=None, fetcher=covidcast.signal): """Add rows for PR from JHU signals to USA-FACTS signals. For hhs and nation, fetch the county `num` data so we can compute the proportions correctly @@ -158,6 +151,7 @@ def combine_usafacts_and_jhu(signal, geo, date_range, issue_range=None, fetcher= # This occurs if the usafacts ~and the jhu query were empty if unique_dates is None: + logger.info("USA-FACTS completely unavailable for dates", date_range=date_range) return EMPTY_FRAME # Query only the represented window so that every geo is represented; a single window call is @@ -329,9 +323,15 @@ def run_module(params): log_exceptions=params["common"].get("log_exceptions", True)) for metric, geo_res, sensor_name, signal in variants: + logger.info("Generating signal and exporting to CSV", + geo_res = geo_res, + metric = metric, + sensor = sensor_name, + signal = signal) df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name), + logger, params['indicator']['issue_range']) df["timestamp"] = pd.to_datetime(df["timestamp"]) start_date = pd.to_datetime(params['indicator']['export_start_date']) diff --git a/combo_cases_and_deaths/setup.py b/combo_cases_and_deaths/setup.py index 8ea4b187b..db97840a7 100644 --- a/combo_cases_and_deaths/setup.py +++ b/combo_cases_and_deaths/setup.py @@ -22,7 +22,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/combo_cases_and_deaths/tests/test_run.py b/combo_cases_and_deaths/tests/test_run.py index 8d03627d4..e83af9abb 100644 --- a/combo_cases_and_deaths/tests/test_run.py +++ b/combo_cases_and_deaths/tests/test_run.py @@ -1,4 +1,5 @@ """Tests for running combo cases and deaths indicator.""" +import logging from datetime import date from itertools import product import os @@ -17,6 +18,7 @@ COLUMN_MAPPING) from delphi_combo_cases_and_deaths.constants import METRICS, SMOOTH_TYPES, SENSORS +TEST_LOGGER = logging.getLogger() def test_issue_dates(): """The smoothed value for a particular date is computed from the raw @@ -98,7 +100,7 @@ def make_mock(geo): ("1 1", 4, 1 if geo in ["nation", "hhs"] else 2), ("0 0", 2, 0) ]: - df = combine_usafacts_and_jhu("", geo, date_range, fetcher=mock_covidcast_signal) + df = combine_usafacts_and_jhu("", geo, date_range, TEST_LOGGER, fetcher=mock_covidcast_signal) assert df.size == expected_size * len(COLUMN_MAPPING), f""" Wrong number of rows in combined data frame for the number of available signals. 
@@ -126,7 +128,7 @@ def test_multiple_issues(mock_covidcast_signal): }), None ] * 2 - result = combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), fetcher=mock_covidcast_signal) + result = combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal) pd.testing.assert_frame_equal( result, pd.DataFrame( @@ -186,7 +188,7 @@ def test_combine_usafacts_and_jhu_special_geos(mock_covidcast_signal): ] * 6 # each call to combine_usafacts_and_jhu makes (2 + 2 * len(unique_timestamps)) = 12 calls to the fetcher pd.testing.assert_frame_equal( - combine_usafacts_and_jhu("confirmed_incidence_num", "nation", date_range=(0, 1), fetcher=mock_covidcast_signal), + combine_usafacts_and_jhu("confirmed_incidence_num", "nation", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal), pd.DataFrame({"timestamp": [20200101], "geo_id": ["us"], "val": [50 + 100 + 200], @@ -194,7 +196,7 @@ def test_combine_usafacts_and_jhu_special_geos(mock_covidcast_signal): "sample_size": [None]}) ) pd.testing.assert_frame_equal( - combine_usafacts_and_jhu("confirmed_incidence_prop", "nation", date_range=(0, 1), fetcher=mock_covidcast_signal), + combine_usafacts_and_jhu("confirmed_incidence_prop", "nation", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal), pd.DataFrame({"timestamp": [20200101], "geo_id": ["us"], "val": [(50 + 100 + 200) / (4903185 + 3723066) * 100000], @@ -202,7 +204,7 @@ def test_combine_usafacts_and_jhu_special_geos(mock_covidcast_signal): "sample_size": [None]}) ) pd.testing.assert_frame_equal( - combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), fetcher=mock_covidcast_signal), + combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal), pd.DataFrame({"geo_id": ["01000", "01001", "72001"], "val": [50, 100, 200], "timestamp": [20200101, 20200101, 20200101]}, @@ -229,7 +231,7 @@ def test_no_nation_jhu(mock_covidcast_signal): "value": [1], "timestamp": [20200101]}) ] - result = combine_usafacts_and_jhu("_num", "nation", date_range=(0, 1), fetcher=mock_covidcast_signal) + result = combine_usafacts_and_jhu("_num", "nation", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal) assert mock_covidcast_signal.call_args_list[-1] == call( "jhu-csse", diff --git a/covid_act_now/delphi_covid_act_now/run.py b/covid_act_now/delphi_covid_act_now/run.py index d9d983f0d..7cc96f6e4 100644 --- a/covid_act_now/delphi_covid_act_now/run.py +++ b/covid_act_now/delphi_covid_act_now/run.py @@ -45,7 +45,7 @@ def run_module(params): parquet_url = params["indicator"]["parquet_url"] # Load CAN county-level testing data - print("Pulling CAN data") + logger.info("Pulling CAN data") df_pq = load_data(parquet_url) df_county_testing = extract_testing_metrics(df_pq) @@ -54,7 +54,8 @@ def run_module(params): max_dates_exported = [] # Perform geo aggregations and export to receiving for geo_res in GEO_RESOLUTIONS: - print(f"Processing {geo_res}") + logger.info("Generating signal and exporting to CSV", + geo_res = geo_res) df = geo_map(df_county_testing, geo_res) # Export 'pcr_specimen_positivity_rate' @@ -79,7 +80,7 @@ def run_module(params): max_dates_exported.append(latest) # x2 to count both positivity and tests signals num_exported_files += exported_csv_dates.size * 2 - print(f"Exported dates: {earliest} to {latest}") + logger.info("Exported for dates between", earliest=earliest, latest=latest) 
elapsed_time_in_seconds = round(time.time() - start_time, 2) max_lag_in_days = (datetime.now() - min(max_dates_exported)).days diff --git a/doctor_visits/delphi_doctor_visits/run.py b/doctor_visits/delphi_doctor_visits/run.py index 65f30ed69..84bc1af0e 100644 --- a/doctor_visits/delphi_doctor_visits/run.py +++ b/doctor_visits/delphi_doctor_visits/run.py @@ -6,10 +6,11 @@ """ # standard packages -import logging from datetime import datetime, timedelta from pathlib import Path +from delphi_utils import get_structured_logger + # first party from .update_sensor import update_sensor, write_to_csv @@ -37,7 +38,9 @@ def run_module(params): - "obfuscated_prefix": str, prefix for signal name if write_se is True. - "parallel": bool, whether to update sensor in parallel. """ - logging.basicConfig(level=logging.DEBUG) + logger = get_structured_logger( + __name__, filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True)) ## get end date from input file # the filename is expected to be in the format: @@ -61,30 +64,30 @@ def run_module(params): startdate_dt = enddate_dt - timedelta(days=n_backfill_days) enddate = str(enddate_dt.date()) startdate = str(startdate_dt.date()) - logging.info("drop date:\t\t%s", dropdate) - logging.info("first sensor date:\t%s", startdate) - logging.info("last sensor date:\t%s", enddate) - logging.info("n_backfill_days:\t%s", n_backfill_days) - logging.info("n_waiting_days:\t%s", n_waiting_days) + logger.info("drop date:\t\t%s", dropdate) + logger.info("first sensor date:\t%s", startdate) + logger.info("last sensor date:\t%s", enddate) + logger.info("n_backfill_days:\t%s", n_backfill_days) + logger.info("n_waiting_days:\t%s", n_waiting_days) ## geographies geos = ["state", "msa", "hrr", "county", "hhs", "nation"] ## print out other vars - logging.info("outpath:\t\t%s", export_dir) - logging.info("parallel:\t\t%s", params["indicator"]["parallel"]) - logging.info("weekday:\t\t%s", params["indicator"]["weekday"]) - logging.info("write se:\t\t%s", se) - logging.info("obfuscated prefix:\t%s", prefix) + logger.info("outpath:\t\t%s", export_dir) + logger.info("parallel:\t\t%s", params["indicator"]["parallel"]) + logger.info("weekday:\t\t%s", params["indicator"]["weekday"]) + logger.info("write se:\t\t%s", se) + logger.info("obfuscated prefix:\t%s", prefix) ## start generating for geo in geos: for weekday in params["indicator"]["weekday"]: if weekday: - logging.info("starting %s, weekday adj", geo) + logger.info("starting %s, weekday adj", geo) else: - logging.info("starting %s, no adj", geo) + logger.info("starting %s, no adj", geo) sensor = update_sensor( filepath=params["indicator"]["input_file"], startdate=startdate, @@ -93,10 +96,11 @@ def run_module(params): geo=geo, parallel=params["indicator"]["parallel"], weekday=weekday, - se=params["indicator"]["se"] + se=params["indicator"]["se"], + logger=logger, ) if sensor is None: - logging.error("No sensors calculated, no output will be produced") + logger.error("No sensors calculated, no output will be produced") continue # write out results out_name = "smoothed_adj_cli" if weekday else "smoothed_cli" @@ -104,8 +108,8 @@ def run_module(params): assert prefix is not None, "template has no obfuscated prefix" out_name = prefix + "_" + out_name - write_to_csv(sensor, geo, se, out_name, export_dir) - logging.debug(f"wrote files to {export_dir}") - logging.info("finished %s", geo) + write_to_csv(sensor, geo, se, out_name, logger, export_dir) + logger.debug(f"wrote files to {export_dir}") 
+ logger.info("finished %s", geo) - logging.info("finished all") + logger.info("finished all") diff --git a/doctor_visits/delphi_doctor_visits/sensor.py b/doctor_visits/delphi_doctor_visits/sensor.py index 22690f916..e96c8bfe0 100644 --- a/doctor_visits/delphi_doctor_visits/sensor.py +++ b/doctor_visits/delphi_doctor_visits/sensor.py @@ -6,9 +6,6 @@ """ -# standard packages -import logging - # third party import numpy as np import pandas as pd @@ -162,7 +159,8 @@ def fit(y_data, geo_id, recent_min_visits, min_recent_obs, - jeffreys): + jeffreys, + logger): """Fitting routine. Args: @@ -217,7 +215,7 @@ def fit(y_data, # if all rates are zero, don't bother if code_vals.sum() == 0: if jeffreys: - logging.error("p is 0 even though we used Jefferys estimate") + logger.error("p is 0 even though we used Jefferys estimate") new_rates.append(np.zeros((n_dates,))) continue @@ -240,7 +238,7 @@ def fit(y_data, se[include] = np.sqrt( np.divide((new_rates[include] * (1 - new_rates[include])), den[include])) - logging.debug(f"{geo_id}: {new_rates[-1]:.3f},[{se[-1]:.3f}]") + logger.debug(f"{geo_id}: {new_rates[-1]:.3f},[{se[-1]:.3f}]") included_indices = [x for x in final_sensor_idxs if include[x]] diff --git a/doctor_visits/delphi_doctor_visits/update_sensor.py b/doctor_visits/delphi_doctor_visits/update_sensor.py index 931ec3afa..068d2a058 100644 --- a/doctor_visits/delphi_doctor_visits/update_sensor.py +++ b/doctor_visits/delphi_doctor_visits/update_sensor.py @@ -9,7 +9,6 @@ """ # standard packages -import logging from datetime import timedelta from multiprocessing import Pool, cpu_count @@ -24,7 +23,7 @@ from .weekday import Weekday -def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, output_path="."): +def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, output_path="."): """Write sensor values to csv. Args: @@ -34,7 +33,7 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, output_path=" output_path: outfile path to write the csv (default is current directory) """ if se: - logging.info(f"========= WARNING: WRITING SEs TO {out_name} =========") + logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========") out_n = 0 for d in set(output_df["date"]): @@ -64,12 +63,12 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, output_path=" outfile.write( "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA")) out_n += 1 - logging.debug(f"wrote {out_n} rows for {geo_level}") + logger.debug(f"wrote {out_n} rows for {geo_level}") def update_sensor( filepath, startdate, enddate, dropdate, geo, parallel, - weekday, se + weekday, se, logger ): """Generate sensor values. 
@@ -82,6 +81,7 @@ def update_sensor( parallel: boolean to run the sensor update in parallel weekday: boolean to adjust for weekday effects se: boolean to write out standard errors, if true, use an obfuscated name + logger: the structured logger """ # as of 2020-05-11, input file expected to have 10 columns # id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name @@ -125,7 +125,7 @@ def update_sensor( (burn_in_dates >= startdate) & (burn_in_dates <= enddate))[0][:len(sensor_dates)] # handle if we need to adjust by weekday - params = Weekday.get_params(data) if weekday else None + params = Weekday.get_params(data, logger) if weekday else None if weekday and np.any(np.all(params == 0,axis=1)): # Weekday correction failed for at least one count type return None @@ -155,13 +155,14 @@ def update_sensor( geo_id, Config.MIN_RECENT_VISITS, Config.MIN_RECENT_OBS, - jeffreys + jeffreys, + logger ) out.append(res) else: n_cpu = min(10, cpu_count()) - logging.debug(f"starting pool with {n_cpu} workers") + logger.debug(f"starting pool with {n_cpu} workers") with Pool(n_cpu) as pool: pool_results = [] @@ -182,6 +183,7 @@ def update_sensor( Config.MIN_RECENT_VISITS, Config.MIN_RECENT_OBS, jeffreys, + logger ), ) ) diff --git a/doctor_visits/delphi_doctor_visits/weekday.py b/doctor_visits/delphi_doctor_visits/weekday.py index 86e5278b2..529454f80 100644 --- a/doctor_visits/delphi_doctor_visits/weekday.py +++ b/doctor_visits/delphi_doctor_visits/weekday.py @@ -4,11 +4,11 @@ Created: 2020-05-06 """ -# standard packages -import logging + # third party import cvxpy as cp +from cvxpy.error import SolverError import numpy as np # first party @@ -19,7 +19,7 @@ class Weekday: """Class to handle weekday effects.""" @staticmethod - def get_params(data): + def get_params(data, logger): r"""Correct a signal estimated as numerator/denominator for weekday effects. 
The ordinary estimate would be numerator_t/denominator_t for each time point @@ -92,13 +92,13 @@ def get_params(data): _ = prob.solve() params[i,:] = b.value break - except: + except SolverError: # If the magnitude of the objective function is too large, an error is # thrown; Rescale the objective function by going through loop pass else: # Leaving params[i,:] = 0 is equivalent to not performing weekday correction - logging.error("Unable to calculate weekday correction") + logger.error("Unable to calculate weekday correction") return params diff --git a/doctor_visits/setup.py b/doctor_visits/setup.py index 3c74af4b6..d7c0fe0a9 100644 --- a/doctor_visits/setup.py +++ b/doctor_visits/setup.py @@ -23,7 +23,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/doctor_visits/tests/test_update_sensor.py b/doctor_visits/tests/test_update_sensor.py index 4e504e19c..ab74c1c90 100644 --- a/doctor_visits/tests/test_update_sensor.py +++ b/doctor_visits/tests/test_update_sensor.py @@ -1,9 +1,11 @@ """Tests for update_sensor.py.""" - +import logging import pandas as pd from delphi_doctor_visits.update_sensor import update_sensor +TEST_LOGGER = logging.getLogger() + class TestUpdateSensor: def test_update_sensor(self): actual = update_sensor( @@ -14,7 +16,8 @@ def test_update_sensor(self): geo="state", parallel=False, weekday=False, - se=False + se=False, + logger=TEST_LOGGER, ) comparison = pd.read_csv("./comparison/update_sensor/all.csv", parse_dates=["date"]) diff --git a/facebook/delphiFacebook/R/responses.R b/facebook/delphiFacebook/R/responses.R index a6a92a7a2..bca1d72a8 100644 --- a/facebook/delphiFacebook/R/responses.R +++ b/facebook/delphiFacebook/R/responses.R @@ -129,7 +129,10 @@ load_response_one <- function(input_filename, params, contingency_run) { Q79 = col_integer(), Q80 = col_integer(), I5 = col_character(), - I7 = col_character()), + I7 = col_character(), + E2_1 = col_integer(), + E2_2 = col_integer() + ), locale = locale(grouping_mark = "")) if (nrow(input_data) == 0) { return(tibble()) diff --git a/google_symptoms/setup.py b/google_symptoms/setup.py index 16a5aaecc..8cd7590ca 100644 --- a/google_symptoms/setup.py +++ b/google_symptoms/setup.py @@ -25,7 +25,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/hhs_facilities/delphi_hhs_facilities/run.py b/hhs_facilities/delphi_hhs_facilities/run.py index b41df5bcc..43d3a9bdd 100644 --- a/hhs_facilities/delphi_hhs_facilities/run.py +++ b/hhs_facilities/delphi_hhs_facilities/run.py @@ -36,6 +36,9 @@ def run_module(params) -> None: filled_fips_df = fill_missing_fips(raw_df, gmpr) stats = [] for geo, (sig_name, sig_cols, sig_func, sig_offset) in product(GEO_RESOLUTIONS, SIGNALS): + logger.info("Generating signal and exporting to CSV", + geo_res = geo, + signal_name = sig_name) mapped_df = convert_geo(filled_fips_df, geo, gmpr) output_df = generate_signal(mapped_df, sig_cols, sig_func, sig_offset) dates = create_export_csv(output_df, params["common"]["export_dir"], geo, sig_name) diff --git a/hhs_hosp/delphi_hhs/run.py b/hhs_hosp/delphi_hhs/run.py index 6af654845..45c2f5bc1 100644 --- a/hhs_hosp/delphi_hhs/run.py +++ b/hhs_hosp/delphi_hhs/run.py @@ -105,6 +105,10 @@ def run_module(params): 
geo_mapper = GeoMapper() stats = [] for sensor, smoother, geo in product(SIGNALS, SMOOTHERS, GEOS): + logger.info("Generating signal and exporting to CSV", + geo_res = geo, + sensor = sensor, + smoother = smoother) df = geo_mapper.add_geocode(make_signal(all_columns, sensor), "state_id", "state_code", diff --git a/jhu/setup.py b/jhu/setup.py index 9f17f34f4..4c015a7c7 100644 --- a/jhu/setup.py +++ b/jhu/setup.py @@ -22,7 +22,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/nchs_mortality/delphi_nchs_mortality/archive_diffs.py b/nchs_mortality/delphi_nchs_mortality/archive_diffs.py index 6524203b3..e8b790cee 100644 --- a/nchs_mortality/delphi_nchs_mortality/archive_diffs.py +++ b/nchs_mortality/delphi_nchs_mortality/archive_diffs.py @@ -8,7 +8,7 @@ from delphi_utils import S3ArchiveDiffer -def arch_diffs(params, daily_arch_diff): +def arch_diffs(params, daily_arch_diff, logger): """ Archive differences between new updates and existing data. @@ -23,6 +23,8 @@ def arch_diffs(params, daily_arch_diff): Read from params.json daily_arch_diff: S3ArchiveDiffer Used to store and update cache + logger: logging.Logger + The structured logger. """ weekly_export_dir = params["common"]["weekly_export_dir"] daily_export_dir = params["common"]["daily_export_dir"] @@ -59,7 +61,7 @@ def arch_diffs(params, daily_arch_diff): # Report failures: someone should probably look at them for exported_file in fails: - print(f"Failed to archive (weekly) '{exported_file}'") + logger.info("Failed to archive (weekly)", filename=exported_file) # Daily run of archiving utility # - Uploads changed files to S3 @@ -83,4 +85,4 @@ def arch_diffs(params, daily_arch_diff): # Report failures: someone should probably look at them for exported_file in fails: - print(f"Failed to archive (daily) '{exported_file}'") + logger.info("Failed to archive (daily)", filename=exported_file) diff --git a/nchs_mortality/delphi_nchs_mortality/export.py b/nchs_mortality/delphi_nchs_mortality/export.py deleted file mode 100644 index 47033a50a..000000000 --- a/nchs_mortality/delphi_nchs_mortality/export.py +++ /dev/null @@ -1,36 +0,0 @@ -# -*- coding: utf-8 -*- -"""Function to export the dataset in the format expected of the API.""" -import pandas as pd -from epiweeks import Week - -def export_csv(df, geo_name, sensor, export_dir, start_date): - """Export data set in format expected for ingestion by the API.
- - Parameters - ---------- - df: pd.DataFrame - data frame with columns "geo_id", "timestamp", and "val" - geo_name: str - name of the geographic region, such as "state" or "hrr" - sensor: str - name of the sensor; only used for naming the output file - export_dir: str - path to location where the output CSV files to be uploaded should be stored - start_date: datetime.datetime - The first date to report - end_date: datetime.datetime - The last date to report - """ - df = df.copy() - df = df[df["timestamp"] >= start_date] - - dates = df["timestamp"].unique() - for date in dates: - t = Week.fromdate(pd.to_datetime(str(date))) - date_short = "weekly_" + str(t.year) + str(t.week).zfill(2) - export_fn = f"{date_short}_{geo_name}_{sensor}.csv" - result_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size"]] - result_df.to_csv(f"{export_dir}/{export_fn}", - index=False, - float_format="%.8f") - return pd.to_datetime(dates) diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index fa0226fcb..ec5416bb2 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -9,12 +9,11 @@ from typing import Dict, Any import numpy as np -from delphi_utils import S3ArchiveDiffer, get_structured_logger +from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv from .archive_diffs import arch_diffs from .constants import (METRICS, SENSOR_NAME_MAP, SENSORS, INCIDENCE_BASE, GEO_RES) -from .export import export_csv from .pull import pull_nchs_mortality_data @@ -62,25 +61,29 @@ def run_module(params: Dict[str, Any]): df_pull = pull_nchs_mortality_data(token, test_file) for metric in METRICS: if metric == 'percent_of_expected_deaths': - print(metric) + logger.info("Generating signal and exporting to CSV", + metric = metric) df = df_pull.copy() df["val"] = df[metric] df["se"] = np.nan df["sample_size"] = np.nan df = df[~df["val"].isnull()] sensor_name = "_".join([SENSOR_NAME_MAP[metric]]) - dates = export_csv( + dates = create_export_csv( df, - geo_name=GEO_RES, + geo_res=GEO_RES, export_dir=daily_export_dir, start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), sensor=sensor_name, + weekly_dates=True ) if len(dates) > 0: stats.append((max(dates), len(dates))) else: for sensor in SENSORS: - print(metric, sensor) + logger.info("Generating signal and exporting to CSV", + metric = metric, + sensor = sensor) df = df_pull.copy() if sensor == "num": df["val"] = df[metric] @@ -90,12 +93,13 @@ def run_module(params: Dict[str, Any]): df["sample_size"] = np.nan df = df[~df["val"].isnull()] sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) - dates = export_csv( + dates = create_export_csv( df, - geo_name=GEO_RES, + geo_res=GEO_RES, export_dir=daily_export_dir, start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), sensor=sensor_name, + weekly_dates=True ) if len(dates) > 0: stats.append((max(dates), len(dates))) @@ -107,7 +111,7 @@ def run_module(params: Dict[str, Any]): # - Uploads changed files to S3 # - Does not export any issues into receiving if "archive" in params: - arch_diffs(params, daily_arch_diff) + arch_diffs(params, daily_arch_diff, logger) elapsed_time_in_seconds = round(time.time() - start_time, 2) min_max_date = stats and min(s[0] for s in stats) diff --git a/nchs_mortality/setup.py b/nchs_mortality/setup.py index 6ae6bfbf7..76915936b 100644 --- a/nchs_mortality/setup.py +++ b/nchs_mortality/setup.py @@ -25,7 +25,7 @@ classifiers=[ "Development Status :: 
5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/nchs_mortality/tests/test_export.py b/nchs_mortality/tests/test_export.py deleted file mode 100644 index c05f287ca..000000000 --- a/nchs_mortality/tests/test_export.py +++ /dev/null @@ -1,51 +0,0 @@ -from datetime import datetime -from os.path import join, exists - -import pandas as pd - -from delphi_nchs_mortality.export import export_csv - - -class TestExport: - def test_export(self): - - # create fake dataset and save in a temporary directory - input_data = pd.DataFrame( - { - "geo_id": ["a", "a", "b", "b", "c", "c"], - "val": [0, 2, 3, 5, 10, 12], - "timestamp": [datetime(2020, 6, 2), datetime(2020, 6, 9)] * 3, - "se": [0.01, 0.02, 0.01, 0.01, 0.005, 0.01], - "sample_size": [100, 200, 500, 50, 80, 10] - } - ) - - export_csv( - input_data, - geo_name = "state", - sensor="region_thing", - export_dir="./receiving", - start_date = datetime(2020, 6, 2), - ) - - # check data for 2020-06-02 - expected_name = "weekly_202023_state_region_thing.csv" - assert exists(join("./receiving", expected_name)) - - output_data = pd.read_csv(join("./receiving", expected_name)) - - assert (output_data.columns == ["geo_id", "val", "se", "sample_size"]).all() - assert (output_data.geo_id == ["a", "b", "c"]).all() - assert (output_data.se.values == [0.01, 0.01, 0.005]).all() - assert (output_data.sample_size.values == [100, 500, 80]).all() - - # check data for 2020-06-03 - expected_name = "weekly_202024_state_region_thing.csv" - assert exists(join("./receiving", expected_name)) - - output_data = pd.read_csv(join("./receiving", expected_name)) - - assert (output_data.columns == ["geo_id", "val", "se", "sample_size"]).all() - assert (output_data.geo_id == ["a", "b", "c"]).all() - assert (output_data.se.values == [0.02, 0.01, 0.01]).all() - assert (output_data.sample_size.values == [200, 50, 10]).all() diff --git a/nowcast/setup.py b/nowcast/setup.py index 6a2ebc88f..54e88ee80 100644 --- a/nowcast/setup.py +++ b/nowcast/setup.py @@ -25,7 +25,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/quidel/delphi_quidel/data_tools.py b/quidel/delphi_quidel/data_tools.py index 5d67dd812..92afb2159 100644 --- a/quidel/delphi_quidel/data_tools.py +++ b/quidel/delphi_quidel/data_tools.py @@ -86,8 +86,6 @@ def _geographical_pooling(tpooled_tests, tpooled_ptests, min_obs, max_borrow_obs Same length as tests; proportion of parent observations to borrow. 
""" if (np.any(np.isnan(tpooled_tests)) or np.any(np.isnan(tpooled_ptests))): - print(tpooled_tests) - print(tpooled_ptests) raise ValueError('[parent] tests should be non-negative ' 'with no np.nan') if max_borrow_obs > min_obs: @@ -153,7 +151,6 @@ def raw_positive_prop(positives, tests, min_obs): positives = positives.astype(float) tests = tests.astype(float) if np.any(np.isnan(positives)) or np.any(np.isnan(tests)): - print(positives, tests) raise ValueError('positives and tests should be non-negative ' 'with no np.nan') if np.any(positives > tests): @@ -290,11 +287,10 @@ def raw_tests_per_device(devices, tests, min_obs): """ devices = devices.astype(float) tests = tests.astype(float) - if (np.any(np.isnan(devices)) or np.any(np.isnan(tests))): - print(devices) - print(tests) - raise ValueError('devices and tests should be non-negative ' - 'with no np.nan') + if np.any(np.isnan(devices)) or np.any(devices < 0): + raise ValueError("devices should be non-negative with no np.nan") + if np.any(np.isnan(tests)) or np.any(tests < 0): + raise ValueError("tests should be non-negative with no np.nan") if min_obs <= 0: raise ValueError('min_obs should be positive') tests[tests < min_obs] = np.nan diff --git a/quidel/delphi_quidel/pull.py b/quidel/delphi_quidel/pull.py index 6132ffe40..f168b3355 100644 --- a/quidel/delphi_quidel/pull.py +++ b/quidel/delphi_quidel/pull.py @@ -82,7 +82,7 @@ def regulate_column_names(df, test_type): return df def get_from_email(column_names, start_dates, end_dates, mail_server, - account, sender, password): + account, sender, password, logger): """ Get raw data from email account. @@ -98,6 +98,8 @@ def get_from_email(column_names, start_dates, end_dates, mail_server, email account of the sender password: str password of the datadrop email + logger: logging.Logger + The structured logger. Returns: df: pd.DataFrame @@ -131,7 +133,7 @@ def get_from_email(column_names, start_dates, end_dates, mail_server, if not whether_in_range: continue - print(f"Pulling {test} data received on %s"%search_date.date()) + logger.info("Pulling data", test=test, date=search_date.date()) toread = io.BytesIO() toread.write(att.payload) toread.seek(0) # reset the pointer @@ -153,10 +155,9 @@ def fix_zipcode(df): zipcode = int(float(zipcode)) zipcode5.append(zipcode) df['zip'] = zipcode5 - # print('Fixing %.2f %% of the data' % (fixnum * 100 / len(zipcode5))) return df -def fix_date(df): +def fix_date(df, logger): """ Remove invalid dates and select correct test date to use. @@ -175,16 +176,16 @@ def fix_date(df): df.insert(2, "timestamp", df["TestDate"]) mask = df["TestDate"] <= df["StorageDate"] - print("Removing %.2f%% of unusual data" % ((len(df) - np.sum(mask)) * 100 / len(df))) + logger.info(f"Removing {((len(df) - np.sum(mask)) * 100 / len(df)):.2f}% of unusual data") df = df[mask] mask = df["StorageDate"] - df["TestDate"] > pd.Timedelta(days=90) - print("Fixing %.2f%% of outdated data" % (np.sum(mask) * 100 / len(df))) + logger.info(f"Fixing {(np.sum(mask) * 100 / len(df)):.2f}% of outdated data") df["timestamp"].values[mask] = df["StorageDate"].values[mask] return df def preprocess_new_data(start_dates, end_dates, mail_server, account, - sender, password, test_mode): + sender, password, test_mode, logger): """ Pull and pre-process Quidel Antigen Test data from datadrop email. 
@@ -206,6 +207,8 @@ def preprocess_new_data(start_dates, end_dates, mail_server, account, password of the datadrop email test_mode: bool pull raw data from email or not + logger: logging.Logger + The structured logger. Returns: df: pd.DataFrame time_flag: datetime.date: @@ -220,7 +223,7 @@ def preprocess_new_data(start_dates, end_dates, mail_server, account, else: # Get new data from email dfs, time_flag = get_from_email(COLUMN_NAMES, start_dates, end_dates, - mail_server, account, sender, password) + mail_server, account, sender, password, logger) # No new data can be pulled if time_flag is None: @@ -228,13 +231,12 @@ def preprocess_new_data(start_dates, end_dates, mail_server, account, df_finals = {} for test_type in TEST_TYPES: - print(f"For {test_type}:") + logger.info(f"For {test_type}:") df = dfs[test_type] # Fix some of the fipcodes that are 9 digit instead of 5 digit df = fix_zipcode(df) # Create a column CanonicalDate according to StarageDate and TestDate - df = fix_date(df) - + df = fix_date(df, logger) # Compute numUniqueDevices numUniqueDevices = df.groupby( by=["timestamp", "zip"], @@ -309,17 +311,15 @@ def check_intermediate_file(cache_dir, pull_start_dates): sep=",", parse_dates=["timestamp"]) return previous_dfs, pull_start_dates -def pull_quidel_data(params): +def pull_quidel_data(params, logger): """ Pull new quidel test data and decide whether to combine it with historical records in ./cache. Parameters: params: dict including all the information read from params.json - END_FROM_TODAY_MINUS: int - report data until - X days - EXPORT_DAY_RANGE: int - number of dates to report + logger: logging.Logger + The structured logger. Returns: DataFrame: @@ -355,7 +355,7 @@ def pull_quidel_data(params): # Use _end_date to check the most recent date that we received data dfs, _end_date = preprocess_new_data( pull_start_dates, pull_end_dates, mail_server, - account, sender, password, test_mode) + account, sender, password, test_mode, logger) # Utilize previously stored data for test_type in TEST_TYPES: diff --git a/quidel/delphi_quidel/run.py b/quidel/delphi_quidel/run.py index 49f6ec66b..cd83d746a 100644 --- a/quidel/delphi_quidel/run.py +++ b/quidel/delphi_quidel/run.py @@ -63,9 +63,9 @@ def run_module(params: Dict[str, Any]): ) # Pull data and update export date - dfs, _end_date = pull_quidel_data(params["indicator"]) + dfs, _end_date = pull_quidel_data(params["indicator"], logger) if _end_date is None: - print("The data is up-to-date. Currently, no new data to be ingested.") + logger.info("The data is up-to-date. 
Currently, no new data to be ingested.") return export_end_dates = check_export_end_date(export_end_dates, _end_date, END_FROM_TODAY_MINUS) @@ -81,7 +81,6 @@ def run_module(params: Dict[str, Any]): for sensor in sensors: # Check either covid_ag or flu_ag test_type = "covid_ag" if "covid_ag" in sensor else "flu_ag" - print("state", sensor) data = dfs[test_type].copy() state_groups = geo_map("state", data, map_df).groupby("state_id") first_date, last_date = data["timestamp"].min(), data["timestamp"].max() @@ -97,7 +96,9 @@ def run_module(params: Dict[str, Any]): # County/HRR/MSA level for geo_res in GEO_RESOLUTIONS: - print(geo_res, sensor) + logger.info("Generating signal and exporting to CSV", + geo_res = geo_res, + sensor = sensor) data = dfs[test_type].copy() data, res_key = geo_map(geo_res, data, map_df) res_df = generate_sensor_for_other_geores( diff --git a/quidel/setup.py b/quidel/setup.py index f912aa0d0..2fddc0cdd 100644 --- a/quidel/setup.py +++ b/quidel/setup.py @@ -26,7 +26,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/quidel/tests/test_pull.py b/quidel/tests/test_pull.py index 435624f7e..17f596ce9 100644 --- a/quidel/tests/test_pull.py +++ b/quidel/tests/test_pull.py @@ -1,3 +1,4 @@ +import logging from datetime import datetime import pandas as pd @@ -13,6 +14,7 @@ END_FROM_TODAY_MINUS = 5 EXPORT_DAY_RANGE = 40 +TEST_LOGGER = logging.getLogger() class TestFixData: def test_fix_zipcode(self): @@ -28,7 +30,7 @@ def test_fix_date(self): datetime(2020, 6, 14), datetime(2020, 7, 10)], "TestDate":[datetime(2020, 1, 19), datetime(2020, 6, 10), datetime(2020, 6, 11), datetime(2020, 7, 2)]}) - df = fix_date(df) + df = fix_date(df, TEST_LOGGER) assert set(df["timestamp"]) == set([datetime(2020, 5, 19), datetime(2020, 6, 11), datetime(2020, 7, 2)]) @@ -49,7 +51,7 @@ def test_pull_quidel_data(self): "sender": "", "wip_signal": [""], "test_mode": True - }) + }, TEST_LOGGER) # For covid_ag df = dfs["covid_ag"] diff --git a/quidel_covidtest/delphi_quidel_covidtest/data_tools.py b/quidel_covidtest/delphi_quidel_covidtest/data_tools.py index 18898ec8e..f89a353ed 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/data_tools.py +++ b/quidel_covidtest/delphi_quidel_covidtest/data_tools.py @@ -92,8 +92,6 @@ def _geographical_pooling(tpooled_tests, tpooled_ptests, min_obs): """ if (np.any(np.isnan(tpooled_tests)) or np.any(np.isnan(tpooled_ptests))): - print(tpooled_tests) - print(tpooled_ptests) raise ValueError('[parent] tests should be non-negative ' 'with no np.nan') # STEP 1: "TOP UP" USING PARENT LOCATION @@ -156,7 +154,6 @@ def raw_positive_prop(positives, tests, min_obs): positives = positives.astype(float) tests = tests.astype(float) if np.any(np.isnan(positives)) or np.any(np.isnan(tests)): - print(positives, tests) raise ValueError('positives and tests should be non-negative ' 'with no np.nan') if np.any(positives > tests): @@ -296,11 +293,10 @@ def raw_tests_per_device(devices, tests, min_obs): """ devices = devices.astype(float) tests = tests.astype(float) - if (np.any(np.isnan(devices)) or np.any(np.isnan(tests))): - print(devices) - print(tests) - raise ValueError('devices and tests should be non-negative ' - 'with no np.nan') + if np.any(np.isnan(devices)) or np.any(devices < 0): + raise ValueError("devices should be non-negative with no np.nan") + if np.any(np.isnan(tests)) or np.any(tests < 0): + 
raise ValueError("tests should be non-negative with no np.nan") if min_obs <= 0: raise ValueError('min_obs should be positive') tests[tests < min_obs] = np.nan diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index fe042ed38..3efa9ed23 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -8,7 +8,7 @@ import pandas as pd import numpy as np -def get_from_s3(start_date, end_date, bucket): +def get_from_s3(start_date, end_date, bucket, logger): """ Get raw data from aws s3 bucket. @@ -19,6 +19,8 @@ def get_from_s3(start_date, end_date, bucket): pull data from file tagged with date on/before the end date bucket: s3.Bucket the aws s3 bucket that stores quidel data + logger: logging.Logger + The structured logger. output: df: pd.DataFrame time_flag: datetime.datetime @@ -49,7 +51,7 @@ def get_from_s3(start_date, end_date, bucket): for search_date in [start_date + timedelta(days=x) for x in range(n_days)]: if search_date in s3_files.keys(): # Avoid appending duplicate datasets - print("Pulling data received on %s"%search_date.date()) + logger.info(f"Pulling data received on {search_date.date()}") # Fetch data received on the same day for fn in s3_files[search_date]: @@ -76,10 +78,9 @@ def fix_zipcode(df): zipcode = int(float(zipcode)) zipcode5.append(zipcode) df['zip'] = zipcode5 - # print('Fixing %.2f %% of the data' % (fixnum * 100 / len(zipcode5))) return df -def fix_date(df): +def fix_date(df, logger): """ Remove invalid dates and select correct test date to use. @@ -98,15 +99,15 @@ def fix_date(df): df.insert(2, "timestamp", df["TestDate"]) mask = df["TestDate"] <= df["StorageDate"] - print("Removing %.2f%% of unusual data" % ((len(df) - np.sum(mask)) * 100 / len(df))) + logger.info(f"Removing {((len(df) - np.sum(mask)) * 100 / len(df)):.2f}% of unusual data") df = df[mask] mask = df["StorageDate"] - df["TestDate"] > pd.Timedelta(days=90) - print("Fixing %.2f%% of outdated data" % (np.sum(mask) * 100 / len(df))) + logger.info(f"Fixing {(np.sum(mask) * 100 / len(df)):.2f}% of outdated data") df["timestamp"].values[mask] = df["StorageDate"].values[mask] return df -def preprocess_new_data(start_date, end_date, params, test_mode): +def preprocess_new_data(start_date, end_date, params, test_mode, logger): """ Pull and pre-process Quidel Covid Test data. @@ -123,6 +124,8 @@ def preprocess_new_data(start_date, end_date, params, test_mode): read from params.json test_mode: bool pull raw data from s3 or not + logger: logging.Logger + The structured logger. 
output: df: pd.DataFrame time_flag: datetime.date: @@ -144,7 +147,7 @@ def preprocess_new_data(start_date, end_date, params, test_mode): aws_secret_access_key=aws_secret_access_key) bucket = s3.Bucket(bucket_name) # Get new data from s3 - df, time_flag = get_from_s3(start_date, end_date, bucket) + df, time_flag = get_from_s3(start_date, end_date, bucket, logger) # No new data can be pulled if time_flag is None: @@ -154,7 +157,7 @@ def preprocess_new_data(start_date, end_date, params, test_mode): df = fix_zipcode(df) # Create a column CanonicalDate according to StarageDate and TestDate - df = fix_date(df) + df = fix_date(df, logger) # Compute overallPositive overall_pos = df[df["OverallResult"] == "positive"].groupby( @@ -197,7 +200,7 @@ def check_intermediate_file(cache_dir, pull_start_date): return previous_df, pull_start_date return None, pull_start_date -def pull_quidel_covidtest(params): +def pull_quidel_covidtest(params, logger): """Pull the quidel covid test data. Conditionally merge new data with historical data from ./cache. @@ -205,10 +208,8 @@ def pull_quidel_covidtest(params): Parameters: params: dict including all the information read from params.json - end_from_today_minus: int - report data until - X days - export_day_range: int - number of dates to report + logger: logging.Logger + The structured logger. Returns: DataFrame: @@ -237,7 +238,7 @@ def pull_quidel_covidtest(params): # Pull data from the file at 5 digit zipcode level # Use _end_date to check the most recent date that we received data df, _end_date = preprocess_new_data( - pull_start_date, pull_end_date, params, test_mode) + pull_start_date, pull_end_date, params, test_mode, logger) # Utilize previously stored data if previous_df is not None: diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index d82f80135..5f084440c 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -76,9 +76,9 @@ def run_module(params: Dict[str, Any]): export_day_range = params["indicator"]["export_day_range"] # Pull data and update export date - df, _end_date = pull_quidel_covidtest(params["indicator"]) + df, _end_date = pull_quidel_covidtest(params["indicator"], logger) if _end_date is None: - print("The data is up-to-date. Currently, no new data to be ingested.") + logger.info("The data is up-to-date. 
Currently, no new data to be ingested.") return export_end_date = check_export_end_date(export_end_date, _end_date, END_FROM_TODAY_MINUS) @@ -98,7 +98,9 @@ def run_module(params: Dict[str, Any]): geo_data, res_key = geo_map(geo_res, data) geo_groups = geo_data.groupby(res_key) for sensor in sensors: - print(geo_res, sensor) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor) if sensor.endswith(SMOOTHED_POSITIVE): smoothers[sensor] = smoothers.pop(SMOOTHED_POSITIVE) elif sensor.endswith(RAW_POSITIVE): @@ -125,7 +127,9 @@ def run_module(params: Dict[str, Any]): for geo_res in PARENT_GEO_RESOLUTIONS: geo_data, res_key = geo_map(geo_res, data) for sensor in sensors: - print(geo_res, sensor) + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, + sensor=sensor) res_df = generate_sensor_for_parent_geo( geo_groups, geo_data, res_key, smooth=smoothers[sensor][1], device=smoothers[sensor][0], first_date=first_date, diff --git a/quidel_covidtest/setup.py b/quidel_covidtest/setup.py index 9537a175b..4c01e8593 100644 --- a/quidel_covidtest/setup.py +++ b/quidel_covidtest/setup.py @@ -26,7 +26,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/quidel_covidtest/tests/test_pull.py b/quidel_covidtest/tests/test_pull.py index 48bb48d14..17ddbb6fd 100644 --- a/quidel_covidtest/tests/test_pull.py +++ b/quidel_covidtest/tests/test_pull.py @@ -1,3 +1,4 @@ +import logging from datetime import datetime import pandas as pd @@ -14,6 +15,8 @@ END_FROM_TODAY_MINUS = 5 EXPORT_DAY_RANGE = 40 +TEST_LOGGER = logging.getLogger() + class TestFixData: def test_fix_zipcode(self): @@ -28,7 +31,7 @@ def test_fix_date(self): datetime(2020, 6, 14), datetime(2020, 7, 10)], "TestDate":[datetime(2020, 1, 19), datetime(2020, 6, 10), datetime(2020, 6, 11), datetime(2020, 7, 2)]}) - df = fix_date(df) + df = fix_date(df, TEST_LOGGER) assert set(df["timestamp"]) == set([datetime(2020, 5, 19), datetime(2020, 6, 11), datetime(2020, 7, 2)]) @@ -50,7 +53,7 @@ def test_pull_quidel_covidtest(self): "bucket_name": "", "wip_signal": "", "test_mode": True - }) + }, TEST_LOGGER) first_date = df["timestamp"].min().date() last_date = df["timestamp"].max().date() diff --git a/safegraph_patterns/delphi_safegraph_patterns/process.py b/safegraph_patterns/delphi_safegraph_patterns/process.py index 1445ce028..330cf6762 100644 --- a/safegraph_patterns/delphi_safegraph_patterns/process.py +++ b/safegraph_patterns/delphi_safegraph_patterns/process.py @@ -125,7 +125,7 @@ def aggregate(df, metric, geo_res): return df.rename({geo_key: "geo_id"}, axis=1) def process(fname, sensors, metrics, geo_resolutions, - export_dir, brand_df, stats): + export_dir, brand_df, stats, logger): """ Process an input census block group-level CSV and export it. @@ -135,16 +135,20 @@ def process(fname, sensors, metrics, geo_resolutions, ---------- fname: str Input filename. - metrics: List[Tuple[str, bool]] - List of (metric_name, wip). sensors: List[str] List of (sensor) + metrics: List[Tuple[str, bool]] + List of (metric_name, wip). geo_resolutions: List[str] List of geo resolutions to export the data. + export_dir: str + The directory to export files to. 
brand_df: pd.DataFrame mapping info from naics_code to safegraph_brand_id stats: List[Tuple[datetime, int]] List to which we will add (max export date, number of export dates) + logger: logging.Logger + The structured logger. Returns ------- @@ -164,7 +168,7 @@ def process(fname, sensors, metrics, geo_resolutions, usecols=used_cols, parse_dates=["date_range_start", "date_range_end"]) dfs = construct_signals(df, metric_names, naics_codes, brand_df) - print("Finished pulling data from " + fname) + logger.info("Finished pulling data.", filename=fname) else: files = glob.glob(f'{fname}/**/*.csv.gz', recursive=True) dfs_dict = {"bars_visit": [], "restaurants_visit": []} @@ -180,9 +184,11 @@ def process(fname, sensors, metrics, geo_resolutions, ).groupby(["timestamp", "zip"]).sum().reset_index() dfs["restaurants_visit"] = pd.concat(dfs_dict["restaurants_visit"] ).groupby(["timestamp", "zip"]).sum().reset_index() - print("Finished pulling data from " + fname) + logger.info("Finished pulling data.", filename=fname) for geo_res, sensor in product(geo_resolutions, sensors): for metric, wip in zip(metric_names, wips): + logger.info("Generating signal and exporting to CSV", + geo_res=geo_res, metric=metric, sensor=sensor) df_export = aggregate(dfs[metric], metric, geo_res) df_export["val"] = df_export["_".join([metric, sensor])] df_export["se"] = np.nan diff --git a/safegraph_patterns/delphi_safegraph_patterns/run.py b/safegraph_patterns/delphi_safegraph_patterns/run.py index ffb0e4eb7..6eb474b9b 100644 --- a/safegraph_patterns/delphi_safegraph_patterns/run.py +++ b/safegraph_patterns/delphi_safegraph_patterns/run.py @@ -101,7 +101,8 @@ def run_module(params): sensors=SENSORS, geo_resolutions=GEO_RESOLUTIONS, export_dir=export_dir, - stats=stats + stats=stats, + logger=logger, ) with mp.Pool(n_core) as pool: diff --git a/safegraph_patterns/setup.py b/safegraph_patterns/setup.py index 4053b622e..5ead94b33 100644 --- a/safegraph_patterns/setup.py +++ b/safegraph_patterns/setup.py @@ -22,7 +22,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/sir_complainsalot/setup.py b/sir_complainsalot/setup.py index 3b18b5f19..c51253104 100644 --- a/sir_complainsalot/setup.py +++ b/sir_complainsalot/setup.py @@ -22,7 +22,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), ) diff --git a/usafacts/delphi_usafacts/run.py b/usafacts/delphi_usafacts/run.py index 90c11e28a..4c659679a 100644 --- a/usafacts/delphi_usafacts/run.py +++ b/usafacts/delphi_usafacts/run.py @@ -98,7 +98,7 @@ def run_module(params: Dict[str, Dict[str, Any]]): METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS): if "cumulative" in sensor and "seven_day_average" in smoother: continue - logger.info("generating signal and exporting to CSV", + logger.info("Generating signal and exporting to CSV", geo_res = geo_res, metric = metric, sensor = sensor, diff --git a/usafacts/setup.py b/usafacts/setup.py index b11951c7f..e15cae933 100644 --- a/usafacts/setup.py +++ b/usafacts/setup.py @@ -22,7 +22,7 @@ classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], packages=find_packages(), )
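For reference, a minimal sketch (not part of the patch) of the structured-logging pattern the indicators above are being moved to. It uses delphi_utils.get_structured_logger with the same "common" params keys read in the hunks above; the params dict and the "example_sensor" name are hypothetical placeholders.

from delphi_utils import get_structured_logger

def run_module(params):
    # Build the structured logger; filename and log_exceptions mirror the
    # "common" params keys used throughout this patch.
    logger = get_structured_logger(
        __name__,
        filename=params["common"].get("log_filename"),
        log_exceptions=params["common"].get("log_exceptions", True))
    for geo in ["state", "county"]:
        # Key-value context replaces the old print()/logging f-string calls.
        logger.info("Generating signal and exporting to CSV",
                    geo_res=geo, sensor="example_sensor")

if __name__ == "__main__":
    run_module({"common": {"log_exceptions": True}})  # hypothetical params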