From 090c0dfb3f9355a10bf99e80bfb88e47d58187e8 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 31 May 2022 09:38:17 -0400 Subject: [PATCH 01/11] schema refinement, dbjobs speed increase, and added migration routine --- src/acquisition/covidcast/database.py | 303 +++++++++--------- .../covidcast/migrate_epidata_to_v4.py | 183 +++++++++++ src/ddl/v4_schema.sql | 155 +++++---- 3 files changed, 411 insertions(+), 230 deletions(-) create mode 100644 src/acquisition/covidcast/migrate_epidata_to_v4.py diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 6617d6dfa..d724a7b5f 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -74,11 +74,9 @@ class Database: DATABASE_NAME = 'covid' + latest_table = "signal_latest_v" # technically VIEW and not a TABLE, but... + history_table = "signal_history_v" # ...also a VIEW load_table = "signal_load" - latest_table = "signal_latest" # NOTE: careful! probably want to use variable `latest_view` instead for semantics purposes - latest_view = latest_table + "_v" - history_table = "signal_history" # NOTE: careful! probably want to use variable `history_view` instead for semantics purposes - history_view = history_table + "_v" def connect(self, connector_impl=mysql.connector): @@ -115,7 +113,7 @@ def count_all_rows(self, tablename=None): """Return the total number of rows in table `covidcast`.""" if tablename is None: - tablename = self.history_view + tablename = self.history_table self._cursor.execute(f'SELECT count(1) FROM `{tablename}`') @@ -123,10 +121,10 @@ def count_all_rows(self, tablename=None): return num def count_all_history_rows(self): - return self.count_all_rows(self.history_view) + return self.count_all_rows(self.history_table) def count_all_latest_rows(self): - return self.count_all_rows(self.latest_view) + return self.count_all_rows(self.latest_table) def count_insertstatus_rows(self): self._cursor.execute(f"SELECT count(1) from `{self.load_table}` where `process_status`='{PROCESS_STATUS.INSERTING}'") @@ -164,10 +162,10 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load. 
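# --- illustrative sketch (not from the patch): the rule the SQL statement below applies in
# bulk, written out for a single row. `load_row` and `latest_row` are hypothetical dicts
# matched on the composite key (source, signal, geo_type, geo_value, time_type, time_value).
def needs_latest_demotion(load_row, latest_row):
    # the load row loses its is_latest_issue flag when `latest` already holds a newer issue
    return latest_row is not None and load_row["issue"] < latest_row["issue"]

assert needs_latest_demotion({"issue": 20200415}, {"issue": 20200416})
assert not needs_latest_demotion({"issue": 20200417}, {"issue": 20200416})
# --- end sketch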
fix_is_latest_issue_sql = f''' UPDATE - `{self.load_table}` JOIN `{self.latest_view}` + `{self.load_table}` JOIN `{self.latest_table}` USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) SET `{self.load_table}`.`is_latest_issue`=0 - WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` + WHERE `{self.load_table}`.`issue` < `{self.latest_table}`.`issue` AND `process_status` = '{PROCESS_STATUS.INSERTING}' ''' @@ -232,6 +230,115 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False def run_dbjobs(self): + # we do this LEFT JOIN trick because mysql cant do set difference (aka EXCEPT or MINUS) + # (as in " select distinct source, signal from signal_dim minus select distinct source, signal from signal_load ") + signal_dim_add_new_load = f''' + INSERT INTO signal_dim (`source`, `signal`) + SELECT DISTINCT sl.source, sl.signal + FROM signal_load AS sl LEFT JOIN signal_dim AS sd + ON sl.source=sd.source AND sl.signal=sd.signal + WHERE sd.source IS NULL + ''' + + # again, same trick to get around lack of EXCEPT/MINUS + geo_dim_add_new_load = f''' + INSERT INTO geo_dim (`geo_type`, `geo_value`) + SELECT DISTINCT sl.geo_type, sl.geo_value + FROM signal_load AS sl LEFT JOIN geo_dim AS gd + ON sl.geo_type=gd.geo_type AND sl.geo_value=gd.geo_value + WHERE gd.geo_type IS NULL + ''' + + signal_history_load = f''' + INSERT INTO signal_history + (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size, `legacy_id`) + SELECT + signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size, `legacy_id` + FROM `{self.load_table}` sl + INNER JOIN signal_dim sd USING (source, `signal`) + INNER JOIN geo_dim gd USING (geo_type, geo_value) + ON DUPLICATE KEY UPDATE + `signal_data_id` = sl.`signal_data_id`, + `value_updated_timestamp` = sl.`value_updated_timestamp`, + `value` = sl.`value`, + `stderr` = sl.`stderr`, + `sample_size` = sl.`sample_size`, + `lag` = sl.`lag`, + `missing_value` = sl.`missing_value`, + `missing_stderr` = sl.`missing_stderr`, + `missing_sample_size` = sl.`missing_sample_size` + ''' + + signal_latest_load = f''' + INSERT INTO signal_latest + (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) + SELECT + signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, + time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size + FROM `{self.load_table}` sl + INNER JOIN signal_dim sd USING (source, `signal`) + INNER JOIN geo_dim gd USING (geo_type, geo_value) + WHERE is_latest_issue = 1 + ON DUPLICATE KEY UPDATE + `signal_data_id` = sl.`signal_data_id`, + `value_updated_timestamp` = sl.`value_updated_timestamp`, + `value` = sl.`value`, + `stderr` = sl.`stderr`, + `sample_size` = sl.`sample_size`, + `issue` = sl.`issue`, + `lag` = sl.`lag`, + `missing_value` = sl.`missing_value`, + `missing_stderr` = sl.`missing_stderr`, + `missing_sample_size` = sl.`missing_sample_size` + ''' + + signal_load_delete_processed = f''' 
+ DELETE FROM `{self.load_table}` + ''' + + import time + time_q = [] + time_q.append(time.time()) + + print('signal_dim_add_new_load:', end='') + self._cursor.execute(signal_dim_add_new_load) + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('geo_dim_add_new_load:', end='') + self._cursor.execute(geo_dim_add_new_load) + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_history_load:', end='') + self._cursor.execute(signal_history_load) + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_latest_load:', end='') + self._cursor.execute(signal_latest_load) + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_load_delete_processed:', end='') + self._cursor.execute(signal_load_delete_processed) + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print("done.") + + return self + + + def run_dbjobs_old(self): + signal_load_set_comp_keys = f''' UPDATE `{self.load_table}` SET compressed_signal_key = md5(CONCAT(`source`,`signal`)), @@ -263,11 +370,11 @@ def run_dbjobs(self): signal_history_load = f''' INSERT INTO signal_history - (signal_data_id, signal_key_id, geo_key_id, demog_key_id, issue, data_as_of_dt, + (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, computation_as_of_dt, is_latest_issue, missing_value, missing_stderr, missing_sample_size, `legacy_id`) SELECT - signal_data_id, sd.signal_key_id, gd.geo_key_id, 0, issue, data_as_of_dt, + signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, computation_as_of_dt, is_latest_issue, missing_value, missing_stderr, missing_sample_size, `legacy_id` FROM `{self.load_table}` sl @@ -292,11 +399,11 @@ def run_dbjobs(self): signal_latest_load = f''' INSERT INTO signal_latest - (signal_data_id, signal_key_id, geo_key_id, demog_key_id, issue, data_as_of_dt, + (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) SELECT - signal_data_id, sd.signal_key_id, gd.geo_key_id, 0, issue, data_as_of_dt, + signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, computation_as_of_dt, missing_value, missing_stderr, missing_sample_size FROM `{self.load_table}` sl @@ -326,160 +433,56 @@ def run_dbjobs(self): WHERE process_status <> '{PROCESS_STATUS.LOADED}' ''' - print('signal_load_set_comp_keys:') + import time + time_q = [] + time_q.append(time.time()) + + print('signal_load_set_comp_keys:', end='') self._cursor.execute(signal_load_set_comp_keys) - print('signal_load_mark_batch:') + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_load_mark_batch:', end='') self._cursor.execute(signal_load_mark_batch) - print('signal_dim_add_new_load:') + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_dim_add_new_load:', end='') self._cursor.execute(signal_dim_add_new_load) - print('geo_dim_add_new_load:') + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('geo_dim_add_new_load:', end='') 
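# --- illustrative sketch (not from the patch): the time/print bookkeeping wrapped around
# each execute() in this method could be factored into a small helper like this hypothetical
# context manager, assuming plain stdout timing output is all that is wanted.
import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    print(f"{label}:", end="")
    start = time.time()
    yield
    print(f" elapsed: {time.time() - start}s")

# hypothetical usage inside run_dbjobs():
#   with timed("signal_history_load"):
#       self._cursor.execute(signal_history_load)
# --- end sketch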
self._cursor.execute(geo_dim_add_new_load) - print('signal_history_load:') + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_history_load:', end='') self._cursor.execute(signal_history_load) - print('signal_latest_load:') + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_latest_load:', end='') self._cursor.execute(signal_latest_load) - print('signal_load_delete_processed:') + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + + print('signal_load_delete_processed:', end='') self._cursor.execute(signal_load_delete_processed) + time_q.append(time.time()) + print(f" elapsed: {time_q[-1]-time_q[-2]}s") + print("done.") return self - def delete_batch(self, cc_deletions): - """ - Remove rows specified by a csv file or list of tuples. - - If cc_deletions is a filename, the file should include a header row and use the following field order: - - geo_id - - value (ignored) - - stderr (ignored) - - sample_size (ignored) - - issue (YYYYMMDD format) - - time_value (YYYYMMDD format) - - geo_type - - signal - - source - - If cc_deletions is a list of tuples, the tuples should use the following field order (=same as above, plus time_type): - - geo_id - - value (ignored) - - stderr (ignored) - - sample_size (ignored) - - issue (YYYYMMDD format) - - time_value (YYYYMMDD format) - - geo_type - - signal - - source - - time_type - """ - - tmp_table_name = "tmp_delete_table" - # composite keys: - short_comp_key = "`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`" - long_comp_key = short_comp_key + ", `issue`" - - create_tmp_table_sql = f''' -CREATE OR REPLACE TABLE {tmp_table_name} LIKE {self.load_table}; -''' - - amend_tmp_table_sql = f''' -ALTER TABLE {tmp_table_name} ADD COLUMN delete_history_id BIGINT UNSIGNED, - ADD COLUMN delete_latest_id BIGINT UNSIGNED, - ADD COLUMN update_latest BINARY(1) DEFAULT 0; -''' - - load_tmp_table_infile_sql = f''' -LOAD DATA INFILE "{cc_deletions}" -INTO TABLE {tmp_table_name} -FIELDS TERMINATED BY "," -IGNORE 1 LINES -(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`) -SET time_type="day"; -''' - - load_tmp_table_insert_sql = f''' -INSERT INTO {tmp_table_name} -(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`, `time_type`, -`value_updated_timestamp`, `lag`, `is_latest_issue`) -VALUES -(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, -0, 0, 0) -''' - - add_history_id_sql = f''' -UPDATE {tmp_table_name} d INNER JOIN {self.history_view} h USING ({long_comp_key}) -SET d.delete_history_id=h.signal_data_id; -''' - - # if a row we are deleting also appears in the 'latest' table (with a matching 'issue')... - mark_for_update_latest_sql = f''' -UPDATE {tmp_table_name} d INNER JOIN {self.latest_view} ell USING ({long_comp_key}) -SET d.update_latest=1, d.delete_latest_id=ell.signal_data_id; -''' - - delete_history_sql = f''' -DELETE h FROM {tmp_table_name} d INNER JOIN {self.history_table} h ON d.delete_history_id=h.signal_data_id; -''' - - # ...remove it from 'latest'... - delete_latest_sql = f''' -DELETE ell FROM {tmp_table_name} d INNER JOIN {self.latest_table} ell ON d.delete_latest_id=ell.signal_data_id; -''' - - # ...and re-write that record with its next-latest issue (from 'history') instead. 
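# --- illustrative sketch (not from the patch): the promotion rule performed by the INSERT
# below, shown for one composite key in plain Python. `remaining_history` is a hypothetical
# list of that key's surviving history rows (dicts) after the deletions above.
def next_latest(remaining_history):
    # the surviving row with the greatest issue becomes the new "latest" record
    return max(remaining_history, key=lambda row: row["issue"], default=None)
# --- end sketch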
- # NOTE: this must be executed *AFTER* `delete_history_sql` to ensure we get the correct `issue` - # AND also after `delete_latest_sql` so that we dont get a key collision on insert. - update_latest_sql = f''' -INSERT INTO signal_latest - (issue, - signal_data_id, signal_key_id, geo_key_id, time_type, time_value, - value, stderr, sample_size, `lag`, value_updated_timestamp, - missing_value, missing_stderr, missing_sample_size) -SELECT - MAX(h.issue), - h.signal_data_id, h.signal_key_id, h.geo_key_id, h.time_type, h.time_value, - h.value, h.stderr, h.sample_size, h.`lag`, h.value_updated_timestamp, - h.missing_value, h.missing_stderr, h.missing_sample_size -FROM {self.history_view} h JOIN {tmp_table_name} d USING ({short_comp_key}) -WHERE d.update_latest=1 GROUP BY {short_comp_key}; -''' - - drop_tmp_table_sql = f'DROP TABLE {tmp_table_name}' - - total = None - try: - self._cursor.execute(create_tmp_table_sql) - self._cursor.execute(amend_tmp_table_sql) - if isinstance(cc_deletions, str): - self._cursor.execute(load_tmp_table_infile_sql) - elif isinstance(cc_deletions, list): - self._cursor.executemany(load_tmp_table_insert_sql, cc_deletions) - else: - raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}") - self._cursor.execute(add_history_id_sql) - self._cursor.execute(mark_for_update_latest_sql) - self._cursor.execute(delete_history_sql) - total = self._cursor.rowcount - # TODO: consider reporting rows removed and/or replaced in latest table as well - self._cursor.execute(delete_latest_sql) - self._cursor.execute(update_latest_sql) - self._connection.commit() - - if total == -1: - # the SQL connector does not support returning number of rows affected (see PEP 249) - total = None - except Exception as e: - raise e - finally: - self._cursor.execute(drop_tmp_table_sql) - return total - def compute_covidcast_meta(self, table_name=None): """Compute and return metadata on all COVIDcast signals.""" logger = get_structured_logger("compute_covidcast_meta") if table_name is None: - table_name = self.latest_view + table_name = self.latest_table n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server # NOTE: this may present a small problem if this job runs on different hardware than the db, diff --git a/src/acquisition/covidcast/migrate_epidata_to_v4.py b/src/acquisition/covidcast/migrate_epidata_to_v4.py new file mode 100644 index 000000000..0962abbac --- /dev/null +++ b/src/acquisition/covidcast/migrate_epidata_to_v4.py @@ -0,0 +1,183 @@ +# run as: +# python3 -u -m delphi.epidata.acquisition.covidcast.migrate_epidata_to_v4 +# ("-u" allows unbuffered print statements so we can watch timing in closer-to-real-time) + + +#####import delphi.operations.secrets as secrets +#####secrets.db.host = '172.30.n.n' # aka 'epidata-db-qa-01' +#####secrets.db.epi = ('delphi', 'xxxxxxxx') +# ^ these are already set appropriately on qa-automation in/by the operations module ^ + + +batch_size = 20_000_000 +upper_lim_override = False ##### 60_000_000 +use_transaction_wrappers = False +use_autocommit = False + +# used to continue to experiment with this module on the same DBMS but *not* muck up an already complete and valid 'covid' schema migration +destination_schema = 'covid' ##### 'covid' + +# TODO: maybe output: was autocommit enabled? was table locking used? what isolation type was used? were indexes enabled? were uniqueness checks enabled? 
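# --- illustrative sketch (not from the patch): one way to address the TODO above -- echo the
# run configuration at startup so each migration log records how it was produced. Only
# variables already defined in this module are referenced.
def print_run_config():
    config = {
        "batch_size": batch_size,
        "upper_lim_override": upper_lim_override,
        "use_transaction_wrappers": use_transaction_wrappers,
        "use_autocommit": use_autocommit,
        "destination_schema": destination_schema,
    }
    for name, value in config.items():
        print(f"run config: {name} = {value}")
# --- end sketch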
+# TODO: make cli flags for various things listed in "outputs" suggestions above, plus: starting id, id upper limit (which id to stop at), batch size, and dbjobs "version" + + + +''' + +mysql> select count(id) from epidata.covidcast; ++------------+ +| count(id) | ++------------+ +| 2647381579 | ++------------+ +1 row in set (13 min 49.32 sec) + +mysql> select max(id) from epidata.covidcast; ++------------+ +| max(id) | ++------------+ +| 3740757041 | ++------------+ +1 row in set (0.00 sec) + +-- so ~71% coverage of actual rows per allocated ids ( 2647381579 / 3740757041 = .70771278379851347314 ) + +mysql> select time_value, issue from epidata.covidcast where id=3740757041; ++------------+----------+ +| time_value | issue | ++------------+----------+ +| 20210927 | 20210930 | ++------------+----------+ +1 row in set (0.01 sec) + +mysql> select now(); ++---------------------+ +| now() | ++---------------------+ +| 2022-05-16 16:45:34 | ++---------------------+ +1 row in set (0.00 sec) + +''' + + +from delphi.epidata.acquisition.covidcast.database import Database +import time + +def start_tx(cursor): + cursor.execute('SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;') + cursor.execute('SET autocommit=0;') # starts a transaction as suggested in https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + # NOTE: locks must be specified for any aliases of table names that are used + cursor.execute('''LOCK TABLES epidata.covidcast AS cc READ, + signal_load WRITE, signal_load AS sl WRITE, + signal_history WRITE, + signal_latest WRITE, + signal_dim WRITE, signal_dim AS sd READ, + geo_dim WRITE, geo_dim AS gd READ;''') + cursor.execute('SET unique_checks=0;') + +def finish_tx(cursor): + cursor.execute('SET unique_checks=1;') + cursor.execute('COMMIT;') + cursor.execute('UNLOCK TABLES;') + + +def do_batches(db, start, upper_lim): + # NOTE: upper_lim is not actually selected for ; make sure it exceeds any ids you want to include + batch_lower = start + + while batch_lower < upper_lim: + batch_upper = min(batch_lower + batch_size, upper_lim) + + # NOTE: first rows of column names are identical, second rows are for specifying a rename and a literal + batch_sql = f""" + INSERT INTO {destination_schema}.signal_load ( + `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size, + `legacy_id`, process_status + ) SELECT + `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size, + `id`, 'l' + FROM epidata.covidcast AS cc + USE INDEX(`PRIMARY`) + WHERE {batch_lower} <= cc.id AND cc.id < {batch_upper}; """ + # TODO: use LIMIT instead of id range?? + # TODO: might it be worth adding "ORDER BY id ASC" ? + + if use_transaction_wrappers: + start_tx(db._cursor) + + print(f"-=-=-=-=-=-=-=- RUNNING BATCH STARTING AT {batch_lower} -=-=-=-=-=-=-=-") + print(f"-=-=-=-=-=-=-=- RUNNING ''INSERT INTO SELECT FROM''... ", end="") + t = time.time() + db._cursor.execute(batch_sql) + print(f"elapsed: {time.time()-t} sec, rows: {db._cursor.rowcount} -=-=-=-=-=-=-=-") + + t = time.time() + db.run_dbjobs() + print(f"-=-=-=-=-=-=-=- RAN db_jobs()... elapsed: {time.time()-t} sec -=-=-=-=-=-=-=-") + #####db.run_dbjobs_old() + #####print(f"-=-=-=-=-=-=-=- RAN db_jobs_old()... 
elapsed: {time.time()-t} sec -=-=-=-=-=-=-=-") + + print("-=-=-=-=-=-=-=- RUNNING commit()... ", end="") + t = time.time() + db.commit() + if use_transaction_wrappers: + finish_tx(db._cursor) + print(f"elapsed: {time.time()-t} sec -=-=-=-=-=-=-=-") + + print("\n\n") + # move pointer for next batch + batch_lower = batch_upper + + +def main(): + Database.DATABASE_NAME = destination_schema + db = Database() + db.connect() + if use_autocommit: + db._connection.autocommit = True + + print(f"starting run at: {time.strftime('%c')}") + + # clear tables in the v4 schema + # TODO: potentially drop and recreate all tables + print("truncating tables...") + for table in "signal_load signal_latest signal_history geo_dim signal_dim".split(): + db._cursor.execute(f"TRUNCATE TABLE {table}") + db.commit() + + # TODO: if using "compressed" keys, this operation saves a significant amount of time... dont forget to restore them afterward! + #####db._cursor.execute(f"DROP INDEX comp_signal_key ON {destination_schema}.signal_load") + #####db._cursor.execute(f"DROP INDEX comp_geo_key ON {destination_schema}.signal_load") + #####db.commit() + # TODO: should we drop indexes on other tables? this may not save time in the long run, as the indexes must be rebuilt before going live + + if upper_lim_override: + upper_lim = upper_lim_override + else: + # find upper limit for data to be imported + db._cursor.execute("SELECT MAX(id) FROM epidata.covidcast;") + for (max_id,) in db._cursor: + upper_lim = 1 + max_id + + # run batch loop + do_batches(db, 1, upper_lim) + + # get table counts [the quick and dirty way] + print("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-") + db._cursor.execute(f"SELECT MAX(signal_data_id) FROM {destination_schema}.signal_history;") + for (max_id,) in db._cursor: + print(f"signal_history: {max_id}") + db._cursor.execute(f"SELECT MAX(signal_data_id) FROM {destination_schema}.signal_latest;") + for (max_id,) in db._cursor: + print(f"signal_latest: {max_id}") + db._cursor.execute(f"SELECT COUNT(signal_key_id), MAX(signal_key_id) FROM {destination_schema}.signal_dim;") + for (count_id, max_id) in db._cursor: + print(f"signal_dim: {count_id}/{max_id}") + db._cursor.execute(f"SELECT COUNT(geo_key_id), MAX(geo_key_id) FROM {destination_schema}.geo_dim;") + for (count_id, max_id) in db._cursor: + print(f"geo_dim: {count_id}/{max_id}") + + +if __name__ == '__main__': + main() diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 0d266a4ca..03dc2677d 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -1,6 +1,6 @@ -- -------------------------------- -- TODO: REMOVE THESE HACKS!!! (find a better way to do this --- +-- -- the database schema `epidata` is created by ENV variables specified in the docker image definition found at: -- ../../dev/docker/database/epidata/Dockerfile -- and the user 'user' is created with permissions on that database. 
@@ -15,76 +15,76 @@ GRANT ALL ON covid.* TO 'user'; -- -------------------------------- CREATE TABLE geo_dim ( - `geo_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, - `geo_type` VARCHAR(12), - `geo_value` VARCHAR(12), - `compressed_geo_key` VARCHAR(100), - - PRIMARY KEY (`geo_key_id`) USING BTREE, - UNIQUE INDEX `compressed_geo_key_ind` (`compressed_geo_key`) USING BTREE -); + `geo_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `geo_type` VARCHAR(12) NOT NULL, + `geo_value` VARCHAR(12) NOT NULL, + + UNIQUE INDEX `geo_dim_index` (`geo_type`, `geo_value`) +) ENGINE=InnoDB; CREATE TABLE signal_dim ( - `signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, - `source` VARCHAR(32), - `signal` VARCHAR(64), - `compressed_signal_key` VARCHAR(100), - - PRIMARY KEY (`signal_key_id`) USING BTREE, - UNIQUE INDEX `compressed_signal_key_ind` (`compressed_signal_key`) USING BTREE + `signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `source` VARCHAR(32) NOT NULL, + `signal` VARCHAR(64) NOT NULL, + + UNIQUE INDEX `signal_dim_index` (`source`, `signal`) ) ENGINE=InnoDB; +CREATE TABLE strat_dim ( + `strat_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + `stratification_name` VARCHAR(64) NOT NULL UNIQUE, + `stratification_descr` VARCHAR(64) NOT NULL +) ENGINE=InnoDB; +INSERT INTO strat_dim VALUES (1, 'NO_STRATIFICATION', ''); CREATE TABLE signal_history ( - `signal_data_id` BIGINT(20) UNSIGNED NOT NULL, - `signal_key_id` BIGINT(20) UNSIGNED, - `geo_key_id` BIGINT(20) UNSIGNED, - `demog_key_id` BIGINT(20) UNSIGNED, -- TODO: for future use ; also rename s/demog/stratification/ - `issue` INT(11), + `signal_data_id` BIGINT(20) UNSIGNED NOT NULL PRIMARY KEY, + `signal_key_id` BIGINT(20) UNSIGNED NOT NULL, + `geo_key_id` BIGINT(20) UNSIGNED NOT NULL, + `strat_key_id` BIGINT(20) UNSIGNED NOT NULL DEFAULT 1, -- TODO: for future use + `issue` INT(11) NOT NULL, `data_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed `time_type` VARCHAR(12) NOT NULL, `time_value` INT(11) NOT NULL, `reference_dt` DATETIME(0), -- TODO: for future use - `value` DOUBLE NULL DEFAULT NULL, - `stderr` DOUBLE NULL DEFAULT NULL, - `sample_size` DOUBLE NULL DEFAULT NULL, + `value` DOUBLE, + `stderr` DOUBLE, + `sample_size` DOUBLE, `lag` INT(11) NOT NULL, `value_updated_timestamp` INT(11) NOT NULL, `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed - `is_latest_issue` BINARY(1) NOT NULL DEFAULT '0', -- TODO: delete this, its hard to keep updated and its not currently used - `missing_value` INT(1) NULL DEFAULT '0', - `missing_stderr` INT(1) NULL DEFAULT '0', - `missing_sample_size` INT(1) NULL DEFAULT '0', - `legacy_id` BIGINT(20) UNSIGNED NULL DEFAULT NULL, -- not used beyond import of previous data into the v4 schema - - PRIMARY KEY (`signal_data_id`) USING BTREE, - UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`issue`,`time_type`,`time_value`) USING BTREE + `missing_value` INT(1) DEFAULT '0', + `missing_stderr` INT(1) DEFAULT '0', + `missing_sample_size` INT(1) DEFAULT '0', + `legacy_id` BIGINT(20) UNSIGNED, -- not used beyond import of previous data into the v4 schema + + UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `issue`, `geo_key_id`), + UNIQUE INDEX `val_2_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`, `issue`) ) ENGINE=InnoDB; CREATE TABLE signal_latest ( - `signal_data_id` BIGINT(20) UNSIGNED NOT NULL, - 
`signal_key_id` BIGINT(20) UNSIGNED, - `geo_key_id` BIGINT(20) UNSIGNED, - `demog_key_id` BIGINT(20) UNSIGNED, -- TODO: for future use ; also rename s/demog/stratification/ - `issue` INT(11), + `signal_data_id` BIGINT(20) UNSIGNED NOT NULL PRIMARY KEY, + `signal_key_id` BIGINT(20) UNSIGNED NOT NULL, + `geo_key_id` BIGINT(20) UNSIGNED NOT NULL, + `strat_key_id` BIGINT(20) UNSIGNED NOT NULL DEFAULT 1, -- TODO: for future use + `issue` INT(11) NOT NULL, `data_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed `time_type` VARCHAR(12) NOT NULL, `time_value` INT(11) NOT NULL, `reference_dt` DATETIME(0), -- TODO: for future use - `value` DOUBLE NULL DEFAULT NULL, - `stderr` DOUBLE NULL DEFAULT NULL, - `sample_size` DOUBLE NULL DEFAULT NULL, + `value` DOUBLE, + `stderr` DOUBLE, + `sample_size` DOUBLE, `lag` INT(11) NOT NULL, `value_updated_timestamp` INT(11) NOT NULL, `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed - `missing_value` INT(1) NULL DEFAULT '0', - `missing_stderr` INT(1) NULL DEFAULT '0', - `missing_sample_size` INT(1) NULL DEFAULT '0', - - PRIMARY KEY (`signal_data_id`) USING BTREE, - UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`time_type`,`time_value`) USING BTREE + `missing_value` INT(1) DEFAULT '0', + `missing_stderr` INT(1) DEFAULT '0', + `missing_sample_size` INT(1) DEFAULT '0', + + UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`) ) ENGINE=InnoDB; @@ -96,11 +96,11 @@ CREATE TABLE signal_latest ( -- To restore the counter, a row must be written with a `signal_data_id` value greater than the maximum -- of its values in the other tables. CREATE TABLE signal_load ( - `signal_data_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, + `signal_data_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `signal_key_id` BIGINT(20) UNSIGNED, `geo_key_id` BIGINT(20) UNSIGNED, - `demog_key_id` BIGINT(20) UNSIGNED, -- TODO: for future use ; also rename s/demog/stratification/ - `issue` INT(11), + `strat_key_id` BIGINT(20) UNSIGNED NOT NULL DEFAULT 1, -- TODO: for future use + `issue` INT(11) NOT NULL, `data_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed `source` VARCHAR(32) NOT NULL, `signal` VARCHAR(64) NOT NULL, @@ -109,27 +109,22 @@ CREATE TABLE signal_load ( `time_type` VARCHAR(12) NOT NULL, `time_value` INT(11) NOT NULL, `reference_dt` DATETIME(0), -- TODO: for future use - `value` DOUBLE NULL DEFAULT NULL, - `stderr` DOUBLE NULL DEFAULT NULL, - `sample_size` DOUBLE NULL DEFAULT NULL, + `value` DOUBLE, + `stderr` DOUBLE, + `sample_size` DOUBLE, `lag` INT(11) NOT NULL, `value_updated_timestamp` INT(11) NOT NULL, `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed `is_latest_issue` BINARY(1) NOT NULL DEFAULT '0', - `missing_value` INT(1) NULL DEFAULT '0', - `missing_stderr` INT(1) NULL DEFAULT '0', - `missing_sample_size` INT(1) NULL DEFAULT '0', + `missing_value` INT(1) DEFAULT '0', + `missing_stderr` INT(1) DEFAULT '0', + `missing_sample_size` INT(1) DEFAULT '0', `legacy_id` BIGINT(20) UNSIGNED, -- not used beyond import of previous data into the v4 schema - `compressed_signal_key` VARCHAR(100), - `compressed_geo_key` VARCHAR(100), - `compressed_demog_key` VARCHAR(100), -- TODO: for future use ; also rename s/demog/stratification/ `process_status` VARCHAR(2) DEFAULT 'l', -- using codes: 'i' (I) for "inserting", 'l' (L) 
for "loaded", and 'b' for "batching" -- TODO: change `process_status` default to 'i' (I) "inserting" or even 'x'/'u' "undefined" ? - PRIMARY KEY (`signal_data_id`) USING BTREE, - INDEX `comp_signal_key` (`compressed_signal_key`) USING BTREE, - INDEX `comp_geo_key` (`compressed_geo_key`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=4000000001; + UNIQUE INDEX (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`) +) ENGINE=InnoDB; CREATE OR REPLACE VIEW signal_history_v AS @@ -142,7 +137,7 @@ CREATE OR REPLACE VIEW signal_history_v AS `t3`.`geo_type` AS `geo_type`, `t3`.`geo_value` AS `geo_value`, `t1`.`signal_data_id` AS `signal_data_id`, -- TODO: unnecessary ...remove? - `t1`.`demog_key_id` AS `demog_key_id`, -- TODO: for future use ; also rename s/demog/stratification/ ...remove? + `t1`.`strat_key_id` AS `strat_key_id`, -- TODO: for future `t1`.`issue` AS `issue`, `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed ...remove? `t1`.`time_type` AS `time_type`, @@ -159,16 +154,16 @@ CREATE OR REPLACE VIEW signal_history_v AS `t1`.`missing_sample_size` AS `missing_sample_size`, `t1`.`signal_key_id` AS `signal_key_id`, -- TODO: unnecessary ...remove? `t1`.`geo_key_id` AS `geo_key_id` -- TODO: unnecessary ...remove? - FROM ((`signal_history` `t1` - JOIN `signal_dim` `t2` - USE INDEX (PRIMARY) - ON `t1`.`signal_key_id` = `t2`.`signal_key_id`) - JOIN `geo_dim` `t3` - USE INDEX (PRIMARY) - ON `t1`.`geo_key_id` = `t3`.`geo_key_id`); + FROM `signal_history` `t1` + JOIN `signal_dim` `t2` + USE INDEX (PRIMARY) + ON `t1`.`signal_key_id` = `t2`.`signal_key_id` + JOIN `geo_dim` `t3` + USE INDEX (PRIMARY) + ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; -CREATE OR REPLACE VIEW signal_latest_v AS +CREATE OR REPLACE VIEW signal_latest_v AS SELECT 1 AS is_latest_issue, -- provides column-compatibility to match `covidcast` table NULL AS direction, -- provides column-compatibility to match `covidcast` table @@ -177,7 +172,7 @@ CREATE OR REPLACE VIEW signal_latest_v AS `t3`.`geo_type` AS `geo_type`, `t3`.`geo_value` AS `geo_value`, `t1`.`signal_data_id` AS `signal_data_id`, -- TODO: unnecessary ...remove? - `t1`.`demog_key_id` AS `demog_key_id`, -- TODO: for future use ; also rename s/demog/stratification/ ...remove? + `t1`.`strat_key_id` AS `strat_key_id`, -- TODO: for future use `t1`.`issue` AS `issue`, `t1`.`data_as_of_dt` AS `data_as_of_dt`, -- TODO: for future use ; also "as_of" is problematic and should be renamed ...remove? `t1`.`time_type` AS `time_type`, @@ -194,13 +189,13 @@ CREATE OR REPLACE VIEW signal_latest_v AS `t1`.`missing_sample_size` AS `missing_sample_size`, `t1`.`signal_key_id` AS `signal_key_id`, -- TODO: unnecessary ...remove? `t1`.`geo_key_id` AS `geo_key_id` -- TODO: unnecessary ...remove? 
- FROM ((`signal_latest` `t1` - JOIN `signal_dim` `t2` - USE INDEX (PRIMARY) - ON `t1`.`signal_key_id` = `t2`.`signal_key_id`) - JOIN `geo_dim` `t3` - USE INDEX (PRIMARY) - ON `t1`.`geo_key_id` = `t3`.`geo_key_id`); + FROM `signal_latest` `t1` + JOIN `signal_dim` `t2` + USE INDEX (PRIMARY) + ON `t1`.`signal_key_id` = `t2`.`signal_key_id` + JOIN `geo_dim` `t3` + USE INDEX (PRIMARY) + ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; CREATE TABLE `covidcast_meta_cache` ( @@ -208,5 +203,5 @@ CREATE TABLE `covidcast_meta_cache` ( `epidata` LONGTEXT NOT NULL, PRIMARY KEY (`timestamp`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; +) ENGINE=InnoDB; INSERT INTO covidcast_meta_cache VALUES (0, '[]'); From b54069927f4a4f43213f3333fcba674dd0fc71a0 Mon Sep 17 00:00:00 2001 From: george haff Date: Thu, 2 Jun 2022 15:36:53 -0400 Subject: [PATCH 02/11] fix to re-do changes the last commit undid from the commit before it... and also restore a method that got deleted?? --- src/acquisition/covidcast/database.py | 148 ++++++++++++++++++++++++-- 1 file changed, 140 insertions(+), 8 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index d724a7b5f..1e7972477 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -74,9 +74,11 @@ class Database: DATABASE_NAME = 'covid' - latest_table = "signal_latest_v" # technically VIEW and not a TABLE, but... - history_table = "signal_history_v" # ...also a VIEW load_table = "signal_load" + latest_table = "signal_latest" # NOTE: careful! probably want to use variable `latest_view` instead for semantics purposes + latest_view = latest_table + "_v" + history_table = "signal_history" # NOTE: careful! probably want to use variable `history_view` instead for semantics purposes + history_view = history_table + "_v" def connect(self, connector_impl=mysql.connector): @@ -113,7 +115,7 @@ def count_all_rows(self, tablename=None): """Return the total number of rows in table `covidcast`.""" if tablename is None: - tablename = self.history_table + tablename = self.history_view self._cursor.execute(f'SELECT count(1) FROM `{tablename}`') @@ -121,10 +123,10 @@ def count_all_rows(self, tablename=None): return num def count_all_history_rows(self): - return self.count_all_rows(self.history_table) + return self.count_all_rows(self.history_view) def count_all_latest_rows(self): - return self.count_all_rows(self.latest_table) + return self.count_all_rows(self.latest_view) def count_insertstatus_rows(self): self._cursor.execute(f"SELECT count(1) from `{self.load_table}` where `process_status`='{PROCESS_STATUS.INSERTING}'") @@ -162,10 +164,10 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load. fix_is_latest_issue_sql = f''' UPDATE - `{self.load_table}` JOIN `{self.latest_table}` + `{self.load_table}` JOIN `{self.latest_view}` USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) SET `{self.load_table}`.`is_latest_issue`=0 - WHERE `{self.load_table}`.`issue` < `{self.latest_table}`.`issue` + WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` AND `process_status` = '{PROCESS_STATUS.INSERTING}' ''' @@ -476,13 +478,143 @@ def run_dbjobs_old(self): return self + def delete_batch(self, cc_deletions): + """ + Remove rows specified by a csv file or list of tuples. 
+ + If cc_deletions is a filename, the file should include a header row and use the following field order: + - geo_id + - value (ignored) + - stderr (ignored) + - sample_size (ignored) + - issue (YYYYMMDD format) + - time_value (YYYYMMDD format) + - geo_type + - signal + - source + + If cc_deletions is a list of tuples, the tuples should use the following field order (=same as above, plus time_type): + - geo_id + - value (ignored) + - stderr (ignored) + - sample_size (ignored) + - issue (YYYYMMDD format) + - time_value (YYYYMMDD format) + - geo_type + - signal + - source + - time_type + """ + + tmp_table_name = "tmp_delete_table" + # composite keys: + short_comp_key = "`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`" + long_comp_key = short_comp_key + ", `issue`" + + create_tmp_table_sql = f''' +CREATE OR REPLACE TABLE {tmp_table_name} LIKE {self.load_table}; +''' + + amend_tmp_table_sql = f''' +ALTER TABLE {tmp_table_name} ADD COLUMN delete_history_id BIGINT UNSIGNED, + ADD COLUMN delete_latest_id BIGINT UNSIGNED, + ADD COLUMN update_latest BINARY(1) DEFAULT 0; +''' + + load_tmp_table_infile_sql = f''' +LOAD DATA INFILE "{cc_deletions}" +INTO TABLE {tmp_table_name} +FIELDS TERMINATED BY "," +IGNORE 1 LINES +(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`) +SET time_type="day"; +''' + + load_tmp_table_insert_sql = f''' +INSERT INTO {tmp_table_name} +(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`, `time_type`, +`value_updated_timestamp`, `lag`, `is_latest_issue`) +VALUES +(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, +0, 0, 0) +''' + + add_history_id_sql = f''' +UPDATE {tmp_table_name} d INNER JOIN {self.history_view} h USING ({long_comp_key}) +SET d.delete_history_id=h.signal_data_id; +''' + + # if a row we are deleting also appears in the 'latest' table (with a matching 'issue')... + mark_for_update_latest_sql = f''' +UPDATE {tmp_table_name} d INNER JOIN {self.latest_view} ell USING ({long_comp_key}) +SET d.update_latest=1, d.delete_latest_id=ell.signal_data_id; +''' + + delete_history_sql = f''' +DELETE h FROM {tmp_table_name} d INNER JOIN {self.history_table} h ON d.delete_history_id=h.signal_data_id; +''' + + # ...remove it from 'latest'... + delete_latest_sql = f''' +DELETE ell FROM {tmp_table_name} d INNER JOIN {self.latest_table} ell ON d.delete_latest_id=ell.signal_data_id; +''' + + # ...and re-write that record with its next-latest issue (from 'history') instead. + # NOTE: this must be executed *AFTER* `delete_history_sql` to ensure we get the correct `issue` + # AND also after `delete_latest_sql` so that we dont get a key collision on insert. 
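# --- illustrative sketch (not from the patch): the two call forms accepted by this method,
# per its docstring above. `db` is a connected Database instance; the file path and tuple
# values shown are hypothetical.
def delete_batch_examples(db):
    # a CSV file (must be readable by the MySQL server, since it is consumed via LOAD DATA INFILE):
    db.delete_batch("/common/covidcast_deletions/deletions_20220601.csv")
    # or an in-memory list of tuples, following the docstring field order plus time_type:
    db.delete_batch([
        ("pa", 0, 0, 0, 20200415, 20200414, "state", "sig", "src", "day"),
    ])
# --- end sketch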
+ update_latest_sql = f''' +INSERT INTO signal_latest + (issue, + signal_data_id, signal_key_id, geo_key_id, time_type, time_value, + value, stderr, sample_size, `lag`, value_updated_timestamp, + missing_value, missing_stderr, missing_sample_size) +SELECT + MAX(h.issue), + h.signal_data_id, h.signal_key_id, h.geo_key_id, h.time_type, h.time_value, + h.value, h.stderr, h.sample_size, h.`lag`, h.value_updated_timestamp, + h.missing_value, h.missing_stderr, h.missing_sample_size +FROM {self.history_view} h JOIN {tmp_table_name} d USING ({short_comp_key}) +WHERE d.update_latest=1 GROUP BY {short_comp_key}; +''' + + drop_tmp_table_sql = f'DROP TABLE {tmp_table_name}' + + total = None + try: + self._cursor.execute(create_tmp_table_sql) + self._cursor.execute(amend_tmp_table_sql) + if isinstance(cc_deletions, str): + self._cursor.execute(load_tmp_table_infile_sql) + elif isinstance(cc_deletions, list): + self._cursor.executemany(load_tmp_table_insert_sql, cc_deletions) + else: + raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}") + self._cursor.execute(add_history_id_sql) + self._cursor.execute(mark_for_update_latest_sql) + self._cursor.execute(delete_history_sql) + total = self._cursor.rowcount + # TODO: consider reporting rows removed and/or replaced in latest table as well + self._cursor.execute(delete_latest_sql) + self._cursor.execute(update_latest_sql) + self._connection.commit() + + if total == -1: + # the SQL connector does not support returning number of rows affected (see PEP 249) + total = None + except Exception as e: + raise e + finally: + self._cursor.execute(drop_tmp_table_sql) + return total + + def compute_covidcast_meta(self, table_name=None): """Compute and return metadata on all COVIDcast signals.""" logger = get_structured_logger("compute_covidcast_meta") if table_name is None: - table_name = self.latest_table + table_name = self.latest_view n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server # NOTE: this may present a small problem if this job runs on different hardware than the db, From dca9e184d0a7f198d6259a3bebe92a297c24e459 Mon Sep 17 00:00:00 2001 From: george haff Date: Thu, 2 Jun 2022 16:16:53 -0400 Subject: [PATCH 03/11] removed run_dbjobs_old() and cleaned up table name references --- src/acquisition/covidcast/database.py | 157 ++------------------------ 1 file changed, 10 insertions(+), 147 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 1e7972477..1152b02ff 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -79,6 +79,8 @@ class Database: latest_view = latest_table + "_v" history_table = "signal_history" # NOTE: careful! 
probably want to use variable `history_view` instead for semantics purposes history_view = history_table + "_v" + # TODO: consider using class variables like this for dimension table names too + # TODO: also consider that for composite key tuples, like short_comp_key and long_comp_key as used in delete_batch() def connect(self, connector_impl=mysql.connector): @@ -115,7 +117,7 @@ def count_all_rows(self, tablename=None): """Return the total number of rows in table `covidcast`.""" if tablename is None: - tablename = self.history_view + tablename = self.history_table self._cursor.execute(f'SELECT count(1) FROM `{tablename}`') @@ -123,10 +125,10 @@ def count_all_rows(self, tablename=None): return num def count_all_history_rows(self): - return self.count_all_rows(self.history_view) + return self.count_all_rows(self.history_table) def count_all_latest_rows(self): - return self.count_all_rows(self.latest_view) + return self.count_all_rows(self.latest_table) def count_insertstatus_rows(self): self._cursor.execute(f"SELECT count(1) from `{self.load_table}` where `process_status`='{PROCESS_STATUS.INSERTING}'") @@ -237,7 +239,7 @@ def run_dbjobs(self): signal_dim_add_new_load = f''' INSERT INTO signal_dim (`source`, `signal`) SELECT DISTINCT sl.source, sl.signal - FROM signal_load AS sl LEFT JOIN signal_dim AS sd + FROM {self.load_table} AS sl LEFT JOIN signal_dim AS sd ON sl.source=sd.source AND sl.signal=sd.signal WHERE sd.source IS NULL ''' @@ -246,13 +248,13 @@ def run_dbjobs(self): geo_dim_add_new_load = f''' INSERT INTO geo_dim (`geo_type`, `geo_value`) SELECT DISTINCT sl.geo_type, sl.geo_value - FROM signal_load AS sl LEFT JOIN geo_dim AS gd + FROM {self.load_table} AS sl LEFT JOIN geo_dim AS gd ON sl.geo_type=gd.geo_type AND sl.geo_value=gd.geo_value WHERE gd.geo_type IS NULL ''' signal_history_load = f''' - INSERT INTO signal_history + INSERT INTO {self.history_table} (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, computation_as_of_dt, missing_value, missing_stderr, missing_sample_size, `legacy_id`) @@ -276,7 +278,7 @@ def run_dbjobs(self): ''' signal_latest_load = f''' - INSERT INTO signal_latest + INSERT INTO {self.latest_table} (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) @@ -339,145 +341,6 @@ def run_dbjobs(self): return self - def run_dbjobs_old(self): - - signal_load_set_comp_keys = f''' - UPDATE `{self.load_table}` - SET compressed_signal_key = md5(CONCAT(`source`,`signal`)), - compressed_geo_key = md5(CONCAT(`geo_type`,`geo_value`)) - ''' - - signal_load_mark_batch = f''' - UPDATE `{self.load_table}` - SET process_status = '{PROCESS_STATUS.BATCHING}' - ''' - - signal_dim_add_new_load = f''' - INSERT INTO signal_dim (`source`, `signal`, `compressed_signal_key`) - SELECT DISTINCT `source`, `signal`, compressed_signal_key - FROM `{self.load_table}` - WHERE compressed_signal_key NOT IN - (SELECT DISTINCT compressed_signal_key - FROM signal_dim) - ''' - - geo_dim_add_new_load = f''' - INSERT INTO geo_dim (`geo_type`, `geo_value`, `compressed_geo_key`) - SELECT DISTINCT `geo_type`, `geo_value`, compressed_geo_key - FROM `{self.load_table}` - WHERE compressed_geo_key NOT IN - (SELECT DISTINCT compressed_geo_key - FROM geo_dim) - ''' - - signal_history_load = f''' - INSERT INTO signal_history - (signal_data_id, 
signal_key_id, geo_key_id, issue, data_as_of_dt, - time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, - computation_as_of_dt, is_latest_issue, missing_value, missing_stderr, missing_sample_size, `legacy_id`) - SELECT - signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, - time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, - computation_as_of_dt, is_latest_issue, missing_value, missing_stderr, missing_sample_size, `legacy_id` - FROM `{self.load_table}` sl - INNER JOIN signal_dim sd - USE INDEX(`compressed_signal_key_ind`) - ON sd.compressed_signal_key = sl.compressed_signal_key - INNER JOIN geo_dim gd - USE INDEX(`compressed_geo_key_ind`) - ON gd.compressed_geo_key = sl.compressed_geo_key - WHERE process_status = '{PROCESS_STATUS.BATCHING}' - ON DUPLICATE KEY UPDATE - `signal_data_id` = sl.`signal_data_id`, - `value_updated_timestamp` = sl.`value_updated_timestamp`, - `value` = sl.`value`, - `stderr` = sl.`stderr`, - `sample_size` = sl.`sample_size`, - `lag` = sl.`lag`, - `missing_value` = sl.`missing_value`, - `missing_stderr` = sl.`missing_stderr`, - `missing_sample_size` = sl.`missing_sample_size` - ''' - - signal_latest_load = f''' - INSERT INTO signal_latest - (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, - time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, - computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) - SELECT - signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, - time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, - computation_as_of_dt, missing_value, missing_stderr, missing_sample_size - FROM `{self.load_table}` sl - INNER JOIN signal_dim sd - USE INDEX(`compressed_signal_key_ind`) - ON sd.compressed_signal_key = sl.compressed_signal_key - INNER JOIN geo_dim gd - USE INDEX(`compressed_geo_key_ind`) - ON gd.compressed_geo_key = sl.compressed_geo_key - WHERE process_status = '{PROCESS_STATUS.BATCHING}' - AND is_latest_issue = 1 - ON DUPLICATE KEY UPDATE - `signal_data_id` = sl.`signal_data_id`, - `value_updated_timestamp` = sl.`value_updated_timestamp`, - `value` = sl.`value`, - `stderr` = sl.`stderr`, - `sample_size` = sl.`sample_size`, - `issue` = sl.`issue`, - `lag` = sl.`lag`, - `missing_value` = sl.`missing_value`, - `missing_stderr` = sl.`missing_stderr`, - `missing_sample_size` = sl.`missing_sample_size` - ''' - - signal_load_delete_processed = f''' - DELETE FROM `{self.load_table}` - WHERE process_status <> '{PROCESS_STATUS.LOADED}' - ''' - - import time - time_q = [] - time_q.append(time.time()) - - print('signal_load_set_comp_keys:', end='') - self._cursor.execute(signal_load_set_comp_keys) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_load_mark_batch:', end='') - self._cursor.execute(signal_load_mark_batch) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_dim_add_new_load:', end='') - self._cursor.execute(signal_dim_add_new_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('geo_dim_add_new_load:', end='') - self._cursor.execute(geo_dim_add_new_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_history_load:', end='') - self._cursor.execute(signal_history_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_latest_load:', 
end='') - self._cursor.execute(signal_latest_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_load_delete_processed:', end='') - self._cursor.execute(signal_load_delete_processed) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print("done.") - - return self - def delete_batch(self, cc_deletions): """ Remove rows specified by a csv file or list of tuples. @@ -563,7 +426,7 @@ def delete_batch(self, cc_deletions): # NOTE: this must be executed *AFTER* `delete_history_sql` to ensure we get the correct `issue` # AND also after `delete_latest_sql` so that we dont get a key collision on insert. update_latest_sql = f''' -INSERT INTO signal_latest +INSERT INTO {self.latest_table} (issue, signal_data_id, signal_key_id, geo_key_id, time_type, time_value, value, stderr, sample_size, `lag`, value_updated_timestamp, From d3130def531d68e2d48926782159c3d68fa34214 Mon Sep 17 00:00:00 2001 From: george haff Date: Thu, 2 Jun 2022 16:36:10 -0400 Subject: [PATCH 04/11] removed process_status column of load table, moved dbjobs() call to end of regular acquisition method, removed helper to run dbjobs as a standalone process --- .../covidcast/test_csv_uploading.py | 10 ----- integrations/acquisition/covidcast/test_db.py | 2 - .../covidcast/test_delete_batch.py | 1 - integrations/client/test_delphi_epidata.py | 4 -- .../server/test_covidcast_endpoints.py | 1 - src/acquisition/covidcast/database.py | 38 +++++-------------- src/acquisition/covidcast/dbjobs_runner.py | 15 -------- .../covidcast/migrate_epidata_to_v4.py | 4 +- src/ddl/v4_schema.sql | 2 - 9 files changed, 11 insertions(+), 66 deletions(-) delete mode 100644 src/acquisition/covidcast/dbjobs_runner.py diff --git a/integrations/acquisition/covidcast/test_csv_uploading.py b/integrations/acquisition/covidcast/test_csv_uploading.py index 29f74f46d..ecb1eb77b 100644 --- a/integrations/acquisition/covidcast/test_csv_uploading.py +++ b/integrations/acquisition/covidcast/test_csv_uploading.py @@ -15,7 +15,6 @@ from delphi_utils import Nans from delphi.epidata.client.delphi_epidata import Epidata from delphi.epidata.acquisition.covidcast.csv_to_database import main -from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main import delphi.operations.secrets as secrets # py3tester coverage target (equivalent to `import *`) @@ -123,7 +122,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records") @@ -152,7 +150,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_values = pd.concat([values, pd.DataFrame({ @@ -187,7 +184,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_response = {'result': -2, 'message': 'no results'} @@ -213,7 +209,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_values_df = pd.concat([values, pd.DataFrame({ @@ -247,7 +242,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() 
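# --- illustrative sketch (not from the patch): with this commit the load-table contents are
# promoted into the history/latest tables inside insert_or_update_batch(), so the tests (and
# production acquisition) need only the single entry point; `args` is the uploader argument
# object these tests already build.
def run_acquisition_cycle(args):
    main(args)  # uploads CSVs and performs the dbjobs promotion internally
    # note: no separate dbjobs_main() step exists any more
# --- end sketch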
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_values = pd.concat([values, pd.DataFrame({ @@ -283,7 +277,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_values = pd.concat([values, pd.DataFrame({ @@ -315,7 +308,6 @@ def test_uploading(self): # upload CSVs main(args) - dbjobs_main() response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') expected_response = {'result': -2, 'message': 'no results'} @@ -332,7 +324,6 @@ def test_uploading(self): f.write('this,header,is,wrong\n') main(args) - dbjobs_main() path = data_dir + '/archive/failed/src-name/20200420_state_test.csv' self.assertIsNotNone(os.stat(path)) @@ -346,7 +337,6 @@ def test_uploading(self): f.write('file name is wrong\n') main(args) - dbjobs_main() path = data_dir + '/archive/failed/unknown/hello.csv' self.assertIsNotNone(os.stat(path)) diff --git a/integrations/acquisition/covidcast/test_db.py b/integrations/acquisition/covidcast/test_db.py index 68a6875b0..21e297932 100644 --- a/integrations/acquisition/covidcast/test_db.py +++ b/integrations/acquisition/covidcast/test_db.py @@ -31,9 +31,7 @@ def _make_dummy_row(self): # cols: ^ timeval v se ssz ^issue ^lag def _insert_rows(self, rows): - # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() ###db._connection.commit() # NOTE: this isnt needed here, but would be if using external access (like through client lib) def _find_matches_for_row(self, row): diff --git a/integrations/acquisition/covidcast/test_delete_batch.py b/integrations/acquisition/covidcast/test_delete_batch.py index 86469948c..41921a3d3 100644 --- a/integrations/acquisition/covidcast/test_delete_batch.py +++ b/integrations/acquisition/covidcast/test_delete_batch.py @@ -68,7 +68,6 @@ def _test_delete_batch(self, cc_deletions): ] rows.append(CovidcastRow('src', 'sig', 'day', 'geo', 0, "d_justone", 0,0,0,0,0,0, 1, 0)) self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() # delete entries self._db.delete_batch(cc_deletions) diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 2d4d29ff2..5f012f6c9 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -90,7 +90,6 @@ def test_covidcast(self): 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200416, 2), ] self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() self._db._connection.commit() with self.subTest(name='request two signals'): @@ -355,7 +354,6 @@ def test_geo_value(self): 60, 61, 62, nmv, nmv, nmv, 20200414, 0), ] self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() self._db._connection.commit() def fetch(geo_value): @@ -445,7 +443,6 @@ def test_covidcast_meta(self): 7.0, 2.0, 12.5, nmv, nmv, nmv, 20200416, 1), ] self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() self._db._connection.commit() # cache it @@ -501,7 +498,6 @@ def test_async_epidata(self): 60, 61, 62, nmv, nmv, nmv, 20200414, 0) ] self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() self._db._connection.commit() test_output = Epidata.async_epidata([ diff --git a/integrations/server/test_covidcast_endpoints.py b/integrations/server/test_covidcast_endpoints.py index 7815fced2..f8a29b699 100644 --- a/integrations/server/test_covidcast_endpoints.py +++ 
b/integrations/server/test_covidcast_endpoints.py @@ -125,7 +125,6 @@ def tearDown(self): def _insert_rows(self, rows: Iterable[CovidcastRow]): self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() self._db._connection.commit() return rows diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 1152b02ff..840701282 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -61,13 +61,6 @@ def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, v self.lag = lag -# constants for the codes used in the `process_status` column of `signal_load` -class _PROCESS_STATUS(object): - INSERTING = 'i' - LOADED = 'l' - BATCHING = 'b' -PROCESS_STATUS = _PROCESS_STATUS() - class Database: """A collection of covidcast database operations.""" @@ -130,11 +123,8 @@ def count_all_history_rows(self): def count_all_latest_rows(self): return self.count_all_rows(self.latest_table) - def count_insertstatus_rows(self): - self._cursor.execute(f"SELECT count(1) from `{self.load_table}` where `process_status`='{PROCESS_STATUS.INSERTING}'") - - for (num,) in self._cursor: - return num + def count_all_load_rows(self): + return self.count_all_rows(self.load_table) def insert_or_update_bulk(self, cc_rows): @@ -143,7 +133,6 @@ def insert_or_update_bulk(self, cc_rows): def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False): """ Insert new rows (or update existing) into the load table. - Data inserted this way will not be available to clients until the appropriate steps from src/dbjobs/ have run """ # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and @@ -152,13 +141,11 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False INSERT INTO `{self.load_table}` (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, - `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`, - `process_status`) + `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`) VALUES (%s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, - 1, %s, %s, %s, - '{PROCESS_STATUS.INSERTING}') + 1, %s, %s, %s) ''' # all load table entries are already marked "is_latest_issue". @@ -170,19 +157,12 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) SET `{self.load_table}`.`is_latest_issue`=0 WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` - AND `process_status` = '{PROCESS_STATUS.INSERTING}' ''' - update_status_sql = f''' - UPDATE `{self.load_table}` - SET `process_status` = '{PROCESS_STATUS.LOADED}' - WHERE `process_status` = '{PROCESS_STATUS.INSERTING}' - ''' - - if 0 != self.count_insertstatus_rows(): - # TODO: determine if this should be fatal?! + if 0 != self.count_all_load_rows(): + # TODO: add a test for this logger = get_structured_logger("insert_or_update_batch") - logger.warn("Non-zero count in the load table!!! This indicates scheduling of acqusition and dbjobs may be out of sync.") + logger.fatal("Non-zero count in the load table!!! 
This indicates a previous acquisition run may have failed, another acquisition is in progress, or this process does not otherwise have exclusive access to the db!") # TODO: consider handling cc_rows as a generator instead of a list @@ -218,7 +198,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False self._cursor.executemany(insert_into_loader_sql, args) modified_row_count = self._cursor.rowcount self._cursor.execute(fix_is_latest_issue_sql) - self._cursor.execute(update_status_sql) + self.run_dbjobs() # TODO: consider incorporating the logic of dbjobs() into this method [once calls to dbjobs() are no longer needed for migrations] if modified_row_count is None or modified_row_count == -1: # the SQL connector does not support returning number of rows affected (see PEP 249) @@ -303,6 +283,7 @@ def run_dbjobs(self): `missing_sample_size` = sl.`missing_sample_size` ''' + # NOTE: DO NOT `TRUNCATE` THIS TABLE! doing so will ruin the AUTO_INCREMENT counter that the history and latest tables depend on... signal_load_delete_processed = f''' DELETE FROM `{self.load_table}` ''' @@ -471,7 +452,6 @@ def delete_batch(self, cc_deletions): return total - def compute_covidcast_meta(self, table_name=None): """Compute and return metadata on all COVIDcast signals.""" logger = get_structured_logger("compute_covidcast_meta") diff --git a/src/acquisition/covidcast/dbjobs_runner.py b/src/acquisition/covidcast/dbjobs_runner.py deleted file mode 100644 index a8f8e1c80..000000000 --- a/src/acquisition/covidcast/dbjobs_runner.py +++ /dev/null @@ -1,15 +0,0 @@ - -from delphi.epidata.acquisition.covidcast.database import Database - -# simple helper to easily run dbjobs from the command line, such as after an acquisition cycle is complete - -def main(): - database = Database() - database.connect() - try: - database.run_dbjobs() - finally: - database.disconnect(True) - -if __name__ == '__main__': - main() diff --git a/src/acquisition/covidcast/migrate_epidata_to_v4.py b/src/acquisition/covidcast/migrate_epidata_to_v4.py index 0962abbac..53bb21aa3 100644 --- a/src/acquisition/covidcast/migrate_epidata_to_v4.py +++ b/src/acquisition/covidcast/migrate_epidata_to_v4.py @@ -93,10 +93,10 @@ def do_batches(db, start, upper_lim): batch_sql = f""" INSERT INTO {destination_schema}.signal_load ( `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size, - `legacy_id`, process_status + `legacy_id` ) SELECT `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size, - `id`, 'l' + `id` FROM epidata.covidcast AS cc USE INDEX(`PRIMARY`) WHERE {batch_lower} <= cc.id AND cc.id < {batch_upper}; """ diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 03dc2677d..f12dd1968 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -120,8 +120,6 @@ CREATE TABLE signal_load ( `missing_stderr` INT(1) DEFAULT '0', `missing_sample_size` INT(1) DEFAULT '0', `legacy_id` BIGINT(20) UNSIGNED, -- not used beyond import of previous data into the v4 schema - `process_status` VARCHAR(2) DEFAULT 'l', -- using codes: 'i' (I) for "inserting", 'l' (L) for "loaded", and 'b' for "batching" - -- TODO: change `process_status` default to 'i' (I) "inserting" or even 'x'/'u' "undefined" ? 
UNIQUE INDEX (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`) ) ENGINE=InnoDB; From d65797f5029fe7618544a2685979a361122fa1d6 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 14 Jun 2022 15:51:06 -0400 Subject: [PATCH 05/11] adding method to fix autoincrement counter --- src/acquisition/covidcast/database.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 840701282..9f67d309a 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -126,6 +126,24 @@ def count_all_latest_rows(self): def count_all_load_rows(self): return self.count_all_rows(self.load_table) + def _reset_load_table_ai_counter(self): + """Corrects the AUTO_INCREMENT counter in the load table. + + To be used in emergencies only, if the load table was accidentally TRUNCATEd. + This ensures any `signal_data_id`s generated by the load table will not collide with the history or latest tables. + This is also destructive to any data in the load table. + """ + + self._cursor.execute(f'DELETE FROM signal_load') + # NOTE: 'ones' are used as filler here for the (required) NOT NULL columns. + self._cursor.execute(f""" + INSERT INTO signal_load + (signal_data_id, + source, `signal`, geo_type, geo_value, time_type, time_value, issue, `lag`, value_updated_timestamp) + VALUES + ((SELECT 1+MAX(signal_data_id) FROM signal_history), + '1', '1', '1', '1', '1', 1, 1, 1, 1);""") + self._cursor.execute(f'DELETE FROM signal_load') def insert_or_update_bulk(self, cc_rows): return self.insert_or_update_batch(cc_rows) From aa81eb8f6c131a9c72dc0953a160b81857ebb748 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 21 Jun 2022 19:54:47 -0400 Subject: [PATCH 06/11] proper cli arguments incl a flag to resume processing (by id) --- .../covidcast/migrate_epidata_to_v4.py | 85 ++++++++++--------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/src/acquisition/covidcast/migrate_epidata_to_v4.py b/src/acquisition/covidcast/migrate_epidata_to_v4.py index 53bb21aa3..3ce24e9a5 100644 --- a/src/acquisition/covidcast/migrate_epidata_to_v4.py +++ b/src/acquisition/covidcast/migrate_epidata_to_v4.py @@ -9,18 +9,13 @@ # ^ these are already set appropriately on qa-automation in/by the operations module ^ -batch_size = 20_000_000 -upper_lim_override = False ##### 60_000_000 +# TODO: make cli flags for these two variables: use_transaction_wrappers = False use_autocommit = False -# used to continue to experiment with this module on the same DBMS but *not* muck up an already complete and valid 'covid' schema migration -destination_schema = 'covid' ##### 'covid' - # TODO: maybe output: was autocommit enabled? was table locking used? what isolation type was used? were indexes enabled? were uniqueness checks enabled? 
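As a minimal sketch of how the emergency AUTO_INCREMENT repair introduced just above (`_reset_load_table_ai_counter`) might be driven by an operator, the following mirrors the connect/run/disconnect pattern of the removed dbjobs_runner.py; the wrapper script itself is hypothetical and not part of this patch series:

    # hypothetical wrapper, not part of this changeset -- invokes the emergency
    # AUTO_INCREMENT repair after an accidental TRUNCATE of the signal_load table
    from delphi.epidata.acquisition.covidcast.database import Database

    def main():
        database = Database()
        database.connect()
        try:
            # destructive: discards any rows still sitting in the load table
            database._reset_load_table_ai_counter()
        finally:
            database.disconnect(True)

    if __name__ == '__main__':
        main()
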
-# TODO: make cli flags for various things listed in "outputs" suggestions above, plus: starting id, id upper limit (which id to stop at), batch size, and dbjobs "version" - +# TODO: consider dropping indexes before moving data and recreating them afterward ''' @@ -63,6 +58,7 @@ from delphi.epidata.acquisition.covidcast.database import Database import time +import argparse def start_tx(cursor): cursor.execute('SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;') @@ -82,7 +78,7 @@ def finish_tx(cursor): cursor.execute('UNLOCK TABLES;') -def do_batches(db, start, upper_lim): +def do_batches(db, start, upper_lim, batch_size): # NOTE: upper_lim is not actually selected for ; make sure it exceeds any ids you want to include batch_lower = start @@ -91,12 +87,10 @@ def do_batches(db, start, upper_lim): # NOTE: first rows of column names are identical, second rows are for specifying a rename and a literal batch_sql = f""" - INSERT INTO {destination_schema}.signal_load ( - `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size, - `legacy_id` + INSERT INTO signal_load ( + `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size ) SELECT - `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size, - `id` + `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size FROM epidata.covidcast AS cc USE INDEX(`PRIMARY`) WHERE {batch_lower} <= cc.id AND cc.id < {batch_upper}; """ @@ -115,8 +109,6 @@ def do_batches(db, start, upper_lim): t = time.time() db.run_dbjobs() print(f"-=-=-=-=-=-=-=- RAN db_jobs()... elapsed: {time.time()-t} sec -=-=-=-=-=-=-=-") - #####db.run_dbjobs_old() - #####print(f"-=-=-=-=-=-=-=- RAN db_jobs_old()... elapsed: {time.time()-t} sec -=-=-=-=-=-=-=-") print("-=-=-=-=-=-=-=- RUNNING commit()... ", end="") t = time.time() @@ -130,28 +122,13 @@ def do_batches(db, start, upper_lim): batch_lower = batch_upper -def main(): +def main(destination_schema, batch_size, start_id, upper_lim_override): Database.DATABASE_NAME = destination_schema db = Database() db.connect() if use_autocommit: db._connection.autocommit = True - print(f"starting run at: {time.strftime('%c')}") - - # clear tables in the v4 schema - # TODO: potentially drop and recreate all tables - print("truncating tables...") - for table in "signal_load signal_latest signal_history geo_dim signal_dim".split(): - db._cursor.execute(f"TRUNCATE TABLE {table}") - db.commit() - - # TODO: if using "compressed" keys, this operation saves a significant amount of time... dont forget to restore them afterward! - #####db._cursor.execute(f"DROP INDEX comp_signal_key ON {destination_schema}.signal_load") - #####db._cursor.execute(f"DROP INDEX comp_geo_key ON {destination_schema}.signal_load") - #####db.commit() - # TODO: should we drop indexes on other tables? 
this may not save time in the long run, as the indexes must be rebuilt before going live - if upper_lim_override: upper_lim = upper_lim_override else: @@ -160,24 +137,52 @@ def main(): for (max_id,) in db._cursor: upper_lim = 1 + max_id + print(f"migrating data to schema '{destination_schema}', with batch size {batch_size} and {start_id} <= ids < {upper_lim}") + if start_id==0: + print("this WILL truncate any existing v4 tables") + print() + if input("type 'yes' to continue: ") != 'yes': + import sys + sys.exit('operation cancelled!') + + print(f"starting run at: {time.strftime('%c')}") + + if start_id==0: + # clear tables in the v4 schema + print("truncating tables...") + for table in "signal_load signal_latest signal_history geo_dim signal_dim".split(): + db._cursor.execute(f"TRUNCATE TABLE {table}") + db.commit() + start_id = 1 + # run batch loop - do_batches(db, 1, upper_lim) + do_batches(db, start_id, upper_lim, batch_size) # get table counts [the quick and dirty way] print("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-") - db._cursor.execute(f"SELECT MAX(signal_data_id) FROM {destination_schema}.signal_history;") + db._cursor.execute(f"SELECT MAX(signal_data_id) FROM signal_history;") for (max_id,) in db._cursor: print(f"signal_history: {max_id}") - db._cursor.execute(f"SELECT MAX(signal_data_id) FROM {destination_schema}.signal_latest;") + db._cursor.execute(f"SELECT MAX(signal_data_id) FROM signal_latest;") for (max_id,) in db._cursor: - print(f"signal_latest: {max_id}") - db._cursor.execute(f"SELECT COUNT(signal_key_id), MAX(signal_key_id) FROM {destination_schema}.signal_dim;") + print(f"signal_latest: {max_id} (this should be <= the number above)") + db._cursor.execute(f"SELECT COUNT(signal_key_id), MAX(signal_key_id) FROM signal_dim;") for (count_id, max_id) in db._cursor: - print(f"signal_dim: {count_id}/{max_id}") - db._cursor.execute(f"SELECT COUNT(geo_key_id), MAX(geo_key_id) FROM {destination_schema}.geo_dim;") + print(f"signal_dim: count {count_id} / max {max_id}") + db._cursor.execute(f"SELECT COUNT(geo_key_id), MAX(geo_key_id) FROM geo_dim;") for (count_id, max_id) in db._cursor: - print(f"geo_dim: {count_id}/{max_id}") + print(f"geo_dim: count {count_id} / max {max_id}") + + return upper_lim if __name__ == '__main__': - main() + argparser = argparse.ArgumentParser() + argparser.add_argument('--destination_schema', type=str, default='covid') + argparser.add_argument('--batch_size', type=int, default=20_000_000) + argparser.add_argument('--start_id', type=int, default=0) + argparser.add_argument('--upper_lim_override', type=int) # should default to None + args = argparser.parse_args() + + upper_lim = main(args.destination_schema, args.batch_size, args.start_id, args.upper_lim_override) + print(f"the next execution of this program should include argument: --start_id={upper_lim}") From 45561dc657dd6366cc459da914c5b32c29589c4b Mon Sep 17 00:00:00 2001 From: george haff Date: Wed, 17 Aug 2022 09:48:29 -0400 Subject: [PATCH 07/11] removed bad index hint, added permuted indexes --- src/ddl/v4_schema.sql | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index f12dd1968..1bbf661ac 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -59,8 +59,12 @@ CREATE TABLE signal_history ( `missing_sample_size` INT(1) DEFAULT '0', `legacy_id` BIGINT(20) UNSIGNED, -- not used beyond import of previous data into the v4 schema - UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `issue`, 
`geo_key_id`), - UNIQUE INDEX `val_2_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`, `issue`) + UNIQUE INDEX `value_key_tig` (`signal_key_id`, `time_type`, `time_value`, `issue`, `geo_key_id`), + UNIQUE INDEX `value_key_tgi` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`, `issue`), + UNIQUE INDEX `value_key_itg` (`signal_key_id`, `issue`, `time_type`, `time_value`, `geo_key_id`), + UNIQUE INDEX `value_key_igt` (`signal_key_id`, `issue`, `geo_key_id`, `time_type`, `time_value`), + UNIQUE INDEX `value_key_git` (`signal_key_id`, `geo_key_id`, `issue`, `time_type`, `time_value`), + UNIQUE INDEX `value_key_gti` (`signal_key_id`, `geo_key_id`, `time_type`, `time_value`, `issue`) ) ENGINE=InnoDB; @@ -84,7 +88,8 @@ CREATE TABLE signal_latest ( `missing_stderr` INT(1) DEFAULT '0', `missing_sample_size` INT(1) DEFAULT '0', - UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`) + UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`), + UNIQUE INDEX `value_key_also` (`signal_key_id`, `geo_key_id`, `time_type`, `time_value`) ) ENGINE=InnoDB; @@ -154,10 +159,8 @@ CREATE OR REPLACE VIEW signal_history_v AS `t1`.`geo_key_id` AS `geo_key_id` -- TODO: unnecessary ...remove? FROM `signal_history` `t1` JOIN `signal_dim` `t2` - USE INDEX (PRIMARY) ON `t1`.`signal_key_id` = `t2`.`signal_key_id` JOIN `geo_dim` `t3` - USE INDEX (PRIMARY) ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; @@ -189,10 +192,8 @@ CREATE OR REPLACE VIEW signal_latest_v AS `t1`.`geo_key_id` AS `geo_key_id` -- TODO: unnecessary ...remove? FROM `signal_latest` `t1` JOIN `signal_dim` `t2` - USE INDEX (PRIMARY) ON `t1`.`signal_key_id` = `t2`.`signal_key_id` JOIN `geo_dim` `t3` - USE INDEX (PRIMARY) ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; From 66976b4a113e8bdaaf76e2b671a0b8970a1bc68a Mon Sep 17 00:00:00 2001 From: george haff Date: Thu, 18 Aug 2022 12:56:50 -0400 Subject: [PATCH 08/11] changed dbjobs print() calls to proper logging & removed legacy_id column --- src/acquisition/covidcast/database.py | 58 +++++++++++++-------------- src/ddl/v4_schema.sql | 2 - 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 9f67d309a..e246e818f 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -255,11 +255,11 @@ def run_dbjobs(self): INSERT INTO {self.history_table} (signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, - computation_as_of_dt, missing_value, missing_stderr, missing_sample_size, `legacy_id`) + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size) SELECT signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, - computation_as_of_dt, missing_value, missing_stderr, missing_sample_size, `legacy_id` + computation_as_of_dt, missing_value, missing_stderr, missing_sample_size FROM `{self.load_table}` sl INNER JOIN signal_dim sd USING (source, `signal`) INNER JOIN geo_dim gd USING (geo_type, geo_value) @@ -306,36 +306,32 @@ def run_dbjobs(self): DELETE FROM `{self.load_table}` ''' + logger = get_structured_logger("run_dbjobs") import time - time_q = [] - time_q.append(time.time()) - - print('signal_dim_add_new_load:', end='') - self._cursor.execute(signal_dim_add_new_load) - time_q.append(time.time()) - print(f" 
elapsed: {time_q[-1]-time_q[-2]}s") - - print('geo_dim_add_new_load:', end='') - self._cursor.execute(geo_dim_add_new_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_history_load:', end='') - self._cursor.execute(signal_history_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_latest_load:', end='') - self._cursor.execute(signal_latest_load) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print('signal_load_delete_processed:', end='') - self._cursor.execute(signal_load_delete_processed) - time_q.append(time.time()) - print(f" elapsed: {time_q[-1]-time_q[-2]}s") - - print("done.") + time_q = [time.time()] + + try: + self._cursor.execute(signal_dim_add_new_load) + time_q.append(time.time()) + logger.debug('signal_dim_add_new_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(geo_dim_add_new_load) + time_q.append(time.time()) + logger.debug('geo_dim_add_new_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(signal_history_load) + time_q.append(time.time()) + logger.debug('signal_history_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(signal_latest_load) + time_q.append(time.time()) + logger.debug('signal_latest_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + + self._cursor.execute(signal_load_delete_processed) + time_q.append(time.time()) + logger.debug('signal_load_delete_processed', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2]) + except Exception as e: + raise e return self diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 1bbf661ac..43f8e853a 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -57,7 +57,6 @@ CREATE TABLE signal_history ( `missing_value` INT(1) DEFAULT '0', `missing_stderr` INT(1) DEFAULT '0', `missing_sample_size` INT(1) DEFAULT '0', - `legacy_id` BIGINT(20) UNSIGNED, -- not used beyond import of previous data into the v4 schema UNIQUE INDEX `value_key_tig` (`signal_key_id`, `time_type`, `time_value`, `issue`, `geo_key_id`), UNIQUE INDEX `value_key_tgi` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`, `issue`), @@ -124,7 +123,6 @@ CREATE TABLE signal_load ( `missing_value` INT(1) DEFAULT '0', `missing_stderr` INT(1) DEFAULT '0', `missing_sample_size` INT(1) DEFAULT '0', - `legacy_id` BIGINT(20) UNSIGNED, -- not used beyond import of previous data into the v4 schema UNIQUE INDEX (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`) ) ENGINE=InnoDB; From 47b6bf1aa72f35a1f8ad0447c88155a10af332dd Mon Sep 17 00:00:00 2001 From: george haff Date: Thu, 18 Aug 2022 14:01:49 -0400 Subject: [PATCH 09/11] changes to simplify conflict resolution in https://github.com/cmu-delphi/delphi-epidata/pull/922 : they come from a pending changeset to incorporate (into this branch, 'v4-srrpp-migrations') improvements made in a parallel 'mergedkey' branch... i fear i may just be kicking more merge conflicts down the road, but hopefully this helps me to be less confused about some current diffs in the meantime. 
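The per-statement timing bookkeeping in run_dbjobs() above (appending to time_q and logging each delta) could be factored into a small helper; this is a sketch only, assuming the structured logger accepts keyword fields as it does above, and the timed_step name is illustrative rather than an existing utility. It would also keep the try block free of bookkeeping (note that the bare `except Exception as e: raise e` is equivalent to simply letting the exception propagate):

    # illustrative helper only -- logs elapsed time and affected row count for one dbjobs step
    import time
    from contextlib import contextmanager

    @contextmanager
    def timed_step(logger, cursor, step_name):
        start = time.time()
        yield  # the caller executes its statement inside the `with` block
        logger.debug(step_name, rows=cursor.rowcount, elapsed=time.time() - start)

    # possible usage inside run_dbjobs():
    #   with timed_step(logger, self._cursor, 'signal_dim_add_new_load'):
    #       self._cursor.execute(signal_dim_add_new_load)
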
changes are: - signal_latest table definition simplified to automatically inherit structure from signal_history table - removal of unused row-counting methods - docstring update / correction --- src/acquisition/covidcast/database.py | 21 +++------------------ src/ddl/v4_schema.sql | 24 +++--------------------- 2 files changed, 6 insertions(+), 39 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index e246e818f..c2ed5128a 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -106,26 +106,11 @@ def disconnect(self, commit): self._connection.close() - def count_all_rows(self, tablename=None): - """Return the total number of rows in table `covidcast`.""" - - if tablename is None: - tablename = self.history_table - - self._cursor.execute(f'SELECT count(1) FROM `{tablename}`') - + def count_all_load_rows(self): + self._cursor.execute(f'SELECT count(1) FROM `{self.load_table}`') for (num,) in self._cursor: return num - def count_all_history_rows(self): - return self.count_all_rows(self.history_table) - - def count_all_latest_rows(self): - return self.count_all_rows(self.latest_table) - - def count_all_load_rows(self): - return self.count_all_rows(self.load_table) - def _reset_load_table_ai_counter(self): """Corrects the AUTO_INCREMENT counter in the load table. @@ -150,7 +135,7 @@ def insert_or_update_bulk(self, cc_rows): def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False): """ - Insert new rows (or update existing) into the load table. + Insert new rows into the load table and dispatch into dimension and fact tables. """ # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 43f8e853a..def747439 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -66,30 +66,12 @@ CREATE TABLE signal_history ( UNIQUE INDEX `value_key_gti` (`signal_key_id`, `geo_key_id`, `time_type`, `time_value`, `issue`) ) ENGINE=InnoDB; - CREATE TABLE signal_latest ( - `signal_data_id` BIGINT(20) UNSIGNED NOT NULL PRIMARY KEY, - `signal_key_id` BIGINT(20) UNSIGNED NOT NULL, - `geo_key_id` BIGINT(20) UNSIGNED NOT NULL, - `strat_key_id` BIGINT(20) UNSIGNED NOT NULL DEFAULT 1, -- TODO: for future use - `issue` INT(11) NOT NULL, - `data_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed - `time_type` VARCHAR(12) NOT NULL, - `time_value` INT(11) NOT NULL, - `reference_dt` DATETIME(0), -- TODO: for future use - `value` DOUBLE, - `stderr` DOUBLE, - `sample_size` DOUBLE, - `lag` INT(11) NOT NULL, - `value_updated_timestamp` INT(11) NOT NULL, - `computation_as_of_dt` DATETIME(0), -- TODO: for future use ; also "as_of" is problematic and should be renamed - `missing_value` INT(1) DEFAULT '0', - `missing_stderr` INT(1) DEFAULT '0', - `missing_sample_size` INT(1) DEFAULT '0', - + PRIMARY KEY (`signal_data_id`), UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`), UNIQUE INDEX `value_key_also` (`signal_key_id`, `geo_key_id`, `time_type`, `time_value`) -) ENGINE=InnoDB; +) ENGINE=InnoDB +SELECT * FROM signal_history; -- NOTE: In production or any non-testing system that should maintain consistency, From eece6a9c4cc96abd38010aa7425663d1d2db4853 Mon Sep 17 00:00:00 2001 From: george haff Date: Thu, 18 Aug 2022 14:23:00 -0400 Subject: [PATCH 10/11] whoops, missed adding the changes from a test file to previous commit --- 
 tests/acquisition/covidcast/test_database.py | 22 --------------------
 1 file changed, 22 deletions(-)

diff --git a/tests/acquisition/covidcast/test_database.py b/tests/acquisition/covidcast/test_database.py
index 71fd429b9..5587c9bbb 100644
--- a/tests/acquisition/covidcast/test_database.py
+++ b/tests/acquisition/covidcast/test_database.py
@@ -51,28 +51,6 @@ def test_disconnect_with_commit(self):
     self.assertTrue(connection.commit.called)
     self.assertTrue(connection.close.called)
 
-  def test_count_all_rows_query(self):
-    """Query to count all rows looks sensible.
-
-    NOTE: Actual behavior is tested by integration test.
-    """
-
-    mock_connector = MagicMock()
-    database = Database()
-    database.connect(connector_impl=mock_connector)
-    connection = mock_connector.connect()
-    cursor = connection.cursor()
-    cursor.__iter__.return_value = [(123,)]
-
-    num = database.count_all_rows()
-
-    self.assertEqual(num, 123)
-    self.assertTrue(cursor.execute.called)
-
-    sql = cursor.execute.call_args[0][0].lower()
-    self.assertIn('select count(1)', sql)
-    self.assertIn('from `signal_', sql) # note that this table name is incomplete
-
   def test_update_covidcast_meta_cache_query(self):
     """Query to update the metadata cache looks sensible.

From e6f59f3411ffef054f0774c61354be8ecfd28c44 Mon Sep 17 00:00:00 2001
From: george haff
Date: Thu, 18 Aug 2022 14:57:19 -0400
Subject: [PATCH 11/11] proper index names for signal_latest

---
 src/ddl/v4_schema.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql
index def747439..c458269a6 100644
--- a/src/ddl/v4_schema.sql
+++ b/src/ddl/v4_schema.sql
@@ -68,8 +68,8 @@ CREATE TABLE signal_latest (
     PRIMARY KEY (`signal_data_id`),
-    UNIQUE INDEX `value_key` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`),
-    UNIQUE INDEX `value_key_also` (`signal_key_id`, `geo_key_id`, `time_type`, `time_value`)
+    UNIQUE INDEX `value_key_tg` (`signal_key_id`, `time_type`, `time_value`, `geo_key_id`),
+    UNIQUE INDEX `value_key_gt` (`signal_key_id`, `geo_key_id`, `time_type`, `time_value`)
 ) ENGINE=InnoDB
 SELECT * FROM signal_history;
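For completeness, a sketch of how a resumable migration run might be driven using the main() signature and argparse defaults introduced earlier in this series (migrate_epidata_to_v4.py); the import path is assumed to mirror the one used for Database in that file, and the schema name and batch size shown are simply the CLI defaults, so treat the exact invocation as an assumption rather than a documented entry point:

    # sketch only -- drives migrate_epidata_to_v4.main() directly; in practice the same thing
    # is done via the CLI flags --destination_schema, --batch_size, --start_id, --upper_lim_override
    from delphi.epidata.acquisition.covidcast.migrate_epidata_to_v4 import main

    # start_id=0 truncates the v4 tables (after a 'yes' confirmation) and migrates ids from 1 upward
    next_start_id = main(destination_schema='covid', batch_size=20_000_000,
                         start_id=0, upper_lim_override=None)

    # a later run resumes from the value printed/returned by the previous one,
    # equivalent to passing --start_id=<next_start_id> on the command line
    main(destination_schema='covid', batch_size=20_000_000,
         start_id=next_start_id, upper_lim_override=None)
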