Backfill/claims_hosp #1675
Merged
Changes from 6 commits
Commits (15)
c2e8190  add code for backfill
d74b187  add issue_date and lag to backfill file
64bc15e  Update ansible file for claims_hosp
de888aa  Fix the name of backfill path (jingjtang)
983fde8  Use get_date() for issue date (jingjtang)
a8dc212  Finish the comment for merging (jingjtang)
c2395b7  remove index arg when saving parquet files; add state_id for backfill (jingjtang)
9308071  fix the bug
abc20b2  use smaller test files
90fed9f  add the correct test file
c84ea3f  delete backfill test files
3d02733  add the case when there is no daily file stored
2f48fed  fix the test error
530b687  move lag and issue_date setting to daily files
6dcafa9  Merge branch 'main' into backfill/claims_hosp (nmdefries)
@@ -0,0 +1,112 @@
""" | ||
Store backfill data. | ||
|
||
Author: Jingjing Tang | ||
Created: 2022-08-03 | ||
|
||
""" | ||
import os | ||
import glob | ||
from datetime import datetime | ||
|
||
# third party | ||
import pandas as pd | ||
|
||
# first party | ||
from .config import Config | ||
|
||
def store_backfill_file(claims_filepath, _end_date, backfill_dir): | ||
""" | ||
Store county level backfill data into backfill_dir. | ||
|
||
Parameter: | ||
claims_filepath: str | ||
path to the aggregated claims data | ||
_end_date: datetime | ||
The most recent date when the raw data is received | ||
backfill_dir: str | ||
specified path to store backfill files. | ||
""" | ||
backfilldata = pd.read_csv( | ||
claims_filepath, | ||
usecols=Config.CLAIMS_DTYPES.keys(), | ||
dtype=Config.CLAIMS_DTYPES, | ||
parse_dates=[Config.CLAIMS_DATE_COL], | ||
) | ||
|
||
backfilldata.rename({"ServiceDate": "time_value", | ||
"PatCountyFIPS": "fips", | ||
"Denominator": "den", | ||
"Covid_like": "num"}, | ||
axis=1, inplace=True) | ||
#Store one year's backfill data | ||
_start_date = _end_date.replace(year=_end_date.year-1) | ||
selected_columns = ['time_value', 'fips', | ||
'den', 'num'] | ||
backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date) | ||
& (~backfilldata["fips"].isnull()), | ||
selected_columns] | ||
path = backfill_dir + \ | ||
"/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") | ||
# Store intermediate file into the backfill folder | ||
backfilldata.to_parquet(path) | ||
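For context, a minimal usage sketch of the daily step above. The input path, drop date, and backfill directory are illustrative (they mirror the values in the test parameters further below) and are not part of the module:

# Hypothetical example: store one daily backfill snapshot.
from datetime import datetime
from delphi_claims_hosp.backfill import store_backfill_file

claims_file = "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz"  # example input
drop_date = datetime(2020, 6, 11)  # most recent date the raw data was received
store_backfill_file(claims_file, drop_date, "./backfill")
# Writes ./backfill/claims_hosp_as_of_20200611.parquet with columns
# time_value, fips, den, num, keeping roughly one year of history.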


def merge_backfill_file(backfill_dir, backfill_merge_day, today,
                        test_mode=False, check_nd=25):
    """
    Merge ~4 weeks' backfill data into one file.

    Usually this function merges 28 days' data into a new file to reduce
    reading time when running the backfill pipelines. We set a softer
    threshold to allow flexibility in data delivery.

    Parameters
    ----------
    today : datetime
        The most recent date when the raw data is received
    backfill_dir : str
        specified path to store backfill files
    backfill_merge_day: int
        The day of the week on which the backfill files are merged, e.g. 0
        is Monday.
    test_mode: bool
        If True, keep the daily files after merging.
    check_nd: int
        The threshold on the number of unmerged files. Ideally, we want the
        number to be 28, but we use a looser criterion for practical
        considerations.
    """
    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")

    def get_date(file_link):
        # Keep the function here consistent with the backfill path in
        # function `store_backfill_file`
        fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
        return datetime.strptime(fn, "%Y%m%d")

    date_list = list(map(get_date, new_files))
    earliest_date = min(date_list)
    latest_date = max(date_list)

    # Check whether to merge
    # Check the number of files that are not merged
    if today.weekday() != backfill_merge_day or (today - earliest_date).days <= check_nd:
        return

    # Start to merge files
    pdList = []
    for fn in new_files:
        df = pd.read_parquet(fn, engine='pyarrow')
        issue_date = get_date(fn)
        df["issue_date"] = issue_date
        df["lag"] = [(issue_date - x).days for x in df["time_value"]]
        pdList.append(df)
    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
    path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet" % (
        datetime.strftime(earliest_date, "%Y%m%d"),
        datetime.strftime(latest_date, "%Y%m%d"))
    merged_file.to_parquet(path)

    # Delete daily files once we have the merged one.
    if not test_mode:
        for fn in new_files:
            os.remove(fn)
    return
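Similarly, a hedged sketch of invoking the weekly merge; the merge day and date below are assumed values for illustration, not anything defined in this diff:

# Hypothetical example: attempt the weekly merge of daily snapshots.
from datetime import datetime
from delphi_claims_hosp.backfill import merge_backfill_file

today = datetime(2020, 6, 15)  # a Monday, matching backfill_merge_day=0 below
merge_backfill_file("./backfill", backfill_merge_day=0, today=today)
# No-op unless today falls on the merge day AND the oldest daily file is
# more than check_nd (default 25) days old; otherwise it writes
# ./backfill/claims_hosp_from_<earliest>_to_<latest>.parquet and deletes
# the daily files.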
@@ -4,6 +4,7 @@
required = [
    "numpy",
    "pandas",
    "pyarrow",
    "paramiko",
    "pydocstyle",
    "pytest",
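pyarrow is added here because the backfill files above are written and read through pandas' parquet I/O, which requires a parquet engine. A minimal, purely illustrative check:

# Illustrative: confirm the parquet engine used by the backfill code is available.
import pandas as pd

df = pd.DataFrame({"time_value": pd.to_datetime(["2020-06-01"]),
                   "fips": ["42003"], "num": [1], "den": [10]})
df.to_parquet("example.parquet", engine="pyarrow")
assert not pd.read_parquet("example.parquet", engine="pyarrow").empty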
4 binary files not shown.
@@ -0,0 +1,83 @@
import os
import glob
from datetime import datetime

# third party
import pandas as pd
import pytest

# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file

CONFIG = Config()
CONSTANTS = GeoConstants()
PARAMS = {
    "indicator": {
        "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
        "backfill_dir": "./backfill",
        "drop_date": "2020-06-11",
    }
}
DATA_FILEPATH = PARAMS["indicator"]["input_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]


class TestBackfill:

    def test_store_backfill_file(self):
        dropdate = datetime(2020, 1, 1)
        fn = "claims_hosp_as_of_20200101.parquet"
        assert fn not in os.listdir(backfill_dir)

        # Store backfill file
        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
        assert fn in os.listdir(backfill_dir)
        fn = "claims_hosp_as_of_20200101.parquet"
        backfill_df = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')

        selected_columns = ['time_value', 'fips',
                            'num', 'den']
        assert set(selected_columns) == set(backfill_df.columns)

        os.remove(backfill_dir + "/" + fn)
        assert fn not in os.listdir(backfill_dir)

    def test_merge_backfill_file(self):

        today = datetime.today()

        new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")

        fn = "claims_hosp_from_20200611_to_20200614.parquet"
        assert fn not in os.listdir(backfill_dir)

        # Check the case when the merged file is not generated
        today = datetime(2020, 6, 14)
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=8)
        assert fn not in os.listdir(backfill_dir)

        # Generate the merged file, but do not delete the daily files
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=2)
        assert fn in os.listdir(backfill_dir)

        # Read daily files
        pdList = []
        for file in new_files:
            df = pd.read_parquet(file, engine='pyarrow')
            issue_date = datetime.strptime(file[-16:-8], "%Y%m%d")
            df["issue_date"] = issue_date
            df["lag"] = [(issue_date - x).days for x in df["time_value"]]
            pdList.append(df)
        expected = pd.concat(pdList).sort_values(["time_value", "fips"])

        # Read the merged file
        merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')

        assert set(expected.columns) == set(merged.columns)
        assert expected.shape[0] == merged.shape[0]
        assert expected.shape[1] == merged.shape[1]

        os.remove(backfill_dir + "/" + fn)
        assert fn not in os.listdir(backfill_dir)
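As a reading aid for the lag bookkeeping the test reconstructs above, a tiny worked example (the dates are made up):

# Illustrative only: "lag" is the number of days between the issue date
# (when a snapshot was received) and the service date of a row.
from datetime import datetime

issue_date = datetime(2020, 6, 11)
time_value = datetime(2020, 6, 1)
lag = (issue_date - time_value).days
assert lag == 10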