diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e082443a5..9ed1cfde9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -62,7 +62,7 @@ jobs: - name: Start services run: | docker network create --driver bridge delphi-net - docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata delphi_database_epidata + docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata docker run --rm -d -p 10080:80 --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --network delphi-net --name delphi_web_epidata delphi_web_epidata docker ps diff --git a/dev/docker/database/epidata/Dockerfile b/dev/docker/database/epidata/Dockerfile index dca3d3730..bd0ac37b5 100644 --- a/dev/docker/database/epidata/Dockerfile +++ b/dev/docker/database/epidata/Dockerfile @@ -1,5 +1,14 @@ -# start with the `delphi_database` image -FROM delphi_database +# start with a standard percona mysql image +FROM percona:ps-8 + +# the percona image runs as the mysql user, but we need root for additional setup +USER root + +# use delphi's timezone +RUN ln -s -f /usr/share/zoneinfo/America/New_York /etc/localtime + +# specify a development-only password for the database user "root" +ENV MYSQL_ROOT_PASSWORD pass # create the `epidata` database ENV MYSQL_DATABASE epidata @@ -14,5 +23,11 @@ COPY repos/delphi/delphi-epidata/dev/docker/database/epidata/_init.sql /docker-e # provide DDL which will create empty tables at container startup COPY repos/delphi/delphi-epidata/src/ddl/*.sql /docker-entrypoint-initdb.d/ +# provide additional configuration needed for percona +COPY repos/delphi/delphi-epidata/dev/docker/database/mysql.d/*.cnf /etc/my.cnf.d/ + # grant access to SQL scripts RUN chmod o+r /docker-entrypoint-initdb.d/*.sql + +# restore mysql user for percona +USER mysql diff --git a/dev/docker/database/mysql.d/my.cnf b/dev/docker/database/mysql.d/my.cnf new file mode 100644 index 000000000..0c952a7a7 --- /dev/null +++ b/dev/docker/database/mysql.d/my.cnf @@ -0,0 +1,2 @@ +[mysqld] +default_authentication_plugin=mysql_native_password \ No newline at end of file diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index c18363c03..d9be18645 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -71,10 +71,15 @@ def test_caching(self): # insert dummy data self.cur.execute(f''' - INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); + INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) + VALUES + (42, 'src', 'sig'); ''') self.cur.execute(f''' - INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa'); + INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) + VALUES + (96, 'state', 'pa'), + (97, 'state', 'wa'); ''') self.cur.execute(f''' INSERT INTO diff --git a/integrations/acquisition/covidcast/test_db.py b/integrations/acquisition/covidcast/test_db.py index c064e3867..3371a3073 100644 --- a/integrations/acquisition/covidcast/test_db.py +++ b/integrations/acquisition/covidcast/test_db.py @@ -2,37 +2,13 @@ from delphi_utils import Nans from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow +from 
delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase import delphi.operations.secrets as secrets # all the Nans we use here are just one value, so this is a shortcut to it: nmv = Nans.NOT_MISSING.value -class TestTest(unittest.TestCase): - - def setUp(self): - # use the local test instance of the database - secrets.db.host = 'delphi_database_epidata' - secrets.db.epi = ('user', 'pass') - - self._db = Database() - self._db.connect() - - # empty all of the data tables - for table in "signal_load signal_latest signal_history geo_dim signal_dim".split(): - self._db._cursor.execute(f"TRUNCATE TABLE {table}") - - def tearDown(self): - # close and destroy conenction to the database - self._db.disconnect(False) - del self._db - - def _make_dummy_row(self): - return CovidcastRow('src', 'sig', 'day', 'state', 2022_02_22, 'pa', 2, 22, 222, nmv,nmv,nmv, 2022_02_22, 0) - # cols: ^ timeval v se ssz ^issue ^lag - - def _insert_rows(self, rows): - self._db.insert_or_update_bulk(rows) - ###db._connection.commit() # NOTE: this isnt needed here, but would be if using external access (like through client lib) +class TestTest(CovidcastBase): def _find_matches_for_row(self, row): # finds (if existing) row from both history and latest views that matches long-key of provided CovidcastRow @@ -63,7 +39,7 @@ def test_id_sync(self): latest_view = 'signal_latest_v' # add a data point - base_row = self._make_dummy_row() + base_row, _ = self._make_placeholder_row() self._insert_rows([base_row]) # ensure the primary keys match in the latest and history tables matches = self._find_matches_for_row(base_row) @@ -73,7 +49,7 @@ def test_id_sync(self): old_pk_id = matches[latest_view][pk_column] # add a reissue for said data point - next_row = self._make_dummy_row() + next_row, _ = self._make_placeholder_row() next_row.issue += 1 self._insert_rows([next_row]) # ensure the new keys also match diff --git a/integrations/acquisition/covidcast/test_delete_batch.py b/integrations/acquisition/covidcast/test_delete_batch.py index 41921a3d3..f59ec7b04 100644 --- a/integrations/acquisition/covidcast/test_delete_batch.py +++ b/integrations/acquisition/covidcast/test_delete_batch.py @@ -95,9 +95,9 @@ def _test_delete_batch(self, cc_deletions): ), # verify latest issue was corrected Example( - f'select geo_value, issue from {self._db.latest_view} where time_value=0', - [('d_nonlatest', 2), - ('d_latest', 2)] + f'select geo_value, issue from {self._db.latest_view} where time_value=0 order by geo_value', + [('d_latest', 2), + ('d_nonlatest', 2)] ) ] diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 5f012f6c9..625d2859d 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -16,6 +16,7 @@ from delphi.epidata.client.delphi_epidata import Epidata from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase import delphi.operations.secrets as secrets # py3tester coverage target @@ -32,36 +33,14 @@ def wrapper(*args): # all the Nans we use here are just one value, so this is a shortcut to it: nmv = Nans.NOT_MISSING.value -class DelphiEpidataPythonClientTests(unittest.TestCase): +class DelphiEpidataPythonClientTests(CovidcastBase): """Tests the Python client.""" - def setUp(self): + def localSetUp(self): """Perform 
per-test setup.""" - # connect to the `epidata` database and clear relevant tables - cnx = mysql.connector.connect( - user='user', - password='pass', - host='delphi_database_epidata', - database='covid') - cur = cnx.cursor() - - # clear all tables - cur.execute("truncate table signal_load") - cur.execute("truncate table signal_history") - cur.execute("truncate table signal_latest") - cur.execute("truncate table geo_dim") - cur.execute("truncate table signal_dim") # reset the `covidcast_meta_cache` table (it should always have one row) - cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - - cnx.commit() - cur.close() - - # make connection and cursor available to the Database object - self._db = Database() - self._db._connection = cnx - self._db._cursor = cnx.cursor() + self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') # use the local instance of the Epidata API Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' @@ -70,226 +49,129 @@ def setUp(self): secrets.db.host = 'delphi_database_epidata' secrets.db.epi = ('user', 'pass') - def tearDown(self): - """Perform per-test teardown.""" - self._db._cursor.close() - self._db._connection.close() - def test_covidcast(self): """Test that the covidcast endpoint returns expected data.""" - # insert dummy data + # insert placeholder data: three issues of one signal, one issue of another rows = [ - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '01234', - 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig2', 'day', 'county', 20200414, '01234', - 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '01234', - 5.5, 1.2, 10.5, nmv, nmv, nmv, 20200415, 1), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '01234', - 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200416, 2), + self._make_placeholder_row(issue=self.DEFAULT_ISSUE + i, value=i, lag=i)[0] + for i in range(3) ] - self._db.insert_or_update_bulk(rows) - self._db._connection.commit() + row_latest_issue = rows[-1] + rows.append( + self._make_placeholder_row(signal="sig2")[0] + ) + self._insert_rows(rows) with self.subTest(name='request two signals'): # fetch data response = Epidata.covidcast( - 'src', ['sig', 'sig2'], 'day', 'county', 20200414, '01234') + **self.params_from_row(rows[0], signals=[rows[0].signal, rows[-1].signal]) + ) + + expected = [ + self.expected_from_row(row_latest_issue), + self.expected_from_row(rows[-1]) + ] # check result self.assertEqual(response, { 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 6.5, - 'stderr': 2.2, - 'sample_size': 11.5, - 'direction': None, - 'issue': 20200416, - 'lag': 2, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 1.5, - 'stderr': 2.5, - 'sample_size': 3.5, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig2', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': expected, 'message': 'success', }) with self.subTest(name='request two signals with tree format'): # fetch data response = Epidata.covidcast( - 'src', ['sig', 'sig2'], 'day', 'county', 20200414, '01234', - format='tree') + **self.params_from_row(rows[0], signals=[rows[0].signal, rows[-1].signal], format='tree') + ) + + expected = [{ + rows[0].signal: [ + 
self.expected_from_row(row_latest_issue, self.DEFAULT_MINUS + ['signal']), + ], + rows[-1].signal: [ + self.expected_from_row(rows[-1], self.DEFAULT_MINUS + ['signal']), + ], + }] # check result self.assertEqual(response, { 'result': 1, - 'epidata': [{ - 'sig': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 6.5, - 'stderr': 2.2, - 'sample_size': 11.5, - 'direction': None, - 'issue': 20200416, - 'lag': 2, - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], - 'sig2': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 1.5, - 'stderr': 2.5, - 'sample_size': 3.5, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], - }], + 'epidata': expected, 'message': 'success', }) with self.subTest(name='request most recent'): # fetch data, without specifying issue or lag response_1 = Epidata.covidcast( - 'src', 'sig', 'day', 'county', 20200414, '01234') + **self.params_from_row(rows[0]) + ) + + expected = self.expected_from_row(row_latest_issue) # check result self.assertEqual(response_1, { 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 6.5, - 'stderr': 2.2, - 'sample_size': 11.5, - 'direction': None, - 'issue': 20200416, - 'lag': 2, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': [expected], 'message': 'success', }) with self.subTest(name='request as-of a date'): # fetch data, specifying as_of response_1a = Epidata.covidcast( - 'src', 'sig', 'day', 'county', 20200414, '01234', - as_of=20200415) + **self.params_from_row(rows[0], as_of=rows[1].issue) + ) + + expected = self.expected_from_row(rows[1]) # check result + self.maxDiff=None self.assertEqual(response_1a, { 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 5.5, - 'stderr': 1.2, - 'sample_size': 10.5, - 'direction': None, - 'issue': 20200415, - 'lag': 1, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': [expected], 'message': 'success', }) with self.subTest(name='request a range of issues'): # fetch data, specifying issue range, not lag response_2 = Epidata.covidcast( - 'src', 'sig', 'day', 'county', 20200414, '01234', - issues=Epidata.range(20200414, 20200415)) + **self.params_from_row(rows[0], issues=Epidata.range(rows[0].issue, rows[1].issue)) + ) + + expected = [ + self.expected_from_row(rows[0]), + self.expected_from_row(rows[1]) + ] # check result self.assertDictEqual(response_2, { 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 1.5, - 'stderr': 2.5, - 'sample_size': 3.5, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 5.5, - 'stderr': 1.2, - 'sample_size': 10.5, - 'direction': None, - 'issue': 20200415, - 'lag': 1, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': expected, 'message': 'success', }) with self.subTest(name='request at a given lag'): # fetch data, specifying lag, not 
issue range response_3 = Epidata.covidcast( - 'src', 'sig', 'day', 'county', 20200414, '01234', - lag=2) + **self.params_from_row(rows[0], lag=2) + ) + + expected = self.expected_from_row(row_latest_issue) # check result self.assertDictEqual(response_3, { 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 6.5, - 'stderr': 2.2, - 'sample_size': 11.5, - 'direction': None, - 'issue': 20200416, - 'lag': 2, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': [expected], 'message': 'success', }) with self.subTest(name='long request'): # fetch data, without specifying issue or lag # TODO should also trigger a post but doesn't due to the 414 issue response_1 = Epidata.covidcast( - 'src', 'sig'*1000, 'day', 'county', 20200414, '01234') + **self.params_from_row(rows[0], signals='sig'*1000) + ) # check result self.assertEqual(response_1, {'message': 'no results', 'result': -2}) @@ -338,70 +220,25 @@ def test_retry_request(self, get): def test_geo_value(self): """test different variants of geo types: single, *, multi.""" - # insert dummy data + # insert placeholder data: three counties, three MSAs + N = 3 rows = [ - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', - 10, 11, 12, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '22222', - 20, 21, 22, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '33333', - 30, 31, 32, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'msa', 20200414, '11111', - 40, 41, 42, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'msa', 20200414, '22222', - 50, 51, 52, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'msa', 20200414, '33333', - 60, 61, 62, nmv, nmv, nmv, 20200414, 0), + self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0] + for i in range(N) + ] + [ + self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0] + for i in range(N) ] - self._db.insert_or_update_bulk(rows) - self._db._connection.commit() + self._insert_rows(rows) - def fetch(geo_value): - # make the request - response = Epidata.covidcast( - 'src', 'sig', 'day', 'county', 20200414, geo_value) - return response - - counties = [{ - 'time_value': 20200414, - 'geo_value': '11111', - 'value': 10, - 'stderr': 11, - 'sample_size': 12, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '22222', - 'value': 20, - 'stderr': 21, - 'sample_size': 22, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '33333', - 'value': 30, - 'stderr': 31, - 'sample_size': 32, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }] + counties = [ + self.expected_from_row(rows[i]) for i in range(N) + ] + + def fetch(geo): + return Epidata.covidcast( + **self.params_from_row(rows[0], geo_value=geo) + ) # test fetch all r = fetch('*') @@ -410,22 +247,22 @@ def fetch(geo_value): # test fetch a specific region r = 
fetch('11111') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0]]) + self.assertEqual(r['epidata'], [counties[1]]) # test fetch a specific yet not existing region r = fetch('55555') self.assertEqual(r['message'], 'no results') # test fetch a multiple regions r = fetch(['11111', '22222']) self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0], counties[1]]) + self.assertEqual(r['epidata'], [counties[1], counties[2]]) # test fetch a multiple regions in another variant - r = fetch(['11111', '33333']) + r = fetch(['00000', '22222']) self.assertEqual(r['message'], 'success') self.assertEqual(r['epidata'], [counties[0], counties[2]]) # test fetch a multiple regions but one is not existing r = fetch(['11111', '55555']) self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0]]) + self.assertEqual(r['epidata'], [counties[1]]) # test fetch a multiple regions but specify no region r = fetch([]) self.assertEqual(r['message'], 'no results') @@ -433,17 +270,15 @@ def fetch(geo_value): def test_covidcast_meta(self): """Test that the covidcast_meta endpoint returns expected data.""" - # insert dummy data + # insert placeholder data: three dates, three issues. values are: + # 1st issue: 0 10 20 + # 2nd issue: 1 11 21 + # 3rd issue: 2 12 22 rows = [ - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '01234', - 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '01234', - 6.0, 2.2, 11.5, nmv, nmv, nmv, 20200416, 2), - CovidcastRow('src', 'sig', 'day', 'county', 20200415, '01234', - 7.0, 2.0, 12.5, nmv, nmv, nmv, 20200416, 1), + self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE + t, issue=self.DEFAULT_ISSUE + i, value=t*10 + i)[0] + for i in range(3) for t in range(3) ] - self._db.insert_or_update_bulk(rows) - self._db._connection.commit() + self._insert_rows(rows) # cache it update_covidcast_meta_cache(args=None) @@ -459,73 +294,55 @@ def test_covidcast_meta(self): # remove "last updated" time so our comparison below works: del response['epidata'][0]['last_update'] + expected = dict( + data_source=rows[0].source, + signal=rows[0].signal, + time_type=rows[0].time_type, + geo_type=rows[0].geo_type, + min_time=self.DEFAULT_TIME_VALUE, + max_time=self.DEFAULT_TIME_VALUE + 2, + num_locations=1, + min_value=2., + mean_value=12., + max_value=22., + stdev_value=8.1649658, # population stdev, not sample, which is 10. 
+ max_issue=self.DEFAULT_ISSUE + 2, + min_lag=0, + max_lag=0, # we didn't set lag when inputting data + ) # check result + self.maxDiff=None self.assertEqual(response, { 'result': 1, - 'epidata': [{ - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'min_time': 20200414, - 'max_time': 20200415, - 'num_locations': 1, - 'min_value': 6.0, - 'max_value': 7.0, - 'mean_value': 6.5, - 'stdev_value': 0.5, - 'max_issue': 20200416, - 'min_lag': 1, - 'max_lag': 2, - }], + 'epidata': [expected], 'message': 'success', }) def test_async_epidata(self): - # insert dummy data + # insert placeholder data: three counties, three MSAs + N = 3 rows = [ - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', - 10, 11, 12, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '22222', - 20, 21, 22, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '33333', - 30, 31, 32, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'msa', 20200414, '11111', - 40, 41, 42, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'msa', 20200414, '22222', - 50, 51, 52, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'msa', 20200414, '33333', - 60, 61, 62, nmv, nmv, nmv, 20200414, 0) + self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0] + for i in range(N) + ] + [ + self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0] + for i in range(N) ] - self._db.insert_or_update_bulk(rows) - self._db._connection.commit() + self._insert_rows(rows) test_output = Epidata.async_epidata([ - { - 'source': 'covidcast', - 'data_source': 'src', - 'signals': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'geo_value': '11111', - 'time_values': '20200414' - }, - { - 'source': 'covidcast', - 'data_source': 'src', - 'signals': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'geo_value': '00000', - 'time_values': '20200414' - } + self.params_from_row(rows[0], source='covidcast'), + self.params_from_row(rows[1], source='covidcast') ]*12, batch_size=10) responses = [i[0] for i in test_output] # check response is same as standard covidcast call, using 24 calls to test batch sizing - self.assertEqual(responses, - [Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '11111'), - Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '00000')]*12 - ) + self.assertEqual( + responses, + [ + Epidata.covidcast(**self.params_from_row(rows[0])), + Epidata.covidcast(**self.params_from_row(rows[1])), + ]*12 + ) @fake_epidata_endpoint def test_async_epidata_fail(self): diff --git a/integrations/server/test_covidcast.py b/integrations/server/test_covidcast.py index 048816612..2d80c12de 100644 --- a/integrations/server/test_covidcast.py +++ b/integrations/server/test_covidcast.py @@ -1,6 +1,7 @@ """Integration tests for the `covidcast` endpoint.""" # standard library +import json import unittest # third party @@ -9,137 +10,70 @@ # first party from delphi_utils import Nans +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase # use the local instance of the Epidata API BASE_URL = 'http://delphi_web_epidata/epidata/api.php' -class CovidcastTests(unittest.TestCase): + +class CovidcastTests(CovidcastBase): """Tests the `covidcast` endpoint.""" - def setUp(self): + def localSetUp(self): """Perform per-test setup.""" + self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - # connect to the 
`epidata` database and clear the `covidcast` table - cnx = mysql.connector.connect( - user='user', - password='pass', - host='delphi_database_epidata', - database='covid') - cur = cnx.cursor() - - # clear all tables - cur.execute("truncate table signal_load") - cur.execute("truncate table signal_history") - cur.execute("truncate table signal_latest") - cur.execute("truncate table geo_dim") - cur.execute("truncate table signal_dim") - # reset the `covidcast_meta_cache` table (it should always have one row) - cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - - cnx.commit() - cur.close() - - # make connection and cursor available to test cases - self.cnx = cnx - self.cur = cnx.cursor() - - def tearDown(self): - """Perform per-test teardown.""" - self.cur.close() - self.cnx.close() - - def _insert_dummy_data_set_one(self): - self.cur.execute(f''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); ''') - self.cur.execute(f''' INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'county', '01234'); ''') - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES (0, 42, 96, - 'day', 20200414, 123, 1.5, 2.5, 3.5, 20200414, 0, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}); - ''') - - def _insert_dummy_data_set_two(self): - self.cur.execute(f''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); ''') - self.cur.execute(f''' - INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES - (101, 'county', '11111'), - (202, 'county', '22222'), - (303, 'county', '33333'), - (1001, 'msa', '11111'), - (2002, 'msa', '22222'), - (3003, 'msa', '33333'); ''') - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES - (1, 42, 101, 'day', 20200414, 123, 10, 11, 12, 20200414, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (2, 42, 202, 'day', 20200414, 123, 20, 21, 22, 20200414, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (3, 42, 303, 'day', 20200414, 123, 30, 31, 32, 20200414, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (4, 42, 1001, 'day', 20200414, 123, 40, 41, 42, 20200414, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (5, 42, 2002, 'day', 20200414, 123, 50, 51, 52, 20200414, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (6, 42, 3003, 'day', 20200414, 123, 60, 61, 62, 20200414, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}); ''') - - def _insert_dummy_data_set_three(self): - self.cur.execute(f''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); ''') - self.cur.execute(f''' - INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES - (101, 'county', '11111'), - (202, 'county', '22222'), - (303, 'county', '33333'), - (444, 'county', '01234'); ''') - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, 
`missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES - (1, 42, 444, 'day', 20200411, 123, 10, 11, 12, 20200413, 2, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (2, 42, 444, 'day', 20200412, 123, 20, 21, 22, 20200413, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (3, 42, 444, 'day', 20200413, 123, 30, 31, 32, 20200413, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (4, 42, 101, 'day', 20200411, 123, 40, 41, 42, 20200413, 2, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (5, 42, 202, 'day', 20200412, 123, 50, 51, 52, 20200413, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (6, 42, 303, 'day', 20200413, 123, 60, 61, 62, 20200413, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}); ''') + def request_based_on_row(self, row, extract_response=lambda x: x.json(), **kwargs): + params = self.params_from_row(row, endpoint='covidcast', **kwargs) + response = requests.get(BASE_URL, params=params) + response.raise_for_status() + response = extract_response(response) + + expected = self.expected_from_row(row) + + return response, expected + + def _insert_placeholder_set_one(self): + row, settings = self._make_placeholder_row() + self._insert_rows([row]) + return row + + def _insert_placeholder_set_two(self): + rows = [ + self._make_placeholder_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0] + for i in [1, 2, 3] + ] + [ + # geo value intended to overlap with counties above + self._make_placeholder_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.)[0] + for i in [4, 5, 6] + ] + self._insert_rows(rows) + return rows + + def _insert_placeholder_set_three(self): + rows = [ + self._make_placeholder_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i)[0] + for i in [1, 2, 3] + ] + [ + # time value intended to overlap with 11111 above, with disjoint geo values + self._make_placeholder_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i)[0] + for i in [4, 5, 6] + ] + self._insert_rows(rows) + return rows def test_round_trip(self): """Make a simple round-trip with some sample data.""" - # insert dummy data - self._insert_dummy_data_set_one() - self.cnx.commit() + # insert placeholder data + row = self._insert_placeholder_set_one() # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - }) - response.raise_for_status() - response = response.json() - - # assert that the right data came back + response, expected = self.request_based_on_row(row) self.assertEqual(response, { 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 1.5, - 'stderr': 2.5, - 'sample_size': 3.5, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': [expected], 'message': 'success', }) @@ -148,7 +82,7 @@ def test_round_trip(self): # def test_uri_too_long(self): # """Test that a long request yields a 414 with GET but works with POST.""" - # # insert dummy data + # # insert placeholder data # self.cur.execute(f''' # 
INSERT INTO # `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`, @@ -191,27 +125,33 @@ def test_round_trip(self): def test_csv_format(self): """Test generate csv data.""" - # insert dummy data - self._insert_dummy_data_set_one() - self.cnx.commit() + # insert placeholder data + row = self._insert_placeholder_set_one() # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - 'format': 'csv' - }) - response.raise_for_status() - response = response.text + # NB 'format' is a Python reserved word + response, _ = self.request_based_on_row( + row, + extract_response=lambda resp: resp.text, + **{'format':'csv'} + ) expected_response = ( "geo_value,signal,time_value,direction,issue,lag,missing_value," + "missing_stderr,missing_sample_size,value,stderr,sample_size\n" + - "01234,sig,20200414,,20200414,0,0,0,0,1.5,2.5,3.5\n" + ",".join("" if x is None else str(x) for x in [ + row.geo_value, + row.signal, + row.time_value, + row.direction, + row.issue, + row.lag, + row.missing_value, + row.missing_stderr, + row.missing_sample_size, + row.value, + row.stderr, + row.sample_size + ]) + "\n" ) # assert that the right data came back @@ -220,623 +160,220 @@ def test_csv_format(self): def test_raw_json_format(self): """Test generate raw json data.""" - # insert dummy data - self._insert_dummy_data_set_one() - self.cnx.commit() + # insert placeholder data + row = self._insert_placeholder_set_one() # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - 'format': 'json' - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(row, **{'format':'json'}) # assert that the right data came back - self.assertEqual(response, [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 1.5, - 'stderr': 2.5, - 'sample_size': 3.5, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }]) + self.assertEqual(response, [expected]) def test_fields(self): - """Test to limit fields field""" - - # insert dummy data - self._insert_dummy_data_set_one() - self.cnx.commit() + """Test fields parameter""" - # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - }) - response.raise_for_status() - response = response.json() + # insert placeholder data + row = self._insert_placeholder_set_one() - # assert that the right data came back - self.assertEqual(response, { + # limit fields + response, expected = self.request_based_on_row(row, fields='time_value,geo_value') + expected_all = { 'result': 1, 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 1.5, - 'stderr': 2.5, - 'sample_size': 3.5, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING + k: expected[k] for k in ['time_value', 'geo_value'] }], 'message': 'success', - }) - - # limit fields - 
response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - 'fields': 'time_value,geo_value' - }) - response.raise_for_status() - response = response.json() + } # assert that the right data came back - self.assertEqual(response, { - 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234' - }], - 'message': 'success', - }) + self.assertEqual(response, expected_all) - # limit invalid values - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - 'fields': 'time_value,geo_value,dummy' - }) - response.raise_for_status() - response = response.json() + # limit using invalid fields + response, _ = self.request_based_on_row(row, fields='time_value,geo_value,doesnt_exist') - # assert that the right data came back - self.assertEqual(response, { - 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234' - }], - 'message': 'success', - }) + # assert that the right data came back (only valid fields) + self.assertEqual(response, expected_all) - # limit exclude fields - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - 'fields': ( + # limit exclude fields: exclude all except time_value and geo_value + + response, _ = self.request_based_on_row(row, fields=( '-value,-stderr,-sample_size,-direction,-issue,-lag,-signal,' + '-missing_value,-missing_stderr,-missing_sample_size' - ) - }) - response.raise_for_status() - response = response.json() + )) # assert that the right data came back - self.assertEqual(response, { - 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234' - }], - 'message': 'success', - }) + self.assertEqual(response, expected_all) def test_location_wildcard(self): """Select all locations with a wildcard query.""" - # insert dummy data - self._insert_dummy_data_set_two() - self.cnx.commit() + # insert placeholder data + rows = self._insert_placeholder_set_two() + expected_counties = [ + self.expected_from_row(r) for r in rows[:3] + ] # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + response, _ = self.request_based_on_row(rows[0], geo_value="*") + self.maxDiff = None # assert that the right data came back self.assertEqual(response, { 'result': 1, - 'epidata': [ - { - 'time_value': 20200414, - 'geo_value': '11111', - 'value': 10, - 'stderr': 11, - 'sample_size': 12, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '22222', - 'value': 20, - 'stderr': 21, - 'sample_size': 22, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '33333', - 'value': 30, - 
'stderr': 31, - 'sample_size': 32, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, - ], + 'epidata': expected_counties, 'message': 'success', }) def test_geo_value(self): """test different variants of geo types: single, *, multi.""" - # insert dummy data - self._insert_dummy_data_set_two() - self.cnx.commit() + # insert placeholder data + rows = self._insert_placeholder_set_two() + expected_counties = [ + self.expected_from_row(r) for r in rows[:3] + ] def fetch(geo_value): # make the request - params = { - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - } - if isinstance(geo_value, list): - params['geo_values'] = ','.join(geo_value) - else: - params['geo_value'] = geo_value - response = requests.get(BASE_URL, params=params) - response.raise_for_status() - response = response.json() + response, _ = self.request_based_on_row(rows[0], geo_value=geo_value) return response - counties = [{ - 'time_value': 20200414, - 'geo_value': '11111', - 'value': 10, - 'stderr': 11, - 'sample_size': 12, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '22222', - 'value': 20, - 'stderr': 21, - 'sample_size': 22, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200414, - 'geo_value': '33333', - 'value': 30, - 'stderr': 31, - 'sample_size': 32, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }] - - # test fetch all - r = fetch('*') - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], counties) # test fetch a specific region r = fetch('11111') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0]]) + self.assertEqual(r['epidata'], [expected_counties[0]]) # test fetch a specific yet not existing region r = fetch('55555') self.assertEqual(r['message'], 'no results') - # test fetch a multiple regions - r = fetch(['11111', '22222']) + # test fetch multiple regions + r = fetch('11111,22222') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0], counties[1]]) - # test fetch a multiple regions in another variant - r = fetch(['11111', '33333']) + self.assertEqual(r['epidata'], [expected_counties[0], expected_counties[1]]) + # test fetch multiple noncontiguous regions + r = fetch('11111,33333') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0], counties[2]]) - # test fetch a multiple regions but one is not existing - r = fetch(['11111', '55555']) + self.assertEqual(r['epidata'], [expected_counties[0], expected_counties[2]]) + # test fetch multiple regions but one is not existing + r = fetch('11111,55555') self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], [counties[0]]) - # test fetch a multiple regions but specify no region - r = fetch([]) + self.assertEqual(r['epidata'], [expected_counties[0]]) + # test fetch empty region + r = fetch('') 
self.assertEqual(r['message'], 'no results') - def test_location_timeline(self): """Select a timeline for a particular location.""" - # insert dummy data - self._insert_dummy_data_set_three() - self.cnx.commit() + # insert placeholder data + rows = self._insert_placeholder_set_three() + expected_timeseries = [ + self.expected_from_row(r) for r in rows[:3] + ] # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '20200411-20200413', - 'geo_value': '01234', - }) - response.raise_for_status() - response = response.json() + response, _ = self.request_based_on_row(rows[0], time_values='20000101-20000105') # assert that the right data came back self.assertEqual(response, { 'result': 1, - 'epidata': [ - { - 'time_value': 20200411, - 'geo_value': '01234', - 'value': 10, - 'stderr': 11, - 'sample_size': 12, - 'direction': None, - 'issue': 20200413, - 'lag': 2, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200412, - 'geo_value': '01234', - 'value': 20, - 'stderr': 21, - 'sample_size': 22, - 'direction': None, - 'issue': 20200413, - 'lag': 1, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, { - 'time_value': 20200413, - 'geo_value': '01234', - 'value': 30, - 'stderr': 31, - 'sample_size': 32, - 'direction': None, - 'issue': 20200413, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }, - ], + 'epidata': expected_timeseries, 'message': 'success', }) + @unittest.skip("v4 now uses ON DUPLICATE KEY UPDATE which prevents this key collision. 
Consider moving this test to a database integration test which runs SQL without the ON DUPLICATE KEY UPDATE clause to verify constraints are set correctly.") def test_unique_key_constraint(self): """Don't allow a row with a key collision to be inserted.""" - # insert dummy data - self._insert_dummy_data_set_one() - self.cnx.commit() + # insert placeholder data + row = self._insert_placeholder_set_one() - # fail to insert different dummy data under the same key + # fail to insert different placeholder data under the same key with self.assertRaises(mysql.connector.errors.IntegrityError): - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES (1, 42, 96, - 'day', 20200414, 123, 991.5, 992.5, 993.5, 20200414, 0, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}); - ''') - - # succeed to insert different dummy data under a different time_type - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES (2, 42, 96, - 'score', 20200414, 123, 991.5, 992.5, 993.5, 20200415, 1, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}); - ''') + self._insert_placeholder_set_one() + + # succeed to insert different placeholder data under a different time_type + self._insert_placeholder_set_one(time_type='week') def test_nullable_columns(self): """Missing values should be surfaced as null.""" - # insert dummy data - self.cur.execute(f''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); ''') - self.cur.execute(f''' INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'county', '01234'); ''') - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES (0, 42, 96, - 'day', 20200414, 123, 0.123, NULL, NULL, 20200414, 0, - {Nans.NOT_MISSING}, {Nans.OTHER}, {Nans.OTHER}); - ''') - self.cnx.commit() + row, _ = self._make_placeholder_row( + stderr=None, sample_size=None, + missing_stderr=Nans.OTHER.value, missing_sample_size=Nans.OTHER.value + ) + self._insert_rows([row]) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': 20200414, - 'geo_value': '01234', - }) - response.raise_for_status() - response = response.json() - expected_response = { - 'result': 1, - 'epidata': [{ - 'time_value': 20200414, - 'geo_value': '01234', - 'value': 0.123, - 'stderr': None, - 'sample_size': None, - 'direction': None, - 'issue': 20200414, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.OTHER, - 'missing_sample_size': Nans.OTHER - }], - 'message': 'success', - } + response, expected = self.request_based_on_row(row) + expected.update(stderr=None, sample_size=None) # assert that the right data came back - self.assertEqual(response, expected_response) + self.assertEqual(response, { + 'result': 1, + 'epidata': [expected], + 
'message': 'success', + }) def test_temporal_partitioning(self): """Request a signal that's available at multiple temporal resolutions.""" - # insert dummy data - self._insert_dummy_data_set_one() # this creates a record w/ temporal type of 'day' - self.cur.execute(f''' - INSERT INTO `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) - VALUES - (1, 42, 96, - 'hour', 2020041423, 123, 15, 25, 35, 20200414, 0, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (2, 42, 96, - 'week', 202016, 123, 115, 125, 135, 202016, 0, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (3, 42, 96, - 'month', 202004, 123, 215, 225, 235, 20200414, 0, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (4, 42, 96, - 'year', 2020, 123, 315, 325, 335, 20200414, 0, - {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}); - ''') - self.cnx.commit() + # insert placeholder data + rows = [ + self._make_placeholder_row(time_type=tt)[0] + for tt in "hour day week month year".split() + ] + self._insert_rows(rows) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'week', - 'geo_type': 'county', - 'time_values': '0-9999999999', - 'geo_value': '01234', - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(rows[1], time_values="0-99999999") # assert that the right data came back self.assertEqual(response, { 'result': 1, - 'epidata': [{ - 'time_value': 202016, - 'geo_value': '01234', - 'value': 115, - 'stderr': 125, - 'sample_size': 135, - 'direction': None, - 'issue': 202016, - 'lag': 0, - 'signal': 'sig', - 'missing_value': Nans.NOT_MISSING, - 'missing_stderr': Nans.NOT_MISSING, - 'missing_sample_size': Nans.NOT_MISSING - }], + 'epidata': [expected], 'message': 'success', }) def test_date_formats(self): """Request a signal using different time formats.""" - # insert dummy data - self._insert_dummy_data_set_three() - self.cnx.commit() + # insert placeholder data + rows = self._insert_placeholder_set_three() # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '20200411', - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(rows[0], time_values="20000102", geo_value="*") # assert that the right data came back self.assertEqual(len(response['epidata']), 2) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '2020-04-11', - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02", geo_value="*") # assert that the right data came back self.assertEqual(len(response['epidata']), 2) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '20200411,20200412', - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + 
response, expected = self.request_based_on_row(rows[0], time_values="20000102,20000103", geo_value="*") # assert that the right data came back self.assertEqual(len(response['epidata']), 4) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '2020-04-11,2020-04-12', - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02,2000-01-03", geo_value="*") # assert that the right data came back self.assertEqual(len(response['epidata']), 4) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '20200411-20200413', - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(rows[0], time_values="20000102-20000104", geo_value="*") # assert that the right data came back self.assertEqual(len(response['epidata']), 6) # make the request - response = requests.get(BASE_URL, params={ - 'endpoint': 'covidcast', - 'data_source': 'src', - 'signal': 'sig', - 'time_type': 'day', - 'geo_type': 'county', - 'time_values': '2020-04-11:2020-04-13', - 'geo_value': '*', - }) - response.raise_for_status() - response = response.json() + response, expected = self.request_based_on_row(rows[0], time_values="2000-01-02:2000-01-04", geo_value="*") # assert that the right data came back self.assertEqual(len(response['epidata']), 6) diff --git a/integrations/server/test_covidcast_endpoints.py b/integrations/server/test_covidcast_endpoints.py index b650a4ea3..54974a874 100644 --- a/integrations/server/test_covidcast_endpoints.py +++ b/integrations/server/test_covidcast_endpoints.py @@ -17,117 +17,20 @@ from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_cache from delphi.epidata.acquisition.covidcast.database import Database +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase # use the local instance of the Epidata API BASE_URL = "http://delphi_web_epidata/epidata/covidcast" -@dataclass -class CovidcastRow: - # TODO in https://github.com/cmu-delphi/delphi-epidata/issues/897 - # this heavily resembles delphi.epidata.acquisition.covidcast.database.CovidcastRow and should be refactored out - source: str = "src" - signal: str = "sig" - time_type: str = "day" - geo_type: str = "county" - time_value: int = 20200411 - geo_value: str = "01234" - value_updated_timestamp: int = 20200202 - value: float = 10.0 - stderr: float = 0 - sample_size: float = 10 - issue: int = 20200202 - lag: int = 0 - missing_value: int = Nans.NOT_MISSING.value - missing_stderr: int = Nans.NOT_MISSING.value - missing_sample_size: int = Nans.NOT_MISSING.value - - def __str__(self): - return f"""( - '{self.source}', - '{self.signal}', - '{self.time_type}', - '{self.geo_type}', - {self.time_value}, - '{self.geo_value}', - {self.value_updated_timestamp}, - {self.value}, - {self.stderr}, - {self.sample_size}, - {self.issue}, - {self.lag}, - {self.missing_value}, - {self.missing_stderr}, - {self.missing_sample_size} - )""" - - @staticmethod - def from_json(json: Dict[str, Any]) -> "CovidcastRow": - return CovidcastRow( - source=json["source"], - signal=json["signal"], - time_type=json["time_type"], - geo_type=json["geo_type"], - 
geo_value=json["geo_value"], - issue=json["issue"], - lag=json["lag"], - value=json["value"], - stderr=json["stderr"], - sample_size=json["sample_size"], - missing_value=json["missing_value"], - missing_stderr=json["missing_stderr"], - missing_sample_size=json["missing_sample_size"], - ) - - @property - def signal_pair(self): - return f"{self.source}:{self.signal}" - - @property - def geo_pair(self): - return f"{self.geo_type}:{self.geo_value}" +class CovidcastEndpointTests(CovidcastBase): - @property - def time_pair(self): - return f"{self.time_type}:{self.time_value}" - - -class CovidcastEndpointTests(unittest.TestCase): """Tests the `covidcast/*` endpoint.""" - def setUp(self): + def localSetUp(self): """Perform per-test setup.""" - - # connect to the database and clear the tables - cnx = mysql.connector.connect(user="user", password="pass", host="delphi_database_epidata", database="covid") - cur = cnx.cursor() - - # clear all tables - cur.execute("truncate table signal_load") - cur.execute("truncate table signal_history") - cur.execute("truncate table signal_latest") - cur.execute("truncate table geo_dim") - cur.execute("truncate table signal_dim") # reset the `covidcast_meta_cache` table (it should always have one row) - cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - - cnx.commit() - cur.close() - - # make connection and cursor available to the Database object - self._db = Database() - self._db._connection = cnx - self._db._cursor = cnx.cursor() - - def tearDown(self): - """Perform per-test teardown.""" - self._db._cursor.close() - self._db._connection.close() - - def _insert_rows(self, rows: Iterable[CovidcastRow]): - self._db.insert_or_update_bulk(rows) - self._db._connection.commit() - return rows + self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') def _fetch(self, endpoint="/", **params): # make the request @@ -139,9 +42,9 @@ def _fetch(self, endpoint="/", **params): return response.json() def test_basic(self): - """Request a signal the / endpoint.""" + """Request a signal from the / endpoint.""" - rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(10)] + rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(10)] first = rows[0] self._insert_rows(rows) @@ -150,20 +53,20 @@ def test_basic(self): self.assertEqual(out["result"], -1) with self.subTest("simple"): - out = self._fetch("/", signal=first.signal_pair, geo=first.geo_pair, time="day:*") + out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*") self.assertEqual(len(out["epidata"]), len(rows)) def test_trend(self): - """Request a signal the /trend endpoint.""" + """Request a signal from the /trend endpoint.""" num_rows = 30 - rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(num_rows)] + rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(num_rows)] first = rows[0] last = rows[-1] ref = rows[num_rows // 2] self._insert_rows(rows) - out = self._fetch("/trend", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20201212", basis=ref.time_value) + out = self._fetch("/trend", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20201212", basis=ref.time_value) self.assertEqual(out["result"], 1) self.assertEqual(len(out["epidata"]), 1) @@ -188,15 +91,15 @@ def test_trend(self): self.assertEqual(trend["max_trend"], "steady") def test_trendseries(self): - 
"""Request a signal the /trendseries endpoint.""" + """Request a signal from the /trendseries endpoint.""" num_rows = 3 - rows = [CovidcastRow(time_value=20200401 + i, value=num_rows - i) for i in range(num_rows)] + rows = [self._make_placeholder_row(time_value=20200401 + i, value=num_rows - i)[0] for i in range(num_rows)] first = rows[0] last = rows[-1] self._insert_rows(rows) - out = self._fetch("/trendseries", signal=first.signal_pair, geo=first.geo_pair, date=last.time_value, window="20200401-20200410", basis=1) + out = self._fetch("/trendseries", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20200410", basis=1) self.assertEqual(out["result"], 1) self.assertEqual(len(out["epidata"]), 3) @@ -253,18 +156,18 @@ def match_row(trend, row): self.assertEqual(trend["max_trend"], "decreasing") def test_correlation(self): - """Request a signal the /correlation endpoint.""" + """Request a signal from the /correlation endpoint.""" num_rows = 30 - reference_rows = [CovidcastRow(signal="ref", time_value=20200401 + i, value=i) for i in range(num_rows)] + reference_rows = [self._make_placeholder_row(signal="ref", time_value=20200401 + i, value=i)[0] for i in range(num_rows)] first = reference_rows[0] self._insert_rows(reference_rows) - other_rows = [CovidcastRow(signal="other", time_value=20200401 + i, value=i) for i in range(num_rows)] + other_rows = [self._make_placeholder_row(signal="other", time_value=20200401 + i, value=i)[0] for i in range(num_rows)] other = other_rows[0] self._insert_rows(other_rows) max_lag = 3 - out = self._fetch("/correlation", reference=first.signal_pair, others=other.signal_pair, geo=first.geo_pair, window="20200401-20201212", lag=max_lag) + out = self._fetch("/correlation", reference=first.signal_pair(), others=other.signal_pair(), geo=first.geo_pair(), window="20200401-20201212", lag=max_lag) self.assertEqual(out["result"], 1) df = pd.DataFrame(out["epidata"]) self.assertEqual(len(df), max_lag * 2 + 1) # -...0...+ @@ -280,15 +183,15 @@ def test_correlation(self): self.assertEqual(df["samples"].tolist(), [num_rows - abs(l) for l in range(-max_lag, max_lag + 1)]) def test_csv(self): - """Request a signal the /csv endpoint.""" + """Request a signal from the /csv endpoint.""" - rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(10)] + rows = [self._make_placeholder_row(time_value=20200401 + i, value=i)[0] for i in range(10)] first = rows[0] self._insert_rows(rows) response = requests.get( f"{BASE_URL}/csv", - params=dict(signal=first.signal_pair, start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type), + params=dict(signal=first.signal_pair(), start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type), ) response.raise_for_status() out = response.text @@ -297,16 +200,16 @@ def test_csv(self): self.assertEqual(list(df.columns), ["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "data_source"]) def test_backfill(self): - """Request a signal the /backfill endpoint.""" + """Request a signal from the /backfill endpoint.""" num_rows = 10 - issue_0 = [CovidcastRow(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i) for i in range(num_rows)] - issue_1 = [CovidcastRow(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1) for i in range(num_rows)] - last_issue = [CovidcastRow(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2) for i in range(num_rows)] # <-- the 
latest issues + issue_0 = [self._make_placeholder_row(time_value=20200401 + i, value=i, sample_size=1, lag=0, issue=20200401 + i)[0] for i in range(num_rows)] + issue_1 = [self._make_placeholder_row(time_value=20200401 + i, value=i + 1, sample_size=2, lag=1, issue=20200401 + i + 1)[0] for i in range(num_rows)] + last_issue = [self._make_placeholder_row(time_value=20200401 + i, value=i + 2, sample_size=3, lag=2, issue=20200401 + i + 2)[0] for i in range(num_rows)] # <-- the latest issues self._insert_rows([*issue_0, *issue_1, *last_issue]) first = issue_0[0] - out = self._fetch("/backfill", signal=first.signal_pair, geo=first.geo_pair, time="day:20200401-20201212", anchor_lag=3) + out = self._fetch("/backfill", signal=first.signal_pair(), geo=first.geo_pair(), time="day:20200401-20201212", anchor_lag=3) self.assertEqual(out["result"], 1) df = pd.DataFrame(out["epidata"]) self.assertEqual(len(df), 3 * num_rows) # num issues @@ -325,10 +228,10 @@ def test_backfill(self): self.assertEqual(df_t0["sample_size_completeness"].tolist(), [1 / 3, 2 / 3, 3 / 3]) # total 2, given 0,1,2 def test_meta(self): - """Request a signal the /meta endpoint.""" + """Request a signal from the /meta endpoint.""" num_rows = 10 - rows = [CovidcastRow(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)] + rows = [self._make_placeholder_row(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli")[0] for i in range(num_rows)] self._insert_rows(rows) first = rows[0] last = rows[-1] @@ -365,26 +268,26 @@ def test_meta(self): self.assertEqual(len(out), 0) def test_coverage(self): - """Request a signal the /coverage endpoint.""" + """Request a signal from the /coverage endpoint.""" num_geos_per_date = [10, 20, 30, 40, 44] dates = [20200401 + i for i in range(len(num_geos_per_date))] - rows = [CovidcastRow(time_value=dates[i], value=i, geo_value=str(geo_value)) for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)] + rows = [self._make_placeholder_row(time_value=dates[i], value=i, geo_value=str(geo_value))[0] for i, num_geo in enumerate(num_geos_per_date) for geo_value in range(num_geo)] self._insert_rows(rows) first = rows[0] with self.subTest("default"): - out = self._fetch("/coverage", signal=first.signal_pair, latest=dates[-1], format="json") + out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, latest=dates[-1], format="json") self.assertEqual(len(out), len(num_geos_per_date)) self.assertEqual([o["time_value"] for o in out], dates) self.assertEqual([o["count"] for o in out], num_geos_per_date) with self.subTest("specify window"): - out = self._fetch("/coverage", signal=first.signal_pair, window=f"{dates[0]}-{dates[1]}", format="json") + out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json") self.assertEqual(len(out), 2) self.assertEqual([o["time_value"] for o in out], dates[:2]) self.assertEqual([o["count"] for o in out], num_geos_per_date[:2]) with self.subTest("invalid geo_type"): - out = self._fetch("/coverage", signal=first.signal_pair, geo_type="state", format="json") + out = self._fetch("/coverage", signal=first.signal_pair(), geo_type="doesnt_exist", format="json") self.assertEqual(len(out), 0) diff --git a/integrations/server/test_covidcast_meta.py b/integrations/server/test_covidcast_meta.py index 6c52f56f1..ec5f0e23e 100644 --- a/integrations/server/test_covidcast_meta.py +++ 
b/integrations/server/test_covidcast_meta.py @@ -33,15 +33,18 @@ class CovidcastMetaTests(unittest.TestCase): } template = ''' - INSERT INTO - `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, - `time_type`, `time_value`, `value_updated_timestamp`, + INSERT INTO `signal_latest` ( + `signal_data_id`, `signal_key_id`, `geo_key_id`, + `time_type`, `time_value`, `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, `missing_value`, `missing_stderr`,`missing_sample_size`) VALUES - (%d, %d, %d, "%s", %d, 123, - %d, 0, 0, %d, 0, %d, %d, %d) + (%d, %d, %d, + "%s", %d, 123, + %d, 0, 0, + %d, 0, %d, + %d, %d) ''' def setUp(self): @@ -64,7 +67,7 @@ def setUp(self): # reset the `covidcast_meta_cache` table (it should always have one row) cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - # populate dimension tables for convenience + # populate dimension tables for (src,sig) in self.src_sig_lookups: cur.execute(''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) @@ -94,14 +97,7 @@ def tearDown(self): self.cur.close() self.cnx.close() - def _get_id(self): - self.id_counter += 1 - return self.id_counter - - def test_round_trip(self): - """Make a simple round-trip with some sample data.""" - - # insert dummy data and accumulate expected results (in sort order) + def insert_placeholder_data(self): expected = [] for src in ('src1', 'src2'): for sig in ('sig1', 'sig2'): @@ -128,11 +124,23 @@ def test_round_trip(self): for gv, v in zip(('geo1', 'geo2'), (10, 20)): self.cur.execute(self.template % ( self._get_id(), - self.src_sig_lookups[(src,sig)], self.geo_lookups[(gt,gv)], tt, tv, v, tv, + self.src_sig_lookups[(src,sig)], self.geo_lookups[(gt,gv)], + tt, tv, v, tv, # re-use time value for issue Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING )) self.cnx.commit() update_cache(args=None) + return expected + + def _get_id(self): + self.id_counter += 1 + return self.id_counter + + def test_round_trip(self): + """Make a simple round-trip with some sample data.""" + + # insert placeholder data and accumulate expected results (in sort order) + expected = self.insert_placeholder_data() # make the request response = requests.get(BASE_URL, params={'endpoint': 'covidcast_meta'}) @@ -146,42 +154,11 @@ def test_round_trip(self): 'message': 'success', }) - def test_filter(self): """Test filtering options some sample data.""" - # insert dummy data and accumulate expected results (in sort order) - expected = [] - for src in ('src1', 'src2'): - for sig in ('sig1', 'sig2'): - for tt in ('day', 'week'): - for gt in ('hrr', 'msa'): - expected.append({ - 'data_source': src, - 'signal': sig, - 'time_type': tt, - 'geo_type': gt, - 'min_time': 1, - 'max_time': 2, - 'num_locations': 2, - 'min_value': 10, - 'max_value': 20, - 'mean_value': 15, - 'stdev_value': 5, - 'last_update': 123, - 'max_issue': 2, - 'min_lag': 0, - 'max_lag': 0, - }) - for tv in (1, 2): - for gv, v in zip(('geo1', 'geo2'), (10, 20)): - self.cur.execute(self.template % ( - self._get_id(), - self.src_sig_lookups[(src,sig)], self.geo_lookups[(gt,gv)], tt, tv, v, tv, - Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING - )) - self.cnx.commit() - update_cache(args=None) + # insert placeholder data and accumulate expected results (in sort order) + expected = self.insert_placeholder_data() def fetch(**kwargs): # make the request diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 9828564d0..2b82fc29c 100644 --- 
a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -122,7 +122,7 @@ def upload_archive( database.commit() except Exception as e: all_rows_valid = False - logger.exception('exception while inserting rows:', e) + logger.exception('exception while inserting rows', exc_info=e) database.rollback() # archive the current file based on validation results diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 112f50d10..9c370c0c6 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -60,6 +60,11 @@ def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, v self.issue = issue self.lag = lag + def signal_pair(self): + return f"{self.source}:{self.signal}" + + def geo_pair(self): + return f"{self.geo_type}:{self.geo_value}" class Database: @@ -136,7 +141,7 @@ def _reset_load_table_ai_counter(self): def insert_or_update_bulk(self, cc_rows): return self.insert_or_update_batch(cc_rows) - def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False): + def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False): """ Insert new rows into the load table and dispatch into dimension and fact tables. """ @@ -204,7 +209,8 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False self._cursor.executemany(insert_into_loader_sql, args) modified_row_count = self._cursor.rowcount self._cursor.execute(fix_is_latest_issue_sql) - self.run_dbjobs() # TODO: consider incorporating the logic of dbjobs() into this method [once calls to dbjobs() are no longer needed for migrations] + if not suppress_jobs: + self.run_dbjobs() # TODO: incorporate the logic of dbjobs() into this method [once calls to dbjobs() are no longer needed for migrations] if modified_row_count is None or modified_row_count == -1: # the SQL connector does not support returning number of rows affected (see PEP 249) @@ -226,7 +232,7 @@ def run_dbjobs(self): INSERT INTO signal_dim (`source`, `signal`) SELECT DISTINCT sl.source, sl.signal FROM {self.load_table} AS sl LEFT JOIN signal_dim AS sd - ON sl.source=sd.source AND sl.signal=sd.signal + USING (`source`, `signal`) WHERE sd.source IS NULL ''' @@ -235,7 +241,7 @@ def run_dbjobs(self): INSERT INTO geo_dim (`geo_type`, `geo_value`) SELECT DISTINCT sl.geo_type, sl.geo_value FROM {self.load_table} AS sl LEFT JOIN geo_dim AS gd - ON sl.geo_type=gd.geo_type AND sl.geo_value=gd.geo_value + USING (`geo_type`, `geo_value`) WHERE gd.geo_type IS NULL ''' @@ -356,6 +362,8 @@ def delete_batch(self, cc_deletions): # composite keys: short_comp_key = "`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`" long_comp_key = short_comp_key + ", `issue`" + short_comp_ref_key = "`signal_key_id`, `geo_key_id`, `time_type`, `time_value`" + long_comp_ref_key = short_comp_ref_key + ", `issue`" create_tmp_table_sql = f''' CREATE TABLE {tmp_table_name} LIKE {self.load_table}; @@ -410,23 +418,27 @@ def delete_batch(self, cc_deletions): # AND also after `delete_latest_sql` so that we dont get a key collision on insert. 
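The new `suppress_jobs` flag means a caller can stage several batches in the load table and run the dimension/fact dispatch just once at the end. A minimal usage sketch under that assumption (the `load_batches` helper and its arguments are illustrative, not part of this change):

from delphi.epidata.acquisition.covidcast.database import Database

def load_batches(db: Database, batches):
  # stage each batch in the signal_load table without dispatching it
  for rows in batches:
    db.insert_or_update_batch(rows, suppress_jobs=True)
  # run dbjobs once to move everything into the dimension, history, and latest tables
  db.run_dbjobs()
  # commit so the changes are visible to other sessions
  db.commit()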
update_latest_sql = f''' INSERT INTO {self.latest_table} - (issue, - signal_data_id, signal_key_id, geo_key_id, time_type, time_value, + (signal_data_id, + signal_key_id, geo_key_id, time_type, time_value, issue, value, stderr, sample_size, `lag`, value_updated_timestamp, missing_value, missing_stderr, missing_sample_size) SELECT - MAX(h.issue), - h.signal_data_id, h.signal_key_id, h.geo_key_id, h.time_type, h.time_value, + h.signal_data_id, + h.signal_key_id, h.geo_key_id, h.time_type, h.time_value, h.issue, h.value, h.stderr, h.sample_size, h.`lag`, h.value_updated_timestamp, h.missing_value, h.missing_stderr, h.missing_sample_size -FROM {self.history_view} h JOIN {tmp_table_name} d USING ({short_comp_key}) -WHERE d.update_latest=1 GROUP BY {short_comp_key}; +FROM {self.history_view} h JOIN ( + SELECT {short_comp_key}, MAX(hh.issue) AS issue + FROM {self.history_view} hh JOIN {tmp_table_name} dd USING ({short_comp_key}) + WHERE dd.update_latest=1 GROUP BY {short_comp_key} + ) d USING ({long_comp_key}); ''' - drop_tmp_table_sql = f'DROP TABLE {tmp_table_name}' + drop_tmp_table_sql = f'DROP TABLE IF EXISTS {tmp_table_name}' total = None try: + self._cursor.execute(drop_tmp_table_sql) self._cursor.execute(create_tmp_table_sql) self._cursor.execute(amend_tmp_table_sql) if isinstance(cc_deletions, str): @@ -437,15 +449,21 @@ def split_list(lst, n): yield lst[i:(i+n)] for deletions_batch in split_list(cc_deletions, 100000): self._cursor.executemany(load_tmp_table_insert_sql, deletions_batch) + print(f"load_tmp_table_insert_sql:{self._cursor.rowcount}") else: raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}") self._cursor.execute(add_history_id_sql) + print(f"add_history_id_sql:{self._cursor.rowcount}") self._cursor.execute(mark_for_update_latest_sql) + print(f"mark_for_update_latest_sql:{self._cursor.rowcount}") self._cursor.execute(delete_history_sql) + print(f"delete_history_sql:{self._cursor.rowcount}") total = self._cursor.rowcount # TODO: consider reporting rows removed and/or replaced in latest table as well self._cursor.execute(delete_latest_sql) + print(f"delete_latest_sql:{self._cursor.rowcount}") self._cursor.execute(update_latest_sql) + print(f"update_latest_sql:{self._cursor.rowcount}") self._connection.commit() if total == -1: diff --git a/src/acquisition/covidcast/test_utils.py b/src/acquisition/covidcast/test_utils.py new file mode 100644 index 000000000..86c8af8ca --- /dev/null +++ b/src/acquisition/covidcast/test_utils.py @@ -0,0 +1,83 @@ +import unittest + +from delphi_utils import Nans +from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow +import delphi.operations.secrets as secrets + +# all the Nans we use here are just one value, so this is a shortcut to it: +nmv = Nans.NOT_MISSING.value + +class CovidcastBase(unittest.TestCase): + def setUp(self): + # use the local test instance of the database + secrets.db.host = 'delphi_database_epidata' + secrets.db.epi = ('user', 'pass') + + self._db = Database() + self._db.connect() + + # empty all of the data tables + for table in "signal_load signal_latest signal_history geo_dim signal_dim".split(): + self._db._cursor.execute(f"TRUNCATE TABLE {table};") + self.localSetUp() + self._db._connection.commit() + + def localSetUp(self): + # stub; override in subclasses to perform custom setup. 
+    # runs after tables have been truncated but before database changes have been committed
+    pass
+
+  def tearDown(self):
+    # close and destroy connection to the database
+    self._db.disconnect(False)
+    del self._db
+
+  DEFAULT_TIME_VALUE=2000_01_01
+  DEFAULT_ISSUE=2000_01_01
+  def _make_placeholder_row(self, **kwargs):
+    settings = {
+      'source': 'src',
+      'signal': 'sig',
+      'geo_type': 'state',
+      'geo_value': 'pa',
+      'time_type': 'day',
+      'time_value': self.DEFAULT_TIME_VALUE,
+      'value': 0.0,
+      'stderr': 1.0,
+      'sample_size': 2.0,
+      'missing_value': nmv,
+      'missing_stderr': nmv,
+      'missing_sample_size': nmv,
+      'issue': self.DEFAULT_ISSUE,
+      'lag': 0
+    }
+    settings.update(kwargs)
+    return (CovidcastRow(**settings), settings)
+
+  def _insert_rows(self, rows):
+    # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
+    n = self._db.insert_or_update_bulk(rows)
+    print(f"{n} rows added to load table & dispatched to v4 schema")
+    self._db._connection.commit() # NOTE: this isn't expressly needed for our test cases, but would be if using external access (like through client lib) to ensure changes are visible outside of this db session
+
+  def params_from_row(self, row, **kwargs):
+    ret = {
+      'data_source': row.source,
+      'signals': row.signal,
+      'time_type': row.time_type,
+      'geo_type': row.geo_type,
+      'time_values': row.time_value,
+      'geo_value': row.geo_value,
+    }
+    ret.update(kwargs)
+    return ret
+
+  DEFAULT_MINUS=['time_type', 'geo_type', 'source']
+  def expected_from_row(self, row, minus=DEFAULT_MINUS):
+    expected = dict(vars(row))
+    # remove columns commonly excluded from output
+    # NB: may need to add source or *_type back in for multiplexed queries
+    for key in ['id', 'direction_updated_timestamp'] + minus:
+      del expected[key]
+    return expected
+
diff --git a/src/acquisition/covidcast_nowcast/load_sensors.py b/src/acquisition/covidcast_nowcast/load_sensors.py
index ab9a6b33e..079b2f27c 100644
--- a/src/acquisition/covidcast_nowcast/load_sensors.py
+++ b/src/acquisition/covidcast_nowcast/load_sensors.py
@@ -92,7 +92,12 @@ def _move_after_processing(filepath, success):
 
 def _create_upsert_method(meta):
   def method(table, conn, keys, data_iter):
-    sql_table = sqlalchemy.Table(table.name, meta, autoload=True)
+    sql_table = sqlalchemy.Table(
+      table.name,
+      meta,
+      # specify lag column explicitly; lag is a reserved word sqlalchemy doesn't know about
+      sqlalchemy.Column("lag", sqlalchemy.Integer, quote=True),
+      autoload=True)
     insert_stmt = sqlalchemy.dialects.mysql.insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
     upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
     conn.execute(upsert_stmt)
diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql
index 88920f63c..cf65e0834 100644
--- a/src/ddl/v4_schema.sql
+++ b/src/ddl/v4_schema.sql
@@ -98,31 +98,31 @@ CREATE TABLE signal_load (
 
 CREATE OR REPLACE VIEW signal_history_v AS
     SELECT
-        0 AS is_latest_issue, -- provides column-compatibility to match `covidcast` table
+        0 AS `is_latest_issue`, -- provides column-compatibility to match `covidcast` table
         -- ^ this value is essentially undefined in this view, the notion of a 'latest' issue is not encoded here and must be drawn from the 'latest' table or view or otherwise computed...
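Taken together, the `CovidcastBase` helpers above are intended to keep integration tests short: build placeholder rows, push them through the real acquisition path, and derive both request parameters and expected output from the same settings. A minimal sketch of that pattern (the subclass, its test method, and the final assertions are illustrative, not code from this change):

from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase

class PlaceholderRowTests(CovidcastBase):
  def localSetUp(self):
    # nothing extra to prepare; CovidcastBase.setUp has already truncated the tables
    pass

  def test_insert_and_expect(self):
    # build two placeholder rows, overriding only the fields under test
    row_pa, _ = self._make_placeholder_row(geo_value='pa', value=1.5)
    row_wa, _ = self._make_placeholder_row(geo_value='wa', value=2.5)
    # load them through the full acquisition path (load table + dbjobs)
    self._insert_rows([row_pa, row_wa])
    # derive request parameters and the expected response record for one row
    params = self.params_from_row(row_pa)
    expected = self.expected_from_row(row_pa)
    # a real test would call the API with `params` and compare against `expected`;
    # here we only check that the derived expectation carries the overridden fields
    self.assertEqual(expected['geo_value'], 'pa')
    self.assertEqual(expected['value'], 1.5)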
- NULL AS direction, -- provides column-compatibility to match `covidcast` table + NULL AS `direction`, -- provides column-compatibility to match `covidcast` table `t2`.`source` AS `source`, `t2`.`signal` AS `signal`, `t3`.`geo_type` AS `geo_type`, `t3`.`geo_value` AS `geo_value`, - `t1`.`signal_data_id` AS `signal_data_id`, -- TODO: unnecessary ...remove? - `t1`.`strat_key_id` AS `strat_key_id`, -- TODO: for future + `t1`.`signal_data_id` AS `signal_data_id`, + `t1`.`strat_key_id` AS `strat_key_id`, -- TODO: for future use `t1`.`issue` AS `issue`, - `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed ...remove? + `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed `t1`.`time_type` AS `time_type`, `t1`.`time_value` AS `time_value`, - `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use ...remove? + `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use `t1`.`value` AS `value`, `t1`.`stderr` AS `stderr`, `t1`.`sample_size` AS `sample_size`, `t1`.`lag` AS `lag`, `t1`.`value_updated_timestamp` AS `value_updated_timestamp`, - `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed ...remove? + `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed `t1`.`missing_value` AS `missing_value`, `t1`.`missing_stderr` AS `missing_stderr`, `t1`.`missing_sample_size` AS `missing_sample_size`, - `t1`.`signal_key_id` AS `signal_key_id`, -- TODO: unnecessary ...remove? - `t1`.`geo_key_id` AS `geo_key_id` -- TODO: unnecessary ...remove? + `t1`.`signal_key_id` AS `signal_key_id`, + `t1`.`geo_key_id` AS `geo_key_id` FROM `signal_history` `t1` JOIN `signal_dim` `t2` ON `t1`.`signal_key_id` = `t2`.`signal_key_id` @@ -131,30 +131,30 @@ CREATE OR REPLACE VIEW signal_history_v AS CREATE OR REPLACE VIEW signal_latest_v AS SELECT - 1 AS is_latest_issue, -- provides column-compatibility to match `covidcast` table - NULL AS direction, -- provides column-compatibility to match `covidcast` table + 1 AS `is_latest_issue`, -- provides column-compatibility to match `covidcast` table + NULL AS `direction`, -- provides column-compatibility to match `covidcast` table `t2`.`source` AS `source`, `t2`.`signal` AS `signal`, `t3`.`geo_type` AS `geo_type`, `t3`.`geo_value` AS `geo_value`, - `t1`.`signal_data_id` AS `signal_data_id`, -- TODO: unnecessary ...remove? + `t1`.`signal_data_id` AS `signal_data_id`, `t1`.`strat_key_id` AS `strat_key_id`, -- TODO: for future use `t1`.`issue` AS `issue`, - `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed ...remove? + `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed `t1`.`time_type` AS `time_type`, `t1`.`time_value` AS `time_value`, - `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use ...remove? + `t1`.`reference_dt` AS `reference_dt`, -- TODO: for future use `t1`.`value` AS `value`, `t1`.`stderr` AS `stderr`, `t1`.`sample_size` AS `sample_size`, `t1`.`lag` AS `lag`, `t1`.`value_updated_timestamp` AS `value_updated_timestamp`, - `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed ...remove? 
+ `t1`.`computation_as_of_dt` AS `computation_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed `t1`.`missing_value` AS `missing_value`, `t1`.`missing_stderr` AS `missing_stderr`, `t1`.`missing_sample_size` AS `missing_sample_size`, - `t1`.`signal_key_id` AS `signal_key_id`, -- TODO: unnecessary ...remove? - `t1`.`geo_key_id` AS `geo_key_id` -- TODO: unnecessary ...remove? + `t1`.`signal_key_id` AS `signal_key_id`, + `t1`.`geo_key_id` AS `geo_key_id` FROM `signal_latest` `t1` JOIN `signal_dim` `t2` ON `t1`.`signal_key_id` = `t2`.`signal_key_id` diff --git a/src/server/endpoints/covid_hosp_state_timeseries.py b/src/server/endpoints/covid_hosp_state_timeseries.py index c8684ddba..5da4d4e16 100644 --- a/src/server/endpoints/covid_hosp_state_timeseries.py +++ b/src/server/endpoints/covid_hosp_state_timeseries.py @@ -154,16 +154,16 @@ def handle(): if issues is not None: q.where_integers("issue", issues) # final query using specific issues - query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) row FROM {q.table} WHERE {q.conditions_clause}) SELECT {q.fields_clause} FROM {q.alias} WHERE row = 1 ORDER BY {q.order_clause}" + query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} WHERE {q.conditions_clause}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}" elif as_of is not None: sub_condition_asof = "(issue <= :as_of)" q.params["as_of"] = as_of - query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state ORDER BY issue DESC, record_type) row FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof}) SELECT {q.fields_clause} FROM {q.alias} WHERE row = 1 ORDER BY {q.order_clause}" + query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state ORDER BY issue DESC, record_type) `row` FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}" else: # final query using most recent issues subquery = f"(SELECT max(`issue`) `max_issue`, `date`, `state` FROM {q.table} WHERE {q.conditions_clause} GROUP BY `date`, `state`) x" condition = f"x.`max_issue` = {q.alias}.`issue` AND x.`date` = {q.alias}.`date` AND x.`state` = {q.alias}.`state`" - query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) row FROM {q.table} JOIN {subquery} ON {condition}) select {q.fields_clause} FROM {q.alias} WHERE row = 1 ORDER BY {q.order_clause}" + query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} JOIN {subquery} ON {condition}) select {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}" # send query return execute_query(query, q.params, fields_string, fields_int, fields_float) diff --git a/src/server/endpoints/covidcast_nowcast.py b/src/server/endpoints/covidcast_nowcast.py index 9b2e79848..9a773f572 100644 --- a/src/server/endpoints/covidcast_nowcast.py +++ b/src/server/endpoints/covidcast_nowcast.py @@ -94,7 +94,7 @@ def handle(): query = f"SELECT {fields} FROM {table} {subquery} WHERE {conditions} AND ({condition_version}) ORDER BY {order}" else: # fetch most recent issue fast - query = f"WITH t as (SELECT {fields}, ROW_NUMBER() OVER (PARTITION BY t.`time_type`, 
t.`time_value`, t.`source`, t.`signal`, t.`geo_type`, t.`geo_value` ORDER BY t.`issue` DESC) row FROM {table} {subquery} WHERE {conditions}) SELECT {fields} FROM t where row = 1 ORDER BY {order}" + query = f"WITH t as (SELECT {fields}, ROW_NUMBER() OVER (PARTITION BY t.`time_type`, t.`time_value`, t.`source`, t.`signal`, t.`geo_type`, t.`geo_value` ORDER BY t.`issue` DESC) `row` FROM {table} {subquery} WHERE {conditions}) SELECT {fields} FROM t where `row` = 1 ORDER BY {order}" fields_string = ["geo_value", "signal"] fields_int = ["time_value", "issue", "lag"]
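These last two endpoint changes backquote the window-function alias because `ROW` is a reserved keyword in MySQL 8 (as of 8.0.2), so an unquoted `row` alias no longer parses. A small sketch of the quoting pattern outside of the endpoint code (the table, fields, and conditions here are placeholders, not values from this change):

def latest_issue_query(table, fields, conditions, order):
  # rank the rows in each (time_value, geo_value) group by issue, newest first;
  # the alias must be written as `row` because ROW is reserved in MySQL 8
  return (
    f"WITH t AS (SELECT {fields}, "
    f"ROW_NUMBER() OVER (PARTITION BY t.`time_value`, t.`geo_value` ORDER BY t.`issue` DESC) `row` "
    f"FROM {table} t WHERE {conditions}) "
    f"SELECT {fields} FROM t WHERE `row` = 1 ORDER BY {order}"
  )

# example: keep only the most recent issue for each (time_value, geo_value) pair
sql = latest_issue_query(
  "signal_history_v",
  "t.`time_value`, t.`geo_value`, t.`issue`, t.`value`",
  "t.`source` = 'src'",
  "t.`time_value`")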