diff --git a/integrations/acquisition/covidcast/test_db.py b/integrations/acquisition/covidcast/test_db.py
new file mode 100644
index 000000000..faa6525b4
--- /dev/null
+++ b/integrations/acquisition/covidcast/test_db.py
@@ -0,0 +1,66 @@
+import unittest
+
+from delphi_utils import Nans
+from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
+import delphi.operations.secrets as secrets
+
+# all the Nans we use here are just one value, so this is a shortcut to it:
+nmv = Nans.NOT_MISSING.value
+
+class TestDatabase(CovidcastBase):
+
+    def _find_matches_for_row(self, row):
+        # finds (if they exist) the rows from both the history and latest views that match the long key of the provided CovidcastRow
+        # TODO: consider making `issue` an optional match... this will break the at-most-1-row-returned assumption for signal_history
+        cols = "source signal time_type time_value geo_type geo_value issue".split()
+        results = {}
+        cur = self._db._cursor
+        for table in ['signal_latest_v', 'signal_history_v']:
+            q = f"SELECT * FROM {table} WHERE "
+            # NOTE: repr() puts str values in single quotes but simply 'string-ifies' numerics;
+            #       getattr() accesses members by string of their name
+            q += " AND ".join([f" `{c}` = {repr(getattr(row,c))} " for c in cols])
+            q += " LIMIT 1;"
+            cur.execute(q)
+            res = cur.fetchone()
+            if res:
+                results[table] = dict(zip(cur.column_names, res))
+            else:
+                results[table] = None
+        return results
+
+    @unittest.skip("v4 will need this later.")
+    def test_id_sync(self):
+        # the history and latest tables have a non-AUTOINCREMENT primary key id that is fed by the
+        # AUTOINCREMENT pk id from the load table. this test is intended to make sure that they
+        # appropriately stay in sync with each other
+
+        pk_column = 'signal_data_id'
+        history_view = 'signal_history_v'
+        latest_view = 'signal_latest_v'
+
+        # add a data point
+        base_row, _ = self._make_placeholder_row()
+        self._insert_rows([base_row])
+        # ensure the primary keys match in the latest and history tables
+        matches = self._find_matches_for_row(base_row)
+        self.assertEqual(matches[latest_view][pk_column],
+                         matches[history_view][pk_column])
+        # save old pk value
+        old_pk_id = matches[latest_view][pk_column]
+
+        # add a reissue for said data point
+        next_row, _ = self._make_placeholder_row()
+        next_row.issue += 1
+        self._insert_rows([next_row])
+        # ensure the new keys also match
+        matches = self._find_matches_for_row(next_row)
+        self.assertEqual(matches[latest_view][pk_column],
+                         matches[history_view][pk_column])
+        # check new ids were used
+        new_pk_id = matches[latest_view][pk_column]
+        self.assertNotEqual(old_pk_id, new_pk_id)
+
+        # verify old issue is no longer in latest table
+        self.assertIsNone(self._find_matches_for_row(base_row)[latest_view])
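Every test file in this diff leans on a `CovidcastBase` helper from `delphi.epidata.acquisition.covidcast.test_utils`, which the diff itself never shows. The self-contained sketch below illustrates the interface its call sites imply (the database plumbing — `setUp`, `_insert_rows`, `self._db` — is omitted); all names, defaults, and signatures here are inferred from usage in these tests, not taken from the real implementation.

from dataclasses import dataclass, asdict
from typing import Optional

from delphi_utils import Nans

@dataclass
class PlaceholderRow:
    # field names mirror the long key used throughout the tests;
    # the concrete default values are assumptions
    source: str = 'src'
    signal: str = 'sig'
    time_type: str = 'day'
    geo_type: str = 'county'
    time_value: int = 2000_01_01
    geo_value: str = '01234'
    value: Optional[float] = 1.0
    stderr: Optional[float] = 2.0
    sample_size: Optional[float] = 3.0
    direction: Optional[int] = None
    issue: int = 2000_01_01
    lag: int = 0
    missing_value: int = Nans.NOT_MISSING.value
    missing_stderr: int = Nans.NOT_MISSING.value
    missing_sample_size: int = Nans.NOT_MISSING.value

class CovidcastBaseSketch:
    DEFAULT_TIME_VALUE = 2000_01_01  # assumed
    DEFAULT_ISSUE = 2000_01_01       # assumed
    # row fields that the covidcast API does not echo back in each record
    DEFAULT_MINUS = ['source', 'time_type', 'geo_type']

    def _make_placeholder_row(self, **kwargs):
        # sane defaults, overridable per test; returns the row and its settings
        row = PlaceholderRow(**kwargs)
        return row, asdict(row)

    def params_from_row(self, row, **kwargs):
        # translate a row's long key into covidcast API request parameters
        params = {
            'data_source': row.source,
            'signals': row.signal,
            'time_type': row.time_type,
            'geo_type': row.geo_type,
            'time_values': row.time_value,
            'geo_value': row.geo_value,
        }
        params.update(kwargs)
        return params

    def expected_from_row(self, row, minus=None):
        # the JSON record the API is expected to return for this row
        expected = asdict(row)
        for field in (minus if minus is not None else self.DEFAULT_MINUS):
            expected.pop(field, None)
        return expected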
diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py
index 8d3560a68..b54836503 100644
--- a/integrations/client/test_delphi_epidata.py
+++ b/integrations/client/test_delphi_epidata.py
@@ -2,6 +2,7 @@
 # standard library
 import unittest
+import time
 from unittest.mock import patch, MagicMock
 from json import JSONDecodeError
@@ -14,6 +15,7 @@
 from delphi_utils import Nans
 from delphi.epidata.client.delphi_epidata import Epidata
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
 import delphi.operations.secrets as secrets
 
 # py3tester coverage target
@@ -28,27 +30,14 @@ def wrapper(*args):
     return wrapper
 
-class DelphiEpidataPythonClientTests(unittest.TestCase):
+class DelphiEpidataPythonClientTests(CovidcastBase):
     """Tests the Python client."""
 
-    def setUp(self):
+    def localSetUp(self):
         """Perform per-test setup."""
-        # connect to the `epidata` database and clear relevant tables
-        cnx = mysql.connector.connect(
-            user='user',
-            password='pass',
-            host='delphi_database_epidata',
-            database='epidata')
-        cur = cnx.cursor()
-        cur.execute('truncate table covidcast')
-        cur.execute('truncate table covidcast_nowcast')
-        cnx.commit()
-        cur.close()
-
-        # make connection and cursor available to test cases
-        self.cnx = cnx
-        self.cur = cnx.cursor()
+        # reset the `covidcast_meta_cache` table (it should always have one row)
+        self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
 
         # use the local instance of the Epidata API
         Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
@@ -57,236 +46,129 @@ def setUp(self):
         secrets.db.host = 'delphi_database_epidata'
         secrets.db.epi = ('user', 'pass')
 
-    def tearDown(self):
-        """Perform per-test teardown."""
-        self.cur.close()
-        self.cnx.close()
-
     def test_covidcast(self):
         """Test that the covidcast endpoint returns expected data."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig2', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1, 0, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data: three issues of one signal, one issue of another
+        rows = [
+            self._make_placeholder_row(issue=self.DEFAULT_ISSUE + i, value=i, lag=i)[0]
+            for i in range(3)
+        ]
+        row_latest_issue = rows[-1]
+        rows.append(
+            self._make_placeholder_row(signal="sig2")[0]
+        )
+        self._insert_rows(rows)
 
         with self.subTest(name='request two signals'):
             # fetch data
             response = Epidata.covidcast(
-                'src', ['sig', 'sig2'], 'day', 'county', 20200414, '01234')
+                **self.params_from_row(rows[0], signals=[rows[0].signal, rows[-1].signal])
+            )
+
+            expected = [
+                self.expected_from_row(row_latest_issue),
+                self.expected_from_row(rows[-1])
+            ]
 
             # check result
             self.assertEqual(response, {
                 'result': 1,
-                'epidata': [{
-                    'time_value': 20200414,
-                    'geo_value': '01234',
-                    'value': 6.5,
-                    'stderr': 2.2,
-                    'sample_size': 11.5,
-                    'direction': 0,
-                    'issue': 20200416,
-                    'lag': 2,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                }, {
-                    'time_value': 20200414,
-                    'geo_value': '01234',
-                    'value': 1.5,
-                    'stderr': 2.5,
-                    'sample_size': 3.5,
-                    'direction': 4,
-                    'issue': 20200414,
-                    'lag': 0,
-                    'signal': 'sig2',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                }],
+                'epidata': expected,
                 'message': 'success',
             })
 
         with self.subTest(name='request two signals with tree format'):
             # fetch data
             response = Epidata.covidcast(
-                'src', ['sig', 'sig2'], 'day', 'county', 20200414, '01234',
-                format='tree')
+                **self.params_from_row(rows[0], signals=[rows[0].signal, rows[-1].signal], format='tree')
+            )
+
+            expected = [{
+                rows[0].signal: [
+                    self.expected_from_row(row_latest_issue, self.DEFAULT_MINUS + ['signal']),
+                ],
+                rows[-1].signal: [
+                    self.expected_from_row(rows[-1], self.DEFAULT_MINUS + ['signal']),
+                ],
+            }]
 
             # check result
             self.assertEqual(response, {
                 'result': 1,
-                'epidata': [{
-                    'sig': [{
-                        'time_value': 20200414,
-                        'geo_value': '01234',
-                        'value': 6.5,
-                        'stderr': 2.2,
-                        'sample_size': 11.5,
-                        'direction': 0,
-                        'issue': 20200416,
-                        'lag': 2,
-                        'missing_value': Nans.NOT_MISSING,
-                        'missing_stderr': Nans.NOT_MISSING,
-                        'missing_sample_size': Nans.NOT_MISSING
-                    }],
-                    'sig2': [{
-                        'time_value': 20200414,
-                        'geo_value': '01234',
-                        'value': 1.5,
-                        'stderr': 2.5,
-                        'sample_size': 3.5,
-                        'direction': 4,
-                        'issue': 20200414,
-                        'lag': 0,
-                        'missing_value': Nans.NOT_MISSING,
-                        'missing_stderr': Nans.NOT_MISSING,
-                        'missing_sample_size': Nans.NOT_MISSING
-                    }],
-                }],
+                'epidata': expected,
                 'message': 'success',
             })
 
         with self.subTest(name='request most recent'):
            # fetch data, without specifying issue or lag
            response_1 = Epidata.covidcast(
-               'src', 'sig', 'day', 'county', 20200414, '01234')
+               **self.params_from_row(rows[0])
+           )
+
+           expected = self.expected_from_row(row_latest_issue)
 
            # check result
            self.assertEqual(response_1, {
                'result': 1,
-               'epidata': [{
-                   'time_value': 20200414,
-                   'geo_value': '01234',
-                   'value': 6.5,
-                   'stderr': 2.2,
-                   'sample_size': 11.5,
-                   'direction': 0,
-                   'issue': 20200416,
-                   'lag': 2,
-                   'signal': 'sig',
-                   'missing_value': Nans.NOT_MISSING,
-                   'missing_stderr': Nans.NOT_MISSING,
-                   'missing_sample_size': Nans.NOT_MISSING
-               }],
+               'epidata': [expected],
                'message': 'success',
            })
 
        with self.subTest(name='request as-of a date'):
            # fetch data, specifying as_of
            response_1a = Epidata.covidcast(
-               'src', 'sig', 'day', 'county', 20200414, '01234',
-               as_of=20200415)
+               **self.params_from_row(rows[0], as_of=rows[1].issue)
+           )
+
+           expected = self.expected_from_row(rows[1])
 
            # check result
+           self.maxDiff=None
            self.assertEqual(response_1a, {
                'result': 1,
-               'epidata': [{
-                   'time_value': 20200414,
-                   'geo_value': '01234',
-                   'value': 5.5,
-                   'stderr': 1.2,
-                   'sample_size': 10.5,
-                   'direction': 0,
-                   'issue': 20200415,
-                   'lag': 1,
-                   'signal': 'sig',
-                   'missing_value': Nans.NOT_MISSING,
-                   'missing_stderr': Nans.NOT_MISSING,
-                   'missing_sample_size': Nans.NOT_MISSING
-               }],
+               'epidata': [expected],
                'message': 'success',
            })
 
        with self.subTest(name='request a range of issues'):
            # fetch data, specifying issue range, not lag
            response_2 = Epidata.covidcast(
-               'src', 'sig', 'day', 'county', 20200414, '01234',
-               issues=Epidata.range(20200414, 20200415))
+               **self.params_from_row(rows[0], issues=Epidata.range(rows[0].issue, rows[1].issue))
+           )
+
+           expected = [
+               self.expected_from_row(rows[0]),
+               self.expected_from_row(rows[1])
+           ]
 
            # check result
            self.assertDictEqual(response_2, {
                'result': 1,
-               'epidata': [{
-                   'time_value': 20200414,
-                   'geo_value': '01234',
-                   'value': 1.5,
-                   'stderr': 2.5,
-                   'sample_size': 3.5,
-                   'direction': 4,
-                   'issue': 20200414,
-                   'lag': 0,
-                   'signal': 'sig',
-                   'missing_value': Nans.NOT_MISSING,
-                   'missing_stderr': Nans.NOT_MISSING,
-                   'missing_sample_size': Nans.NOT_MISSING
-               }, {
-                   'time_value': 20200414,
-                   'geo_value': '01234',
-                   'value': 5.5,
-                   'stderr': 1.2,
-                   'sample_size': 10.5,
-                   'direction': 0,
-                   'issue': 20200415,
-                   'lag': 1,
-                   'signal': 'sig',
-                   'missing_value': Nans.NOT_MISSING,
-                   'missing_stderr': Nans.NOT_MISSING,
-                   'missing_sample_size': Nans.NOT_MISSING
-               }],
+               'epidata': expected,
                'message': 'success',
            })
 
        with self.subTest(name='request at a given lag'):
            # fetch data, specifying lag, not issue range
            response_3 = Epidata.covidcast(
-               'src', 'sig', 'day', 'county', 20200414, '01234',
-               lag=2)
+               **self.params_from_row(rows[0], lag=2)
+           )
+
+           expected = self.expected_from_row(row_latest_issue)
 
            # check result
            self.assertDictEqual(response_3, {
                'result': 1,
-               'epidata': [{
-                   'time_value': 20200414,
-                   'geo_value': '01234',
-                   'value': 6.5,
-                   'stderr': 2.2,
-                   'sample_size': 11.5,
-                   'direction': 0,
-                   'issue': 20200416,
-                   'lag': 2,
-                   'signal': 'sig',
-                   'missing_value': Nans.NOT_MISSING,
-                   'missing_stderr': Nans.NOT_MISSING,
-                   'missing_sample_size': Nans.NOT_MISSING
-               }],
+               'epidata': [expected],
                'message': 'success',
            })
        with self.subTest(name='long request'):
            # fetch data, without specifying issue or lag
            # TODO should also trigger a post but doesn't due to the 414 issue
            response_1 = Epidata.covidcast(
-               'src', 'sig'*1000, 'day', 'county', 20200414, '01234')
+               **self.params_from_row(rows[0], signals='sig'*1000)
+           )
 
            # check result
            self.assertEqual(response_1, {'message': 'no results', 'result': -2})
@@ -335,82 +217,25 @@ def test_retry_request(self, get):
     def test_geo_value(self):
         """test different variants of geo types: single, *, multi."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '11111',
-                    123, 10, 11, 12, 456, 13, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '22222',
-                    123, 20, 21, 22, 456, 23, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '33333',
-                    123, 30, 31, 32, 456, 33, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
-                    123, 40, 41, 42, 456, 43, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
-                    123, 50, 51, 52, 456, 53, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
-                    123, 60, 61, 62, 456, 634, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
-
-        def fetch(geo_value):
-            # make the request
-            response = Epidata.covidcast(
-                'src', 'sig', 'day', 'county', 20200414, geo_value)
-            return response
-
-        counties = [{
-            'time_value': 20200414,
-            'geo_value': '11111',
-            'value': 10,
-            'stderr': 11,
-            'sample_size': 12,
-            'direction': 13,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }, {
-            'time_value': 20200414,
-            'geo_value': '22222',
-            'value': 20,
-            'stderr': 21,
-            'sample_size': 22,
-            'direction': 23,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }, {
-            'time_value': 20200414,
-            'geo_value': '33333',
-            'value': 30,
-            'stderr': 31,
-            'sample_size': 32,
-            'direction': 33,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }]
+        # insert placeholder data: three counties, three MSAs
+        N = 3
+        rows = [
+            self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
+            for i in range(N)
+        ] + [
+            self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
+            for i in range(N)
+        ]
+        self._insert_rows(rows)
+
+        counties = [
+            self.expected_from_row(rows[i]) for i in range(N)
+        ]
+
+        def fetch(geo):
+            return Epidata.covidcast(
+                **self.params_from_row(rows[0], geo_value=geo)
+            )
 
         # test fetch all
         r = fetch('*')
@@ -419,22 +244,22 @@ def fetch(geo_value):
         # test fetch a specific region
         r = fetch('11111')
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0]])
+        self.assertEqual(r['epidata'], [counties[1]])
         # test fetch a specific yet not existing region
         r = fetch('55555')
         self.assertEqual(r['message'], 'no results')
         # test fetch a multiple regions
         r = fetch(['11111', '22222'])
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0], counties[1]])
+        self.assertEqual(r['epidata'], [counties[1], counties[2]])
         # test fetch a multiple regions in another variant
-        r = fetch(['11111', '33333'])
+        r = fetch(['00000', '22222'])
         self.assertEqual(r['message'], 'success')
         self.assertEqual(r['epidata'], [counties[0], counties[2]])
         # test fetch a multiple regions but one is not existing
         r = fetch(['11111', '55555'])
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0]])
+        self.assertEqual(r['epidata'], [counties[1]])
         # test fetch a multiple regions but specify no region
         r = fetch([])
         self.assertEqual(r['message'], 'no results')
@@ -442,26 +267,15 @@ def fetch(geo_value):
 
     def test_covidcast_meta(self):
         """Test that the covidcast_meta endpoint returns expected data."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200415, '01234',
-                    345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data: three dates, three issues. values are:
+        # 1st issue: 0 10 20
+        # 2nd issue: 1 11 21
+        # 3rd issue: 2 12 22
+        rows = [
+            self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE + t, issue=self.DEFAULT_ISSUE + i, value=t*10 + i)[0]
+            for i in range(3) for t in range(3)
+        ]
+        self._insert_rows(rows)
 
         # cache it
         update_covidcast_meta_cache(args=None)
@@ -469,170 +283,60 @@ def test_covidcast_meta(self):
         # fetch data
         response = Epidata.covidcast_meta()
 
+        # make sure "last updated" time is recent:
+        updated_time = response['epidata'][0]['last_update']
+        t_diff = time.time() - updated_time
+        self.assertGreater(t_diff, 0) # else it was in the future
+        self.assertLess(t_diff, 5) # 5s should be more than enough time to pull the metadata
+        # remove "last updated" time so our comparison below works:
+        del response['epidata'][0]['last_update']
+
+        expected = dict(
+            data_source=rows[0].source,
+            signal=rows[0].signal,
+            time_type=rows[0].time_type,
+            geo_type=rows[0].geo_type,
+            min_time=self.DEFAULT_TIME_VALUE,
+            max_time=self.DEFAULT_TIME_VALUE + 2,
+            num_locations=1,
+            min_value=2.,
+            mean_value=12.,
+            max_value=22.,
+            stdev_value=8.1649658, # population stdev, not sample, which is 10.
+            max_issue=self.DEFAULT_ISSUE + 2,
+            min_lag=0,
+            max_lag=0, # we didn't set lag when inputting data
+        )
 
         # check result
+        self.maxDiff=None
         self.assertEqual(response, {
             'result': 1,
-            'epidata': [{
-                'data_source': 'src',
-                'signal': 'sig',
-                'time_type': 'day',
-                'geo_type': 'county',
-                'min_time': 20200414,
-                'max_time': 20200415,
-                'num_locations': 1,
-                'min_value': 6.0,
-                'max_value': 7.0,
-                'mean_value': 6.5,
-                'stdev_value': 0.5,
-                'last_update': 345,
-                'max_issue': 20200416,
-                'min_lag': 1,
-                'max_lag': 2,
-            }],
-            'message': 'success',
-        })
-
-
-    def test_covidcast_nowcast(self):
-        """Test that the covidcast_nowcast endpoint returns expected data."""
-
-        # insert dummy data
-        self.cur.execute(f'''insert into covidcast_nowcast values
-            (0, 'src', 'sig1', 'sensor', 'day', 'county', 20200101, '01001', 12345678, 3.5, 20200101, 2),
-            (0, 'src', 'sig2', 'sensor', 'day', 'county', 20200101, '01001', 12345678, 2.5, 20200101, 2),
-            (0, 'src', 'sig1', 'sensor', 'day', 'county', 20200101, '01001', 12345678, 1.5, 20200102, 2)''')
-        self.cnx.commit()
-
-        # fetch data
-        response = Epidata.covidcast_nowcast(
-            'src', ['sig1', 'sig2'], 'sensor', 'day', 'county', 20200101, '01001')
-
-        # request two signals
-        self.assertEqual(response, {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200101,
-                'geo_value': '01001',
-                'value': 1.5,
-                'issue': 20200102,
-                'lag': 2,
-                'signal': 'sig1',
-            }, {
-                'time_value': 20200101,
-                'geo_value': '01001',
-                'value': 2.5,
-                'issue': 20200101,
-                'lag': 2,
-                'signal': 'sig2',
-            }],
-            'message': 'success',
-        })
-
-        # request range of issues
-        response = Epidata.covidcast_nowcast(
-            'src', 'sig1', 'sensor', 'day', 'county', 20200101, '01001',
-            issues=Epidata.range(20200101, 20200102))
-
-        self.assertEqual(response, {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200101,
-                'geo_value': '01001',
-                'value': 3.5,
-                'issue': 20200101,
-                'lag': 2,
-                'signal': 'sig1',
-            }, {
-                'time_value': 20200101,
-                'geo_value': '01001',
-                'value': 1.5,
-                'issue': 20200102,
-                'lag': 2,
-                'signal': 'sig1',
-            }],
-            'message': 'success',
-        })
-
-        # request as_of
-        response = Epidata.covidcast_nowcast(
-            'src', 'sig1', 'sensor', 'day', 'county', 20200101, '01001',
-            as_of=20200101)
-
-        self.assertEqual(response, {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200101,
-                'geo_value': '01001',
-                'value': 3.5,
-                'issue': 20200101,
-                'lag': 2,
-                'signal': 'sig1',
-            }],
+            'epidata': [expected],
             'message': 'success',
         })
 
-        # request unavailable data
-        response = Epidata.covidcast_nowcast(
-            'src', 'sig1', 'sensor', 'day', 'county', 22222222, '01001')
-
-        self.assertEqual(response, {'result': -2, 'message': 'no results'})
-
     def test_async_epidata(self):
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '11111',
-                    123, 10, 11, 12, 456, 13, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '22222',
-                    123, 20, 21, 22, 456, 23, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '33333',
-                    123, 30, 31, 32, 456, 33, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
-                    123, 40, 41, 42, 456, 43, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
-                    123, 50, 51, 52, 456, 53, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
-                    123, 60, 61, 62, 456, 634, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data: three counties, three MSAs
+        N = 3
+        rows = [
+            self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
+            for i in range(N)
+        ] + [
+            self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
+            for i in range(N)
+        ]
+        self._insert_rows(rows)
+
         test_output = Epidata.async_epidata([
-            {
-                'source': 'covidcast',
-                'data_source': 'src',
-                'signals': 'sig',
-                'time_type': 'day',
-                'geo_type': 'county',
-                'geo_value': '11111',
-                'time_values': '20200414'
-            },
-            {
-                'source': 'covidcast',
-                'data_source': 'src',
-                'signals': 'sig',
-                'time_type': 'day',
-                'geo_type': 'county',
-                'geo_value': '00000',
-                'time_values': '20200414'
-            }
+            self.params_from_row(rows[0], source='covidcast'),
+            self.params_from_row(rows[1], source='covidcast')
         ]*12, batch_size=10)
        responses = [i[0] for i in test_output]
        # check response is same as standard covidcast call, using 24 calls to test batch sizing
-       self.assertEqual(responses,
-           [Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '11111'),
-           Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '00000')]*12
-       )
+       self.assertEqual(
+           responses,
+           [Epidata.covidcast(**self.params_from_row(rows[0])), Epidata.covidcast(**self.params_from_row(rows[1]))]*12
+       )
 
     @fake_epidata_endpoint
     def test_async_epidata_fail(self):
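A quick standalone check of the aggregate numbers asserted in the covidcast_meta test above: the latest issue of each of the three dates carries values 2, 12, and 22, and the cached metadata reports the population (not sample) standard deviation. This snippet is illustration only, not part of the diff.

import statistics

latest_values = [2, 12, 22]
assert min(latest_values) == 2
assert statistics.mean(latest_values) == 12
assert max(latest_values) == 22
# population stdev, as encoded in the test's expected stdev_value...
assert round(statistics.pstdev(latest_values), 7) == 8.1649658
# ...versus the sample stdev, which would be 10
assert statistics.stdev(latest_values) == 10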
diff --git a/integrations/server/test_covidcast.py b/integrations/server/test_covidcast.py
index 36e4cd880..f22f59610 100644
--- a/integrations/server/test_covidcast.py
+++ b/integrations/server/test_covidcast.py
@@ -1,6 +1,7 @@
 """Integration tests for the `covidcast` endpoint."""
 
 # standard library
+import json
 import unittest
 
 # third party
@@ -9,85 +10,70 @@
 
 # first party
 from delphi_utils import Nans
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
 
 # use the local instance of the Epidata API
 BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
+
 
-class CovidcastTests(unittest.TestCase):
+class CovidcastTests(CovidcastBase):
     """Tests the `covidcast` endpoint."""
 
-    def setUp(self):
+    def localSetUp(self):
         """Perform per-test setup."""
-
-        # connect to the `epidata` database and clear the `covidcast` table
-        cnx = mysql.connector.connect(
-            user='user',
-            password='pass',
-            host='delphi_database_epidata',
-            database='epidata')
-        cur = cnx.cursor()
-        cur.execute('truncate table covidcast')
-        cnx.commit()
-        cur.close()
-
-        # make connection and cursor available to test cases
-        self.cnx = cnx
-        self.cur = cnx.cursor()
-
-    def tearDown(self):
-        """Perform per-test teardown."""
-        self.cur.close()
-        self.cnx.close()
+        self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
+
+    def request_based_on_row(self, row, extract_response=lambda x: x.json(), **kwargs):
+        params = self.params_from_row(row, endpoint='covidcast', **kwargs)
+        response = requests.get(BASE_URL, params=params)
+        response.raise_for_status()
+        response = extract_response(response)
+
+        expected = self.expected_from_row(row)
+
+        return response, expected
+
+    def _insert_placeholder_set_one(self, **kwargs):
+        # pass any kwargs through to the placeholder row so callers can vary its key
+        row, _ = self._make_placeholder_row(**kwargs)
+        self._insert_rows([row])
+        return row
+
+    def _insert_placeholder_set_two(self):
+        rows = [
+            self._make_placeholder_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0]
+            for i in [1, 2, 3]
+        ] + [
+            # geo value intended to overlap with counties above
+            self._make_placeholder_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0]
+            for i in [4, 5, 6]
+        ]
+        self._insert_rows(rows)
+        return rows
+
+    def _insert_placeholder_set_three(self):
+        rows = [
+            self._make_placeholder_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i)[0]
+            for i in [1, 2, 3]
+        ] + [
+            # time value intended to overlap with 11111 above, with disjoint geo values
+            self._make_placeholder_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i)[0]
+            for i in [4, 5, 6]
+        ]
+        self._insert_rows(rows)
+        return rows
 
     def test_round_trip(self):
         """Make a simple round-trip with some sample data."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
-
+        # insert placeholder data
+        row = self._insert_placeholder_set_one()
+
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-        })
-        response.raise_for_status()
-        response = response.json()
-
-        # assert that the right data came back
+        response, expected = self.request_based_on_row(row)
         self.assertEqual(response, {
             'result': 1,
-            'epidata': [{
-                'time_value': 20200414,
-                'geo_value': '01234',
-                'value': 1.5,
-                'stderr': 2.5,
-                'sample_size': 3.5,
-                'direction': 4,
-                'issue': 20200414,
-                'lag': 0,
-                'signal': 'sig',
-                'missing_value': Nans.NOT_MISSING,
-                'missing_stderr': Nans.NOT_MISSING,
-                'missing_sample_size': Nans.NOT_MISSING
-            }],
+            'epidata': [expected],
             'message': 'success',
         })
 
@@ -96,7 +82,7 @@ def test_round_trip(self):
 
     # def test_uri_too_long(self):
     #     """Test that a long request yields a 414 with GET but works with POST."""
 
-    #     # insert dummy data
+    #     # insert placeholder data
     #     self.cur.execute(f'''
     #         INSERT INTO
     #             `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
@@ -139,38 +125,33 @@ def test_round_trip(self):
 
     def test_csv_format(self):
         """Test generate csv data."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        row = self._insert_placeholder_set_one()
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-            'format': 'csv'
-        })
-        response.raise_for_status()
-        response = response.text
+        # NB: 'format' shadows a Python built-in, so pass it as an unpacked dict
+        response, _ = self.request_based_on_row(
+            row,
+            extract_response=lambda resp: resp.text,
+            **{'format':'csv'}
+        )
 
         expected_response = (
             "geo_value,signal,time_value,direction,issue,lag,missing_value," +
             "missing_stderr,missing_sample_size,value,stderr,sample_size\n" +
-            "01234,sig,20200414,4,20200414,0,0,0,0,1.5,2.5,3.5\n"
+            ",".join("" if x is None else str(x) for x in [
+                row.geo_value,
+                row.signal,
+                row.time_value,
+                row.direction,
+                row.issue,
+                row.lag,
+                row.missing_value,
+                row.missing_stderr,
+                row.missing_sample_size,
+                row.value,
+                row.stderr,
+                row.sample_size
+            ]) + "\n"
         )
 
         # assert that the right data came back
@@ -179,775 +160,220 @@ def test_raw_json_format(self):
 
     def test_raw_json_format(self):
         """Test generate raw json data."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        row = self._insert_placeholder_set_one()
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-            'format': 'json'
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(row, **{'format':'json'})
 
         # assert that the right data came back
-        self.assertEqual(response, [{
-            'time_value': 20200414,
-            'geo_value': '01234',
-            'value': 1.5,
-            'stderr': 2.5,
-            'sample_size': 3.5,
-            'direction': 4,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }])
+        self.assertEqual(response, [expected])
 
     def test_fields(self):
-        """Test to limit fields field"""
-
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
-
-        # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-        })
-        response.raise_for_status()
-        response = response.json()
+        """Test fields parameter"""
 
-        # assert that the right data came back
-        self.assertEqual(response, {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200414,
-                'geo_value': '01234',
-                'value': 1.5,
-                'stderr': 2.5,
-                'sample_size': 3.5,
-                'direction': 4,
-                'issue': 20200414,
-                'lag': 0,
-                'signal': 'sig',
-                'missing_value': Nans.NOT_MISSING,
-                'missing_stderr': Nans.NOT_MISSING,
-                'missing_sample_size': Nans.NOT_MISSING
-            }],
-            'message': 'success',
-        })
+        # insert placeholder data
+        row = self._insert_placeholder_set_one()
 
         # limit fields
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-            'fields': 'time_value,geo_value'
-        })
-        response.raise_for_status()
-        response = response.json()
-
-        # assert that the right data came back
-        self.assertEqual(response, {
+        response, expected = self.request_based_on_row(row, fields='time_value,geo_value')
+        expected_all = {
             'result': 1,
             'epidata': [{
-                'time_value': 20200414,
-                'geo_value': '01234'
+                k: expected[k] for k in ['time_value', 'geo_value']
             }],
             'message': 'success',
-        })
+        }
+
+        # assert that the right data came back
+        self.assertEqual(response, expected_all)
 
-        # limit invalid values
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-            'fields': 'time_value,geo_value,dummy'
-        })
-        response.raise_for_status()
-        response = response.json()
+        # limit using invalid fields
+        response, _ = self.request_based_on_row(row, fields='time_value,geo_value,doesnt_exist')
 
-        # assert that the right data came back
-        self.assertEqual(response, {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200414,
-                'geo_value': '01234'
-            }],
-            'message': 'success',
-        })
+        # assert that the right data came back (only valid fields)
+        self.assertEqual(response, expected_all)
 
-        # limit exclude fields
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-            'fields': (
+        # limit exclude fields: exclude all except time_value and geo_value
+
+        response, _ = self.request_based_on_row(row, fields=(
             '-value,-stderr,-sample_size,-direction,-issue,-lag,-signal,' +
             '-missing_value,-missing_stderr,-missing_sample_size'
-            )
-        })
-        response.raise_for_status()
-        response = response.json()
-
+        ))
+
         # assert that the right data came back
-        self.assertEqual(response, {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200414,
-                'geo_value': '01234'
-            }],
-            'message': 'success',
-        })
+        self.assertEqual(response, expected_all)
 
     def test_location_wildcard(self):
         """Select all locations with a wildcard query."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '11111',
-                    123, 10, 11, 12, 456, 13, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '22222',
-                    123, 20, 21, 22, 456, 23, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '33333',
-                    123, 30, 31, 32, 456, 33, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
-                    123, 40, 41, 42, 456, 43, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
-                    123, 50, 51, 52, 456, 53, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
-                    123, 60, 61, 62, 456, 634, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        rows = self._insert_placeholder_set_two()
+        expected_counties = [
+            self.expected_from_row(r) for r in rows[:3]
+        ]
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, _ = self.request_based_on_row(rows[0], geo_value="*")
+
+        self.maxDiff = None
         # assert that the right data came back
         self.assertEqual(response, {
             'result': 1,
-            'epidata': [
-                {
-                    'time_value': 20200414,
-                    'geo_value': '11111',
-                    'value': 10,
-                    'stderr': 11,
-                    'sample_size': 12,
-                    'direction': 13,
-                    'issue': 20200414,
-                    'lag': 0,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                }, {
-                    'time_value': 20200414,
-                    'geo_value': '22222',
-                    'value': 20,
-                    'stderr': 21,
-                    'sample_size': 22,
-                    'direction': 23,
-                    'issue': 20200414,
-                    'lag': 0,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                }, {
-                    'time_value': 20200414,
-                    'geo_value': '33333',
-                    'value': 30,
-                    'stderr': 31,
-                    'sample_size': 32,
-                    'direction': 33,
-                    'issue': 20200414,
-                    'lag': 0,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                },
-            ],
+            'epidata': expected_counties,
             'message': 'success',
         })
-
+
     def test_geo_value(self):
         """test different variants of geo types: single, *, multi."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '11111',
-                    123, 10, 11, 12, 456, 13, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '22222',
-                    123, 20, 21, 22, 456, 23, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200414, '33333',
-                    123, 30, 31, 32, 456, 33, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
-                    123, 40, 41, 42, 456, 43, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
-                    123, 50, 51, 52, 456, 53, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
-                    123, 60, 61, 62, 456, 634, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        rows = self._insert_placeholder_set_two()
+        expected_counties = [
+            self.expected_from_row(r) for r in rows[:3]
+        ]
 
         def fetch(geo_value):
             # make the request
-            params = {
-                'endpoint': 'covidcast',
-                'data_source': 'src',
-                'signal': 'sig',
-                'time_type': 'day',
-                'geo_type': 'county',
-                'time_values': 20200414,
-            }
-            if isinstance(geo_value, list):
-                params['geo_values'] = ','.join(geo_value)
-            else:
-                params['geo_value'] = geo_value
-            response = requests.get(BASE_URL, params=params)
-            response.raise_for_status()
-            response = response.json()
+            response, _ = self.request_based_on_row(rows[0], geo_value=geo_value)
 
             return response
 
-        counties = [{
-            'time_value': 20200414,
-            'geo_value': '11111',
-            'value': 10,
-            'stderr': 11,
-            'sample_size': 12,
-            'direction': 13,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }, {
-            'time_value': 20200414,
-            'geo_value': '22222',
-            'value': 20,
-            'stderr': 21,
-            'sample_size': 22,
-            'direction': 23,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }, {
-            'time_value': 20200414,
-            'geo_value': '33333',
-            'value': 30,
-            'stderr': 31,
-            'sample_size': 32,
-            'direction': 33,
-            'issue': 20200414,
-            'lag': 0,
-            'signal': 'sig',
-            'missing_value': Nans.NOT_MISSING,
-            'missing_stderr': Nans.NOT_MISSING,
-            'missing_sample_size': Nans.NOT_MISSING
-        }]
-
-        # test fetch all
-        r = fetch('*')
-        self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], counties)
         # test fetch a specific region
         r = fetch('11111')
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0]])
+        self.assertEqual(r['epidata'], [expected_counties[0]])
         # test fetch a specific yet not existing region
         r = fetch('55555')
         self.assertEqual(r['message'], 'no results')
-        # test fetch a multiple regions
-        r = fetch(['11111', '22222'])
+        # test fetch multiple regions
+        r = fetch('11111,22222')
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0], counties[1]])
-        # test fetch a multiple regions in another variant
-        r = fetch(['11111', '33333'])
+        self.assertEqual(r['epidata'], [expected_counties[0], expected_counties[1]])
+        # test fetch multiple noncontiguous regions
+        r = fetch('11111,33333')
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0], counties[2]])
-        # test fetch a multiple regions but one is not existing
-        r = fetch(['11111', '55555'])
+        self.assertEqual(r['epidata'], [expected_counties[0], expected_counties[2]])
+        # test fetch multiple regions but one is not existing
+        r = fetch('11111,55555')
         self.assertEqual(r['message'], 'success')
-        self.assertEqual(r['epidata'], [counties[0]])
-        # test fetch a multiple regions but specify no region
-        r = fetch([])
+        self.assertEqual(r['epidata'], [expected_counties[0]])
+        # test fetch empty region
+        r = fetch('')
         self.assertEqual(r['message'], 'no results')
-
-
+
     def test_location_timeline(self):
         """Select a timeline for a particular location."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200411, '01234',
-                    123, 10, 11, 12, 456, 13, 20200413, 2, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200412, '01234',
-                    123, 20, 21, 22, 456, 23, 20200413, 1, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200413, '01234',
-                    123, 30, 31, 32, 456, 33, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200411, '11111',
-                    123, 40, 41, 42, 456, 43, 20200413, 2, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200412, '22222',
-                    123, 50, 51, 52, 456, 53, 20200413, 1, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200413, '33333',
-                    123, 60, 61, 62, 456, 63, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        rows = self._insert_placeholder_set_three()
+        expected_timeseries = [
+            self.expected_from_row(r) for r in rows[:3]
+        ]
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '20200411-20200413',
-            'geo_value': '01234',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, _ = self.request_based_on_row(rows[0], time_values='20000101-20000105')
 
         # assert that the right data came back
         self.assertEqual(response, {
             'result': 1,
-            'epidata': [
-                {
-                    'time_value': 20200411,
-                    'geo_value': '01234',
-                    'value': 10,
-                    'stderr': 11,
-                    'sample_size': 12,
-                    'direction': 13,
-                    'issue': 20200413,
-                    'lag': 2,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                }, {
-                    'time_value': 20200412,
-                    'geo_value': '01234',
-                    'value': 20,
-                    'stderr': 21,
-                    'sample_size': 22,
-                    'direction': 23,
-                    'issue': 20200413,
-                    'lag': 1,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                }, {
-                    'time_value': 20200413,
-                    'geo_value': '01234',
-                    'value': 30,
-                    'stderr': 31,
-                    'sample_size': 32,
-                    'direction': 33,
-                    'issue': 20200413,
-                    'lag': 0,
-                    'signal': 'sig',
-                    'missing_value': Nans.NOT_MISSING,
-                    'missing_stderr': Nans.NOT_MISSING,
-                    'missing_sample_size': Nans.NOT_MISSING
-                },
-            ],
+            'epidata': expected_timeseries,
             'message': 'success',
         })
 
+    @unittest.skip("v4 now uses ON DUPLICATE KEY UPDATE which prevents this key collision. Consider moving this test to a database integration test which runs SQL without the ON DUPLICATE KEY UPDATE clause to verify constraints are set correctly.")
     def test_unique_key_constraint(self):
         """Don't allow a row with a key collision to be inserted."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    0, 0, 0, 0, 0, 0, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
-
-        # fail to insert different dummy data under the same key
+        # insert placeholder data
+        row = self._insert_placeholder_set_one()
+
+        # fail to insert different placeholder data under the same key
         with self.assertRaises(mysql.connector.errors.IntegrityError):
-            self.cur.execute(f'''
-                INSERT INTO
-                    `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                    `time_value`, `geo_value`, `value_updated_timestamp`,
-                    `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                    `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                    `missing_stderr`,`missing_sample_size`)
-                VALUES
-                    (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                        1, 1, 1, 1, 1, 1, 20200414, 0, 1, False,
-                        {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-                ''')
-
-        # succeed to insert different dummy data under a different issue
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    1, 1, 1, 1, 1, 1, 20200415, 1, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
+            self._insert_placeholder_set_one()
+
+        # succeed to insert different placeholder data under a different time_type
+        self._insert_placeholder_set_one(time_type='week')
 
     def test_nullable_columns(self):
         """Missing values should be surfaced as null."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200414, '01234',
-                    123, 0.123, NULL, NULL, 456, NULL, 20200414, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.OTHER}, {Nans.OTHER})
-            ''')
-        self.cnx.commit()
+        row, _ = self._make_placeholder_row(
+            stderr=None, sample_size=None,
+            missing_stderr=Nans.OTHER.value, missing_sample_size=Nans.OTHER.value
+        )
+        self._insert_rows([row])
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': 20200414,
-            'geo_value': '01234',
-        })
-        response.raise_for_status()
-        response = response.json()
-        expected_response = {
-            'result': 1,
-            'epidata': [{
-                'time_value': 20200414,
-                'geo_value': '01234',
-                'value': 0.123,
-                'stderr': None,
-                'sample_size': None,
-                'direction': None,
-                'issue': 20200414,
-                'lag': 0,
-                'signal': 'sig',
-                'missing_value': Nans.NOT_MISSING,
-                'missing_stderr': Nans.OTHER,
-                'missing_sample_size': Nans.OTHER
-            }],
-            'message': 'success',
-        }
+        response, expected = self.request_based_on_row(row)
+        expected.update(stderr=None, sample_size=None)
 
         # assert that the right data came back
-        self.assertEqual(response, expected_response)
-
+        self.assertEqual(response, {
+            'result': 1,
+            'epidata': [expected],
+            'message': 'success',
+        })
+
     def test_temporal_partitioning(self):
         """Request a signal that's available at multiple temporal resolutions."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'hour', 'state', 2020041714, 'vi',
                    123, 10, 11, 12, 456, 13, 2020041714, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'state', 20200417, 'vi',
-                    123, 20, 21, 22, 456, 23, 20200417, 00, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'week', 'state', 202016, 'vi',
-                    123, 30, 31, 32, 456, 33, 202016, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'month', 'state', 202004, 'vi',
-                    123, 40, 41, 42, 456, 43, 202004, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'year', 'state', 2020, 'vi',
-                    123, 50, 51, 52, 456, 53, 2020, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        rows = [
+            self._make_placeholder_row(time_type=tt)[0]
+            for tt in "hour day week month year".split()
+        ]
+        self._insert_rows(rows)
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'week',
-            'geo_type': 'state',
-            'time_values': '0-9999999999',
-            'geo_value': 'vi',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[1], time_values="00010101-99990101")
 
         # assert that the right data came back
         self.assertEqual(response, {
             'result': 1,
-            'epidata': [{
-                'time_value': 202016,
-                'geo_value': 'vi',
-                'value': 30,
-                'stderr': 31,
-                'sample_size': 32,
-                'direction': 33,
-                'issue': 202016,
-                'lag': 0,
-                'signal': 'sig',
-                'missing_value': Nans.NOT_MISSING,
-                'missing_stderr': Nans.NOT_MISSING,
-                'missing_sample_size': Nans.NOT_MISSING
-            }],
+            'epidata': [expected],
             'message': 'success',
         })
-
+
     def test_date_formats(self):
         """Request a signal using different time formats."""
 
-        # insert dummy data
-        self.cur.execute(f'''
-            INSERT INTO
-                `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-                `time_value`, `geo_value`, `value_updated_timestamp`,
-                `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-                `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-                `missing_stderr`,`missing_sample_size`)
-            VALUES
-                (0, 'src', 'sig', 'day', 'county', 20200411, '01234',
-                    123, 10, 11, 12, 456, 13, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200412, '01234',
-                    123, 20, 21, 22, 456, 23, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200413, '01234',
-                    123, 30, 31, 32, 456, 33, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200411, '11111',
-                    123, 40, 41, 42, 456, 43, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200412, '22222',
-                    123, 50, 51, 52, 456, 53, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-                (0, 'src', 'sig', 'day', 'county', 20200413, '33333',
-                    123, 60, 61, 62, 456, 63, 20200413, 0, 1, False,
-                    {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
-            ''')
-        self.cnx.commit()
+        # insert placeholder data
+        rows = self._insert_placeholder_set_three()
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '20200411',
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[0], time_values="20000102", geo_value="*")
 
         # assert that the right data came back
         self.assertEqual(len(response['epidata']), 2)
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '2020-04-11',
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02", geo_value="*")
 
         # assert that the right data came back
         self.assertEqual(len(response['epidata']), 2)
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '20200411,20200412',
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[0], time_values="20000102,20000103", geo_value="*")
 
         # assert that the right data came back
         self.assertEqual(len(response['epidata']), 4)
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '2020-04-11,2020-04-12',
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02,2000-01-03", geo_value="*")
 
         # assert that the right data came back
         self.assertEqual(len(response['epidata']), 4)
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '20200411-20200413',
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[0], time_values="20000102-20000104", geo_value="*")
 
         # assert that the right data came back
         self.assertEqual(len(response['epidata']), 6)
 
         # make the request
-        response = requests.get(BASE_URL, params={
-            'endpoint': 'covidcast',
-            'data_source': 'src',
-            'signal': 'sig',
-            'time_type': 'day',
-            'geo_type': 'county',
-            'time_values': '2020-04-11:2020-04-13',
-            'geo_value': '*',
-        })
-        response.raise_for_status()
-        response = response.json()
+        response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02:2000-01-04", geo_value="*")
 
         # assert that the right data came back
         self.assertEqual(len(response['epidata']), 6)
\ No newline at end of file
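In the next file's tests, helpers like `first.signal_pair()` are invoked as methods on the placeholder rows, while the diff below deletes the old property versions from that file's local `CovidcastRow` dataclass. A sketch of the assumed semantics, mirroring the deleted properties (where these live on the real row class is an assumption; they are shown as free functions for clarity):

def signal_pair(row) -> str:
    # "source:signal", e.g. "src:sig"
    return f"{row.source}:{row.signal}"

def geo_pair(row) -> str:
    # "geo_type:geo_value", e.g. "county:01234"
    return f"{row.geo_type}:{row.geo_value}"

def time_pair(row) -> str:
    # "time_type:time_value", e.g. "day:20200411"
    return f"{row.time_type}:{row.time_value}"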
 
     def _fetch(self, endpoint="/", **params):
         # make the request
@@ -150,9 +41,9 @@ def _fetch(self, endpoint="/", **params):
         return response.json()
 
     def test_basic(self):
-        """Request a signal the / endpoint."""
+        """Request a signal from the / endpoint."""
 
-        rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(10)]
+        rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(10)]
         first = rows[0]
         self._insert_rows(rows)
@@ -161,20 +52,20 @@
         self.assertEqual(out["result"], -1)
 
         with self.subTest("simple"):
-            out = self._fetch("/", signal=first.signal_pair, geo=first.geo_pair, time="day:*")
+            out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*")
 
             self.assertEqual(len(out["epidata"]), len(rows))
 
     def test_trend(self):
-        """Request a signal the /trend endpoint."""
+        """Request a signal from the /trend endpoint."""
 
         num_rows = 30
-        rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(num_rows)]
+        rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(num_rows)]
         first = rows[0]
         last = rows[-1]
         ref = rows[num_rows // 2]
         self._insert_rows(rows)
 
-        out = self._fetch("/trend", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20201212", basis=ref.time_value)
+        out = self._fetch("/trend", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20201212", basis=ref.time_value)
 
         self.assertEqual(out["result"], 1)
         self.assertEqual(len(out["epidata"]), 1)
@@ -199,15 +90,15 @@
         self.assertEqual(trend["max_trend"], "steady")
 
     def test_trendseries(self):
-        """Request a signal the /trendseries endpoint."""
+        """Request a signal from the /trendseries endpoint."""
 
         num_rows = 3
-        rows = [CovidcastRow(time_value=20200401 + i, value=num_rows - i) for i in range(num_rows)]
+        rows = [self._make_placeholder_row(time_value=20200401 + i, value=num_rows - i)[0] for i in range(num_rows)]
         first = rows[0]
         last = rows[-1]
         self._insert_rows(rows)
 
-        out = self._fetch("/trendseries", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20200410", basis=1)
+        out = self._fetch("/trendseries", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20200410", basis=1)
 
         self.assertEqual(out["result"], 1)
         self.assertEqual(len(out["epidata"]), 3)
@@ -264,18 +155,18 @@ def match_row(trend, row):
         self.assertEqual(trend["max_trend"], "decreasing")
 
     def test_correlation(self):
-        """Request a signal the /correlation endpoint."""
+        """Request a signal from the /correlation endpoint."""
 
         num_rows = 30
-        reference_rows = [CovidcastRow(signal="ref", time_value=20200401 + i, value=i) for i in range(num_rows)]
+        reference_rows = [self._make_placeholder_row(signal="ref", time_value=20200401 + i, value=i)[0] for i in range(num_rows)]
         first = reference_rows[0]
         self._insert_rows(reference_rows)
-        other_rows = [CovidcastRow(signal="other", time_value=20200401 + i, value=i) for i in range(num_rows)]
+        other_rows = [self._make_placeholder_row(signal="other", time_value=20200401 + i, value=i)[0] for i in range(num_rows)]
         other = other_rows[0]
         self._insert_rows(other_rows)
 
         max_lag = 3
-        out = self._fetch("/correlation", reference=first.signal_pair, others=other.signal_pair, geo=first.geo_pair, window="20200401-20201212", lag=max_lag)
+        out = self._fetch("/correlation", reference=first.signal_pair(), others=other.signal_pair(), geo=first.geo_pair(), window="20200401-20201212", lag=max_lag)
         self.assertEqual(out["result"], 1)
         df = pd.DataFrame(out["epidata"])
         self.assertEqual(len(df), max_lag * 2 + 1)  # -...0...+
@@ -291,15 +182,15 @@
         self.assertEqual(df["samples"].tolist(), [num_rows - abs(l) for l in range(-max_lag, max_lag + 1)])
 
     def test_csv(self):
-        """Request a signal the /csv endpoint."""
+        """Request a signal from the /csv endpoint."""
 
-        rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(10)]
+        rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(10)]
         first = rows[0]
         self._insert_rows(rows)
 
         response = requests.get(
             f"{BASE_URL}/csv",
-            params=dict(signal=first.signal_pair, start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type),
+            params=dict(signal=first.signal_pair(), start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type),
         )
         response.raise_for_status()
         out = response.text
@@ -308,16 +199,16 @@
         self.assertEqual(list(df.columns), ["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "data_source"])
 
     def test_backfill(self):
-        """Request a signal the /backfill endpoint."""
+        """Request a signal from the /backfill endpoint."""
 
         num_rows = 10
-        issue_0 = [CovidcastRow(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i, is_latest_issue=False) for i in range(num_rows)]
-        issue_1 = [CovidcastRow(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1, is_latest_issue=False) for i in range(num_rows)]
-        last_issue = [CovidcastRow(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2, is_latest_issue=True) for i in range(num_rows)]
+        issue_0 = [self._make_placeholder_row(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i)[0] for i in range(num_rows)]
+        issue_1 = [self._make_placeholder_row(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1)[0] for i in range(num_rows)]
+        last_issue = [self._make_placeholder_row(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2)[0] for i in range(num_rows)]  # <-- the latest issues
         self._insert_rows([*issue_0, *issue_1, *last_issue])
         first = issue_0[0]
 
-        out = self._fetch("/backfill", signal=first.signal_pair, geo=first.geo_pair, time="day:20200401-20201212", anchor_lag=3)
+        out = self._fetch("/backfill", signal=first.signal_pair(), geo=first.geo_pair(), time="day:20200401-20201212", anchor_lag=3)
         self.assertEqual(out["result"], 1)
         df = pd.DataFrame(out["epidata"])
         self.assertEqual(len(df), 3 * num_rows)  # num issues
@@ -336,10 +227,10 @@
         self.assertEqual(df_t0["sample_size_completeness"].tolist(), [1 / 3, 2 / 3, 3 / 3])  # total 2, given 0,1,2
 
     def test_meta(self):
-        """Request a signal the /meta endpoint."""
+        """Request a signal from the /meta endpoint."""
 
         num_rows = 10
-        rows = [CovidcastRow(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)]
+        rows = [self._make_placeholder_row(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli")[0] for i in range(num_rows)]
         self._insert_rows(rows)
         first = rows[0]
         last = rows[-1]
@@ -376,26 +267,26 @@
         self.assertEqual(len(out), 0)
 
     def test_coverage(self):
-        """Request a signal the /coverage endpoint."""
+        """Request a signal from the /coverage endpoint."""
 
         num_geos_per_date = [10, 20, 30, 40, 44]
         dates = [20200401 + i for i in range(len(num_geos_per_date))]
-        rows = [CovidcastRow(time_value=dates[i], value=i, geo_value=str(geo_value)) for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)]
+        rows = [self._make_placeholder_row(time_value=dates[i], value=i, geo_value=str(geo_value))[0] for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)]
         self._insert_rows(rows)
         first = rows[0]
 
         with self.subTest("default"):
-            out = self._fetch("/coverage", signal=first.signal_pair, latest=dates[-1], format="json")
+            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, latest=dates[-1], format="json")
             self.assertEqual(len(out), len(num_geos_per_date))
             self.assertEqual([o["time_value"] for o in out], dates)
             self.assertEqual([o["count"] for o in out], num_geos_per_date)
 
         with self.subTest("specify window"):
-            out = self._fetch("/coverage", signal=first.signal_pair, window=f"{dates[0]}-{dates[1]}", format="json")
+            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json")
             self.assertEqual(len(out), 2)
             self.assertEqual([o["time_value"] for o in out], dates[:2])
             self.assertEqual([o["count"] for o in out], num_geos_per_date[:2])
 
         with self.subTest("invalid geo_type"):
-            out = self._fetch("/coverage", signal=first.signal_pair, geo_type="state", format="json")
+            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type="doesnt_exist", format="json")
             self.assertEqual(len(out), 0)
 
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index 60a8268e5..8d6fc06db 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -61,6 +61,20 @@ def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, v
     self.lag = lag
     self.is_wip = is_wip
 
+  def signal_pair(self):
+    return f"{self.source}:{self.signal}"
+
+  def geo_pair(self):
+    return f"{self.geo_type}:{self.geo_value}"
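+  # e.g. signal_pair() -> "src:sig" and geo_pair() -> "county:01234" for typical
+  # test rows; unlike the properties formerly defined on the test-only
+  # CovidcastRow dataclass, these are plain methods, hence the `()` now used at
+  # call sites in the tests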
+
+
+# constants for the codes used in the `process_status` column of `signal_load`
+class _PROCESS_STATUS(object):
+  INSERTING = 'i'
+  LOADED = 'l'
+  BATCHING = 'b'
+PROCESS_STATUS = _PROCESS_STATUS()
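+# (a single shared instance, so callers can write e.g. PROCESS_STATUS.LOADED
+# rather than passing bare status-code strings around)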
+
 
 class Database:
   """A collection of covidcast database operations."""
diff --git a/src/acquisition/covidcast/test_utils.py b/src/acquisition/covidcast/test_utils.py
new file mode 100644
index 000000000..7df92e91c
--- /dev/null
+++ b/src/acquisition/covidcast/test_utils.py
@@ -0,0 +1,86 @@
+import unittest
+
+from delphi_utils import Nans
+from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+import delphi.operations.secrets as secrets
+
+# all the Nans we use here are just one value, so this is a shortcut to it:
+nmv = Nans.NOT_MISSING.value
+
+class CovidcastBase(unittest.TestCase):
+  def setUp(self):
+    # use the local test instance of the database
+    secrets.db.host = 'delphi_database_epidata'
+    secrets.db.epi = ('user', 'pass')
+
+    self._db = Database()
+    self._db.connect()
+
+    # empty all of the data tables
+    for table in "covidcast".split():
+      self._db._cursor.execute(f"TRUNCATE TABLE {table};")
+    self.localSetUp()
+    self._db._connection.commit()
+
+  def localSetUp(self):
+    pass
+
+  def tearDown(self):
+    # close and destroy connection to the database
+    self._db.disconnect(False)
+    del self._db
+
+  DEFAULT_TIME_VALUE = 2000_01_01
+  DEFAULT_ISSUE = 2000_01_01
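+  # (the underscores above are Python digit separators: 2000_01_01 == 20000101,
+  # i.e. January 1, 2000, in the YYYYMMDD form used throughout the API)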
+  def _make_placeholder_row(self, **kwargs):
+    settings = {
+      'source': 'src',
+      'signal': 'sig',
+      'geo_type': 'state',
+      'geo_value': 'pa',
+      'time_type': 'day',
+      'time_value': self.DEFAULT_TIME_VALUE,
+      'value': 0.0,
+      'stderr': 1.0,
+      'sample_size': 2.0,
+      'missing_value': nmv,
+      'missing_stderr': nmv,
+      'missing_sample_size': nmv,
+      'issue': self.DEFAULT_ISSUE,
+      'lag': 0,
+      'is_wip': False  # TODO: Remove in v4
+    }
+    settings.update(kwargs)
+    return (CovidcastRow(**settings), settings)
+
+  def _insert_rows(self, rows):
+    # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
+    n = self._db.insert_or_update_bulk(rows)
+    print(f"{n} rows added to load table. dispatching to v4 schema")
+    # TODO: Add in v4
+    # self._db.run_dbjobs()
+    self._db._connection.commit()
+    # NOTE: this commit isn't strictly needed here, but it would be when using external access (like through the client lib)
+
+  def params_from_row(self, row, **kwargs):
+    ret = {
+      'data_source': row.source,
+      'signals': row.signal,
+      'time_type': row.time_type,
+      'geo_type': row.geo_type,
+      'time_values': row.time_value,
+      'geo_value': row.geo_value,
+    }
+    ret.update(kwargs)
+    return ret
+
+  # TODO: remove is_wip in v4
+  DEFAULT_MINUS = ['time_type', 'geo_type', 'source', 'is_wip']
+  def expected_from_row(self, row, minus=DEFAULT_MINUS):
+    expected = dict(vars(row))
+    # remove columns commonly excluded from output
+    # nb: may need to add source or *_type back in for multiplexed queries
+    for key in ['id', 'direction_updated_timestamp'] + minus:
+      del expected[key]
+    return expected
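+
+# A minimal usage sketch (for illustration; `MyTests` and its test method are
+# hypothetical, but the CovidcastBase helpers are the ones defined above):
+#
+#   class MyTests(CovidcastBase):
+#     def test_roundtrip(self):
+#       row, _ = self._make_placeholder_row(value=3.0)
+#       self._insert_rows([row])
+#       params = self.params_from_row(row)      # kwargs for an API query
+#       expected = self.expected_from_row(row)  # dict to compare against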