Skip to content

Add multiprocessing to the Quidel indicator #1881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 13 commits on Aug 10, 2023
Merged
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 108 additions & 47 deletions quidel_covidtest/delphi_quidel_covidtest/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import atexit
from datetime import datetime
from multiprocessing import Manager, Pool, cpu_count, current_process
import time
from typing import Dict, Any

Expand Down Expand Up @@ -55,6 +56,49 @@ def get_smooth_info(sensors, _SMOOTHERS):
smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE)
return smoothers

def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                          first_date, last_date, suffix,  # generate args
                                          geo_res, sensor_name, export_dir,
                                          export_start_date, export_end_date,  # export args
                                          lock, log_filename, log_exceptions):  # logger args
    """Generate sensors for a non-parent geo, export CSVs, and return the exported dates.

    Runs inside a multiprocessing worker. Differences from
    generate_and_export_for_parent_geo():
      * uses generate_sensor_for_nonparent_geo(), which needs only the grouped
        data (no separate ``geo_data`` frame), and
      * does NOT pass ``remove_null_samples=True`` to create_export_csv(), so
        rows with null sample sizes are kept in the exported CSVs.

    Parameters
    ----------
    geo_groups, res_key, smooth, device, first_date, last_date, suffix:
        Arguments forwarded to generate_sensor_for_nonparent_geo().
    geo_res, sensor_name, export_dir, export_start_date, export_end_date:
        Arguments forwarded to create_export_csv().
    lock, log_filename, log_exceptions:
        Logging setup; the parent's logger cannot be pickled across process
        boundaries, so each worker recreates it.

    Returns
    -------
    The dates written by create_export_csv() (not aggregate stats; the caller
    folds these into its stats list).
    """
    # Logger objects are not picklable, so the logger must be recreated in the
    # child process; the shared lock serializes log setup/writes across workers.
    with lock:
        logger = get_structured_logger(__name__, log_filename, log_exceptions)
        logger.info("Generating signal and exporting to CSV",
                    geo_res=geo_res,
                    sensor=sensor_name,
                    pid=current_process().pid)
    res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                               first_date, last_date, suffix)
    dates = create_export_csv(res_df, geo_res=geo_res,
                              sensor=sensor_name, export_dir=export_dir,
                              start_date=export_start_date,
                              end_date=export_end_date)
    return dates

def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                       first_date, last_date, suffix,  # generate args
                                       geo_res, sensor_name, export_dir,
                                       export_start_date, export_end_date,  # export args
                                       lock, log_filename, log_exceptions):  # logger args
    """Generate sensors for a parent geo, export CSVs, and return the exported dates.

    Runs inside a multiprocessing worker. Differences from
    generate_and_export_for_nonparent_geo():
      * uses generate_sensor_for_parent_geo(), which additionally needs the
        ``geo_data`` frame to aggregate up from the state-level groups, and
      * passes ``remove_null_samples=True`` to create_export_csv(), dropping
        rows with null sample sizes from parent-geo exports.

    Parameters
    ----------
    geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix:
        Arguments forwarded to generate_sensor_for_parent_geo().
    geo_res, sensor_name, export_dir, export_start_date, export_end_date:
        Arguments forwarded to create_export_csv().
    lock, log_filename, log_exceptions:
        Logging setup; the parent's logger cannot be pickled across process
        boundaries, so each worker recreates it.

    Returns
    -------
    The dates written by create_export_csv() (not aggregate stats; the caller
    folds these into its stats list).
    """
    # Logger objects are not picklable, so the logger must be recreated in the
    # child process; the shared lock serializes log setup/writes across workers.
    with lock:
        logger = get_structured_logger(__name__, log_filename, log_exceptions)
        logger.info("Generating signal and exporting to CSV",
                    geo_res=geo_res,
                    sensor=sensor_name,
                    pid=current_process().pid)
    res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                            first_date, last_date, suffix)
    dates = create_export_csv(res_df, geo_res=geo_res,
                              sensor=sensor_name, export_dir=export_dir,
                              start_date=export_start_date,
                              end_date=export_end_date,
                              remove_null_samples=True)  # parent geo drops null sample sizes
    return dates

def run_module(params: Dict[str, Any]):
"""Run the quidel_covidtest indicator.

Expand Down Expand Up @@ -123,53 +167,70 @@ def run_module(params: Dict[str, Any]):
wip_signal=params["indicator"]["wip_signal"],
prefix="wip_")
smoothers = get_smooth_info(sensors, SMOOTHERS)
for geo_res in NONPARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
geo_groups = geo_data.groupby(res_key)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
logger.info("Generating signal and exporting to CSV",
geo_res=geo_res,
sensor=sensor_name)
state_df = generate_sensor_for_nonparent_geo(
geo_groups, res_key, smooth=smoothers[sensor][1],
device=smoothers[sensor][0], first_date=first_date,
last_date=last_date, suffix=agegroup)
dates = create_export_csv(
state_df,
geo_res=geo_res,
sensor=sensor_name,
export_dir=export_dir,
start_date=export_start_date,
end_date=export_end_date)
if len(dates) > 0:
stats.append((max(dates), len(dates)))
assert geo_res == "state" # Make sure geo_groups is for state level
# County/HRR/MSA level
for geo_res in PARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
logger.info("Generating signal and exporting to CSV",
geo_res=geo_res,
sensor=sensor_name)
res_df = generate_sensor_for_parent_geo(
geo_groups, geo_data, res_key, smooth=smoothers[sensor][1],
device=smoothers[sensor][0], first_date=first_date,
last_date=last_date, suffix=agegroup)
dates = create_export_csv(res_df, geo_res=geo_res,
sensor=sensor_name, export_dir=export_dir,
start_date=export_start_date,
end_date=export_end_date,
remove_null_samples=True)
n_cpu = min(8, cpu_count()) # for parallelization
with Manager() as manager:
# for using loggers in multiple threads
lock = manager.Lock() # pylint: disable=no-member
logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
with Pool(n_cpu) as pool:
pool_results = []
for geo_res in NONPARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data)
geo_groups = geo_data.groupby(res_key)
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
pool_results.append(
pool.apply_async(
generate_and_export_for_nonparent_geo,
args=(
# generate_sensors_for_parent_geo
geo_groups, res_key,
smoothers[sensor][1], smoothers[sensor][0],
first_date, last_date, agegroup,
# create_export_csv
geo_res, sensor_name, export_dir,
export_start_date, export_end_date,
# logger params
lock,
params["common"].get("log_filename"),
params["common"].get("log_exceptions", True)
)
)
)
assert geo_res == "state" # Make sure geo_groups is for state level
# County/HRR/MSA level
for geo_res in PARENT_GEO_RESOLUTIONS:
geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups
for agegroup in AGE_GROUPS:
for sensor in sensors:
if agegroup == "total":
sensor_name = sensor
else:
sensor_name = "_".join([sensor, agegroup])
pool_results.append(
pool.apply_async(
generate_and_export_for_parent_geo,
args=(
# generate_sensors_for_parent_geo
geo_groups, geo_data, res_key,
smoothers[sensor][1], smoothers[sensor][0],
first_date, last_date, agegroup,
# create_export_csv
geo_res, sensor_name, export_dir,
export_start_date, export_end_date,
# logger params
lock,
params["common"].get("log_filename"),
params["common"].get("log_exceptions", True)
)
)
)
pool_results = [proc.get() for proc in pool_results]
for dates in pool_results:
if len(dates) > 0:
stats.append((max(dates), len(dates)))

Expand Down