1
1
# -*- coding: utf-8 -*-
2
- """Simply downloads email attachments.
3
-
4
- Uses this handy package: https://pypi.org/project/imap-tools/
5
- """
6
- import io
2
+ """Collect and process Quidel export files."""
7
3
from os .path import join
8
4
import os
9
5
from datetime import datetime , timedelta
6
+ import boto3
10
7
11
8
import pandas as pd
12
9
import numpy as np
13
10
14
- from imap_tools import MailBox , A , AND
15
-
16
def get_from_s3(start_date, end_date, bucket):
    """
    Get raw data from aws s3 bucket.

    Args:
        start_date: datetime.datetime
            pull data from file tagged with date on/after the start date
        end_date: datetime.datetime
            pull data from file tagged with date on/before the end date
        bucket: s3.Bucket
            the aws s3 bucket that stores quidel data
    output:
        df: pd.DataFrame
            concatenated covid-test records, restricted to selected_columns
        time_flag: datetime.datetime
            the latest received date for which a file was pulled
            (None when no file in the bucket matched the date range)
    """
    time_flag = None
    selected_columns = ['SofiaSerNum', 'TestDate', 'Facility', 'City',
                        'State', 'Zip', 'PatientAge', 'Result1',
                        'Result2', 'OverallResult', 'StorageDate',
                        'fname']
    df = pd.DataFrame(columns=selected_columns)

    # Index the bucket's covid-test exports by the received date encoded in
    # the key: "<prefix>/YYYY_MM_DD_...". Only consider covid tests.
    s3_files = {}
    for obj in bucket.objects.all():
        if "-sars" in obj.key:
            date_string = obj.key.split("/")[1]
            yy = int(date_string.split("_")[0])
            mm = int(date_string.split("_")[1])
            dd = int(date_string.split("_")[2])
            received_date = datetime(yy, mm, dd)
            s3_files[received_date] = obj.key

    n_days = (end_date - start_date).days + 1
    pulled = []         # frames collected for a single concat at the end
                        # (DataFrame.append was removed in pandas 2.0 and
                        # grew the frame quadratically inside the loop)
    seen_fnames = set()  # Avoid appending duplicate datasets
    for search_date in [start_date + timedelta(days=x) for x in range(n_days)]:
        if search_date not in s3_files:
            continue
        if s3_files[search_date] in seen_fnames:
            continue
        print("Pulling data received on %s" % search_date.date())
        obj = bucket.Object(key=s3_files[search_date])
        newdf = pd.read_csv(obj.get()["Body"],
                            parse_dates=["StorageDate", "TestDate"],
                            low_memory=False)
        newdf["fname"] = s3_files[search_date]
        seen_fnames.add(s3_files[search_date])
        pulled.append(newdf[selected_columns])
        time_flag = search_date

    if pulled:
        df = pd.concat([df] + pulled)
    assert set(df.columns) == set(selected_columns)
    return df, time_flag
59
58
60
59
def fix_zipcode (df ):
@@ -99,41 +98,45 @@ def fix_date(df):
99
98
df ["timestamp" ].values [mask ] = df ["StorageDate" ].values [mask ]
100
99
return df
101
100
102
- def preprocess_new_data (start_date , end_date , mail_server , account ,
103
- sender , password , test_mode ):
101
+ def preprocess_new_data (start_date , end_date , params , test_mode ):
104
102
"""
105
- Pull and pre-process Quidel Covid Test data from datadrop email .
103
+ Pull and pre-process Quidel Covid Test data.
106
104
107
105
Drop unnecessary columns. Temporarily consider the positive rate
108
106
sensor only which is related to number of total tests and number
109
107
of positive tests.
110
108
111
109
Args:
112
110
start_date: datetime.datetime
113
- pull data from email received from the start date
111
+ pull data from file tagged with date on/after start date
114
112
end_date: datetime.datetime
115
- pull data from email received on/before the end date
116
- mail_server: str
117
- account: str
118
- email account to receive new data
119
- sender: str
120
- email account of the sender
121
- password: str
122
- password of the datadrop email
113
+ pull data from file tagged with date on/before the end date
114
+ params: dict
115
+ read from params.json
123
116
test_mode: bool
124
- pull raw data from email or not
117
+ pull raw data from s3 or not
125
118
output:
126
119
df: pd.DataFrame
127
120
time_flag: datetime.date:
128
121
the actual pull end date on which we successfully pull the data
129
122
"""
130
123
if test_mode :
131
- test_data_dir = "./test_data/test_data.xlsx"
132
- df , time_flag = pd .read_excel (test_data_dir ), datetime (2020 , 8 , 17 )
124
+ test_data_dir = "./test_data/test_data.csv"
125
+ df , time_flag = pd .read_csv (
126
+ test_data_dir ,
127
+ parse_dates = ["StorageDate" , "TestDate" ]
128
+ ), datetime (2020 , 8 , 17 )
133
129
else :
134
- # Get new data from email
135
- df , time_flag = get_from_email (start_date , end_date , mail_server ,
136
- account , sender , password )
130
+ # connect aws s3 bucket
131
+ aws_access_key_id = params ["aws_credentials" ]["aws_access_key_id" ]
132
+ aws_secret_access_key = params ["aws_credentials" ]["aws_secret_access_key" ]
133
+ bucket_name = params ["bucket_name" ]
134
+
135
+ s3 = boto3 .resource ('s3' , aws_access_key_id = aws_access_key_id ,
136
+ aws_secret_access_key = aws_secret_access_key )
137
+ bucket = s3 .Bucket (bucket_name )
138
+ # Get new data from s3
139
+ df , time_flag = get_from_s3 (start_date , end_date , bucket )
137
140
138
141
# No new data can be pulled
139
142
if time_flag is None :
@@ -187,8 +190,9 @@ def check_intermediate_file(cache_dir, pull_start_date):
187
190
return None , pull_start_date
188
191
189
192
def pull_quidel_covidtest (params ):
190
- """
191
- Pull the quidel covid test data and ecide whether to combine the new data with stored historical records in ./cache.
193
+ """Pull the quidel covid test data.
194
+
195
+ Conditionally merge new data with historical data from ./cache.
192
196
193
197
Parameters:
194
198
params: dict
@@ -206,14 +210,10 @@ def pull_quidel_covidtest(params):
206
210
the first date of the report
207
211
datetime.datetime
208
212
the last date of the report
213
+
209
214
"""
210
215
cache_dir = params ["cache_dir" ]
211
216
212
- mail_server = params ["mail_server" ]
213
- account = params ["account" ]
214
- password = params ["password" ]
215
- sender = params ["sender" ]
216
-
217
217
test_mode = (params ["mode" ] == "test" )
218
218
219
219
# pull new data only that has not been ingested
@@ -226,11 +226,10 @@ def pull_quidel_covidtest(params):
226
226
else :
227
227
pull_end_date = datetime .strptime (params ["pull_end_date" ], '%Y-%m-%d' )
228
228
229
- # Pull data from the email at 5 digit zipcode level
229
+ # Pull data from the file at 5 digit zipcode level
230
230
# Use _end_date to check the most recent date that we received data
231
231
df , _end_date = preprocess_new_data (
232
- pull_start_date , pull_end_date , mail_server ,
233
- account , sender , password , test_mode )
232
+ pull_start_date , pull_end_date , params , test_mode )
234
233
235
234
# Utilize previously stored data
236
235
if previous_df is not None :
@@ -268,7 +267,7 @@ def check_export_end_date(input_export_end_date, _end_date,
268
267
def check_export_start_date (export_start_date , export_end_date ,
269
268
export_day_range ):
270
269
"""
271
- Update export_start_date according to the export_end_date so that it could be export_end_date - export_day_range .
270
+ Ensure that the start date, end date, and day range are mutually consistent.
272
271
273
272
Parameters:
274
273
export_start_date: str
@@ -281,6 +280,7 @@ def check_export_start_date(export_start_date, export_end_date,
281
280
Returns:
282
281
datetime.datetime
283
282
export data until which date
283
+
284
284
"""
285
285
if export_start_date == "" :
286
286
export_start_date = datetime (2020 , 5 , 26 )
0 commit comments