Commit 65a06d8

added logging and more clean up

committed · 1 parent eed2a63

2 files changed: +92 −67 lines

claims_hosp/delphi_claims_hosp/backfill.py

Lines changed: 24 additions & 13 deletions
@@ -22,7 +22,7 @@
 
 gmpr = GeoMapper()
 
-def store_backfill_file(claims_filepath, _end_date, backfill_dir):
+def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     """
     Store county level backfill data into backfill_dir.
 
@@ -57,6 +57,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
     backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                                     & (~backfilldata["fips"].isnull()),
                                     selected_columns]
+    logger.info("Filtering backfill data", startdate=_start_date, enddate=_end_date)
 
     backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
     backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
@@ -69,10 +70,15 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
         "state_id": "string"
     })
 
-    path = backfill_dir + \
-        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = "claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    path = f"{backfill_dir}/{filename}"
+
     # Store intermediate file into the backfill folder
-    backfilldata.to_parquet(path, index=False)
+    try:
+        backfilldata.to_parquet(path, index=False)
+        logger.info("Stored backfill data in parquet", filename=filename)
+    except:
+        logger.info("Failed to store backfill data in parquet")
     return path
 
 
@@ -90,26 +96,29 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
     backfill_dir : str
         specified path to store backfill files.
     backfill_file : str
+        specific file to add to the merged backfill file.
     """
     new_files = glob.glob(backfill_dir + "/claims_hosp_*")
 
     def get_file_with_date(files) -> Union[str, None]:
         for filename in files:
-            pattern = re.findall(r"\d{6}", filename)
-            if len(pattern) == 1:
-                file_month = datetime.strptime(pattern[0], "%Y%m")
-                start_date = file_month.replace(day=1)
-                end_date = (start_date + timedelta(days=32)).replace(day=1)
-                if issue_date >= start_date and issue_date < end_date:
+            # need to only match files with 6 digits for merged files
+            pattern = re.findall(r"_(\d{6,6})\.parquet", filename)
+            if pattern:
+                file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
+                end_date = (file_month + timedelta(days=32)).replace(day=1)
+                if issue_date >= file_month and issue_date < end_date:
                     return filename
         return ""
 
     file_name = get_file_with_date(new_files)
 
     if len(file_name) == 0:
-        logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d"))
+        logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return
 
+    logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name)
+
     # Start to merge files
     merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
     try:
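The tightened pattern matters because re.findall(r"\d{6}", ...) also finds a six-digit run inside the eight-digit stamp of a daily claims_hosp_as_of_YYYYMMDD file, so daily files could be mistaken for monthly merged files. A quick illustration of the before/after behavior:

    import re

    monthly = "claims_hosp_202006.parquet"
    daily = "claims_hosp_as_of_20200611.parquet"

    # Old pattern: the first six digits of "20200611" match -- a false positive.
    print(re.findall(r"\d{6}", daily))                  # ['202006']
    # New pattern: anchored between "_" and ".parquet", so only monthly files hit.
    print(re.findall(r"_(\d{6,6})\.parquet", monthly))  # ['202006']
    print(re.findall(r"_(\d{6,6})\.parquet", daily))    # []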
@@ -139,8 +148,10 @@ def merge_backfill_file(backfill_dir, most_recent, logger, test_mode=False):
         specified path to store backfill files.
     test_mode: bool
     """
-    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
+    previous_month = (most_recent.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
+    new_files = glob.glob(backfill_dir + f"/claims_hosp_as_of_{previous_month}*")
     if len(new_files) == 0:  # if no daily files are stored
+        logger.info("No new files to merge; skipping merging")
         return
 
     def get_date(file_link):
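Both date tricks in this file use the same idiom: snap to the first of the month, then step across the month boundary with a timedelta that is guaranteed to land in the neighboring month. A worked example:

    from datetime import datetime, timedelta

    most_recent = datetime(2020, 7, 14)

    # Previous month: the first of this month minus one day lands on the
    # last day of the previous month.
    previous_month = (most_recent.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
    print(previous_month)  # 202006

    # Next month (the issue-date window above): adding 32 days to the first
    # of any month always overshoots into the following month.
    file_month = datetime(2020, 6, 1)
    end_date = (file_month + timedelta(days=32)).replace(day=1)
    print(end_date)  # 2020-07-01 00:00:00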
@@ -155,7 +166,7 @@ def get_date(file_link):
         logger.info("Not a new month; skipping merging")
         return
 
-
+    logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
     # Start to merge files
     pdList = []
     for fn in new_files:

claims_hosp/tests/test_backfill.py

Lines changed: 68 additions & 54 deletions
@@ -9,6 +9,7 @@
 import pytest
 
 # first party
+from delphi_utils.logger import get_structured_logger
 from delphi_claims_hosp.config import Config, GeoConstants
 from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files
 
@@ -25,133 +26,146 @@
 DATA_FILEPATH = PARAMS["indicator"]["input_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 backfill_dir = PARAMS["indicator"]["backfill_dir"]
-TEST_LOGGER = logging.getLogger()
 
 class TestBackfill:
 
-    def test_store_backfill_file(self):
-        dropdate = datetime(2020, 1, 1)
+    def cleanup(self):
+        for file in glob.glob(f"{backfill_dir}/*.parquet"):
+            os.remove(file)
+
+    def test_store_backfill_file(self, caplog):
+        dropdate = datetime(2020, 1, 1)
         fn = "claims_hosp_as_of_20200101.parquet"
-        assert fn not in os.listdir(backfill_dir)
-
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+        num_rows = len(pd.read_csv(DATA_FILEPATH))
+
         # Store backfill file
-        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
         assert fn in os.listdir(backfill_dir)
+        assert "Stored backfill data in parquet" in caplog.text
+
         fn = "claims_hosp_as_of_20200101.parquet"
         backfill_df = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')
-
+
         selected_columns = ['time_value', 'fips', 'state_id',
                             'num', 'den', 'lag', 'issue_date']
-        assert set(selected_columns) == set(backfill_df.columns)
-
-        os.remove(backfill_dir + "/" + fn)
-        assert fn not in os.listdir(backfill_dir)
+
+        assert set(selected_columns) == set(backfill_df.columns)
+        assert num_rows == len(backfill_df)
+
+        self.cleanup()
 
-    def test_merge_backfill_file(self):
+    def test_merge_backfill_file(self, caplog):
         fn = "claims_hosp_202006.parquet"
-        assert fn not in os.listdir(backfill_dir)
-
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
         # Check when there is no daily file to merge.
         today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+        merge_backfill_file(backfill_dir, today, logger,
                             test_mode=True)
         assert fn not in os.listdir(backfill_dir)
-
-        # Generate backfill daily files
+        assert "No new files to merge; skipping merging" in caplog.text
+
+        # Generate backfill daily files
         for d in range(11, 15):
-            dropdate = datetime(2020, 6, d)
-            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
-
-        # Check when the merged file is not generated
+            dropdate = datetime(2020, 6, d)
+            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
+
         today = datetime(2020, 7, 1)
-        merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+        merge_backfill_file(backfill_dir, today, logger,
                             test_mode=True)
+        assert "Merging files" in caplog.text
         assert fn in os.listdir(backfill_dir)
 
         # Read daily file
         new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet")
-        pdList = []
+        pdList = []
         for file in new_files:
-            if "from" in file:
-                continue
             df = pd.read_parquet(file, engine='pyarrow')
             pdList.append(df)
             os.remove(file)
         new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")
         assert len(new_files) == 1
 
         expected = pd.concat(pdList).sort_values(["time_value", "fips"])
-
+
         # Read the merged file
         merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')
-
+
         assert set(expected.columns) == set(merged.columns)
         assert expected.shape[0] == merged.shape[0]
         assert expected.shape[1] == merged.shape[1]
-
-        os.remove(backfill_dir + "/" + fn)
-        assert fn not in os.listdir(backfill_dir)
 
-    def test_merge_existing_backfill_files(self):
+        self.cleanup()
+
+    def test_merge_existing_backfill_files(self, caplog):
         issue_date = datetime(year=2020, month=6, day=13)
         issue_date_str = issue_date.strftime("%Y%m%d")
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
         def prep_backfill_data():
             # Generate backfill daily files
             for d in range(11, 15):
                 dropdate = datetime(2020, 6, d)
-                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
 
-            today = datetime(2020, 6, 14)
+            today = datetime(2020, 7, 1)
             # creating expected file
-            merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+            merge_backfill_file(backfill_dir, today, logger,
                                 test_mode=True)
-            original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
+            original = f"{backfill_dir}/claims_hosp_202006.parquet"
             os.rename(original, f"{backfill_dir}/expected.parquet")
 
             # creating backfill without issue date
             os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
-            today = datetime(2020, 6, 14)
-            merge_backfill_file(backfill_dir, today,
-                                test_mode=True, check_nd=2)
+            merge_backfill_file(backfill_dir, today, logger,
+                                test_mode=True)
 
             old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
             for file in old_files:
                 os.remove(file)
 
         prep_backfill_data()
-        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
-        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
+
+        assert "Adding missing date to merged file" in caplog.text
 
         expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
-        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet")
+        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet")
 
-        check_diff = expected.merge(merged, how='left', indicator=True)
-        assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0]
-        for file in glob.glob(backfill_dir + "/*.parquet"):
-            os.remove(file)
+        check = pd.concat([merged, expected]).drop_duplicates(keep=False)
 
+        assert len(check) == 0
 
-    def test_merge_existing_backfill_files_no_call(self):
+        self.cleanup()
+
+
+    def test_merge_existing_backfill_files_no_call(self, caplog):
         issue_date = datetime(year=2020, month=5, day=20)
-        issue_date_str = issue_date.strftime("%Y%m%d")
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
         def prep_backfill_data():
             # Generate backfill daily files
             for d in range(11, 15):
                 dropdate = datetime(2020, 6, d)
-                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
 
             today = datetime(2020, 6, 14)
             # creating expected file
-            merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+            merge_backfill_file(backfill_dir, today, logger,
                                 test_mode=True)
 
         prep_backfill_data()
-        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
-        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
+        assert "Issue date has no matching merged files" in caplog.text
 
-        old_files = glob.glob(backfill_dir + "*.parquet")
-        for file in old_files:
-            os.remove(file)
+        self.cleanup()
 
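These caplog assertions work because pytest's caplog fixture captures records emitted through the stdlib logging machinery, which the structured logger from delphi_utils routes through; caplog.set_level(logging.INFO) is needed so INFO-level records are not filtered out. A minimal sketch of the pattern used throughout these tests, assuming only that get_structured_logger emits via stdlib logging (which the tests above rely on):

    import logging

    from delphi_utils.logger import get_structured_logger

    def test_log_message_is_captured(caplog):
        # Without this, caplog may only retain WARNING and above.
        caplog.set_level(logging.INFO)
        logger = get_structured_logger()
        logger.info("Stored backfill data in parquet", filename="example.parquet")
        # caplog.text concatenates all captured records as plain text.
        assert "Stored backfill data in parquet" in caplog.text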
