Delete rows specified by CSV #840

Merged (8 commits, Feb 18, 2022)
Changes from 4 commits

3 changes: 3 additions & 0 deletions integrations/acquisition/covidcast/delete_batch.csv
@@ -0,0 +1,3 @@
geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source
d_nonlatest,0,0,0,1,0,geo,sig,src
d_latest,0,0,0,3,0,geo,sig,src
122 changes: 122 additions & 0 deletions integrations/acquisition/covidcast/test_delete_batch.py
@@ -0,0 +1,122 @@
"""Integration tests for covidcast's batch deletions."""

# standard library
from collections import namedtuple
import unittest
from os import path

# third party
import mysql.connector

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'

Example = namedtuple("Example", "given expected")
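# each Example pairs a query ("given") with its expected result set ("expected")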

class DeleteBatch(unittest.TestCase):
"""Tests batch deletions"""


def setUp(self):
"""Perform per-test setup."""

# connect to the `epidata` database and clear the `covidcast` table
cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
cur = cnx.cursor()
cur.execute('truncate table covidcast')
cnx.commit()
cur.close()

# make connection and cursor available to test cases
self.cnx = cnx
self.cur = cnx.cursor()

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

# use the local instance of the Epidata API
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

# will use secrets as set above
from delphi.epidata.acquisition.covidcast.database import Database
self.database = Database()
self.database.connect()

def tearDown(self):
"""Perform per-test teardown."""
self.cur.close()
self.cnx.close()

@unittest.skip("Database user would require FILE privileges")
def test_delete_from_file(self):
self._test_delete_batch(path.join(path.dirname(__file__), "delete_batch.csv"))

  def test_delete_from_tuples(self):
    with open(path.join(path.dirname(__file__), "delete_batch.csv")) as f:
      rows = []
      for line in f:
        rows.append(line.strip().split(","))
    rows = [r + ["day"] for r in rows[1:]]
    self._test_delete_batch(rows)

def _test_delete_batch(self, cc_deletions):
# load sample data
rows = [
# geo_value issue is_latest
["d_nonlatest", 1, 0],
["d_nonlatest", 2, 1],
["d_latest", 1, 0],
["d_latest", 2, 0],
["d_latest", 3, 1]
]
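    # insert each sample row under two different time_values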
for time_value in [0, 1]:
self.cur.executemany(f'''
INSERT INTO covidcast
(`geo_value`, `issue`, `is_latest_issue`, `time_value`,
`source`, `signal`, `time_type`, `geo_type`,
value_updated_timestamp, direction_updated_timestamp, value, stderr, sample_size, lag, direction)
VALUES
(%s, %s, %s, {time_value},
"src", "sig", "day", "geo",
0, 0, 0, 0, 0, 0, 0)
''', rows)
self.cnx.commit()

# delete entries
self.database.delete_batch(cc_deletions)

# verify remaining data is still there
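    # (5 sample rows x 2 time_values = 10 inserted; the CSV specifies 2 deletions)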
self.cur.execute("select * from covidcast")
result = list(self.cur)
self.assertEqual(len(result), 2*len(rows)-2)

examples = [
# verify deletions are gone
Example(
'select * from covidcast where time_value=0 and geo_value="d_nonlatest" and issue=1',
[]
),
Example(
'select * from covidcast where time_value=0 and geo_value="d_latest" and issue=3',
[]
),
# verify is_latest_issue flag was corrected
Example(
'select geo_value, issue from covidcast where time_value=0 and is_latest_issue=1',
[('d_nonlatest', 2),
('d_latest', 2)]
)
]

for ex in examples:
self.cur.execute(ex.given)
result = list(self.cur)
self.assertEqual(result, ex.expected, ex.given)
99 changes: 98 additions & 1 deletion src/acquisition/covidcast/database.py
@@ -69,7 +69,6 @@ class Database:

def connect(self, connector_impl=mysql.connector):
"""Establish a connection to the database."""

u, p = secrets.db.epi
self._connector_impl = connector_impl
self._connection = self._connector_impl.connect(
@@ -247,6 +246,104 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
self._cursor.execute(drop_tmp_table_sql)
return total

def delete_batch(self, cc_deletions):
"""
    Remove rows specified by a CSV file or a list of tuples; returns the number of rows deleted.

If cc_deletions is a filename, the file should include a header row and use the following field order:
- geo_id
- value (ignored)
- stderr (ignored)
- sample_size (ignored)
- issue (YYYYMMDD format)
- time_value (YYYYMMDD format)
- geo_type
- signal
- source

    If cc_deletions is a list of tuples, the tuples should use the following field order (the same as above, plus time_type):
- geo_id
- value (ignored)
- stderr (ignored)
- sample_size (ignored)
- issue (YYYYMMDD format)
- time_value (YYYYMMDD format)
- geo_type
- signal
- source
- time_type
"""
tmp_table_name = "tmp_delete_table"
create_tmp_table_sql = f'''
CREATE OR REPLACE TABLE {tmp_table_name} LIKE covidcast;
'''

amend_tmp_table_sql = f'''
ALTER TABLE {tmp_table_name} ADD COLUMN covidcast_id bigint unsigned;
'''
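
    # the temp table can be populated two ways: LOAD DATA INFILE when
    # cc_deletions is a filename (requires the FILE privilege, hence the
    # skipped integration test), or a bulk INSERT when it is a list of tuples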

load_tmp_table_infile_sql = f'''
LOAD DATA INFILE "{cc_deletions}"
INTO TABLE {tmp_table_name}
FIELDS TERMINATED BY ","
IGNORE 1 LINES
(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`)
SET time_type="day";
'''

load_tmp_table_insert_sql = f'''
INSERT INTO {tmp_table_name}
(`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`, `time_type`,
`value_updated_timestamp`, `direction_updated_timestamp`, `lag`, `direction`, `is_latest_issue`)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
0, 0, 0, 0, 0)
'''

add_id_sql = f'''
UPDATE {tmp_table_name} d INNER JOIN covidcast c USING
(`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`)
SET d.covidcast_id=c.id, d.is_latest_issue=c.is_latest_issue;
'''

delete_sql = f'''
DELETE c FROM {tmp_table_name} d INNER JOIN covidcast c WHERE d.covidcast_id=c.id;
'''
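
    # repair `is_latest_issue`: for each series key whose latest issue was just
    # deleted, find the largest issue remaining in `covidcast` (if any) and
    # flag that row as the new latest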

fix_latest_issue_sql = f'''
UPDATE
(SELECT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, MAX(`issue`) AS `issue`
FROM
(SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`
FROM {tmp_table_name} WHERE `is_latest_issue`=1) AS was_latest
LEFT JOIN covidcast c
USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`)
GROUP BY `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`
) AS TMP
LEFT JOIN `covidcast`
USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`)
SET `is_latest_issue`=1;
'''

drop_tmp_table_sql = f'DROP TABLE {tmp_table_name}'
    total = None
    try:
      self._cursor.execute(create_tmp_table_sql)
      self._cursor.execute(amend_tmp_table_sql)
      if isinstance(cc_deletions, str):
        self._cursor.execute(load_tmp_table_infile_sql)
      elif isinstance(cc_deletions, list):
        self._cursor.executemany(load_tmp_table_insert_sql, cc_deletions)
      else:
        raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}")
      self._cursor.execute(add_id_sql)
      self._cursor.execute(delete_sql)
      # rowcount from the DELETE is the number of covidcast rows removed
      total = self._cursor.rowcount
      self._cursor.execute(fix_latest_issue_sql)
      self._connection.commit()
    finally:
      self._cursor.execute(drop_tmp_table_sql)
    return total

def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
"""Compute and return metadata on all non-WIP COVIDcast signals."""
logger = get_structured_logger("compute_covidcast_meta")
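
For reference, a minimal sketch of driving the new method with the tuple form. The connection setup mirrors the integration test above; the deletion values here are hypothetical, and the field order follows the docstring (ending with time_type):

from delphi.epidata.acquisition.covidcast.database import Database

database = Database()
database.connect()
try:
  # geo_id, value, stderr, sample_size, issue, time_value, geo_type, signal, source, time_type
  deletions = [
    ("pa", 0, 0, 0, 20200601, 20200515, "state", "sig", "src", "day"),
  ]
  row_count = database.delete_batch(deletions)
finally:
  # the boolean argument is a commit flag, as in delete_batch.py's disconnect(True)
  database.disconnect(False)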
61 changes: 61 additions & 0 deletions src/acquisition/covidcast/delete_batch.py
@@ -0,0 +1,61 @@
"""Deletes large numbers of rows from covidcast based on a CSV"""

# standard library
import argparse
import glob
import os
import time

# first party
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger


def get_argument_parser():
"""Define command line arguments."""

parser = argparse.ArgumentParser()
parser.add_argument(
'--deletion_dir',
help='directory where deletion CSVs are stored')
parser.add_argument(
'--log_file',
help="filename for log output (defaults to stdout)")
return parser

def handle_file(deletion_file, database, logger):
  logger.info("Deleting from csv file", filename=deletion_file)
  rows = []
  with open(deletion_file) as f:
    for line in f:
      rows.append(line.strip().split(","))
  # drop the header row and append the `time_type` field expected by the tuple
  # format (the LOAD DATA INFILE path likewise hardcodes time_type="day")
  rows = [r + ["day"] for r in rows[1:]]
  try:
    n = database.delete_batch(rows)
    logger.info("Deleted database rows", row_count=n)
    return n
  except Exception as e:
    logger.exception("Exception while deleting rows", exception=str(e))
    database.rollback()
    return 0

def main(args):
"""Delete rows from covidcast."""

logger = get_structured_logger("csv_deletion", filename=args.log_file)
start_time = time.time()
database = Database()
database.connect()
all_n = 0

try:
    for deletion_file in sorted(glob.glob(os.path.join(args.deletion_dir, '*.csv'))):
      all_n += handle_file(deletion_file, database, logger)
finally:
database.disconnect(True)

logger.info(
"Deleted CSVs from database",
total_runtime_in_seconds=round(time.time() - start_time, 2), row_count=all_n)

if __name__ == '__main__':
main(get_argument_parser().parse_args())
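
For reference, a hypothetical invocation (module path assumed from the repository layout; the directory and log file paths are placeholders):

python3 -m delphi.epidata.acquisition.covidcast.delete_batch --deletion_dir /path/to/deletion_csvs --log_file /var/log/delete_batch.log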