From 23e105540495465326e52f9fa54ff361a9560308 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 22 Aug 2024 15:24:35 -0400
Subject: [PATCH 01/28] implement

---
 claims_hosp/delphi_claims_hosp/patch.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 claims_hosp/delphi_claims_hosp/patch.py

diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
new file mode 100644
index 000000000..e69de29bb

From 39c433e27b2db7de63683bfb3c6208a850d127bd Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 22 Aug 2024 15:24:48 -0400
Subject: [PATCH 02/28] implement

---
 claims_hosp/delphi_claims_hosp/patch.py | 71 +++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index e69de29bb..6fd6e7390 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -0,0 +1,71 @@
+"""
+This module is used for patching data in the delphi_doctor_visits package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+  "common": {
+    ...
+  },
+  "validation": {
+    ...
+  },
+  "patch": {
+    "patch_dir": "/covidcast-indicators/doctor_visits/AprilPatch",
+    "start_issue": "2024-04-20",
+    "end_issue": "2024-04-21"
+  }
+}
+
+It will generate data for that range of issue dates, and store them in batch issue format:
+[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv
+"""
+
+from datetime import datetime, timedelta
+from os import makedirs
+
+from delphi_utils import get_structured_logger, read_params
+
+from .run import run_module
+
+
+def patch():
+    """
+    Run the doctor visits indicator for a range of issue dates.
+
+    The range of issue dates is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+        - "start_date": str, YYYY-MM-DD format, first issue date
+        - "end_date": str, YYYY-MM-DD format, last issue date
+        - "patch_dir": str, directory to write all issues output
+    """
+    params = read_params()
+    logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])
+
+    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
+    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
+
+    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
+    logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
+    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")
+
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    current_issue = start_issue
+
+    while current_issue <= end_issue:
+        logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""")
+
+        params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")
+
+        current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
+        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/doctor-visits"""
+        makedirs(f"{current_issue_dir}", exist_ok=True)
+        params["common"]["export_dir"] = f"""{current_issue_dir}"""
+
+        run_module(params, logger)
+        current_issue += timedelta(days=1)
+
+
+if __name__ == "__main__":
+    patch()

From cbc7894bcf4d8657086545a5972fc6ad2fac5030 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 22 Aug 2024 15:57:46 -0400
Subject: [PATCH 03/28] implementation done

---
 .../download_claims_ftp_files.py | 5 +++--
 .../get_latest_claims_name.py | 4 ++--
 claims_hosp/delphi_claims_hosp/patch.py | 11 +++++++---
claims_hosp/delphi_claims_hosp/run.py | 17 +++++++++++------ 4 files changed, 23 insertions(+), 14 deletions(-) diff --git a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py index 2ce093488..5c9019035 100644 --- a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py +++ b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py @@ -53,9 +53,10 @@ def change_date_format(name): return name -def download(ftp_credentials, out_path, logger): +def download(ftp_credentials, out_path, logger, issue_date=None): """Pull the latest raw files.""" - current_time = datetime.datetime.now() + current_time = issue_date if issue_date else datetime.datetime.now() + seconds_in_day = 24 * 60 * 60 logger.info("starting download", time=current_time) diff --git a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py index e417183c7..ab03cbd14 100644 --- a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py +++ b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py @@ -5,9 +5,9 @@ import datetime from pathlib import Path -def get_latest_filename(dir_path, logger): +def get_latest_filename(dir_path, logger, issue_date=None): """Get the latest filename from the list of downloaded raw files.""" - current_date = datetime.datetime.now() + current_date = issue_date if issue_date else datetime.datetime.now() files = list(Path(dir_path).glob("*")) latest_timestamp = datetime.datetime(1900, 1, 1) diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py index 6fd6e7390..d3de10c33 100644 --- a/claims_hosp/delphi_claims_hosp/patch.py +++ b/claims_hosp/delphi_claims_hosp/patch.py @@ -45,16 +45,19 @@ def patch(): start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") - logger.info(f"""Start patching {params["patch"]["patch_dir"]}""") - logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""") - logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""") + logger.info( + "Starting patching", + patch_directory=params["patch"]["patch_dir"], + start_issue=start_issue.strftime("%Y-%m-%d"), + end_issue=end_issue.strftime("%Y-%m-%d"), + ) makedirs(params["patch"]["patch_dir"], exist_ok=True) current_issue = start_issue while current_issue <= end_issue: - logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""") + logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d") diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index 53c4cd33b..39ea56d2f 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -23,7 +23,7 @@ from .backfill import (store_backfill_file, merge_backfill_file) -def run_module(params): +def run_module(params, logger=None): """ Generate updated claims-based hospitalization indicator values. @@ -54,19 +54,24 @@ def run_module(params): adjustments (False). 
""" start_time = time.time() - logger = get_structured_logger( - __name__, filename=params["common"].get("log_filename"), - log_exceptions=params["common"].get("log_exceptions", True)) + issue_date_str = params.get("patch", {}).get("current_issue", None) + issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") + if not logger: + logger = get_structured_logger( + __name__, + filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True), + ) # pull latest data download(params["indicator"]["ftp_credentials"], - params["indicator"]["input_dir"], logger) + params["indicator"]["input_dir"], logger, issue_date=issue_date) # aggregate data modify_and_write(params["indicator"]["input_dir"], logger) # find the latest files (these have timestamps) - claims_file = get_latest_filename(params["indicator"]["input_dir"], logger) + claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date) # handle range of estimates to produce # filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz From 5b9e69a04614eb13e953b087be7d0ebd8a0c285f Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Tue, 27 Aug 2024 09:52:52 -0400 Subject: [PATCH 04/28] fixed typo and conditional --- claims_hosp/delphi_claims_hosp/get_latest_claims_name.py | 4 ++-- claims_hosp/delphi_claims_hosp/patch.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py index ab03cbd14..eae5763ed 100644 --- a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py +++ b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py @@ -23,8 +23,8 @@ def get_latest_filename(dir_path, logger, issue_date=None): if timestamp <= current_date: latest_timestamp = timestamp latest_filename = file - - assert current_date.date() == latest_timestamp.date(), "no drop for today" + if issue_date is None: + assert current_date.date() == latest_timestamp.date(), "no drop for today" logger.info("Latest claims file", filename=latest_filename) diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py index d3de10c33..104bd90aa 100644 --- a/claims_hosp/delphi_claims_hosp/patch.py +++ b/claims_hosp/delphi_claims_hosp/patch.py @@ -62,7 +62,7 @@ def patch(): params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d") current_issue_yyyymmdd = current_issue.strftime("%Y%m%d") - current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/doctor-visits""" + current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/hospital-admissions""" makedirs(f"{current_issue_dir}", exist_ok=True) params["common"]["export_dir"] = f"""{current_issue_dir}""" From 1bb84d8888c13068737cb4054d2910a989af3d6d Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 28 Aug 2024 16:24:27 -0400 Subject: [PATCH 05/28] feat: adding patching with backfill --- claims_hosp/delphi_claims_hosp/backfill.py | 64 ++++++++++++++-- claims_hosp/delphi_claims_hosp/run.py | 24 ++++-- claims_hosp/tests/test_backfill.py | 75 ++++++++++++++++++- .../tests/test_download_claims_ftp_files.py | 23 +++++- claims_hosp/tests/test_patch.py | 31 ++++++++ 5 files changed, 200 insertions(+), 17 deletions(-) create mode 100644 claims_hosp/tests/test_patch.py diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py index 495abd59b..75e6603fa 100644 --- 
a/claims_hosp/delphi_claims_hosp/backfill.py +++ b/claims_hosp/delphi_claims_hosp/backfill.py @@ -5,15 +5,18 @@ Created: 2022-08-03 """ -import os + import glob +import os +import re +import shutil from datetime import datetime +from typing import Union # third party import pandas as pd from delphi_utils import GeoMapper - from .config import Config gmpr = GeoMapper() @@ -69,9 +72,58 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir): "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") # Store intermediate file into the backfill folder backfilldata.to_parquet(path, index=False) + return path + + +def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger): + """ + Merge existing backfill with the patch data included. + Parameters + ---------- + issue_date : datetime + The most recent date when the raw data is received + backfill_dir : str + specified path to store backfill files. + backfill_file : str + + """ + + new_files = glob.glob(backfill_dir + "/claims_hosp_*") + + def get_file_with_date(files) -> Union[str, None]: + for filename in files: + pattern = re.findall(r"\d{8}", filename) + if len(pattern) == 2: + start_date = datetime.strptime(pattern[0], "%Y%m%d") + end_date = datetime.strptime(pattern[1], "%Y%m%d") + if start_date <= issue_date or end_date <= issue_date: + return filename + return "" + + file_name = get_file_with_date(new_files) + + if len(file_name) == 0: + logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d")) + return + + # Start to merge files + merge_file = f"{file_name.split('.')[0]}_after_merge.parquet" + try: + shutil.copyfile(file_name, merge_file) + existing_df = pd.read_parquet(merge_file, engine="pyarrow") + df = pd.read_parquet(backfill_file, engine="pyarrow") + merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"]) + merged_df.to_parquet(merge_file, index=False) + os.remove(file_name) + os.rename(merge_file, file_name) + # pylint: disable=W0703: + except Exception as e: + os.remove(merge_file) + logger.error(e) + return + -def merge_backfill_file(backfill_dir, backfill_merge_day, today, - test_mode=False, check_nd=25): +def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25): """ Merge ~4 weeks' backfill data into one file. @@ -80,7 +132,7 @@ def merge_backfill_file(backfill_dir, backfill_merge_day, today, threshold to allow flexibility in data delivery. Parameters ---------- - today : datetime + most_recent : datetime The most recent date when the raw data is received backfill_dir : str specified path to store backfill files. @@ -109,7 +161,7 @@ def get_date(file_link): # Check whether to merge # Check the number of files that are not merged - if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd: + if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd: return # Start to merge files diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index 39ea56d2f..b58aea934 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -5,22 +5,24 @@ when the module is run with `python -m delphi_claims_hosp`. 
""" +import os + # standard packages import time -import os from datetime import datetime, timedelta from pathlib import Path # third party from delphi_utils import get_structured_logger +from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file + # first party from .config import Config from .download_claims_ftp_files import download -from .modify_claims_drops import modify_and_write from .get_latest_claims_name import get_latest_filename +from .modify_claims_drops import modify_and_write from .update_indicator import ClaimsHospIndicatorUpdater -from .backfill import (store_backfill_file, merge_backfill_file) def run_module(params, logger=None): @@ -56,6 +58,10 @@ def run_module(params, logger=None): start_time = time.time() issue_date_str = params.get("patch", {}).get("current_issue", None) issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") + # safety check for patch parameters exists in file, but not running custom runs/patches + custom_run_flag = ( + False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False) + ) if not logger: logger = get_structured_logger( __name__, @@ -64,8 +70,7 @@ def run_module(params, logger=None): ) # pull latest data - download(params["indicator"]["ftp_credentials"], - params["indicator"]["input_dir"], logger, issue_date=issue_date) + download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date) # aggregate data modify_and_write(params["indicator"]["input_dir"], logger) @@ -99,8 +104,13 @@ def run_module(params, logger=None): if params["indicator"].get("generate_backfill_files", True): backfill_dir = params["indicator"]["backfill_dir"] backfill_merge_day = params["indicator"]["backfill_merge_day"] - merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today()) - store_backfill_file(claims_file, dropdate_dt, backfill_dir) + if custom_run_flag: + backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir) + merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger) + + else: + merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today()) + store_backfill_file(claims_file, dropdate_dt, backfill_dir) # print out information logger.info("Loaded params", diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index fcd908461..e8bb5c257 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -1,6 +1,9 @@ +import logging import os import glob from datetime import datetime +from pathlib import Path +import shutil # third party import pandas as pd @@ -8,20 +11,22 @@ # first party from delphi_claims_hosp.config import Config, GeoConstants -from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file +from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files CONFIG = Config() CONSTANTS = GeoConstants() +TEST_PATH = Path(__file__).parent PARAMS = { "indicator": { - "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", - "backfill_dir": "./backfill", + "input_file": f"{TEST_PATH}/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", + "backfill_dir": f"{TEST_PATH}/backfill", "drop_date": "2020-06-11", } } DATA_FILEPATH = PARAMS["indicator"]["input_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) backfill_dir = PARAMS["indicator"]["backfill_dir"] +TEST_LOGGER = logging.getLogger() class 
TestBackfill: @@ -95,3 +100,67 @@ def test_merge_backfill_file(self): os.remove(backfill_dir + "/" + fn) assert fn not in os.listdir(backfill_dir) + + def test_merge_existing_backfill_files(self): + issue_date = datetime(year=2020, month=6, day=13) + issue_date_str = issue_date.strftime("%Y%m%d") + def prep_backfill_data(): + # Generate backfill daily files + for d in range(11, 15): + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) + + today = datetime(2020, 6, 14) + # creating expected file + merge_backfill_file(backfill_dir, today.weekday(), today, + test_mode=True, check_nd=2) + original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet" + os.rename(original, f"{backfill_dir}/expected.parquet") + + # creating backfill without issue date + os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") + today = datetime(2020, 6, 14) + merge_backfill_file(backfill_dir, today.weekday(), today, + test_mode=True, check_nd=2) + + old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") + for file in old_files: + os.remove(file) + + prep_backfill_data() + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER) + + expected = pd.read_parquet(f"{backfill_dir}/expected.parquet") + merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet") + + check_diff = expected.merge(merged, how='left', indicator=True) + assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0] + for file in glob.glob(backfill_dir + "/*.parquet"): + os.remove(file) + + + def test_merge_existing_backfill_files_no_call(self): + issue_date = datetime(year=2020, month=6, day=20) + issue_date_str = issue_date.strftime("%Y%m%d") + def prep_backfill_data(): + # Generate backfill daily files + for d in range(11, 15): + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) + + today = datetime(2020, 6, 14) + # creating expected file + merge_backfill_file(backfill_dir, today.weekday(), today, + test_mode=True, check_nd=8) + + prep_backfill_data() + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER) + + old_files = glob.glob(backfill_dir + "*.parquet") + for file in old_files: + os.remove(file) + + + diff --git a/claims_hosp/tests/test_download_claims_ftp_files.py b/claims_hosp/tests/test_download_claims_ftp_files.py index 3cde21ee5..dc60299bf 100644 --- a/claims_hosp/tests/test_download_claims_ftp_files.py +++ b/claims_hosp/tests/test_download_claims_ftp_files.py @@ -1,19 +1,40 @@ # standard import datetime import re +from mock import MagicMock, patch +import logging # third party import numpy as np +from freezegun import freeze_time # first party from delphi_claims_hosp.download_claims_ftp_files import (change_date_format, - get_timestamp) + get_timestamp, download) OLD_FILENAME_TIMESTAMP = re.compile( r".*EDI_AGG_INPATIENT_[0-9]_(?P[0-9]*)_(?P[0-9]*)[^0-9]*") NEW_FILENAME_TIMESTAMP = re.compile(r".*EDI_AGG_INPATIENT_(?P[0-9]*)_(?P[0-9]*)[^0-9]*") +TEST_LOGGER = logging.getLogger() class TestDownloadClaimsFtpFiles: + + @patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') + @patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False) + def test_download(self, mock_exists, mock_sshclient): + mock_sshclient_instance = 
MagicMock() + mock_sshclient.return_value = mock_sshclient_instance + mock_sftp = MagicMock() + mock_sshclient_instance.open_sftp.return_value = mock_sftp + mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")] + ftp_credentials = {"host": "test_host", "user": "test_user", "pass": "test_pass", "port": "test_port"} + out_path = "./test_data/" + + issue_date = datetime.datetime(2020, 11, 7) + download(ftp_credentials, out_path, TEST_LOGGER, issue_date=issue_date) + mock_sshclient_instance.connect.assert_called_once_with(ftp_credentials["host"], username=ftp_credentials["user"], password=ftp_credentials["pass"], port=ftp_credentials["port"]) + mock_sftp.get.assert_called() + def test_change_date_format(self): name = "SYNEDI_AGG_INPATIENT_20200611_1451CDT" diff --git a/claims_hosp/tests/test_patch.py b/claims_hosp/tests/test_patch.py new file mode 100644 index 000000000..37688bda2 --- /dev/null +++ b/claims_hosp/tests/test_patch.py @@ -0,0 +1,31 @@ +import unittest +from unittest.mock import patch as mock_patch +from delphi_claims_hosp.patch import patch +import os +import shutil + +class TestPatchModule: + def test_patch(self): + with mock_patch('delphi_claims_hosp.patch.get_structured_logger') as mock_get_structured_logger, \ + mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \ + mock_patch('delphi_claims_hosp.patch.run_module') as mock_run_module: + + mock_read_params.return_value = { + "common": { + "log_filename": "test.log" + }, + "patch": { + "start_issue": "2021-01-01", + "end_issue": "2021-01-02", + "patch_dir": "./patch_dir" + } + } + + patch() + + assert os.path.isdir('./patch_dir') + assert os.path.isdir('./patch_dir/issue_20210101/hospital-admissions') + assert os.path.isdir('./patch_dir/issue_20210102/hospital-admissions') + + # Clean up the created directories after the test + shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"]) \ No newline at end of file From 3d5701cd6de30812bf7d603d32a37c6fbb3ecf3b Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Fri, 30 Aug 2024 12:54:30 -0400 Subject: [PATCH 06/28] lint --- claims_hosp/delphi_claims_hosp/backfill.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py index 75e6603fa..91c812091 100644 --- a/claims_hosp/delphi_claims_hosp/backfill.py +++ b/claims_hosp/delphi_claims_hosp/backfill.py @@ -78,6 +78,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir): def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger): """ Merge existing backfill with the patch data included. + Parameters ---------- issue_date : datetime @@ -85,9 +86,7 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge backfill_dir : str specified path to store backfill files. 
backfill_file : str
-
     """

From 230086ab41f7bc126765d66be29af79c92f5a309 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 6 Sep 2024 10:18:30 -0400
Subject: [PATCH 07/28] suggested changes

---
 claims_hosp/delphi_claims_hosp/backfill.py | 3 +++
 claims_hosp/delphi_claims_hosp/patch.py | 7 ++++---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 91c812091..b8a9b05d4 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -79,6 +79,9 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     """
     Merge existing backfill with the patch data included.
 
+    When the indicator fails for some reason or another, there's a gap in the backfill files.
+    The patch to fill in the missing dates happens later down the line when the backfill files are already merged.
+    This function takes the merged file with the missing date, inserts the particular date, and merges the file back.
     Parameters
     ----------
     issue_date : datetime

diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index 104bd90aa..b812bce24 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -1,17 +1,18 @@
 """
-This module is used for patching data in the delphi_doctor_visits package.
+This module is used for patching data in the delphi_claims_hosp package.
 
 To use this module, you need to specify the range of issue dates in params.json, like so:
 
 {
   "common": {
+    "custom_flag" : true,
     ...
   },
   "validation": {
     ...
   },
   "patch": {
-    "patch_dir": "/covidcast-indicators/doctor_visits/AprilPatch",
+    "patch_dir": "/covidcast-indicators/hospital-admissions/patch",
     "start_issue": "2024-04-20",
     "end_issue": "2024-04-21"
   }
 }

From 7bd15be93d85de222beed94042207d3515f17d7a Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 6 Sep 2024 10:21:13 -0400
Subject: [PATCH 08/28] suggested changes

---
 claims_hosp/delphi_claims_hosp/backfill.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index b8a9b05d4..98bb8cfb6 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -77,7 +77,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
 
 def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
     """
-    Merge existing backfill with the patch data included.
+    Merge existing backfill with the patch data included. This function is specifically run for patching.
 
     When the indicator fails for some reason or another, there's a gap in the backfill files.
    The patch to fill in the missing dates happens later down the line when the backfill files are already merged.
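As of PATCH 08, the patch-mode flow wired up in PATCH 05 is: store a single issue's backfill file, then fold it into the existing merged file whose filename dates cover that issue. A minimal sketch of that flow, assuming the signatures as they stand at this patch (store_backfill_file does not yet take a logger); the logger name, directory, and file names below are hypothetical:

from datetime import datetime

from delphi_utils import get_structured_logger

from delphi_claims_hosp.backfill import merge_existing_backfill_files, store_backfill_file

logger = get_structured_logger("backfill_patch_sketch")  # hypothetical logger name
backfill_dir = "./backfill"  # hypothetical backfill directory
issue_date = datetime(2020, 6, 13)  # the missing issue being patched (hypothetical)
claims_file = "SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz"  # hypothetical raw drop

# Write the single-issue backfill file, then merge it into the existing
# combined file whose embedded date range covers the issue date.
new_file = store_backfill_file(claims_file, issue_date, backfill_dir)
merge_existing_backfill_files(backfill_dir, new_file, issue_date, logger)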
From b71dd820e7cd6623363ec5d122fc0c9c39884f3a Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 15 Oct 2024 10:55:11 -0400
Subject: [PATCH 09/28] making backfill to monthly in progress

---
 changehc/delphi_changehc/patch.py | 0
 claims_hosp/delphi_claims_hosp/backfill.py | 40 ++++++++--------------
 claims_hosp/delphi_claims_hosp/run.py | 2 +-
 claims_hosp/tests/test_backfill.py | 35 +++++++------------
 4 files changed, 28 insertions(+), 49 deletions(-)
 create mode 100644 changehc/delphi_changehc/patch.py

diff --git a/changehc/delphi_changehc/patch.py b/changehc/delphi_changehc/patch.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 98bb8cfb6..ff1bb6877 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -10,11 +10,12 @@
 import os
 import re
 import shutil
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Union
 
 # third party
 import pandas as pd
+import pytz
 from delphi_utils import GeoMapper
 
 from .config import Config
@@ -94,11 +95,12 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     def get_file_with_date(files) -> Union[str, None]:
         for filename in files:
-            pattern = re.findall(r"\d{8}", filename)
-            if len(pattern) == 2:
-                start_date = datetime.strptime(pattern[0], "%Y%m%d")
-                end_date = datetime.strptime(pattern[1], "%Y%m%d")
-                if start_date <= issue_date or end_date <= issue_date:
+            pattern = re.findall(r"\d{6}", filename)
+            if len(pattern) == 1:
+                file_month = datetime.strptime(pattern[0], "%Y%m")
+                start_date = file_month.replace(day=1)
+                end_date = (start_date + timedelta(days=32)).replace(day=1)
+                if issue_date >= start_date and issue_date < end_date:
                     return filename
         return ""
 
@@ -125,27 +127,17 @@ def get_file_with_date(files) -> Union[str, None]:
     return
 
-def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25):
+def merge_backfill_file(backfill_dir, most_recent, logger, test_mode=False):
     """
-    Merge ~4 weeks' backfill data into one file.
+    Merge a month's source data into one file.
 
-    Usually this function should merge 28 days' data into a new file so as to
-    save the reading time when running the backfill pipelines. We set a softer
-    threshold to allow flexibility in data delivery.
     Parameters
     ----------
     most_recent : datetime
        The most recent date when the raw data is received
     backfill_dir : str
        specified path to store backfill files.
-    backfill_merge_day: int
-        The day of a week that we used to merge the backfill files. e.g. 0
-        is Monday.
     test_mode: bool
-    check_nd: int
-        The criteria of the number of unmerged files. 
Ideally, we want the - number to be 28, but we use a looser criteria from practical - considerations """ new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") if len(new_files) == 0: # if no any daily file is stored @@ -158,23 +150,19 @@ def get_date(file_link): return datetime.strptime(fn, "%Y%m%d") date_list = list(map(get_date, new_files)) - earliest_date = min(date_list) latest_date = max(date_list) - - # Check whether to merge - # Check the number of files that are not merged - if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd: + if latest_date.month == most_recent.month: + logger.info("Not a new month; skipping merging") return + # Start to merge files pdList = [] for fn in new_files: df = pd.read_parquet(fn, engine='pyarrow') pdList.append(df) merged_file = pd.concat(pdList).sort_values(["time_value", "fips"]) - path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%( - datetime.strftime(earliest_date, "%Y%m%d"), - datetime.strftime(latest_date, "%Y%m%d")) + path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet" merged_file.to_parquet(path, index=False) # Delete daily files once we have the merged one. diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index b58aea934..fe8d76fda 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -109,7 +109,7 @@ def run_module(params, logger=None): merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger) else: - merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today()) + merge_backfill_file(backfill_dir, datetime.today()) store_backfill_file(claims_file, dropdate_dt, backfill_dir) # print out information diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index e8bb5c257..ec0de4e5c 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -3,7 +3,6 @@ import glob from datetime import datetime from pathlib import Path -import shutil # third party import pandas as pd @@ -49,16 +48,13 @@ def test_store_backfill_file(self): assert fn not in os.listdir(backfill_dir) def test_merge_backfill_file(self): - - today = datetime.today() - - fn = "claims_hosp_from_20200611_to_20200614.parquet" + fn = "claims_hosp_202006.parquet" assert fn not in os.listdir(backfill_dir) # Check when there is no daily file to merge. 
today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=8) + merge_backfill_file(backfill_dir, today, TEST_LOGGER, + test_mode=True) assert fn not in os.listdir(backfill_dir) # Generate backfill daily files @@ -66,15 +62,10 @@ def test_merge_backfill_file(self): dropdate = datetime(2020, 6, d) store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) - # Check the when the merged file is not generated - today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=8) - assert fn not in os.listdir(backfill_dir) - - # Generate the merged file, but not delete it - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=2) + # Check when the merged file is not generated + today = datetime(2020, 7, 1) + merge_backfill_file(backfill_dir, today, TEST_LOGGER, + test_mode=True) assert fn in os.listdir(backfill_dir) # Read daily file @@ -112,15 +103,15 @@ def prep_backfill_data(): today = datetime(2020, 6, 14) # creating expected file - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=2) + merge_backfill_file(backfill_dir, today, TEST_LOGGER, + test_mode=True) original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet" os.rename(original, f"{backfill_dir}/expected.parquet") # creating backfill without issue date os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today.weekday(), today, + merge_backfill_file(backfill_dir, today, test_mode=True, check_nd=2) old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") @@ -141,7 +132,7 @@ def prep_backfill_data(): def test_merge_existing_backfill_files_no_call(self): - issue_date = datetime(year=2020, month=6, day=20) + issue_date = datetime(year=2020, month=5, day=20) issue_date_str = issue_date.strftime("%Y%m%d") def prep_backfill_data(): # Generate backfill daily files @@ -151,8 +142,8 @@ def prep_backfill_data(): today = datetime(2020, 6, 14) # creating expected file - merge_backfill_file(backfill_dir, today.weekday(), today, - test_mode=True, check_nd=8) + merge_backfill_file(backfill_dir, today, TEST_LOGGER, + test_mode=True) prep_backfill_data() file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) From eed2a638041de5f33b806b6ff44372352ecc8bf7 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 17 Oct 2024 11:28:10 -0400 Subject: [PATCH 10/28] adjusting logic to match new naming format and chunking --- claims_hosp/tests/test_backfill.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index ec0de4e5c..661946436 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -69,7 +69,7 @@ def test_merge_backfill_file(self): assert fn in os.listdir(backfill_dir) # Read daily file - new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet") + new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet") pdList = [] for file in new_files: if "from" in file: From 65a06d8d89581b6307227bb0f9d994a4e0e640d2 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Mon, 21 Oct 2024 13:54:28 -0400 Subject: [PATCH 11/28] added logging and more clean up --- claims_hosp/delphi_claims_hosp/backfill.py | 37 ++++--- claims_hosp/tests/test_backfill.py | 122 ++++++++++++--------- 2 files changed, 92 insertions(+), 67 deletions(-) diff --git 
a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index ff1bb6877..07db1827b 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -22,7 +22,7 @@
 gmpr = GeoMapper()
 
-def store_backfill_file(claims_filepath, _end_date, backfill_dir):
+def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     """
     Store county level backfill data into backfill_dir.
 
@@ -57,6 +57,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
     backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                             & (~backfilldata["fips"].isnull()),
                             selected_columns]
+    logger.info("Filtering backfill data", startdate=_start_date, enddate=_end_date)
     backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
     backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
 
@@ -69,10 +70,15 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
         "state_id": "string"
     })
 
-    path = backfill_dir + \
-        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = "claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    path = f"{backfill_dir}/{filename}"
+
     # Store intermediate file into the backfill folder
-    backfilldata.to_parquet(path, index=False)
+    try:
+        backfilldata.to_parquet(path, index=False)
+        logger.info("Stored backfill data in parquet", filename=filename)
+    except:
+        logger.info("Failed to store backfill data in parquet", )
 
     return path
 
@@ -90,26 +96,29 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     backfill_dir : str
         specified path to store backfill files.
     backfill_file : str
+        specific file to add to the merged backfill file.
     """
     new_files = glob.glob(backfill_dir + "/claims_hosp_*")
 
     def get_file_with_date(files) -> Union[str, None]:
         for filename in files:
-            pattern = re.findall(r"\d{6}", filename)
-            if len(pattern) == 1:
-                file_month = datetime.strptime(pattern[0], "%Y%m")
-                start_date = file_month.replace(day=1)
-                end_date = (start_date + timedelta(days=32)).replace(day=1)
-                if issue_date >= start_date and issue_date < end_date:
+            # need to only match files with 6 digits for merged files
+            pattern = re.findall(r"_(\d{6,6})\.parquet", filename)
+            if pattern:
+                file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
+                end_date = (file_month + timedelta(days=32)).replace(day=1)
+                if issue_date >= file_month and issue_date < end_date:
                     return filename
         return ""
 
     file_name = get_file_with_date(new_files)
 
     if len(file_name) == 0:
-        logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d"))
+        logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return
 
+    logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name)
+
     # Start to merge files
     merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
     try:
@@ -139,8 +148,10 @@ def get_date(file_link):
        specified path to store backfill files.
     test_mode: bool
test_mode: bool """ - new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") + previous_month = (most_recent.replace(day=1) - timedelta(days=1)).strftime("%Y%m") + new_files = glob.glob(backfill_dir + f"/claims_hosp_as_of_{previous_month}*") if len(new_files) == 0: # if no any daily file is stored + logger.info("No new files to merge; skipping merging") return def get_date(file_link): @@ -155,7 +166,7 @@ def get_date(file_link): logger.info("Not a new month; skipping merging") return - + logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1]) # Start to merge files pdList = [] for fn in new_files: diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index 661946436..a72c3496e 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -9,6 +9,7 @@ import pytest # first party +from delphi_utils.logger import get_structured_logger from delphi_claims_hosp.config import Config, GeoConstants from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files @@ -25,55 +26,65 @@ DATA_FILEPATH = PARAMS["indicator"]["input_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) backfill_dir = PARAMS["indicator"]["backfill_dir"] -TEST_LOGGER = logging.getLogger() class TestBackfill: - def test_store_backfill_file(self): - dropdate = datetime(2020, 1, 1) + def cleanup(self): + for file in glob.glob(f"{backfill_dir}/*.parquet"): + os.remove(file) + + def test_store_backfill_file(self, caplog): + dropdate = datetime(2020, 1, 1) fn = "claims_hosp_as_of_20200101.parquet" - assert fn not in os.listdir(backfill_dir) - + caplog.set_level(logging.INFO) + logger = get_structured_logger() + num_rows = len(pd.read_csv(DATA_FILEPATH)) + # Store backfill file - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) assert fn in os.listdir(backfill_dir) + assert "Stored backfill data in parquet" in caplog.text + + fn = "claims_hosp_as_of_20200101.parquet" backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow') - + selected_columns = ['time_value', 'fips', 'state_id', 'num', 'den', 'lag', 'issue_date'] - assert set(selected_columns) == set(backfill_df.columns) - - os.remove(backfill_dir + "/" + fn) - assert fn not in os.listdir(backfill_dir) + + assert set(selected_columns) == set(backfill_df.columns) + assert num_rows == len(backfill_df) + + self.cleanup() - def test_merge_backfill_file(self): + def test_merge_backfill_file(self, caplog): fn = "claims_hosp_202006.parquet" - assert fn not in os.listdir(backfill_dir) - + caplog.set_level(logging.INFO) + logger = get_structured_logger() + # Check when there is no daily file to merge. 
today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today, TEST_LOGGER, + merge_backfill_file(backfill_dir, today, logger, test_mode=True) assert fn not in os.listdir(backfill_dir) - - # Generate backfill daily files + assert "No new files to merge; skipping merging" in caplog.text + + + # Generate backfill daily files for d in range(11, 15): - dropdate = datetime(2020, 6, d) - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) - - # Check when the merged file is not generated + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + today = datetime(2020, 7, 1) - merge_backfill_file(backfill_dir, today, TEST_LOGGER, + merge_backfill_file(backfill_dir, today, logger, test_mode=True) + assert "Merging files" in caplog.text assert fn in os.listdir(backfill_dir) # Read daily file new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet") - pdList = [] + pdList = [] for file in new_files: - if "from" in file: - continue df = pd.read_parquet(file, engine='pyarrow') pdList.append(df) os.remove(file) @@ -81,77 +92,80 @@ def test_merge_backfill_file(self): assert len(new_files) == 1 expected = pd.concat(pdList).sort_values(["time_value", "fips"]) - + # Read the merged file merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow') - + assert set(expected.columns) == set(merged.columns) assert expected.shape[0] == merged.shape[0] assert expected.shape[1] == merged.shape[1] - - os.remove(backfill_dir + "/" + fn) - assert fn not in os.listdir(backfill_dir) - def test_merge_existing_backfill_files(self): + self.cleanup() + + def test_merge_existing_backfill_files(self, caplog): issue_date = datetime(year=2020, month=6, day=13) issue_date_str = issue_date.strftime("%Y%m%d") + caplog.set_level(logging.INFO) + logger = get_structured_logger() def prep_backfill_data(): # Generate backfill daily files for d in range(11, 15): dropdate = datetime(2020, 6, d) - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) - today = datetime(2020, 6, 14) + today = datetime(2020, 7, 1) # creating expected file - merge_backfill_file(backfill_dir, today, TEST_LOGGER, + merge_backfill_file(backfill_dir, today, logger, test_mode=True) - original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet" + original = f"{backfill_dir}/claims_hosp_202006.parquet" os.rename(original, f"{backfill_dir}/expected.parquet") # creating backfill without issue date os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") - today = datetime(2020, 6, 14) - merge_backfill_file(backfill_dir, today, - test_mode=True, check_nd=2) + merge_backfill_file(backfill_dir, today, logger, + test_mode=True) old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") for file in old_files: os.remove(file) prep_backfill_data() - file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) - merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER) + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) + + assert "Adding missing date to merged file" in caplog.text expected = pd.read_parquet(f"{backfill_dir}/expected.parquet") - merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet") + merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet") - check_diff = expected.merge(merged, 
how='left', indicator=True) - assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0] - for file in glob.glob(backfill_dir + "/*.parquet"): - os.remove(file) + check = pd.concat([merged, expected]).drop_duplicates(keep=False) + assert len(check) == 0 - def test_merge_existing_backfill_files_no_call(self): + self.cleanup() + + + def test_merge_existing_backfill_files_no_call(self, caplog): issue_date = datetime(year=2020, month=5, day=20) - issue_date_str = issue_date.strftime("%Y%m%d") + caplog.set_level(logging.INFO) + logger = get_structured_logger() def prep_backfill_data(): # Generate backfill daily files for d in range(11, 15): dropdate = datetime(2020, 6, d) - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) today = datetime(2020, 6, 14) # creating expected file - merge_backfill_file(backfill_dir, today, TEST_LOGGER, + merge_backfill_file(backfill_dir, today, logger, test_mode=True) prep_backfill_data() - file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) - merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER) + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) + assert "Issue date has no matching merged files" in caplog.text - old_files = glob.glob(backfill_dir + "*.parquet") - for file in old_files: - os.remove(file) + self.cleanup() From 6995c8ae87a2aff156d4f3ce89f5ecff7d518790 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Mon, 28 Oct 2024 11:51:56 -0400 Subject: [PATCH 12/28] added conditional for merging --- claims_hosp/delphi_claims_hosp/backfill.py | 12 +++++---- claims_hosp/tests/test_backfill.py | 29 ++++++++++++++++++++-- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py index 07db1827b..b138767e2 100644 --- a/claims_hosp/delphi_claims_hosp/backfill.py +++ b/claims_hosp/delphi_claims_hosp/backfill.py @@ -12,6 +12,7 @@ import shutil from datetime import datetime, timedelta from typing import Union +import calendar # third party import pandas as pd @@ -57,7 +58,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger): backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date) & (~backfilldata["fips"].isnull()), selected_columns] - logger.info("Filtering backfill data", startdate=_start_date, enddate=_end_date) + logger.info("Filtering source data", startdate=_start_date, enddate=_end_date) backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]] backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d") @@ -76,9 +77,9 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger): # Store intermediate file into the backfill folder try: backfilldata.to_parquet(path, index=False) - logger.info("Stored backfill data in parquet", filename=filename) + logger.info("Stored source data in parquet", filename=filename) except: - logger.info("Failed to store backfill data in parquet", ) + logger.info("Failed to store source data in parquet") return path @@ -162,8 +163,9 @@ def get_date(file_link): date_list = list(map(get_date, new_files)) latest_date = max(date_list) - if latest_date.month == most_recent.month: - logger.info("Not a new month; skipping merging") + num_of_days_in_month = calendar.monthrange(latest_date.year, 
latest_date.month)[1] + if len(date_list) < num_of_days_in_month: + logger.info("Not enough days, skipping merging", n_file_days=len(date_list)) return logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1]) diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index a72c3496e..acd95671b 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -1,3 +1,4 @@ +import calendar import logging import os import glob @@ -57,7 +58,7 @@ def test_store_backfill_file(self, caplog): self.cleanup() - def test_merge_backfill_file(self, caplog): + def test_merge_backfill_file(self, caplog, monkeypatch): fn = "claims_hosp_202006.parquet" caplog.set_level(logging.INFO) logger = get_structured_logger() @@ -76,6 +77,7 @@ def test_merge_backfill_file(self, caplog): store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) today = datetime(2020, 7, 1) + monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4)) merge_backfill_file(backfill_dir, today, logger, test_mode=True) assert "Merging files" in caplog.text @@ -102,6 +104,29 @@ def test_merge_backfill_file(self, caplog): self.cleanup() + def test_merge_backfill_file_no_call(self, caplog): + fn = "claims_hosp_202006.parquet" + caplog.set_level(logging.INFO) + logger = get_structured_logger() + + # Check when there is no daily file to merge. + today = datetime(2020, 6, 14) + merge_backfill_file(backfill_dir, today, logger, + test_mode=True) + assert fn not in os.listdir(backfill_dir) + assert "No new files to merge; skipping merging" in caplog.text + + # Generate backfill daily files + for d in range(11, 15): + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + + today = datetime(2020, 7, 1) + merge_backfill_file(backfill_dir, today, logger, + test_mode=True) + assert "Not enough days, skipping merging" in caplog.text + self.cleanup() + def test_merge_existing_backfill_files(self, caplog): issue_date = datetime(year=2020, month=6, day=13) issue_date_str = issue_date.strftime("%Y%m%d") @@ -164,8 +189,8 @@ def prep_backfill_data(): file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) assert "Issue date has no matching merged files" in caplog.text - self.cleanup() + From 1666e0ccf54e4a194698ef45c090017f9927a758 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 30 Oct 2024 12:30:23 -0400 Subject: [PATCH 13/28] lint --- claims_hosp/delphi_claims_hosp/backfill.py | 16 +++++++++------- claims_hosp/delphi_claims_hosp/run.py | 7 +++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py index b138767e2..836338223 100644 --- a/claims_hosp/delphi_claims_hosp/backfill.py +++ b/claims_hosp/delphi_claims_hosp/backfill.py @@ -6,23 +6,23 @@ """ +import calendar import glob import os import re import shutil from datetime import datetime, timedelta from typing import Union -import calendar # third party import pandas as pd -import pytz from delphi_utils import GeoMapper from .config import Config gmpr = GeoMapper() + def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger): """ Store county level backfill data into backfill_dir. 
@@ -71,14 +71,14 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
         "state_id": "string"
     })
 
-    filename = "claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = "claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
     path = f"{backfill_dir}/{filename}"
 
     # Store intermediate file into the backfill folder
     try:
         backfilldata.to_parquet(path, index=False)
         logger.info("Stored source data in parquet", filename=filename)
-    except:
+    except:  # pylint: disable=W0702
         logger.info("Failed to store source data in parquet")
     return path
 
@@ -108,7 +108,7 @@ def get_file_with_date(files) -> Union[str, None]:
             if pattern:
                 file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
                 end_date = (file_month + timedelta(days=32)).replace(day=1)
-                if issue_date >= file_month and issue_date < end_date:
+                if file_month <= issue_date < end_date:
                     return filename
         return ""
 
@@ -118,7 +118,9 @@ def get_file_with_date(files) -> Union[str, None]:
         logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return
 
-    logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name)
+    logger.info(
+        "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name
+    )
 
     # Start to merge files
     merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
@@ -168,7 +170,7 @@ def get_date(file_link):
         logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
         return
 
-    logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1])
+    logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
     # Start to merge files
     pdList = []
     for fn in new_files:

diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index fe8d76fda..0d24192c0 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -103,14 +103,13 @@ def run_module(params, logger=None):
     # Store backfill data
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
-        backfill_merge_day = params["indicator"]["backfill_merge_day"]
         if custom_run_flag:
-            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
             merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
 
         else:
-            merge_backfill_file(backfill_dir, datetime.today())
+            merge_backfill_file(backfill_dir, datetime.today(), logger)
             store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
 
     # print out information
     logger.info("Loaded params",

From 98d631a9a9e2a73eab1fbaccbc68dc823cd16261 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 30 Oct 2024 13:04:49 -0400
Subject: [PATCH 14/28] remove unrelated file

---
 changehc/delphi_changehc/patch.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 changehc/delphi_changehc/patch.py

diff --git a/changehc/delphi_changehc/patch.py b/changehc/delphi_changehc/patch.py
deleted file mode 100644
index e69de29bb..000000000

From 74bebe394724b85272ad8d5a2ae10192489e1f20 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 6 Nov 2024 11:02:37 -0500
Subject: [PATCH 15/28] update library for ci

---
 claims_hosp/tests/test_download_claims_ftp_files.py | 7 +------
 1 file 
changed, 1 insertion(+), 6 deletions(-) diff --git a/claims_hosp/tests/test_download_claims_ftp_files.py b/claims_hosp/tests/test_download_claims_ftp_files.py index dc60299bf..aca51b095 100644 --- a/claims_hosp/tests/test_download_claims_ftp_files.py +++ b/claims_hosp/tests/test_download_claims_ftp_files.py @@ -1,14 +1,9 @@ # standard import datetime import re -from mock import MagicMock, patch +from unittest.mock import MagicMock, patch import logging -# third party -import numpy as np -from freezegun import freeze_time - -# first party from delphi_claims_hosp.download_claims_ftp_files import (change_date_format, get_timestamp, download) From 8be796eb34ba66a60d24ea0f1e172eed197edaea Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 6 Nov 2024 11:38:36 -0500 Subject: [PATCH 16/28] fix test --- claims_hosp/delphi_claims_hosp/backfill.py | 2 +- claims_hosp/tests/test_backfill.py | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py index 836338223..fae2c120d 100644 --- a/claims_hosp/delphi_claims_hosp/backfill.py +++ b/claims_hosp/delphi_claims_hosp/backfill.py @@ -166,7 +166,7 @@ def get_date(file_link): date_list = list(map(get_date, new_files)) latest_date = max(date_list) num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1] - if len(date_list) < num_of_days_in_month: + if len(date_list) < (num_of_days_in_month * .8) or most_recent == latest_date + timedelta(days=1): logger.info("Not enough days, skipping merging", n_file_days=len(date_list)) return diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py index acd95671b..7a98739bf 100644 --- a/claims_hosp/tests/test_backfill.py +++ b/claims_hosp/tests/test_backfill.py @@ -5,11 +5,8 @@ from datetime import datetime from pathlib import Path -# third party import pandas as pd -import pytest -# first party from delphi_utils.logger import get_structured_logger from delphi_claims_hosp.config import Config, GeoConstants from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files @@ -44,7 +41,7 @@ def test_store_backfill_file(self, caplog): # Store backfill file store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) assert fn in os.listdir(backfill_dir) - assert "Stored backfill data in parquet" in caplog.text + assert "Stored source data in parquet" in caplog.text fn = "claims_hosp_as_of_20200101.parquet" @@ -127,7 +124,7 @@ def test_merge_backfill_file_no_call(self, caplog): assert "Not enough days, skipping merging" in caplog.text self.cleanup() - def test_merge_existing_backfill_files(self, caplog): + def test_merge_existing_backfill_files(self, caplog, monkeypatch): issue_date = datetime(year=2020, month=6, day=13) issue_date_str = issue_date.strftime("%Y%m%d") caplog.set_level(logging.INFO) @@ -138,6 +135,7 @@ def prep_backfill_data(): dropdate = datetime(2020, 6, d) store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4)) today = datetime(2020, 7, 1) # creating expected file merge_backfill_file(backfill_dir, today, logger, @@ -147,6 +145,7 @@ def prep_backfill_data(): # creating backfill without issue date os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") + monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 3)) merge_backfill_file(backfill_dir, today, logger, test_mode=True) From 
From 75a9e4af20111fb59f96b4aa77c0de69626c56d7 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 6 Nov 2024 11:43:21 -0500
Subject: [PATCH 17/28] lint

---
 claims_hosp/delphi_claims_hosp/backfill.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index fae2c120d..22cda224d 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -166,7 +166,7 @@ def get_date(file_link):
     date_list = list(map(get_date, new_files))
     latest_date = max(date_list)
     num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
-    if len(date_list) < (num_of_days_in_month * .8) or most_recent == latest_date + timedelta(days=1):
+    if len(date_list) < (num_of_days_in_month * 0.8) or most_recent == latest_date + timedelta(days=1):
         logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
         return

From f1c11e5bf47f8316681fd84a9eeb9bb51e670816 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 8 Nov 2024 14:59:51 -0500
Subject: [PATCH 18/28] fixing logic

---
 claims_hosp/delphi_claims_hosp/backfill.py | 55 +++++++++++++---------
 claims_hosp/delphi_claims_hosp/patch.py | 3 ++
 claims_hosp/delphi_claims_hosp/run.py | 2 +-
 3 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 22cda224d..78835c438 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -9,9 +9,11 @@
 import calendar
 import glob
 import os
+import pathlib
 import re
 import shutil
 from datetime import datetime, timedelta
+from pathlib import Path
 from typing import Union

 # third party
@@ -101,41 +103,47 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     """
     new_files = glob.glob(backfill_dir + "/claims_hosp_*")

-    def get_file_with_date(files) -> Union[str, None]:
-        for filename in files:
+    def get_file_with_date(files) -> Union[pathlib.Path, None]:
+        for file_path in files:
             # need to only match files with 6 digits for merged files
-            pattern = re.findall(r"_(\d{6,6})\.parquet", filename)
+            pattern = re.findall(r"_(\d{6,6})\.parquet", file_path)
             if pattern:
                 file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
                 end_date = (file_month + timedelta(days=32)).replace(day=1)
                 if file_month <= issue_date < end_date:
-                    return filename
-        return ""
+                    return Path(file_path)
+        return

-    file_name = get_file_with_date(new_files)
+    file_path = get_file_with_date(new_files)

-    if len(file_name) == 0:
+    if not file_path:
         logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return

     logger.info(
-        "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name
+        "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_path
     )

     # Start to merge files
-    merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
+    file_name = Path(file_path).name
+    merge_file = f"{file_path.parent}/{file_name}_after_merge.parquet"
+
     try:
-        shutil.copyfile(file_name, merge_file)
+        shutil.copyfile(file_path, merge_file)
         existing_df = pd.read_parquet(merge_file, engine="pyarrow")
         df = pd.read_parquet(backfill_file, engine="pyarrow")
         merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
         merged_df.to_parquet(merge_file, index=False)
-        os.remove(file_name)
-        os.rename(merge_file, file_name)
+    # pylint: disable=W0703:
     except Exception as e:
+        logger.info("Failed to merge existing backfill files", issue_date=issue_date.strftime("%Y-%m-%d"), msg=e)
         os.remove(merge_file)
-        logger.error(e)
+        os.remove(backfill_file)
+        return
+
+    os.remove(file_path)
+    os.rename(merge_file, file_path)

     return
@@ -163,22 +171,27 @@ def get_date(file_link):
         fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
         return datetime.strptime(fn, "%Y%m%d")

-    date_list = list(map(get_date, new_files))
+    date_list = sorted(map(get_date, new_files))
     latest_date = max(date_list)
     num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
-    if len(date_list) < (num_of_days_in_month * 0.8) or most_recent == latest_date + timedelta(days=1):
+    if len(date_list) < (num_of_days_in_month * 0.8) and most_recent != latest_date + timedelta(days=1):
         logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
         return

     logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
     # Start to merge files
     pdList = []
-    for fn in new_files:
-        df = pd.read_parquet(fn, engine='pyarrow')
-        pdList.append(df)
-    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
-    path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
-    merged_file.to_parquet(path, index=False)
+    try:
+        for fn in new_files:
+            df = pd.read_parquet(fn, engine='pyarrow')
+            pdList.append(df)
+        merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
+        path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
+        merged_file.to_parquet(path, index=False)
+
+    except Exception as e:
+        logger.info("Failed to merge backfill files", msg=e)
+        return

     # Delete daily files once we have the merged one.
     if not test_mode:
diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index b812bce24..cc4ea4c29 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -56,6 +56,9 @@ def patch():
     makedirs(params["patch"]["patch_dir"], exist_ok=True)

     current_issue = start_issue
+    if not params["common"]["custom_run"]:
+        logger.warning("Custom flag not set; setting it to true for patching")
+        params["common"]["custom_run"] = True

     while current_issue <= end_issue:
         logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 3a8538bb2..9facdb346 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -60,7 +60,7 @@ def run_module(params, logger=None):
     issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d")
     # safety check for patch parameters exists in file, but not running custom runs/patches
     custom_run_flag = (
-        False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False)
+        False if not params["common"].get("custom_run", False) else params["common"]["custom_run"]
     )
     if not logger:
         logger = get_structured_logger(
             __name__,

From cf4e4bc6b18575c79f7800d59b6a04317f41d5b1 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 8 Nov 2024 15:09:12 -0500
Subject: [PATCH 19/28] lint

---
 claims_hosp/delphi_claims_hosp/backfill.py | 7 ++++---
 claims_hosp/delphi_claims_hosp/modify_claims_drops.py | 2 +-
 claims_hosp/delphi_claims_hosp/run.py | 4 +---
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 78835c438..c6d3a6ba7 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -112,7 +112,7 @@ def get_file_with_date(files) -> Union[pathlib.Path, None]:
                 end_date = (file_month + timedelta(days=32)).replace(day=1)
                 if file_month <= issue_date < end_date:
                     return Path(file_path)
-        return
+        return None

     file_path = get_file_with_date(new_files)

@@ -135,7 +135,7 @@ def get_file_with_date(files) -> Union[pathlib.Path, None]:
         merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
         merged_df.to_parquet(merge_file, index=False)

-    # pylint: disable=W0703:
+    # pylint: disable=W0703
     except Exception as e:
         logger.info("Failed to merge existing backfill files", issue_date=issue_date.strftime("%Y-%m-%d"), msg=e)
         os.remove(merge_file)
@@ -183,12 +183,13 @@ def get_date(file_link):
     pdList = []
     try:
         for fn in new_files:
-            df = pd.read_parquet(fn, engine='pyarrow')
+            df = pd.read_parquet(fn, engine="pyarrow")
             pdList.append(df)
         merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
         path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
         merged_file.to_parquet(path, index=False)

+    # pylint: disable=W0703
     except Exception as e:
         logger.info("Failed to merge backfill files", msg=e)
         return
diff --git a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
index 19a962884..1f7251a89 100644
--- a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
+++ b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
@@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False):
             dfs_list.append(dfs)
         else:
             dfs.to_csv(out_path, index=False)
-            logger.info("Wrote modified csv", filename=out_path)
+            logger.info("Wrote modified csv", filename=str(out_path))
     return files, dfs_list
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 9facdb346..195f51097 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -59,9 +59,7 @@ def run_module(params, logger=None):
     issue_date_str = params.get("patch", {}).get("current_issue", None)
     issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d")
     # safety check for patch parameters exists in file, but not running custom runs/patches
-    custom_run_flag = (
-        False if not params["common"].get("custom_run", False) else params["common"]["custom_run"]
-    )
+    custom_run_flag = False if not params["common"].get("custom_run", False) else params["common"]["custom_run"]
     if not logger:
         logger = get_structured_logger(
             __name__,
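For context on the matching logic the two patches above settle on: a merged file named claims_hosp_YYYYMM.parquet covers every issue date inside that month. A small self-contained sketch of the month-window test, using made-up filenames:

    # Sketch of the YYYYMM month-window match used by get_file_with_date (made-up names).
    import re
    from datetime import datetime, timedelta

    def covering_month(filename):
        match = re.findall(r"_(\d{6,6})\.parquet", filename)
        return datetime.strptime(match[0], "%Y%m").replace(day=1) if match else None

    issue_date = datetime(2020, 6, 13)
    for name in ["claims_hosp_202005.parquet", "claims_hosp_202006.parquet"]:
        file_month = covering_month(name)
        next_month = (file_month + timedelta(days=32)).replace(day=1)
        print(name, file_month <= issue_date < next_month)  # False, then True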
logger.info("Wrote modified csv", filename=str(out_path)) return files, dfs_list diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index 9facdb346..195f51097 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -59,9 +59,7 @@ def run_module(params, logger=None): issue_date_str = params.get("patch", {}).get("current_issue", None) issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") # safety check for patch parameters exists in file, but not running custom runs/patches - custom_run_flag = ( - False if not params["common"].get("custom_run", False) else params["common"]["custom_run"] - ) + custom_run_flag = False if not params["common"].get("custom_run", False) else params["common"]["custom_run"] if not logger: logger = get_structured_logger( __name__, From 0498ed4a502bd20c257e64768b0513000ab92a31 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Fri, 8 Nov 2024 17:25:10 -0500 Subject: [PATCH 20/28] making test more robust in progress --- claims_hosp/tests/conftest.py | 87 ++++++++++++++++++++++++++ claims_hosp/tests/receiving/.gitignore | 0 claims_hosp/tests/test_patch.py | 51 +++++++++------ claims_hosp/tests/test_run.py | 13 ++++ 4 files changed, 131 insertions(+), 20 deletions(-) create mode 100644 claims_hosp/tests/conftest.py create mode 100644 claims_hosp/tests/receiving/.gitignore create mode 100644 claims_hosp/tests/test_run.py diff --git a/claims_hosp/tests/conftest.py b/claims_hosp/tests/conftest.py new file mode 100644 index 000000000..7250ce090 --- /dev/null +++ b/claims_hosp/tests/conftest.py @@ -0,0 +1,87 @@ +import logging +import shutil +from pathlib import Path +import re + +import copy +import pytest +import mock +import pandas as pd +from unittest.mock import MagicMock + +from os import listdir, remove, makedirs +from os.path import join, exists + +import delphi_claims_hosp + +TEST_DIR = Path(__file__).parent +@pytest.fixture(scope="session") +def params(): + params = { + "common": { + "export_dir": f"{TEST_DIR}/receiving", + "log_filename": f"{TEST_DIR}/test.log", + "custom_run": True, + }, + "indicator": { + "drop_date": None, + "generate_backfill_files": True, + "ftp_credentials": + {"host": "test_host", + "user": "test_user", + "pass": "test_pass", + "port": 2222 + }, + "write_se": False, + "obfuscated_prefix": "foo_obfuscated", + "parallel": False, + "geos": ["nation"], + "weekday": [True, False], + "backfill_dir": f"{TEST_DIR}/backfill", + "start_date": "2020-02-01", + "end_date": None, + "drop_date": None, + "n_backfill_days": 70, + "n_waiting_days": 3, + "input_dir": f"{TEST_DIR}/receiving", + }, + "validation": { + "common": { + "span_length": 14, + "min_expected_lag": {"all": "3"}, + "max_expected_lag": {"all": "4"}, + } + } + } + return copy.deepcopy(params) + +@pytest.fixture +def params_w_patch(params): + params_copy = copy.deepcopy(params) + params_copy["patch"] = { + "start_issue": "2020-11-07", + "end_issue": "2020-11-07", + "patch_dir": "./patch_dir" + } + return params_copy + +@pytest.fixture(scope="session") +def run_as_module(params): + with mock.patch('delphi_claims_hosp.patch.get_structured_logger') as mock_get_structured_logger, \ + mock.patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \ + mock.patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \ + mock.patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False) as mock_exists: + mock_ssh_client_instance = MagicMock() + 
+        mock_ssh_client.return_value = mock_ssh_client_instance
+        mock_sftp = MagicMock()
+        mock_ssh_client_instance.open_sftp.return_value = mock_sftp
+        mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")]
+        def mock_get(*args, **kwargs):
+            src = Path(f"{TEST_DIR}/test_data/{args[0]}")
+            dst = Path(f"{TEST_DIR}/receiving/{args[0]}")
+            shutil.copyfile(src, dst)
+
+        mock_sftp.get.side_effect = mock_get
+        mock_read_params.return_value = params
+
+        delphi_claims_hosp.run.run_module(params)
diff --git a/claims_hosp/tests/receiving/.gitignore b/claims_hosp/tests/receiving/.gitignore
new file mode 100644
index 000000000..e69de29bb
diff --git a/claims_hosp/tests/test_patch.py b/claims_hosp/tests/test_patch.py
index 37688bda2..a89ca45b6 100644
--- a/claims_hosp/tests/test_patch.py
+++ b/claims_hosp/tests/test_patch.py
@@ -1,31 +1,42 @@
 import unittest
-from unittest.mock import patch as mock_patch
-from delphi_claims_hosp.patch import patch
+from datetime import datetime
+from pathlib import Path
+from unittest.mock import patch as mock_patch, MagicMock
 import os
 import shutil

+from conftest import TEST_DIR
+
+from delphi_claims_hosp.patch import patch
+from delphi_claims_hosp.backfill import merge_existing_backfill_files

 class TestPatchModule:
-    def test_patch(self):
-        with mock_patch('delphi_claims_hosp.patch.get_structured_logger') as mock_get_structured_logger, \
-             mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \
-             mock_patch('delphi_claims_hosp.patch.run_module') as mock_run_module:
-
-            mock_read_params.return_value = {
-                "common": {
-                    "log_filename": "test.log"
-                },
-                "patch": {
-                    "start_issue": "2021-01-01",
-                    "end_issue": "2021-01-02",
-                    "patch_dir": "./patch_dir"
-                }
-            }
+
+    def test_patch(self, params_w_patch):
+        with mock_patch('delphi_claims_hosp.patch.get_structured_logger'), \
+             mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \
+             mock_patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \
+             mock_patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False), \
+             mock_patch('delphi_claims_hosp.backfill.merge_existing_backfill_files') as mock_backfill:
+            mock_ssh_client_instance = MagicMock()
+            mock_ssh_client.return_value = mock_ssh_client_instance
+            mock_sftp = MagicMock()
+            mock_ssh_client_instance.open_sftp.return_value = mock_sftp
+            mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")]
+            def mock_get(*args, **kwargs):
+                src = Path(f"{TEST_DIR}/test_data/{args[0]}")
+                dst = Path(f"{TEST_DIR}/receiving/{args[0]}")
+                shutil.copyfile(src, dst)
+            mock_sftp.get.side_effect = mock_get
+
+            mock_read_params.return_value = params_w_patch
+            mock_backfill.side_effect = merge_existing_backfill_files

             patch()

-            assert os.path.isdir('./patch_dir')
-            assert os.path.isdir('./patch_dir/issue_20210101/hospital-admissions')
-            assert os.path.isdir('./patch_dir/issue_20210102/hospital-admissions')
+            # assert mock_backfill.assert_called()
+
+            issue_date = params_w_patch["patch"]["start_issue"].replace("-", "")
+            assert os.path.isdir(f'{TEST_DIR}/patch_dir/issue_{issue_date}/hospital-admissions')

             # Clean up the created directories after the test
             shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])
\ No newline at end of file
diff --git a/claims_hosp/tests/test_run.py b/claims_hosp/tests/test_run.py
new file mode 100644
index 000000000..4b68e8906
--- /dev/null
+++ b/claims_hosp/tests/test_run.py
@@ -0,0 +1,13 @@
+import logging
+from os import listdir
+from os.path import join
+from itertools import product
+
+import pandas as pd
+import pytest
+
+from conftest import TEST_DIR
+class TestRun:
+    @pytest.mark.freeze_time("2020-11-07")
+    def test_output_files_exist(self, run_as_module):
+        pass
From f95a1462fe06556d96ea31053d45624f1f98efa0 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 12 Nov 2024 09:28:48 -0500
Subject: [PATCH 21/28] cleaning up and fixing tests

---
 claims_hosp/delphi_claims_hosp/run.py | 4 +-
 claims_hosp/tests/conftest.py | 9 ++--
 claims_hosp/tests/retrieve_files/.gitignore | 0
 claims_hosp/tests/test_patch.py | 21 ++++++----
 claims_hosp/tests/test_run.py | 46 +++++++++++++++++----
 5 files changed, 59 insertions(+), 21 deletions(-)
 create mode 100644 claims_hosp/tests/retrieve_files/.gitignore

diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 195f51097..10aebe121 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -56,10 +56,10 @@ def run_module(params, logger=None):
         adjustments (False).
     """
     start_time = time.time()
-    issue_date_str = params.get("patch", {}).get("current_issue", None)
-    issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d")
     # safety check for patch parameters exists in file, but not running custom runs/patches
     custom_run_flag = False if not params["common"].get("custom_run", False) else params["common"]["custom_run"]
+    issue_date_str = params.get("patch", {}).get("current_issue", None)
+    issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") if issue_date_str else None
     if not logger:
         logger = get_structured_logger(
             __name__,
diff --git a/claims_hosp/tests/conftest.py b/claims_hosp/tests/conftest.py
index 7250ce090..3c5a82425 100644
--- a/claims_hosp/tests/conftest.py
+++ b/claims_hosp/tests/conftest.py
@@ -19,9 +19,9 @@ def params():
     params = {
         "common": {
-            "export_dir": f"{TEST_DIR}/receiving",
+            "export_dir": f"{TEST_DIR}/retrieve_files",
             "log_filename": f"{TEST_DIR}/test.log",
-            "custom_run": True,
+            "custom_run": False,
         },
         "indicator": {
             "drop_date": None,
diff --git a/claims_hosp/tests/retrieve_files/.gitignore b/claims_hosp/tests/retrieve_files/.gitignore
new file mode 100644
index 000000000..e69de29bb
diff --git a/claims_hosp/tests/test_patch.py b/claims_hosp/tests/test_patch.py
index a89ca45b6..b11d3a087 100644
--- a/claims_hosp/tests/test_patch.py
+++ b/claims_hosp/tests/test_patch.py
@@ -4,10 +4,13 @@
 from unittest.mock import patch as mock_patch, MagicMock
 import os
 import shutil
+
+from delphi_claims_hosp.download_claims_ftp_files import change_date_format
+
 from conftest import TEST_DIR

 from delphi_claims_hosp.patch import patch
-from delphi_claims_hosp.backfill import merge_existing_backfill_files
+from delphi_claims_hosp.backfill import merge_existing_backfill_files, merge_backfill_file

 class TestPatchModule:

@@ -16,24 +19,28 @@ def test_patch(self, params_w_patch):
             mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \
             mock_patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \
             mock_patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False), \
-            mock_patch('delphi_claims_hosp.backfill.merge_existing_backfill_files') as mock_backfill:
+            mock_patch('delphi_claims_hosp.run.merge_existing_backfill_files') as mock_patch_backfill, \
+            mock_patch('delphi_claims_hosp.run.merge_backfill_file') as mock_backfill:
             mock_ssh_client_instance = MagicMock()
             mock_ssh_client.return_value = mock_ssh_client_instance
             mock_sftp = MagicMock()
             mock_ssh_client_instance.open_sftp.return_value = mock_sftp
-            mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")]
+            mock_sftp.listdir_attr.return_value = [MagicMock(filename=change_date_format("SYNEDI_AGG_INPATIENT_06112020_1451CDT.csv.gz"))]
             def mock_get(*args, **kwargs):
-                src = Path(f"{TEST_DIR}/test_data/{args[0]}")
-                dst = Path(f"{TEST_DIR}/receiving/{args[0]}")
+                file = change_date_format(args[0])
+                src = Path(f"{TEST_DIR}/test_data/{file}")
+                dst = Path(f"{TEST_DIR}/receiving/{file}")
                 shutil.copyfile(src, dst)
             mock_sftp.get.side_effect = mock_get

             mock_read_params.return_value = params_w_patch
-            mock_backfill.side_effect = merge_existing_backfill_files
+            mock_patch_backfill.side_effect = merge_existing_backfill_files
+            mock_backfill.side_effect = merge_backfill_file

             patch()

-            # assert mock_backfill.assert_called()
+            assert mock_patch_backfill.call_count == 1
+            assert mock_backfill.call_count == 0

             issue_date = params_w_patch["patch"]["start_issue"].replace("-", "")
             assert os.path.isdir(f'{TEST_DIR}/patch_dir/issue_{issue_date}/hospital-admissions')
diff --git a/claims_hosp/tests/test_run.py b/claims_hosp/tests/test_run.py
index 4b68e8906..651bd78ad 100644
--- a/claims_hosp/tests/test_run.py
+++ b/claims_hosp/tests/test_run.py
@@ -1,13 +1,43 @@
-import logging
-from os import listdir
-from os.path import join
-from itertools import product
+from pathlib import Path
+from unittest.mock import patch, MagicMock
+import shutil

-import pandas as pd
 import pytest
+from delphi_claims_hosp.download_claims_ftp_files import change_date_format
+
+from delphi_claims_hosp.run import run_module
+from delphi_claims_hosp.backfill import merge_existing_backfill_files, merge_backfill_file
+from freezegun import freeze_time

 from conftest import TEST_DIR
 class TestRun:
-    @pytest.mark.freeze_time("2020-11-07")
-    def test_output_files_exist(self, run_as_module):
-        pass
+    @freeze_time("2020-06-11 20:00:00")
+    def test_output_files(self, params):
+        with patch('delphi_claims_hosp.patch.get_structured_logger'), \
+             patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient') as mock_ssh_client, \
+             patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False), \
+             patch('delphi_claims_hosp.run.merge_existing_backfill_files') as mock_patch_backfill, \
+             patch('delphi_claims_hosp.run.merge_backfill_file') as mock_backfill:
+
+            mock_ssh_client_instance = MagicMock()
+            mock_ssh_client.return_value = mock_ssh_client_instance
+            mock_sftp = MagicMock()
+            mock_ssh_client_instance.open_sftp.return_value = mock_sftp
+            mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_20200611_1451CDT.csv.gz")]
+            def mock_get(*args, **kwargs):
+                file = change_date_format(args[0])
+                src = Path(f"{TEST_DIR}/test_data/{file}")
+                dst = Path(f"{TEST_DIR}/receiving/{file}")
+                shutil.copyfile(src, dst)
+            mock_sftp.get.side_effect = mock_get
+
+            mock_patch_backfill.side_effect = merge_existing_backfill_files
+            mock_backfill.side_effect = merge_backfill_file
+
+            run_module(params)
+
+            assert mock_patch_backfill.call_count == 0
+            assert mock_backfill.call_count == 1
+
+            # Clean up the created directories after the test
+            shutil.rmtree(params["common"]["export_dir"])
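A brief aside on the time-freezing switch made above: the earlier test used the pytest-freezegun marker, while this rewrite moves to freezegun's own decorator, which is added to setup.py two commits later. A minimal, self-contained sketch of that decorator (the test name and timestamp here are illustrative):

    # Minimal sketch of freezegun's decorator, as used by the rewritten test above.
    from datetime import datetime
    from freezegun import freeze_time

    @freeze_time("2020-06-11 20:00:00")
    def test_now_is_frozen():
        # Inside the decorated function, "now" is pinned to the given instant.
        assert datetime.now() == datetime(2020, 6, 11, 20, 0, 0)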
From 1d98c03c295f083047446f33415abe727ab8c32b Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 13 Nov 2024 15:49:52 -0500
Subject: [PATCH 22/28] fix test and suggestions

---
 claims_hosp/delphi_claims_hosp/get_latest_claims_name.py | 3 +--
 claims_hosp/tests/conftest.py | 8 --------
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
index eae5763ed..25d9f27dd 100644
--- a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
+++ b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -23,8 +23,7 @@ def get_latest_filename(dir_path, logger, issue_date=None):
             if timestamp <= current_date:
                 latest_timestamp = timestamp
                 latest_filename = file
-    if issue_date is None:
-        assert current_date.date() == latest_timestamp.date(), "no drop for today"
+    assert current_date.date() == latest_timestamp.date(), "no drop for today"

     logger.info("Latest claims file", filename=latest_filename)

diff --git a/claims_hosp/tests/conftest.py b/claims_hosp/tests/conftest.py
index 3c5a82425..78a9efafc 100644
--- a/claims_hosp/tests/conftest.py
+++ b/claims_hosp/tests/conftest.py
@@ -1,17 +1,9 @@
-import logging
 import shutil
 from pathlib import Path
-import re

 import copy
 import pytest
-import mock
-import pandas as pd
 from unittest.mock import MagicMock
-
-from os import listdir, remove, makedirs
-from os.path import join, exists
-
 import delphi_claims_hosp

 TEST_DIR = Path(__file__).parent

From a6cb942be72b75b25cb2465071b8f08d6f441338 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 19 Nov 2024 10:25:25 -0500
Subject: [PATCH 23/28] add package

---
 claims_hosp/setup.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/claims_hosp/setup.py b/claims_hosp/setup.py
index 3b859c294..21b54f3c4 100644
--- a/claims_hosp/setup.py
+++ b/claims_hosp/setup.py
@@ -5,6 +5,7 @@
     "covidcast",
    "darker[isort]~=2.1.1",
    "delphi-utils",
+    "freezegun",
    "numpy",
    "pandas",
    "paramiko",

From b1ee511c3ba6ccd73f36e642397c8a95dfb9c922 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 19 Nov 2024 11:01:40 -0500
Subject: [PATCH 24/28] added time to issue date to accommodate for filetime

---
 claims_hosp/delphi_claims_hosp/run.py | 2 +-
 claims_hosp/tests/conftest.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 10aebe121..b54f0d2cd 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -59,7 +59,7 @@ def run_module(params, logger=None):
     # safety check for patch parameters exists in file, but not running custom runs/patches
     custom_run_flag = False if not params["common"].get("custom_run", False) else params["common"]["custom_run"]
     issue_date_str = params.get("patch", {}).get("current_issue", None)
-    issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") if issue_date_str else None
+    issue_date = datetime.strptime(issue_date_str + " 23:59:00", "%Y-%m-%d %H:%M:%S") if issue_date_str else None
     if not logger:
         logger = get_structured_logger(
             __name__,
diff --git a/claims_hosp/tests/conftest.py b/claims_hosp/tests/conftest.py
index 78a9efafc..e0793ddd7 100644
--- a/claims_hosp/tests/conftest.py
+++ b/claims_hosp/tests/conftest.py
@@ -52,8 +52,8 @@ def params_w_patch(params):
     params_copy = copy.deepcopy(params)
     params_copy["common"]["custom_run"] = True
     params_copy["patch"] = {
-        "start_issue": "2020-06-12",
-        "end_issue": "2020-06-12",
+        "start_issue": "2020-06-11",
+        "end_issue": "2020-06-11",
        "patch_dir": "./patch_dir"
     }
     return params_copy
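The 23:59:00 suffix added above makes a patched issue date compare as late as any file dropped earlier the same day, which is what get_latest_filename's checks require. A small sketch of the two comparisons it enables (the mid-day timestamp is illustrative, patterned on the drop filenames used in these tests):

    # Why the end-of-day time matters: drop files carry a mid-day timestamp.
    from datetime import datetime

    issue_date = datetime.strptime("2020-06-11" + " 23:59:00", "%Y-%m-%d %H:%M:%S")
    file_drop_time = datetime(2020, 6, 11, 14, 51)  # e.g. ..._06112020_1451CDT
    print(file_drop_time <= issue_date)                # True: the drop is picked up
    print(issue_date.date() == file_drop_time.date())  # True: "no drop for today" passes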
From d5b414af20111fb59f96b4aa77c0de69626c56d7 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 24 Dec 2024 13:15:50 -0500
Subject: [PATCH 25/28] reverting in progress

---
 claims_hosp/delphi_claims_hosp/backfill.py | 156 +++++++++++----------
 claims_hosp/tests/test_backfill.py | 36 ++-----
 2 files changed, 105 insertions(+), 87 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index c6d3a6ba7..1fd0806d1 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -60,7 +60,6 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                                     & (~backfilldata["fips"].isnull()), selected_columns]
-    logger.info("Filtering source data", startdate=_start_date, enddate=_end_date)

     backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
     backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
@@ -73,17 +72,80 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
         "state_id": "string"
     })

-    filename = "claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
-    path = f"{backfill_dir}/{filename}"
+    filename = backfill_dir + \
+        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    # Store intermediate file into the backfill folder
+    backfilldata.to_parquet(filename, index=False)

     # Store intermediate file into the backfill folder
     try:
-        backfilldata.to_parquet(path, index=False)
+        backfilldata.to_parquet(filename, index=False)
         logger.info("Stored source data in parquet", filename=filename)
     except:  # pylint: disable=W0702
         logger.info("Failed to store source data in parquet")
-    return path
+    return filename
+
+def merge_backfill_file(backfill_dir, backfill_merge_day, today, logger,
+                        test_mode=False, check_nd=25):
+    """
+    Merge ~4 weeks' backfill data into one file.
+
+    Usually this function should merge 28 days' data into a new file so as to
+    save the reading time when running the backfill pipelines. We set a softer
+    threshold to allow flexibility in data delivery.
+    Parameters
+    ----------
+    today : datetime
+        The most recent date when the raw data is received
+    backfill_dir : str
+        specified path to store backfill files.
+    backfill_merge_day: int
+        The day of a week that we used to merge the backfill files. e.g. 0
+        is Monday.
+    test_mode: bool
+    check_nd: int
+        The criteria of the number of unmerged files. Ideally, we want the
+        number to be 28, but we use a looser criteria from practical
+        considerations
+    """
+    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
+    if len(new_files) == 0:  # if no any daily file is stored
+        return
+
+    def get_date(file_link):
+        # Keep the function here consistent with the backfill path in
+        # function `store_backfill_file`
+        fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
+        return datetime.strptime(fn, "%Y%m%d")
+
+    date_list = list(map(get_date, new_files))
+    earliest_date = min(date_list)
+    latest_date = max(date_list)
+
+    # Check whether to merge
+    # Check the number of files that are not merged
+    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+        logger.info("No new files to merge; skipping merging")
+        return
+
+    # Start to merge files
+    logger.info("Merging files", start_date=earliest_date, end_date=latest_date)
+    pdList = []
+    for fn in new_files:
+        df = pd.read_parquet(fn, engine='pyarrow')
+        pdList.append(df)
+    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
+    path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
+        datetime.strftime(earliest_date, "%Y%m%d"),
+        datetime.strftime(latest_date, "%Y%m%d"))
+    merged_file.to_parquet(path, index=False)
+
+    # Delete daily files once we have the merged one.
+    if not test_mode:
+        for fn in new_files:
+            os.remove(fn)
+    return

 def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
     """
@@ -101,20 +163,25 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
         backfill_file : str
             specific file add to merged backfill file.
     """
-    new_files = glob.glob(backfill_dir + "/claims_hosp_*")
-
-    def get_file_with_date(files) -> Union[pathlib.Path, None]:
-        for file_path in files:
-            # need to only match files with 6 digits for merged files
-            pattern = re.findall(r"_(\d{6,6})\.parquet", file_path)
-            if pattern:
-                file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
-                end_date = (file_month + timedelta(days=32)).replace(day=1)
-                if file_month <= issue_date < end_date:
-                    return Path(file_path)
+    new_files = sorted(Path(backfill_dir).glob("claims_hosp_*"))
+    new_files.remove(Path(backfill_file))
+
+    def get_file_with_date(files, issue_date) -> Union[pathlib.Path, None]:
+        for filename in files:
+            pattern = re.findall(r"_(\d{8})", filename.name)
+
+            if len(pattern) == 2:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                end_date = datetime.strptime(pattern[1], "%Y%m%d")
+                if start_date <= issue_date and end_date >= issue_date:
+                    return filename
+            elif len(pattern) == 1:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                if issue_date >= start_date:
+                    return filename
         return None

-    file_path = get_file_with_date(new_files)
+    file_path = get_file_with_date(new_files, issue_date)

     if not file_path:
         logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return

@@ -125,7 +192,7 @@ def get_file_with_date(files) -> Union[pathlib.Path, None]:
     )

     # Start to merge files
-    file_name = Path(file_path).name
+    file_name = file_path.name
     merge_file = f"{file_path.parent}/{file_name}_after_merge.parquet"

     try:
@@ -146,56 +213,3 @@ def get_file_with_date(files) -> Union[pathlib.Path, None]:
     os.rename(merge_file, file_path)

     return
-
-def merge_backfill_file(backfill_dir, most_recent, logger, test_mode=False):
-    """
-    Merge a month's source data into one file.
-
-    Parameters
-    ----------
-    most_recent : datetime
-        The most recent date when the raw data is received
-    backfill_dir : str
-        specified path to store backfill files.
-    test_mode: bool
-    """
-    previous_month = (most_recent.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
-    new_files = glob.glob(backfill_dir + f"/claims_hosp_as_of_{previous_month}*")
-    if len(new_files) == 0:  # if no any daily file is stored
-        logger.info("No new files to merge; skipping merging")
-        return
-
-    def get_date(file_link):
-        # Keep the function here consistent with the backfill path in
-        # function `store_backfill_file`
-        fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
-        return datetime.strptime(fn, "%Y%m%d")
-
-    date_list = sorted(map(get_date, new_files))
-    latest_date = max(date_list)
-    num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
-    if len(date_list) < (num_of_days_in_month * 0.8) and most_recent != latest_date + timedelta(days=1):
-        logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
-        return
-
-    logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
-    # Start to merge files
-    pdList = []
-    try:
-        for fn in new_files:
-            df = pd.read_parquet(fn, engine="pyarrow")
-            pdList.append(df)
-        merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
-        path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
-        merged_file.to_parquet(path, index=False)
-
-    # pylint: disable=W0703
-    except Exception as e:
-        logger.info("Failed to merge backfill files", msg=e)
-        return
-
-    # Delete daily files once we have the merged one.
-    if not test_mode:
-        for fn in new_files:
-            os.remove(fn)
-    return
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index 7a98739bf..d0e042583 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -6,6 +6,7 @@
 from pathlib import Path

 import pandas as pd
+import pytest

 from delphi_utils.logger import get_structured_logger
 from delphi_claims_hosp.config import Config, GeoConstants
@@ -56,14 +57,14 @@ def test_merge_backfill_file(self, caplog, monkeypatch):
-        fn = "claims_hosp_202006.parquet"
+        fn = "claims_hosp_from_20200611_to_20200614.parquet"
         caplog.set_level(logging.INFO)
         logger = get_structured_logger()

         # Check when there is no daily file to merge.
         today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today, logger,
-                            test_mode=True)
+        merge_backfill_file(backfill_dir, today.weekday(), today, logger,
+                            test_mode=True, check_nd=8)
         assert fn not in os.listdir(backfill_dir)
         assert "No new files to merge; skipping merging" in caplog.text

@@ -75,8 +76,8 @@ def test_merge_backfill_file(self, caplog, monkeypatch):

         today = datetime(2020, 7, 1)
         monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
-        merge_backfill_file(backfill_dir, today, logger,
-                            test_mode=True)
+        merge_backfill_file(backfill_dir, today.weekday(), today, logger,
+                            test_mode=True, check_nd=4)
         assert "Merging files" in caplog.text
         assert fn in os.listdir(backfill_dir)

@@ -124,43 +125,46 @@ def test_merge_backfill_file_no_call(self, caplog):
         assert "Not enough days, skipping merging" in caplog.text
         self.cleanup()

-    def test_merge_existing_backfill_files(self, caplog, monkeypatch):
-        issue_date = datetime(year=2020, month=6, day=13)
+    @pytest.mark.parametrize("issue_date",
+                             [datetime(year=2020, month=6, day=13),
+                              datetime(year=2020, month=6, day=11),
+                              datetime(year=2020, month=6, day=14)])
+    def test_merge_existing_backfill_files(self, caplog, monkeypatch, issue_date):
         issue_date_str = issue_date.strftime("%Y%m%d")
         caplog.set_level(logging.INFO)
         logger = get_structured_logger()

-        def prep_backfill_data():
+        def prep_backfill_data(start, end, issue_date_str):
             # Generate backfill daily files
-            for d in range(11, 15):
+            for d in range(start, end):
                 dropdate = datetime(2020, 6, d)
                 store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)

             monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
             today = datetime(2020, 7, 1)
             # creating expected file
-            merge_backfill_file(backfill_dir, today, logger,
-                                test_mode=True)
-            original = f"{backfill_dir}/claims_hosp_202006.parquet"
+            merge_backfill_file(backfill_dir, today.weekday(), today, logger,
+                                test_mode=True, check_nd=4)
+            original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"

             os.rename(original, f"{backfill_dir}/expected.parquet")

             # creating backfill without issue date
             os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
             monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 3))
-            merge_backfill_file(backfill_dir, today, logger,
-                                test_mode=True)
+            merge_backfill_file(backfill_dir, today.weekday(), today, logger,
+                                test_mode=True, check_nd=3)

             old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
             for file in old_files:
                 os.remove(file)

-        prep_backfill_data()
+        prep_backfill_data(11, 15, issue_date_str)
         file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
         merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
         assert "Adding missing date to merged file" in caplog.text

         expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
-        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet")
+        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet")

         check = pd.concat([merged, expected]).drop_duplicates(keep=False)
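The from/to filenames these tests now expect make the lookup logic easy to exercise in isolation. A minimal sketch using the merged filename from the tests above:

    # Sketch: matching an issue date against a merged from/to filename.
    import re
    from datetime import datetime

    name = "claims_hosp_from_20200611_to_20200614.parquet"
    issue_date = datetime(2020, 6, 13)
    dates = re.findall(r"_(\d{8})", name)  # ['20200611', '20200614']
    start, end = (datetime.strptime(d, "%Y%m%d") for d in dates)
    print(start <= issue_date <= end)  # True: this merged file covers the issue date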
From 584a9b7df05349294b11fd9d72837fac4a180594 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 2 Jan 2025 10:12:10 -0500
Subject: [PATCH 26/28] in progress

---
 claims_hosp/delphi_claims_hosp/backfill.py | 65 ++++++++++++++++------
 claims_hosp/tests/test_backfill.py | 31 +++--------
 2 files changed, 58 insertions(+), 38 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 1fd0806d1..6c3231353 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -14,7 +14,7 @@
 import shutil
 from datetime import datetime, timedelta
 from pathlib import Path
-from typing import Union
+from typing import Union, Tuple

 # third party
 import pandas as pd
@@ -125,9 +125,12 @@ def get_date(file_link):

     # Check whether to merge
     # Check the number of files that are not merged
-    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+    if today.weekday() != backfill_merge_day:
         logger.info("No new files to merge; skipping merging")
         return
+    elif (today-earliest_date).days <= check_nd:
+        logger.info("Not enough days, skipping merging")
+        return

     # Start to merge files
     logger.info("Merging files", start_date=earliest_date, end_date=latest_date)
@@ -166,33 +169,63 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     new_files = sorted(Path(backfill_dir).glob("claims_hosp_*"))
     new_files.remove(Path(backfill_file))

-    def get_file_with_date(files, issue_date) -> Union[pathlib.Path, None]:
-        for filename in files:
-            pattern = re.findall(r"_(\d{8})", filename.name)
+    def get_file_with_date(files, issue_date, expand_flag=False) -> Union[Tuple[pathlib.Path, pathlib.Path], None]:
+        """
+        Give file with the missing date.
+
+        Parameters
+        ----------
+        files: list of files in the backfill dir
+        issue_date: the issue date of the file to be inserted into
+        expand_flag: flag to indicate to check dates inclusive to from and to date in filenames
+
+        Returns
+        -------
+        Tuple[pathlib.Path, pathlib.Path] if file is found, along with new filename after the insertion of the missing file
+        None if no file is found
+        """
+        for filepath in files:
+            pattern = re.findall(r"_(\d{8})", filepath.name)

             if len(pattern) == 2:
                 start_date = datetime.strptime(pattern[0], "%Y%m%d")
                 end_date = datetime.strptime(pattern[1], "%Y%m%d")
+                # if date is in between from and to
                 if start_date <= issue_date and end_date >= issue_date:
-                    return filename
+                    return filepath, filepath
+                # if date is either replacing a from date or a to date
+                elif expand_flag:
+                    if issue_date == end_date + timedelta(days=1):
+                        new_filename = filepath.name.replace(pattern[1], issue_date.strftime('%Y%m%d'))
+                        new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                        return filepath, new_filepath
+
+                    elif issue_date == start_date - timedelta(days=1):
+                        new_filename = filepath.name.replace(pattern[0], issue_date.strftime('%Y%m%d'))
+                        new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                        return filepath, new_filepath
+
             elif len(pattern) == 1:
                 start_date = datetime.strptime(pattern[0], "%Y%m%d")
-                if issue_date >= start_date:
-                    return filename
-        return None
+                if issue_date > start_date:
+                    new_filename = filepath.name.replace(pattern[0], issue_date.strftime('%Y%m%d'))
+                    new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                    return filepath, new_filepath
+        return None, None

-    file_path = get_file_with_date(new_files, issue_date)
+    file_path, new_file_path = get_file_with_date(new_files, issue_date)

-    if not file_path:
-        logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
-        return
+    if file_path is None:
+        file_path = get_file_with_date(new_files, issue_date, True)
+        if not file_path:
+            logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
+            return

     logger.info(
         "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_path
     )

     # Start to merge files
-    file_name = file_path.name
+    file_name = file_path.stem
     merge_file = f"{file_path.parent}/{file_name}_after_merge.parquet"

     try:
@@ -210,6 +243,6 @@ def get_file_with_date(files, issue_date) -> Union[pathlib.Path, None]:
         return

     os.remove(file_path)
-    os.rename(merge_file, file_path)
+    os.rename(merge_file, new_file_path)

     return
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index d0e042583..7fd82b4f2 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -61,19 +61,6 @@ def test_merge_backfill_file(self, caplog, monkeypatch):
         caplog.set_level(logging.INFO)
         logger = get_structured_logger()

-        # Check when there is no daily file to merge.
-        today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today.weekday(), today, logger,
-                            test_mode=True, check_nd=8)
-        assert fn not in os.listdir(backfill_dir)
-        assert "No new files to merge; skipping merging" in caplog.text
-
-
-        # Generate backfill daily files
-        for d in range(11, 15):
-            dropdate = datetime(2020, 6, d)
-            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
-
         today = datetime(2020, 7, 1)
         monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
         merge_backfill_file(backfill_dir, today.weekday(), today, logger,
@@ -103,24 +90,24 @@ def test_merge_backfill_file(self, caplog, monkeypatch):
         self.cleanup()

     def test_merge_backfill_file_no_call(self, caplog):
-        fn = "claims_hosp_202006.parquet"
+        fn = "claims_hosp_from_20200611_to_20200614.parquet"
         caplog.set_level(logging.INFO)
         logger = get_structured_logger()
-
-        # Check when there is no daily file to merge.
-        today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today, logger,
-                            test_mode=True)
-        assert fn not in os.listdir(backfill_dir)
-        assert "No new files to merge; skipping merging" in caplog.text
+        today = datetime(2020, 6, 20)

         # Generate backfill daily files
         for d in range(11, 15):
             dropdate = datetime(2020, 6, d)
             store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)

+        # Check when there is no daily file to merge.
+        merge_backfill_file(backfill_dir, today.weekday() + 1, today, logger,
+                            test_mode=True, check_nd=8)
+        assert fn not in os.listdir(backfill_dir)
+        assert "No new files to merge; skipping merging" in caplog.text
+
         today = datetime(2020, 7, 1)
-        merge_backfill_file(backfill_dir, today, logger,
+        merge_backfill_file(backfill_dir, today.weekday(), today, logger,
                             test_mode=True)
         assert "Not enough days, skipping merging" in caplog.text
         self.cleanup()

From 7ad08965fc620ff373411d8db40570214a60a078 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 5 Feb 2025 16:54:53 -0500
Subject: [PATCH 27/28] fixed test

---
 claims_hosp/tests/test_backfill.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index 7fd82b4f2..9dc1334ad 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -60,6 +60,10 @@ def test_merge_backfill_file(self, caplog, monkeypatch):
         fn = "claims_hosp_from_20200611_to_20200614.parquet"
         caplog.set_level(logging.INFO)
         logger = get_structured_logger()
+        # Generate backfill daily files
+        for d in range(11, 15):
+            dropdate = datetime(2020, 6, d)
+            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)

         today = datetime(2020, 7, 1)
         monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
@@ -172,7 +176,7 @@ def prep_backfill_data():

             today = datetime(2020, 6, 14)
             # creating expected file
-            merge_backfill_file(backfill_dir, today, logger,
+            merge_backfill_file(backfill_dir, 28, today, logger,
                                 test_mode=True)

         prep_backfill_data()
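As context for the backfill_merge_day values juggled above: merge_backfill_file only merges when the given merge day matches today's weekday, so any value outside 0-6 (like the literal 28) always takes the skip path. A standalone sketch of that gate, with an illustrative date:

    # Sketch of the weekday gate in merge_backfill_file (0 = Monday ... 6 = Sunday).
    from datetime import datetime

    today = datetime(2020, 7, 1)   # a Wednesday
    print(today.weekday())         # 2
    print(today.weekday() == 2)    # True: merging may proceed when merge day is 2
    print(today.weekday() == 28)   # False: an out-of-range merge day always skips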
From 6a5d42365c0d47cc1bbe417d7a67ed50233d5d34 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 5 Feb 2025 16:55:17 -0500
Subject: [PATCH 28/28] lint

---
 claims_hosp/delphi_claims_hosp/backfill.py | 55 ++++++++++++----------
 claims_hosp/delphi_claims_hosp/run.py | 3 +-
 2 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 6c3231353..d945f39f8 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -6,7 +6,6 @@

 """

-import calendar
 import glob
 import os
 import pathlib
@@ -14,7 +13,7 @@
 import shutil
 from datetime import datetime, timedelta
 from pathlib import Path
-from typing import Union, Tuple
+from typing import Tuple, Union

 # third party
 import pandas as pd
@@ -72,8 +71,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
         "state_id": "string"
     })

-    filename = backfill_dir + \
-        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = backfill_dir + "/claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
     # Store intermediate file into the backfill folder
     backfilldata.to_parquet(filename, index=False)

@@ -86,8 +84,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):

     return filename

-def merge_backfill_file(backfill_dir, backfill_merge_day, today, logger,
-                        test_mode=False, check_nd=25):
+def merge_backfill_file(backfill_dir, backfill_merge_day, today, logger, test_mode=False, check_nd=25):
     """
     Merge ~4 weeks' backfill data into one file.

@@ -128,7 +125,7 @@ def get_date(file_link):
     if today.weekday() != backfill_merge_day:
         logger.info("No new files to merge; skipping merging")
         return
-    elif (today-earliest_date).days <= check_nd:
+    elif (today - earliest_date).days <= check_nd:
         logger.info("Not enough days, skipping merging")
         return

@@ -150,6 +147,7 @@ def get_date(file_link):
             os.remove(fn)
     return

+
 def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
     """
     Merge existing backfill with the patch data included. This function is specifically run for patching.
@@ -169,7 +167,7 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     new_files = sorted(Path(backfill_dir).glob("claims_hosp_*"))
     new_files.remove(Path(backfill_file))

-    def get_file_with_date(files, issue_date, expand_flag=False) -> Union[Tuple[pathlib.Path, pathlib.Path], None]:
+    def get_file_with_date(files, issue_date) -> Union[Tuple[pathlib.Path, pathlib.Path], None]:
         """
         Give file with the missing date.

@@ -181,7 +179,9 @@ def get_file_with_date(files, issue_date, expand_flag=False) -> Union[Tuple[path
         Returns
         -------
-        Tuple[pathlib.Path, pathlib.Path] if file is found, along with new filename after the insertion of the missing file
+        Tuple[pathlib.Path, pathlib.Path] if file is found, along with new filename
+        after the insertion of the missing file
+
         None if no file is found
         """
         for filepath in files:
@@ -193,33 +193,38 @@ def get_file_with_date(files, issue_date, expand_flag=False) -> Union[Tuple[path
                 # if date is in between from and to
                 if start_date <= issue_date and end_date >= issue_date:
                     return filepath, filepath

             elif len(pattern) == 1:
                 start_date = datetime.strptime(pattern[0], "%Y%m%d")
                 if issue_date > start_date:
-                    new_filename = filepath.name.replace(pattern[0], issue_date.strftime('%Y%m%d'))
+                    new_filename = filepath.name.replace(pattern[0], issue_date.strftime("%Y%m%d"))
                     new_filepath = Path(f"{filepath.parent}/{new_filename}")
                     return filepath, new_filepath
+
+        for filepath in files:
+            pattern = re.findall(r"_(\d{8})", filepath.name)
+            if len(pattern) == 2:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                end_date = datetime.strptime(pattern[1], "%Y%m%d")
+
+                # if date is either replacing a from date or a to date
+                if issue_date == end_date + timedelta(days=1):
+                    new_filename = filepath.name.replace(pattern[1], issue_date.strftime("%Y%m%d"))
+                    new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                    return filepath, new_filepath
+
+                elif issue_date == start_date - timedelta(days=1):
+                    new_filename = filepath.name.replace(pattern[0], issue_date.strftime("%Y%m%d"))
+                    new_filepath = Path(f"{filepath.parent}/{new_filename}")
+                    return filepath, new_filepath
+
         return None, None

     file_path, new_file_path = get_file_with_date(new_files, issue_date)

     if file_path is None:
-        file_path = get_file_with_date(new_files, issue_date, True)
-        if not file_path:
-            logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
-            return
+        logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
+        return

     logger.info(
         "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_path
@@ -245,4 +249,3 @@ def get_file_with_date(files, issue_date, expand_flag=False) -> Union[Tuple[path
     os.remove(file_path)
     os.rename(merge_file, new_file_path)
     return
-
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index b54f0d2cd..895706a56 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -101,12 +101,13 @@ def run_module(params, logger=None):
     # Store backfill data
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
+        backfill_merge_day = params["indicator"]["backfill_merge_day"]

         if custom_run_flag:
             backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
             merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
         else:
-            merge_backfill_file(backfill_dir, datetime.today(), logger)
+            merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today(), logger)
             store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)

     # print out information