-
Notifications
You must be signed in to change notification settings - Fork 16
Add multiprocessing to the Quidel indicator #1881
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
a762f4c
915bd7c
92f51de
f2b4293
bf9f621
4bd32ff
17aaf17
62a6b23
4689206
20abd8c
b90e941
dfdf0aa
43a0415
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,8 @@ | |
""" | ||
import atexit | ||
from datetime import datetime | ||
from multiprocessing import Pool, cpu_count | ||
import os | ||
import time | ||
from typing import Dict, Any | ||
|
||
|
@@ -55,6 +57,45 @@ def get_smooth_info(sensors, _SMOOTHERS): | |
smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE) | ||
return smoothers | ||
|
||
def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device, | ||
first_date, last_date, suffix, # generate args | ||
geo_res, sensor_name, export_dir, | ||
export_start_date, export_end_date, # export args | ||
log_filename, log_exceptions): # logger args | ||
"""Generate sensors, create export CSV then return stats.""" | ||
logger = get_structured_logger(__name__, log_filename, log_exceptions) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because trying to pass the existing logger breaks multiprocessing:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. damn, i should have seen that coming... :( however! this current implementation with the newly constructed loggers is susceptible to concurrency issues, as they are all going to be trying to write to the same file from different processes without a mutex. to alleviate this, you should be able to pass along a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i just created #1885 which could greatly simplify the logging stuff, removing the need for the lock and re-creating loggers, as soon as thats available |
||
logger.info("Generating signal and exporting to CSV", | ||
geo_res=geo_res, | ||
sensor=sensor_name, | ||
pid=os.getpid()) | ||
melange396 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device, | ||
first_date, last_date, suffix) | ||
dates = create_export_csv(res_df, geo_res=geo_res, | ||
sensor=sensor_name, export_dir=export_dir, | ||
start_date=export_start_date, | ||
end_date=export_end_date) | ||
return dates | ||
|
||
def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device, | ||
first_date, last_date, suffix, # generate args | ||
geo_res, sensor_name, export_dir, | ||
export_start_date, export_end_date, # export args | ||
log_filename, log_exceptions): # logger args | ||
"""Generate sensors, create export CSV then return stats.""" | ||
logger = get_structured_logger(__name__, log_filename, log_exceptions) | ||
logger.info("Generating signal and exporting to CSV", | ||
geo_res=geo_res, | ||
sensor=sensor_name, | ||
pid=os.getpid()) | ||
res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device, | ||
first_date, last_date, suffix) | ||
dates = create_export_csv(res_df, geo_res=geo_res, | ||
sensor=sensor_name, export_dir=export_dir, | ||
start_date=export_start_date, | ||
end_date=export_end_date, | ||
remove_null_samples=True) | ||
return dates | ||
|
||
def run_module(params: Dict[str, Any]): | ||
"""Run the quidel_covidtest indicator. | ||
|
||
|
@@ -123,55 +164,67 @@ def run_module(params: Dict[str, Any]): | |
wip_signal=params["indicator"]["wip_signal"], | ||
prefix="wip_") | ||
smoothers = get_smooth_info(sensors, SMOOTHERS) | ||
for geo_res in NONPARENT_GEO_RESOLUTIONS: | ||
geo_data, res_key = geo_map(geo_res, data) | ||
geo_groups = geo_data.groupby(res_key) | ||
for agegroup in AGE_GROUPS: | ||
for sensor in sensors: | ||
if agegroup == "total": | ||
sensor_name = sensor | ||
else: | ||
sensor_name = "_".join([sensor, agegroup]) | ||
logger.info("Generating signal and exporting to CSV", | ||
geo_res=geo_res, | ||
sensor=sensor_name) | ||
state_df = generate_sensor_for_nonparent_geo( | ||
geo_groups, res_key, smooth=smoothers[sensor][1], | ||
device=smoothers[sensor][0], first_date=first_date, | ||
last_date=last_date, suffix=agegroup) | ||
dates = create_export_csv( | ||
state_df, | ||
geo_res=geo_res, | ||
sensor=sensor_name, | ||
export_dir=export_dir, | ||
start_date=export_start_date, | ||
end_date=export_end_date) | ||
if len(dates) > 0: | ||
stats.append((max(dates), len(dates))) | ||
assert geo_res == "state" # Make sure geo_groups is for state level | ||
# County/HRR/MSA level | ||
for geo_res in PARENT_GEO_RESOLUTIONS: | ||
geo_data, res_key = geo_map(geo_res, data) | ||
for agegroup in AGE_GROUPS: | ||
for sensor in sensors: | ||
if agegroup == "total": | ||
sensor_name = sensor | ||
else: | ||
sensor_name = "_".join([sensor, agegroup]) | ||
logger.info("Generating signal and exporting to CSV", | ||
geo_res=geo_res, | ||
sensor=sensor_name) | ||
res_df = generate_sensor_for_parent_geo( | ||
geo_groups, geo_data, res_key, smooth=smoothers[sensor][1], | ||
device=smoothers[sensor][0], first_date=first_date, | ||
last_date=last_date, suffix=agegroup) | ||
dates = create_export_csv(res_df, geo_res=geo_res, | ||
sensor=sensor_name, export_dir=export_dir, | ||
start_date=export_start_date, | ||
end_date=export_end_date, | ||
remove_null_samples=True) | ||
if len(dates) > 0: | ||
stats.append((max(dates), len(dates))) | ||
n_cpu = min(8, cpu_count()) # for parallelization | ||
melange396 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.info("Parallelizing sensor generation", n_cpu=n_cpu) | ||
with Pool(n_cpu) as pool: | ||
melange396 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pool_results = [] | ||
for geo_res in NONPARENT_GEO_RESOLUTIONS: | ||
geo_data, res_key = geo_map(geo_res, data) | ||
geo_groups = geo_data.groupby(res_key) | ||
for agegroup in AGE_GROUPS: | ||
for sensor in sensors: | ||
if agegroup == "total": | ||
sensor_name = sensor | ||
else: | ||
sensor_name = "_".join([sensor, agegroup]) | ||
pool_results.append( | ||
pool.apply_async( | ||
generate_and_export_for_nonparent_geo, | ||
args=( | ||
# generate_sensor_for_nonparent_geo | ||
geo_groups, res_key, | ||
smoothers[sensor][1], smoothers[sensor][0], | ||
first_date, last_date, agegroup, | ||
# create_export_csv | ||
geo_res, sensor_name, export_dir, | ||
export_start_date, export_end_date, | ||
# logger params | ||
params["common"].get("log_filename"), | ||
params["common"].get("log_exceptions", True) | ||
) | ||
) | ||
) | ||
assert geo_res == "state" # Make sure geo_groups is for state level | ||
# County/HRR/MSA level | ||
for geo_res in PARENT_GEO_RESOLUTIONS: | ||
geo_data, res_key = geo_map(geo_res, data) | ||
for agegroup in AGE_GROUPS: | ||
for sensor in sensors: | ||
if agegroup == "total": | ||
sensor_name = sensor | ||
else: | ||
sensor_name = "_".join([sensor, agegroup]) | ||
pool_results.append( | ||
pool.apply_async( | ||
generate_and_export_for_parent_geo, | ||
args=( | ||
# generate_sensor_for_parent_geo | ||
geo_groups, geo_data, res_key, | ||
smoothers[sensor][1], smoothers[sensor][0], | ||
first_date, last_date, agegroup, | ||
# create_export_csv | ||
geo_res, sensor_name, export_dir, | ||
export_start_date, export_end_date, | ||
# logger params | ||
params["common"].get("log_filename"), | ||
params["common"].get("log_exceptions", True) | ||
) | ||
) | ||
) | ||
pool_results = [proc.get() for proc in pool_results] | ||
for dates in pool_results: | ||
if len(dates) > 0: | ||
stats.append((max(dates), len(dates))) | ||
|
||
# Export the cache file if the pipeline runs successfully. | ||
# Otherwise, don't update the cache file | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method and
generate_and_export_for_parent_geo()
are EXTREMELY similar but not exactly identical (more than just in the name of the `generate_sensor...()`
methods they use) -- can you annotate those differences in the code? similarly for the blocks under `for geo_res in NONPARENT_GEO_RESOLUTIONS:`
and `for geo_res in PARENT_GEO_RESOLUTIONS:`.