Skip to content

Commit 6dc49bf

Browse files
committed
Delete rows specified by CSV
1 parent 1849ce6 commit 6dc49bf

File tree

3 files changed

+282
-1
lines changed

3 files changed

+282
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
"""Integration tests for covidcast's batch deletions."""
2+
3+
# standard library
4+
from collections import namedtuple
5+
import unittest
6+
from os import path
7+
8+
# third party
9+
import mysql.connector
10+
11+
# first party
12+
from delphi_utils import Nans
13+
from delphi.epidata.client.delphi_epidata import Epidata
14+
import delphi.operations.secrets as secrets
15+
16+
# py3tester coverage target (equivalent to `import *`)
17+
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
18+
19+
# Pair of (query, expected result) used by the batch-deletion driver below.
Example = namedtuple("example", ["given", "expected"])
20+
21+
class DeleteBatch(unittest.TestCase):
    """Tests batch deletions"""

    def setUp(self):
        """Perform per-test setup."""

        # connect to the `epidata` database and clear the `covidcast` table
        connection = mysql.connector.connect(
            user='user',
            password='pass',
            host='delphi_database_epidata',
            database='epidata')
        cleanup_cursor = connection.cursor()
        cleanup_cursor.execute('truncate table covidcast')
        connection.commit()
        cleanup_cursor.close()

        # make connection and cursor available to test cases
        self.cnx = connection
        self.cur = connection.cursor()

        # use the local instance of the epidata database
        secrets.db.host = 'delphi_database_epidata'
        secrets.db.epi = ('user', 'pass')

        # use the local instance of the Epidata API
        Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

        # will use secrets as set above
        from delphi.epidata.acquisition.covidcast.database import Database
        self.database = Database()
        self.database.connect()

    def tearDown(self):
        """Perform per-test teardown."""
        self.cur.close()
        self.cnx.close()

    @unittest.skip("Database user would require FILE privileges")
    def test_delete_from_file(self):
        # exercise the LOAD DATA INFILE path by handing the driver a filename
        self._test_delete_batch(path.join(path.dirname(__file__), "delete_batch.csv"))

    def test_delete_from_tuples(self):
        # exercise the executemany path: parse the same CSV into tuples ourselves
        csv_path = path.join(path.dirname(__file__), "delete_batch.csv")
        with open(csv_path) as f:
            parsed = [line.strip().split(",") for line in f]
        # skip the header row and append the time_type field expected for tuples
        self._test_delete_batch([fields + ["day"] for fields in parsed[1:]])

    def _test_delete_batch(self, cc_deletions):
        """Shared driver: load fixture rows, run the deletion, verify results."""
        # load sample data
        fixture_rows = [
            # geo_value issue is_latest
            ["d_nonlatest", 1, 0],
            ["d_nonlatest", 2, 1],
            ["d_latest", 1, 0],
            ["d_latest", 2, 0],
            ["d_latest", 3, 1],
        ]
        for time_value in (0, 1):
            self.cur.executemany(f'''
            INSERT INTO covidcast
            (`geo_value`, `issue`, `is_latest_issue`, `time_value`,
            `source`, `signal`, `time_type`, `geo_type`,
            value_updated_timestamp, direction_updated_timestamp, value, stderr, sample_size, lag, direction)
            VALUES
            (%s, %s, %s, {time_value},
            "src", "sig", "day", "geo",
            0, 0, 0, 0, 0, 0, 0)
            ''', fixture_rows)
        self.cnx.commit()

        # delete entries
        self.database.delete_batch(cc_deletions)

        # verify remaining data is still there
        self.cur.execute("select * from covidcast")
        remaining = list(self.cur)
        self.assertEqual(len(remaining), 2 * len(fixture_rows) - 2)

        examples = [
            # verify deletions are gone
            Example(
                'select * from covidcast where time_value=0 and geo_value="d_nonlatest" and issue=1',
                []
            ),
            Example(
                'select * from covidcast where time_value=0 and geo_value="d_latest" and issue=3',
                []
            ),
            # verify is_latest_issue flag was corrected
            Example(
                'select geo_value, issue from covidcast where time_value=0 and is_latest_issue=1',
                [('d_nonlatest', 2),
                 ('d_latest', 2)]
            )
        ]

        for given, expected in examples:
            self.cur.execute(given)
            self.assertEqual(list(self.cur), expected, given)

src/acquisition/covidcast/database.py

+99-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class Database:
6969

7070
def connect(self, connector_impl=mysql.connector):
7171
"""Establish a connection to the database."""
72-
72+
print(secrets.db.host, secrets.db.epi)
7373
u, p = secrets.db.epi
7474
self._connector_impl = connector_impl
7575
self._connection = self._connector_impl.connect(
@@ -247,6 +247,104 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
247247
self._cursor.execute(drop_tmp_table_sql)
248248
return total
249249

250+
def delete_batch(self, cc_deletions):
    """
    Remove rows specified by a csv file or list of tuples.

    File should include a header row and use the following field order:
    - geo_id
    - value (ignored)
    - stderr (ignored)
    - sample_size (ignored)
    - issue (YYYYMMDD format)
    - time_value (YYYYMMDD format)
    - geo_type
    - signal
    - source

    List of tuples should use the following field order (same as above, plus time_type):
    - geo_id
    - value (ignored)
    - stderr (ignored)
    - sample_size (ignored)
    - issue (YYYYMMDD format)
    - time_value (YYYYMMDD format)
    - geo_type
    - signal
    - source
    - time_type

    Returns the number of rows deleted from the `covidcast` table.

    Raises TypeError if `cc_deletions` is neither a filename (str) nor a list.
    """
    tmp_table_name = "tmp_delete_table"
    # staging table shaped like `covidcast` so deletion keys can be bulk-loaded
    create_tmp_table_sql = f'''
    CREATE OR REPLACE TABLE {tmp_table_name} LIKE covidcast;
    '''

    # extra column to hold the id of the matching `covidcast` row (if any)
    amend_tmp_table_sql = f'''
    ALTER TABLE {tmp_table_name} ADD COLUMN covidcast_id bigint unsigned;
    '''

    # filename path: server-side bulk load (requires FILE privilege)
    load_tmp_table_infile_sql = f'''
    LOAD DATA INFILE "{cc_deletions}"
    INTO TABLE {tmp_table_name}
    FIELDS TERMINATED BY ","
    IGNORE 1 LINES
    (`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`)
    SET time_type="day";
    '''

    # tuple-list path: ordinary batched INSERT
    load_tmp_table_insert_sql = f'''
    INSERT INTO {tmp_table_name}
    (`geo_value`, `value`, `stderr`, `sample_size`, `issue`, `time_value`, `geo_type`, `signal`, `source`, `time_type`,
    `value_updated_timestamp`, `direction_updated_timestamp`, `lag`, `direction`, `is_latest_issue`)
    VALUES
    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
    0, 0, 0, 0, 0)
    '''

    # resolve each staged deletion key to its `covidcast` row id and latest-flag
    add_id_sql = f'''
    UPDATE {tmp_table_name} d INNER JOIN covidcast c USING
    (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`)
    SET d.covidcast_id=c.id, d.is_latest_issue=c.is_latest_issue;
    '''

    delete_sql = f'''
    DELETE c FROM {tmp_table_name} d INNER JOIN covidcast c WHERE d.covidcast_id=c.id;
    '''

    # for each series whose latest issue was deleted, re-flag the newest survivor
    fix_latest_issue_sql = f'''
    UPDATE
    (SELECT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, MAX(`issue`) AS `issue`
    FROM
    (SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`
    FROM {tmp_table_name} WHERE `is_latest_issue`=1) AS was_latest
    LEFT JOIN covidcast c
    USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`)
    GROUP BY `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`
    ) AS TMP
    LEFT JOIN `covidcast`
    USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`)
    SET `is_latest_issue`=1;
    '''

    drop_tmp_table_sql = f'DROP TABLE {tmp_table_name}'
    total = None
    try:
        self._cursor.execute(create_tmp_table_sql)
        self._cursor.execute(amend_tmp_table_sql)
        if isinstance(cc_deletions, str):
            self._cursor.execute(load_tmp_table_infile_sql)
        elif isinstance(cc_deletions, list):
            self._cursor.executemany(load_tmp_table_insert_sql, cc_deletions)
        else:
            # TypeError (not bare Exception): precise, still caught by `except Exception`
            raise TypeError(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}")
        self._cursor.execute(add_id_sql)
        self._cursor.execute(delete_sql)
        # capture the DELETE's row count before running further statements,
        # so callers (e.g. the csv_deletion script) can report it
        total = self._cursor.rowcount
        self._cursor.execute(fix_latest_issue_sql)
        self._connection.commit()
    finally:
        # always clean up the staging table, even if a statement above failed
        self._cursor.execute(drop_tmp_table_sql)
    return total
250348
def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
251349
"""Compute and return metadata on all non-WIP COVIDcast signals."""
252350
logger = get_structured_logger("compute_covidcast_meta")
+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""Deletes large numbers of rows from covidcast based on a CSV"""
2+
3+
# standard library
import argparse
import glob
import os
import time

# first party
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
11+
12+
13+
def get_argument_parser():
    """Define command line arguments."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--deletion_dir', help='directory where deletion CSVs are stored')
    arg_parser.add_argument('--log_file', help="filename for log output (defaults to stdout)")
    return arg_parser
24+
25+
def handle_file(deletion_file, database, logger=None):
    """Delete from the database the rows listed in one deletion CSV.

    `deletion_file`: path to a CSV whose first line is a header.
    `database`: a connected Database instance.
    `logger`: structured logger to use; one is created if omitted.
      (The original referenced a `logger` global that was never defined,
      raising NameError on every call.)

    Returns the number of rows deleted, or 0 if the deletion failed
    (the transaction is rolled back in that case).
    """
    if logger is None:
        logger = get_structured_logger("csv_deletion")
    logger.info("Deleting from csv file", filename=deletion_file)
    with open(deletion_file) as f:
        rows = [line.strip().split(",") for line in f]
    rows = rows[1:]  # drop the header row
    try:
        n = database.delete_batch(rows)
        logger.info("Deleted database rows", row_count=n)
        return n
    except Exception:
        # message only: the original passed the exception as a %-format arg
        # with no placeholder; logger.exception already records the traceback
        logger.exception('Exception while deleting rows:')
        database.rollback()
        return 0
40+
41+
def main(args):
    """Delete rows from covidcast.

    Scans `args.deletion_dir` for `*.csv` files (in sorted order), deletes
    the rows each one specifies, and logs the total count and runtime.
    """
    logger = get_structured_logger("csv_deletion", filename=args.log_file)
    start_time = time.time()
    database = Database()
    database.connect()
    all_n = 0

    try:
        # `glob` was used here without being imported; it is now imported at
        # the top of the file
        for deletion_file in sorted(glob.glob(os.path.join(args.deletion_dir, '*.csv'))):
            # the original called handle_file(deletion_file), omitting the
            # required `database` argument
            all_n += handle_file(deletion_file, database, logger)
    finally:
        database.disconnect(True)

    logger.info(
        "Deleted CSVs from database",
        total_runtime_in_seconds=round(time.time() - start_time, 2), row_count=all_n)

if __name__ == '__main__':
    main(get_argument_parser().parse_args())

0 commit comments

Comments
 (0)