
Metadata threads and ANALYZE and etc #964

Merged
6 commits merged on Aug 30, 2022
15 changes: 8 additions & 7 deletions dev/local/Makefile
@@ -10,19 +10,19 @@
# Creates all prereq images (delphi_database, delphi_python) only if they don't
# exist. If you need to rebuild a prereq, you're probably doing something
# complicated, and can figure out the rebuild command on your own.
#
#
#
#
# Commands:
#
#
# web: Stops currently-running delphi_web_epidata instances, if any.
# Rebuilds delphi_web_epidata image.
# Runs image in the background and pipes stdout to a log file.
#
#
# db: Stops currently-running delphi_database_epidata instances, if any.
# Rebuilds delphi_database_epidata image.
# Runs image in the background and pipes stdout to a log file.
# Blocks until database is ready to receive connections.
#
#
# python: Rebuilds delphi_web_python image. You shouldn't need to do this
# often; only if you are installing a new environment, or have
# made changes to delphi-epidata/dev/docker/python/Dockerfile.
@@ -35,7 +35,7 @@
#
# clean: Cleans up dangling Docker images.
#
#
#
# Optional arguments:
# pdb=1 Drops you into debug mode upon test failure, if running tests.
# test= Only runs tests in the directories provided here, e.g.
@@ -105,6 +105,7 @@ db:
@# Run the database
@docker run --rm -p 127.0.0.1:13306:3306 \
--network delphi-net --name delphi_database_epidata \
--cap-add=sys_nice \
delphi_database_epidata >$(LOG_DB) 2>&1 &

@# Block until DB is ready
@@ -127,7 +128,7 @@ py:
all: web db py

.PHONY=test
test:
test:
@docker run -i --rm --network delphi-net \
--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \
--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \
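For orientation, the targets documented above are typically invoked along these lines (an illustrative sequence, not prescribed by the Makefile; the test= value is a placeholder):

# Hypothetical usage of the documented targets, run from dev/local/.
make web                              # rebuild and restart the delphi_web_epidata container
make db                               # rebuild delphi_database_epidata and block until it accepts connections
make all                              # runs the web, db, and py targets in one shot
make test pdb=1                       # run the test suite, dropping into the debugger on failure
make test test="<dirs with tests>"    # restrict tests to the given directories (placeholder value)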
5 changes: 4 additions & 1 deletion src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -15,6 +15,7 @@ def get_argument_parser():

parser = argparse.ArgumentParser()
parser.add_argument("--log_file", help="filename for log output")
parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal pairs")
return parser


@@ -24,8 +25,10 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
`args`: parsed command-line arguments
"""
log_file = None
num_threads = None
if (args):
log_file = args.log_file
num_threads = args.num_threads

logger = get_structured_logger(
"metadata_cache_updater",
@@ -37,7 +40,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
# fetch metadata
try:
metadata_calculation_start_time = time.time()
metadata = database.compute_covidcast_meta()
metadata = database.compute_covidcast_meta(n_threads=num_threads)
metadata_calculation_interval_in_seconds = time.time() - metadata_calculation_start_time
except:
# clean up before failing
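A minimal sketch of how the new flag behaves (hypothetical snippet, not part of this PR): an explicit --num_threads value is passed straight through to compute_covidcast_meta(), while omitting the flag leaves it as None so the database layer picks its own default (see database.py below).

# Hypothetical illustration of the optional flag; mirrors the argparse setup above.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--log_file", help="filename for log output")
parser.add_argument("--num_threads", type=int,
                    help="number of worker threads to spawn for processing source/signal pairs")

assert parser.parse_args(["--num_threads", "4"]).num_threads == 4  # explicit override
assert parser.parse_args([]).num_threads is None                   # omitted: auto-selected later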
17 changes: 8 additions & 9 deletions src/acquisition/covidcast/csv_to_database.py
@@ -77,12 +77,12 @@ def upload_archive(
csv_importer_impl=CsvImporter):
"""Upload CSVs to the database and archive them using the specified handlers.

:path_details: output from CsvImporter.find*_csv_files
:path_details: output from CsvImporter.find*_csv_files

:database: an open connection to the epidata database

:handlers: functions for archiving (successful, failed) files

:return: the number of modified rows
"""
archive_as_successful, archive_as_failed = handlers
@@ -130,7 +130,7 @@ def upload_archive(
archive_as_successful(path_src, filename, source, logger)
else:
archive_as_failed(path_src, filename, source,logger)

return total_modified_row_count


@@ -149,7 +149,7 @@ def main(
if not path_details:
logger.info('nothing to do; exiting...')
return

logger.info("Ingesting CSVs", csv_count = len(path_details))

database = database_impl()
@@ -161,13 +161,12 @@
database,
make_handlers(args.data_dir, args.specific_issue_date),
logger)
logger.info("Finished inserting database rows", row_count = modified_row_count)
# the following print statement serves the same function as the logger.info call above
# print('inserted/updated %d rows' % modified_row_count)
logger.info("Finished inserting/updating database rows", row_count = modified_row_count)
finally:
database.do_analyze()
# unconditionally commit database changes since CSVs have been archived
database.disconnect(True)

logger.info(
"Ingested CSVs into database",
total_runtime_in_seconds=round(time.time() - start_time, 2))
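The try/finally restructuring above fixes the shutdown order; a short control-flow sketch (illustrative names, not the module's API) of what it amounts to:

# Illustrative control-flow sketch, not code from this PR.
import logging

def ingest_all(database, upload):
    try:
        modified = upload(database)      # may raise partway through ingestion
        logging.info("inserted/updated %d rows", modified)
    finally:
        database.do_analyze()            # refresh optimizer statistics via ANALYZE TABLE
        database.disconnect(True)        # commit unconditionally; the CSVs are already archived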
21 changes: 17 additions & 4 deletions src/acquisition/covidcast/database.py
@@ -138,6 +138,17 @@ def _reset_load_table_ai_counter(self):
'1', '1', '1', '1', '1', 1, 1, 1, 1);""")
self._cursor.execute(f'DELETE FROM epimetric_load')

def do_analyze(self):
"""performs and stores key distribution analyses, used for join order and index selection"""
# TODO: consider expanding this to update columns' histograms
# https://dev.mysql.com/doc/refman/8.0/en/analyze-table.html#analyze-table-histogram-statistics-analysis
self._cursor.execute(
f'''ANALYZE TABLE
signal_dim, geo_dim,
{self.load_table}, {self.history_table}, {self.latest_table}''')
output = [self._cursor.column_names] + self._cursor.fetchall()
get_structured_logger('do_analyze').info("ANALYZE results", results=str(output))

def insert_or_update_bulk(self, cc_rows):
return self.insert_or_update_batch(cc_rows)

@@ -476,16 +487,18 @@ def split_list(lst, n):
return total


def compute_covidcast_meta(self, table_name=None):
def compute_covidcast_meta(self, table_name=None, n_threads=None):
"""Compute and return metadata on all COVIDcast signals."""
logger = get_structured_logger("compute_covidcast_meta")

if table_name is None:
table_name = self.latest_view

n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
# NOTE: this may present a small problem if this job runs on different hardware than the db,
# but we should not run into that issue in prod.
if n_threads is None:
logger.info("n_threads unspecified, automatically choosing based on number of detected cores...")
n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
# NOTE: this may present a small problem if this job runs on different hardware than the db,
# which is why this value can be overridden by an optional argument.
logger.info(f"using {n_threads} workers")

srcsigs = Queue() # multi-consumer threadsafe!
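Here n_threads sets the size of a pool of worker threads, each holding its own database connection and pulling (source, signal) pairs from the shared Queue. A minimal sketch of that pattern with the default-selection logic from the diff (the worker body and the handle_pair callback are assumptions, not the PR's code):

# Sketch of the multi-consumer worker pattern that n_threads controls;
# handle_pair stands in for the per-(source, signal) metadata query.
import threading
from multiprocessing import cpu_count
from queue import Empty, Queue

def choose_n_threads(requested=None):
    if requested is not None:
        return requested
    return max(1, cpu_count() * 9 // 10)  # default from the diff: at least 1, ~90% of detected cores

def run_meta_workers(srcsigs: Queue, handle_pair, requested_threads=None):
    def worker():
        while True:
            try:
                source, signal = srcsigs.get_nowait()  # Queue is multi-consumer threadsafe
            except Empty:
                return
            handle_pair(source, signal)                # one concurrent db connection per worker
    threads = [threading.Thread(target=worker) for _ in range(choose_n_threads(requested_threads))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()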
12 changes: 4 additions & 8 deletions src/ddl/migrations/v4_renaming.sql
@@ -54,10 +54,8 @@ CREATE OR REPLACE VIEW epimetric_full_v AS
`t1`.`signal_key_id` AS `signal_key_id`,
`t1`.`geo_key_id` AS `geo_key_id`
FROM `epimetric_full` `t1`
JOIN `signal_dim` `t2`
ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
JOIN `geo_dim` `t3`
ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
JOIN `signal_dim` `t2` USING (`signal_key_id`)
JOIN `geo_dim` `t3` USING (`geo_key_id`);
CREATE OR REPLACE VIEW epimetric_latest_v AS
SELECT
1 AS `is_latest_issue`, -- provides column-compatibility to match `covidcast` table
@@ -85,10 +83,8 @@ CREATE OR REPLACE VIEW epimetric_latest_v AS
`t1`.`signal_key_id` AS `signal_key_id`,
`t1`.`geo_key_id` AS `geo_key_id`
FROM `epimetric_latest` `t1`
JOIN `signal_dim` `t2`
ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
JOIN `geo_dim` `t3`
ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
JOIN `signal_dim` `t2` USING (`signal_key_id`)
JOIN `geo_dim` `t3` USING (`geo_key_id`);


-- re-create `epidata` alias VIEWs
13 changes: 4 additions & 9 deletions src/ddl/v4_schema.sql
@@ -8,7 +8,6 @@ CREATE TABLE geo_dim (
UNIQUE INDEX `geo_dim_index` (`geo_type`, `geo_value`)
) ENGINE=InnoDB;


CREATE TABLE signal_dim (
`signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`source` VARCHAR(32) NOT NULL,
@@ -124,10 +123,8 @@ CREATE OR REPLACE VIEW epimetric_full_v AS
`t1`.`signal_key_id` AS `signal_key_id`,
`t1`.`geo_key_id` AS `geo_key_id`
FROM `epimetric_full` `t1`
JOIN `signal_dim` `t2`
ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
JOIN `geo_dim` `t3`
ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
JOIN `signal_dim` `t2` USING (`signal_key_id`)
JOIN `geo_dim` `t3` USING (`geo_key_id`);

CREATE OR REPLACE VIEW epimetric_latest_v AS
SELECT
@@ -156,10 +153,8 @@ CREATE OR REPLACE VIEW epimetric_latest_v AS
`t1`.`signal_key_id` AS `signal_key_id`,
`t1`.`geo_key_id` AS `geo_key_id`
FROM `epimetric_latest` `t1`
JOIN `signal_dim` `t2`
ON `t1`.`signal_key_id` = `t2`.`signal_key_id`
JOIN `geo_dim` `t3`
ON `t1`.`geo_key_id` = `t3`.`geo_key_id`;
JOIN `signal_dim` `t2` USING (`signal_key_id`)
JOIN `geo_dim` `t3` USING (`geo_key_id`);


CREATE TABLE `covidcast_meta_cache` (