Skip to content

Commit b71dd82

Browse files
committed
making backfill to monthly in progess
1 parent 7bd15be commit b71dd82

File tree

4 files changed

+28
-49
lines changed

4 files changed

+28
-49
lines changed

changehc/delphi_changehc/patch.py

Whitespace-only changes.

claims_hosp/delphi_claims_hosp/backfill.py

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010
import os
1111
import re
1212
import shutil
13-
from datetime import datetime
13+
from datetime import datetime, timedelta
1414
from typing import Union
1515

1616
# third party
1717
import pandas as pd
18+
import pytz
1819
from delphi_utils import GeoMapper
1920

2021
from .config import Config
@@ -94,11 +95,12 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
9495

9596
def get_file_with_date(files) -> Union[str, None]:
9697
for filename in files:
97-
pattern = re.findall(r"\d{8}", filename)
98-
if len(pattern) == 2:
99-
start_date = datetime.strptime(pattern[0], "%Y%m%d")
100-
end_date = datetime.strptime(pattern[1], "%Y%m%d")
101-
if start_date <= issue_date or end_date <= issue_date:
98+
pattern = re.findall(r"\d{6}", filename)
99+
if len(pattern) == 1:
100+
file_month = datetime.strptime(pattern[0], "%Y%m")
101+
start_date = file_month.replace(day=1)
102+
end_date = (start_date + timedelta(days=32)).replace(day=1)
103+
if issue_date >= start_date and issue_date < end_date:
102104
return filename
103105
return ""
104106

@@ -125,27 +127,17 @@ def get_file_with_date(files) -> Union[str, None]:
125127
return
126128

127129

128-
def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25):
130+
def merge_backfill_file(backfill_dir, most_recent, logger, test_mode=False):
129131
"""
130-
Merge ~4 weeks' backfill data into one file.
132+
Merge a month's source data into one file.
131133
132-
Usually this function should merge 28 days' data into a new file so as to
133-
save the reading time when running the backfill pipelines. We set a softer
134-
threshold to allow flexibility in data delivery.
135134
Parameters
136135
----------
137136
most_recent : datetime
138137
The most recent date when the raw data is received
139138
backfill_dir : str
140139
specified path to store backfill files.
141-
backfill_merge_day: int
142-
The day of a week that we used to merge the backfill files. e.g. 0
143-
is Monday.
144140
test_mode: bool
145-
check_nd: int
146-
The criteria of the number of unmerged files. Ideally, we want the
147-
number to be 28, but we use a looser criteria from practical
148-
considerations
149141
"""
150142
new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
151143
if len(new_files) == 0: # if no any daily file is stored
@@ -158,23 +150,19 @@ def get_date(file_link):
158150
return datetime.strptime(fn, "%Y%m%d")
159151

160152
date_list = list(map(get_date, new_files))
161-
earliest_date = min(date_list)
162153
latest_date = max(date_list)
163-
164-
# Check whether to merge
165-
# Check the number of files that are not merged
166-
if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd:
154+
if latest_date.month == most_recent.month:
155+
logger.info("Not a new month; skipping merging")
167156
return
168157

158+
169159
# Start to merge files
170160
pdList = []
171161
for fn in new_files:
172162
df = pd.read_parquet(fn, engine='pyarrow')
173163
pdList.append(df)
174164
merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
175-
path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
176-
datetime.strftime(earliest_date, "%Y%m%d"),
177-
datetime.strftime(latest_date, "%Y%m%d"))
165+
path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
178166
merged_file.to_parquet(path, index=False)
179167

180168
# Delete daily files once we have the merged one.

claims_hosp/delphi_claims_hosp/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def run_module(params, logger=None):
109109
merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
110110

111111
else:
112-
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
112+
merge_backfill_file(backfill_dir, datetime.today())
113113
store_backfill_file(claims_file, dropdate_dt, backfill_dir)
114114

115115
# print out information

claims_hosp/tests/test_backfill.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import glob
44
from datetime import datetime
55
from pathlib import Path
6-
import shutil
76

87
# third party
98
import pandas as pd
@@ -49,32 +48,24 @@ def test_store_backfill_file(self):
4948
assert fn not in os.listdir(backfill_dir)
5049

5150
def test_merge_backfill_file(self):
52-
53-
today = datetime.today()
54-
55-
fn = "claims_hosp_from_20200611_to_20200614.parquet"
51+
fn = "claims_hosp_202006.parquet"
5652
assert fn not in os.listdir(backfill_dir)
5753

5854
# Check when there is no daily file to merge.
5955
today = datetime(2020, 6, 14)
60-
merge_backfill_file(backfill_dir, today.weekday(), today,
61-
test_mode=True, check_nd=8)
56+
merge_backfill_file(backfill_dir, today, TEST_LOGGER,
57+
test_mode=True)
6258
assert fn not in os.listdir(backfill_dir)
6359

6460
# Generate backfill daily files
6561
for d in range(11, 15):
6662
dropdate = datetime(2020, 6, d)
6763
store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
6864

69-
# Check the when the merged file is not generated
70-
today = datetime(2020, 6, 14)
71-
merge_backfill_file(backfill_dir, today.weekday(), today,
72-
test_mode=True, check_nd=8)
73-
assert fn not in os.listdir(backfill_dir)
74-
75-
# Generate the merged file, but not delete it
76-
merge_backfill_file(backfill_dir, today.weekday(), today,
77-
test_mode=True, check_nd=2)
65+
# Check when the merged file is not generated
66+
today = datetime(2020, 7, 1)
67+
merge_backfill_file(backfill_dir, today, TEST_LOGGER,
68+
test_mode=True)
7869
assert fn in os.listdir(backfill_dir)
7970

8071
# Read daily file
@@ -112,15 +103,15 @@ def prep_backfill_data():
112103

113104
today = datetime(2020, 6, 14)
114105
# creating expected file
115-
merge_backfill_file(backfill_dir, today.weekday(), today,
116-
test_mode=True, check_nd=2)
106+
merge_backfill_file(backfill_dir, today, TEST_LOGGER,
107+
test_mode=True)
117108
original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
118109
os.rename(original, f"{backfill_dir}/expected.parquet")
119110

120111
# creating backfill without issue date
121112
os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
122113
today = datetime(2020, 6, 14)
123-
merge_backfill_file(backfill_dir, today.weekday(), today,
114+
merge_backfill_file(backfill_dir, today,
124115
test_mode=True, check_nd=2)
125116

126117
old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
@@ -141,7 +132,7 @@ def prep_backfill_data():
141132

142133

143134
def test_merge_existing_backfill_files_no_call(self):
144-
issue_date = datetime(year=2020, month=6, day=20)
135+
issue_date = datetime(year=2020, month=5, day=20)
145136
issue_date_str = issue_date.strftime("%Y%m%d")
146137
def prep_backfill_data():
147138
# Generate backfill daily files
@@ -151,8 +142,8 @@ def prep_backfill_data():
151142

152143
today = datetime(2020, 6, 14)
153144
# creating expected file
154-
merge_backfill_file(backfill_dir, today.weekday(), today,
155-
test_mode=True, check_nd=8)
145+
merge_backfill_file(backfill_dir, today, TEST_LOGGER,
146+
test_mode=True)
156147

157148
prep_backfill_data()
158149
file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)

0 commit comments

Comments
 (0)