diff --git a/integrations/acquisition/covidcast/delete_batch.csv b/integrations/acquisition/covidcast/delete_batch.csv
new file mode 100644
index 000000000..7f8678157
--- /dev/null
+++ b/integrations/acquisition/covidcast/delete_batch.csv
@@ -0,0 +1,3 @@
+geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source
+d_nonlatest,0,0,0,1,0,geo,sig,src
+d_latest,0,0,0,3,0,geo,sig,src
\ No newline at end of file
diff --git a/integrations/acquisition/covidcast/test_delete_batch.py b/integrations/acquisition/covidcast/test_delete_batch.py
new file mode 100644
index 000000000..30f3239cb
--- /dev/null
+++ b/integrations/acquisition/covidcast/test_delete_batch.py
@@ -0,0 +1,124 @@
+"""Integration tests for covidcast's batch deletions."""
+
+# standard library
+from collections import namedtuple
+import unittest
+from os import path
+
+# third party
+import mysql.connector
+
+# first party
+from delphi.epidata.client.delphi_epidata import Epidata
+import delphi.operations.secrets as secrets
+
+# py3tester coverage target (equivalent to `import *`)
+__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
+
+Example = namedtuple("Example", "given expected")
+
+class DeleteBatch(unittest.TestCase):
+  """Tests batch deletions."""
+
+
+  def setUp(self):
+    """Perform per-test setup."""
+
+    # connect to the `epidata` database and clear the `covidcast` table
+    cnx = mysql.connector.connect(
+        user='user',
+        password='pass',
+        host='delphi_database_epidata',
+        database='epidata')
+    cur = cnx.cursor()
+    cur.execute('truncate table covidcast')
+    cnx.commit()
+    cur.close()
+
+    # make connection and cursor available to test cases
+    self.cnx = cnx
+    self.cur = cnx.cursor()
+
+    # use the local instance of the epidata database
+    secrets.db.host = 'delphi_database_epidata'
+    secrets.db.epi = ('user', 'pass')
+
+    # use the local instance of the Epidata API
+    Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
+
+    # will use secrets as set above
+    from delphi.epidata.acquisition.covidcast.database import Database
+    self.database = Database()
+    self.database.connect()
+
+  def tearDown(self):
+    """Perform per-test teardown."""
+    self.cur.close()
+    self.cnx.close()
+
+  @unittest.skip("Database user would require FILE privileges")
+  def test_delete_from_file(self):
+    self._test_delete_batch(path.join(path.dirname(__file__), "delete_batch.csv"))
+
+  def test_delete_from_tuples(self):
+    with open(path.join(path.dirname(__file__), "delete_batch.csv")) as f:
+      rows = []
+      for line in f:
+        rows.append(line.strip().split(","))
+    # drop the header row and append the time_type field expected by delete_batch
+    rows = [r + ["day"] for r in rows[1:]]
+    self._test_delete_batch(rows)
+
+  def _test_delete_batch(self, cc_deletions):
+    # load sample data
+    rows = [
+      # geo_value issue is_latest
+      ["d_nonlatest", 1, 0],
+      ["d_nonlatest", 2, 1],
+      ["d_latest", 1, 0],
+      ["d_latest", 2, 0],
+      ["d_latest", 3, 1]
+    ]
+    for time_value in [0, 1]:
+      self.cur.executemany(f'''
+        INSERT INTO covidcast
+        (`geo_value`, `issue`, `is_latest_issue`, `time_value`,
+        `source`, `signal`, `time_type`, `geo_type`,
+        value_updated_timestamp, direction_updated_timestamp, value, stderr, sample_size, lag, direction)
+        VALUES
+        (%s, %s, %s, {time_value},
+        "src", "sig", "day", "geo",
+        0, 0, 0, 0, 0, 0, 0)
+      ''', rows)
+    self.cnx.commit()
+
+    # delete entries
+    self.database.delete_batch(cc_deletions)
+
+    # verify the remaining data is still there
+    self.cur.execute("select * from covidcast")
+    result = list(self.cur)
+    self.assertEqual(len(result), 2*len(rows)-2)
+
+    examples = [
+      # verify deletions are gone
+      Example(
+        'select * from covidcast where time_value=0 and geo_value="d_nonlatest" and issue=1',
+        []
+      ),
+      Example(
+        'select * from covidcast where time_value=0 and geo_value="d_latest" and issue=3',
+        []
+      ),
+      # verify the is_latest_issue flag was corrected
+      Example(
+        'select geo_value, issue from covidcast where time_value=0 and is_latest_issue=1',
+        [('d_nonlatest', 2),
+         ('d_latest', 2)]
+      )
+    ]
+
+    for ex in examples:
+      self.cur.execute(ex.given)
+      result = list(self.cur)
+      self.assertEqual(result, ex.expected, ex.given)
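As context for the test above: `test_delete_from_tuples` turns each CSV row into the tuple layout that `delete_batch` expects, namely the nine CSV fields plus a trailing `time_type`. A standalone sketch of that conversion, with a hypothetical helper name that is not part of this PR:

    import csv

    def csv_to_deletion_tuples(filename, time_type="day"):
      # Hypothetical helper, not part of this PR: read a deletion CSV,
      # skip its header row, and append the time_type field that the
      # tuple form of delete_batch requires.
      with open(filename) as f:
        reader = csv.reader(f)
        next(reader)  # header row
        return [tuple(row) + (time_type,) for row in reader]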
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index 35d443cdc..70660b60c 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -69,7 +69,6 @@ class Database:
 
   def connect(self, connector_impl=mysql.connector):
     """Establish a connection to the database."""
-
     u, p = secrets.db.epi
     self._connector_impl = connector_impl
     self._connection = self._connector_impl.connect(
@@ -247,6 +246,108 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
     self._cursor.execute(drop_tmp_table_sql)
     return total
 
+  def delete_batch(self, cc_deletions):
+    """
+    Remove rows specified by a csv file or list of tuples.
+
+    If cc_deletions is a filename, the file should include a header row and use the following field order:
+    - geo_id
+    - value (ignored)
+    - stderr (ignored)
+    - sample_size (ignored)
+    - issue (YYYYMMDD format)
+    - time_value (YYYYMMDD format)
+    - geo_type
+    - signal
+    - source
+
+    If cc_deletions is a list of tuples, the tuples should use the following field order (same as above, plus time_type):
+    - geo_id
+    - value (ignored)
+    - stderr (ignored)
+    - sample_size (ignored)
+    - issue (YYYYMMDD format)
+    - time_value (YYYYMMDD format)
+    - geo_type
+    - signal
+    - source
+    - time_type
+
+    Returns the number of rows removed from the covidcast table.
+    """
+    tmp_table_name = "tmp_delete_table"
+    create_tmp_table_sql = f'''
+CREATE OR REPLACE TABLE {tmp_table_name} LIKE covidcast;
+'''
+
+    amend_tmp_table_sql = f'''
+ALTER TABLE {tmp_table_name} ADD COLUMN covidcast_id bigint unsigned;
+'''
+
+    load_tmp_table_infile_sql = f'''
+LOAD DATA INFILE "{cc_deletions}"
+INTO TABLE {tmp_table_name}
+FIELDS TERMINATED BY ","
+IGNORE 1 LINES
+(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`)
+SET time_type="day";
+'''
+
+    load_tmp_table_insert_sql = f'''
+INSERT INTO {tmp_table_name}
+(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`, `time_type`,
+`value_updated_timestamp`, `direction_updated_timestamp`, `lag`, `direction`, `is_latest_issue`)
+VALUES
+(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
+0, 0, 0, 0, 0)
+'''
+
+    add_id_sql = f'''
+UPDATE {tmp_table_name} d INNER JOIN covidcast c USING
+(`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`)
+SET d.covidcast_id=c.id, d.is_latest_issue=c.is_latest_issue;
+'''
+
+    delete_sql = f'''
+DELETE c FROM {tmp_table_name} d INNER JOIN covidcast c WHERE d.covidcast_id=c.id;
+'''
+
+    fix_latest_issue_sql = f'''
+UPDATE
+(SELECT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, MAX(`issue`) AS `issue`
+  FROM
+  (SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`
+    FROM {tmp_table_name} WHERE `is_latest_issue`=1) AS was_latest
+  LEFT JOIN covidcast c
+  USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`)
+  GROUP BY `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`
+) AS TMP
+LEFT JOIN `covidcast`
+USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`)
+SET `covidcast`.`is_latest_issue`=1;
+'''
+
+    drop_tmp_table_sql = f'DROP TABLE {tmp_table_name}'
+    total = None
+    try:
+      self._cursor.execute(create_tmp_table_sql)
+      self._cursor.execute(amend_tmp_table_sql)
+      if isinstance(cc_deletions, str):
+        self._cursor.execute(load_tmp_table_infile_sql)
+      elif isinstance(cc_deletions, list):
+        self._cursor.executemany(load_tmp_table_insert_sql, cc_deletions)
+      else:
+        raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}")
+      self._cursor.execute(add_id_sql)
+      self._cursor.execute(delete_sql)
+      # rowcount reflects the rows removed by the DELETE above
+      total = self._cursor.rowcount
+      self._cursor.execute(fix_latest_issue_sql)
+      self._connection.commit()
+    finally:
+      self._cursor.execute(drop_tmp_table_sql)
+    return total
+
   def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
     """Compute and return metadata on all non-WIP COVIDcast signals."""
     logger = get_structured_logger("compute_covidcast_meta")
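The `fix_latest_issue_sql` statement is the subtle part of `delete_batch`: for each (source, signal, time_type, geo_type, time_value, geo_value) series whose latest issue was just deleted, it promotes the largest surviving issue. A simplified, in-memory sketch of that bookkeeping (illustrative only; the names are invented):

    from collections import defaultdict

    def recompute_latest(surviving_rows, keys_that_lost_latest):
      # surviving_rows: tuples of
      #   (source, signal, time_type, geo_type, time_value, geo_value, issue)
      # keys_that_lost_latest: 6-field series keys whose is_latest_issue row
      # was deleted, i.e. the role of the `was_latest` subquery
      issues_by_key = defaultdict(list)
      for row in surviving_rows:
        issues_by_key[row[:6]].append(row[6])
      # mirrors the MAX(`issue`) ... GROUP BY in fix_latest_issue_sql
      return {key: max(issues_by_key[key])
              for key in keys_that_lost_latest if issues_by_key[key]}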
diff --git a/src/acquisition/covidcast/delete_batch.py b/src/acquisition/covidcast/delete_batch.py
new file mode 100644
index 000000000..33bc6751d
--- /dev/null
+++ b/src/acquisition/covidcast/delete_batch.py
@@ -0,0 +1,65 @@
+"""Deletes large numbers of rows from covidcast based on CSV files."""
+
+# standard library
+import argparse
+import glob
+import os
+import time
+
+# first party
+from delphi.epidata.acquisition.covidcast.database import Database
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
+
+
+def get_argument_parser():
+  """Define command line arguments."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+    '--deletion_dir',
+    help='directory where deletion CSVs are stored')
+  parser.add_argument(
+    '--log_file',
+    help="filename for log output (defaults to stdout)")
+  return parser
+
+def handle_file(deletion_file, database, logger):
+  logger.info("Deleting from csv file", filename=deletion_file)
+  rows = []
+  with open(deletion_file) as f:
+    for line in f:
+      if not line.strip():
+        continue
+      rows.append(line.strip().split(","))
+  # drop the header row and append the time_type field expected by delete_batch
+  rows = [r + ["day"] for r in rows[1:]]
+  try:
+    n = database.delete_batch(rows)
+    logger.info("Deleted database rows", row_count=n)
+    return n
+  except Exception as e:
+    logger.exception("Exception while deleting rows", exception=e)
+    database.rollback()
+    return 0
+
+def main(args):
+  """Delete rows from covidcast."""
+
+  logger = get_structured_logger("csv_deletion", filename=args.log_file)
+  start_time = time.time()
+  database = Database()
+  database.connect()
+  all_n = 0
+
+  try:
+    for deletion_file in sorted(glob.glob(os.path.join(args.deletion_dir, '*.csv'))):
+      all_n += handle_file(deletion_file, database, logger)
+  finally:
+    database.disconnect(True)
+
+  logger.info(
+    "Deleted CSVs from database",
+    total_runtime_in_seconds=round(time.time() - start_time, 2), row_count=all_n)
+
+if __name__ == '__main__':
+  main(get_argument_parser().parse_args())
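A usage sketch, not part of the diff: calling the new API directly, assuming database credentials are configured via `delphi.operations.secrets` as in the integration test above (the deletion values are made up).

    from delphi.epidata.acquisition.covidcast.database import Database

    database = Database()
    database.connect()
    try:
      # field order: geo_id, value, stderr, sample_size, issue, time_value,
      # geo_type, signal, source, time_type
      n = database.delete_batch([
        ("ca", 0, 0, 0, 20200315, 20200301, "state", "sig", "src", "day"),
      ])
      print(f"deleted {n} rows")
    finally:
      database.disconnect(False)

From the command line, the equivalent would be something like `python3 -m delphi.epidata.acquisition.covidcast.delete_batch --deletion_dir /path/to/csvs` (module path assumed from the file's own first-party imports).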