1
+ from os import environ
1
2
from typing import (
2
3
Any ,
3
4
Dict ,
13
14
from appdirs import user_cache_dir
14
15
from diskcache import Cache
15
16
from pandas import CategoricalDtype , DataFrame , Series , to_datetime
16
- from os import environ
17
17
from requests import Response , Session
18
18
from requests .auth import HTTPBasicAuth
19
19
from tenacity import retry , stop_after_attempt
27
27
EpidataFieldInfo ,
28
28
EpidataFieldType ,
29
29
EpiDataResponse ,
30
- EpiRange ,
31
30
EpiRangeParam ,
32
31
OnlySupportsClassicFormatException ,
33
32
add_endpoint_to_url ,
34
33
)
35
34
from ._parse import fields_to_predicate
36
35
37
36
# Make the linter happy about the unused variables
38
- __all__ = ["Epidata" , "EpiDataCall" , "EpiDataContext" , "EpiRange" , "CovidcastEpidata" ]
39
37
CACHE_DIRECTORY = user_cache_dir (appname = "epidatpy" , appauthor = "delphi" )
40
38
41
39
if environ .get ("USE_EPIDATPY_CACHE" , None ):
42
- print (f"diskcache is being used (unset USE_EPIDATPY_CACHE if not intended). "
43
- f"The cache directory is { CACHE_DIRECTORY } . "
44
- f"The TTL is set to { environ .get ("EPIDATPY_CACHE_MAX_AGE_DAYS" , "7" )} days." )
40
+ print (
41
+ f"diskcache is being used (unset USE_EPIDATPY_CACHE if not intended). "
42
+ f"The cache directory is { CACHE_DIRECTORY } . "
43
+ f"The TTL is set to { environ .get ('EPIDATPY_CACHE_MAX_AGE_DAYS' , '7' )} days."
44
+ )
45
+
45
46
46
47
@retry (reraise = True , stop = stop_after_attempt (2 ))
47
48
def _request_with_retry (
@@ -67,9 +68,7 @@ def call_impl(s: Session) -> Response:
67
68
68
69
69
70
class EpiDataCall (AEpiDataCall ):
70
- """
71
- epidata call representation
72
- """
71
+ """epidata call representation"""
73
72
74
73
_session : Final [Optional [Session ]]
75
74
@@ -101,7 +100,7 @@ def _call(
101
100
url , params = self .request_arguments (fields )
102
101
return _request_with_retry (url , params , self ._session , stream )
103
102
104
- def _get_cache_key (self , method ) -> str :
103
+ def _get_cache_key (self , method : str ) -> str :
105
104
cache_key = f"{ self ._endpoint } | { method } "
106
105
if self ._params :
107
106
cache_key += f" | { str (dict (sorted (self ._params .items ())))} "
@@ -120,7 +119,7 @@ def classic(
120
119
with Cache (CACHE_DIRECTORY ) as cache :
121
120
cache_key = self ._get_cache_key ("classic" )
122
121
if cache_key in cache :
123
- return cache [cache_key ]
122
+ return cast ( EpiDataResponse , cache [cache_key ])
124
123
response = self ._call (fields )
125
124
r = cast (EpiDataResponse , response .json ())
126
125
if disable_type_parsing :
@@ -131,7 +130,7 @@ def classic(
131
130
if self .use_cache :
132
131
with Cache (CACHE_DIRECTORY ) as cache :
133
132
cache_key = self ._get_cache_key ("classic" )
134
- cache .set (cache_key , r , expire = self .cache_max_age_days * 24 * 60 * 60 )
133
+ cache .set (cache_key , r , expire = self .cache_max_age_days * 24 * 60 * 60 )
135
134
return r
136
135
except Exception as e : # pylint: disable=broad-except
137
136
return {"result" : 0 , "message" : f"error: { e } " , "epidata" : []}
@@ -143,7 +142,11 @@ def __call__(
143
142
) -> Union [EpiDataResponse , DataFrame ]:
144
143
"""Request and parse epidata in df message format."""
145
144
if self .only_supports_classic :
146
- return self .classic (fields , disable_date_parsing = disable_date_parsing , disable_type_parsing = False )
145
+ return self .classic (
146
+ fields ,
147
+ disable_date_parsing = disable_date_parsing ,
148
+ disable_type_parsing = False ,
149
+ )
147
150
return self .df (fields , disable_date_parsing = disable_date_parsing )
148
151
149
152
def df (
@@ -160,7 +163,7 @@ def df(
160
163
with Cache (CACHE_DIRECTORY ) as cache :
161
164
cache_key = self ._get_cache_key ("df" )
162
165
if cache_key in cache :
163
- return cache [cache_key ]
166
+ return cast ( DataFrame , cache [cache_key ])
164
167
165
168
json = self .classic (fields , disable_type_parsing = True )
166
169
rows = json .get ("epidata" , [])
@@ -177,7 +180,8 @@ def df(
177
180
data_types [info .name ] = bool
178
181
elif info .type == EpidataFieldType .categorical :
179
182
data_types [info .name ] = CategoricalDtype (
180
- categories = Series (info .categories ) if info .categories else None , ordered = True
183
+ categories = Series (info .categories ) if info .categories else None ,
184
+ ordered = True ,
181
185
)
182
186
elif info .type == EpidataFieldType .int :
183
187
data_types [info .name ] = "Int64"
@@ -196,8 +200,10 @@ def df(
196
200
df = df .astype (data_types )
197
201
if not disable_date_parsing :
198
202
for info in time_fields :
199
- if info .type == EpidataFieldType .epiweek or info . type == EpidataFieldType . date_or_epiweek :
203
+ if info .type == EpidataFieldType .epiweek :
200
204
continue
205
+ # Try two date foramts, otherwise keep as string. The try except
206
+ # is needed because the time field might be date_or_epiweek.
201
207
try :
202
208
df [info .name ] = to_datetime (df [info .name ], format = "%Y-%m-%d" )
203
209
continue
@@ -211,15 +217,13 @@ def df(
211
217
if self .use_cache :
212
218
with Cache (CACHE_DIRECTORY ) as cache :
213
219
cache_key = self ._get_cache_key ("df" )
214
- cache .set (cache_key , df , expire = self .cache_max_age_days * 24 * 60 * 60 )
220
+ cache .set (cache_key , df , expire = self .cache_max_age_days * 24 * 60 * 60 )
215
221
216
222
return df
217
223
218
224
219
225
class EpiDataContext (AEpiDataEndpoints [EpiDataCall ]):
220
- """
221
- sync epidata call class
222
- """
226
+ """sync epidata call class"""
223
227
224
228
_base_url : Final [str ]
225
229
_session : Final [Optional [Session ]]
@@ -249,16 +253,25 @@ def _create_call(
249
253
params : Mapping [str , Optional [EpiRangeParam ]],
250
254
meta : Optional [Sequence [EpidataFieldInfo ]] = None ,
251
255
only_supports_classic : bool = False ,
252
-
253
256
) -> EpiDataCall :
254
- return EpiDataCall (self ._base_url , self ._session , endpoint , params , meta , only_supports_classic , self .use_cache , self .cache_max_age_days )
257
+ return EpiDataCall (
258
+ self ._base_url ,
259
+ self ._session ,
260
+ endpoint ,
261
+ params ,
262
+ meta ,
263
+ only_supports_classic ,
264
+ self .use_cache ,
265
+ self .cache_max_age_days ,
266
+ )
267
+
255
268
256
269
def CovidcastEpidata (
257
- base_url : str = BASE_URL ,
258
- session : Optional [Session ] = None ,
259
- use_cache : Optional [bool ] = None ,
260
- cache_max_age_days : Optional [int ] = None ,
261
- ) -> CovidcastDataSources [EpiDataCall ]:
270
+ base_url : str = BASE_URL ,
271
+ session : Optional [Session ] = None ,
272
+ use_cache : Optional [bool ] = None ,
273
+ cache_max_age_days : Optional [int ] = None ,
274
+ ) -> CovidcastDataSources [EpiDataCall ]:
262
275
url = add_endpoint_to_url (base_url , "covidcast/meta" )
263
276
meta_data_res = _request_with_retry (url , {}, session , False )
264
277
meta_data_res .raise_for_status ()
@@ -267,6 +280,14 @@ def CovidcastEpidata(
267
280
def create_call (
268
281
params : Mapping [str , Optional [EpiRangeParam ]],
269
282
) -> EpiDataCall :
270
- return EpiDataCall (base_url , session , "covidcast" , params , define_covidcast_fields (), use_cache = use_cache , cache_max_age_days = cache_max_age_days )
283
+ return EpiDataCall (
284
+ base_url ,
285
+ session ,
286
+ "covidcast" ,
287
+ params ,
288
+ define_covidcast_fields (),
289
+ use_cache = use_cache ,
290
+ cache_max_age_days = cache_max_age_days ,
291
+ )
271
292
272
293
return CovidcastDataSources .create (meta_data , create_call )
0 commit comments