
Commit c890712

Merge pull request #964 from cmu-delphi/metadata_threads_and_etc
Metadata threads and ANALYZE and etc
2 parents 1fe5b9c + 037e211 commit c890712

6 files changed: +45, -38 lines changed

dev/local/Makefile (+8 -7)
(Removed/added line pairs that look identical in this diff differ only in trailing whitespace; the one substantive change is the new --cap-add=sys_nice flag on the database container.)

@@ -10,19 +10,19 @@
 # Creates all prereq images (delphi_database, delphi_python) only if they don't
 # exist. If you need to rebuild a prereq, you're probably doing something
 # complicated, and can figure out the rebuild command on your own.
-#
-#
+#
+#
 # Commands:
-#
+#
 # web:    Stops currently-running delphi_web_epidata instances, if any.
 #         Rebuilds delphi_web_epidata image.
 #         Runs image in the background and pipes stdout to a log file.
-#
+#
 # db:     Stops currently-running delphi_database_epidata instances, if any.
 #         Rebuilds delphi_database_epidata image.
 #         Runs image in the background and pipes stdout to a log file.
 #         Blocks until database is ready to receive connections.
-#
+#
 # python: Rebuilds delphi_web_python image. You shouldn't need to do this
 #         often; only if you are installing a new environment, or have
 #         made changes to delphi-epidata/dev/docker/python/Dockerfile.

@@ -35,7 +35,7 @@
 #
 # clean:  Cleans up dangling Docker images.
 #
-#
+#
 # Optional arguments:
 # pdb=1   Drops you into debug mode upon test failure, if running tests.
 # test=   Only runs tests in the directories provided here, e.g.

@@ -105,6 +105,7 @@ db:
 	@# Run the database
 	@docker run --rm -p 127.0.0.1:13306:3306 \
 		--network delphi-net --name delphi_database_epidata \
+		--cap-add=sys_nice \
 		delphi_database_epidata >$(LOG_DB) 2>&1 &
 
 	@# Block until DB is ready

@@ -127,7 +128,7 @@ py:
 all: web db py
 
 .PHONY=test
-test:
+test:
 	@docker run -i --rm --network delphi-net \
 		--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \
 		--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \
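
Note on the docker flag: --cap-add=sys_nice grants the delphi_database_epidata container the CAP_SYS_NICE capability, which the stock MySQL image can use to adjust thread priorities and memory binding (without it the server typically logs "mbind: Operation not permitted"). This is presumably a performance/noise tweak for the local dev database rather than a functional change; treat that rationale as an inference, not something stated in the commit.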

src/acquisition/covidcast/covidcast_meta_cache_updater.py (+4 -1)

@@ -15,6 +15,7 @@ def get_argument_parser():
 
   parser = argparse.ArgumentParser()
   parser.add_argument("--log_file", help="filename for log output")
+  parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal pairs")
   return parser
 
 

@@ -24,8 +25,10 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
   `args`: parsed command-line arguments
   """
   log_file = None
+  num_threads = None
   if (args):
     log_file = args.log_file
+    num_threads = args.num_threads
 
   logger = get_structured_logger(
     "metadata_cache_updater",

@@ -37,7 +40,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
   # fetch metadata
   try:
     metadata_calculation_start_time = time.time()
-    metadata = database.compute_covidcast_meta()
+    metadata = database.compute_covidcast_meta(n_threads=num_threads)
     metadata_calculation_interval_in_seconds = time.time() - metadata_calculation_start_time
   except:
     # clean up before failing
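
A minimal sketch of how the new flag behaves, assuming the same argparse setup as get_argument_parser() above (the parse_args inputs below are illustrative only):

import argparse

# Rebuild of the changed parser, for illustration.
parser = argparse.ArgumentParser()
parser.add_argument("--log_file", help="filename for log output")
parser.add_argument("--num_threads", type=int,
                    help="number of worker threads to spawn for processing source/signal pairs")

args = parser.parse_args(["--num_threads", "4"])
assert args.num_threads == 4     # explicit value, forwarded as n_threads=4

args = parser.parse_args([])
assert args.num_threads is None  # flag omitted: compute_covidcast_meta() falls back to its own default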

src/acquisition/covidcast/csv_to_database.py (+8 -9)
(Removed/added line pairs that look identical here differ only in trailing whitespace; the substantive changes are the consolidated log message and the new do_analyze() call.)

@@ -77,12 +77,12 @@ def upload_archive(
     csv_importer_impl=CsvImporter):
   """Upload CSVs to the database and archive them using the specified handlers.
 
-  :path_details: output from CsvImporter.find*_csv_files
-
+  :path_details: output from CsvImporter.find*_csv_files
+
   :database: an open connection to the epidata database
 
   :handlers: functions for archiving (successful, failed) files
-
+
   :return: the number of modified rows
   """
   archive_as_successful, archive_as_failed = handlers

@@ -130,7 +130,7 @@ def upload_archive(
       archive_as_successful(path_src, filename, source, logger)
     else:
       archive_as_failed(path_src, filename, source,logger)
-
+
   return total_modified_row_count
 
 

@@ -149,7 +149,7 @@ def main(
   if not path_details:
     logger.info('nothing to do; exiting...')
     return
-
+
   logger.info("Ingesting CSVs", csv_count = len(path_details))
 
   database = database_impl()

@@ -161,13 +161,12 @@ def main(
       database,
       make_handlers(args.data_dir, args.specific_issue_date),
       logger)
-    logger.info("Finished inserting database rows", row_count = modified_row_count)
-    # the following print statement serves the same function as the logger.info call above
-    # print('inserted/updated %d rows' % modified_row_count)
+    logger.info("Finished inserting/updating database rows", row_count = modified_row_count)
   finally:
+    database.do_analyze()
     # unconditionally commit database changes since CSVs have been archived
     database.disconnect(True)
-
+
   logger.info(
     "Ingested CSVs into database",
     total_runtime_in_seconds=round(time.time() - start_time, 2))
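
The ordering matters here: do_analyze() now sits in the finally block, so optimizer statistics are refreshed even when the upload raises, and only then is the connection committed and closed. A minimal sketch of that control flow (the FakeDatabase stub is hypothetical, for illustration only):

class FakeDatabase:
    """Stand-in for Database, just to show call ordering."""
    def do_analyze(self):
        print("ANALYZE TABLE ...")         # refresh key-distribution statistics
    def disconnect(self, commit):
        print(f"disconnect(commit={commit})")

def ingest(database, upload):
    try:
        row_count = upload()
        print("Finished inserting/updating database rows:", row_count)
    finally:
        database.do_analyze()              # runs whether or not upload() raised
        database.disconnect(True)          # commit unconditionally; CSVs were already archived

ingest(FakeDatabase(), lambda: 42)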

src/acquisition/covidcast/database.py (+17 -4)

@@ -138,6 +138,17 @@ def _reset_load_table_ai_counter(self):
       '1', '1', '1', '1', '1', 1, 1, 1, 1);""")
     self._cursor.execute(f'DELETE FROM epimetric_load')
 
+  def do_analyze(self):
+    """performs and stores key distribution analyses, used for join order and index selection"""
+    # TODO: consider expanding this to update columns' histograms
+    # https://dev.mysql.com/doc/refman/8.0/en/analyze-table.html#analyze-table-histogram-statistics-analysis
+    self._cursor.execute(
+      f'''ANALYZE TABLE
+        signal_dim, geo_dim,
+        {self.load_table}, {self.history_table}, {self.latest_table}''')
+    output = [self._cursor.column_names] + self._cursor.fetchall()
+    get_structured_logger('do_analyze').info("ANALYZE results", results=str(output))
+
   def insert_or_update_bulk(self, cc_rows):
     return self.insert_or_update_batch(cc_rows)
 

@@ -476,16 +487,18 @@ def split_list(lst, n):
     return total
 
 
-  def compute_covidcast_meta(self, table_name=None):
+  def compute_covidcast_meta(self, table_name=None, n_threads=None):
     """Compute and return metadata on all COVIDcast signals."""
    logger = get_structured_logger("compute_covidcast_meta")
 
     if table_name is None:
       table_name = self.latest_view
 
-    n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
-    # NOTE: this may present a small problem if this job runs on different hardware than the db,
-    #       but we should not run into that issue in prod.
+    if n_threads is None:
+      logger.info("n_threads unspecified, automatically choosing based on number of detected cores...")
+      n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
+      # NOTE: this may present a small problem if this job runs on different hardware than the db,
+      #       which is why this value can be overriden by optional argument.
     logger.info(f"using {n_threads} workers")
 
     srcsigs = Queue() # multi-consumer threadsafe!
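
A minimal sketch of the worker-count fallback (default_worker_count is a hypothetical helper; the real logic lives inline in compute_covidcast_meta()):

from multiprocessing import cpu_count

def default_worker_count(n_threads=None, cores=None):
    """Use the caller's value if given; otherwise ~90% of detected cores, never fewer than 1."""
    if n_threads is not None:
        return n_threads
    cores = cores if cores is not None else cpu_count()
    return max(1, cores * 9 // 10)

assert default_worker_count(cores=1) == 1       # 1*9//10 == 0, so the floor of 1 applies
assert default_worker_count(cores=8) == 7       # 8*9//10 == 7
assert default_worker_count(4, cores=64) == 4   # an explicit n_threads always wins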

src/ddl/migrations/v4_renaming.sql (+4 -8)

@@ -54,10 +54,8 @@ CREATE OR REPLACE VIEW epimetric_full_v AS
     `t1`.`signal_key_id` AS `signal_key_id`,
     `t1`.`geo_key_id` AS `geo_key_id`
   FROM `epimetric_full` `t1`
-    JOIN `signal_dim` `t2`
-      ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
-    JOIN `geo_dim` `t3`
-      ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
+    JOIN `signal_dim` `t2` USING (`signal_key_id`)
+    JOIN `geo_dim` `t3` USING (`geo_key_id`);
 CREATE OR REPLACE VIEW epimetric_latest_v AS
   SELECT
     1 AS `is_latest_issue`, -- provides column-compatibility to match `covidcast` table

@@ -85,10 +83,8 @@ CREATE OR REPLACE VIEW epimetric_latest_v AS
     `t1`.`signal_key_id` AS `signal_key_id`,
     `t1`.`geo_key_id` AS `geo_key_id`
   FROM `epimetric_latest` `t1`
-    JOIN `signal_dim` `t2`
-      ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
-    JOIN `geo_dim` `t3`
-      ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
+    JOIN `signal_dim` `t2` USING (`signal_key_id`)
+    JOIN `geo_dim` `t3` USING (`geo_key_id`);
 
 
 -- re-create `epidata` alias VIEWs
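
The USING form is just a terser spelling of the same equi-join: JOIN `signal_dim` `t2` USING (`signal_key_id`) pairs rows exactly as ON `t1`.`signal_key_id` = `t2`.`signal_key_id` did, and because the views enumerate their output columns explicitly, the result sets should be unchanged. The identical rewrite is applied to the base schema in src/ddl/v4_schema.sql below.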

src/ddl/v4_schema.sql (+4 -9)

@@ -8,7 +8,6 @@ CREATE TABLE geo_dim (
   UNIQUE INDEX `geo_dim_index` (`geo_type`, `geo_value`)
 ) ENGINE=InnoDB;
 
-
 CREATE TABLE signal_dim (
   `signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
   `source` VARCHAR(32) NOT NULL,

@@ -124,10 +123,8 @@ CREATE OR REPLACE VIEW epimetric_full_v AS
     `t1`.`signal_key_id` AS `signal_key_id`,
     `t1`.`geo_key_id` AS `geo_key_id`
   FROM `epimetric_full` `t1`
-    JOIN `signal_dim` `t2`
-      ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
-    JOIN `geo_dim` `t3`
-      ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
+    JOIN `signal_dim` `t2` USING (`signal_key_id`)
+    JOIN `geo_dim` `t3` USING (`geo_key_id`);
 
 CREATE OR REPLACE VIEW epimetric_latest_v AS
   SELECT

@@ -156,10 +153,8 @@ CREATE OR REPLACE VIEW epimetric_latest_v AS
     `t1`.`signal_key_id` AS `signal_key_id`,
     `t1`.`geo_key_id` AS `geo_key_id`
   FROM `epimetric_latest` `t1`
-    JOIN `signal_dim` `t2`
-      ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
-    JOIN `geo_dim` `t3`
-      ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
+    JOIN `signal_dim` `t2` USING (`signal_key_id`)
+    JOIN `geo_dim` `t3` USING (`geo_key_id`);
 
 
 CREATE TABLE `covidcast_meta_cache` (
