Skip to content

[JIT Variant] Split multi-signal queries into many single-signal queries #1026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
40b25c5
Server: add CovidcastRow helper class for testing
dshemetov Oct 7, 2022
4f8c346
Server: update csv_to_database to use CovidcastRow
dshemetov Oct 7, 2022
c119152
Server: update test_db to use CovidcastRow
dshemetov Oct 7, 2022
565aad4
Server: update test_delete_batch to use CovidcastRow
dshemetov Oct 7, 2022
c4e5675
Server: update test_delphi_epidata to use CovidcastRow
dshemetov Oct 7, 2022
166802a
Server: update test_covidcast_endpoints to use CovidcastRow
dshemetov Oct 7, 2022
62dd20f
Server: update test_covidcast to use CovidcastRow
dshemetov Oct 7, 2022
2d68be7
Server: update test_utils to use CovidcastRow
dshemetov Oct 7, 2022
9c53fa0
Server: update test_covidcast to use CovidcastRow
dshemetov Oct 7, 2022
9850fba
Server: update TimePair to auto-sort tuples
dshemetov Oct 7, 2022
0c7466c
Server: minor model.py data_source_by_id name update
dshemetov Oct 7, 2022
b5818ec
Server: minor dates.py spacing fix
dshemetov Oct 7, 2022
01baf4f
Server: remove unused imports test_covidcast
dshemetov Oct 7, 2022
24dbd2b
JIT: major feature commit
dshemetov Oct 7, 2022
9f9dfb3
Acquisition: update test_csv_uploading to remove Pandas warning
dshemetov Oct 11, 2022
0b1696a
Build a container image from this branch
korlaxxalrok Oct 11, 2022
8ec310d
Server: add assert_frame_equal_no_order test util
dshemetov Oct 29, 2022
8a37d82
Server: generalize iterate_over_range boundary inclusion
dshemetov Oct 29, 2022
e7740ee
Server: convert TODO comments to #1017
dshemetov Nov 1, 2022
c54cde9
Server: remove unused Flask import in _config.py
dshemetov Nov 1, 2022
5ce82b5
Server: remove model.py:data_signals threading and use mock
dshemetov Nov 2, 2022
d7a9219
Server: a small naming change in model.py
dshemetov Nov 2, 2022
9e06226
Server: add type hints to _query
dshemetov Nov 4, 2022
520dec8
Server: remove unused imports in _query
dshemetov Nov 4, 2022
7ce36d2
Server: tiny _params change
dshemetov Nov 5, 2022
0586952
JIT: Make JIT integration tests robust (use dataframes)
dshemetov Nov 5, 2022
487e2ce
CI: Update to build JIT Multi SQL image
dshemetov Oct 31, 2022
b2836a3
CI: Update image
dshemetov Oct 31, 2022
344d342
JIT: update model.py with multi-sql functions
dshemetov Nov 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ jobs:
image:
needs: build
# only on main and dev branch
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
#if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/jit_computations' || github.ref == 'refs/heads/ds/jit-multi-sql'

runs-on: ubuntu-latest
steps:
Expand All @@ -133,6 +134,9 @@ jobs:
if [ "$imageTag" = "main" ] ; then
imageTag="latest"
fi
if [ "$imageTag" = "ds/jit-multi-sql" ] ; then
imageTag="jit-multi-sql"
fi
echo "::set-output name=tag::$imageTag"
echo "::set-output name=repo::ghcr.io/${{ github.repository }}"
- name: Push Dev Tag
Expand Down
8 changes: 7 additions & 1 deletion deploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@
"match": "^.*\\.(py)$",
"add-header-comment": true
},
{
"type": "move",
"src": "src/server/utils",
"dst": "[[package]]/server/utils/",
"match": "^.*\\.(py)$",
"add-header-comment": true
},
{
"type": "move",
"src": "src/server/endpoints/covidcast_utils",
"dst": "[[package]]/server/endpoints/covidcast_utils/",
"match": "^.*\\.(py)$",
"add-header-comment": true
},

"// acquisition - fluview",
{
"type": "move",
Expand Down
4 changes: 2 additions & 2 deletions integrations/acquisition/covidcast/test_csv_uploading.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ def test_uploading(self):
"time_value": [20200419],
"signal": [signal_name],
"direction": [None]})], axis=1).rename(columns=uploader_column_rename)
expected_values_df["missing_value"].iloc[0] = Nans.OTHER
expected_values_df["missing_sample_size"].iloc[0] = Nans.NOT_MISSING
expected_values_df.loc[0, "missing_value"] = Nans.OTHER
expected_values_df.loc[0, "missing_sample_size"] = Nans.NOT_MISSING
expected_values = expected_values_df.to_dict(orient="records")
expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}

Expand Down
15 changes: 8 additions & 7 deletions integrations/acquisition/covidcast/test_db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import unittest

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow, DBLoadStateException

from delphi.epidata.acquisition.covidcast.database import DBLoadStateException
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
import delphi.operations.secrets as secrets


# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

Expand All @@ -31,8 +32,8 @@ def _find_matches_for_row(self, row):

def test_insert_or_update_with_nonempty_load_table(self):
# make rows
a_row = self._make_placeholder_row()[0]
another_row = self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE+1, issue=self.DEFAULT_ISSUE+1)[0]
a_row = CovidcastRow(time_value=20200202)
another_row = CovidcastRow(time_value=20200203, issue=20200203)
# insert one
self._db.insert_or_update_bulk([a_row])
# put something into the load table
Expand Down Expand Up @@ -61,7 +62,7 @@ def test_id_sync(self):
latest_view = 'epimetric_latest_v'

# add a data point
base_row, _ = self._make_placeholder_row()
base_row = CovidcastRow()
self._insert_rows([base_row])
# ensure the primary keys match in the latest and history tables
matches = self._find_matches_for_row(base_row)
Expand All @@ -71,7 +72,7 @@ def test_id_sync(self):
old_pk_id = matches[latest_view][pk_column]

# add a reissue for said data point
next_row, _ = self._make_placeholder_row()
next_row = CovidcastRow()
next_row.issue += 1
self._insert_rows([next_row])
# ensure the new keys also match
Expand Down
7 changes: 2 additions & 5 deletions integrations/acquisition/covidcast/test_delete_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
import unittest
from os import path

# third party
import mysql.connector

# first party
from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
Expand Down
110 changes: 55 additions & 55 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
"""Integration tests for delphi_epidata.py."""

# standard library
import unittest
import time
from unittest.mock import patch, MagicMock
from json import JSONDecodeError
from unittest.mock import MagicMock, patch

# third party
from aiohttp.client_exceptions import ClientResponseError
import mysql.connector
# first party
import pytest
from aiohttp.client_exceptions import ClientResponseError

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
# third party
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
import delphi.operations.secrets as secrets
from delphi.epidata.client.delphi_epidata import Epidata
from delphi_utils import Nans


# py3tester coverage target
__test_target__ = 'delphi.epidata.client.delphi_epidata'
# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value
IGNORE_FIELDS = ["id", "direction_updated_timestamp", "value_updated_timestamp", "source", "time_type", "geo_type"]

def fake_epidata_endpoint(func):
"""This can be used as a decorator to enable a bogus Epidata endpoint to return 404 responses."""
Expand All @@ -30,9 +32,6 @@ def wrapper(*args):
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
return wrapper

# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

class DelphiEpidataPythonClientTests(CovidcastBase):
"""Tests the Python client."""

Expand All @@ -54,12 +53,12 @@ def test_covidcast(self):

# insert placeholder data: three issues of one signal, one issue of another
rows = [
self._make_placeholder_row(issue=self.DEFAULT_ISSUE + i, value=i, lag=i)[0]
CovidcastRow(issue=20200202 + i, value=i, lag=i)
for i in range(3)
]
row_latest_issue = rows[-1]
rows.append(
self._make_placeholder_row(signal="sig2")[0]
CovidcastRow(signal="sig2")
)
self._insert_rows(rows)

Expand All @@ -70,10 +69,11 @@ def test_covidcast(self):
)

expected = [
self.expected_from_row(row_latest_issue),
self.expected_from_row(rows[-1])
row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS),
rows[-1].as_dict(ignore_fields=IGNORE_FIELDS)
]

self.assertEqual(response['epidata'], expected)
# check result
self.assertEqual(response, {
'result': 1,
Expand All @@ -89,10 +89,10 @@ def test_covidcast(self):

expected = [{
rows[0].signal: [
self.expected_from_row(row_latest_issue, self.DEFAULT_MINUS + ['signal']),
row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS + ['signal']),
],
rows[-1].signal: [
self.expected_from_row(rows[-1], self.DEFAULT_MINUS + ['signal']),
rows[-1].as_dict(ignore_fields=IGNORE_FIELDS + ['signal']),
],
}]

Expand All @@ -109,12 +109,12 @@ def test_covidcast(self):
**self.params_from_row(rows[0])
)

expected = self.expected_from_row(row_latest_issue)
expected = [row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.assertEqual(response_1, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})

Expand All @@ -124,13 +124,13 @@ def test_covidcast(self):
**self.params_from_row(rows[0], as_of=rows[1].issue)
)

expected = self.expected_from_row(rows[1])
expected = [rows[1].as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.maxDiff=None
self.assertEqual(response_1a, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})

Expand All @@ -141,8 +141,8 @@ def test_covidcast(self):
)

expected = [
self.expected_from_row(rows[0]),
self.expected_from_row(rows[1])
rows[0].as_dict(ignore_fields=IGNORE_FIELDS),
rows[1].as_dict(ignore_fields=IGNORE_FIELDS)
]

# check result
Expand All @@ -158,12 +158,12 @@ def test_covidcast(self):
**self.params_from_row(rows[0], lag=2)
)

expected = self.expected_from_row(row_latest_issue)
expected = [row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.assertDictEqual(response_3, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})
with self.subTest(name='long request'):
Expand Down Expand Up @@ -223,16 +223,16 @@ def test_geo_value(self):
# insert placeholder data: three counties, three MSAs
N = 3
rows = [
self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
CovidcastRow(geo_type="county", geo_value=str(i)*5, value=i)
for i in range(N)
] + [
self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
CovidcastRow(geo_type="msa", geo_value=str(i)*5, value=i*10)
for i in range(N)
]
self._insert_rows(rows)

counties = [
self.expected_from_row(rows[i]) for i in range(N)
rows[i].as_dict(ignore_fields=IGNORE_FIELDS) for i in range(N)
]

def fetch(geo):
Expand All @@ -241,31 +241,31 @@ def fetch(geo):
)

# test fetch all
r = fetch('*')
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], counties)
request = fetch('*')
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], counties)
# test fetch a specific region
r = fetch('11111')
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1]])
request = fetch('11111')
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1]])
# test fetch a specific region that does not exist
r = fetch('55555')
self.assertEqual(r['message'], 'no results')
request = fetch('55555')
self.assertEqual(request['message'], 'no results')
# test fetching multiple regions
r = fetch(['11111', '22222'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1], counties[2]])
request = fetch(['11111', '22222'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1], counties[2]])
# test fetching multiple regions, another variant
r = fetch(['00000', '22222'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[0], counties[2]])
request = fetch(['00000', '22222'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[0], counties[2]])
# test fetching multiple regions where one does not exist
r = fetch(['11111', '55555'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1]])
request = fetch(['11111', '55555'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1]])
# test fetching with an empty list of regions
r = fetch([])
self.assertEqual(r['message'], 'no results')
request = fetch([])
self.assertEqual(request['message'], 'no results')

def test_covidcast_meta(self):
"""Test that the covidcast_meta endpoint returns expected data."""
Expand All @@ -275,7 +275,7 @@ def test_covidcast_meta(self):
# 2nd issue: 1 11 21
# 3rd issue: 2 12 22
rows = [
self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE + t, issue=self.DEFAULT_ISSUE + i, value=t*10 + i)[0]
CovidcastRow(time_value=2020_02_02 + t, issue=2020_02_02 + i, value=t*10 + i)
for i in range(3) for t in range(3)
]
self._insert_rows(rows)
Expand All @@ -299,14 +299,14 @@ def test_covidcast_meta(self):
signal=rows[0].signal,
time_type=rows[0].time_type,
geo_type=rows[0].geo_type,
min_time=self.DEFAULT_TIME_VALUE,
max_time=self.DEFAULT_TIME_VALUE + 2,
min_time=2020_02_02,
max_time=2020_02_02 + 2,
num_locations=1,
min_value=2.,
mean_value=12.,
max_value=22.,
stdev_value=8.1649658, # population stdev, not sample, which is 10.
max_issue=self.DEFAULT_ISSUE + 2,
max_issue=2020_02_02 + 2,
min_lag=0,
max_lag=0, # we didn't set lag when inputting data
)
Expand All @@ -322,10 +322,10 @@ def test_async_epidata(self):
# insert placeholder data: three counties, three MSAs
N = 3
rows = [
self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
CovidcastRow(geo_type="county", geo_value=str(i)*5, value=i)
for i in range(N)
] + [
self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
CovidcastRow(geo_type="msa", geo_value=str(i)*5, value=i*10)
for i in range(N)
]
self._insert_rows(rows)
Expand Down
Loading