Add multiprocessing to the Quidel indicator #1881

Merged Aug 10, 2023 (13 commits). Showing changes from 8 commits.

151 changes: 102 additions & 49 deletions quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -6,6 +6,8 @@
"""
import atexit
from datetime import datetime
from multiprocessing import Pool, cpu_count
import os
import time
from typing import Dict, Any

@@ -55,6 +57,45 @@ def get_smooth_info(sensors, _SMOOTHERS):
        smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE)
    return smoothers

def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                          first_date, last_date, suffix, # generate args
                                          geo_res, sensor_name, export_dir,
                                          export_start_date, export_end_date, # export args
                                          log_filename, log_exceptions): # logger args

Contributor:
this method and generate_and_export_for_parent_geo() are EXTREMELY similar but not exactly the same (in more than just the name of the generate_sensor_...() methods they use) -- can you annotate those differences in the code? Similarly for the blocks under `for geo_res in NONPARENT_GEO_RESOLUTIONS:` and `for geo_res in PARENT_GEO_RESOLUTIONS:`.

"""Generate sensors, create export CSV then return stats."""
logger = get_structured_logger(__name__, log_filename, log_exceptions)

Contributor (author):
Because trying to pass the existing logger breaks multiprocessing:

AttributeError: Can't pickle local object 'BoundLoggerLazyProxy.bind.<locals>.finalized_bind'"
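
For context, a minimal standalone repro of that class of error (toy code, not from the indicator or structlog): Pool pickles everything it sends to a worker, and a function defined inside another function cannot be pickled by name.

import pickle

def make_binder(context):
    def finalized_bind(msg):  # a local function, analogous to the proxy's finalized_bind
        return (context, msg)
    return finalized_bind

try:
    pickle.dumps(make_binder("quidel"))
except (AttributeError, pickle.PicklingError) as err:
    print(err)  # Can't pickle local object 'make_binder.<locals>.finalized_bind'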

Contributor:
damn, i should have seen that coming... :(
this is worth a comment in the code, something like # `logger` cannot be passed to a child process

however! this current implementation with the newly constructed loggers is susceptible to concurrency issues, as they are all going to be trying to write to the same file from different processes without a mutex. to alleviate this, you should be able to pass along a multiprocessing.Lock() to use as a context manager around logging operations. that does add some more verbosity to your generate_and_...() calls, but it will work for now -- i have an idea for something that is cleaner and reusable for situations like this in the future.
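
A rough sketch of the suggested mutex, with one hedge: a bare multiprocessing.Lock() cannot be pickled into apply_async() args, so it has to reach the workers another way, for example through the Pool initializer as below. All names here are hypothetical stand-ins, not the indicator's code.

from multiprocessing import Lock, Pool, cpu_count

_log_lock = None  # set in each worker process by _init_worker

def _init_worker(lock):
    # Pool initializer: runs once per worker and stores the shared lock.
    global _log_lock
    _log_lock = lock

def make_signal(geo_res, sensor_name):
    # Stand-in for generate_and_export_for_*_geo; hold the lock so only one
    # process at a time writes to the shared log file.
    with _log_lock:
        print("generating", geo_res, sensor_name)
    return geo_res, sensor_name

if __name__ == "__main__":
    log_lock = Lock()
    with Pool(min(8, cpu_count()),
              initializer=_init_worker, initargs=(log_lock,)) as pool:
        handles = [pool.apply_async(make_signal, args=("state", name))
                   for name in ("sensor_a", "sensor_b")]
        results = [h.get() for h in handles]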

Contributor:
i just created #1885, which could greatly simplify the logging stuff, removing the need for the lock and for re-creating loggers, as soon as that's available.

    logger.info("Generating signal and exporting to CSV",
                geo_res=geo_res,
                sensor=sensor_name,
                pid=os.getpid())
    res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                               first_date, last_date, suffix)
    dates = create_export_csv(res_df, geo_res=geo_res,
                              sensor=sensor_name, export_dir=export_dir,
                              start_date=export_start_date,
                              end_date=export_end_date)
    return dates

def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                       first_date, last_date, suffix, # generate args
                                       geo_res, sensor_name, export_dir,
                                       export_start_date, export_end_date, # export args
                                       log_filename, log_exceptions): # logger args
    """Generate sensors, create export CSV, then return stats."""
    # Differs from the nonparent version above in three ways: it also takes
    # geo_data, it calls generate_sensor_for_parent_geo(), and it exports
    # with remove_null_samples=True.
    logger = get_structured_logger(__name__, log_filename, log_exceptions)
    logger.info("Generating signal and exporting to CSV",
                geo_res=geo_res,
                sensor=sensor_name,
                pid=os.getpid())
    res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                            first_date, last_date, suffix)
    dates = create_export_csv(res_df, geo_res=geo_res,
                              sensor=sensor_name, export_dir=export_dir,
                              start_date=export_start_date,
                              end_date=export_end_date,
                              remove_null_samples=True)
    return dates

def run_module(params: Dict[str, Any]):
"""Run the quidel_covidtest indicator.

@@ -123,55 +164,67 @@ def run_module(params: Dict[str, Any]):
                         wip_signal=params["indicator"]["wip_signal"],
                         prefix="wip_")
    smoothers = get_smooth_info(sensors, SMOOTHERS)
for geo_res in NONPARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
geo_groups = geo_data.groupby(res_key)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
logger.info("Generating signal and exporting to CSV",
geo_res=geo_res,
sensor=sensor_name)
state_df = generate_sensor_for_nonparent_geo(
geo_groups, res_key, smooth=smoothers[sensor][1],
device=smoothers[sensor][0], first_date=first_date,
last_date=last_date, suffix=agegroup)
dates = create_export_csv(
state_df,
geo_res=geo_res,
sensor=sensor_name,
export_dir=export_dir,
start_date=export_start_date,
end_date=export_end_date)
if len(dates) > 0:
stats.append((max(dates), len(dates)))
assert geo_res == "state" # Make sure geo_groups is for state level
# County/HRR/MSA level
for geo_res in PARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
logger.info("Generating signal and exporting to CSV",
geo_res=geo_res,
sensor=sensor_name)
res_df = generate_sensor_for_parent_geo(
geo_groups, geo_data, res_key, smooth=smoothers[sensor][1],
device=smoothers[sensor][0], first_date=first_date,
last_date=last_date, suffix=agegroup)
dates = create_export_csv(res_df, geo_res=geo_res,
sensor=sensor_name, export_dir=export_dir,
start_date=export_start_date,
end_date=export_end_date,
remove_null_samples=True)
if len(dates) > 0:
stats.append((max(dates), len(dates)))
n_cpu = min(8, cpu_count()) # for parallelization
logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
with Pool(n_cpu) as pool:
pool_results = []
for geo_res in NONPARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
geo_groups = geo_data.groupby(res_key)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
pool_results.append(
pool.apply_async(
generate_and_export_for_nonparent_geo,
args=(
                                # generate_sensor_for_nonparent_geo args
geo_groups, res_key,
smoothers[sensor][1], smoothers[sensor][0],
first_date, last_date, agegroup,
# create_export_csv
geo_res, sensor_name, export_dir,
export_start_date, export_end_date,
# logger params
params["common"].get("log_filename"),
params["common"].get("log_exceptions", True)
)
)
)
assert geo_res == "state" # Make sure geo_groups is for state level
# County/HRR/MSA level
for geo_res in PARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
pool_results.append(
pool.apply_async(
generate_and_export_for_parent_geo,
args=(
                                # generate_sensor_for_parent_geo args
geo_groups, geo_data, res_key,
smoothers[sensor][1], smoothers[sensor][0],
first_date, last_date, agegroup,
# create_export_csv
geo_res, sensor_name, export_dir,
export_start_date, export_end_date,
# logger params
params["common"].get("log_filename"),
params["common"].get("log_exceptions", True)
)
)
)
        # Wait for every worker to finish; .get() blocks and re-raises any
        # exception raised in a child process.
        pool_results = [proc.get() for proc in pool_results]
        for dates in pool_results:
            if len(dates) > 0:
                stats.append((max(dates), len(dates)))
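
As a usage note on the pool_results pattern above (illustrated with a toy task, not indicator code): apply_async() returns AsyncResult handles immediately, and .get() blocks until that task finishes, re-raising any exception the worker hit, so a failing sensor/geo combination surfaces in the parent rather than being silently dropped.

from multiprocessing import Pool

def risky(x):
    if x == 2:
        raise ValueError("simulated failure")  # e.g. one bad sensor/geo task
    return x * x

if __name__ == "__main__":
    with Pool(2) as pool:
        handles = [pool.apply_async(risky, args=(i,)) for i in range(4)]
        for h in handles:
            try:
                print(h.get())  # re-raises the worker's ValueError for x == 2
            except ValueError as err:
                print("worker failed:", err)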

# Export the cache file if the pipeline runs successfully.
# Otherwise, don't update the cache file