diff --git a/ansible/templates/claims_hosp-params-prod.json.j2 b/ansible/templates/claims_hosp-params-prod.json.j2
index dd4c884d9..851951133 100644
--- a/ansible/templates/claims_hosp-params-prod.json.j2
+++ b/ansible/templates/claims_hosp-params-prod.json.j2
@@ -8,6 +8,8 @@
     "start_date": "2020-02-01",
     "end_date": null,
     "drop_date": null,
+    "backfill_dir": "/common/backfill/claims_hosp",
+    "backfill_merge_day": 0,
     "n_backfill_days": 70,
     "n_waiting_days": 3,
     "write_se": false,
diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
new file mode 100644
index 000000000..a282be9f7
--- /dev/null
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -0,0 +1,127 @@
+"""
+Store backfill data.
+
+Author: Jingjing Tang
+Created: 2022-08-03
+
+"""
+import os
+import glob
+from datetime import datetime
+
+# third party
+import pandas as pd
+from delphi_utils import GeoMapper
+
+
+from .config import Config
+
+gmpr = GeoMapper()
+
+def store_backfill_file(claims_filepath, _end_date, backfill_dir):
+    """
+    Store county-level backfill data into backfill_dir.
+
+    Parameters
+    ----------
+    claims_filepath : str
+        path to the aggregated claims data
+    _end_date : datetime
+        the most recent date when the raw data is received
+    backfill_dir : str
+        specified path to store backfill files
+    """
+    backfilldata = pd.read_csv(
+        claims_filepath,
+        usecols=Config.CLAIMS_DTYPES.keys(),
+        dtype=Config.CLAIMS_DTYPES,
+        parse_dates=[Config.CLAIMS_DATE_COL],
+    )
+    backfilldata.rename({"ServiceDate": "time_value",
+                         "PatCountyFIPS": "fips",
+                         "Denominator": "den",
+                         "Covid_like": "num"},
+                        axis=1, inplace=True)
+    backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id",
+                                    from_col="fips", new_col="state_id")
+    # Store one year's backfill data
+    _start_date = _end_date.replace(year=_end_date.year-1)
+    selected_columns = ['time_value', 'fips', 'state_id',
+                        'den', 'num']
+    backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
+                                    & (~backfilldata["fips"].isnull()),
+                                    selected_columns]
+
+    backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
+    backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
+    backfilldata["issue_date"] = datetime.strftime(_end_date, "%Y-%m-%d")
+
+    backfilldata = backfilldata.astype({
+        "time_value": "string",
+        "issue_date": "string",
+        "fips": "string",
+        "state_id": "string"
+    })
+
+    path = backfill_dir + \
+        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    # Store intermediate file into the backfill folder
+    backfilldata.to_parquet(path, index=False)
+
+def merge_backfill_file(backfill_dir, backfill_merge_day, today,
+                        test_mode=False, check_nd=25):
+    """
+    Merge ~4 weeks' backfill data into one file.
+
+    Usually this function merges 28 days' data into a new file to save
+    reading time when running the backfill pipelines. We set a softer
+    threshold to allow flexibility in data delivery.
+
+    Parameters
+    ----------
+    backfill_dir : str
+        specified path to store backfill files
+    backfill_merge_day : int
+        the day of the week on which the backfill files are merged, e.g. 0
+        is Monday
+    today : datetime
+        the most recent date when the raw data is received
+    test_mode : bool
+        if True, keep the daily files after the merged file is generated
+    check_nd : int
+        threshold on the age (in days) of the oldest unmerged daily file.
+        Ideally this is 28, but we use a looser criterion for practical
+        reasons
+    """
+    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
+    if len(new_files) == 0:  # no daily files have been stored yet
+        return
+
+    def get_date(file_link):
+        # Keep the function here consistent with the backfill path in
+        # function `store_backfill_file`
+        fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
+        return datetime.strptime(fn, "%Y%m%d")
+
+    date_list = list(map(get_date, new_files))
+    earliest_date = min(date_list)
+    latest_date = max(date_list)
+
+    # Check whether to merge: only on the designated merge day, and only once
+    # the oldest unmerged daily file is more than check_nd days old
+    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+        return
+
+    # Start to merge files
+    pdList = []
+    for fn in new_files:
+        df = pd.read_parquet(fn, engine='pyarrow')
+        pdList.append(df)
+    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
+    path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
+        datetime.strftime(earliest_date, "%Y%m%d"),
+        datetime.strftime(latest_date, "%Y%m%d"))
+    merged_file.to_parquet(path, index=False)
+
+    # Delete daily files once we have the merged one.
+    if not test_mode:
+        for fn in new_files:
+            os.remove(fn)
+    return
diff --git a/claims_hosp/delphi_claims_hosp/load_data.py b/claims_hosp/delphi_claims_hosp/load_data.py
index 505bfabc9..c2ee07e74 100644
--- a/claims_hosp/delphi_claims_hosp/load_data.py
+++ b/claims_hosp/delphi_claims_hosp/load_data.py
@@ -5,7 +5,6 @@
 Created: 2020-09-27
 
 """
-
 # third party
 import pandas as pd
 
@@ -53,7 +52,6 @@ def load_claims_data(claims_filepath, dropdate, base_geo):
 
     return claims_data
 
-
 def load_data(input_filepath, dropdate, base_geo):
     """
     Load in claims data, and combine them.
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 6c7405a36..b1685cb00 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -20,6 +20,7 @@
 from .modify_claims_drops import modify_and_write
 from .get_latest_claims_name import get_latest_filename
 from .update_indicator import ClaimsHospIndicatorUpdater
+from .backfill import (store_backfill_file, merge_backfill_file)
 
 
 def run_module(params):
@@ -89,6 +90,12 @@ def run_module(params):
     if params["indicator"]["start_date"] is not None:
         startdate = params["indicator"]['start_date']
 
+    # Store backfill data
+    backfill_dir = params["indicator"]["backfill_dir"]
+    backfill_merge_day = params["indicator"]["backfill_merge_day"]
+    merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
+    store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+
     # print out information
     logger.info("Loaded params",
                 startdate = startdate,
diff --git a/claims_hosp/params.json.template b/claims_hosp/params.json.template
index e200fa8fc..67bfd4c43 100644
--- a/claims_hosp/params.json.template
+++ b/claims_hosp/params.json.template
@@ -9,6 +9,8 @@
     "end_date": null,
     "drop_date": null,
     "n_backfill_days": 70,
+    "backfill_dir": "./backfill",
+    "backfill_merge_day": 0,
     "n_waiting_days": 3,
     "write_se": false,
     "obfuscated_prefix": "foo_obfuscated",
diff --git a/claims_hosp/setup.py b/claims_hosp/setup.py
index d7e46a13d..bc50a6414 100644
--- a/claims_hosp/setup.py
+++ b/claims_hosp/setup.py
@@ -4,6 +4,7 @@
 required = [
     "numpy",
     "pandas",
+    "pyarrow",
     "paramiko",
     "pydocstyle",
     "pytest",
diff --git a/claims_hosp/tests/backfill/.gitignore b/claims_hosp/tests/backfill/.gitignore
new file mode 100644
index 000000000..e69de29bb
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
new file mode 100644
index 000000000..fcd908461
--- /dev/null
+++ b/claims_hosp/tests/test_backfill.py
@@ -0,0 +1,97 @@
+import os
+import glob
+from datetime import datetime
+
+# third party
+import pandas as pd
+import pytest
+
+# first party
+from delphi_claims_hosp.config import Config, GeoConstants
+from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file
+
+CONFIG = Config()
+CONSTANTS = GeoConstants()
+PARAMS = {
+    "indicator": {
+        "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
+        "backfill_dir": "./backfill",
+        "drop_date": "2020-06-11",
+    }
+}
+DATA_FILEPATH = PARAMS["indicator"]["input_file"]
+DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
+backfill_dir = PARAMS["indicator"]["backfill_dir"]
+
+class TestBackfill:
+
+    def test_store_backfill_file(self):
+        dropdate = datetime(2020, 1, 1)
+        fn = "claims_hosp_as_of_20200101.parquet"
+        assert fn not in os.listdir(backfill_dir)
+
+        # Store backfill file
+        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+        assert fn in os.listdir(backfill_dir)
+        backfill_df = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')
+
+        selected_columns = ['time_value', 'fips', 'state_id',
+                            'num', 'den', 'lag', 'issue_date']
+        assert set(selected_columns) == set(backfill_df.columns)
+
+        os.remove(backfill_dir + "/" + fn)
+        assert fn not in os.listdir(backfill_dir)
+
+    def test_merge_backfill_file(self):
+
+        today = datetime.today()
+
+        fn = "claims_hosp_from_20200611_to_20200614.parquet"
+        assert fn not in os.listdir(backfill_dir)
+
+        # Check the case when there is no daily file to merge
+        today = datetime(2020, 6, 14)
+        merge_backfill_file(backfill_dir, today.weekday(), today,
+                            test_mode=True, check_nd=8)
+        assert fn not in os.listdir(backfill_dir)
+
+        # Generate backfill daily files
+        for d in range(11, 15):
+            dropdate = datetime(2020, 6, d)
+            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+
+        # Check the case when the merged file is not generated
+        today = datetime(2020, 6, 14)
+        merge_backfill_file(backfill_dir, today.weekday(), today,
+                            test_mode=True, check_nd=8)
+        assert fn not in os.listdir(backfill_dir)
+
+        # Generate the merged file, but do not delete the daily files
+        merge_backfill_file(backfill_dir, today.weekday(), today,
+                            test_mode=True, check_nd=2)
+        assert fn in os.listdir(backfill_dir)
+
+        # Read the daily files
+        new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")
+        pdList = []
+        for file in new_files:
+            if "from" in file:
+                continue
+            df = pd.read_parquet(file, engine='pyarrow')
+            pdList.append(df)
+            os.remove(file)
+        new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")
+        assert len(new_files) == 1
+
+        expected = pd.concat(pdList).sort_values(["time_value", "fips"])
+
+        # Read the merged file
+        merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')
+
+        assert set(expected.columns) == set(merged.columns)
+        assert expected.shape[0] == merged.shape[0]
+        assert expected.shape[1] == merged.shape[1]
+
+        os.remove(backfill_dir + "/" + fn)
+        assert fn not in os.listdir(backfill_dir)
diff --git a/claims_hosp/tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz b/claims_hosp/tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz
index d932a5874..fab5cbfc3 100644
Binary files a/claims_hosp/tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz and b/claims_hosp/tests/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz differ
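
For reference, a minimal sketch of how the two new helpers are invoked together, mirroring the `run.py` hunk above. The `backfill_dir` and `backfill_merge_day` values are the template defaults; the claims file path and drop date are placeholders, and the backfill directory is assumed to already exist.

```python
from datetime import datetime

from delphi_claims_hosp.backfill import merge_backfill_file, store_backfill_file

# Template defaults from params.json.template; input path and drop date are placeholders.
backfill_dir = "./backfill"          # assumed to already exist
backfill_merge_day = 0               # Monday
claims_file = "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz"
dropdate_dt = datetime(2020, 6, 11)

# On the weekly merge day, combine all accumulated daily files into one parquet
# file once the oldest is more than check_nd (default 25) days old, then store
# today's issue as a new daily parquet file.
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
store_backfill_file(claims_file, dropdate_dt, backfill_dir)
```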