Skip to content

Commit ddbcec5

Browse files
authored
Merge pull request #187 from cmu-delphi/staging
Apply "quick fix" data ingestion performance improvements
2 parents dfb24e8 + 87b1984 commit ddbcec5

21 files changed

+669
-368
lines changed

integrations/acquisition/covidcast/test_covidcast_meta_caching.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ def test_caching(self):
6666
self.cur.execute('''
6767
insert into covidcast values
6868
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
69-
123, 1, 2, 3, 456, 1, 20200422, 0),
69+
123, 1, 2, 3, 456, 1, 20200422, 0, 1, False),
7070
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
71-
789, 1, 2, 3, 456, 1, 20200423, 1)
71+
789, 1, 2, 3, 456, 1, 20200423, 1, 1, False)
7272
''')
7373
self.cur.execute('''
7474
insert into covidcast values
7575
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
76-
456, 4, 5, 6, 789, -1, 20200422, 0)
76+
456, 4, 5, 6, 789, -1, 20200422, 0, 1, True)
7777
''')
7878

7979
self.cnx.commit()

integrations/acquisition/covidcast/test_csv_uploading.py

+32-6
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,12 @@ def test_uploading(self):
7676
f.write('wa,30,0.03,300\n')
7777

7878
# valid: long signal name (32 < length < 64) that should still be accepted
79-
with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated.csv', 'w') as f:
79+
with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_be_accepted.csv', 'w') as f:
80+
f.write('geo_id,val,se,sample_size\n')
81+
f.write('pa,100,5.4,624\n')
82+
83+
# invalid
84+
with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet.csv', 'w') as f:
8085
f.write('geo_id,val,se,sample_size\n')
8186
f.write('pa,100,5.4,624\n')
8287

@@ -180,9 +185,30 @@ def apply_lag(expected_epidata):
180185
'message': 'success',
181186
})
182187

188+
189+
# request CSV data from the API on the signal with name length 32<x<64
190+
response = Epidata.covidcast(
191+
'src-name', 'wip_really_long_name_that_will_be_accepted', 'day', 'state', 20200419, '*')
192+
193+
# verify data matches the CSV
194+
self.assertEqual(response, {
195+
'result': 1,
196+
'message': 'success',
197+
'epidata': apply_lag([
198+
{
199+
'time_value': 20200419,
200+
'geo_value': 'pa',
201+
'value': 100,
202+
'stderr': 5.4,
203+
'sample_size': 624,
204+
'direction': None,
205+
},
206+
])
207+
})
208+
183209
# request CSV data from the API on the long-named signal
184210
response = Epidata.covidcast(
185-
'src-name', 'wip_really_long_name_that_will_g', 'day', 'state', 20200419, '*')
211+
'src-name', 'wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_s', 'day', 'state', 20200419, '*')
186212

187213
# verify data matches the CSV
188214
# if the CSV failed correctly there should be no results
@@ -192,10 +218,10 @@ def apply_lag(expected_epidata):
192218
})
193219

194220
# verify timestamps and default values are reasonable
195-
self.cur.execute('select timestamp1, timestamp2, direction from covidcast')
196-
for timestamp1, timestamp2, direction in self.cur:
197-
self.assertGreater(timestamp1, 0)
198-
self.assertEqual(timestamp2, 0)
221+
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
222+
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
223+
self.assertGreater(value_updated_timestamp, 0)
224+
self.assertEqual(direction_updated_timestamp, 0)
199225
self.assertIsNone(direction)
200226

201227
# verify that the CSVs were archived

integrations/acquisition/covidcast/test_direction_updating.py

+62-14
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ def test_uploading(self):
5151
"""Update rows having a stale `direction` field and serve the results."""
5252

5353
# insert some sample data
54+
# src, sig1, 1111:
55+
# direction should be updated to None as there are no historical data for (src, sig1, state).
5456
# CA 20200301:
5557
# timeline should be x=[-2, -1, 0], y=[2, 6, 5] with direction=1
5658
# FL 20200517:
@@ -60,31 +62,77 @@ def test_uploading(self):
6062
# wrong) is fresh
6163
self.cur.execute('''
6264
insert into covidcast values
65+
(0, 'src', 'sig1', 'day', 'state', 20201028, '1111',
66+
123, 2, 0, 0, 0, -1, 20201028, 0, 1, False),
67+
(0, 'src', 'sig1', 'day', 'state', 20201029, '1111',
68+
123, 6, 0, 0, 0, 0, 20201029, 0, 1, False),
69+
(0, 'src', 'sig1', 'day', 'state', 20201030, '1111',
70+
123, 5, 0, 0, 0, 1, 20201030, 0, 1, False),
6371
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
64-
123, 2, 0, 0, 0, NULL, 20200228, 0),
72+
123, 2, 0, 0, 0, NULL, 20200228, 0, 1, False),
6573
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
66-
123, 6, 0, 0, 0, NULL, 20200229, 0),
74+
123, 6, 0, 0, 0, NULL, 20200229, 0, 1, False),
6775
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
68-
123, 5, 0, 0, 0, NULL, 20200301, 0),
76+
123, 5, 0, 0, 0, NULL, 20200301, 0, 1, False),
6977
(0, 'src', 'sig', 'day', 'state', 20200511, 'fl',
70-
123, 1, 0, 0, 0, NULL, 20200511, 0),
78+
123, 1, 0, 0, 0, NULL, 20200511, 0, 1, False),
7179
(0, 'src', 'sig', 'day', 'state', 20200512, 'fl',
72-
123, 2, 0, 0, 0, NULL, 20200512, 0),
80+
123, 2, 0, 0, 0, NULL, 20200512, 0, 1, False),
7381
(0, 'src', 'sig', 'day', 'state', 20200517, 'fl',
74-
123, 2, 0, 0, 0, NULL, 20200517, 0),
82+
123, 2, 0, 0, 0, NULL, 20200517, 0, 1, False),
7583
(0, 'src', 'sig', 'day', 'state', 20200615, 'tx',
76-
123, 9, 0, 0, 456, NULL, 20200615, 0),
84+
123, 9, 0, 0, 456, NULL, 20200615, 0, 1, False),
7785
(0, 'src', 'sig', 'day', 'state', 20200616, 'tx',
78-
123, 5, 0, 0, 456, NULL, 20200616, 0),
86+
123, 5, 0, 0, 456, NULL, 20200616, 0, 1, False),
7987
(0, 'src', 'sig', 'day', 'state', 20200617, 'tx',
80-
123, 1, 0, 0, 456, 1, 20200617, 0)
88+
123, 1, 0, 0, 456, 1, 20200617, 0, 1, False)
8189
''')
8290
self.cnx.commit()
8391

8492
# update direction (only 20200417 has enough history)
85-
args = None
93+
args = get_argument_parser().parse_args('')
8694
main(args)
8795

96+
# verify the quick fix: direction is reset to None for (src, sig1), which has no historical data
97+
response = Epidata.covidcast(
98+
'src', 'sig1', 'day', 'state', '20200101-20201231', '*')
99+
100+
self.assertEqual(response, {
101+
'result': 1,
102+
'epidata': [{
103+
'time_value': 20201028,
104+
'geo_value': '1111',
105+
'value': 2,
106+
'stderr': 0,
107+
'sample_size': 0,
108+
'direction': None,
109+
'issue': 20201028,
110+
'lag': 0
111+
},
112+
{
113+
'time_value': 20201029,
114+
'geo_value': '1111',
115+
'value': 6,
116+
'stderr': 0,
117+
'sample_size': 0,
118+
'direction': None,
119+
'issue': 20201029,
120+
'lag': 0
121+
},
122+
{
123+
'time_value': 20201030,
124+
'geo_value': '1111',
125+
'value': 5,
126+
'stderr': 0,
127+
'sample_size': 0,
128+
'direction': None,
129+
'issue': 20201030,
130+
'lag': 0
131+
},
132+
],
133+
'message': 'success',
134+
})
135+
88136
# request data from the API
89137
response = Epidata.covidcast(
90138
'src', 'sig', 'day', 'state', '20200101-20201231', '*')
@@ -188,11 +236,11 @@ def test_uploading(self):
188236
})
189237

190238
# verify secondary timestamps were updated
191-
self.cur.execute('select timestamp2 from covidcast order by id asc')
239+
self.cur.execute('select direction_updated_timestamp from covidcast order by id asc')
192240
timestamps = [t for (t,) in self.cur]
193-
for t in timestamps[:6]:
194-
# first 6 rows had `direction` updated
241+
for t in timestamps[:9]:
242+
# first 9 rows had `direction` updated
195243
self.assertGreater(t, 0)
196-
for t in timestamps[6:]:
244+
for t in timestamps[9:]:
197245
# last 3 rows were not updated
198246
self.assertEqual(t, 456)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""Integration tests for covidcast's `is_latest_issue` filling."""
2+
3+
# standard library
4+
import unittest
5+
6+
# third party
7+
import mysql.connector
8+
9+
# first party
10+
from delphi.epidata.client.delphi_epidata import Epidata
11+
import delphi.operations.secrets as secrets
12+
13+
# py3tester coverage target (equivalent to `import *`)
14+
__test_target__ = 'delphi.epidata.acquisition.covidcast.fill_is_latest_issue'
15+
16+
17+
class FillIsLatestIssueTests(unittest.TestCase):
18+
"""Tests filling is_latest_issue column"""
19+
20+
def setUp(self):
21+
"""Perform per-test setup."""
22+
23+
# connect to the `epidata` database and clear the `covidcast` table
24+
cnx = mysql.connector.connect(
25+
user='user',
26+
password='pass',
27+
host='delphi_database_epidata',
28+
database='epidata')
29+
cur = cnx.cursor()
30+
cur.execute('truncate table covidcast')
31+
cnx.commit()
32+
cur.close()
33+
34+
# make connection and cursor available to test cases
35+
self.cnx = cnx
36+
self.cur = cnx.cursor()
37+
38+
# use the local instance of the epidata database
39+
secrets.db.host = 'delphi_database_epidata'
40+
secrets.db.epi = ('user', 'pass')
41+
42+
# use the local instance of the Epidata API
43+
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
44+
45+
def tearDown(self):
46+
"""Perform per-test teardown."""
47+
self.cur.close()
48+
self.cnx.close()
49+
50+
def test_fill_is_latest_issue(self):
51+
"""Fill the `is_latest_issue` column and verify the resulting rows."""
52+
53+
self.cur.execute('''
54+
insert into covidcast values
55+
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
56+
123, 2, 5, 5, 5, NULL, 20200228, 0, 1, False),
57+
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
58+
123, 2, 0, 0, 0, NULL, 20200229, 1, 1, False),
59+
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
60+
123, 6, 0, 0, 0, NULL, 20200301, 1, 1, False),
61+
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
62+
123, 6, 9, 9, 9, NULL, 20200229, 0, 1, False),
63+
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
64+
123, 5, 0, 0, 0, NULL, 20200303, 2, 1, False),
65+
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
66+
123, 5, 5, 5, 5, NULL, 20200302, 1, 1, False),
67+
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
68+
123, 5, 9, 8, 7, NULL, 20200301, 0, 1, False)
69+
''')
70+
self.cnx.commit()
71+
72+
# fill is_latest_issue
73+
main()
74+
75+
self.cur.execute('''select * from covidcast''')
76+
result = list(self.cur)
77+
expected = [
78+
(1, 'src', 'sig', 'day', 'state', 20200228, 'ca',
79+
123, 2, 5, 5, 5, None, 20200228, 0, bytearray(b'0'), bytearray(b'0')),
80+
(2, 'src', 'sig', 'day', 'state', 20200228, 'ca',
81+
123, 2, 0, 0, 0, None, 20200229, 1, bytearray(b'1'), bytearray(b'0')),
82+
(3, 'src', 'sig', 'day', 'state', 20200229, 'ca',
83+
123, 6, 0, 0, 0, None, 20200301, 1, bytearray(b'1'), bytearray(b'0')),
84+
(4, 'src', 'sig', 'day', 'state', 20200229, 'ca',
85+
123, 6, 9, 9, 9, None, 20200229, 0, bytearray(b'0'), bytearray(b'0')),
86+
(5, 'src', 'sig', 'day', 'state', 20200301, 'ca',
87+
123, 5, 0, 0, 0, None, 20200303, 2, bytearray(b'1'), bytearray(b'0')),
88+
(6, 'src', 'sig', 'day', 'state', 20200301, 'ca',
89+
123, 5, 5, 5, 5, None, 20200302, 1, bytearray(b'0'), bytearray(b'0')),
90+
(7, 'src', 'sig', 'day', 'state', 20200301, 'ca',
91+
123, 5, 9, 8, 7, None, 20200301, 0, bytearray(b'0'), bytearray(b'0'))
92+
]
93+
94+
self.assertEqual(result, expected)

integrations/client/test_delphi_epidata.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ def test_covidcast(self):
5050
self.cur.execute('''
5151
insert into covidcast values
5252
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
53-
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
53+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0, False),
5454
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
55-
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1),
55+
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1, 0, False),
5656
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
57-
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2)
57+
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2, 1, False)
5858
''')
5959
self.cnx.commit()
6060

@@ -157,11 +157,11 @@ def test_covidcast_meta(self):
157157
self.cur.execute('''
158158
insert into covidcast values
159159
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
160-
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
160+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, 0, False),
161161
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
162-
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2),
162+
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2, 1, False),
163163
(0, 'src', 'sig', 'day', 'county', 20200415, '01234',
164-
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1)
164+
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1, 1, False)
165165
''')
166166
self.cnx.commit()
167167

0 commit comments

Comments
 (0)