Commit eedca28

adding SQL from Joe's dbjobs and an extremely basic runner module
also did a bunch of formatting of said SQL code
1 parent c010b08 commit eedca28

2 files changed: +120 -1 lines changed


src/acquisition/covidcast/database.py (+107 -1)
@@ -163,7 +163,9 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
     '''
 
     update_status_sql = f'''
-      UPDATE `{self.load_table}` SET `process_status` = '{self.process_status['loaded']}' WHERE `process_status` = '{self.process_status['inserting']}'
+      UPDATE `{self.load_table}`
+        SET `process_status` = '{self.process_status['loaded']}'
+        WHERE `process_status` = '{self.process_status['inserting']}'
     '''
 
     if 0 != self.count_insertstatus_rows():
@@ -219,6 +221,110 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
       raise e
     return total
 
+  def run_dbjobs(self):
+
+    signal_load_set_comp_keys = f'''
+        UPDATE `{self.load_table}`
+          SET compressed_signal_key = md5(CONCAT(`source`,`signal`)),
+              compressed_geo_key = md5(CONCAT(`geo_type`,`geo_value`))
+    '''
+
+    signal_load_mark_batch = f'''
+        UPDATE `{self.load_table}`
+          SET process_status = 'b'
+    '''
+
+    signal_dim_add_new_load = f'''
+        INSERT INTO signal_dim (`source`, `signal`, `compressed_signal_key`)
+          SELECT DISTINCT `source`, `signal`, compressed_signal_key
+            FROM `{self.load_table}`
+            WHERE compressed_signal_key NOT IN
+              (SELECT DISTINCT compressed_signal_key
+                FROM signal_dim)
+    '''
+
+    geo_dim_add_new_load = f'''
+        INSERT INTO geo_dim (`geo_type`, `geo_value`, `compressed_geo_key`)
+          SELECT DISTINCT `geo_type`, `geo_value`, compressed_geo_key
+            FROM `{self.load_table}`
+            WHERE compressed_geo_key NOT IN
+              (SELECT DISTINCT compressed_geo_key
+                FROM geo_dim)
+    '''
+
+    signal_history_load = f'''
+        INSERT INTO signal_history
+          (signal_data_id, signal_key_id, geo_key_id, demog_key_id, issue, data_as_of_dt,
+           time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
+           computation_as_of_dt, is_latest_issue, missing_value, missing_stderr, missing_sample_size, `id`)
+        SELECT
+            signal_data_id, sd.signal_key_id, gd.geo_key_id, 0, issue, data_as_of_dt,
+            time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
+            computation_as_of_dt, is_latest_issue, missing_value, missing_stderr, missing_sample_size, `id`
+          FROM `{self.load_table}` sl
+            INNER JOIN signal_dim sd
+              USE INDEX(`compressed_signal_key_ind`)
+              ON sd.compressed_signal_key = sl.compressed_signal_key
+            INNER JOIN geo_dim gd
+              USE INDEX(`compressed_geo_key_ind`)
+              ON gd.compressed_geo_key = sl.compressed_geo_key
+          WHERE process_status = 'b'
+        ON DUPLICATE KEY UPDATE
+          `value_updated_timestamp` = sl.`value_updated_timestamp`,
+          `value` = sl.`value`,
+          `stderr` = sl.`stderr`,
+          `sample_size` = sl.`sample_size`,
+          `lag` = sl.`lag`,
+          `missing_value` = sl.`missing_value`,
+          `missing_stderr` = sl.`missing_stderr`,
+          `missing_sample_size` = sl.`missing_sample_size`
+    '''
+
+    signal_latest_load = f'''
+        INSERT INTO signal_latest
+          (signal_data_id, signal_key_id, geo_key_id, demog_key_id, issue, data_as_of_dt,
+           time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
+           computation_as_of_dt, missing_value, missing_stderr, missing_sample_size)
+        SELECT
+            signal_data_id, sd.signal_key_id, gd.geo_key_id, 0, issue, data_as_of_dt,
+            time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
+            computation_as_of_dt, missing_value, missing_stderr, missing_sample_size
+          FROM `{self.load_table}` sl
+            INNER JOIN signal_dim sd
+              USE INDEX(`compressed_signal_key_ind`)
+              ON sd.compressed_signal_key = sl.compressed_signal_key
+            INNER JOIN geo_dim gd
+              USE INDEX(`compressed_geo_key_ind`)
+              ON gd.compressed_geo_key = sl.compressed_geo_key
+          WHERE process_status = 'b'
+            AND is_latest_issue = 1
+        ON DUPLICATE KEY UPDATE
+          `value_updated_timestamp` = sl.`value_updated_timestamp`,
+          `value` = sl.`value`,
+          `stderr` = sl.`stderr`,
+          `sample_size` = sl.`sample_size`,
+          `lag` = sl.`lag`,
+          `missing_value` = sl.`missing_value`,
+          `missing_stderr` = sl.`missing_stderr`,
+          `missing_sample_size` = sl.`missing_sample_size`
+    '''
+
+    signal_load_delete_processed = f'''
+        DELETE FROM `{self.load_table}`
+          WHERE process_status <> 'l'
+    '''
+
+    self._cursor.execute(signal_load_set_comp_keys)
+    self._cursor.execute(signal_load_mark_batch)
+    self._cursor.execute(signal_dim_add_new_load)
+    self._cursor.execute(geo_dim_add_new_load)
+    self._cursor.execute(signal_history_load)
+    self._cursor.execute(signal_latest_load)
+    self._cursor.execute(signal_load_delete_processed)
+
+    return self
+
+
   def compute_covidcast_meta(self, table_name=None):
     """Compute and return metadata on all COVIDcast signals."""
     logger = get_structured_logger("compute_covidcast_meta")
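
The new run_dbjobs method issues its seven statements in order on the class's existing cursor (compute compressed keys, mark the batch 'b', add new signal_dim and geo_dim entries, upsert signal_history and signal_latest, delete processed load rows) and returns self without committing, so transaction control stays with the caller. A minimal sketch of how a caller might add timing and an error path around that call; it is not part of this commit, and the assumption that disconnect(False) closes without committing is inferred from the disconnect(True) call in the runner module below:

# Hedged sketch, not part of this commit: time run_dbjobs and commit only on
# success.  disconnect(False) skipping the commit is an assumption.
import logging
import time

from delphi.epidata.acquisition.covidcast.database import Database

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("run_dbjobs")

def run_dbjobs_timed():
  database = Database()
  database.connect()
  try:
    start = time.time()
    # compute compressed keys, mark batch 'b', fill signal_dim/geo_dim,
    # upsert signal_history/signal_latest, then purge the load table
    database.run_dbjobs()
  except Exception:
    logger.exception("dbjobs failed")
    database.disconnect(False)  # assumed: False closes without committing
    raise
  else:
    logger.info("dbjobs finished in %.1f seconds", time.time() - start)
    database.disconnect(True)  # commit, as the new runner module does

if __name__ == '__main__':
  run_dbjobs_timed()
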
New file (+13)
@@ -0,0 +1,13 @@
+
+from delphi.epidata.acquisition.covidcast.database import Database
+
+def main():
+  database = Database()
+  database.connect()
+  try:
+    database.run_dbjobs()
+  finally:
+    database.disconnect(True)
+
+if __name__ == '__main__':
+  main()
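
The runner simply wires connect, run_dbjobs, and disconnect(True) together; the try/finally ensures the connection is closed (and, presumably, the work committed) even if run_dbjobs raises. A hedged sketch of a unit test for that wiring; the new file's import path is not shown in this view, so dbjobs_runner below is a hypothetical module name:

# Hedged sketch: verify the connect -> run_dbjobs -> disconnect(True) wiring
# with a mock.  "dbjobs_runner" is a hypothetical name for the new module.
from unittest.mock import MagicMock, patch

import delphi.epidata.acquisition.covidcast.dbjobs_runner as dbjobs_runner  # hypothetical path

def test_main_runs_dbjobs_and_commits():
  mock_database = MagicMock()
  with patch.object(dbjobs_runner, "Database", return_value=mock_database):
    dbjobs_runner.main()
  mock_database.connect.assert_called_once()
  mock_database.run_dbjobs.assert_called_once()
  # disconnect(True) must run even if run_dbjobs raises, thanks to try/finally
  mock_database.disconnect.assert_called_once_with(True)
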
