Skip to content

Commit a0c9e3d

Browse files
authored
Merge pull request #1675 from cmu-delphi/backfill/claims_hosp
Backfill/claims_hosp
2 parents 3704566 + 6dcafa9 commit a0c9e3d

File tree

9 files changed

+236
-2
lines changed

9 files changed

+236
-2
lines changed

ansible/templates/claims_hosp-params-prod.json.j2

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
"start_date": "2020-02-01",
99
"end_date": null,
1010
"drop_date": null,
11+
"backfill_dir": "/common/backfill/claims_hosp",
12+
"backfill_merge_day": 0,
1113
"n_backfill_days": 70,
1214
"n_waiting_days": 3,
1315
"write_se": false,
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""
2+
Store backfill data.
3+
4+
Author: Jingjing Tang
5+
Created: 2022-08-03
6+
7+
"""
8+
import os
9+
import glob
10+
from datetime import datetime
11+
12+
# third party
13+
import pandas as pd
14+
from delphi_utils import GeoMapper
15+
16+
17+
from .config import Config
18+
19+
# Module-level GeoMapper, used below to map county FIPS codes to state ids.
gmpr = GeoMapper()
20+
21+
def store_backfill_file(claims_filepath, _end_date, backfill_dir):
    """
    Store county level backfill data into backfill_dir.

    Reads the aggregated claims CSV, keeps one year of data ending at
    `_end_date`, adds state ids and per-row lag/issue-date columns, and
    writes the result as a parquet file named
    ``claims_hosp_as_of_<YYYYMMDD>.parquet`` inside `backfill_dir`.

    Parameters
    ----------
    claims_filepath: str
        path to the aggregated claims data
    _end_date: datetime
        The most recent date when the raw data is received
    backfill_dir: str
        specified path to store backfill files.
    """
    backfilldata = pd.read_csv(
        claims_filepath,
        usecols=Config.CLAIMS_DTYPES.keys(),
        dtype=Config.CLAIMS_DTYPES,
        parse_dates=[Config.CLAIMS_DATE_COL],
    )
    backfilldata.rename({"ServiceDate": "time_value",
                         "PatCountyFIPS": "fips",
                         "Denominator": "den",
                         "Covid_like": "num"},
                        axis=1, inplace=True)
    backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id",
                                    from_col="fips", new_col="state_id")
    # Store one year's backfill data.
    # BUGFIX: datetime.replace(year=year-1) raises ValueError when _end_date
    # is Feb 29 of a leap year; fall back to Feb 28 of the previous year.
    try:
        _start_date = _end_date.replace(year=_end_date.year - 1)
    except ValueError:  # Feb 29 has no counterpart in the previous year
        _start_date = _end_date.replace(year=_end_date.year - 1, day=28)
    selected_columns = ['time_value', 'fips', 'state_id',
                        'den', 'num']
    # Drop rows older than one year or with missing county FIPS.
    backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                                    & (~backfilldata["fips"].isnull()),
                                    selected_columns]

    # Lag in days between the issue date and each service date
    # (vectorized; replaces a per-row Python list comprehension).
    backfilldata["lag"] = (_end_date - backfilldata["time_value"]).dt.days
    backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
    backfilldata["issue_date"] = datetime.strftime(_end_date, "%Y-%m-%d")

    backfilldata = backfilldata.astype({
        "time_value": "string",
        "issue_date": "string",
        "fips": "string",
        "state_id": "string"
    })

    # Keep this path pattern consistent with `get_date` in
    # `merge_backfill_file`, which parses the date back out of the filename.
    path = backfill_dir + \
        "/claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
    # Store intermediate file into the backfill folder
    backfilldata.to_parquet(path, index=False)
69+
70+
def merge_backfill_file(backfill_dir, backfill_merge_day, today,
                        test_mode=False, check_nd=25):
    """
    Merge ~4 weeks' backfill data into one file.

    Usually this function should merge 28 days' data into a new file so as to
    save the reading time when running the backfill pipelines. We set a softer
    threshold to allow flexibility in data delivery.

    Parameters
    ----------
    today : datetime
        The most recent date when the raw data is received
    backfill_dir : str
        specified path to store backfill files.
    backfill_merge_day: int
        The day of a week that we used to merge the backfill files. e.g. 0
        is Monday.
    test_mode: bool
        If True, keep the daily files on disk after writing the merged file.
    check_nd: int
        The criteria of the number of unmerged files. Ideally, we want the
        number to be 28, but we use a looser criteria from practical
        considerations
    """
    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
    if len(new_files) == 0:  # return early if no daily file is stored
        return

    def get_date(file_link):
        # Keep the parsing here consistent with the backfill path written
        # by `store_backfill_file`.
        # BUGFIX: use os.path.basename instead of splitting on "/" so the
        # filename is extracted correctly regardless of path separator.
        fn = os.path.basename(file_link).split(".parquet")[0].split("_")[-1]
        return datetime.strptime(fn, "%Y%m%d")

    date_list = list(map(get_date, new_files))
    earliest_date = min(date_list)
    latest_date = max(date_list)

    # Merge only on the configured weekday, and only once the unmerged daily
    # files span more than `check_nd` days.
    if today.weekday() != backfill_merge_day or (today - earliest_date).days <= check_nd:
        return

    # Start to merge files
    pdList = []
    for fn in new_files:
        df = pd.read_parquet(fn, engine='pyarrow')
        pdList.append(df)
    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
    path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet" % (
        datetime.strftime(earliest_date, "%Y%m%d"),
        datetime.strftime(latest_date, "%Y%m%d"))
    merged_file.to_parquet(path, index=False)

    # Delete daily files once we have the merged one.
    if not test_mode:
        for fn in new_files:
            os.remove(fn)
    return

claims_hosp/delphi_claims_hosp/load_data.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
Created: 2020-09-27
66
77
"""
8-
98
# third party
109
import pandas as pd
1110

@@ -53,7 +52,6 @@ def load_claims_data(claims_filepath, dropdate, base_geo):
5352

5453
return claims_data
5554

56-
5755
def load_data(input_filepath, dropdate, base_geo):
5856
"""
5957
Load in claims data, and combine them.

claims_hosp/delphi_claims_hosp/run.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from .modify_claims_drops import modify_and_write
2121
from .get_latest_claims_name import get_latest_filename
2222
from .update_indicator import ClaimsHospIndicatorUpdater
23+
from .backfill import (store_backfill_file, merge_backfill_file)
2324

2425

2526
def run_module(params):
@@ -89,6 +90,12 @@ def run_module(params):
8990
if params["indicator"]["start_date"] is not None:
9091
startdate = params["indicator"]['start_date']
9192

93+
# Store backfill data
94+
backfill_dir = params["indicator"]["backfill_dir"]
95+
backfill_merge_day = params["indicator"]["backfill_merge_day"]
96+
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
97+
store_backfill_file(claims_file, dropdate_dt, backfill_dir)
98+
9299
# print out information
93100
logger.info("Loaded params",
94101
startdate = startdate,

claims_hosp/params.json.template

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
"end_date": null,
1010
"drop_date": null,
1111
"n_backfill_days": 70,
12+
"backfill_dir": "./backfill",
13+
"backfill_merge_day": 0,
1214
"n_waiting_days": 3,
1315
"write_se": false,
1416
"obfuscated_prefix": "foo_obfuscated",

claims_hosp/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
required = [
55
"numpy",
66
"pandas",
7+
"pyarrow",
78
"paramiko",
89
"pydocstyle",
910
"pytest",

claims_hosp/tests/backfill/.gitignore

Whitespace-only changes.

claims_hosp/tests/test_backfill.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import os
2+
import glob
3+
from datetime import datetime
4+
5+
# third party
6+
import pandas as pd
7+
import pytest
8+
9+
# first party
10+
from delphi_claims_hosp.config import Config, GeoConstants
11+
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file
12+
13+
CONFIG = Config()
CONSTANTS = GeoConstants()
# Test fixture parameters: a synthetic aggregated-claims input file, the
# directory the backfill parquet files are written into, and the drop date
# used as the issue date.
PARAMS = {
    "indicator": {
        "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
        "backfill_dir": "./backfill",
        "drop_date": "2020-06-11",
    }
}
DATA_FILEPATH = PARAMS["indicator"]["input_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]
25+
26+
class TestBackfill:
    """Tests for storing and merging claims-hosp backfill parquet files."""

    def test_store_backfill_file(self):
        issue_date = datetime(2020, 1, 1)
        expected_fn = "claims_hosp_as_of_20200101.parquet"
        assert expected_fn not in os.listdir(backfill_dir)

        # Write the daily backfill file and confirm it appears on disk.
        store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
        assert expected_fn in os.listdir(backfill_dir)
        stored = pd.read_parquet(backfill_dir + "/" + expected_fn, engine='pyarrow')

        # The stored file carries exactly the expected columns.
        expected_columns = ['time_value', 'fips', 'state_id',
                            'num', 'den', 'lag', 'issue_date']
        assert set(expected_columns) == set(stored.columns)

        # Clean up the generated file.
        os.remove(backfill_dir + "/" + expected_fn)
        assert expected_fn not in os.listdir(backfill_dir)

    def test_merge_backfill_file(self):
        merged_fn = "claims_hosp_from_20200611_to_20200614.parquet"
        assert merged_fn not in os.listdir(backfill_dir)

        # With no daily files stored, merging is a no-op.
        today = datetime(2020, 6, 14)
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=8)
        assert merged_fn not in os.listdir(backfill_dir)

        # Generate four days of backfill daily files.
        for day in range(11, 15):
            store_backfill_file(DATA_FILEPATH, datetime(2020, 6, day), backfill_dir)

        # Threshold not reached (check_nd=8): still no merged file.
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=8)
        assert merged_fn not in os.listdir(backfill_dir)

        # Loosen the threshold so the merged file is generated; test_mode
        # keeps the daily files on disk.
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=2)
        assert merged_fn in os.listdir(backfill_dir)

        # Re-read the daily files, deleting each one as it is consumed.
        daily_frames = []
        for path in glob.glob(backfill_dir + "/claims_hosp*.parquet"):
            if "from" in path:
                continue
            daily_frames.append(pd.read_parquet(path, engine='pyarrow'))
            os.remove(path)
        remaining = glob.glob(backfill_dir + "/claims_hosp*.parquet")
        assert len(remaining) == 1

        expected = pd.concat(daily_frames).sort_values(["time_value", "fips"])

        # The merged file matches the concatenation of the daily files.
        merged = pd.read_parquet(backfill_dir + "/" + merged_fn, engine='pyarrow')

        assert set(expected.columns) == set(merged.columns)
        assert expected.shape[0] == merged.shape[0]
        assert expected.shape[1] == merged.shape[1]

        os.remove(backfill_dir + "/" + merged_fn)
        assert merged_fn not in os.listdir(backfill_dir)
Binary file not shown.

0 commit comments

Comments
 (0)