|
6 | 6 | """
|
7 | 7 | import atexit
|
8 | 8 | from datetime import datetime
|
| 9 | +from multiprocessing import Manager, Pool, cpu_count, current_process |
9 | 10 | import time
|
10 | 11 | from typing import Dict, Any
|
11 | 12 |
|
@@ -55,6 +56,49 @@ def get_smooth_info(sensors, _SMOOTHERS):
|
55 | 56 | smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE)
|
56 | 57 | return smoothers
|
57 | 58 |
|
def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                          first_date, last_date, suffix,  # generate args
                                          geo_res, sensor_name, export_dir,
                                          export_start_date, export_end_date,  # export args
                                          lock, log_filename, log_exceptions):  # logger args
    """Generate a non-parent-geo sensor, export it to CSV, and return the export dates."""
    # The parent process's logger cannot be pickled into a worker, so each
    # worker rebuilds it; the shared lock serializes that setup across workers.
    with lock:
        logger = get_structured_logger(__name__, log_filename, log_exceptions)
        logger.info("Generating signal and exporting to CSV",
                    geo_res=geo_res,
                    sensor=sensor_name,
                    pid=current_process().pid)
    signal_df = generate_sensor_for_nonparent_geo(
        geo_groups, res_key, smooth, device, first_date, last_date, suffix)
    exported_dates = create_export_csv(
        signal_df,
        geo_res=geo_res,
        sensor=sensor_name,
        export_dir=export_dir,
        start_date=export_start_date,
        end_date=export_end_date)
    return exported_dates
| 79 | + |
def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                       first_date, last_date, suffix,  # generate args
                                       geo_res, sensor_name, export_dir,
                                       export_start_date, export_end_date,  # export args
                                       lock, log_filename, log_exceptions):  # logger args
    """Generate a parent-geo sensor, export it to CSV, and return the export dates."""
    # The parent process's logger cannot be pickled into a worker, so each
    # worker rebuilds it; the shared lock serializes that setup across workers.
    with lock:
        logger = get_structured_logger(__name__, log_filename, log_exceptions)
        logger.info("Generating signal and exporting to CSV",
                    geo_res=geo_res,
                    sensor=sensor_name,
                    pid=current_process().pid)
    signal_df = generate_sensor_for_parent_geo(
        geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix)
    # Parent geos drop rows with null sample sizes before export.
    exported_dates = create_export_csv(
        signal_df,
        geo_res=geo_res,
        sensor=sensor_name,
        export_dir=export_dir,
        start_date=export_start_date,
        end_date=export_end_date,
        remove_null_samples=True)
    return exported_dates
| 101 | + |
58 | 102 | def run_module(params: Dict[str, Any]):
|
59 | 103 | """Run the quidel_covidtest indicator.
|
60 | 104 |
|
@@ -123,53 +167,71 @@ def run_module(params: Dict[str, Any]):
|
123 | 167 | wip_signal=params["indicator"]["wip_signal"],
|
124 | 168 | prefix="wip_")
|
125 | 169 | smoothers = get_smooth_info(sensors, SMOOTHERS)
|
126 |
| - for geo_res in NONPARENT_GEO_RESOLUTIONS: |
127 |
| - geo_data, res_key = geo_map(geo_res, data) |
128 |
| - geo_groups = geo_data.groupby(res_key) |
129 |
| - for agegroup in AGE_GROUPS: |
130 |
| - for sensor in sensors: |
131 |
| - if agegroup == "total": |
132 |
| - sensor_name = sensor |
133 |
| - else: |
134 |
| - sensor_name = "_".join([sensor, agegroup]) |
135 |
| - logger.info("Generating signal and exporting to CSV", |
136 |
| - geo_res=geo_res, |
137 |
| - sensor=sensor_name) |
138 |
| - state_df = generate_sensor_for_nonparent_geo( |
139 |
| - geo_groups, res_key, smooth=smoothers[sensor][1], |
140 |
| - device=smoothers[sensor][0], first_date=first_date, |
141 |
| - last_date=last_date, suffix=agegroup) |
142 |
| - dates = create_export_csv( |
143 |
| - state_df, |
144 |
| - geo_res=geo_res, |
145 |
| - sensor=sensor_name, |
146 |
| - export_dir=export_dir, |
147 |
| - start_date=export_start_date, |
148 |
| - end_date=export_end_date) |
149 |
| - if len(dates) > 0: |
150 |
| - stats.append((max(dates), len(dates))) |
151 |
| - assert geo_res == "state" # Make sure geo_groups is for state level |
152 |
| - # County/HRR/MSA level |
153 |
| - for geo_res in PARENT_GEO_RESOLUTIONS: |
154 |
| - geo_data, res_key = geo_map(geo_res, data) |
155 |
| - for agegroup in AGE_GROUPS: |
156 |
| - for sensor in sensors: |
157 |
| - if agegroup == "total": |
158 |
| - sensor_name = sensor |
159 |
| - else: |
160 |
| - sensor_name = "_".join([sensor, agegroup]) |
161 |
| - logger.info("Generating signal and exporting to CSV", |
162 |
| - geo_res=geo_res, |
163 |
| - sensor=sensor_name) |
164 |
| - res_df = generate_sensor_for_parent_geo( |
165 |
| - geo_groups, geo_data, res_key, smooth=smoothers[sensor][1], |
166 |
| - device=smoothers[sensor][0], first_date=first_date, |
167 |
| - last_date=last_date, suffix=agegroup) |
168 |
| - dates = create_export_csv(res_df, geo_res=geo_res, |
169 |
| - sensor=sensor_name, export_dir=export_dir, |
170 |
| - start_date=export_start_date, |
171 |
| - end_date=export_end_date, |
172 |
| - remove_null_samples=True) |
| 170 | + n_cpu = min(8, cpu_count()) # for parallelization |
| 171 | + with Manager() as manager: |
| 172 | + # for using loggers in multiple threads |
| 173 | + # disabled due to a Pylint bug, resolved by version bump (#1886) |
| 174 | + lock = manager.Lock() # pylint: disable=no-member |
| 175 | + logger.info("Parallelizing sensor generation", n_cpu=n_cpu) |
| 176 | + with Pool(n_cpu) as pool: |
| 177 | + pool_results = [] |
| 178 | + for geo_res in NONPARENT_GEO_RESOLUTIONS: |
| 179 | + geo_data, res_key = geo_map(geo_res, data) |
| 180 | + geo_groups = geo_data.groupby(res_key) |
| 181 | + for agegroup in AGE_GROUPS: |
| 182 | + for sensor in sensors: |
| 183 | + if agegroup == "total": |
| 184 | + sensor_name = sensor |
| 185 | + else: |
| 186 | + sensor_name = "_".join([sensor, agegroup]) |
| 187 | + pool_results.append( |
| 188 | + pool.apply_async( |
| 189 | + generate_and_export_for_nonparent_geo, |
| 190 | + args=( |
| 191 | + # generate_sensors_for_parent_geo |
| 192 | + geo_groups, res_key, |
| 193 | + smoothers[sensor][1], smoothers[sensor][0], |
| 194 | + first_date, last_date, agegroup, |
| 195 | + # create_export_csv |
| 196 | + geo_res, sensor_name, export_dir, |
| 197 | + export_start_date, export_end_date, |
| 198 | + # logger params |
| 199 | + lock, |
| 200 | + params["common"].get("log_filename"), |
| 201 | + params["common"].get("log_exceptions", True) |
| 202 | + ) |
| 203 | + ) |
| 204 | + ) |
| 205 | + assert geo_res == "state" # Make sure geo_groups is for state level |
| 206 | + # County/HRR/MSA level |
| 207 | + for geo_res in PARENT_GEO_RESOLUTIONS: |
| 208 | + geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups |
| 209 | + for agegroup in AGE_GROUPS: |
| 210 | + for sensor in sensors: |
| 211 | + if agegroup == "total": |
| 212 | + sensor_name = sensor |
| 213 | + else: |
| 214 | + sensor_name = "_".join([sensor, agegroup]) |
| 215 | + pool_results.append( |
| 216 | + pool.apply_async( |
| 217 | + generate_and_export_for_parent_geo, |
| 218 | + args=( |
| 219 | + # generate_sensors_for_parent_geo |
| 220 | + geo_groups, geo_data, res_key, |
| 221 | + smoothers[sensor][1], smoothers[sensor][0], |
| 222 | + first_date, last_date, agegroup, |
| 223 | + # create_export_csv |
| 224 | + geo_res, sensor_name, export_dir, |
| 225 | + export_start_date, export_end_date, |
| 226 | + # logger params |
| 227 | + lock, |
| 228 | + params["common"].get("log_filename"), |
| 229 | + params["common"].get("log_exceptions", True) |
| 230 | + ) |
| 231 | + ) |
| 232 | + ) |
| 233 | + pool_results = [proc.get() for proc in pool_results] |
| 234 | + for dates in pool_results: |
173 | 235 | if len(dates) > 0:
|
174 | 236 | stats.append((max(dates), len(dates)))
|
175 | 237 |
|
|
0 commit comments