Backfill/claims_hosp #1675

Merged (15 commits) on Jan 17, 2023

Changes from 6 commits
4 changes: 3 additions & 1 deletion ansible/templates/claims_hosp-params-prod.json.j2
@@ -1,13 +1,15 @@
 {
     "common": {
-        "export_dir": "./receiving",
+        "export_dir": "./common/covidcast/receiving/claims_hosp",
         "log_exceptions": false
     },
     "indicator": {
         "input_dir": "./retrieve_files",
         "start_date": "2020-02-01",
         "end_date": null,
         "drop_date": null,
+        "backfill_dir": "/common/backfill/claims_hosp",
+        "backfill_merge_day": 0,
         "n_backfill_days": 70,
         "n_waiting_days": 3,
         "write_se": false,
112 changes: 112 additions & 0 deletions claims_hosp/delphi_claims_hosp/backfill.py
@@ -0,0 +1,112 @@
"""
Store backfill data.

Author: Jingjing Tang
Created: 2022-08-03

"""
import os
import glob
from datetime import datetime

# third party
import pandas as pd

# first party
from .config import Config

def store_backfill_file(claims_filepath, _end_date, backfill_dir):
    """
    Store county-level backfill data into backfill_dir.

    Parameters
    ----------
    claims_filepath : str
        Path to the aggregated claims data.
    _end_date : datetime
        The most recent date on which the raw data was received.
    backfill_dir : str
        Specified path in which to store backfill files.
    """
    backfilldata = pd.read_csv(
        claims_filepath,
        usecols=Config.CLAIMS_DTYPES.keys(),
        dtype=Config.CLAIMS_DTYPES,
        parse_dates=[Config.CLAIMS_DATE_COL],
    )

    backfilldata.rename({"ServiceDate": "time_value",
                         "PatCountyFIPS": "fips",
                         "Denominator": "den",
                         "Covid_like": "num"},
                        axis=1, inplace=True)
    # Store one year's backfill data
    _start_date = _end_date.replace(year=_end_date.year-1)
    selected_columns = ['time_value', 'fips',
                        'den', 'num']
    backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                                    & (~backfilldata["fips"].isnull()),
                                    selected_columns]
    path = backfill_dir + \
        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
    # Store intermediate file into the backfill folder
    backfilldata.to_parquet(path)

def merge_backfill_file(backfill_dir, backfill_merge_day, today,
                        test_mode=False, check_nd=25):
    """
    Merge ~4 weeks' backfill data into one file.

    Usually this function merges 28 days' data into a new file so as to
    save reading time when running the backfill pipelines. We set a softer
    threshold to allow flexibility in data delivery.

    Parameters
    ----------
    today : datetime
        The most recent date on which the raw data was received.
    backfill_dir : str
        Specified path in which to store backfill files.
    backfill_merge_day : int
        The day of the week on which we merge the backfill files, e.g. 0
        is Monday.
    test_mode : bool
        If True, keep the daily files after the merged file is written.
    check_nd : int
        Threshold on how many days of unmerged files must have accumulated
        before merging. Ideally this is 28, but we use a looser criterion
        for practical reasons.
    """
    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")

    def get_date(file_link):
        # Keep the function here consistent with the backfill path in
        # function `store_backfill_file`
        fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
        return datetime.strptime(fn, "%Y%m%d")

    date_list = list(map(get_date, new_files))
    earliest_date = min(date_list)
    latest_date = max(date_list)

    # Check whether today is the merge day and whether the unmerged files
    # span more than check_nd days
    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
        return

    # Start to merge files
    pdList = []
    for fn in new_files:
        df = pd.read_parquet(fn, engine='pyarrow')
        issue_date = get_date(fn)
        df["issue_date"] = issue_date
        df["lag"] = [(issue_date - x).days for x in df["time_value"]]
        pdList.append(df)
    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
    path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
        datetime.strftime(earliest_date, "%Y%m%d"),
        datetime.strftime(latest_date, "%Y%m%d"))
    merged_file.to_parquet(path)

    # Delete daily files once we have the merged one.
    if not test_mode:
        for fn in new_files:
            os.remove(fn)
    return
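
For orientation, here is a minimal usage sketch (not part of the diff) showing how the two new functions are meant to be called together, mirroring the calls added to run.py later in this PR. The input path, drop date, and backfill directory below are hypothetical placeholders.

from datetime import datetime
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file

# Hypothetical inputs: an aggregated claims file and the date it was received.
claims_file = "./retrieve_files/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz"
drop_date = datetime(2020, 11, 6)
backfill_dir = "./backfill"

# On the weekly merge day (0 = Monday), roll up accumulated daily snapshots
# into one claims_hosp_from_<start>_to_<end>.parquet file (assumes at least
# one claims_hosp_as_of_*.parquet file already exists in backfill_dir) ...
merge_backfill_file(backfill_dir, backfill_merge_day=0, today=datetime.today())

# ... then store today's snapshot as claims_hosp_as_of_20201106.parquet.
store_backfill_file(claims_file, drop_date, backfill_dir)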
2 changes: 0 additions & 2 deletions claims_hosp/delphi_claims_hosp/load_data.py
@@ -5,7 +5,6 @@
Created: 2020-09-27

"""

# third party
import pandas as pd

@@ -53,7 +52,6 @@ def load_claims_data(claims_filepath, dropdate, base_geo):

    return claims_data


def load_data(input_filepath, dropdate, base_geo):
    """
    Load in claims data, and combine them.
7 changes: 7 additions & 0 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -20,6 +20,7 @@
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .update_indicator import ClaimsHospIndicatorUpdater
from .backfill import (store_backfill_file, merge_backfill_file)


def run_module(params):
@@ -89,6 +90,12 @@ def run_module(params):
    if params["indicator"]["start_date"] is not None:
        startdate = params["indicator"]['start_date']

    # Store backfill data
    backfill_dir = params["indicator"]["backfill_dir"]
    backfill_merge_day = params["indicator"]["backfill_merge_day"]
    merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
    store_backfill_file(claims_file, dropdate_dt, backfill_dir)

    # print out information
    logger.info("Loaded params",
                startdate = startdate,
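Here claims_file and dropdate_dt are presumably the latest raw claims file and its drop date, defined earlier in run_module (outside this hunk). Because the merge runs before store_backfill_file, the snapshot written today is not included in today's merge; it is picked up by a later merge.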
2 changes: 2 additions & 0 deletions claims_hosp/params.json.template
@@ -9,6 +9,8 @@
"end_date": null,
"drop_date": null,
"n_backfill_days": 70,
"backfill_dir": "./backfill",
"backfill_merge_day": 0,
"n_waiting_days": 3,
"write_se": false,
"obfuscated_prefix": "foo_obfuscated",
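Note that the two templates point backfill_dir at different locations: the ansible production template above uses the shared /common/backfill/claims_hosp path, while this local template defaults to a relative ./backfill directory; both set backfill_merge_day to 0 (Monday).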
1 change: 1 addition & 0 deletions claims_hosp/setup.py
@@ -4,6 +4,7 @@
required = [
    "numpy",
    "pandas",
    "pyarrow",
    "paramiko",
    "pydocstyle",
    "pytest",
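pyarrow is added to the package requirements because the new backfill code reads and writes its parquet files with engine='pyarrow'.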
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
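These four binary files are presumably the new test fixtures for the backfill tests below, e.g. daily claims_hosp_as_of_*.parquet snapshots that the merge test expects to find in the test backfill directory.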
83 changes: 83 additions & 0 deletions claims_hosp/tests/test_backfill.py
@@ -0,0 +1,83 @@
import os
import glob
from datetime import datetime

# third party
import pandas as pd
import pytest

# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file

CONFIG = Config()
CONSTANTS = GeoConstants()
PARAMS = {
    "indicator": {
        "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
        "backfill_dir": "./backfill",
        "drop_date": "2020-06-11",
    }
}
DATA_FILEPATH = PARAMS["indicator"]["input_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]

class TestBackfill:

    def test_store_backfill_file(self):
        dropdate = datetime(2020, 1, 1)
        fn = "claims_hosp_as_of_20200101.parquet"
        assert fn not in os.listdir(backfill_dir)

        # Store backfill file
        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
        assert fn in os.listdir(backfill_dir)
        fn = "claims_hosp_as_of_20200101.parquet"
        backfill_df = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')

        selected_columns = ['time_value', 'fips',
                            'num', 'den']
        assert set(selected_columns) == set(backfill_df.columns)

        os.remove(backfill_dir + "/" + fn)
        assert fn not in os.listdir(backfill_dir)

    def test_merge_backfill_file(self):

        today = datetime.today()

        new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")
        fn = "claims_hosp_from_20200611_to_20200614.parquet"
        assert fn not in os.listdir(backfill_dir)

        # Check the case when the merged file is not generated
        today = datetime(2020, 6, 14)
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=8)
        assert fn not in os.listdir(backfill_dir)

        # Generate the merged file, but do not delete the daily files
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=2)
        assert fn in os.listdir(backfill_dir)

        # Read daily files
        pdList = []
        for file in new_files:
            df = pd.read_parquet(file, engine='pyarrow')
            issue_date = datetime.strptime(file[-16:-8], "%Y%m%d")
            df["issue_date"] = issue_date
            df["lag"] = [(issue_date - x).days for x in df["time_value"]]
            pdList.append(df)
        expected = pd.concat(pdList).sort_values(["time_value", "fips"])

        # Read the merged file
        merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')

        assert set(expected.columns) == set(merged.columns)
        assert expected.shape[0] == merged.shape[0]
        assert expected.shape[1] == merged.shape[1]

        os.remove(backfill_dir + "/" + fn)
        assert fn not in os.listdir(backfill_dir)
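
Assuming the relative paths resolve as written, these tests are run from the claims_hosp/tests directory (so that the test_data/ input file and the ./backfill directory are found), for example with: python -m pytest test_backfill.py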