diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 495abd59b..d945f39f8 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -5,20 +5,26 @@
 Created: 2022-08-03
 """
 
-import os
+
 import glob
-from datetime import datetime
+import os
+import pathlib
+import re
+import shutil
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Tuple, Union
 
 # third party
 import pandas as pd
 from delphi_utils import GeoMapper
-
 from .config import Config
 
 gmpr = GeoMapper()
 
-def store_backfill_file(claims_filepath, _end_date, backfill_dir):
+
+def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     """
     Store county level backfill data into backfill_dir.
 
@@ -65,13 +71,20 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
         "state_id": "string"
     })
 
-    path = backfill_dir + \
-        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = backfill_dir + "/claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
+
     # Store intermediate file into the backfill folder
-    backfilldata.to_parquet(path, index=False)
+    try:
+        backfilldata.to_parquet(filename, index=False)
+        logger.info("Stored source data in parquet", filename=filename)
+    except:  # pylint: disable=W0702
+        logger.info("Failed to store source data in parquet")
+    return filename
 
-def merge_backfill_file(backfill_dir, backfill_merge_day, today,
-                        test_mode=False, check_nd=25):
+
+def merge_backfill_file(backfill_dir, backfill_merge_day, today, logger, test_mode=False, check_nd=25):
     """
     Merge ~4 weeks' backfill data into one file.
 
@@ -109,10 +122,15 @@ def get_date(file_link):
     # Check whether to merge
     # Check the number of files that are not merged
-    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+    if today.weekday() != backfill_merge_day:
+        logger.info("No new files to merge; skipping merging")
+        return
+    elif (today - earliest_date).days <= check_nd:
+        logger.info("Not enough days, skipping merging")
         return
 
     # Start to merge files
+    logger.info("Merging files", start_date=earliest_date, end_date=latest_date)
     pdList = []
     for fn in new_files:
         df = pd.read_parquet(fn, engine='pyarrow')
@@ -128,3 +146,106 @@ def get_date(file_link):
     for fn in new_files:
         os.remove(fn)
     return
+
+
+def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
+    """
+    Merge an existing merged backfill file with the patch data included. This function is run specifically for patching.
+
+    When the indicator fails for some reason or another, there's a gap in the backfill files.
+    The patch to fill in the missing dates happens later down the line, when the backfill files are already merged.
+    This function takes the merged file that should contain the missing date, inserts the data for that date,
+    and re-merges the file.
+
+    Parameters
+    ----------
+    backfill_dir : str
+        specified path to store backfill files.
+    backfill_file : str
+        specific file to add to the merged backfill file.
+    issue_date : datetime
+        the most recent date on which the raw data was received
+    logger
+        structured logger
+    """
+    new_files = sorted(Path(backfill_dir).glob("claims_hosp_*"))
+    new_files.remove(Path(backfill_file))
+
+    def get_file_with_date(files, issue_date) -> Union[Tuple[pathlib.Path, pathlib.Path], None]:
+        """
+        Find the merged file that should contain the missing date.
+
+        Parameters
+        ----------
+        files: list of files in the backfill dir
+        issue_date: the issue date of the file to be inserted
+
+        Returns
+        -------
+        Tuple[pathlib.Path, pathlib.Path] if a file is found: the existing file, along with the new filename
+        after the insertion of the missing file
+
+        (None, None) if no matching file is found
+        """
+        for filepath in files:
+            pattern = re.findall(r"_(\d{8})", filepath.name)
+
+            if len(pattern) == 2:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                end_date = datetime.strptime(pattern[1], "%Y%m%d")
+                # if date is in between from and to
+                if start_date <= issue_date and end_date >= issue_date:
+                    return filepath, filepath
+
+            elif len(pattern) == 1:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                if issue_date > start_date:
+                    new_filename = filepath.name.replace(pattern[0], issue_date.strftime("%Y%m%d"))
+                    new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                    return filepath, new_filepath
+
+        for filepath in files:
+            pattern = re.findall(r"_(\d{8})", filepath.name)
+            if len(pattern) == 2:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                end_date = datetime.strptime(pattern[1], "%Y%m%d")
+
+                # if date extends either the from date or the to date by one day
+                if issue_date == end_date + timedelta(days=1):
+                    new_filename = filepath.name.replace(pattern[1], issue_date.strftime("%Y%m%d"))
+                    new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                    return filepath, new_filepath
+
+                elif issue_date == start_date - timedelta(days=1):
+                    new_filename = filepath.name.replace(pattern[0], issue_date.strftime("%Y%m%d"))
+                    new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                    return filepath, new_filepath
+
+        return None, None
+
+    file_path, new_file_path = get_file_with_date(new_files, issue_date)
+
+    if file_path is None:
+        logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
+        return
+
+    logger.info(
+        "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_path
+    )
+    # Start to merge files
+    file_name = file_path.stem
+    merge_file = f"{file_path.parent}/{file_name}_after_merge.parquet"
+
+    try:
+        shutil.copyfile(file_path, merge_file)
+        existing_df = pd.read_parquet(merge_file, engine="pyarrow")
+        df = pd.read_parquet(backfill_file, engine="pyarrow")
+        merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
+        merged_df.to_parquet(merge_file, index=False)
+
+    # pylint: disable=W0703
+    except Exception as e:
+        logger.info("Failed to merge existing backfill files", issue_date=issue_date.strftime("%Y-%m-%d"), msg=e)
+        os.remove(merge_file)
+        os.remove(backfill_file)
+        return
+
+    os.remove(file_path)
+    os.rename(merge_file, new_file_path)
+    return
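For illustration, a minimal sketch of the filename matching that merge_existing_backfill_files relies on. The filenames below are hypothetical; the regex and the one-day range extension mirror get_file_with_date above.

    import re
    from datetime import datetime, timedelta

    # Merged files carry two dates, daily "as_of" files carry one.
    merged_name = "claims_hosp_from_20200611_to_20200614.parquet"

    start, end = [datetime.strptime(d, "%Y%m%d") for d in re.findall(r"_(\d{8})", merged_name)]

    # An issue date inside the from/to range is inserted into this merged file.
    assert start <= datetime(2020, 6, 13) <= end

    # An issue date one day past the "to" date (or one day before the "from" date)
    # extends the range instead, and the merged file is renamed accordingly.
    assert datetime(2020, 6, 15) == end + timedelta(days=1)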
diff --git a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
index ee6e98286..20fd0d953 100644
--- a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
+++ b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -53,9 +53,10 @@ def change_date_format(name):
     return name
 
 
-def download(ftp_credentials, out_path, logger):
+def download(ftp_credentials, out_path, logger, issue_date=None):
     """Pull the latest raw files."""
-    current_time = datetime.datetime.now()
+    current_time = issue_date if issue_date else datetime.datetime.now()
+
     seconds_in_day = 24 * 60 * 60
 
     logger.info("Starting download")
diff --git a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
index e417183c7..25d9f27dd 100644
--- a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
+++ b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -5,9 +5,9 @@
 import datetime
 from pathlib import Path
 
-def get_latest_filename(dir_path, logger):
+def get_latest_filename(dir_path, logger, issue_date=None):
     """Get the latest filename from the list of downloaded raw files."""
-    current_date = datetime.datetime.now()
+    current_date = issue_date if issue_date else datetime.datetime.now()
     files = list(Path(dir_path).glob("*"))
 
     latest_timestamp = datetime.datetime(1900, 1, 1)
@@ -23,7 +23,6 @@ def get_latest_filename(dir_path, logger):
             if timestamp <= current_date:
                 latest_timestamp = timestamp
                 latest_filename = file
-    assert current_date.date() == latest_timestamp.date(), "no drop for today"
 
     logger.info("Latest claims file", filename=latest_filename)
diff --git a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
index 19a962884..1f7251a89 100644
--- a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
+++ b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
@@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False):
             dfs_list.append(dfs)
         else:
             dfs.to_csv(out_path, index=False)
-            logger.info("Wrote modified csv", filename=out_path)
+            logger.info("Wrote modified csv", filename=str(out_path))
     return files, dfs_list
diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
new file mode 100644
index 000000000..cc4ea4c29
--- /dev/null
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -0,0 +1,78 @@
+"""
+This module is used for patching data in the delphi_claims_hosp package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+  "common": {
+    "custom_run": true,
+    ...
+  },
+  "validation": {
+    ...
+  },
+  "patch": {
+    "patch_dir": "/covidcast-indicators/hospital-admissions/patch",
+    "start_issue": "2024-04-20",
+    "end_issue": "2024-04-21"
+  }
+}
+
+It will generate data for that range of issue dates, and store them in batch issue format:
+[name-of-patch]/issue_[issue-date]/hospital-admissions/actual_data_file.csv
+"""
+
+from datetime import datetime, timedelta
+from os import makedirs
+
+from delphi_utils import get_structured_logger, read_params
+
+from .run import run_module
+
+
+def patch():
+    """
+    Run the hospital-admissions indicator for a range of issue dates.
+
+    The range of issue dates is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+    - "start_issue": str, YYYY-MM-DD format, first issue date
+    - "end_issue": str, YYYY-MM-DD format, last issue date
+    - "patch_dir": str, directory to write all issues output
+    """
+    params = read_params()
+    logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])
+
+    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
+    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
+
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=start_issue.strftime("%Y-%m-%d"),
+        end_issue=end_issue.strftime("%Y-%m-%d"),
+    )
+
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    current_issue = start_issue
+    if not params["common"]["custom_run"]:
+        logger.warning("custom_run flag not set; setting it to True for patching")
+        params["common"]["custom_run"] = True
+
+    while current_issue <= end_issue:
+        logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))
+
+        params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")
+
+        current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
+        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/hospital-admissions"""
+        makedirs(f"{current_issue_dir}", exist_ok=True)
+        params["common"]["export_dir"] = f"""{current_issue_dir}"""
+
+        run_module(params, logger)
+        current_issue += timedelta(days=1)
+
+
+if __name__ == "__main__":
+    patch()
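A hedged sketch of how a patch run might be launched. The params values below are placeholders; a real params.json also needs the usual "indicator" settings and FTP credentials.

    from delphi_claims_hosp.patch import patch

    # Assumes a params.json in the working directory containing, e.g.:
    #   "common": {"custom_run": true, "log_filename": "patch.log", ...},
    #   "patch":  {"patch_dir": "./patch", "start_issue": "2024-04-20", "end_issue": "2024-04-21"}
    #
    # Equivalent to running `python -m delphi_claims_hosp.patch`.
    patch()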
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index a9752072c..895706a56 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -5,25 +5,27 @@
 when the module is run with `python -m delphi_claims_hosp`.
 """
 
+import os
+
 # standard packages
 import time
-import os
 from datetime import datetime, timedelta
 from pathlib import Path
 
 # third party
 from delphi_utils import get_structured_logger
 
+from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file
+
 # first party
 from .config import Config
 from .download_claims_ftp_files import download
-from .modify_claims_drops import modify_and_write
 from .get_latest_claims_name import get_latest_filename
+from .modify_claims_drops import modify_and_write
 from .update_indicator import ClaimsHospIndicatorUpdater
-from .backfill import (store_backfill_file, merge_backfill_file)
 
 
-def run_module(params):
+def run_module(params, logger=None):
     """
     Generate updated claims-based hospitalization indicator values.
 
@@ -54,19 +56,25 @@ def run_module(params):
         adjustments (False).
     """
     start_time = time.time()
-    logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+    # guard against params files that contain patch parameters but are not custom runs/patches
+    custom_run_flag = params["common"].get("custom_run", False)
+    issue_date_str = params.get("patch", {}).get("current_issue", None)
+    issue_date = datetime.strptime(issue_date_str + " 23:59:00", "%Y-%m-%d %H:%M:%S") if issue_date_str else None
+    if not logger:
+        logger = get_structured_logger(
+            __name__,
+            filename=params["common"].get("log_filename"),
+            log_exceptions=params["common"].get("log_exceptions", True),
+        )
 
     # pull latest data
-    download(params["indicator"]["ftp_credentials"],
-             params["indicator"]["input_dir"], logger)
+    download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
     # aggregate data
     modify_and_write(params["indicator"]["input_dir"], logger)
 
     # find the latest files (these have timestamps)
-    claims_file = get_latest_filename(params["indicator"]["input_dir"], logger)
+    claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
     # handle range of estimates to produce
     # filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz
@@ -94,8 +102,13 @@ def run_module(params):
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
         backfill_merge_day = params["indicator"]["backfill_merge_day"]
-        merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
-        store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+        if custom_run_flag:
+            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
+            merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
+
+        else:
+            merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today(), logger)
+            store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
 
     # print out information
     logger.info("Loaded params",
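A minimal sketch of the parameter shape that drives the new branch in run_module. The keys mirror what the code above reads; the values are made up.

    # Illustrative params fragment; only the keys relevant to the backfill branch are shown.
    params = {
        "common": {"custom_run": True},            # patch/custom run
        "patch": {"current_issue": "2024-04-20"},  # set per issue by patch()
    }

    custom_run_flag = params["common"].get("custom_run", False)
    issue_date_str = params.get("patch", {}).get("current_issue", None)

    # With custom_run True, run_module stores the day's backfill file and calls
    # merge_existing_backfill_files(); otherwise it follows the regular weekly
    # merge_backfill_file() + store_backfill_file() path.
    assert custom_run_flag and issue_date_str == "2024-04-20"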
""" start_time = time.time() - logger = get_structured_logger( - __name__, filename=params["common"].get("log_filename"), - log_exceptions=params["common"].get("log_exceptions", True)) + # safety check for patch parameters exists in file, but not running custom runs/patches + custom_run_flag = False if not params["common"].get("custom_run", False) else params["common"]["custom_run"] + issue_date_str = params.get("patch", {}).get("current_issue", None) + issue_date = datetime.strptime(issue_date_str + " 23:59:00", "%Y-%m-%d %H:%M:%S") if issue_date_str else None + if not logger: + logger = get_structured_logger( + __name__, + filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True), + ) # pull latest data - download(params["indicator"]["ftp_credentials"], - params["indicator"]["input_dir"], logger) + download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date) # aggregate data modify_and_write(params["indicator"]["input_dir"], logger) # find the latest files (these have timestamps) - claims_file = get_latest_filename(params["indicator"]["input_dir"], logger) + claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date) # handle range of estimates to produce # filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz @@ -94,8 +102,13 @@ def run_module(params): if params["indicator"].get("generate_backfill_files", True): backfill_dir = params["indicator"]["backfill_dir"] backfill_merge_day = params["indicator"]["backfill_merge_day"] - merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today()) - store_backfill_file(claims_file, dropdate_dt, backfill_dir) + if custom_run_flag: + backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger) + merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger) + + else: + merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today(), logger) + store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger) # print out information logger.info("Loaded params", diff --git a/claims_hosp/setup.py b/claims_hosp/setup.py index 3b859c294..21b54f3c4 100644 --- a/claims_hosp/setup.py +++ b/claims_hosp/setup.py @@ -5,6 +5,7 @@ "covidcast", "darker[isort]~=2.1.1", "delphi-utils", + "freezegun", "numpy", "pandas", "paramiko", diff --git a/claims_hosp/tests/conftest.py b/claims_hosp/tests/conftest.py new file mode 100644 index 000000000..e0793ddd7 --- /dev/null +++ b/claims_hosp/tests/conftest.py @@ -0,0 +1,80 @@ +import shutil +from pathlib import Path + +import copy +import pytest +from unittest.mock import MagicMock +import delphi_claims_hosp + +TEST_DIR = Path(__file__).parent +@pytest.fixture(scope="session") +def params(): + params = { + "common": { + "export_dir": f"{TEST_DIR}/retrieve_files", + "log_filename": f"{TEST_DIR}/test.log", + "custom_run": False, + }, + "indicator": { + "drop_date": None, + "generate_backfill_files": True, + "ftp_credentials": + {"host": "test_host", + "user": "test_user", + "pass": "test_pass", + "port": 2222 + }, + "write_se": False, + "obfuscated_prefix": "foo_obfuscated", + "parallel": False, + "geos": ["nation"], + "weekday": [True, False], + "backfill_dir": f"{TEST_DIR}/backfill", + "start_date": "2020-02-01", + "end_date": None, + "drop_date": None, + "n_backfill_days": 70, + "n_waiting_days": 3, + "input_dir": f"{TEST_DIR}/receiving", + }, + "validation": { + "common": { + 
"span_length": 14, + "min_expected_lag": {"all": "3"}, + "max_expected_lag": {"all": "4"}, + } + } + } + return copy.deepcopy(params) + +@pytest.fixture +def params_w_patch(params): + params_copy = copy.deepcopy(params) + params_copy["common"]["custom_run"] = True + params_copy["patch"] = { + "start_issue": "2020-06-11", + "end_issue": "2020-06-11", + "patch_dir": "./patch_dir" + } + return params_copy + +@pytest.fixture(scope="session") +def run_as_module(params): + with mock.patch('delphi_claims_hosp.patch.get_structured_logger') as mock_get_structured_logger, \ + mock.patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \ + mock.patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \ + mock.patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False) as mock_exists: + mock_ssh_client_instance = MagicMock() + mock_ssh_client.return_value = mock_ssh_client_instance + mock_sftp = MagicMock() + mock_ssh_client_instance.open_sftp.return_value = mock_sftp + mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")] + def mock_get(*args, **kwargs): + src = Path(f"{TEST_DIR}/test_data/{args[0]}") + dst = Path(f"{TEST_DIR}/receiving/{args[0]}") + shutil.copyfile(src, dst) + + mock_sftp.get.side_effect = mock_get + mock_read_params.return_value = params + + delphi_claims_hosp.run.run_module(params) diff --git a/claims_hosp/tests/receiving/.gitignore b/claims_hosp/tests/receiving/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/claims_hosp/tests/retrieve_files/.gitignore b/claims_hosp/tests/retrieve_files/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index fcd908461..9dc1334ad 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -1,21 +1,24 @@ +import calendar +import logging import os import glob from datetime import datetime +from pathlib import Path -# third party import pandas as pd import pytest -# first party +from delphi_utils.logger import get_structured_logger from delphi_claims_hosp.config import Config, GeoConstants -from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file +from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files CONFIG = Config() CONSTANTS = GeoConstants() +TEST_PATH = Path(__file__).parent PARAMS = { "indicator": { - "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", - "backfill_dir": "./backfill", + "input_file": f"{TEST_PATH}/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", + "backfill_dir": f"{TEST_PATH}/backfill", "drop_date": "2020-06-11", } } @@ -25,59 +28,54 @@ class TestBackfill: - def test_store_backfill_file(self): - dropdate = datetime(2020, 1, 1) + def cleanup(self): + for file in glob.glob(f"{backfill_dir}/*.parquet"): + os.remove(file) + + def test_store_backfill_file(self, caplog): + dropdate = datetime(2020, 1, 1) fn = "claims_hosp_as_of_20200101.parquet" - assert fn not in os.listdir(backfill_dir) - + caplog.set_level(logging.INFO) + logger = get_structured_logger() + num_rows = len(pd.read_csv(DATA_FILEPATH)) + # Store backfill file - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) assert fn in os.listdir(backfill_dir) + assert "Stored source data in parquet" in caplog.text + + fn = 
"claims_hosp_as_of_20200101.parquet" backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow') - + selected_columns = ['time_value', 'fips', 'state_id', 'num', 'den', 'lag', 'issue_date'] - assert set(selected_columns) == set(backfill_df.columns) - - os.remove(backfill_dir + "/" + fn) - assert fn not in os.listdir(backfill_dir) - - def test_merge_backfill_file(self): - - today = datetime.today() + + assert set(selected_columns) == set(backfill_df.columns) + assert num_rows == len(backfill_df) + + self.cleanup() + def test_merge_backfill_file(self, caplog, monkeypatch): fn = "claims_hosp_from_20200611_to_20200614.parquet" - assert fn not in os.listdir(backfill_dir) - - # Check when there is no daily file to merge. - today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=8) - assert fn not in os.listdir(backfill_dir) - - # Generate backfill daily files + caplog.set_level(logging.INFO) + logger = get_structured_logger() + # Generate backfill daily files for d in range(11, 15): - dropdate = datetime(2020, 6, d) - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) - - # Check the when the merged file is not generated - today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=8) - assert fn not in os.listdir(backfill_dir) - - # Generate the merged file, but not delete it - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=2) + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + + today = datetime(2020, 7, 1) + monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4)) + merge_backfill_file(backfill_dir, today.weekday(), today, logger, + test_mode=True, check_nd=4) + assert "Merging files" in caplog.text assert fn in os.listdir(backfill_dir) # Read daily file - new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet") - pdList = [] + new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet") + pdList = [] for file in new_files: - if "from" in file: - continue df = pd.read_parquet(file, engine='pyarrow') pdList.append(df) os.remove(file) @@ -85,13 +83,108 @@ def test_merge_backfill_file(self): assert len(new_files) == 1 expected = pd.concat(pdList).sort_values(["time_value", "fips"]) - + # Read the merged file merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow') - + assert set(expected.columns) == set(merged.columns) assert expected.shape[0] == merged.shape[0] assert expected.shape[1] == merged.shape[1] - - os.remove(backfill_dir + "/" + fn) + + self.cleanup() + + def test_merge_backfill_file_no_call(self, caplog): + fn = "claims_hosp_from_20200611_to_20200614.parquet" + caplog.set_level(logging.INFO) + logger = get_structured_logger() + today = datetime(2020, 6, 20) + + # Generate backfill daily files + for d in range(11, 15): + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + + # Check when there is no daily file to merge. 
diff --git a/claims_hosp/tests/test_download_claims_ftp_files.py b/claims_hosp/tests/test_download_claims_ftp_files.py
index 3cde21ee5..aca51b095 100644
--- a/claims_hosp/tests/test_download_claims_ftp_files.py
+++ b/claims_hosp/tests/test_download_claims_ftp_files.py
@@ -1,19 +1,35 @@
 # standard
 import datetime
 import re
+from unittest.mock import MagicMock, patch
+import logging
 
-# third party
-import numpy as np
-
-# first party
-from delphi_claims_hosp.download_claims_ftp_files import (change_date_format,
-                                                           get_timestamp)
+from delphi_claims_hosp.download_claims_ftp_files import (change_date_format,
+                                                           get_timestamp, download)
 
 OLD_FILENAME_TIMESTAMP = re.compile(
     r".*EDI_AGG_INPATIENT_[0-9]_(?P<ymd>[0-9]*)_(?P<hm>[0-9]*)[^0-9]*")
 NEW_FILENAME_TIMESTAMP = re.compile(r".*EDI_AGG_INPATIENT_(?P<ymd>[0-9]*)_(?P<hm>[0-9]*)[^0-9]*")
 
+TEST_LOGGER = logging.getLogger()
+
 class TestDownloadClaimsFtpFiles:
 
+    @patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient')
+    @patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False)
+    def test_download(self, mock_exists, mock_sshclient):
+        mock_sshclient_instance = MagicMock()
+        mock_sshclient.return_value = mock_sshclient_instance
+        mock_sftp = MagicMock()
+        mock_sshclient_instance.open_sftp.return_value = mock_sftp
+        mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")]
+        ftp_credentials = {"host": "test_host", "user": "test_user", "pass": "test_pass", "port": "test_port"}
+        out_path = "./test_data/"
+
+        issue_date = datetime.datetime(2020, 11, 7)
+        download(ftp_credentials, out_path, TEST_LOGGER, issue_date=issue_date)
+        mock_sshclient_instance.connect.assert_called_once_with(ftp_credentials["host"], username=ftp_credentials["user"], password=ftp_credentials["pass"], port=ftp_credentials["port"])
+        mock_sftp.get.assert_called()
+
     def test_change_date_format(self):
         name = "SYNEDI_AGG_INPATIENT_20200611_1451CDT"
diff --git a/claims_hosp/tests/test_patch.py b/claims_hosp/tests/test_patch.py
new file mode 100644
index 000000000..b11d3a087
--- /dev/null
+++ b/claims_hosp/tests/test_patch.py
@@ -0,0 +1,49 @@
+import unittest
+from datetime import datetime
+from pathlib import Path
+from unittest.mock import patch as mock_patch, MagicMock
+import os
+import shutil
+
+from delphi_claims_hosp.download_claims_ftp_files import change_date_format
+
+from conftest import TEST_DIR
+
+from delphi_claims_hosp.patch import patch
+from delphi_claims_hosp.backfill import merge_existing_backfill_files, merge_backfill_file
+
+
+class TestPatchModule:
+
+    def test_patch(self, params_w_patch):
+        with mock_patch('delphi_claims_hosp.patch.get_structured_logger'), \
+             mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \
+             mock_patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \
+             mock_patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False), \
+             mock_patch('delphi_claims_hosp.run.merge_existing_backfill_files') as mock_patch_backfill, \
+             mock_patch('delphi_claims_hosp.run.merge_backfill_file') as mock_backfill:
+            mock_ssh_client_instance = MagicMock()
+            mock_ssh_client.return_value = mock_ssh_client_instance
+            mock_sftp = MagicMock()
+            mock_ssh_client_instance.open_sftp.return_value = mock_sftp
+            mock_sftp.listdir_attr.return_value = [MagicMock(filename=change_date_format("SYNEDI_AGG_INPATIENT_06112020_1451CDT.csv.gz"))]
+
+            def mock_get(*args, **kwargs):
+                file = change_date_format(args[0])
+                src = Path(f"{TEST_DIR}/test_data/{file}")
+                dst = Path(f"{TEST_DIR}/receiving/{file}")
+                shutil.copyfile(src, dst)
+            mock_sftp.get.side_effect = mock_get
+
+            mock_read_params.return_value = params_w_patch
+            mock_patch_backfill.side_effect = merge_existing_backfill_files
+            mock_backfill.side_effect = merge_backfill_file
+
+            patch()
+
+            assert mock_patch_backfill.call_count == 1
+            assert mock_backfill.call_count == 0
+
+            issue_date = params_w_patch["patch"]["start_issue"].replace("-", "")
+            assert os.path.isdir(f'{TEST_DIR}/patch_dir/issue_{issue_date}/hospital-admissions')
+
+            # Clean up the created directories after the test
+            shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])
\ No newline at end of file
diff --git a/claims_hosp/tests/test_run.py b/claims_hosp/tests/test_run.py
new file mode 100644
index 000000000..651bd78ad
--- /dev/null
+++ b/claims_hosp/tests/test_run.py
@@ -0,0 +1,43 @@
+from pathlib import Path
+from unittest.mock import patch, MagicMock
+import shutil
+
+import pytest
+from freezegun import freeze_time
+
+from delphi_claims_hosp.download_claims_ftp_files import change_date_format
+from delphi_claims_hosp.run import run_module
+from delphi_claims_hosp.backfill import merge_existing_backfill_files, merge_backfill_file
+
+from conftest import TEST_DIR
+
+
+class TestRun:
+    @freeze_time("2020-06-11 20:00:00")
+    def test_output_files(self, params):
+        with patch('delphi_claims_hosp.patch.get_structured_logger'), \
+             patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \
+             patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False), \
+             patch('delphi_claims_hosp.run.merge_existing_backfill_files') as mock_patch_backfill, \
+             patch('delphi_claims_hosp.run.merge_backfill_file') as mock_backfill:
+
+            mock_ssh_client_instance = MagicMock()
+            mock_ssh_client.return_value = mock_ssh_client_instance
+            mock_sftp = MagicMock()
+            mock_ssh_client_instance.open_sftp.return_value = mock_sftp
+            mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_20200611_1451CDT.csv.gz")]
+
+            def mock_get(*args, **kwargs):
+                file = change_date_format(args[0])
+                src = Path(f"{TEST_DIR}/test_data/{file}")
+                dst = Path(f"{TEST_DIR}/receiving/{file}")
+                shutil.copyfile(src, dst)
+            mock_sftp.get.side_effect = mock_get
+
+            mock_patch_backfill.side_effect = merge_existing_backfill_files
+            mock_backfill.side_effect = merge_backfill_file
+
+            run_module(params)
+
+            assert mock_patch_backfill.call_count == 0
+            assert mock_backfill.call_count == 1
+
+            # Clean up the created directories after the test
+            shutil.rmtree(params["common"]["export_dir"])