Skip to content

Commit 2b183d7

Browse files
committed
Added support in DB for is_wip flag to (hopefully) speed up cache updates.
Also fixed/improved setUp in integrations/server/test_covidcast_meta.py and added skeleton-code for new `missing_*` rows.
1 parent 47a8c52 commit 2b183d7

File tree

10 files changed

+96
-65
lines changed

10 files changed

+96
-65
lines changed

integrations/acquisition/covidcast/test_covidcast_meta_caching.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ def test_caching(self):
6666
self.cur.execute('''
6767
insert into covidcast values
6868
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
69-
123, 1, 2, 3, 456, 1, 20200422, 0),
69+
123, 1, 2, 3, 456, 1, 20200422, 0, False),
7070
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
71-
789, 1, 2, 3, 456, 1, 20200423, 1)
71+
789, 1, 2, 3, 456, 1, 20200423, 1, False)
7272
''')
7373
self.cur.execute('''
7474
insert into covidcast values
7575
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
76-
456, 4, 5, 6, 789, -1, 20200422, 0)
76+
456, 4, 5, 6, 789, -1, 20200422, 0, True)
7777
''')
7878

7979
self.cnx.commit()

integrations/acquisition/covidcast/test_direction_updating.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,23 @@ def test_uploading(self):
6161
self.cur.execute('''
6262
insert into covidcast values
6363
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
64-
123, 2, 0, 0, 0, NULL, 20200228, 0),
64+
123, 2, 0, 0, 0, NULL, 20200228, 0, False),
6565
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
66-
123, 6, 0, 0, 0, NULL, 20200229, 0),
66+
123, 6, 0, 0, 0, NULL, 20200229, 0, False),
6767
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
68-
123, 5, 0, 0, 0, NULL, 20200301, 0),
68+
123, 5, 0, 0, 0, NULL, 20200301, 0, False),
6969
(0, 'src', 'sig', 'day', 'state', 20200511, 'fl',
70-
123, 1, 0, 0, 0, NULL, 20200511, 0),
70+
123, 1, 0, 0, 0, NULL, 20200511, 0, False),
7171
(0, 'src', 'sig', 'day', 'state', 20200512, 'fl',
72-
123, 2, 0, 0, 0, NULL, 20200512, 0),
72+
123, 2, 0, 0, 0, NULL, 20200512, 0, False),
7373
(0, 'src', 'sig', 'day', 'state', 20200517, 'fl',
74-
123, 2, 0, 0, 0, NULL, 20200517, 0),
74+
123, 2, 0, 0, 0, NULL, 20200517, 0, False),
7575
(0, 'src', 'sig', 'day', 'state', 20200615, 'tx',
76-
123, 9, 0, 0, 456, NULL, 20200615, 0),
76+
123, 9, 0, 0, 456, NULL, 20200615, 0, False),
7777
(0, 'src', 'sig', 'day', 'state', 20200616, 'tx',
78-
123, 5, 0, 0, 456, NULL, 20200616, 0),
78+
123, 5, 0, 0, 456, NULL, 20200616, 0, False),
7979
(0, 'src', 'sig', 'day', 'state', 20200617, 'tx',
80-
123, 1, 0, 0, 456, 1, 20200617, 0)
80+
123, 1, 0, 0, 456, 1, 20200617, 0, False)
8181
''')
8282
self.cnx.commit()
8383

integrations/client/test_delphi_epidata.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ def test_covidcast(self):
5050
self.cur.execute('''
5151
insert into covidcast values
5252
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
53-
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
53+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, False),
5454
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
55-
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1),
55+
456, 5.5, 1.2, 10.5, 789, 0, 20200415, 1, False),
5656
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
57-
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2)
57+
345, 6.5, 2.2, 11.5, 678, 0, 20200416, 2, False)
5858
''')
5959
self.cnx.commit()
6060

@@ -157,11 +157,11 @@ def test_covidcast_meta(self):
157157
self.cur.execute('''
158158
insert into covidcast values
159159
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
160-
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0),
160+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, False),
161161
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
162-
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2),
162+
345, 6.0, 2.2, 11.5, 678, 0, 20200416, 2, False),
163163
(0, 'src', 'sig', 'day', 'county', 20200415, '01234',
164-
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1)
164+
345, 7.0, 2.0, 12.5, 678, 0, 20200416, 1, False)
165165
''')
166166
self.cnx.commit()
167167

integrations/server/test_covidcast.py

+22-22
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def test_round_trip(self):
4545
self.cur.execute('''
4646
insert into covidcast values
4747
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
48-
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
48+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0, False)
4949
''')
5050
self.cnx.commit()
5151

@@ -85,17 +85,17 @@ def test_location_wildcard(self):
8585
self.cur.execute('''
8686
insert into covidcast values
8787
(0, 'src', 'sig', 'day', 'county', 20200414, '11111',
88-
123, 10, 11, 12, 456, 13, 20200414, 0),
88+
123, 10, 11, 12, 456, 13, 20200414, 0, False),
8989
(0, 'src', 'sig', 'day', 'county', 20200414, '22222',
90-
123, 20, 21, 22, 456, 23, 20200414, 0),
90+
123, 20, 21, 22, 456, 23, 20200414, 0, False),
9191
(0, 'src', 'sig', 'day', 'county', 20200414, '33333',
92-
123, 30, 31, 32, 456, 33, 20200414, 0),
92+
123, 30, 31, 32, 456, 33, 20200414, 0, False),
9393
(0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
94-
123, 40, 41, 42, 456, 43, 20200414, 0),
94+
123, 40, 41, 42, 456, 43, 20200414, 0, False),
9595
(0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
96-
123, 50, 51, 52, 456, 53, 20200414, 0),
96+
123, 50, 51, 52, 456, 53, 20200414, 0, False),
9797
(0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
98-
123, 60, 61, 62, 456, 634, 20200414, 0)
98+
123, 60, 61, 62, 456, 634, 20200414, 0, False)
9999
''')
100100
self.cnx.commit()
101101

@@ -155,17 +155,17 @@ def test_location_timeline(self):
155155
self.cur.execute('''
156156
insert into covidcast values
157157
(0, 'src', 'sig', 'day', 'county', 20200411, '01234',
158-
123, 10, 11, 12, 456, 13, 20200413, 2),
158+
123, 10, 11, 12, 456, 13, 20200413, 2, False),
159159
(0, 'src', 'sig', 'day', 'county', 20200412, '01234',
160-
123, 20, 21, 22, 456, 23, 20200413, 1),
160+
123, 20, 21, 22, 456, 23, 20200413, 1, False),
161161
(0, 'src', 'sig', 'day', 'county', 20200413, '01234',
162-
123, 30, 31, 32, 456, 33, 20200413, 0),
162+
123, 30, 31, 32, 456, 33, 20200413, 0, False),
163163
(0, 'src', 'sig', 'day', 'county', 20200411, '11111',
164-
123, 40, 41, 42, 456, 43, 20200413, 2),
164+
123, 40, 41, 42, 456, 43, 20200413, 2, False),
165165
(0, 'src', 'sig', 'day', 'county', 20200412, '22222',
166-
123, 50, 51, 52, 456, 53, 20200413, 1),
166+
123, 50, 51, 52, 456, 53, 20200413, 1, False),
167167
(0, 'src', 'sig', 'day', 'county', 20200413, '33333',
168-
123, 60, 61, 62, 456, 63, 20200413, 0)
168+
123, 60, 61, 62, 456, 63, 20200413, 0, False)
169169
''')
170170
self.cnx.commit()
171171

@@ -225,7 +225,7 @@ def test_unique_key_constraint(self):
225225
self.cur.execute('''
226226
insert into covidcast values
227227
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
228-
0, 0, 0, 0, 0, 0, 20200414, 0)
228+
0, 0, 0, 0, 0, 0, 20200414, 0, False)
229229
''')
230230
self.cnx.commit()
231231

@@ -234,14 +234,14 @@ def test_unique_key_constraint(self):
234234
self.cur.execute('''
235235
insert into covidcast values
236236
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
237-
1, 1, 1, 1, 1, 1, 20200414, 0)
237+
1, 1, 1, 1, 1, 1, 20200414, 0, False)
238238
''')
239239

240240
# succeed to insert different dummy data under a different issue
241241
self.cur.execute('''
242242
insert into covidcast values
243243
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
244-
1, 1, 1, 1, 1, 1, 20200415, 1)
244+
1, 1, 1, 1, 1, 1, 20200415, 1, False)
245245
''')
246246

247247
def test_nullable_columns(self):
@@ -251,7 +251,7 @@ def test_nullable_columns(self):
251251
self.cur.execute('''
252252
insert into covidcast values
253253
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
254-
123, 0.123, NULL, NULL, 456, NULL, 20200414, 0)
254+
123, 0.123, NULL, NULL, 456, NULL, 20200414, 0, False)
255255
''')
256256
self.cnx.commit()
257257

@@ -291,15 +291,15 @@ def test_temporal_partitioning(self):
291291
self.cur.execute('''
292292
insert into covidcast values
293293
(0, 'src', 'sig', 'hour', 'state', 2020041714, 'vi',
294-
123, 10, 11, 12, 456, 13, 2020041714, 0),
294+
123, 10, 11, 12, 456, 13, 2020041714, 0, False),
295295
(0, 'src', 'sig', 'day', 'state', 20200417, 'vi',
296-
123, 20, 21, 22, 456, 23, 20200417, 00),
296+
123, 20, 21, 22, 456, 23, 20200417, 00, False),
297297
(0, 'src', 'sig', 'week', 'state', 202016, 'vi',
298-
123, 30, 31, 32, 456, 33, 202016, 0),
298+
123, 30, 31, 32, 456, 33, 202016, 0, False),
299299
(0, 'src', 'sig', 'month', 'state', 202004, 'vi',
300-
123, 40, 41, 42, 456, 43, 202004, 0),
300+
123, 40, 41, 42, 456, 43, 202004, 0, False),
301301
(0, 'src', 'sig', 'year', 'state', 2020, 'vi',
302-
123, 50, 51, 52, 456, 53, 2020, 0)
302+
123, 50, 51, 52, 456, 53, 2020, 0, False)
303303
''')
304304
self.cnx.commit()
305305

integrations/server/test_covidcast_meta.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def setUp(self):
2828
database='epidata')
2929
cur = cnx.cursor()
3030
cur.execute('truncate table covidcast')
31+
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = ""')
3132
cnx.commit()
3233
cur.close()
3334

@@ -46,7 +47,7 @@ def test_round_trip(self):
4647
# insert dummy data and accumulate expected results (in sort order)
4748
template = '''
4849
insert into covidcast values
49-
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0)
50+
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0, %d)
5051
'''
5152
expected = []
5253
for src in ('src1', 'src2'):
@@ -72,7 +73,7 @@ def test_round_trip(self):
7273
})
7374
for tv in (1, 2):
7475
for gv, v in zip(('geo1', 'geo2'), (10, 20)):
75-
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv))
76+
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv, False))
7677
self.cnx.commit()
7778
update_cache(args=None)
7879

@@ -94,14 +95,18 @@ def test_suppress_work_in_progress(self):
9495
# insert dummy data and accumulate expected results (in sort order)
9596
template = '''
9697
insert into covidcast values
97-
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0)
98+
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0, %d)
9899
'''
99100
expected = []
100101
for src in ('src1', 'src2'):
101102
for sig in ('sig1', 'sig2', 'wip_sig3'):
102103
for tt in ('day', 'week'):
103104
for gt in ('hrr', 'msa'):
104-
if sig != 'wip_sig3':
105+
106+
if sig == 'wip_sig3':
107+
is_wip = True
108+
else:
109+
is_wip = False
105110
expected.append({
106111
'data_source': src,
107112
'signal': sig,
@@ -121,7 +126,7 @@ def test_suppress_work_in_progress(self):
121126
})
122127
for tv in (1, 2):
123128
for gv, v in zip(('geo1', 'geo2'), (10, 20)):
124-
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv))
129+
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv, is_wip))
125130
self.cnx.commit()
126131
update_cache(args=None)
127132

src/acquisition/covidcast/csv_to_database.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,16 @@ def archive_as_successful(path_src, filename, source):
7474

7575
(source, signal, time_type, geo_type, time_value, issue, lag) = details
7676

77+
is_wip = False
78+
if signal[:4].lower() == "wip_":
79+
is_wip = True
80+
print(signal, is_wip)
81+
7782
csv_rows = csv_importer_impl.load_csv(path, geo_type)
7883

7984
all_rows_valid = False
8085
try:
81-
cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
86+
cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag, is_wip)
8287
rows_list = list(cc_rows)
8388
if not rows_list:
8489
raise ValueError("No data")

src/acquisition/covidcast/database.py

+16-12
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,21 @@ class CovidcastRow():
1717
"""A container for all the values of a single covidcast row."""
1818

1919
@staticmethod
20-
def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag):
20+
def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag, is_wip):
2121
return CovidcastRow(source, signal, time_type, geo_type, time_value,
2222
row_value.geo_value,
2323
row_value.value,
2424
row_value.stderr,
2525
row_value.sample_size,
26-
issue, lag)
26+
issue, lag, is_wip)
2727

2828
@staticmethod
29-
def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag):
29+
def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag, is_wip):
3030
# NOTE: returns a generator, as row_values is expected to be a generator
31-
return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag)
31+
return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag, is_wip)
3232
for row_value in row_values)
3333

34-
def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, sample_size, issue, lag):
34+
def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, sample_size, issue, lag, is_wip):
3535
self.id = None
3636
self.source = source
3737
self.signal = signal
@@ -46,6 +46,7 @@ def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, v
4646
self.direction = None
4747
self.issue = issue
4848
self.lag = lag
49+
self.is_wip = is_wip
4950

5051

5152
class Database:
@@ -103,12 +104,12 @@ def insert_or_update_batch(self, cc_rows, batch_size=0, commit_partial=False):
103104
(`id`, `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`,
104105
`value_updated_timestamp`, `value`, `stderr`, `sample_size`,
105106
`direction_updated_timestamp`, `direction`,
106-
`issue`, `lag`)
107+
`issue`, `lag`, `is_wip`)
107108
VALUES
108109
(0, %s, %s, %s, %s, %s, %s,
109110
UNIX_TIMESTAMP(NOW()), %s, %s, %s,
110111
0, NULL,
111-
%s, %s)
112+
%s, %s, %s)
112113
ON DUPLICATE KEY UPDATE
113114
`value_updated_timestamp` = VALUES(`value_updated_timestamp`),
114115
`value` = VALUES(`value`),
@@ -139,7 +140,8 @@ def insert_or_update_batch(self, cc_rows, batch_size=0, commit_partial=False):
139140
row.stderr,
140141
row.sample_size,
141142
row.issue,
142-
row.lag
143+
row.lag,
144+
row.is_wip
143145
) for row in cc_rows[start:end]]
144146

145147
result = self._cursor.executemany(sql, args)
@@ -164,7 +166,8 @@ def insert_or_update(
164166
stderr,
165167
sample_size,
166168
issue,
167-
lag):
169+
lag,
170+
is_wip):
168171
"""
169172
Insert a new row, or update an existing row, in the `covidcast` table.
170173
@@ -173,7 +176,7 @@ def insert_or_update(
173176

174177
sql = '''
175178
INSERT INTO `covidcast` VALUES
176-
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL, %s, %s)
179+
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL, %s, %s, %s)
177180
ON DUPLICATE KEY UPDATE
178181
`value_updated_timestamp` = VALUES(`value_updated_timestamp`),
179182
`value` = VALUES(`value`),
@@ -192,7 +195,8 @@ def insert_or_update(
192195
stderr,
193196
sample_size,
194197
issue,
195-
lag
198+
lag,
199+
is_wip
196200
)
197201

198202
self._cursor.execute(sql, args)
@@ -587,7 +591,7 @@ def get_covidcast_meta(self):
587591
x.`geo_type` = t.`geo_type` AND
588592
x.`geo_value` = t.`geo_value`
589593
WHERE
590-
t.`signal` NOT LIKE 'wip_%'
594+
NOT t.`is_wip`
591595
GROUP BY
592596
t.`source`,
593597
t.`signal`,

0 commit comments

Comments
 (0)