"""
Script to check converting covidcast API calls with Epidata.covidcast and Epidata.covidcast_meta.
"""
-import time
+
from collections import defaultdict
from pathlib import Path
from typing import Union, Iterable, Tuple, List, Dict
from datetime import datetime, timedelta, date

-import numpy as np
import pandas as pd
import covidcast
+import tqdm
from delphi_epidata import Epidata
from pandas.testing import assert_frame_equal
import os
from epiweeks import Week

-API_KEY = os.environ.get('DELPHI_API_KEY')
+API_KEY = os.environ.get("DELPHI_API_KEY", os.environ.get("DELPHI_EPIDATA_KEY"))
covidcast.use_api_key(API_KEY)
-
-Epidata.debug = True
Epidata.auth = ("epidata", API_KEY)
-
CURRENT_DIR = Path(__file__).parent
+if not Path(f"{CURRENT_DIR}/covidcast_result").is_dir():
+    os.mkdir(f"{CURRENT_DIR}/covidcast_result")
+# We will test the first X signals for each data source that we find from the
+# metadata endpoint with this variable.
+NUMBER_SIGNALS_PER_SOURCE = 5
+

def _parse_datetimes(date_int: int, time_type: str, date_format: str = "%Y%m%d") -> Union[pd.Timestamp, None]:
    """Convert a date or epiweeks string into timestamp objects.
@@ -151,22 +154,23 @@ def ported_signal(

    return api_df

-def check_metadata():

+def check_metadata():
    expected_df = covidcast.metadata()
    df = ported_metadata()
    assert_frame_equal(expected_df, df)

+
def ported_signal(
-        data_source: str,
-        signal: str,  # pylint: disable=W0621
-        start_day: date = None,
-        end_day: date = None,
-        geo_type: str = "county",
-        geo_values: Union[str, Iterable[str]] = "*",
-        as_of: date = None,
-        lag: int = None,
-        time_type: str = "day",
+    data_source: str,
+    signal: str,  # pylint: disable=W0621
+    start_day: date = None,
+    end_day: date = None,
+    geo_type: str = "county",
+    geo_values: Union[str, Iterable[str]] = "*",
+    as_of: date = None,
+    lag: int = None,
+    time_type: str = "day",
) -> Union[pd.DataFrame, None]:
    """
    Makes covidcast signal api call.
@@ -252,13 +256,16 @@ def generate_start_date_per_signal() -> Dict[Tuple[datetime, datetime, str], Lis
    Dict[Tuple[datetime.datetime, datetime.datetime, str],[List[Tuple[str, str]]]
    """
    meta_df = pd.DataFrame.from_dict(Epidata.covidcast_meta()["epidata"])
-    meta_df["min_time"] = meta_df["min_time"].astype('str')
+    meta_df["min_time"] = meta_df["min_time"].astype("str")
+    meta_df = meta_df.groupby("data_source").head(NUMBER_SIGNALS_PER_SOURCE)
    signal_timeframe_dict = defaultdict(list)

    for start_str, data in meta_df.groupby("min_time"):
-
        data_source_groups = data.groupby("data_source")
        for data_source, df in data_source_groups:
+            # TODO: Remove after metadata bug is fixed.
+            if data_source == "google-symptom":
+                continue
            signals = list(df["signal"].unique())
            time_type = df["time_type"].values[0]
            for signal in signals:
@@ -274,8 +281,7 @@ def generate_start_date_per_signal() -> Dict[Tuple[datetime, datetime, str], Lis
                elif time_type == "week":
                    start_time = Week(year=int(start_str[:4]), week=int(start_str[-2:]))
                    end_time = (start_time + 2).startdate()
-                    date_range = (start_time.startdate(),
-                                  end_time, time_type)
+                    date_range = (start_time.startdate(), end_time, time_type)
                signal_timeframe_dict[date_range].append((data_source, signal))

    return signal_timeframe_dict
@@ -289,39 +295,51 @@ def check_signal():
    """
    signal_timeframe_dict = generate_start_date_per_signal()
    signal_df_dict = dict()
-    for date_range, data_source_signal_list in signal_timeframe_dict.items():
+    for date_range, data_source_signal_list in tqdm.tqdm(signal_timeframe_dict.items()):
        for data_source, signal in data_source_signal_list:
            time_type = date_range[2]
            filename = f"{CURRENT_DIR}/covidcast_result/{data_source}_{signal}.parquet"
            if not Path(filename).is_file():
                # every signal except google-symptom has geo type of state
                geo_type = "state"
-                if data_source == "google-symptom":
+                if data_source == "google-symptoms":
                    geo_type = "county"
-
-                expected_df = covidcast.signal(data_source, signal, start_day=date_range[0], end_day=date_range[1],
-                                               geo_type=geo_type, time_type=time_type)
-                if expected_df is None:
-                    raise RuntimeError("Data should exists")
+                expected_df = covidcast.signal(
+                    data_source,
+                    signal,
+                    start_day=date_range[0],
+                    end_day=date_range[1],
+                    geo_type=geo_type,
+                    time_type=time_type,
+                )
+                assert not expected_df.empty, "Received no data from covidcast API."

                expected_df.to_parquet(filename)
            signal_df_dict[(data_source, signal, time_type)] = filename

-    for date_range, data_source_signal_list in signal_timeframe_dict.items():
+    for date_range, data_source_signal_list in tqdm.tqdm(signal_timeframe_dict.items()):
        for data_source, signal in data_source_signal_list:
            expected_filename = signal_df_dict.get((data_source, signal, date_range[2]))
            expected_df = pd.read_parquet(expected_filename)

            # every signal except google-symptom has geo type of state
            geo_type = "state"
-            if data_source == "google-symptom":
+            if data_source == "google-symptoms":
                geo_type = "county"
-            df = ported_signal(data_source, signal, start_day=date_range[0], end_day=date_range[1],
-                               time_type=date_range[2],
-                               geo_type=geo_type)
+            df = ported_signal(
+                data_source,
+                signal,
+                start_day=date_range[0],
+                end_day=date_range[1],
+                time_type=date_range[2],
+                geo_type=geo_type,
+            )
+            assert not df.empty, "Received no data from covidcast API."
+
            check = df.merge(expected_df, indicator=True)
            assert (check["_merge"] == "both").all()

+
if __name__ == "__main__":
    check_metadata()
-    check_signal()
+    check_signal()
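
For reference, the equivalence this script exercises is between the legacy covidcast-py client and the ported delphi_epidata client. A minimal sketch of one such comparison follows; the source/signal pair, the date range, and the DELPHI_EPIDATA_KEY lookup are illustrative assumptions, not values taken from this change:

    import os
    from datetime import date

    import pandas as pd
    import covidcast
    from delphi_epidata import Epidata

    # Hypothetical key lookup; the script itself checks DELPHI_API_KEY first.
    api_key = os.environ.get("DELPHI_EPIDATA_KEY")
    covidcast.use_api_key(api_key)
    Epidata.auth = ("epidata", api_key)

    # Legacy covidcast-py call.
    old_df = covidcast.signal(
        "fb-survey", "smoothed_cli",
        start_day=date(2020, 4, 1), end_day=date(2020, 4, 7),
        geo_type="state", time_type="day",
    )

    # Ported call against the same endpoint (positional args: data_source, signals,
    # time_type, geo_type, time_values, geo_value); ported_signal() in the script
    # wraps Epidata.covidcast and converts the response to a DataFrame.
    response = Epidata.covidcast(
        "fb-survey", "smoothed_cli", "day", "state",
        Epidata.range(20200401, 20200407), "*",
    )
    new_df = pd.DataFrame.from_dict(response["epidata"])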