
Automate hospital admission patch #2043

Open · wants to merge 29 commits into main

Commits (29)
23e1055
implement
aysim319 Aug 22, 2024
39c433e
implement
aysim319 Aug 22, 2024
cbc7894
implimentation done
aysim319 Aug 22, 2024
5b9e69a
fixed typo and conditional
aysim319 Aug 27, 2024
1bb84d8
feat: adding patching with backfill
aysim319 Aug 28, 2024
3d5701c
lint
aysim319 Aug 30, 2024
230086a
suggested changes
aysim319 Sep 6, 2024
7bd15be
suggested changes
aysim319 Sep 6, 2024
b71dd82
making backfill to monthly in progess
aysim319 Oct 15, 2024
eed2a63
adjusting logic to match new naming format and chunking
aysim319 Oct 17, 2024
65a06d8
added logging and more clean up
aysim319 Oct 21, 2024
6995c8a
added conditional for merging
aysim319 Oct 28, 2024
1666e0c
lint
aysim319 Oct 30, 2024
73a9063
Merge branch 'main' into automate-hospital-admission-patch
aysim319 Oct 30, 2024
98d631a
remove unrelated file
aysim319 Oct 30, 2024
74bebe3
update libary for ci
aysim319 Nov 6, 2024
8be796e
fix test
aysim319 Nov 6, 2024
75a9e4a
lint
aysim319 Nov 6, 2024
f1c11e5
fixing logic
aysim319 Nov 8, 2024
cf4e4bc
lint
aysim319 Nov 8, 2024
0498ed4
making test more robust in progress
aysim319 Nov 8, 2024
f95a146
cleaning up and fixing tests
aysim319 Nov 12, 2024
1d98c03
fix test and suggestions
aysim319 Nov 13, 2024
a6cb942
add package
aysim319 Nov 19, 2024
b1ee511
added time to issue date to accomdate for filetime
aysim319 Nov 19, 2024
d5b414a
reverting in process
aysim319 Dec 24, 2024
584a9b7
in progress
aysim319 Jan 2, 2025
7ad0896
fixed test
aysim319 Feb 5, 2025
6a5d423
lint
aysim319 Feb 5, 2025
141 changes: 131 additions & 10 deletions claims_hosp/delphi_claims_hosp/backfill.py
@@ -5,20 +5,26 @@
Created: 2022-08-03

"""
import os

import glob
from datetime import datetime
import os
import pathlib
import re
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Tuple, Union

# third party
import pandas as pd
from delphi_utils import GeoMapper


from .config import Config

gmpr = GeoMapper()

def store_backfill_file(claims_filepath, _end_date, backfill_dir):

def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
"""
Store county level backfill data into backfill_dir.

@@ -65,13 +71,20 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
"state_id": "string"
})

path = backfill_dir + \
"/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
filename = backfill_dir + "/claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
# Store intermediate file into the backfill folder
backfilldata.to_parquet(filename, index=False)

# Store intermediate file into the backfill folder
backfilldata.to_parquet(path, index=False)
try:
backfilldata.to_parquet(filename, index=False)
logger.info("Stored source data in parquet", filename=filename)
except Exception:  # pylint: disable=broad-except
logger.error("Failed to store source data in parquet", filename=filename)
return filename

def merge_backfill_file(backfill_dir, backfill_merge_day, today,
test_mode=False, check_nd=25):

def merge_backfill_file(backfill_dir, backfill_merge_day, today, logger, test_mode=False, check_nd=25):
"""
Merge ~4 weeks' backfill data into one file.

@@ -109,10 +122,15 @@ def get_date(file_link):

# Check whether to merge
# Check the number of files that are not merged
if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
if today.weekday() != backfill_merge_day:
logger.info("No new files to merge; skipping merging")
return
elif (today - earliest_date).days <= check_nd:
logger.info("Not enough days, skipping merging")
return

# Start to merge files
logger.info("Merging files", start_date=earliest_date, end_date=latest_date)
pdList = []
for fn in new_files:
df = pd.read_parquet(fn, engine='pyarrow')
@@ -128,3 +146,106 @@
for fn in new_files:
os.remove(fn)
return
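For concreteness, here is a small illustration of the merge gate above, with hypothetical dates: merging proceeds only on the configured weekday, and only once more than check_nd days of unmerged files have accumulated.

from datetime import datetime

backfill_merge_day = 2                 # Wednesday (weekday() counts Monday as 0)
today = datetime(2024, 10, 30)         # a Wednesday
earliest_date = datetime(2024, 10, 1)  # oldest unmerged daily file
check_nd = 25                          # default threshold used above

# Mirrors the two guard clauses in merge_backfill_file
should_merge = today.weekday() == backfill_merge_day and (today - earliest_date).days > check_nd
print(should_merge)  # True: right weekday, 29 days of unmerged files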


def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
"""
Merge patch data into an existing merged backfill file. This function is run specifically for patching.

When the indicator fails for one reason or another, a gap appears in the backfill files.
The patch that fills in the missing dates happens later down the line, after the backfill files have already been merged.
This function takes the merged file covering the missing date, inserts the data for that date, and merges the file back.

Parameters
----------
issue_date : datetime
The most recent date when the raw data is received
backfill_dir : str
specified path to store backfill files.
backfill_file : str
specific file to add to the merged backfill file.
"""
new_files = sorted(Path(backfill_dir).glob("claims_hosp_*"))
new_files.remove(Path(backfill_file))

def get_file_with_date(files, issue_date) -> Tuple[Union[pathlib.Path, None], Union[pathlib.Path, None]]:
"""
Return the merged file covering the missing date, if any.

Parameters
----------
files: list of files in the backfill dir
issue_date: the issue date of the file to be inserted

Returns
-------
Tuple[pathlib.Path, pathlib.Path] if a file is found: the existing filepath
and the new filepath after the insertion of the missing date

(None, None) if no file is found
"""
for filepath in files:
pattern = re.findall(r"_(\d{8})", filepath.name)

if len(pattern) == 2:
start_date = datetime.strptime(pattern[0], "%Y%m%d")
end_date = datetime.strptime(pattern[1], "%Y%m%d")
# if date is in between from and to
if start_date <= issue_date <= end_date:
return filepath, filepath

elif len(pattern) == 1:
start_date = datetime.strptime(pattern[0], "%Y%m%d")
if issue_date > start_date:
new_filename = filepath.name.replace(pattern[0], issue_date.strftime("%Y%m%d"))
new_filepath = Path(f"{filepath.parent}/{new_filename}")
return filepath, new_filepath

for filepath in files:
pattern = re.findall(r"_(\d{8})", filepath.name)

if len(pattern) == 2:
start_date = datetime.strptime(pattern[0], "%Y%m%d")
end_date = datetime.strptime(pattern[1], "%Y%m%d")

# if the issue date immediately precedes or follows the file's range, extend the range
if issue_date == end_date + timedelta(days=1):
new_filename = filepath.name.replace(pattern[1], issue_date.strftime("%Y%m%d"))
new_filepath = Path(f"{filepath.parent}/{new_filename}")
return filepath, new_filepath

elif issue_date == start_date - timedelta(days=1):
new_filename = filepath.name.replace(pattern[0], issue_date.strftime("%Y%m%d"))
new_filepath = Path(f"{filepath.parent}/{new_filename}")
return filepath, new_filepath

return None, None

file_path, new_file_path = get_file_with_date(new_files, issue_date)

if file_path is None:
logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
return

logger.info(
"Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_path
)
# Start to merge files
file_name = file_path.stem
merge_file = f"{file_path.parent}/{file_name}_after_merge.parquet"

try:
shutil.copyfile(file_path, merge_file)
existing_df = pd.read_parquet(merge_file, engine="pyarrow")
df = pd.read_parquet(backfill_file, engine="pyarrow")
merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
merged_df.to_parquet(merge_file, index=False)

# pylint: disable=W0703
except Exception as e:
logger.info("Failed to merge existing backfill files", issue_date=issue_date.strftime("%Y-%m-%d"), msg=e)
os.remove(merge_file)
os.remove(backfill_file)
return

os.remove(file_path)
os.rename(merge_file, new_file_path)
return
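As a sketch of the filename matching above (the file name below is hypothetical): merged files embed two dates, an issue date inside that range reuses the file, and an issue date adjacent to the range triggers a rename that widens the range by one day.

import re
from datetime import datetime

def covers(filename, issue_date):
    """Check whether a merged file's embedded date range covers the issue date."""
    dates = re.findall(r"_(\d{8})", filename)
    if len(dates) != 2:
        return False
    start = datetime.strptime(dates[0], "%Y%m%d")
    end = datetime.strptime(dates[1], "%Y%m%d")
    return start <= issue_date <= end

merged = "claims_hosp_from_20240401_to_20240430.parquet"  # hypothetical name
print(covers(merged, datetime(2024, 4, 20)))  # True: this file is reused
print(covers(merged, datetime(2024, 5, 1)))   # False: adjacent day, so the file
                                              # would instead be renamed to ..._20240501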
5 changes: 3 additions & 2 deletions claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -53,9 +53,10 @@ def change_date_format(name):
return name


def download(ftp_credentials, out_path, logger):
def download(ftp_credentials, out_path, logger, issue_date=None):
"""Pull the latest raw files."""
current_time = datetime.datetime.now()
current_time = issue_date if issue_date else datetime.datetime.now()

seconds_in_day = 24 * 60 * 60
logger.info("Starting download")

5 changes: 2 additions & 3 deletions claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -5,9 +5,9 @@
import datetime
from pathlib import Path

def get_latest_filename(dir_path, logger):
def get_latest_filename(dir_path, logger, issue_date=None):
"""Get the latest filename from the list of downloaded raw files."""
current_date = datetime.datetime.now()
current_date = issue_date if issue_date else datetime.datetime.now()
files = list(Path(dir_path).glob("*"))

latest_timestamp = datetime.datetime(1900, 1, 1)
@@ -23,7 +23,6 @@ def get_latest_filename(dir_path, logger):
if timestamp <= current_date:
latest_timestamp = timestamp
latest_filename = file

assert current_date.date() == latest_timestamp.date(), "no drop for today"

logger.info("Latest claims file", filename=latest_filename)
2 changes: 1 addition & 1 deletion claims_hosp/delphi_claims_hosp/modify_claims_drops.py
@@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False):
dfs_list.append(dfs)
else:
dfs.to_csv(out_path, index=False)
logger.info("Wrote modified csv", filename=out_path)
logger.info("Wrote modified csv", filename=str(out_path))
return files, dfs_list
78 changes: 78 additions & 0 deletions claims_hosp/delphi_claims_hosp/patch.py
@@ -0,0 +1,78 @@
"""
This module is used for patching data in the delphi_claims_hosp package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
"custom_flag" : true,
...
},
"validation": {
...
},
"patch": {
"patch_dir": "/covidcast-indicators/hopspital-admissions/patch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21"
}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[name-of-patch]/issue_[issue-date]/hospital-admissions/actual_data_file.csv
"""

from datetime import datetime, timedelta
from os import makedirs

from delphi_utils import get_structured_logger, read_params

from .run import run_module


def patch():
"""
Run the hospital-admissions indicator for a range of issue dates.

The range of issue dates is specified in params.json using the following keys:
- "patch": Only used for patching data
- "start_date": str, YYYY-MM-DD format, first issue date
- "end_date": str, YYYY-MM-DD format, last issue date
- "patch_dir": str, directory to write all issues output
"""
params = read_params()
logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])

start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

logger.info(
"Starting patching",
patch_directory=params["patch"]["patch_dir"],
start_issue=start_issue.strftime("%Y-%m-%d"),
end_issue=end_issue.strftime("%Y-%m-%d"),
)

makedirs(params["patch"]["patch_dir"], exist_ok=True)

current_issue = start_issue
if not params["common"]["custom_run"]:
logger.warning("Custom flag not set; setting it to true for patching")
params["common"]["custom_flag"] = True

while current_issue <= end_issue:
logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))

params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")

current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/hospital-admissions"""
makedirs(f"{current_issue_dir}", exist_ok=True)
params["common"]["export_dir"] = f"""{current_issue_dir}"""

run_module(params, logger)
current_issue += timedelta(days=1)


if __name__ == "__main__":
patch()
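Assuming the standard Delphi indicator layout, a patch run would look roughly like this (the invocation path is an assumption; only the params.json keys above are confirmed by this module):

# Hypothetical invocation, run from the claims_hosp directory:
#   $ env/bin/python -m delphi_claims_hosp.patch
#
# With start_issue 2024-04-20 and end_issue 2024-04-21, each issue gets its
# own export directory:
#   patch_dir/issue_20240420/hospital-admissions/...
#   patch_dir/issue_20240421/hospital-admissions/...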
37 changes: 25 additions & 12 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -5,25 +5,27 @@
when the module is run with `python -m delphi_claims_hosp`.
"""

import os

# standard packages
import time
import os
from datetime import datetime, timedelta
from pathlib import Path

# third party
from delphi_utils import get_structured_logger

from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file

# first party
from .config import Config
from .download_claims_ftp_files import download
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .modify_claims_drops import modify_and_write
from .update_indicator import ClaimsHospIndicatorUpdater
from .backfill import (store_backfill_file, merge_backfill_file)


def run_module(params):
def run_module(params, logger=None):
"""
Generate updated claims-based hospitalization indicator values.

@@ -54,19 +56,25 @@ def run_module(params):
adjustments (False).
"""
start_time = time.time()
logger = get_structured_logger(
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
# safety check: patch parameters may exist in the params file without a custom run/patch being requested
custom_run_flag = params["common"].get("custom_run", False)
issue_date_str = params.get("patch", {}).get("current_issue", None)
issue_date = datetime.strptime(issue_date_str + " 23:59:00", "%Y-%m-%d %H:%M:%S") if issue_date_str else None
if not logger:
logger = get_structured_logger(
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)

# pull latest data
download(params["indicator"]["ftp_credentials"],
params["indicator"]["input_dir"], logger)
download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)

# aggregate data
modify_and_write(params["indicator"]["input_dir"], logger)

# find the latest files (these have timestamps)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)

# handle range of estimates to produce
# filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz
@@ -94,8 +102,13 @@
if params["indicator"].get("generate_backfill_files", True):
backfill_dir = params["indicator"]["backfill_dir"]
backfill_merge_day = params["indicator"]["backfill_merge_day"]
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
store_backfill_file(claims_file, dropdate_dt, backfill_dir)
if custom_run_flag:
backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)

else:
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today(), logger)
store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)

# print out information
logger.info("Loaded params",
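One detail worth a note: run_module appends " 23:59:00" to the parsed issue date, and download and get_latest_filename compare file drop timestamps against that value, so pinning it to end of day keeps any file dropped during the issue day in range. A minimal sketch with an illustrative timestamp:

from datetime import datetime

file_timestamp = datetime(2024, 4, 20, 14, 37)  # hypothetical afternoon drop

issue_midnight = datetime.strptime("2024-04-20", "%Y-%m-%d")
issue_eod = datetime.strptime("2024-04-20 23:59:00", "%Y-%m-%d %H:%M:%S")

print(file_timestamp <= issue_midnight)  # False: the drop would be missed
print(file_timestamp <= issue_eod)       # True: the drop is picked up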
1 change: 1 addition & 0 deletions claims_hosp/setup.py
@@ -5,6 +5,7 @@
"covidcast",
"darker[isort]~=2.1.1",
"delphi-utils",
"freezegun",
"numpy",
"pandas",
"paramiko",
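The freezegun addition supports testing the new issue_date plumbing: download and get_latest_filename fall back to datetime.now(), so tests can freeze the clock to make that fallback deterministic. A minimal sketch of the pattern (test name and date are hypothetical):

from datetime import datetime

from freezegun import freeze_time

@freeze_time("2024-04-20")
def test_issue_date_defaults_to_now():
    # With the clock frozen, the issue_date=None fallback resolves to a known
    # date, so assertions like "no drop for today" become deterministic.
    assert datetime.now() == datetime(2024, 4, 20)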