Skip to content

Apply "quick fix" data ingestion performance improvements #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ def test_caching(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
123, 1, 2, 3, 456, 1, 20200422, 0),
123, 1, 2, 3, 456, 1, 20200422, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
789, 1, 2, 3, 456, 1, 20200423, 1)
789, 1, 2, 3, 456, 1, 20200423, 1, 1, False)
''')
self.cur.execute('''
insert into covidcast values
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
456, 4, 5, 6, 789, -1, 20200422, 0)
456, 4, 5, 6, 789, -1, 20200422, 0, 1, True)
''')

self.cnx.commit()
Expand Down
38 changes: 32 additions & 6 deletions integrations/acquisition/covidcast/test_csv_uploading.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ def test_uploading(self):
f.write('wa,30,0.03,300\n')

# invalid
with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated.csv', 'w') as f:
with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_be_accepted.csv', 'w') as f:
f.write('geo_id,val,se,sample_size\n')
f.write('pa,100,5.4,624\n')

# invalid
with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet.csv', 'w') as f:
f.write('geo_id,val,se,sample_size\n')
f.write('pa,100,5.4,624\n')

Expand Down Expand Up @@ -180,9 +185,30 @@ def apply_lag(expected_epidata):
'message': 'success',
})


# request CSV data from the API on the signal with name length 32<x<64
response = Epidata.covidcast(
'src-name', 'wip_really_long_name_that_will_be_accepted', 'day', 'state', 20200419, '*')

# verify data matches the CSV
self.assertEqual(response, {
'result': 1,
'message': 'success',
'epidata': apply_lag([
{
'time_value': 20200419,
'geo_value': 'pa',
'value': 100,
'stderr': 5.4,
'sample_size': 624,
'direction': None,
},
])
})

# request CSV data from the API on the long-named signal
response = Epidata.covidcast(
'src-name', 'wip_really_long_name_that_will_g', 'day', 'state', 20200419, '*')
'src-name', 'wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_s', 'day', 'state', 20200419, '*')

# verify data matches the CSV
# if the CSV failed correctly there should be no results
Expand All @@ -192,10 +218,10 @@ def apply_lag(expected_epidata):
})

# verify timestamps and default values are reasonable
self.cur.execute('select timestamp1, timestamp2, direction from covidcast')
for timestamp1, timestamp2, direction in self.cur:
self.assertGreater(timestamp1, 0)
self.assertEqual(timestamp2, 0)
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
self.assertGreater(value_updated_timestamp, 0)
self.assertEqual(direction_updated_timestamp, 0)
self.assertIsNone(direction)

# verify that the CSVs were archived
Expand Down
76 changes: 62 additions & 14 deletions integrations/acquisition/covidcast/test_direction_updating.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def test_uploading(self):
"""Update rows having a stale `direction` field and serve the results."""

# insert some sample data
# src, sig1, 1111:
# direction should be updated to None as there are no historical data for (src, sig1, state).
# CA 20200301:
# timeline should be x=[-2, -1, 0], y=[2, 6, 5] with direction=1
# FL 20200517:
Expand All @@ -60,31 +62,77 @@ def test_uploading(self):
# wrong) is fresh
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig1', 'day', 'state', 20201028, '1111',
123, 2, 0, 0, 0, -1, 20201028, 0, 1, False),
(0, 'src', 'sig1', 'day', 'state', 20201029, '1111',
123, 6, 0, 0, 0, 0, 20201029, 0, 1, False),
(0, 'src', 'sig1', 'day', 'state', 20201030, '1111',
123, 5, 0, 0, 0, 1, 20201030, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, NULL, 20200228, 0),
123, 2, 0, 0, 0, NULL, 20200228, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, NULL, 20200229, 0),
123, 6, 0, 0, 0, NULL, 20200229, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, NULL, 20200301, 0),
123, 5, 0, 0, 0, NULL, 20200301, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200511, 'fl',
123, 1, 0, 0, 0, NULL, 20200511, 0),
123, 1, 0, 0, 0, NULL, 20200511, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200512, 'fl',
123, 2, 0, 0, 0, NULL, 20200512, 0),
123, 2, 0, 0, 0, NULL, 20200512, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200517, 'fl',
123, 2, 0, 0, 0, NULL, 20200517, 0),
123, 2, 0, 0, 0, NULL, 20200517, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200615, 'tx',
123, 9, 0, 0, 456, NULL, 20200615, 0),
123, 9, 0, 0, 456, NULL, 20200615, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200616, 'tx',
123, 5, 0, 0, 456, NULL, 20200616, 0),
123, 5, 0, 0, 456, NULL, 20200616, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200617, 'tx',
123, 1, 0, 0, 456, 1, 20200617, 0)
123, 1, 0, 0, 456, 1, 20200617, 0, 1, False)
''')
self.cnx.commit()

# update direction (only 20200417 has enough history)
args = None
args = get_argument_parser().parse_args('')
main(args)

# The Quick-Fix is working
response = Epidata.covidcast(
'src', 'sig1', 'day', 'state', '20200101-20201231', '*')

self.assertEqual(response, {
'result': 1,
'epidata': [{
'time_value': 20201028,
'geo_value': '1111',
'value': 2,
'stderr': 0,
'sample_size': 0,
'direction': None,
'issue': 20201028,
'lag': 0
},
{
'time_value': 20201029,
'geo_value': '1111',
'value': 6,
'stderr': 0,
'sample_size': 0,
'direction': None,
'issue': 20201029,
'lag': 0
},
{
'time_value': 20201030,
'geo_value': '1111',
'value': 5,
'stderr': 0,
'sample_size': 0,
'direction': None,
'issue': 20201030,
'lag': 0
},
],
'message': 'success',
})

# request data from the API
response = Epidata.covidcast(
'src', 'sig', 'day', 'state', '20200101-20201231', '*')
Expand Down Expand Up @@ -188,11 +236,11 @@ def test_uploading(self):
})

# verify secondary timestamps were updated
self.cur.execute('select timestamp2 from covidcast order by id asc')
self.cur.execute('select direction_updated_timestamp from covidcast order by id asc')
timestamps = [t for (t,) in self.cur]
for t in timestamps[:6]:
# first 6 rows had `direction` updated
for t in timestamps[:9]:
# first 9 rows had `direction` updated
self.assertGreater(t, 0)
for t in timestamps[6:]:
for t in timestamps[9:]:
# last 3 rows were not updated
self.assertEqual(t, 456)
94 changes: 94 additions & 0 deletions integrations/acquisition/covidcast/test_fill_is_latest_issue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Integration tests for covidcast's direction updating."""

# standard library
import unittest

# third party
import mysql.connector

# first party
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.fill_is_latest_issue'


class FillIsLatestIssueTests(unittest.TestCase):
"""Tests filling is_latest_issue column"""

def setUp(self):
"""Perform per-test setup."""

# connect to the `epidata` database and clear the `covidcast` table
cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
cur = cnx.cursor()
cur.execute('truncate table covidcast')
cnx.commit()
cur.close()

# make connection and cursor available to test cases
self.cnx = cnx
self.cur = cnx.cursor()

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

# use the local instance of the Epidata API
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

def tearDown(self):
"""Perform per-test teardown."""
self.cur.close()
self.cnx.close()

def test_fill_is_latest_issue(self):
"""Update rows having a stale `direction` field and serve the results."""

self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 5, 5, 5, NULL, 20200228, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, NULL, 20200229, 1, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, NULL, 20200301, 1, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 9, 9, 9, NULL, 20200229, 0, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, NULL, 20200303, 2, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 5, 5, 5, NULL, 20200302, 1, 1, False),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 9, 8, 7, NULL, 20200301, 0, 1, False)
''')
self.cnx.commit()

# fill is_latest_issue
main()

self.cur.execute('''select * from covidcast''')
result = list(self.cur)
expected = [
(1, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 5, 5, 5, None, 20200228, 0, bytearray(b'0'), bytearray(b'0')),
(2, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, None, 20200229, 1, bytearray(b'1'), bytearray(b'0')),
(3, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, None, 20200301, 1, bytearray(b'1'), bytearray(b'0')),
(4, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 9, 9, 9, None, 20200229, 0, bytearray(b'0'), bytearray(b'0')),
(5, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, None, 20200303, 2, bytearray(b'1'), bytearray(b'0')),
(6, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0'), bytearray(b'0')),
(7, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'), bytearray(b'0'))
]

self.assertEqual(result, expected)
12 changes: 6 additions & 6 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ def test_covidcast(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0, False),
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1),
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1, 0, False),
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2)
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2, 1, False)
''')
self.cnx.commit()

Expand Down Expand Up @@ -157,11 +157,11 @@ def test_covidcast_meta(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0, False),
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2),
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2, 1, False),
(0, 'src', 'sig', 'day', 'county', 20200415, '01234',
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1)
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1, 1, False)
''')
self.cnx.commit()

Expand Down
Loading