Skip to content

Commit 6995c8a

Browse files
committed
added conditional for merging
1 parent 65a06d8 commit 6995c8a

File tree

2 files changed: 34 additions and 7 deletions

2 files changed: 34 additions and 7 deletions

claims_hosp/delphi_claims_hosp/backfill.py

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
import shutil
1313
from datetime import datetime, timedelta
1414
from typing import Union
15+
import calendar
1516

1617
# third party
1718
import pandas as pd
@@ -57,7 +58,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
5758
backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
5859
& (~backfilldata["fips"].isnull()),
5960
selected_columns]
60-
logger.info("Filtering backfill data", startdate=_start_date, enddate=_end_date)
61+
logger.info("Filtering source data", startdate=_start_date, enddate=_end_date)
6162

6263
backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
6364
backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
@@ -76,9 +77,9 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
7677
# Store intermediate file into the backfill folder
7778
try:
7879
backfilldata.to_parquet(path, index=False)
79-
logger.info("Stored backfill data in parquet", filename=filename)
80+
logger.info("Stored source data in parquet", filename=filename)
8081
except:
81-
logger.info("Failed to store backfill data in parquet", )
82+
logger.info("Failed to store source data in parquet")
8283
return path
8384

8485

@@ -162,8 +163,9 @@ def get_date(file_link):
162163

163164
date_list = list(map(get_date, new_files))
164165
latest_date = max(date_list)
165-
if latest_date.month == most_recent.month:
166-
logger.info("Not a new month; skipping merging")
166+
num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
167+
if len(date_list) < num_of_days_in_month:
168+
logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
167169
return
168170

169171
logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1])

claims_hosp/tests/test_backfill.py

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import calendar
12
import logging
23
import os
34
import glob
@@ -57,7 +58,7 @@ def test_store_backfill_file(self, caplog):
5758

5859
self.cleanup()
5960

60-
def test_merge_backfill_file(self, caplog):
61+
def test_merge_backfill_file(self, caplog, monkeypatch):
6162
fn = "claims_hosp_202006.parquet"
6263
caplog.set_level(logging.INFO)
6364
logger = get_structured_logger()
@@ -76,6 +77,7 @@ def test_merge_backfill_file(self, caplog):
7677
store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
7778

7879
today = datetime(2020, 7, 1)
80+
monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
7981
merge_backfill_file(backfill_dir, today, logger,
8082
test_mode=True)
8183
assert "Merging files" in caplog.text
@@ -102,6 +104,29 @@ def test_merge_backfill_file(self, caplog):
102104

103105
self.cleanup()
104106

107+
def test_merge_backfill_file_no_call(self, caplog):
108+
fn = "claims_hosp_202006.parquet"
109+
caplog.set_level(logging.INFO)
110+
logger = get_structured_logger()
111+
112+
# Check when there is no daily file to merge.
113+
today = datetime(2020, 6, 14)
114+
merge_backfill_file(backfill_dir, today, logger,
115+
test_mode=True)
116+
assert fn not in os.listdir(backfill_dir)
117+
assert "No new files to merge; skipping merging" in caplog.text
118+
119+
# Generate backfill daily files
120+
for d in range(11, 15):
121+
dropdate = datetime(2020, 6, d)
122+
store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
123+
124+
today = datetime(2020, 7, 1)
125+
merge_backfill_file(backfill_dir, today, logger,
126+
test_mode=True)
127+
assert "Not enough days, skipping merging" in caplog.text
128+
self.cleanup()
129+
105130
def test_merge_existing_backfill_files(self, caplog):
106131
issue_date = datetime(year=2020, month=6, day=13)
107132
issue_date_str = issue_date.strftime("%Y%m%d")
@@ -164,8 +189,8 @@ def prep_backfill_data():
164189
file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
165190
merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
166191
assert "Issue date has no matching merged files" in caplog.text
167-
168192
self.cleanup()
169193

170194

171195

196+

0 commit comments

Comments
 (0)