Skip to content

Partitioning Direction Update #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ def test_caching(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
123, 1, 2, 3, 456, 1, 20200422, 0),
123, 1, 2, 3, 456, 1, 20200422, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
789, 1, 2, 3, 456, 1, 20200423, 1)
789, 1, 2, 3, 456, 1, 20200423, 1, 1)
''')
self.cur.execute('''
insert into covidcast values
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
456, 4, 5, 6, 789, -1, 20200422, 0)
456, 4, 5, 6, 789, -1, 20200422, 0, 1)
''')

self.cnx.commit()
Expand Down
8 changes: 4 additions & 4 deletions integrations/acquisition/covidcast/test_csv_uploading.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ def apply_lag(expected_epidata):
})

# verify timestamps and default values are reasonable
self.cur.execute('select timestamp1, timestamp2, direction from covidcast')
for timestamp1, timestamp2, direction in self.cur:
self.assertGreater(timestamp1, 0)
self.assertEqual(timestamp2, 0)
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
self.assertGreater(value_updated_timestamp, 0)
self.assertEqual(direction_updated_timestamp, 0)
self.assertIsNone(direction)

# verify that the CSVs were archived
Expand Down
76 changes: 62 additions & 14 deletions integrations/acquisition/covidcast/test_direction_updating.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def test_uploading(self):
"""Update rows having a stale `direction` field and serve the results."""

# insert some sample data
# src, sig1, 1111:
# direction should be updated to None as there are no historical data for (src, sig1, state).
# CA 20200301:
# timeline should be x=[-2, -1, 0], y=[2, 6, 5] with direction=1
# FL 20200517:
Expand All @@ -60,31 +62,77 @@ def test_uploading(self):
# wrong) is fresh
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig1', 'day', 'state', 20201028, '1111',
123, 2, 0, 0, 0, -1, 20201028, 0, 1),
(0, 'src', 'sig1', 'day', 'state', 20201029, '1111',
123, 6, 0, 0, 0, 0, 20201029, 0, 1),
(0, 'src', 'sig1', 'day', 'state', 20201030, '1111',
123, 5, 0, 0, 0, 1, 20201030, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, NULL, 20200228, 0),
123, 2, 0, 0, 0, NULL, 20200228, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, NULL, 20200229, 0),
123, 6, 0, 0, 0, NULL, 20200229, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, NULL, 20200301, 0),
123, 5, 0, 0, 0, NULL, 20200301, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200511, 'fl',
123, 1, 0, 0, 0, NULL, 20200511, 0),
123, 1, 0, 0, 0, NULL, 20200511, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200512, 'fl',
123, 2, 0, 0, 0, NULL, 20200512, 0),
123, 2, 0, 0, 0, NULL, 20200512, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200517, 'fl',
123, 2, 0, 0, 0, NULL, 20200517, 0),
123, 2, 0, 0, 0, NULL, 20200517, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200615, 'tx',
123, 9, 0, 0, 456, NULL, 20200615, 0),
123, 9, 0, 0, 456, NULL, 20200615, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200616, 'tx',
123, 5, 0, 0, 456, NULL, 20200616, 0),
123, 5, 0, 0, 456, NULL, 20200616, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200617, 'tx',
123, 1, 0, 0, 456, 1, 20200617, 0)
123, 1, 0, 0, 456, 1, 20200617, 0, 1)
''')
self.cnx.commit()

# update direction (only 20200417 has enough history)
args = None
args = get_argument_parser().parse_args('')
main(args)

# The Quick-Fix is working
response = Epidata.covidcast(
'src', 'sig1', 'day', 'state', '20200101-20201231', '*')

self.assertEqual(response, {
'result': 1,
'epidata': [{
'time_value': 20201028,
'geo_value': '1111',
'value': 2,
'stderr': 0,
'sample_size': 0,
'direction': None,
'issue': 20201028,
'lag': 0
},
{
'time_value': 20201029,
'geo_value': '1111',
'value': 6,
'stderr': 0,
'sample_size': 0,
'direction': None,
'issue': 20201029,
'lag': 0
},
{
'time_value': 20201030,
'geo_value': '1111',
'value': 5,
'stderr': 0,
'sample_size': 0,
'direction': None,
'issue': 20201030,
'lag': 0
},
],
'message': 'success',
})

# request data from the API
response = Epidata.covidcast(
'src', 'sig', 'day', 'state', '20200101-20201231', '*')
Expand Down Expand Up @@ -188,11 +236,11 @@ def test_uploading(self):
})

# verify secondary timestamps were updated
self.cur.execute('select timestamp2 from covidcast order by id asc')
self.cur.execute('select direction_updated_timestamp from covidcast order by id asc')
timestamps = [t for (t,) in self.cur]
for t in timestamps[:6]:
# first 6 rows had `direction` updated
for t in timestamps[:9]:
# first 9 rows had `direction` updated
self.assertGreater(t, 0)
for t in timestamps[6:]:
for t in timestamps[9:]:
# last 3 rows were not updated
self.assertEqual(t, 456)
94 changes: 94 additions & 0 deletions integrations/acquisition/covidcast/test_fill_is_latest_issue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Integration tests for covidcast's direction updating."""

# standard library
import unittest

# third party
import mysql.connector

# first party
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.fill_is_latest_issue'


class FillIsLatestIssueTests(unittest.TestCase):
"""Tests filling is_latest_issue column"""

def setUp(self):
"""Perform per-test setup."""

# connect to the `epidata` database and clear the `covidcast` table
cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
cur = cnx.cursor()
cur.execute('truncate table covidcast')
cnx.commit()
cur.close()

# make connection and cursor available to test cases
self.cnx = cnx
self.cur = cnx.cursor()

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

# use the local instance of the Epidata API
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

def tearDown(self):
"""Perform per-test teardown."""
self.cur.close()
self.cnx.close()

def test_fill_is_latest_issue(self):
"""Update rows having a stale `direction` field and serve the results."""

self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 5, 5, 5, NULL, 20200228, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, NULL, 20200229, 1, 1),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, NULL, 20200301, 1, 1),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 9, 9, 9, NULL, 20200229, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, NULL, 20200303, 2, 1),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 5, 5, 5, NULL, 20200302, 1, 1),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 9, 8, 7, NULL, 20200301, 0, 1)
''')
self.cnx.commit()

# fill is_latest_issue
main()

self.cur.execute('''select * from covidcast''')
result = list(self.cur)
expected = [
(1, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 5, 5, 5, None, 20200228, 0, bytearray(b'0')),
(2, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, None, 20200229, 1, bytearray(b'1')),
(3, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, None, 20200301, 1, bytearray(b'1')),
(4, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 9, 9, 9, None, 20200229, 0, bytearray(b'0')),
(5, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, None, 20200303, 2, bytearray(b'1')),
(6, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0')),
(7, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'))
]

self.assertEqual(result, expected)
12 changes: 6 additions & 6 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ def test_covidcast(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0),
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1),
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1, 0),
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2)
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2, 1)
''')
self.cnx.commit()

Expand Down Expand Up @@ -157,11 +157,11 @@ def test_covidcast_meta(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0),
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2),
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2, 1),
(0, 'src', 'sig', 'day', 'county', 20200415, '01234',
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1)
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1, 1)
''')
self.cnx.commit()

Expand Down
44 changes: 22 additions & 22 deletions integrations/server/test_covidcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_round_trip(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 1)
''')
self.cnx.commit()

Expand Down Expand Up @@ -85,17 +85,17 @@ def test_location_wildcard(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '11111',
123, 10, 11, 12, 456, 13, 20200414, 0),
123, 10, 11, 12, 456, 13, 20200414, 0, 1),
(0, 'src', 'sig', 'day', 'county', 20200414, '22222',
123, 20, 21, 22, 456, 23, 20200414, 0),
123, 20, 21, 22, 456, 23, 20200414, 0, 1),
(0, 'src', 'sig', 'day', 'county', 20200414, '33333',
123, 30, 31, 32, 456, 33, 20200414, 0),
123, 30, 31, 32, 456, 33, 20200414, 0, 1),
(0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
123, 40, 41, 42, 456, 43, 20200414, 0),
123, 40, 41, 42, 456, 43, 20200414, 0, 1),
(0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
123, 50, 51, 52, 456, 53, 20200414, 0),
123, 50, 51, 52, 456, 53, 20200414, 0, 1),
(0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
123, 60, 61, 62, 456, 634, 20200414, 0)
123, 60, 61, 62, 456, 634, 20200414, 0, 1)
''')
self.cnx.commit()

Expand Down Expand Up @@ -155,17 +155,17 @@ def test_location_timeline(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200411, '01234',
123, 10, 11, 12, 456, 13, 20200413, 2),
123, 10, 11, 12, 456, 13, 20200413, 2, 1),
(0, 'src', 'sig', 'day', 'county', 20200412, '01234',
123, 20, 21, 22, 456, 23, 20200413, 1),
123, 20, 21, 22, 456, 23, 20200413, 1, 1),
(0, 'src', 'sig', 'day', 'county', 20200413, '01234',
123, 30, 31, 32, 456, 33, 20200413, 0),
123, 30, 31, 32, 456, 33, 20200413, 0, 1),
(0, 'src', 'sig', 'day', 'county', 20200411, '11111',
123, 40, 41, 42, 456, 43, 20200413, 2),
123, 40, 41, 42, 456, 43, 20200413, 2, 1),
(0, 'src', 'sig', 'day', 'county', 20200412, '22222',
123, 50, 51, 52, 456, 53, 20200413, 1),
123, 50, 51, 52, 456, 53, 20200413, 1, 1),
(0, 'src', 'sig', 'day', 'county', 20200413, '33333',
123, 60, 61, 62, 456, 63, 20200413, 0)
123, 60, 61, 62, 456, 63, 20200413, 0, 1)
''')
self.cnx.commit()

Expand Down Expand Up @@ -225,7 +225,7 @@ def test_unique_key_constraint(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
0, 0, 0, 0, 0, 0, 20200414, 0)
0, 0, 0, 0, 0, 0, 20200414, 0, 1)
''')
self.cnx.commit()

Expand All @@ -234,14 +234,14 @@ def test_unique_key_constraint(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
1, 1, 1, 1, 1, 1, 20200414, 0)
1, 1, 1, 1, 1, 1, 20200414, 0, 1)
''')

# succeed to insert different dummy data under a different issue
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
1, 1, 1, 1, 1, 1, 20200415, 1)
1, 1, 1, 1, 1, 1, 20200415, 1, 1)
''')

def test_nullable_columns(self):
Expand All @@ -251,7 +251,7 @@ def test_nullable_columns(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 0.123, NULL, NULL, 456, NULL, 20200414, 0)
123, 0.123, NULL, NULL, 456, NULL, 20200414, 0, 1)
''')
self.cnx.commit()

Expand Down Expand Up @@ -291,15 +291,15 @@ def test_temporal_partitioning(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'hour', 'state', 2020041714, 'vi',
123, 10, 11, 12, 456, 13, 2020041714, 0),
123, 10, 11, 12, 456, 13, 2020041714, 0, 1),
(0, 'src', 'sig', 'day', 'state', 20200417, 'vi',
123, 20, 21, 22, 456, 23, 20200417, 00),
123, 20, 21, 22, 456, 23, 20200417, 00, 1),
(0, 'src', 'sig', 'week', 'state', 202016, 'vi',
123, 30, 31, 32, 456, 33, 202016, 0),
123, 30, 31, 32, 456, 33, 202016, 0, 1),
(0, 'src', 'sig', 'month', 'state', 202004, 'vi',
123, 40, 41, 42, 456, 43, 202004, 0),
123, 40, 41, 42, 456, 43, 202004, 0, 1),
(0, 'src', 'sig', 'year', 'state', 2020, 'vi',
123, 50, 51, 52, 456, 53, 2020, 0)
123, 50, 51, 52, 456, 53, 2020, 0, 1)
''')
self.cnx.commit()

Expand Down
Loading