diff --git a/doctor_visits/README.md b/doctor_visits/README.md index ba2acf43e..9a9ac07b5 100644 --- a/doctor_visits/README.md +++ b/doctor_visits/README.md @@ -53,3 +53,9 @@ The output will show the number of unit tests that passed and failed, along with the percentage of code covered by the tests. None of the tests should fail and the code lines that are not covered by unit tests should be small and should not include critical sub-routines. + +## Running Patches: +To get data issued during specific date range, output in batch issue format, adjust `params.json` in accordance with `patch.py`, then run +``` +env/bin/python -m delphi_doctor_visits.patch +``` diff --git a/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py b/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py index 002d4a7c9..efd110d8b 100644 --- a/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py +++ b/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py @@ -51,9 +51,13 @@ def change_date_format(name): name = '_'.join(split_name) return name -def download(ftp_credentials, out_path, logger): +def download(ftp_credentials, out_path, logger, issue_date=None): """Pull the latest raw files.""" - current_time = datetime.datetime.now() + if not issue_date: + current_time = datetime.datetime.now() + else: + current_time = datetime.datetime.strptime(issue_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) + logger.info("starting download", time=current_time) seconds_in_day = 24 * 60 * 60 diff --git a/doctor_visits/delphi_doctor_visits/get_latest_claims_name.py b/doctor_visits/delphi_doctor_visits/get_latest_claims_name.py index e417183c7..0a86d532f 100644 --- a/doctor_visits/delphi_doctor_visits/get_latest_claims_name.py +++ b/doctor_visits/delphi_doctor_visits/get_latest_claims_name.py @@ -5,9 +5,12 @@ import datetime from pathlib import Path -def get_latest_filename(dir_path, logger): +def get_latest_filename(dir_path, logger, issue_date=None): """Get the latest filename from the list of downloaded raw files.""" - current_date = datetime.datetime.now() + if issue_date: + current_date = datetime.datetime.strptime(issue_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) + else: + current_date = datetime.datetime.now() files = list(Path(dir_path).glob("*")) latest_timestamp = datetime.datetime(1900, 1, 1) @@ -24,7 +27,7 @@ def get_latest_filename(dir_path, logger): latest_timestamp = timestamp latest_filename = file - assert current_date.date() == latest_timestamp.date(), "no drop for today" + assert current_date.date() == latest_timestamp.date(), f"no drop for {current_date}" logger.info("Latest claims file", filename=latest_filename) diff --git a/doctor_visits/delphi_doctor_visits/patch.py b/doctor_visits/delphi_doctor_visits/patch.py new file mode 100644 index 000000000..32b6d308f --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/patch.py @@ -0,0 +1,71 @@ +""" +This module is used for patching data in the delphi_doctor_visits package. + +To use this module, you need to specify the range of issue dates in params.json, like so: + +{ + "common": { + ... + }, + "validation": { + ... + }, + "patch": { + "patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/doctor_visits/AprilPatch", + "start_issue": "2024-04-20", + "end_issue": "2024-04-21" + } +} + +It will generate data for that range of issue dates, and store them in batch issue format: +[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv +""" + +from datetime import datetime, timedelta +from os import makedirs + +from delphi_utils import get_structured_logger, read_params + +from .run import run_module + + +def patch(): + """ + Run the doctor visits indicator for a range of issue dates. + + The range of issue dates is specified in params.json using the following keys: + - "patch": Only used for patching data + - "start_date": str, YYYY-MM-DD format, first issue date + - "end_date": str, YYYY-MM-DD format, last issue date + - "patch_dir": str, directory to write all issues output + """ + params = read_params() + logger = get_structured_logger("delphi_doctor_visits.patch", filename=params["common"]["log_filename"]) + + start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") + end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") + + logger.info(f"""Start patching {params["patch"]["patch_dir"]}""") + logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""") + logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""") + + makedirs(params["patch"]["patch_dir"], exist_ok=True) + + current_issue = start_issue + + while current_issue <= end_issue: + logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""") + + params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d") + + current_issue_yyyymmdd = current_issue.strftime("%Y%m%d") + current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/doctor-visits""" + makedirs(f"{current_issue_dir}", exist_ok=True) + params["common"]["export_dir"] = f"""{current_issue_dir}""" + + run_module(params, logger) + current_issue += timedelta(days=1) + + +if __name__ == "__main__": + patch() diff --git a/doctor_visits/delphi_doctor_visits/run.py b/doctor_visits/delphi_doctor_visits/run.py index fd09c56d6..3c941534a 100644 --- a/doctor_visits/delphi_doctor_visits/run.py +++ b/doctor_visits/delphi_doctor_visits/run.py @@ -20,7 +20,7 @@ from .get_latest_claims_name import get_latest_filename -def run_module(params): # pylint: disable=too-many-statements +def run_module(params, logger=None): # pylint: disable=too-many-statements """ Run doctor visits indicator. @@ -42,18 +42,26 @@ def run_module(params): # pylint: disable=too-many-statements - "se": bool, whether to write out standard errors - "obfuscated_prefix": str, prefix for signal name if write_se is True. - "parallel": bool, whether to update sensor in parallel. + - "patch": Only used for patching data, remove if not patching. + Check out patch.py and README for more details on how to run patches. + - "start_date": str, YYYY-MM-DD format, first issue date + - "end_date": str, YYYY-MM-DD format, last issue date + - "patch_dir": str, directory to write all issues output """ start_time = time.time() - logger = get_structured_logger( - __name__, filename=params["common"].get("log_filename"), - log_exceptions=params["common"].get("log_exceptions", True)) + issue_date = params.get("patch", {}).get("current_issue", None) + if not logger: + logger = get_structured_logger( + __name__, + filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True), + ) # pull latest data - download(params["indicator"]["ftp_credentials"], - params["indicator"]["input_dir"], logger) + download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date) # find the latest files (these have timestamps) - claims_file = get_latest_filename(params["indicator"]["input_dir"], logger) + claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date) # modify data modify_and_write(claims_file, logger) diff --git a/doctor_visits/tests/test_download.py b/doctor_visits/tests/test_download.py new file mode 100644 index 000000000..dc94e534c --- /dev/null +++ b/doctor_visits/tests/test_download.py @@ -0,0 +1,28 @@ +import unittest +from unittest.mock import patch, MagicMock +from delphi_doctor_visits.download_claims_ftp_files import download + +class TestDownload(unittest.TestCase): + @patch('delphi_doctor_visits.download_claims_ftp_files.paramiko.SSHClient') + @patch('delphi_doctor_visits.download_claims_ftp_files.path.exists', return_value=False) + def test_download(self, mock_exists, mock_sshclient): + mock_sshclient_instance = MagicMock() + mock_sshclient.return_value = mock_sshclient_instance + mock_sftp = MagicMock() + mock_sshclient_instance.open_sftp.return_value = mock_sftp + mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_OUTPATIENT_20200207_1455CDT.csv.gz")] + ftp_credentials = {"host": "test_host", "user": "test_user", "pass": "test_pass", "port": "test_port"} + out_path = "./test_data/" + logger = MagicMock() + + #case 1: download with issue_date that does not exist on ftp server + download(ftp_credentials, out_path, logger, issue_date="2020-02-08") + mock_sshclient_instance.connect.assert_called_once_with(ftp_credentials["host"], username=ftp_credentials["user"], password=ftp_credentials["pass"], port=ftp_credentials["port"]) + mock_sftp.get.assert_not_called() + + # case 2: download with issue_date that exists on ftp server + download(ftp_credentials, out_path, logger, issue_date="2020-02-07") + mock_sftp.get.assert_called() + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/doctor_visits/tests/test_get_latest_claims_name.py b/doctor_visits/tests/test_get_latest_claims_name.py index 98bd19e2d..d1003ad47 100644 --- a/doctor_visits/tests/test_get_latest_claims_name.py +++ b/doctor_visits/tests/test_get_latest_claims_name.py @@ -11,9 +11,12 @@ class TestGetLatestFileName: logger = Mock() - + dir_path = "test_data" + def test_get_latest_claims_name(self): - dir_path = "./test_data/" - with pytest.raises(AssertionError): - get_latest_filename(dir_path, self.logger) + get_latest_filename(self.dir_path, self.logger) + + def test_get_latest_claims_name_with_issue_date(self): + result = get_latest_filename(self.dir_path, self.logger, issue_date="2020-02-07") + assert str(result) == f"{self.dir_path}/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz" diff --git a/doctor_visits/tests/test_patch.py b/doctor_visits/tests/test_patch.py new file mode 100644 index 000000000..5b4575a09 --- /dev/null +++ b/doctor_visits/tests/test_patch.py @@ -0,0 +1,37 @@ +import unittest +from unittest.mock import patch as mock_patch, call +from delphi_doctor_visits.patch import patch +import os +import shutil + +class TestPatchModule(unittest.TestCase): + def test_patch(self): + with mock_patch('delphi_doctor_visits.patch.run_module') as mock_run_module, \ + mock_patch('delphi_doctor_visits.patch.get_structured_logger') as mock_get_structured_logger, \ + mock_patch('delphi_doctor_visits.patch.read_params') as mock_read_params: + + mock_read_params.return_value = { + "common": { + "log_filename": "test.log" + }, + "patch": { + "start_issue": "2021-01-01", + "end_issue": "2021-01-02", + "patch_dir": "./patch_dir" + } + } + + patch() + + self.assertIn('current_issue', mock_read_params.return_value['patch']) + self.assertEqual(mock_read_params.return_value['patch']['current_issue'], '2021-01-02') + + self.assertTrue(os.path.isdir('./patch_dir')) + self.assertTrue(os.path.isdir('./patch_dir/issue_20210101/doctor-visits')) + self.assertTrue(os.path.isdir('./patch_dir/issue_20210102/doctor-visits')) + + # Clean up the created directories after the test + shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"]) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file