 """Simply downloads email attachments.
 Uses this handy package: https://pypi.org/project/imap-tools/
 """
-from datetime import datetime, timedelta, date
 import io
+from os.path import join
 import os
+from datetime import datetime, timedelta

 import pandas as pd
 import numpy as np

 from imap_tools import MailBox, A, AND

-def get_from_email(start_date: datetime.date, end_date: datetime.date,
-                   mail_server: str, account: str, sender: str, password: str):
+def get_from_email(start_date, end_date, mail_server,
+                   account, sender, password):
     """
     Get raw data from email account
     Args:
-        start_date: datetime.date
+        start_date: datetime.datetime
             pull data from email received from the start date
-        end_date: datetime.date
+        end_date: datetime.datetime
             pull data from email received on/before the end date
         mail_server: str
         account: str
@@ -39,13 +40,13 @@ def get_from_email(start_date: datetime.date, end_date: datetime.date,
     with MailBox(mail_server).login(account, password, 'INBOX') as mailbox:
         for search_date in [start_date + timedelta(days=x)
                             for x in range((end_date - start_date).days + 1)]:
-            for message in mailbox.fetch(A(AND(date=search_date, from_=sender))):
+            for message in mailbox.fetch(A(AND(date=search_date.date(), from_=sender))):
                 for att in message.attachments:
                     name = att.filename
                     # Only consider covid tests
                     if "Sars" not in name:
                         continue
-                    print("Pulling data received on %s" % search_date)
+                    print("Pulling data received on %s" % search_date.date())
                     toread = io.BytesIO()
                     toread.write(att.payload)
                     toread.seek(0)  # reset the pointer
@@ -90,18 +91,18 @@ def fix_date(df):
     df["timestamp"].values[mask] = df["StorageDate"].values[mask]
     return df

-def pull_quidel_covidtest(start_date, end_date, mail_server, account,
-                          sender, password, test_mode):
+def preprocess_new_data(start_date, end_date, mail_server, account,
+                        sender, password, test_mode):
     """
     Pull and pre-process Quidel Covid Test data from datadrop email.
     Drop unnecessary columns. Temporarily consider the positive rate
     sensor only which is related to number of total tests and number
     of positive tests.

     Args:
-        start_date: datetime.date
+        start_date: datetime.datetime
             pull data from email received from the start date
-        end_date: datetime.date
+        end_date: datetime.datetime
             pull data from email received on/before the end date
         mail_server: str
         account: str
@@ -119,7 +120,7 @@ def pull_quidel_covidtest(start_date, end_date, mail_server, account,
     """
     if test_mode:
         test_data_dir = "./test_data/test_data.xlsx"
-        df, time_flag = pd.read_excel(test_data_dir), date(2020, 8, 17)
+        df, time_flag = pd.read_excel(test_data_dir), datetime(2020, 8, 17)
     else:
         # Get new data from email
         df, time_flag = get_from_email(start_date, end_date, mail_server,
@@ -166,12 +167,140 @@ def pull_quidel_covidtest(start_date, end_date, mail_server, account,
     return df_merged, time_flag

 def check_intermediate_file(cache_dir, pull_start_date):
+    """
+    Check whether there is a cache file containing historical data already
+    """
     for filename in os.listdir(cache_dir):
         if ".csv" in filename:
             pull_start_date = datetime.strptime(filename.split("_")[2].split(".")[0],
-                                                '%Y%m%d').date() + timedelta(days=1)
+                                                '%Y%m%d') + timedelta(days=1)
             previous_df = pd.read_csv(os.path.join(cache_dir, filename),
                                       sep=",", parse_dates=["timestamp"])
-            os.remove(os.path.join(cache_dir, filename))
             return previous_df, pull_start_date
     return None, pull_start_date
+
+def pull_quidel_covidtest(params):
+    """
+    Pull the quidel covid test data. Decide whether to combine the newly
+    received data with stored historical records in ./cache
+
+    Parameters:
+        params: dict
+            including all the information read from params.json
+        END_FROM_TODAY_MINUS: int
+            report data until - X days
+        EXPORT_DAY_RANGE: int
+            number of dates to report
+
+    Returns:
+        DataFrame:
+            A data frame containing the pre-processed data with columns:
+            timestamp, numUniqueDevices, positiveTest, totalTest
+        datetime.datetime
+            the first date of the report
+        datetime.datetime
+            the last date of the report
+    """
+    cache_dir = params["cache_dir"]
+
+    mail_server = params["mail_server"]
+    account = params["account"]
+    password = params["password"]
+    sender = params["sender"]
+
+    test_mode = (params["mode"] == "test")
+
+    # pull new data only that has not been ingested
+    previous_df, pull_start_date = check_intermediate_file(
+        cache_dir,
+        datetime.strptime(params["pull_start_date"], '%Y-%m-%d'))
+
+    if params["pull_end_date"] == "":
+        pull_end_date = datetime.today()
+    else:
+        pull_end_date = datetime.strptime(params["pull_end_date"], '%Y-%m-%d')
+
+    # Pull data from the email at 5 digit zipcode level
+    # Use _end_date to check the most recent date that we received data
+    df, _end_date = preprocess_new_data(
+        pull_start_date, pull_end_date, mail_server,
+        account, sender, password, test_mode)
+
+    # Utilize previously stored data
+    if previous_df is not None:
+        df = previous_df.append(df).groupby(["timestamp", "zip"]).sum().reset_index()
+    return df, _end_date
+
+def check_export_end_date(input_export_end_date, _end_date,
+                          END_FROM_TODAY_MINUS):
+    """
+    Update the export_end_date according to the data received
+    By default, set the export end date to be the last pulling date - 5 days
+    (END_FROM_TODAY_MINUS = 5).
+    Otherwise, use the required date if it is earlier than the default one.
+
+    Parameter:
+        input_export_end_date: str
+            read from params
+        _end_date: datetime.datetime
+            updated according to the data received
+        END_FROM_TODAY_MINUS: int
+            report data until - X days
+
+    Returns:
+        datetime.datetime
+            export data from which date
+    """
+    export_end_date = _end_date - timedelta(days=END_FROM_TODAY_MINUS)
+    if input_export_end_date != "":
+        input_export_end_date = datetime.strptime(input_export_end_date, '%Y-%m-%d')
+        if input_export_end_date < export_end_date:
+            return input_export_end_date
+    return export_end_date
+
+def check_export_start_date(export_start_date, export_end_date,
+                            EXPORT_DAY_RANGE):
+    """
+    Update the export_start_date according to the export_end_date so that it
+    could be export_end_date - EXPORT_DAY_RANGE
+
+    Parameters:
+        export_start_date: str
+            Read from params
+        export_end_date: datetime.datetime
+            Calculated according to the data received
+        EXPORT_DAY_RANGE: int
+            Number of days to report
+
+    Returns:
+        datetime.datetime
+            export data until which date
+    """
+    if export_start_date == "":
+        export_start_date = datetime(2020, 5, 26)
+    else:
+        export_start_date = datetime.strptime(export_start_date, '%Y-%m-%d')
+    # Only export data from -45 days to -5 days
+    if (export_end_date - export_start_date).days > EXPORT_DAY_RANGE:
+        export_start_date = export_end_date - timedelta(days=EXPORT_DAY_RANGE)

+    if export_start_date < datetime(2020, 5, 26):
+        return datetime(2020, 5, 26)
+    return export_start_date
+
+def update_cache_file(df, _end_date, cache_dir):
+    """
+    Update cache file. Remove the old one, export the new one
+
+    Parameter:
+        df: pd.DataFrame
+            Pre-processed file at ZipCode level
+        _end_date:
+            The most recent date when the raw data is received
+        cache_dir:
+            ./cache where the cache file is stored
+    """
+    for fn in os.listdir(cache_dir):
+        if ".csv" in fn:
+            os.remove(join(cache_dir, fn))
+    df.to_csv(join(cache_dir, "pulled_until_%s.csv") % _end_date.strftime("%Y%m%d"), index=False)
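For orientation, a minimal driver sketch follows, showing one way the helpers introduced in this diff could be chained together. The driver is not part of the commit; the import path, the params keys for the export window, the placeholder server/account values, and the constants END_FROM_TODAY_MINUS = 5 and EXPORT_DAY_RANGE = 40 are illustrative assumptions taken from the docstrings above.

# Hypothetical driver (not part of this diff): wires the new helpers together.
# Import path is assumed; the patched module above is referred to here as pull.py.
from pull import (pull_quidel_covidtest, check_export_end_date,
                  check_export_start_date, update_cache_file)

params = {
    "cache_dir": "./cache",
    "mail_server": "imap.example.com",   # placeholder
    "account": "user@example.com",       # placeholder
    "password": "********",              # placeholder
    "sender": "datadrop@example.com",    # placeholder
    "mode": "test",                      # "test" reads ./test_data/test_data.xlsx instead of email
    "pull_start_date": "2020-05-26",
    "pull_end_date": "",                 # empty string means "up to today"
    "export_start_date": "",             # assumed key; empty string falls back to 2020-05-26
    "export_end_date": "",               # assumed key; empty string uses _end_date - 5 days
}

# Pull new data (merged with any cached history) and note the last date data was received.
df, _end_date = pull_quidel_covidtest(params)

# Clamp the export window to the data actually received, per the docstring defaults.
export_end_date = check_export_end_date(params["export_end_date"], _end_date,
                                        END_FROM_TODAY_MINUS=5)
export_start_date = check_export_start_date(params["export_start_date"],
                                            export_end_date, EXPORT_DAY_RANGE=40)

# Replace the old cache file with the freshly pulled data.
update_cache_file(df, _end_date, params["cache_dir"])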