diff --git a/nhsn/delphi_nhsn/constants.py b/nhsn/delphi_nhsn/constants.py
index d51241b4f..d904057fe 100644
--- a/nhsn/delphi_nhsn/constants.py
+++ b/nhsn/delphi_nhsn/constants.py
@@ -6,29 +6,48 @@
 PRELIM_DATASET_ID = "mpgq-jmmr"
 
 # column name from socrata
-TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
-TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
+TOTAL_ADMISSION_COVID_COL = "totalconfc19newadm"
+TOTAL_ADMISSION_FLU_COL = "totalconfflunewadm"
+NUM_HOSP_REPORTING_COVID_COL = "totalconfc19newadmhosprep"
+NUM_HOSP_REPORTING_FLU_COL = "totalconfflunewadmhosprep"
+
+# signal name
+TOTAL_ADMISSION_COVID = "confirmed_admissions_covid_ew"
+TOTAL_ADMISSION_FLU = "confirmed_admissions_flu_ew"
+NUM_HOSP_REPORTING_COVID = "hosprep_confirmed_admissions_covid_ew"
+NUM_HOSP_REPORTING_FLU = "hosprep_confirmed_admissions_flu_ew"
 
 SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
+    TOTAL_ADMISSION_COVID: TOTAL_ADMISSION_COVID_COL,
+    TOTAL_ADMISSION_FLU: TOTAL_ADMISSION_FLU_COL,
+    NUM_HOSP_REPORTING_COVID: NUM_HOSP_REPORTING_COVID_COL,
+    NUM_HOSP_REPORTING_FLU: NUM_HOSP_REPORTING_FLU_COL,
 }
 
 TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew": float,
-    "confirmed_admissions_flu_ew": float,
+    TOTAL_ADMISSION_COVID: float,
+    TOTAL_ADMISSION_FLU: float,
+    NUM_HOSP_REPORTING_COVID: float,
+    NUM_HOSP_REPORTING_FLU: float,
 }
 
 # signal mapping for secondary, preliminary source
+# made a copy in case the two sources diverge
+
 PRELIM_SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
+    f"{TOTAL_ADMISSION_COVID}_prelim": TOTAL_ADMISSION_COVID_COL,
+    f"{TOTAL_ADMISSION_FLU}_prelim": TOTAL_ADMISSION_FLU_COL,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": NUM_HOSP_REPORTING_COVID_COL,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": NUM_HOSP_REPORTING_FLU_COL,
 }
+
 PRELIM_TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew_prelim": float,
-    "confirmed_admissions_flu_ew_prelim": float,
+    f"{TOTAL_ADMISSION_COVID}_prelim": float,
+    f"{TOTAL_ADMISSION_FLU}_prelim": float,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": float,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": float,
 }
diff --git a/nhsn/delphi_nhsn/pull.py b/nhsn/delphi_nhsn/pull.py
index 7377ef958..515547f67 100644
--- a/nhsn/delphi_nhsn/pull.py
+++ b/nhsn/delphi_nhsn/pull.py
@@ -1,8 +1,13 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
+import copy
 import logging
+import random
+import time
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
+from urllib.error import HTTPError
 
 import pandas as pd
 from delphi_utils import create_backup_csv
@@ -11,20 +16,73 @@
 from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
-def pull_data(socrata_token: str, dataset_id: str):
+def check_last_updated(socrata_token, dataset_id, logger):
+    """
+    Check last updated timestamp to determine if data should be pulled or not.
+ + Note -- if the call to the API fails, the behavior is to treat the data as stale, + as possibly having duplicate is preferable to missing data + + Parameters + ---------- + socrata_token + dataset_id + logger + + Returns bool + ------- + + """ + recently_updated_source = True + try: + client = Socrata("data.cdc.gov", socrata_token) + response = client.get_metadata(dataset_id) + + updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"])) + now = datetime.utcnow() + recently_updated_source = (now - updated_timestamp) < timedelta(days=1) + + prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else "" + if recently_updated_source: + logger.info( + f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp + ) + else: + logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp) + # pylint: disable=W0703 + except Exception as e: + logger.info("error while processing socrata metadata; treating data as stale", error=str(e)) + return recently_updated_source + + +def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger): """Pull data from Socrata API.""" client = Socrata("data.cdc.gov", socrata_token) + logger.info("Pulling data from Socrata API") results = [] offset = 0 limit = 50000 # maximum limit allowed by SODA 2.0 - while True: + # retry logic for 500 error + try: page = client.get(dataset_id, limit=limit, offset=offset) - if not page: - break # exit the loop if no more results + except HTTPError as err: + if err.code == 503: + time.sleep(2 + random.randint(0, 1000) / 1000.0) + page = client.get(dataset_id, limit=limit, offset=offset) + else: + logger.info("Error pulling data from Socrata API", error=str(err)) + raise err + + while len(page) > 0: results.extend(page) offset += limit + page = client.get(dataset_id, limit=limit, offset=offset) - df = pd.DataFrame.from_records(results) + if results: + df = pd.DataFrame.from_records(results) + create_backup_csv(df, backup_dir, False, logger=logger) + else: + df = pd.DataFrame() return df @@ -89,25 +147,33 @@ def pull_nhsn_data( """ # Pull data from Socrata API df = ( - pull_data(socrata_token, dataset_id=MAIN_DATASET_ID) + pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger) if not custom_run else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False) ) - keep_columns = list(TYPE_DICT.keys()) + recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger) - if not df.empty: - create_backup_csv(df, backup_dir, custom_run, logger=logger) + keep_columns = list(TYPE_DICT.keys()) + if not df.empty and recently_updated: df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"}) + filtered_type_dict = copy.deepcopy(TYPE_DICT) for signal, col_name in SIGNALS_MAP.items(): - df[signal] = df[col_name] + # older backups don't have certain columns + try: + df[signal] = df[col_name] + except KeyError: + logger.info("column not available in data", col_name=col_name) + keep_columns.remove(signal) + del filtered_type_dict[signal] df = df[keep_columns] df["geo_id"] = df["geo_id"].str.lower() df.loc[df["geo_id"] == "usa", "geo_id"] = "us" - df = df.astype(TYPE_DICT) + + df = df.astype(filtered_type_dict) else: df = pd.DataFrame(columns=keep_columns) @@ -144,24 +210,31 @@ def pull_preliminary_nhsn_data( pd.DataFrame Dataframe as described above. 
""" + # Pull data from Socrata API df = ( - pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID) + pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger) if not custom_run else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True) ) keep_columns = list(PRELIM_TYPE_DICT.keys()) + recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger) - if not df.empty: - create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger) - + if not df.empty and recently_updated: df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"}) + filtered_type_dict = copy.deepcopy(PRELIM_TYPE_DICT) for signal, col_name in PRELIM_SIGNALS_MAP.items(): - df[signal] = df[col_name] + try: + df[signal] = df[col_name] + except KeyError: + logger.info("column not available in data", col_name=col_name, signal=signal) + keep_columns.remove(signal) + del filtered_type_dict[signal] df = df[keep_columns] - df = df.astype(PRELIM_TYPE_DICT) + df = df.astype(filtered_type_dict) + df["geo_id"] = df["geo_id"].str.lower() df.loc[df["geo_id"] == "usa", "geo_id"] = "us" else: diff --git a/nhsn/delphi_nhsn/run.py b/nhsn/delphi_nhsn/run.py index 92e24bbda..7cf9bbace 100644 --- a/nhsn/delphi_nhsn/run.py +++ b/nhsn/delphi_nhsn/run.py @@ -14,8 +14,10 @@ unpublished signals are. See `delphi_utils.add_prefix()` - Any other indicator-specific settings """ +import re import time from datetime import date, datetime, timedelta +from itertools import product import numpy as np from delphi_utils import GeoMapper, get_structured_logger @@ -59,16 +61,20 @@ def run_module(params, logger=None): ) geo_mapper = GeoMapper() - signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP} - # some of the source backups do not include for preliminary data TODO remove after first patch + signal_df_dict = dict() + if not nhsn_df.empty: + signal_df_dict.update({signal: nhsn_df for signal in SIGNALS_MAP}) + # some of the source backups do not include for preliminary data if not preliminary_nhsn_df.empty: signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP}) - for signal, df_pull in signal_df_dict.items(): - for geo in GEOS: - df = df_pull.copy() + for geo, signals_df in product(GEOS, signal_df_dict.items()): + signal, df_pull = signals_df + df = df_pull.copy() + try: df = df[["timestamp", "geo_id", signal]] df.rename({signal: "val"}, axis=1, inplace=True) + if geo == "nation": df = df[df["geo_id"] == "us"] elif geo == "hhs": @@ -96,6 +102,14 @@ def run_module(params, logger=None): ) if len(dates) > 0: run_stats.append((max(dates), len(dates))) + # some signal columns are unavailable for patching. 
+ except KeyError as e: + missing_signal = re.search(r"'([^']*)'", str(e)).group(1) + full_signal_list = list(SIGNALS_MAP.keys()) + list(PRELIM_SIGNALS_MAP.keys()) + if missing_signal in full_signal_list: + logger.info("signal not available in data", signal=missing_signal) + else: + raise RuntimeError("Column(s) that shouldn't be missing is missing") from e elapsed_time_in_seconds = round(time.time() - start_time, 2) min_max_date = run_stats and min(s[0] for s in run_stats) diff --git a/nhsn/tests/conftest.py b/nhsn/tests/conftest.py index b89946a02..b321f1236 100644 --- a/nhsn/tests/conftest.py +++ b/nhsn/tests/conftest.py @@ -1,5 +1,6 @@ import copy import json +import time from unittest.mock import patch import pytest @@ -60,7 +61,8 @@ def params_w_patch(params): @pytest.fixture(scope="function") def run_as_module(params): - with patch('sodapy.Socrata.get') as mock_get: + with patch('sodapy.Socrata.get') as mock_get, \ + patch('sodapy.Socrata.get_metadata') as mock_get_metadata: def side_effect(*args, **kwargs): if kwargs['offset'] == 0: if "ua7e-t2fy" in args[0]: @@ -70,5 +72,6 @@ def side_effect(*args, **kwargs): else: return [] mock_get.side_effect = side_effect + mock_get_metadata.return_value = {"rowsUpdatedAt": time.time()} run_module(params) diff --git a/nhsn/tests/patch_dir/.gitignore b/nhsn/tests/patch_dir/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/nhsn/tests/test_data/20241119.csv.gz b/nhsn/tests/test_data/20241119.csv.gz new file mode 100644 index 000000000..57c9bafb1 Binary files /dev/null and b/nhsn/tests/test_data/20241119.csv.gz differ diff --git a/nhsn/tests/test_patch.py b/nhsn/tests/test_patch.py index 066ef4736..72da1e40c 100644 --- a/nhsn/tests/test_patch.py +++ b/nhsn/tests/test_patch.py @@ -1,17 +1,19 @@ -import glob import os from collections import defaultdict from pathlib import Path import shutil from unittest.mock import patch as mock_patch - +import re import pandas as pd from datetime import datetime, timedelta +import pytest from epiweeks import Week from delphi_nhsn.patch import filter_source_files, patch -from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API +from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_COL, TOTAL_ADMISSION_FLU_COL, \ + NUM_HOSP_REPORTING_FLU_COL, NUM_HOSP_REPORTING_COVID_COL, GEOS, TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU, \ + NUM_HOSP_REPORTING_COVID, NUM_HOSP_REPORTING_FLU from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR class TestPatch: @@ -85,11 +87,15 @@ def generate_test_source_files(self): custom_filename = f"{TEST_DIR}/backups/{date}.csv.gz" custom_filename_prelim = f"{TEST_DIR}/backups/{date}_prelim.csv.gz" test_data = pd.DataFrame(TEST_DATA) - test_data[TOTAL_ADMISSION_COVID_API] = int(date) - test_data[TOTAL_ADMISSION_FLU_API] = int(date) + test_data[TOTAL_ADMISSION_COVID_COL] = int(date) + test_data[TOTAL_ADMISSION_FLU_COL] = int(date) + test_data[NUM_HOSP_REPORTING_COVID_COL] = int(date) + test_data[NUM_HOSP_REPORTING_FLU_COL] = int(date) test_prelim_data = pd.DataFrame(PRELIM_TEST_DATA) - test_prelim_data[TOTAL_ADMISSION_COVID_API] = int(date) - test_prelim_data[TOTAL_ADMISSION_FLU_API] = int(date) + test_prelim_data[TOTAL_ADMISSION_COVID_COL] = int(date) + test_prelim_data[TOTAL_ADMISSION_FLU_COL] = int(date) + test_prelim_data[NUM_HOSP_REPORTING_COVID_COL] = int(date) + test_prelim_data[NUM_HOSP_REPORTING_FLU_COL] = int(date) test_data = test_data.head(2) test_data.to_csv( @@ -108,14 +114,15 @@ def test_patch(self, params_w_patch): 
file_list, prelim_file_list = self.generate_test_source_files() patch(params_w_patch) - for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("*"): + for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("issue*"): issue_dt_str = issue_path.name.replace("issue_", "") for file in Path(issue_path / "nhsn").iterdir(): df = pd.read_csv(file) assert issue_dt_str == str(int(df["val"][0])) # clean up - shutil.rmtree(f"{TEST_DIR}/patch_dir") + for file in Path(f"{TEST_DIR}/patch_dir").glob("issue*"): + shutil.rmtree(file) for file in file_list: os.remove(file) @@ -123,6 +130,23 @@ def test_patch(self, params_w_patch): for file in prelim_file_list: os.remove(file) + def test_patch_incomplete_file(self, params_w_patch): + os.makedirs(params_w_patch["patch"]["patch_dir"], exist_ok=True) + issue_date = "20241119" + existing_signals = [TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU] + backup_dir = params_w_patch.get("common").get("backup_dir") + shutil.copy(f"{TEST_DIR}/test_data/{issue_date}.csv.gz", backup_dir) + + with mock_patch("delphi_nhsn.patch.read_params", return_value=params_w_patch): + patch(params_w_patch) + + files = list(Path(f"{TEST_DIR}/patch_dir/issue_{issue_date}/nhsn").glob("*.csv")) + dates = set([re.search(r"\d{6}", file.name).group() for file in files]) + assert len(files) == len(GEOS) * len(existing_signals) * len(dates) + # clean up + for file in Path(f"{TEST_DIR}/patch_dir").glob("issue*"): + shutil.rmtree(file) + diff --git a/nhsn/tests/test_pull.py b/nhsn/tests/test_pull.py index daa3acd92..91411e1d6 100644 --- a/nhsn/tests/test_pull.py +++ b/nhsn/tests/test_pull.py @@ -1,24 +1,34 @@ import glob +import json +import time from unittest.mock import patch, MagicMock import os import pytest - +from urllib.error import HTTPError import pandas as pd from delphi_nhsn.pull import ( pull_nhsn_data, pull_data, - pull_preliminary_nhsn_data, pull_data_from_file + pull_data_from_file, + pull_preliminary_nhsn_data, check_last_updated ) -from delphi_nhsn.constants import SIGNALS_MAP, PRELIM_SIGNALS_MAP +from delphi_nhsn.constants import TYPE_DICT, PRELIM_TYPE_DICT, PRELIM_DATASET_ID, MAIN_DATASET_ID from delphi_utils import get_structured_logger from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR -DATASETS = [{"id":"ua7e-t2fy", - "test_data": TEST_DATA}, - {"id":"mpgq-jmmr", - "test_data":PRELIM_TEST_DATA} +DATASETS = [{"id":MAIN_DATASET_ID, + "test_data": TEST_DATA, + "msg_prefix": "", + "prelim_flag": False, + }, + + {"id":PRELIM_DATASET_ID, + "test_data":PRELIM_TEST_DATA, + "msg_prefix": "Preliminary ", + "prelim_flag": True, + } ] @@ -27,13 +37,17 @@ class TestPullNHSNData: @pytest.mark.parametrize('dataset', DATASETS, ids=["data", "prelim_data"]) def test_socrata_call(self, mock_socrata, dataset, params): test_token = params["indicator"]["socrata_token"] + backup_dir = f"{TEST_DIR}/test_data" + logger = get_structured_logger() # Mock Socrata client and its get method mock_client = MagicMock() mock_socrata.return_value = mock_client - mock_client.get.side_effect = [[]] + # testing retry behavior + http_error = HTTPError(url="", hdrs="", fp="", msg="Service Temporarily Unavailable",code=503) + mock_client.get.side_effect = [http_error,[]] - pull_data(test_token, dataset["id"]) + pull_data(test_token, dataset["id"], backup_dir, logger) # Check that Socrata client was initialized with correct arguments mock_socrata.assert_called_once_with("data.cdc.gov", test_token) @@ -41,137 +55,171 @@ def test_socrata_call(self, mock_socrata, dataset, params): # Check that get method was called with 
correct arguments mock_client.get.assert_any_call(dataset["id"], limit=50000, offset=0) - def test_pull_from_file(self, caplog, params_w_patch): + @pytest.mark.parametrize('dataset', DATASETS, ids=["data", "prelim_data"]) + def test_pull_from_file(self, caplog, dataset, params_w_patch): backup_dir = f"{TEST_DIR}/test_data" issue_date = params_w_patch["patch"]["issue_date"] logger = get_structured_logger() - + prelim_flag = dataset["prelim_flag"] # Load test data - expected_data = pd.DataFrame(TEST_DATA) + expected_data = pd.DataFrame(dataset["test_data"]) - df = pull_data_from_file(backup_dir, issue_date, logger=logger) + df = pull_data_from_file(backup_dir, issue_date, logger=logger, prelim_flag=prelim_flag) df = df.astype('str') expected_data = expected_data.astype('str') assert "Pulling data from file" in caplog.text pd.testing.assert_frame_equal(expected_data, df) - def test_pull_from_file_prelim(self, caplog, params_w_patch): - backup_dir = f"{TEST_DIR}/test_data" - issue_date = params_w_patch["patch"]["issue_date"] + @patch("delphi_nhsn.pull.Socrata") + @patch("delphi_nhsn.pull.create_backup_csv") + def test_pull_nhsn_data_output(self, mock_create_backup, mock_socrata, caplog, params): + now = time.time() + # Mock Socrata client and its get method + mock_client = MagicMock() + mock_socrata.return_value = mock_client + mock_client.get.side_effect = [TEST_DATA,[]] + + mock_client.get_metadata.return_value = {"rowsUpdatedAt": now} + + backup_dir = params["common"]["backup_dir"] + test_token = params["indicator"]["socrata_token"] + custom_run = params["common"]["custom_run"] + logger = get_structured_logger() + result = pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) + mock_create_backup.assert_called_once() + + expected_columns = set(TYPE_DICT.keys()) + assert set(result.columns) == expected_columns + + for column in list(result.columns): + assert result[column].notnull().all(), f"{column} has rogue NaN" + + + @patch("delphi_nhsn.pull.Socrata") + def test_pull_nhsn_data_backup(self, mock_socrata, caplog, params): + now = time.time() + # Mock Socrata client and its get method + mock_client = MagicMock() + mock_socrata.return_value = mock_client + mock_client.get.side_effect = [TEST_DATA, []] + + mock_client.get_metadata.return_value = {"rowsUpdatedAt": now} + + today = pd.Timestamp.today().strftime("%Y%m%d") + backup_dir = params["common"]["backup_dir"] + custom_run = params["common"]["custom_run"] + test_token = params["indicator"]["socrata_token"] + + # Load test data + expected_data = pd.DataFrame(TEST_DATA) + + logger = get_structured_logger() + # Call function with test token + pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) + + # Check logger used: + assert "Backup file created" in caplog.text + + # Check that backup file was created + backup_files = glob.glob(f"{backup_dir}/{today}*") + assert len(backup_files) == 2, "Backup file was not created" + + for backup_file in backup_files: + if backup_file.endswith(".csv.gz"): + dtypes = expected_data.dtypes.to_dict() + actual_data = pd.read_csv(backup_file, dtype=dtypes) + else: + actual_data = pd.read_parquet(backup_file) + pd.testing.assert_frame_equal(expected_data, actual_data) + + # clean up + for file in backup_files: + os.remove(file) + + @patch("delphi_nhsn.pull.Socrata") + @patch("delphi_nhsn.pull.create_backup_csv") + def test_pull_prelim_nhsn_data_output(self, mock_create_backup, mock_socrata, caplog, params): + now = time.time() + # Mock Socrata client and 
its get method + mock_client = MagicMock() + mock_socrata.return_value = mock_client + mock_client.get.side_effect = [TEST_DATA, []] + + mock_client.get_metadata.return_value = {"rowsUpdatedAt": now} + + backup_dir = params["common"]["backup_dir"] + test_token = params["indicator"]["socrata_token"] + custom_run = params["common"]["custom_run"] + + logger = get_structured_logger() + + result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) + mock_create_backup.assert_called_once() + + expected_columns = set(PRELIM_TYPE_DICT.keys()) + assert set(result.columns) == expected_columns + + for column in list(result.columns): + assert result[column].notnull().all(), f"{column} has rogue NaN" + @patch("delphi_nhsn.pull.Socrata") + def test_pull_prelim_nhsn_data_backup(self, mock_socrata, caplog, params): + now = time.time() + # Mock Socrata client and its get method + mock_client = MagicMock() + mock_socrata.return_value = mock_client + mock_client.get.side_effect = [PRELIM_TEST_DATA, []] + + mock_client.get_metadata.return_value = {"rowsUpdatedAt": now} + today = pd.Timestamp.today().strftime("%Y%m%d") + backup_dir = params["common"]["backup_dir"] + custom_run = params["common"]["custom_run"] + test_token = params["indicator"]["socrata_token"] + # Load test data expected_data = pd.DataFrame(PRELIM_TEST_DATA) - df = pull_data_from_file(backup_dir, issue_date, logger=logger, prelim_flag=True) - df = df.astype('str') - expected_data = expected_data.astype('str') + logger = get_structured_logger() + # Call function with test token + pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) - assert "Pulling data from file" in caplog.text - pd.testing.assert_frame_equal(expected_data, df) + # Check logger used: + assert "Backup file created" in caplog.text + + # Check that backup file was created + backup_files = glob.glob(f"{backup_dir}/{today}*") + assert len(backup_files) == 2, "Backup file was not created" + + for backup_file in backup_files: + if backup_file.endswith(".csv.gz"): + dtypes = expected_data.dtypes.to_dict() + actual_data = pd.read_csv(backup_file, dtype=dtypes) + else: + actual_data = pd.read_parquet(backup_file) + pd.testing.assert_frame_equal(expected_data, actual_data) + + # clean up + for file in backup_files: + os.remove(file) + + @pytest.mark.parametrize('dataset', DATASETS, ids=["data", "prelim_data"]) + @pytest.mark.parametrize("updatedAt", [time.time(), time.time() - 172800], ids=["updated", "stale"]) + @patch("delphi_nhsn.pull.Socrata") + def test_check_last_updated(self, mock_socrata, dataset, updatedAt, caplog): + mock_client = MagicMock() + mock_socrata.return_value = mock_client + mock_client.get_metadata.return_value = {"rowsUpdatedAt": updatedAt } + logger = get_structured_logger() + + check_last_updated(mock_client, dataset["id"], logger) + + # Check that get method was called with correct arguments + now = time.time() + if now - updatedAt < 60: + assert f"{dataset['msg_prefix']}NHSN data was recently updated; Pulling data" in caplog.text + else: + stale_msg = f"{dataset['msg_prefix']}NHSN data is stale; Skipping" + assert stale_msg in caplog.text - def test_pull_nhsn_data_output(self, caplog, params): - with patch('sodapy.Socrata.get') as mock_get: - mock_get.side_effect = [TEST_DATA, []] - backup_dir = params["common"]["backup_dir"] - test_token = params["indicator"]["socrata_token"] - custom_run = params["common"]["custom_run"] - - logger = get_structured_logger() - - result = 
pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) - - # Check result - assert result["timestamp"].notnull().all(), "timestamp has rogue NaN" - assert result["geo_id"].notnull().all(), "geography has rogue NaN" - - # Check for each signal in SIGNALS - for signal in SIGNALS_MAP.keys(): - assert result[signal].notnull().all(), f"{signal} has rogue NaN" - def test_pull_nhsn_data_backup(self, caplog, params): - with patch('sodapy.Socrata.get') as mock_get: - mock_get.side_effect = [TEST_DATA, []] - - today = pd.Timestamp.today().strftime("%Y%m%d") - backup_dir = params["common"]["backup_dir"] - custom_run = params["common"]["custom_run"] - test_token = params["indicator"]["socrata_token"] - - # Load test data - expected_data = pd.DataFrame(TEST_DATA) - - logger = get_structured_logger() - # Call function with test token - pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) - - # Check logger used: - assert "Backup file created" in caplog.text - - # Check that backup file was created - backup_files = glob.glob(f"{backup_dir}/{today}*") - assert len(backup_files) == 2, "Backup file was not created" - - for backup_file in backup_files: - if backup_file.endswith(".csv.gz"): - dtypes = expected_data.dtypes.to_dict() - actual_data = pd.read_csv(backup_file, dtype=dtypes) - else: - actual_data = pd.read_parquet(backup_file) - pd.testing.assert_frame_equal(expected_data, actual_data) - - # clean up - for file in backup_files: - os.remove(file) - def test_pull_prelim_nhsn_data_output(self, caplog, params): - with patch('sodapy.Socrata.get') as mock_get: - mock_get.side_effect = [PRELIM_TEST_DATA, []] - backup_dir = params["common"]["backup_dir"] - test_token = params["indicator"]["socrata_token"] - custom_run = params["common"]["custom_run"] - - logger = get_structured_logger() - - result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) - - # Check result - assert result["timestamp"].notnull().all(), "timestamp has rogue NaN" - assert result["geo_id"].notnull().all(), "geography has rogue NaN" - - # Check for each signal in SIGNALS - for signal in PRELIM_SIGNALS_MAP.keys(): - assert result[signal].notnull().all(), f"{signal} has rogue NaN" - def test_pull_prelim_nhsn_data_backup(self, caplog, params): - with patch('sodapy.Socrata.get') as mock_get: - mock_get.side_effect = [PRELIM_TEST_DATA, []] - - today = pd.Timestamp.today().strftime("%Y%m%d") - backup_dir = params["common"]["backup_dir"] - custom_run = params["common"]["custom_run"] - test_token = params["indicator"]["socrata_token"] - - # Load test data - expected_data = pd.DataFrame(PRELIM_TEST_DATA) - - logger = get_structured_logger() - # Call function with test token - pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger) - - # Check logger used: - assert "Backup file created" in caplog.text - - # Check that backup file was created - backup_files = glob.glob(f"{backup_dir}/{today}*") - assert len(backup_files) == 2, "Backup file was not created" - - for backup_file in backup_files: - if backup_file.endswith(".csv.gz"): - dtypes = expected_data.dtypes.to_dict() - actual_data = pd.read_csv(backup_file, dtype=dtypes) - else: - actual_data = pd.read_parquet(backup_file) - pd.testing.assert_frame_equal(expected_data, actual_data) - - # clean up - for file in backup_files: - os.remove(file) \ No newline at end of file
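A few notes on the behavioral changes above, each with a small illustrative Python sketch. First, the new check_last_updated helper in pull.py reads rowsUpdatedAt from the dataset's Socrata metadata (a Unix timestamp) and treats the dataset as fresh only if it was updated within the last day; if the metadata call itself fails, the helper keeps its default of True, so the data is pulled anyway on the theory that a duplicate pull beats a missed one. The core comparison, with the client call stubbed out (is_recently_updated is an illustrative name, not part of the package):

from datetime import datetime, timedelta

def is_recently_updated(rows_updated_at: int, max_age: timedelta = timedelta(days=1)) -> bool:
    # Socrata reports rowsUpdatedAt as a Unix timestamp in seconds (UTC)
    updated = datetime.utcfromtimestamp(int(rows_updated_at))
    return (datetime.utcnow() - updated) < max_age

check_last_updated wraps this comparison in a try/except around client.get_metadata(dataset_id) and logs whether the pull proceeds or is skipped.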
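Second, pull_data now pages through the SODA endpoint 50,000 rows at a time and retries the initial request once, after a short jittered sleep, when the API answers with HTTP 503; other HTTP errors are logged and re-raised (the sketch below just re-raises). This assumes a sodapy Socrata client, and fetch_all_pages is an illustrative name only:

import random
import time
from urllib.error import HTTPError

def fetch_all_pages(client, dataset_id, limit=50000):
    # retry the first request once on 503 (Service Temporarily Unavailable)
    try:
        page = client.get(dataset_id, limit=limit, offset=0)
    except HTTPError as err:
        if err.code != 503:
            raise
        time.sleep(2 + random.randint(0, 1000) / 1000.0)
        page = client.get(dataset_id, limit=limit, offset=0)

    results, offset = [], 0
    while len(page) > 0:
        results.extend(page)
        offset += limit
        page = client.get(dataset_id, limit=limit, offset=offset)
    return results

The backup CSV is also written inside pull_data now, so the raw pull is saved even when the downstream processing later decides the data is stale.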
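Third, pull_nhsn_data and pull_preliminary_nhsn_data now tolerate source files that predate the new hospital-reporting columns: each signal column is copied under a try/except KeyError, and any signal whose source column is absent is dropped from both keep_columns and a deep-copied type dict before the astype call. Roughly, with toy names (sig_a, col_a, etc. are placeholders, not real signals):

import copy
import pandas as pd

TYPE_DICT = {"timestamp": "datetime64[ns]", "geo_id": str, "sig_a": float, "sig_b": float}
SIGNALS_MAP = {"sig_a": "col_a", "sig_b": "col_b"}  # col_b is missing from this backup

df = pd.DataFrame({"timestamp": ["2024-11-16"], "geo_id": ["US"], "col_a": [1]})
keep_columns = list(TYPE_DICT.keys())
filtered_type_dict = copy.deepcopy(TYPE_DICT)

for signal, col_name in SIGNALS_MAP.items():
    try:
        df[signal] = df[col_name]
    except KeyError:
        # older backups don't have this column; drop the signal for this run
        keep_columns.remove(signal)
        del filtered_type_dict[signal]

df = df[keep_columns].astype(filtered_type_dict)
print(df.columns.tolist())  # ['timestamp', 'geo_id', 'sig_a'] -- sig_b was dropped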
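Fourth, run.py now iterates over product(GEOS, signal_df_dict.items()) and wraps the per-signal work in a try/except KeyError, so a known signal that is absent from a patch source file is logged and skipped rather than failing the whole run, while an unexpected missing column still raises. The missing column name is recovered from the pandas KeyError text with a small regex; a sketch with a toy frame:

import re
import pandas as pd

df = pd.DataFrame({"timestamp": [], "geo_id": []})
try:
    df = df[["timestamp", "geo_id", "confirmed_admissions_flu_ew"]]
except KeyError as e:
    # pandas reports: "['confirmed_admissions_flu_ew'] not in index"
    missing_signal = re.search(r"'([^']*)'", str(e)).group(1)
    print(missing_signal)  # confirmed_admissions_flu_ew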
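Finally, every test that mocks Socrata now has to stub get_metadata as well, because pull_nhsn_data calls check_last_updated on non-custom runs; returning a current timestamp keeps the mocked data from being treated as stale. A minimal sketch of the stale-versus-fresh behavior, assuming the new helper is importable as in the tests above (the token is a dummy string because the client is mocked):

import time
from unittest.mock import MagicMock, patch

from delphi_nhsn.constants import MAIN_DATASET_ID
from delphi_nhsn.pull import check_last_updated
from delphi_utils import get_structured_logger

with patch("delphi_nhsn.pull.Socrata") as mock_socrata:
    mock_client = MagicMock()
    mock_socrata.return_value = mock_client
    logger = get_structured_logger()

    # updated just now -> fresh, data will be pulled
    mock_client.get_metadata.return_value = {"rowsUpdatedAt": time.time()}
    assert check_last_updated("dummy_token", MAIN_DATASET_ID, logger)

    # updated two days ago -> stale, pull is skipped
    mock_client.get_metadata.return_value = {"rowsUpdatedAt": time.time() - 2 * 24 * 3600}
    assert not check_last_updated("dummy_token", MAIN_DATASET_ID, logger)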