diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 3016571af..c8fd22c0e 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.58
+current_version = 0.3.59
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False
diff --git a/ansible/templates/sir_complainsalot-params-prod.json.j2 b/ansible/templates/sir_complainsalot-params-prod.json.j2
index 7bb2d179a..0dc66608d 100644
--- a/ansible/templates/sir_complainsalot-params-prod.json.j2
+++ b/ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -44,7 +44,13 @@
     },
     "nssp": {
       "max_age":19,
-      "maintainers": []
+      "maintainers": [],
+      "retired-signals": [
+        "pct_ed_visits_combined_2023rvr",
+        "pct_ed_visits_covid_2023rvr",
+        "pct_ed_visits_influenza_2023rvr",
+        "pct_ed_visits_rsv_2023rvr"
+      ]
     },
     "nhsn": {
       "max_age":19,
diff --git a/changehc/version.cfg b/changehc/version.cfg
index 7d679606f..e99046324 100644
--- a/changehc/version.cfg
+++ b/changehc/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/claims_hosp/version.cfg b/claims_hosp/version.cfg
index 7d679606f..e99046324 100644
--- a/claims_hosp/version.cfg
+++ b/claims_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/doctor_visits/version.cfg b/doctor_visits/version.cfg
index 7d679606f..e99046324 100644
--- a/doctor_visits/version.cfg
+++ b/doctor_visits/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/google_symptoms/version.cfg b/google_symptoms/version.cfg
index 7d679606f..e99046324 100644
--- a/google_symptoms/version.cfg
+++ b/google_symptoms/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/hhs_hosp/version.cfg b/hhs_hosp/version.cfg
index 7d679606f..e99046324 100644
--- a/hhs_hosp/version.cfg
+++ b/hhs_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/nchs_mortality/version.cfg b/nchs_mortality/version.cfg
index 7d679606f..e99046324 100644
--- a/nchs_mortality/version.cfg
+++ b/nchs_mortality/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/nhsn/delphi_nhsn/constants.py b/nhsn/delphi_nhsn/constants.py
index e6e6e4359..d51241b4f 100644
--- a/nhsn/delphi_nhsn/constants.py
+++ b/nhsn/delphi_nhsn/constants.py
@@ -2,6 +2,9 @@
 
 GEOS = ["state", "nation", "hhs"]
 
+MAIN_DATASET_ID = "ua7e-t2fy"
+PRELIM_DATASET_ID = "mpgq-jmmr"
+
 # column name from socrata
 TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
 TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
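Reviewer note: the two Socrata dataset IDs that `pull.py` previously inlined now live in `constants.py`. A minimal sketch of how they are consumed, with the paging pattern inferred from the tests' `get(..., limit=50000, offset=0)` assertions rather than shown in this diff:

```python
# Sketch only -- not part of the diff. Shows MAIN_DATASET_ID / PRELIM_DATASET_ID
# reaching sodapy; fetch_all and its paging loop are illustrative names.
import pandas as pd
from sodapy import Socrata

MAIN_DATASET_ID = "ua7e-t2fy"    # main NHSN hospital admission dataset
PRELIM_DATASET_ID = "mpgq-jmmr"  # preliminary NHSN dataset


def fetch_all(socrata_token: str, dataset_id: str) -> pd.DataFrame:
    """Page through a data.cdc.gov dataset 50k rows at a time."""
    client = Socrata("data.cdc.gov", socrata_token)
    records, offset = [], 0
    while True:
        page = client.get(dataset_id, limit=50000, offset=offset)
        if not page:
            break
        records.extend(page)
        offset += 50000
    return pd.DataFrame.from_records(records)
```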
diff --git a/nhsn/delphi_nhsn/patch.py b/nhsn/delphi_nhsn/patch.py
new file mode 100644
index 000000000..cefb0e564
--- /dev/null
+++ b/nhsn/delphi_nhsn/patch.py
@@ -0,0 +1,94 @@
+"""
+This module is used for patching data in the delphi_nhsn package.
+
+To use this module, you need to set up the "patch" section in params.json, like so:
+
+{
+    "common": {
+        ...
+    },
+    "validation": {
+        ...
+    },
+    "patch": {
+        "patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nhsn/patch"
+    }
+}
+
+It will generate data for the range of issue dates corresponding to source data files available in "backup_dir"
+specified under "common", and store them in batch issue format under "patch_dir":
+[name-of-patch]/issue_[issue-date]/nhsn/actual_data_file.csv
+"""
+
+from datetime import datetime
+from os import makedirs
+from pathlib import Path
+from typing import List
+
+from delphi_utils import get_structured_logger, read_params
+from epiweeks import Week
+
+from .run import run_module
+
+
+def filter_source_files(source_files: List[Path]):
+    """
+    Filter source files so that only one file per epiweek remains, keeping the file with the latest issue date.
+
+    Parameters
+    ----------
+    source_files
+
+    Returns
+    -------
+    list of source file paths, one per epiweek
+
+    """
+    epiweek_dict = dict()
+
+    for file in source_files:
+        if "prelim" not in file.stem:
+            current_issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            epiweek = Week.fromdate(current_issue_date)
+            epiweek_dict[epiweek] = file
+
+    filtered_patch_list = list(epiweek_dict.values())
+    return filtered_patch_list
+
+
+def patch(params):
+    """
+    Run the NHSN indicator for a range of issue dates.
+
+    The range of issue dates is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+        - "patch_dir": str, directory to write all issues output
+    """
+    logger = get_structured_logger("delphi_nhsn.patch", filename=params["common"]["log_filename"])
+
+    source_files = sorted(Path(params["common"]["backup_dir"]).glob("*.csv.gz"))
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=source_files[0].name.split(".")[0],
+        end_issue=source_files[-1].name.split(".")[0],
+    )
+
+    patch_list = filter_source_files(source_files)
+    for file in patch_list:
+        issue_date_str = file.name.split(".")[0]
+        logger.info("Running issue", issue_date=datetime.strptime(issue_date_str, "%Y%m%d").strftime("%Y-%m-%d"))
+        params["patch"]["issue_date"] = issue_date_str
+        # regardless of the report date type, the directory name must be issue_YYYYMMDD;
+        # the conversion is done in acquisition
+        current_issue_dir = f"{params['patch']['patch_dir']}/issue_{issue_date_str}/nhsn"
+        makedirs(current_issue_dir, exist_ok=True)
+        params["common"]["export_dir"] = current_issue_dir
+        params["common"]["custom_run"] = True
+        run_module(params, logger)
+
+
+if __name__ == "__main__":
+    patch(read_params())
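A quick illustration of `filter_source_files` above: backups land daily, but only the last issue of each epiweek is replayed (filenames hypothetical; epiweek boundaries per the `epiweeks` package):

```python
# Dec 8-14, 2024 is one MMWR epiweek, so 20241209/20241210/20241212 collapse
# to the latest file; 20241216 falls in the next epiweek and is kept as well.
from pathlib import Path

from delphi_nhsn.patch import filter_source_files

files = [Path(f"/tmp/{d}.csv.gz") for d in ("20241209", "20241210", "20241212", "20241216")]
print([f.name for f in filter_source_files(files)])
# ['20241212.csv.gz', '20241216.csv.gz']
```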
diff --git a/nhsn/delphi_nhsn/pull.py b/nhsn/delphi_nhsn/pull.py
index 2e1114142..7377ef958 100644
--- a/nhsn/delphi_nhsn/pull.py
+++ b/nhsn/delphi_nhsn/pull.py
@@ -1,13 +1,14 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NHSN data."""
 import logging
+from pathlib import Path
 from typing import Optional
 
 import pandas as pd
 from delphi_utils import create_backup_csv
 from sodapy import Socrata
 
-from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
+from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
 def pull_data(socrata_token: str, dataset_id: str):
@@ -27,7 +28,42 @@
     return df
 
 
-def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
+def pull_data_from_file(filepath: str, issue_date: str, logger, prelim_flag=False) -> pd.DataFrame:
+    """
+    Pull data from a source file.
+
+    The source file is generated from delphi_utils.create_backup_csv
+    Parameters
+    ----------
+    filepath: full path where the source file is located
+    issue_date: date when the file was pulled / generated
+    logger
+    prelim_flag: boolean to indicate which dataset to grab
+
+    Returns
+    -------
+    pd.DataFrame
+        Dataframe loaded from the backup file; empty if no matching backup exists.
+    """
+    df = pd.DataFrame()
+    if issue_date:
+        issue_date = issue_date.replace("-", "")
+        filename = f"{issue_date}_prelim.csv.gz" if prelim_flag else f"{issue_date}.csv.gz"
+        backup_file = Path(filepath, filename)
+
+        if backup_file.exists():
+            df = pd.read_csv(backup_file, compression="gzip")
+            logger.info("Pulling data from file", file=filename, num_rows=len(df))
+    return df
+
+
+def pull_nhsn_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str],
+    logger: Optional[logging.Logger] = None,
+):
     """Pull the latest NHSN hospital admission data, and conform it into a dataset.
 
     The output dataset has:
@@ -52,7 +88,11 @@
         Dataframe as described above.
     """
     # Pull data from Socrata API
-    df = pull_data(socrata_token, dataset_id="ua7e-t2fy")
+    df = (
+        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        if not custom_run
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
+    )
 
     keep_columns = list(TYPE_DICT.keys())
 
@@ -75,7 +115,11 @@
 
 
 def pull_preliminary_nhsn_data(
-    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str],
+    logger: Optional[logging.Logger] = None,
 ):
     """Pull the latest preliminary NHSN hospital admission data, and conform it into a dataset.
 
@@ -100,8 +144,11 @@
     pd.DataFrame
         Dataframe as described above.
     """
-    # Pull data from Socrata API
-    df = pull_data(socrata_token, dataset_id="mpgq-jmmr")
+    df = (
+        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
+        if not custom_run
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
+    )
 
     keep_columns = list(PRELIM_TYPE_DICT.keys())
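For context, the new `custom_run` path in `pull.py` replays the gzipped backups written by `delphi_utils.create_backup_csv` instead of calling Socrata. A hedged sketch (paths illustrative):

```python
# Sketch: replaying a backup during a patch run. Assumes /backups/20241212.csv.gz
# exists, as written by delphi_utils.create_backup_csv on the original run day.
from delphi_utils import get_structured_logger

from delphi_nhsn.pull import pull_data_from_file, pull_nhsn_data

logger = get_structured_logger("demo")

# Direct replay; returns an empty DataFrame if the backup file is missing.
df = pull_data_from_file("/backups", "2024-12-12", logger, prelim_flag=False)

# Same replay routed through the public entry point (the token is unused here).
df = pull_nhsn_data("unused-token", "/backups", custom_run=True, issue_date="2024-12-12", logger=logger)
```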
""" start_time = time.time() - logger = get_structured_logger( - __name__, - filename=params["common"].get("log_filename"), - log_exceptions=params["common"].get("log_exceptions", True), - ) + if not logger: + logger = get_structured_logger( + __name__, + filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True), + ) export_dir = params["common"]["export_dir"] backup_dir = params["common"]["backup_dir"] custom_run = params["common"].get("custom_run", False) + issue_date = params.get("patch", dict()).get("issue_date", None) socrata_token = params["indicator"]["socrata_token"] export_start_date = params["indicator"]["export_start_date"] run_stats = [] @@ -51,12 +53,16 @@ def run_module(params): export_start_date = date.today() - timedelta(days=date.today().weekday() + 2) export_start_date = export_start_date.strftime("%Y-%m-%d") - nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger) - preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger) + nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger) + preliminary_nhsn_df = pull_preliminary_nhsn_data( + socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger + ) geo_mapper = GeoMapper() signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP} - signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP}) + # some of the source backups do not include for preliminary data TODO remove after first patch + if not preliminary_nhsn_df.empty: + signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP}) for signal, df_pull in signal_df_dict.items(): for geo in GEOS: @@ -67,14 +73,17 @@ def run_module(params): df = df[df["geo_id"] == "us"] elif geo == "hhs": df = df[df["geo_id"] != "us"] + df = df[df["geo_id"].str.len() == 2] df.rename(columns={"geo_id": "state_id"}, inplace=True) df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id") df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="hhs") df = geo_mapper.replace_geocode( df, from_col="state_code", from_code="state_code", new_col="geo_id", new_code="hhs" ) - else: + elif geo == "state": df = df[df_pull["geo_id"] != "us"] + df = df[df["geo_id"].str.len() == 2] # hhs region is a value in geo_id column + df["se"] = np.nan df["sample_size"] = np.nan dates = create_export_csv( diff --git a/nhsn/tests/conftest.py b/nhsn/tests/conftest.py index 525d8ae7e..b89946a02 100644 --- a/nhsn/tests/conftest.py +++ b/nhsn/tests/conftest.py @@ -16,10 +16,10 @@ # queries the nhsn data with timestamp (2021-08-19, 2021-10-19) with CO and USA data -with open("test_data/page.json", "r") as f: +with open(f"{TEST_DIR}/test_data/page.json", "r") as f: TEST_DATA = json.load(f) -with open("test_data/prelim_page.json", "r") as f: +with open(f"{TEST_DIR}/test_data/prelim_page.json", "r") as f: PRELIM_TEST_DATA = json.load(f) @pytest.fixture(scope="session") @@ -50,11 +50,12 @@ def params(): @pytest.fixture def params_w_patch(params): params_copy = copy.deepcopy(params) + params_copy["common"]["custom_run"] = True params_copy["patch"] = { - "start_issue": "2024-06-27", - "end_issue": "2024-06-29", - "patch_dir": "./patch_dir" + "patch_dir": f"{TEST_DIR}/patch_dir", + "issue_date": "2024-12-12", } + return params_copy @pytest.fixture(scope="function") diff --git 
diff --git a/nhsn/tests/test_data/20241212.csv.gz b/nhsn/tests/test_data/20241212.csv.gz
new file mode 100644
index 000000000..26f91c200
Binary files /dev/null and b/nhsn/tests/test_data/20241212.csv.gz differ
diff --git a/nhsn/tests/test_data/20241212_prelim.csv.gz b/nhsn/tests/test_data/20241212_prelim.csv.gz
new file mode 100644
index 000000000..9ef690301
Binary files /dev/null and b/nhsn/tests/test_data/20241212_prelim.csv.gz differ
diff --git a/nhsn/tests/test_data/page.json b/nhsn/tests/test_data/page.json
index 749147244..5d4eda759 100644
--- a/nhsn/tests/test_data/page.json
+++ b/nhsn/tests/test_data/page.json
@@ -2122,5 +2122,123 @@
         "pctconfc19icubedsperchos": "-0.0002",
         "pctconffluicubedsperchos": "-0.012",
         "pctconfrsvicubedsperchos": "0.0"
+    },
+    {
+        "weekendingdate": "2021-10-16T00:00:00.000",
+        "jurisdiction": "region 1",
+        "numinptbeds": "729772.05",
+        "numinptbedsadult": "660758.41",
+        "numinptbedsped": "53387.68",
+        "numinptbedsocc": "556817.29",
+        "numinptbedsoccadult": "510524.46",
+        "numinptbedsoccped": "35679.97",
+        "numicubeds": "114471.4",
+        "numicubedsadult": "86925.11",
+        "numicubedsped": "21902.45",
+        "numicubedsocc": "85344.84",
+        "numicubedsoccadult": "66877.69",
+        "numicubedsoccped": "14657.22",
+        "numconfc19hosppatsadult": "55448.05",
+        "numconfc19hosppatsped": "847.15",
+        "totalconfc19hosppats": "56295.2",
+        "totalconffluhosppats": "259.85",
+        "numconfc19icupatsadult": "18104.6",
+        "totalconfc19icupats": "18104.6",
+        "totalconffluicupats": "39.33",
+        "totalconfc19newadmped": "1301.0",
+        "numconfc19newadmadult18to49": "11532.0",
+        "totalconfc19newadmadult": "44677.0",
+        "numconfc19newadmunk": "1982.0",
+        "totalconfc19newadm": "45978.0",
+        "totalconfflunewadm": "266.0",
+        "pctinptbedsocc": "0.763",
+        "pctconfc19inptbeds": "0.0771",
+        "pctconffluinptbeds": "0.0004",
+        "pcticubedsocc": "0.7456",
+        "pctconfc19icubeds": "0.1582",
+        "pctconffluicubeds": "0.0003",
+        "pctconfc19newadmadult": "0.9717",
+        "pctconfc19newadmped": "0.0283",
+        "numinptbedshosprep": "5396",
+        "numinptbedsocchosprep": "5396",
+        "numicubedshosprep": "5396",
+        "numicubedsocchosprep": "5396",
+        "totalconfc19hosppatshosprep": "5396",
+        "totalconffluhosppatshosprep": "4317",
+        "totalconfrsvhosppatshosprep": "0",
+        "totalconfc19icupatshosprep": "5396",
+        "totalconffluicupatshosprep": "4306",
+        "totalconfrsvicupatshosprep": "0",
+        "totalconfc19newadmpedhosprep": "5278",
+        "totalconfc19newadmadulthosprep": "5394",
+        "totalconfc19newadmhosprep": "5394",
+        "totalconfflunewadmpedhosprep": "0",
+        "totalconfflunewadmadulthosprep": "0",
+        "totalconfflunewadmhosprep": "4307",
+        "totalconfrsvnewadmpedhosprep": "0",
+        "totalconfrsvnewadmadulthosprep": "0",
+        "totalconfrsvnewadmhosprep": "0",
+        "pctinptbedsocchosprep": "5396",
+        "pcticubedsocchosprep": "5396",
+        "pctconfc19inptbedshosprep": "5396",
+        "pctconffluinptbedshosprep": "4317",
+        "pctconfrsvinptbedshosprep": "0",
+        "pctconfc19icubedshosprep": "5396",
+        "pctconffluicubedshosprep": "4306",
+        "pctconfrsvicubedshosprep": "0",
+        "numinptbedsperchosprep": "0.9492",
+        "numinptbedsoccperchosprep": "0.9492",
+        "numicubedsperchosprep": "0.9492",
+        "numicubedsoccperchosprep": "0.9492",
+        "totalconfc19hosppatsperc": "0.9492",
+        "totalconffluhosppatsperc": "0.7594",
+        "totalconfrsvhosppatsperc": "0.0",
+        "totalconfc19icupatsperchosprep": "0.9492",
+        "totalconffluicupatsperchosprep": "0.7574",
+        "totalconfrsvicupatsperchosprep": "0.0",
+        "totalconfc19newadmpedper": "0.9284",
+        "totalconfc19newadmadultp": "0.9488",
+        "totalconfc19newadmperchosprep": "94.88",
"totalconfflunewadmpedper": "0.0", + "totalconfflunewadmadultp": "0.0", + "totalconfflunewadmperchosprep": "75.76", + "totalconfrsvnewadmpedper": "0.0", + "totalconfrsvnewadmadultp": "0.0", + "totalconfrsvnewadmperchosprep": "0.0", + "pctinptbedsoccperchosprep": "0.9492", + "pcticubedsoccperchosprep": "0.9492", + "pctconfc19inptbedsperchosprep": "0.9492", + "pctconffluinptbedsperchosprep": "0.7594", + "pctconfrsvinptbedsperchosprep": "0.0", + "pctconfc19icubedsperchosprep": "0.9492", + "pctconffluicubedsperchosprep": "0.7574", + "pctconfrsvicubedsperchosprep": "0.0", + "numinptbedsperchosprepabschg": "-0.07", + "numinptbedsoccperchospre": "-0.07", + "numicubedsperchosprepabschg": "-0.07", + "numicubedsoccperchosprepabschg": "-0.07", + "totalconfc19hosppatsperc_1": "-0.02", + "totalconffluhosppatsperc_1": "-1.23", + "totalconfrsvhosppatsperc_1": "0.0", + "totalconfc19icupatsperch": "-0.02", + "totalconffluicupatsperch": "-1.2", + "totalconfrsvicupatsperch": "0.0", + "totalconfc19newadmpedper_1": "-0.04", + "totalconfc19newadmadultp_1": "-0.04", + "totalconfc19newadmpercho": "-0.04", + "totalconfflunewadmpedper_1": "0.0", + "totalconfflunewadmadultp_1": "0.0", + "totalconfflunewadmpercho": "-1.27", + "totalconfrsvnewadmpedper_1": "0.0", + "totalconfrsvnewadmadultp_1": "0.0", + "totalconfrsvnewadmpercho": "0.0", + "pctinptbedsoccperchospre": "-0.0007", + "pcticubedsoccperchosprepabschg": "-0.0007", + "pctconfc19inptbedspercho": "-0.0002", + "pctconffluinptbedspercho": "-0.0123", + "pctconfrsvinptbedspercho": "0.0", + "pctconfc19icubedsperchos": "-0.0002", + "pctconffluicubedsperchos": "-0.012", + "pctconfrsvicubedsperchos": "0.0" } ] \ No newline at end of file diff --git a/nhsn/tests/test_patch.py b/nhsn/tests/test_patch.py new file mode 100644 index 000000000..066ef4736 --- /dev/null +++ b/nhsn/tests/test_patch.py @@ -0,0 +1,129 @@ +import glob +import os +from collections import defaultdict +from pathlib import Path +import shutil +from unittest.mock import patch as mock_patch + +import pandas as pd +from datetime import datetime, timedelta + +from epiweeks import Week + +from delphi_nhsn.patch import filter_source_files, patch +from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API +from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR + +class TestPatch: + + def generate_date_list(self, start_date, end_date): + # Generate a list of dates + date_list = [] + current_date = start_date + + while current_date <= end_date: + date_list.append(current_date.strftime('%Y%m%d')) + current_date += timedelta(days=1) + return date_list + + def generate_dummy_file_names(self): + start_date = datetime(2024, 8, 1) + end_date = datetime(2024, 8, 4) + date_list_part1 = self.generate_date_list(start_date, end_date) + + start_date = datetime(2024, 9, 6) + end_date = datetime(2024, 9, 10) + date_list_part2 = self.generate_date_list(start_date, end_date) + + start_date = datetime(2024, 10, 6) + end_date = datetime(2024, 10, 15) + date_list_part3 = self.generate_date_list(start_date, end_date) + + start_date = datetime(2024, 11, 16) + end_date = datetime(2024, 11, 22) + date_list_part4 = self.generate_date_list(start_date, end_date) + + date_list = date_list_part1 + date_list_part2 + date_list_part3 + date_list_part4 + + file_list = [] + for date in date_list: + custom_filename = Path(f"/tmp/{date}.csv.gz") + file_list.append(custom_filename) + return file_list + + def test_filter_source_files(self): + filelist = self.generate_dummy_file_names() + epiweek_dict = 
diff --git a/nhsn/tests/test_patch.py b/nhsn/tests/test_patch.py
new file mode 100644
index 000000000..066ef4736
--- /dev/null
+++ b/nhsn/tests/test_patch.py
@@ -0,0 +1,129 @@
+import glob
+import os
+from collections import defaultdict
+from pathlib import Path
+import shutil
+from unittest.mock import patch as mock_patch
+
+import pandas as pd
+from datetime import datetime, timedelta
+
+from epiweeks import Week
+
+from delphi_nhsn.patch import filter_source_files, patch
+from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API
+from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR
+
+class TestPatch:
+
+    def generate_date_list(self, start_date, end_date):
+        # Generate a list of dates between start_date and end_date, inclusive
+        date_list = []
+        current_date = start_date
+
+        while current_date <= end_date:
+            date_list.append(current_date.strftime('%Y%m%d'))
+            current_date += timedelta(days=1)
+        return date_list
+
+    def generate_dummy_file_names(self):
+        start_date = datetime(2024, 8, 1)
+        end_date = datetime(2024, 8, 4)
+        date_list_part1 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 9, 6)
+        end_date = datetime(2024, 9, 10)
+        date_list_part2 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 10, 6)
+        end_date = datetime(2024, 10, 15)
+        date_list_part3 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 11, 16)
+        end_date = datetime(2024, 11, 22)
+        date_list_part4 = self.generate_date_list(start_date, end_date)
+
+        date_list = date_list_part1 + date_list_part2 + date_list_part3 + date_list_part4
+
+        file_list = []
+        for date in date_list:
+            custom_filename = Path(f"/tmp/{date}.csv.gz")
+            file_list.append(custom_filename)
+        return file_list
+
+    def test_filter_source_files(self):
+        filelist = self.generate_dummy_file_names()
+        epiweek_dict = defaultdict(list)
+        for file in filelist:
+            issue_dt = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            issue_epiweek = Week.fromdate(issue_dt)
+            epiweek_dict[issue_epiweek].append(issue_dt)
+        patch_issue_list = filter_source_files(filelist)
+        for file in patch_issue_list:
+            issue_dt = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            issue_epiweek = Week.fromdate(issue_dt)
+            assert max(epiweek_dict[issue_epiweek]) == issue_dt
+
+    def generate_test_source_files(self):
+        start_date = datetime(2024, 8, 1)
+        end_date = datetime(2024, 8, 4)
+        date_list_part1 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 9, 6)
+        end_date = datetime(2024, 9, 10)
+        date_list_part2 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 11, 16)
+        end_date = datetime(2024, 11, 22)
+        date_list_part4 = self.generate_date_list(start_date, end_date)
+
+        date_list = date_list_part1 + date_list_part2 + date_list_part4
+
+        file_list = []
+        prelim_file_list = []
+        for date in date_list:
+            custom_filename = f"{TEST_DIR}/backups/{date}.csv.gz"
+            custom_filename_prelim = f"{TEST_DIR}/backups/{date}_prelim.csv.gz"
+            test_data = pd.DataFrame(TEST_DATA)
+            test_data[TOTAL_ADMISSION_COVID_API] = int(date)
+            test_data[TOTAL_ADMISSION_FLU_API] = int(date)
+            test_prelim_data = pd.DataFrame(PRELIM_TEST_DATA)
+            test_prelim_data[TOTAL_ADMISSION_COVID_API] = int(date)
+            test_prelim_data[TOTAL_ADMISSION_FLU_API] = int(date)
+
+            test_data = test_data.head(2)
+            test_data.to_csv(
+                custom_filename, index=False, na_rep="NA", compression="gzip"
+            )
+            test_prelim_data = test_prelim_data.head(2)
+            test_prelim_data.to_csv(
+                custom_filename_prelim, index=False, na_rep="NA", compression="gzip"
+            )
+            file_list.append(custom_filename)
+            prelim_file_list.append(custom_filename_prelim)
+        return file_list, prelim_file_list
+
+    def test_patch(self, params_w_patch):
+        with mock_patch("delphi_nhsn.patch.read_params", return_value=params_w_patch):
+            file_list, prelim_file_list = self.generate_test_source_files()
+            patch(params_w_patch)
+
+            for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("*"):
+                issue_dt_str = issue_path.name.replace("issue_", "")
+                for file in Path(issue_path / "nhsn").iterdir():
+                    df = pd.read_csv(file)
+                    assert issue_dt_str == str(int(df["val"][0]))
+
+            # clean up
+            shutil.rmtree(f"{TEST_DIR}/patch_dir")
+
+            for file in file_list:
+                os.remove(file)
+
+            for file in prelim_file_list:
+                os.remove(file)
diff --git a/nhsn/tests/test_pull.py b/nhsn/tests/test_pull.py
index c09c838d7..daa3acd92 100644
--- a/nhsn/tests/test_pull.py
+++ b/nhsn/tests/test_pull.py
@@ -8,13 +8,12 @@
 from delphi_nhsn.pull import (
     pull_nhsn_data,
     pull_data,
-    pull_preliminary_nhsn_data
+    pull_preliminary_nhsn_data, pull_data_from_file
 )
 from delphi_nhsn.constants import SIGNALS_MAP, PRELIM_SIGNALS_MAP
 from delphi_utils import get_structured_logger
 
-from conftest import TEST_DATA, PRELIM_TEST_DATA
-
+from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR
 
 DATASETS = [{"id":"ua7e-t2fy",
              "test_data": TEST_DATA},
@@ -42,16 +41,46 @@
         # Check that get method was called with correct arguments
         mock_client.get.assert_any_call(dataset["id"], limit=50000, offset=0)
 
+    def test_pull_from_file(self, caplog, params_w_patch):
+        backup_dir = f"{TEST_DIR}/test_data"
+        issue_date = params_w_patch["patch"]["issue_date"]
+        logger = get_structured_logger()
+
+        # Load test data
+        expected_data = pd.DataFrame(TEST_DATA)
+
+        df = pull_data_from_file(backup_dir, issue_date, logger=logger)
+        df = df.astype('str')
+        expected_data = expected_data.astype('str')
+        assert "Pulling data from file" in caplog.text
+
+        pd.testing.assert_frame_equal(expected_data, df)
+
+    def test_pull_from_file_prelim(self, caplog, params_w_patch):
+        backup_dir = f"{TEST_DIR}/test_data"
+        issue_date = params_w_patch["patch"]["issue_date"]
+        logger = get_structured_logger()
+
+        # Load test data
+        expected_data = pd.DataFrame(PRELIM_TEST_DATA)
+
+        df = pull_data_from_file(backup_dir, issue_date, logger=logger, prelim_flag=True)
+        df = df.astype('str')
+        expected_data = expected_data.astype('str')
+
+        assert "Pulling data from file" in caplog.text
+        pd.testing.assert_frame_equal(expected_data, df)
+
     def test_pull_nhsn_data_output(self, caplog, params):
         with patch('sodapy.Socrata.get') as mock_get:
             mock_get.side_effect = [TEST_DATA, []]
             backup_dir = params["common"]["backup_dir"]
             test_token = params["indicator"]["socrata_token"]
-            custom_run = True
+            custom_run = params["common"]["custom_run"]
 
             logger = get_structured_logger()
-            result = pull_nhsn_data(test_token, backup_dir, custom_run, logger)
+            result = pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
             # Check result
             assert result["timestamp"].notnull().all(), "timestamp has rogue NaN"
@@ -74,7 +103,7 @@
         logger = get_structured_logger()
 
         # Call function with test token
-        pull_nhsn_data(test_token, backup_dir, custom_run, logger)
+        pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
         # Check logger used:
         assert "Backup file created" in caplog.text
@@ -99,11 +128,11 @@
             mock_get.side_effect = [PRELIM_TEST_DATA, []]
             backup_dir = params["common"]["backup_dir"]
             test_token = params["indicator"]["socrata_token"]
-            custom_run = True
+            custom_run = params["common"]["custom_run"]
 
             logger = get_structured_logger()
 
-            result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, logger)
+            result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
             # Check result
             assert result["timestamp"].notnull().all(), "timestamp has rogue NaN"
@@ -126,7 +155,7 @@
         logger = get_structured_logger()
 
         # Call function with test token
-        pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, logger)
+        pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
         # Check logger used:
         assert "Backup file created" in caplog.text
diff --git a/nhsn/tests/test_run.py b/nhsn/tests/test_run.py
index cfc47e50f..ef99def7b 100644
--- a/nhsn/tests/test_run.py
+++ b/nhsn/tests/test_run.py
@@ -1,3 +1,4 @@
+import glob
 import os
 from pathlib import Path
 
@@ -44,6 +45,14 @@
                     "geo_id", "val", "se", "sample_size",
                 ]
                 assert (df.columns.values == expected_columns).all()
+                if geo == "state":
+                    states = list(df["geo_id"].values)
+                    assert all(len(state) == 2 for state in states)
 
         for file in Path(export_dir).glob("*.csv"):
             os.remove(file)
+
+        today = pd.Timestamp.today().strftime("%Y%m%d")
+        backup_dir = glob.glob(f"{Path(params['common']['backup_dir'])}/{today}*")
+        for file in backup_dir:
+            os.remove(file)
diff --git a/nssp/DETAILS.md b/nssp/DETAILS.md
index 692d85559..9bac16879 100644
--- a/nssp/DETAILS.md
+++ b/nssp/DETAILS.md
@@ -2,29 +2,16 @@
 
 We import the NSSP Emergency Department Visit data, including percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website. The data is provided at the county level, state level and national level; we do a population-weighted mean to aggregate from county data up to the HRR and MSA levels.
 
-There are 2 sources we grab data from for nssp:
-- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
-- Secondary (2023RVR) source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
-There are 8 signals output from the primary source and 4 output from secondary. There are no smoothed signals from secondary source.
-
-Note that the data produced from secondary source are mostly the same as their primary source equivalent, with past analysis shows around 95% of datapoints having less than 0.1 value difference and the other 5% having a 0.1 to 1.2 value difference.
+NSSP source data: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
 
 ## Geographical Levels
-Primary source:
 * `state`: reported from source using two-letter postal code
 * `county`: reported from source using fips code
 * `national`: just `us` for now, reported from source
 * `hhs`, `hrr`, `msa`: not reported from source, so we computed them from county-level data using a weighted mean. Each county is assigned a weight equal to its population in the last census (2020).
 
-Secondary (2023RVR) source:
-* `state`: reported from source
-* `hhs`: reported from source
-* `national`: reported from source
-
 ## Metrics
 * `percent_visits_covid`, `percent_visits_rsv`, `percent_visits_influenza`: percentage of emergency department patient visits for specified pathogen.
 * `percent_visits_combined`: sum of the three percentages of visits for flu, rsv and covid.
 * `smoothed_percent_visits_covid`, `smoothed_percent_visits_rsv`, `smoothed_percent_visits_influenza`: 3 week moving average of the percentage of emergency department patient visits for specified pathogen.
-* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
-* `percent_visits_covid_2023RVR`, `percent_visits_rsv_2023RVR`, `percent_visits_influenza_2023RVR`: Taken from secondary source, percentage of emergency department patient visits for specified pathogen.
-* `percent_visits_combined_2023RVR`: Taken from secondary source, sum of the three percentages of visits for flu, rsv and covid.
+* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
\ No newline at end of file
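DETAILS.md above still documents the county-to-HRR/MSA rollup. A toy sketch of that population-weighted mean (counties, values, and weights invented; the real pipeline goes through `delphi_utils` geomapping):

```python
# Toy sketch: aggregate county-level percentages to one MSA by 2020 population.
import pandas as pd

counties = pd.DataFrame({
    "fips": ["42003", "42007"],          # two counties in one hypothetical MSA
    "pct_ed_visits_covid": [2.0, 4.0],   # county-level percent of ED visits
    "population": [1_200_000, 400_000],  # census weights
})

msa_value = (counties["pct_ed_visits_covid"] * counties["population"]).sum() / counties["population"].sum()
print(msa_value)  # 2.5
```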
diff --git a/nssp/README.md b/nssp/README.md
index c3f57b94b..d062771c0 100644
--- a/nssp/README.md
+++ b/nssp/README.md
@@ -2,9 +2,7 @@
 
 We import the NSSP Emergency Department Visit data, currently only the smoothed concentration, from the CDC website, aggregate to the state and national level from the wastewater sample site level, and export the aggregated data.
 
-There are 2 sources we grab data from for nssp:
-- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
-- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
+NSSP source data: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
 
 For details see the `DETAILS.md` file in this directory.
 
diff --git a/nssp/delphi_nssp/constants.py b/nssp/delphi_nssp/constants.py
index 76d51b927..9b98d2012 100644
--- a/nssp/delphi_nssp/constants.py
+++ b/nssp/delphi_nssp/constants.py
@@ -41,29 +41,3 @@
         "fips": str,
     }
 )
-
-SECONDARY_COLS_MAP = {
-    "week_end": "timestamp",
-    "geography": "geo_value",
-    "percent_visits": "val",
-    "pathogen": "signal",
-}
-
-SECONDARY_SIGNALS_MAP = {
-    "COVID-19": "pct_ed_visits_covid_2023RVR",
-    "Influenza": "pct_ed_visits_influenza_2023RVR",
-    "RSV": "pct_ed_visits_rsv_2023RVR",
-    "Combined": "pct_ed_visits_combined_2023RVR",
-}
-
-SECONDARY_SIGNALS = [val for (key, val) in SECONDARY_SIGNALS_MAP.items()]
-SECONDARY_GEOS = ["state", "nation", "hhs"]
-
-SECONDARY_TYPE_DICT = {
-    "timestamp": "datetime64[ns]",
-    "geo_value": str,
-    "val": float,
-    "geo_type": str,
-    "signal": str,
-}
-SECONDARY_KEEP_COLS = [key for (key, val) in SECONDARY_TYPE_DICT.items()]
- """ - socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9") - df_ervisits = pd.DataFrame.from_records(socrata_results) - create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger) - df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP) - - # geo_type is not provided in the dataset, so we infer it from the geo_value - # which is either state names, "National" or hhs region numbers - df_ervisits["geo_type"] = "state" - - df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation" - - hhs_region_mask = df_ervisits["geo_value"].str.lower().str.startswith("region ") - df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace( - "Region ", "" - ) - df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs" - - df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP) - - df_ervisits = df_ervisits[SECONDARY_KEEP_COLS] - - try: - df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT) - except KeyError as exc: - raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc - - return df_ervisits diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py index 417c49ab2..7e069e483 100644 --- a/nssp/delphi_nssp/run.py +++ b/nssp/delphi_nssp/run.py @@ -31,8 +31,8 @@ from delphi_utils.geomap import GeoMapper from delphi_utils.nancodes import add_default_nancodes -from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS -from .pull import pull_nssp_data, secondary_pull_nssp_data +from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS +from .pull import pull_nssp_data def add_needed_columns(df, col_names=None): @@ -141,52 +141,5 @@ def run_module(params): if len(dates) > 0: run_stats.append((max(dates), len(dates))) - logger.info("Generating secondary signals") - secondary_df_pull = secondary_pull_nssp_data(socrata_token, backup_dir, custom_run, logger) - for signal in SECONDARY_SIGNALS: - secondary_df_pull_signal = secondary_df_pull[secondary_df_pull["signal"] == signal] - if secondary_df_pull_signal.empty: - logger.warning("No data found for signal", signal=signal) - continue - for geo in SECONDARY_GEOS: - df = secondary_df_pull_signal.copy() - logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal) - if geo == "state": - df = df[(df["geo_type"] == "state")] - df["geo_id"] = df["geo_value"].apply( - lambda x: ( - us.states.lookup(x).abbr.lower() - if us.states.lookup(x) - else ("dc" if x == "District of Columbia" else x) - ) - ) - unexpected_state_names = df[df["geo_id"] == df["geo_value"]] - if unexpected_state_names.shape[0] > 0: - logger.error( - "Unexpected state names", - unexpected_state_names=unexpected_state_names["geo_value"].unique(), - ) - raise RuntimeError - elif geo == "nation": - df = df[(df["geo_type"] == "nation")] - df["geo_id"] = "us" - elif geo == "hhs": - df = df[(df["geo_type"] == "hhs")] - df["geo_id"] = df["geo_value"] - # add se, sample_size, and na codes - missing_cols = set(CSV_COLS) - set(df.columns) - df = add_needed_columns(df, col_names=list(missing_cols)) - df_csv = df[CSV_COLS + ["timestamp"]] - # actual export - dates = create_export_csv( - df_csv, - geo_res=geo, - export_dir=export_dir, - sensor=signal, - weekly_dates=True, - ) - if len(dates) > 0: - run_stats.append((max(dates), len(dates))) - ## log this indicator run logging(start_time, run_stats, logger) diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index 30debd6cd..a03221019 
index 30debd6cd..a03221019 100644
--- a/nssp/tests/test_pull.py
+++ b/nssp/tests/test_pull.py
@@ -7,16 +7,11 @@
 
 from delphi_nssp.pull import (
     pull_nssp_data,
-    secondary_pull_nssp_data,
    pull_with_socrata_api,
 )
 from delphi_nssp.constants import (
     NEWLINE,
-    SECONDARY_COLS_MAP,
-    SECONDARY_KEEP_COLS,
-    SECONDARY_SIGNALS_MAP,
-    SECONDARY_TYPE_DICT,
     SIGNALS,
     SIGNALS_MAP,
     TYPE_DICT,
@@ -81,44 +76,5 @@
         for file in backup_files:
             os.remove(file)
 
-    @patch("delphi_nssp.pull.Socrata")
-    def test_secondary_pull_nssp_data(self, mock_socrata):
-        today = pd.Timestamp.today().strftime("%Y%m%d")
-        backup_dir = 'test_raw_data_backups'
-
-        # Load test data
-        with open("test_data/secondary_page.txt", "r") as f:
-            test_data = json.load(f)
-
-        # Mock Socrata client and its get method
-        mock_client = MagicMock()
-        mock_client.get.side_effect = [test_data, []]  # Return test data on first call, empty list on second call
-        mock_socrata.return_value = mock_client
-
-        custom_run = False
-        logger = get_structured_logger()
-        # Call function with test token
-        test_token = "test_token"
-        result = secondary_pull_nssp_data(test_token, backup_dir, custom_run, logger)
-        # print(result)
-
-        # Check that Socrata client was initialized with correct arguments
-        mock_socrata.assert_called_once_with("data.cdc.gov", test_token)
-
-        # Check that get method was called with correct arguments
-        mock_client.get.assert_any_call("7mra-9cq9", limit=50000, offset=0)
-
-        for col in SECONDARY_KEEP_COLS:
-            assert result[col].notnull().all(), f"{col} has rogue NaN"
-
-        assert result[result['geo_value'].str.startswith('Region') ].empty, "'Region ' need to be removed from geo_value for geo_type 'hhs'"
-        assert (result[result['geo_type'] == 'nation']['geo_value'] == 'National').all(), "All rows with geo_type 'nation' must have geo_value 'National'"
-
-        # Check that backup file was created
-        backup_files = glob.glob(f"{backup_dir}/{today}*")
-        assert len(backup_files) == 2, "Backup file was not created"
-        for file in backup_files:
-            os.remove(file)
-
 if __name__ == "__main__":
     unittest.main()
diff --git a/nssp/version.cfg b/nssp/version.cfg
index 7d679606f..e99046324 100644
--- a/nssp/version.cfg
+++ b/nssp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/quidel_covidtest/version.cfg b/quidel_covidtest/version.cfg
index 7d679606f..e99046324 100644
--- a/quidel_covidtest/version.cfg
+++ b/quidel_covidtest/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
diff --git a/sir_complainsalot/params.json.template b/sir_complainsalot/params.json.template
index 64c4bee17..cf784774f 100644
--- a/sir_complainsalot/params.json.template
+++ b/sir_complainsalot/params.json.template
@@ -44,7 +44,13 @@
     },
     "nssp": {
       "max_age":19,
-      "maintainers": []
+      "maintainers": [],
+      "retired-signals": [
+        "pct_ed_visits_combined_2023rvr",
+        "pct_ed_visits_covid_2023rvr",
+        "pct_ed_visits_influenza_2023rvr",
+        "pct_ed_visits_rsv_2023rvr"
+      ]
     }
   }
 }
diff --git a/sir_complainsalot/version.cfg b/sir_complainsalot/version.cfg
index 7d679606f..e99046324 100644
--- a/sir_complainsalot/version.cfg
+++ b/sir_complainsalot/version.cfg
@@ -1 +1 @@
-current_version = 0.3.58
+current_version = 0.3.59
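Finally, for anyone replaying the new patch flow locally, a hedged end-to-end sketch of driving `delphi_nhsn.patch` (all paths illustrative; equivalent to running `python -m delphi_nhsn.patch` from `nhsn/` with these values in params.json):

```python
# Sketch: one patch run over every backup in backup_dir. patch() sets
# custom_run/issue_date/export_dir itself; only the fields below are read here.
from delphi_nhsn.patch import patch

params = {
    "common": {
        "log_filename": "nhsn-patch.log",
        "backup_dir": "/common/raw_data_backups/nhsn",  # YYYYMMDD[_prelim].csv.gz files
        "export_dir": "./receiving",  # overwritten per issue by patch()
    },
    "indicator": {
        "socrata_token": "unused-for-patch-runs",
        "export_start_date": "2020-08-01",
    },
    "patch": {"patch_dir": "/patches/nhsn"},
}

patch(params)  # writes /patches/nhsn/issue_YYYYMMDD/nhsn/*.csv, one issue per epiweek
```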