1
1
"""Retrieve data and wrangle into appropriate format."""
2
2
# -*- coding: utf-8 -*-
3
3
import re
4
-
5
4
from datetime import date , datetime , timedelta # pylint: disable=unused-import
6
- from os import listdir , makedirs
7
- from os .path import isfile , join , exists
8
-
9
5
import pandas_gbq
10
6
from google .oauth2 import service_account
11
7
import numpy as np
@@ -98,80 +94,40 @@ def preprocess(df, level):
98
94
return df
99
95
100
96
101
def get_date_range(export_start_date, retrieve_days_before_now):
    """Produce date range to retrieve data for.

    Calculate the start of the date range as a static offset from the
    end date ("now"), then pad the range with an additional
    PAD_DAYS - 1 = 6 days before the start date so that smoothed
    estimates can be computed for the first requested day.

    Parameters
    ----------
    export_start_date: date or datetime
        first date to retrieve data for
    retrieve_days_before_now: int
        number of days before end date ("now") to export

    Returns
    -------
    list of two dates
        [start of range (including smoothing padding), end of range]
    """
    # Days of history needed to produce a smoothed estimate for a
    # single day: the day itself plus the 6 preceding days.
    PAD_DAYS = 7

    end_date = date.today()
    # Don't start the fetch window before the user-set start date.
    # Convert both dates/datetimes to date to avoid an error from
    # comparing different types.
    start_date = max(
        end_date - timedelta(days=retrieve_days_before_now),
        export_start_date.date()
    )

    # NOTE(review): the smoothing pad is applied after clamping to
    # export_start_date, so the first returned date can fall up to 6
    # days before export_start_date. This looks intentional (the pad
    # exists only to seed smoothing) — confirm against the caller.
    retrieve_dates = [
        start_date - timedelta(days=PAD_DAYS - 1),
        end_date]

    return retrieve_dates
175
131
176
132
177
133
def format_dates_for_query(date_list):
    """Format dates as "YYYY-MM-DD" strings for use in the BigQuery query.

    Parameters
    ----------
    date_list: list of date/datetime
        dates to convert (typically the [start, end] pair from
        get_date_range)

    Returns
    -------
    list[str]: ["YYYY-MM-DD", "YYYY-MM-DD"]
    """
    # Use the instance method rather than datetime.strftime(date, ...)
    # so the loop variable does not shadow the `date` class imported at
    # module level.
    return [d.strftime("%Y-%m-%d") for d in date_list]
203
151
204
152
205
- def produce_query (level , date_string ):
153
+ def produce_query (level , date_range ):
206
154
"""Create query string.
207
155
208
156
Parameters
209
157
----------
210
158
level: str
211
159
"county" or "state"
212
- date_string: str
213
- "timestamp(date ), ..." where timestamps are BigQuery-compatible
160
+ date_range: list[ str]
161
+ ["YYYY-MM-DD" ), "YYYY-MM-DD"] where dates are BigQuery-compatible.
214
162
215
163
Returns
216
164
-------
@@ -225,7 +173,7 @@ def produce_query(level, date_string):
225
173
date,
226
174
{symptom_cols}
227
175
from `bigquery-public-data.covid19_symptom_search.{symptom_table}`
228
- where timestamp(date) in ({date_list} ) and
176
+ where timestamp(date) between timestamp("{start_date}") and timestamp("{end_date}" ) and
229
177
country_region_code = "US"
230
178
"""
231
179
base_level_table = {"state" : "symptom_search_sub_region_1_daily" ,
@@ -235,12 +183,13 @@ def produce_query(level, date_string):
235
183
query = base_query .format (
236
184
symptom_cols = ", " .join (colname_map .keys ()),
237
185
symptom_table = base_level_table [level ],
238
- date_list = date_string )
186
+ start_date = date_range [0 ],
187
+ end_date = date_range [1 ])
239
188
240
189
return query
241
190
242
191
243
- def pull_gs_data_one_geolevel (level , date_string ):
192
+ def pull_gs_data_one_geolevel (level , date_range ):
244
193
"""Pull latest data for a single geo level.
245
194
246
195
Fetch data and transform it into the appropriate format, as described in
@@ -261,14 +210,14 @@ def pull_gs_data_one_geolevel(level, date_string):
261
210
----------
262
211
level: str
263
212
"county" or "state"
264
- date_string: str
265
- "timestamp(" YYYY-MM-DD"), ..." where timestamps are BigQuery-compatible
213
+ date_range: list[ str]
214
+ [" YYYY-MM-DD"), "YYYY-MM-DD"] where dates are BigQuery-compatible.
266
215
267
216
Returns
268
217
-------
269
218
pd.DataFrame
270
219
"""
271
- query = produce_query (level , date_string )
220
+ query = produce_query (level , date_range )
272
221
273
222
df = pandas_gbq .read_gbq (query , progress_bar_type = None )
274
223
@@ -301,7 +250,7 @@ def initialize_credentials(path_to_credentials):
301
250
pandas_gbq .context .project = credentials .project_id
302
251
303
252
304
- def pull_gs_data (path_to_credentials , receiving_dir , export_start_date ):
253
+ def pull_gs_data (path_to_credentials , export_start_date , num_export_days ):
305
254
"""Pull latest dataset for each geo level and combine.
306
255
307
256
PS: No information for PR
@@ -312,29 +261,29 @@ def pull_gs_data(path_to_credentials, receiving_dir, export_start_date):
312
261
Path to BigQuery API key and service account json file
313
262
level: str
314
263
"county" or "state"
315
- receiving_dir: str
316
- path to output directory
317
264
export_start_date: date
318
265
first date to retrieve data for
266
+ num_export_days: int
267
+ number of days before end date ("now") to export
319
268
320
269
Returns
321
270
-------
322
271
dict: {"county": pd.DataFrame, "state": pd.DataFrame}
323
272
"""
324
273
# Fetch and format dates we want to attempt to retrieve
325
- retrieve_dates = get_all_dates ( receiving_dir , export_start_date )
326
- retrieve_dates_dict = format_dates_for_query (retrieve_dates )
274
+ retrieve_dates = get_date_range ( export_start_date , num_export_days )
275
+ retrieve_dates = format_dates_for_query (retrieve_dates )
327
276
328
277
initialize_credentials (path_to_credentials )
329
278
330
279
# Create dictionary for state and county level data
331
280
dfs = {}
332
281
333
282
# For state level data
334
- dfs ["state" ] = pull_gs_data_one_geolevel ("state" , retrieve_dates_dict )
283
+ dfs ["state" ] = pull_gs_data_one_geolevel ("state" , retrieve_dates )
335
284
336
285
# For county level data
337
- dfs ["county" ] = pull_gs_data_one_geolevel ("county" , retrieve_dates_dict )
286
+ dfs ["county" ] = pull_gs_data_one_geolevel ("county" , retrieve_dates )
338
287
339
288
# Add District of Columbia as county
340
289
try :
0 commit comments