2
2
3
3
# standard library
4
4
from copy import copy
5
+ from io import StringIO
5
6
from itertools import accumulate , chain
6
7
from typing import List , Sequence
7
- from io import StringIO
8
+ from delphi . epidata . server . utils . dates import iterate_over_range
8
9
9
10
# third party
10
- from more_itertools import interleave_longest , windowed
11
- import requests
12
11
import pandas as pd
12
+ import pytest
13
+ import requests
14
+ from more_itertools import windowed
13
15
14
16
from delphi .epidata .acquisition .covidcast .covidcast_meta_cache_updater import main as update_cache
17
+ from delphi .epidata .acquisition .covidcast .covidcast_row import CovidcastRow , CovidcastRows , set_df_dtypes , assert_frame_equal_no_order
15
18
from delphi .epidata .acquisition .covidcast .test_utils import CovidcastBase
16
- from delphi .epidata .acquisition .covidcast .covidcast_row import CovidcastRow , CovidcastRows , set_df_dtypes
17
19
18
20
# use the local instance of the Epidata API
19
21
BASE_URL = "http://delphi_web_epidata/epidata/covidcast"
20
22
BASE_URL_OLD = "http://delphi_web_epidata/epidata/api.php"
21
23
22
24
23
- def _read_csv (txt : str ) -> pd .DataFrame :
25
+ def _read_csv_str (txt : str ) -> pd .DataFrame :
24
26
df = pd .read_csv (StringIO (txt ), index_col = 0 ).rename (columns = {"data_source" : "source" })
25
27
df .time_value = pd .to_datetime (df .time_value ).dt .strftime ("%Y%m%d" ).astype (int )
26
28
df .issue = pd .to_datetime (df .issue ).dt .strftime ("%Y%m%d" ).astype (int )
@@ -88,6 +90,7 @@ def test_compatibility(self):
88
90
self ._insert_rows (rows )
89
91
90
92
with self .subTest ("simple" ):
93
+ # TODO: These tests aren't actually testing the compatibility endpoint.
91
94
out = self ._fetch ("/" , signal = first .signal_pair , geo = first .geo_pair , time = "day:*" )
92
95
self .assertEqual (len (out ["epidata" ]), len (rows ))
93
96
@@ -103,72 +106,81 @@ def test_compatibility(self):
103
106
104
107
# JIT tests
105
108
def test_derived_signals(self):
    """Compare JIT-derived signals from the API with values computed in pandas.

    Three ``jhu-csse:confirmed_cumulative_num`` series are inserted: geos
    ``01`` and ``02`` over 20200401-20200410, and geo ``03`` built from two
    date ranges (20200401-20200410 and 20200416-20200420) so that it has a
    gap in the middle.  The expected ``confirmed_incidence_num`` (per-geo
    first difference) and ``confirmed_7dav_incidence_num`` (7-row rolling
    mean of that difference) are derived from the same data with pandas, and
    every subtest joins the API response against them on
    (signal, geo_value, time_value).
    """
    # The base signal data.
    data1 = CovidcastRows.from_args(
        source=["jhu-csse"] * 10,
        signal=["confirmed_cumulative_num"] * 10,
        time_value=iterate_over_range(20200401, 20200410, inclusive=True),
        geo_value=["01"] * 10,
        value=[i ** 2 for i in range(10)],
    )
    data2 = CovidcastRows.from_args(
        source=["jhu-csse"] * 10,
        signal=["confirmed_cumulative_num"] * 10,
        time_value=iterate_over_range(20200401, 20200410, inclusive=True),
        geo_value=["02"] * 10,
        value=[2 * i ** 2 for i in range(10)],
    )
    # A base signal with a time gap.
    data3 = CovidcastRows.from_args(
        source=["jhu-csse"] * 15,
        signal=["confirmed_cumulative_num"] * 15,
        time_value=chain(
            iterate_over_range(20200401, 20200410, inclusive=True),
            iterate_over_range(20200416, 20200420, inclusive=True),
        ),
        geo_value=["03"] * 15,
        value=[i ** 2 for i in chain(range(10), range(15, 20))],
    )
    self._insert_rows(data1.rows + data2.rows + data3.rows)

    # Put the gapped series on the full date range; the added rows keep a
    # NaN value but inherit their identifying columns from the row above.
    data3_reindexed = (
        data3.api_row_df
        .set_index("time_value")
        .reindex(iterate_over_range(20200401, 20200420, inclusive=True))
        .assign(
            source=lambda df: df.source.ffill(),
            signal=lambda df: df.signal.ffill(),
            geo_value=lambda df: df.geo_value.ffill(),
        )
        .reset_index()
    )

    # Get the expected derived signal values.
    data_df = (
        pd.concat([data1.api_row_df, data2.api_row_df, data3_reindexed])
        .reset_index()
        .set_index(["signal", "geo_value", "time_value"])
    )

    def relabel(series, signal_name):
        """Return a copy of `series` whose "signal" index level is `signal_name`."""
        # Assign a new index instead of mutating the existing one in place:
        # the index object may be shared with `data_df`, and the `inplace`
        # argument of `MultiIndex.set_levels` no longer exists in pandas 2.
        relabeled = series.copy()
        relabeled.index = series.index.set_levels([signal_name], level="signal")
        return relabeled

    diffed = data_df.groupby(["geo_value"]).value.diff()
    # The rolling window is not grouped by geo.  Each geo's first difference
    # is NaN and a 7-row window containing a NaN yields NaN, so a window
    # that reaches back into the previous geo never produces a value.
    smoothed = diffed.rolling(7).mean()
    expected_df = pd.concat([
        data_df.value,
        relabel(diffed, "confirmed_incidence_num"),
        relabel(smoothed, "confirmed_7dav_incidence_num"),
    ])

    def assert_matches_expected(response):
        """Assert that every returned row matches the pandas-derived value."""
        out_df = pd.DataFrame.from_records(response["epidata"]).set_index(["signal", "time_value", "geo_value"])
        merged_df = pd.merge(
            out_df, expected_df, left_index=True, right_index=True, suffixes=["_out", "_expected"]
        )[["value_out", "value_expected"]]
        # An empty join would make the comparison below pass vacuously.
        assert merged_df.empty is False
        assert merged_df.value_out.to_numpy() == pytest.approx(merged_df.value_expected, nan_ok=True)

    with self.subTest("diffed signal"):
        out = self._fetch("/", signal="jhu-csse:confirmed_incidence_num", geo="county:01", time="day:*")
        # TODO: This test will be updated when JIT can handle *.
        assert out['result'] == -2
        out = self._fetch("/", signal="jhu-csse:confirmed_incidence_num", geo="county:01", time="day:20200401-20200410")
        assert_matches_expected(out)

    with self.subTest("diffed signal, multiple geos"):
        out = self._fetch("/", signal="jhu-csse:confirmed_incidence_num", geo="county:01,02", time="day:20200401-20200410")
        assert_matches_expected(out)

    with self.subTest("smooth diffed signal"):
        out = self._fetch("/", signal="jhu-csse:confirmed_7dav_incidence_num", geo="county:01", time="day:20200401-20200410")
        assert_matches_expected(out)

    with self.subTest("diffed signal and smoothed signal in one request"):
        out = self._fetch("/", signal="jhu-csse:confirmed_incidence_num;jhu-csse:confirmed_7dav_incidence_num", geo="county:01", time="day:20200401-20200410")
        assert_matches_expected(out)

    with self.subTest("smoothing and diffing with a time gap and geo=*"):
        # should fetch 7 extra days
        out = self._fetch("/", signal="jhu-csse:confirmed_7dav_incidence_num", geo="county:*", time="day:20200407-20200420")
        assert_matches_expected(out)
172
184
173
185
def test_compatibility (self ):
174
186
"""Request at the /api.php endpoint."""
@@ -447,7 +459,7 @@ def test_csv(self):
447
459
params = dict (signal = "jhu-csse:confirmed_cumulative_num" , start_day = "2020-04-01" , end_day = "2020-04-10" , geo_type = first .geo_type ),
448
460
)
449
461
response .raise_for_status ()
450
- df = _read_csv (response .text )
462
+ df = _read_csv_str (response .text )
451
463
expected_df = CovidcastRows .from_args (
452
464
source = ["jhu-csse" ] * 10 ,
453
465
signal = ["confirmed_cumulative_num" ] * 10 ,
@@ -461,7 +473,7 @@ def test_csv(self):
461
473
params = dict (signal = "jhu-csse:confirmed_incidence_num" , start_day = "2020-04-01" , end_day = "2020-04-10" , geo_type = first .geo_type ),
462
474
)
463
475
response .raise_for_status ()
464
- df_diffed = _read_csv (response .text )
476
+ df_diffed = _read_csv_str (response .text )
465
477
expected_df = CovidcastRows .from_args (
466
478
source = ["jhu-csse" ] * 10 ,
467
479
signal = ["confirmed_incidence_num" ] * 10 ,
0 commit comments