
Automate hospital admission patch #2043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 29 commits into base: main

Commits (29)
23e1055
implement
aysim319 Aug 22, 2024
39c433e
implement
aysim319 Aug 22, 2024
cbc7894
implimentation done
aysim319 Aug 22, 2024
5b9e69a
fixed typo and conditional
aysim319 Aug 27, 2024
1bb84d8
feat: adding patching with backfill
aysim319 Aug 28, 2024
3d5701c
lint
aysim319 Aug 30, 2024
230086a
suggested changes
aysim319 Sep 6, 2024
7bd15be
suggested changes
aysim319 Sep 6, 2024
b71dd82
making backfill to monthly in progess
aysim319 Oct 15, 2024
eed2a63
adjusting logic to match new naming format and chunking
aysim319 Oct 17, 2024
65a06d8
added logging and more clean up
aysim319 Oct 21, 2024
6995c8a
added conditional for merging
aysim319 Oct 28, 2024
1666e0c
lint
aysim319 Oct 30, 2024
73a9063
Merge branch 'main' into automate-hospital-admission-patch
aysim319 Oct 30, 2024
98d631a
remove unrelated file
aysim319 Oct 30, 2024
74bebe3
update libary for ci
aysim319 Nov 6, 2024
8be796e
fix test
aysim319 Nov 6, 2024
75a9e4a
lint
aysim319 Nov 6, 2024
f1c11e5
fixing logic
aysim319 Nov 8, 2024
cf4e4bc
lint
aysim319 Nov 8, 2024
0498ed4
making test more robust in progress
aysim319 Nov 8, 2024
f95a146
cleaning up and fixing tests
aysim319 Nov 12, 2024
1d98c03
fix test and suggestions
aysim319 Nov 13, 2024
a6cb942
add package
aysim319 Nov 19, 2024
b1ee511
added time to issue date to accomdate for filetime
aysim319 Nov 19, 2024
d5b414a
reverting in process
aysim319 Dec 24, 2024
584a9b7
in progress
aysim319 Jan 2, 2025
7ad0896
fixed test
aysim319 Feb 5, 2025
6a5d423
lint
aysim319 Feb 5, 2025
66 changes: 60 additions & 6 deletions claims_hosp/delphi_claims_hosp/backfill.py
@@ -5,15 +5,18 @@
Created: 2022-08-03

"""
import glob
import os
import re
import shutil
from datetime import datetime
from typing import Union

# third party
import pandas as pd
from delphi_utils import GeoMapper


from .config import Config

gmpr = GeoMapper()
@@ -69,9 +72,60 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
"/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
# Store intermediate file into the backfill folder
backfilldata.to_parquet(path, index=False)
return path
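For reference, a minimal sketch of the daily filename this produces; the date and directory are illustrative, not taken from a real run:

```python
# Reproduces the naming scheme used by store_backfill_file; illustrative values.
from datetime import datetime

end_date = datetime(2020, 6, 13)
path = "./backfill" + "/claims_hosp_as_of_%s.parquet" % datetime.strftime(end_date, "%Y%m%d")
print(path)  # ./backfill/claims_hosp_as_of_20200613.parquet
```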


def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
    """
    Merge patch data into an existing merged backfill file. This function is run specifically for patching.

    When the indicator fails for some reason or another, there is a gap in the backfill files.
    The patch that fills in the missing dates happens later down the line, when the backfill
    files have already been merged. This function takes the merged file that is missing the
    particular date, inserts that date's data, and writes the merged file back.

    Parameters
    ----------
    backfill_dir : str
        Specified path where the backfill files are stored.
    backfill_file : str
        Path of the daily backfill file to fold into the existing merged file.
    issue_date : datetime
        The most recent date when the raw data is received.
    """
    new_files = glob.glob(backfill_dir + "/claims_hosp_*")

    def get_file_with_date(files) -> Union[str, None]:
        # Merged files carry two dates (claims_hosp_from_<start>_to_<end>.parquet);
        # pick the one whose date range covers the issue date.
        for filename in files:
            pattern = re.findall(r"\d{8}", filename)
            if len(pattern) == 2:
                start_date = datetime.strptime(pattern[0], "%Y%m%d")
                end_date = datetime.strptime(pattern[1], "%Y%m%d")
                if start_date <= issue_date <= end_date:
                    return filename
        return ""

    file_name = get_file_with_date(new_files)

    if len(file_name) == 0:
        logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d"))
        return

    # Start to merge files
    merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
    try:
        # Work on a copy so the original merged file survives a failed merge.
        shutil.copyfile(file_name, merge_file)
        existing_df = pd.read_parquet(merge_file, engine="pyarrow")
        df = pd.read_parquet(backfill_file, engine="pyarrow")
        merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
        merged_df.to_parquet(merge_file, index=False)
        os.remove(file_name)
        os.rename(merge_file, file_name)
    except Exception as e:  # pylint: disable=W0703
        os.remove(merge_file)
        logger.error(e)
    return
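As a usage sketch (not part of this diff): a patch run would regenerate the missing day's backfill file and then splice it into the merged file covering that date. The drop filename, directory, and logger name below are illustrative assumptions:

```python
# Hypothetical patch-run snippet; the drop filename and backfill_dir are made up.
from datetime import datetime

from delphi_utils import get_structured_logger

from delphi_claims_hosp.backfill import merge_existing_backfill_files, store_backfill_file

logger = get_structured_logger("backfill_patch_example")
issue_date = datetime(2024, 4, 20)

# Regenerate the daily backfill file for the missing issue date...
daily_file = store_backfill_file(
    "EDI_AGG_INPATIENT_20042024_0300CDT.csv.gz", issue_date, "./backfill"
)
# ...then fold it into the merged claims_hosp_from_*_to_*.parquet covering that date.
merge_existing_backfill_files("./backfill", daily_file, issue_date, logger)
```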


def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25):
"""
Merge ~4 weeks' backfill data into one file.

Expand All @@ -80,7 +134,7 @@ def merge_backfill_file(backfill_dir, backfill_merge_day, today,
threshold to allow flexibility in data delivery.
Parameters
----------
    most_recent : datetime
        The most recent date when the raw data is received
    backfill_dir : str
        specified path to store backfill files.
@@ -109,7 +163,7 @@ def get_date(file_link):

    # Check whether to merge
    # Check the number of files that are not merged
    if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd:
        return

    # Start to merge files
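To make the merge gating concrete, here is a minimal sketch of the weekday/age check above using the defaults (`backfill_merge_day=2`, i.e. Wednesday, and `check_nd=25`); the dates are illustrative:

```python
# Minimal sketch of merge_backfill_file's gating condition; dates illustrative.
from datetime import datetime

backfill_merge_day = 2  # Wednesday
check_nd = 25

most_recent = datetime(2024, 10, 30)   # a Wednesday
earliest_date = datetime(2024, 10, 1)  # oldest unmerged daily file

# Merge only on the configured weekday, and only once the oldest unmerged
# daily file is more than check_nd days old.
should_merge = (
    most_recent.weekday() == backfill_merge_day
    and (most_recent - earliest_date).days > check_nd
)
print(should_merge)  # True: it's Wednesday and 29 days > 25
```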
5 changes: 3 additions & 2 deletions claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -53,9 +53,10 @@ def change_date_format(name):
    return name


def download(ftp_credentials, out_path, logger, issue_date=None):
    """Pull the latest raw files."""
    current_time = issue_date if issue_date else datetime.datetime.now()

    seconds_in_day = 24 * 60 * 60
    logger.info("starting download", time=current_time)
8 changes: 4 additions & 4 deletions claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -5,9 +5,9 @@
import datetime
from pathlib import Path

def get_latest_filename(dir_path, logger, issue_date=None):
    """Get the latest filename from the list of downloaded raw files."""
    current_date = issue_date if issue_date else datetime.datetime.now()
    files = list(Path(dir_path).glob("*"))

    latest_timestamp = datetime.datetime(1900, 1, 1)
@@ -23,8 +23,8 @@
            if timestamp <= current_date:
                latest_timestamp = timestamp
                latest_filename = file

    if issue_date is None:
        assert current_date.date() == latest_timestamp.date(), "no drop for today"

logger.info("Latest claims file", filename=latest_filename)

Expand Down
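Worth spelling out what the new `issue_date` parameter buys: it pins "now", so a patch run deterministically selects the drop for the requested day even when newer drops sit in the same directory, and the "no drop for today" assert is skipped. A toy sketch of the selection rule (filenames assumed, not real drops):

```python
# Conceptual sketch: with issue_date set, the newest file at or before the
# issue date wins. Filenames and timestamps below are hypothetical.
import datetime

drops = {
    "SYNEDI_AGG_INPATIENT_19042024_0300CDT.csv.gz": datetime.datetime(2024, 4, 19, 3, 0),
    "SYNEDI_AGG_INPATIENT_20042024_0300CDT.csv.gz": datetime.datetime(2024, 4, 20, 3, 0),
    "SYNEDI_AGG_INPATIENT_21042024_0300CDT.csv.gz": datetime.datetime(2024, 4, 21, 3, 0),
}
issue_date = datetime.datetime(2024, 4, 20, 23, 59)

# Keep only drops at or before the issue date, then take the newest one.
latest = max((f for f, ts in drops.items() if ts <= issue_date), key=drops.get)
print(latest)  # SYNEDI_AGG_INPATIENT_20042024_0300CDT.csv.gz
```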
75 changes: 75 additions & 0 deletions claims_hosp/delphi_claims_hosp/patch.py
@@ -0,0 +1,75 @@
"""
This module is used for patching data in the delphi_claims_hosp package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
"custom_flag" : true,
...
},
"validation": {
...
},
"patch": {
"patch_dir": "/covidcast-indicators/hopspital-admissions/patch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21"
}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv
"""

from datetime import datetime, timedelta
from os import makedirs

from delphi_utils import get_structured_logger, read_params

from .run import run_module


def patch():
    """
    Run the hospital-admissions indicator for a range of issue dates.

    The range of issue dates is specified in params.json using the following keys:
    - "patch": Only used for patching data
        - "start_issue": str, YYYY-MM-DD format, first issue date
        - "end_issue": str, YYYY-MM-DD format, last issue date
        - "patch_dir": str, directory to write all issues' output
    """
    params = read_params()
    logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])

    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

    logger.info(
        "Starting patching",
        patch_directory=params["patch"]["patch_dir"],
        start_issue=start_issue.strftime("%Y-%m-%d"),
        end_issue=end_issue.strftime("%Y-%m-%d"),
    )

    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    current_issue = start_issue

    while current_issue <= end_issue:
        logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))

        params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")

        current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/hospital-admissions"""
        makedirs(f"{current_issue_dir}", exist_ok=True)
        params["common"]["export_dir"] = f"""{current_issue_dir}"""

        run_module(params, logger)
        current_issue += timedelta(days=1)


if __name__ == "__main__":
    patch()
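A minimal way to kick this off, assuming a params.json like the one in the module docstring sits where `read_params()` can find it (typically the working directory):

```python
# Hypothetical driver script; equivalent to running the module's __main__ block.
# Whether the package exposes this as a console entry point is an assumption,
# not something this diff confirms.
from delphi_claims_hosp.patch import patch

if __name__ == "__main__":
    patch()
    # Each issue day lands in <patch_dir>/issue_YYYYMMDD/hospital-admissions/
```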
39 changes: 27 additions & 12 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -5,25 +5,27 @@
when the module is run with `python -m delphi_claims_hosp`.
"""

import os

# standard packages
import time
from datetime import datetime, timedelta
from pathlib import Path

# third party
from delphi_utils import get_structured_logger

from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file

# first party
from .config import Config
from .download_claims_ftp_files import download
from .get_latest_claims_name import get_latest_filename
from .modify_claims_drops import modify_and_write
from .update_indicator import ClaimsHospIndicatorUpdater


def run_module(params, logger=None):
"""
Generate updated claims-based hospitalization indicator values.

Expand Down Expand Up @@ -54,19 +56,27 @@ def run_module(params):
        adjustments (False).
    """
    start_time = time.time()
    issue_date_str = params.get("patch", {}).get("current_issue", None)
    # issue_date is only set for custom runs/patches; normal runs leave it None
    issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") if issue_date_str else None
    # safety check: the patch parameters may exist in params.json without a custom run/patch being requested
    custom_run_flag = params["indicator"].get("custom_run", False)
    if not logger:
        logger = get_structured_logger(
            __name__,
            filename=params["common"].get("log_filename"),
            log_exceptions=params["common"].get("log_exceptions", True),
        )

    # pull latest data
    download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)

    # aggregate data
    modify_and_write(params["indicator"]["input_dir"], logger)

    # find the latest files (these have timestamps)
    claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)

    # handle range of estimates to produce
    # filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz
@@ -94,8 +104,13 @@
if params["indicator"].get("generate_backfill_files", True):
backfill_dir = params["indicator"]["backfill_dir"]
backfill_merge_day = params["indicator"]["backfill_merge_day"]
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
store_backfill_file(claims_file, dropdate_dt, backfill_dir)
if custom_run_flag:
backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir)
merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)

else:
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
store_backfill_file(claims_file, dropdate_dt, backfill_dir)

    # print out information
    logger.info("Loaded params",
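For orientation, a sketch of the parameter shape that selects each branch above; only the relevant keys are shown and the values are illustrative, not taken from a real params.json:

```python
# Illustrative parameter fragments; keys besides custom_run/patch are elided.
params_normal = {
    "indicator": {"custom_run": False, "generate_backfill_files": True},
}
params_patch = {
    "indicator": {"custom_run": True, "generate_backfill_files": True},
    "patch": {"current_issue": "2024-04-20"},  # set per issue day by patch()
}

# custom_run=True  -> store_backfill_file(...) then merge_existing_backfill_files(...)
# custom_run=False -> weekly merge_backfill_file(...) then store_backfill_file(...)
```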
75 changes: 72 additions & 3 deletions claims_hosp/tests/test_backfill.py
@@ -1,27 +1,32 @@
import logging
import os
import glob
from datetime import datetime
from pathlib import Path
import shutil

# third party
import pandas as pd
import pytest

# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files

CONFIG = Config()
CONSTANTS = GeoConstants()
TEST_PATH = Path(__file__).parent
PARAMS = {
    "indicator": {
        "input_file": f"{TEST_PATH}/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
        "backfill_dir": f"{TEST_PATH}/backfill",
        "drop_date": "2020-06-11",
    }
}
DATA_FILEPATH = PARAMS["indicator"]["input_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]
TEST_LOGGER = logging.getLogger()

class TestBackfill:

@@ -95,3 +100,67 @@ def test_merge_backfill_file(self):

        os.remove(backfill_dir + "/" + fn)
        assert fn not in os.listdir(backfill_dir)

    def test_merge_existing_backfill_files(self):
        issue_date = datetime(year=2020, month=6, day=13)
        issue_date_str = issue_date.strftime("%Y%m%d")

        def prep_backfill_data():
            # Generate backfill daily files
            for d in range(11, 15):
                dropdate = datetime(2020, 6, d)
                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)

            today = datetime(2020, 6, 14)
            # creating expected file
            merge_backfill_file(backfill_dir, today.weekday(), today,
                                test_mode=True, check_nd=2)
            original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
            os.rename(original, f"{backfill_dir}/expected.parquet")

            # creating merged backfill file without the issue date
            os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
            merge_backfill_file(backfill_dir, today.weekday(), today,
                                test_mode=True, check_nd=2)

            old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
            for file in old_files:
                os.remove(file)

        prep_backfill_data()
        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)

        expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet")

        check_diff = expected.merge(merged, how='left', indicator=True)
        assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0]

        for file in glob.glob(backfill_dir + "/*.parquet"):
            os.remove(file)


    def test_merge_existing_backfill_files_no_call(self):
        issue_date = datetime(year=2020, month=6, day=20)

        def prep_backfill_data():
            # Generate backfill daily files
            for d in range(11, 15):
                dropdate = datetime(2020, 6, d)
                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)

            today = datetime(2020, 6, 14)
            # check_nd=8 keeps the daily files unmerged, so no merged file covers the issue date
            merge_backfill_file(backfill_dir, today.weekday(), today,
                                test_mode=True, check_nd=8)

        prep_backfill_data()
        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)

        # no merged claims_hosp_from_* file should have been created or touched
        assert len(glob.glob(backfill_dir + "/claims_hosp_from_*")) == 0

        old_files = glob.glob(backfill_dir + "/*.parquet")
        for file in old_files:
            os.remove(file)


