Add multiprocessing to the Quidel indicator #1881
Conversation
Taken at face value and assuming you're testing on an 8-core machine, these results look believable and promising. Also nice that the code change is pretty simple! Some questions about the benchmarks:
The built-in profiler, visualized with snakeviz, after the run above.
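For reference, a rough sketch of that kind of profiling workflow; the `run_indicator()` stand-in and the file name are assumptions, not the exact code or commands used here:

```python
# Sketch: profile one run with the built-in cProfile, then view the dump
# with snakeviz (`snakeviz quidel_run.prof`). Requires Python 3.8+ for the
# Profile context manager.
import cProfile
import pstats


def run_indicator():
    """Hypothetical stand-in for the indicator's entry point."""
    return sum(i * i for i in range(10_000))


with cProfile.Profile() as profiler:
    run_indicator()
profiler.dump_stats("quidel_run.prof")   # snakeviz reads this .prof file

# Alternatively, print the top cumulative-time functions without snakeviz:
pstats.Stats("quidel_run.prof").sort_stats("cumulative").print_stats(10)
```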
I limit the pull start & end dates to 2023-07-01 and 2023-08-01 respectively, which is a volume of data comparable to the daily indicator run (on prod, the Quidel indicator doesn't seem to limit these dates in a similar way, but rather relies on its local state to determine which data to download & process).
Yes, some files are downloaded from S3 by the indicator itself - in the screenshots above you can see these as the second most time-consuming function.
Yep, I clear all of the indicator's outputs between runs, and for the test in the PR body I actually ran the multiprocessing version before the single-threaded version just in case. In fact, if this is not done between runs, the indicator will not process any new data at all, taking <5 seconds to run.
Unfortunately the multiprocessing functions obscure the Pandas timings in the profiler, but the indicator itself keeps track of the time spent on the run and this matches the profiler's overall timing. The single-threaded run's summary reads:
And the multi-threaded run's:
Memory profilers aren't super helpful here, since the idea behind multiprocessing is to bypass the global interpreter lock by spawning a few different Python processes.

mprof displays this output for the single-threaded version:

And this for the multi-processing version:

However, this output refers to the base process only; the faster version spawns processes that do use more memory overall. Just manually checking the system's memory usage suggests these take ~180MB per process, for a total of ~2.2GB.
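To make the pattern under discussion concrete, here is a minimal, hypothetical sketch of farming per-geography work out to a process pool; the function and geography names are placeholders, not the indicator's actual code:

```python
# Illustrative sketch only: dispatch per-geography sensor generation to a
# process pool, so the heavy Pandas work runs in separate processes rather
# than competing for one interpreter's GIL.
import multiprocessing as mp


def generate_for_geo(geo_res):
    """Placeholder for the per-geography generate/export work."""
    return geo_res, sum(i * i for i in range(100_000))  # stand-in computation


if __name__ == "__main__":  # guard is required for the spawn start method
    geos = ["state", "county", "hrr", "msa"]
    with mp.Pool(processes=min(len(geos), mp.cpu_count())) as pool:
        results = pool.map(generate_for_geo, geos)  # one task per geography
    print(dict(results))
```

Each worker is a full Python process with its own memory, which is why a per-process footprint like the ~180MB above multiplies by the number of workers.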
Thanks for answering all these questions @rzats! 🚀
Good to have that confirmation from outside timing functions 👍.
Good to know! In case the prod indicator server has more than 8 cores, we can get an estimate of the memory needed to support them 👍. No further questions. As long as we make sure machine memory is enough for all the cores, I'm happy to approve.
nice job! i think we can make it even better, though!
good questions, @dshemetov !
the profiler doesn't seem to capture anything from the worker processes. if it did, the call tree diagram would lose much of its semantic meaning: the sum of timings from all the child processes would exceed the wall clock time of the root call.
how'd you reach that ~2.2GB value? base plus workers gives me a different total.

also, @rzats: given the previous difficulty in locally replicating the runtime of the prod jobs, plus the fact that there's a difference in spawn/fork behaviors between the macOS and unix multiprocessing implementations, you should unlink the automatic close of #1547. we can close it manually after we see a run of this code on prod.
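For completeness, if per-worker timings are ever wanted, one common workaround (not something this PR does) is to run a profiler inside each worker and dump one stats file per process; a sketch with hypothetical names:

```python
# Sketch: profile inside a worker process, since a profiler running in the
# parent can't see work executed in child processes.
import cProfile
import os


def do_work(task):
    """Stand-in for the real per-task computation."""
    return sum(i * i for i in range(task))


def profiled_worker(task):
    """Run one task under its own profiler; dump one stats file per PID."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        return do_work(task)
    finally:
        profiler.disable()
        profiler.dump_stats(f"worker_{os.getpid()}.prof")
```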
```python
                                          export_start_date, export_end_date,  # export args
                                          log_filename, log_exceptions):  # logger args
    """Generate sensors, create export CSV then return stats."""
    logger = get_structured_logger(__name__, log_filename, log_exceptions)
```
Because trying to pass the existing logger breaks multiprocessing: `AttributeError: Can't pickle local object 'BoundLoggerLazyProxy.bind.<locals>.finalized_bind'`
damn, i should have seen that coming... :(

this is worth a comment in the code, something like # `logger` cannot be passed to a child process

however! this current implementation with the newly constructed loggers is susceptible to concurrency issues, as they are all going to be trying to write to the same file from different processes without a mutex. to alleviate this, you should be able to pass along a `multiprocessing.Lock()` to use as a context manager around logging operations. that does add some more verbosity to your `generate_and_...()` calls, but it will work for now -- i have an idea for something that is cleaner and reusable for situations like this in the future.
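A sketch of roughly what that suggestion could look like, with hypothetical names. It uses a `Manager().Lock()` proxy so the lock can be passed straight through the pool's task arguments; a plain `multiprocessing.Lock()` would instead need to be inherited, e.g. via the pool's `initializer`:

```python
# Illustrative sketch (hypothetical names): each worker re-creates its own
# logger (logger objects can't be pickled into child processes) and wraps the
# write in a shared lock so concurrent processes don't interleave log output.
import logging
import multiprocessing as mp


def generate_and_export(geo_res, lock, log_filename):
    """Placeholder for a per-geography generate/export worker."""
    logging.basicConfig(filename=log_filename, level=logging.INFO)
    logger = logging.getLogger(__name__)

    stats = f"stats for {geo_res}"      # stand-in for the real computation
    with lock:                          # only one process logs at a time
        logger.info("finished %s", geo_res)
    return stats


if __name__ == "__main__":
    with mp.Manager() as manager, mp.Pool(4) as pool:
        lock = manager.Lock()           # Manager locks can be passed as task args
        jobs = [pool.apply_async(generate_and_export, (geo, lock, "run.log"))
                for geo in ["state", "county", "hrr", "msa"]]
        results = [job.get() for job in jobs]
    print(results)
```

The stdlib logger here is only to keep the sketch self-contained; the diff above does the analogous thing by calling `get_structured_logger()` inside the worker.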
i just created #1885, which could greatly simplify the logging stuff (removing the need for the lock and for re-creating loggers) as soon as that's available.
@melange396 fixed, re-requesting review! Also, with these changes the runtime was reduced even further, down to ~65 seconds for the test query.
That's based on me recording the machine's peak memory usage before and during the test run, but it could have spiked a bit more for some unrelated OS reasons :) Adding up the Python processes' memory usage alone makes it around 1.95GB.
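For what it's worth, that kind of per-process tally can also be automated; a small sketch using psutil (an assumption here, not necessarily how the numbers above were collected):

```python
# Sketch: sum resident memory (RSS) of the indicator's parent process and all
# of its multiprocessing workers at a point in time.
import os

import psutil  # third-party: pip install psutil


def total_rss_gb(pid=None):
    """Total RSS of the given process plus all of its children, in GB."""
    parent = psutil.Process(pid or os.getpid())
    procs = [parent] + parent.children(recursive=True)
    return sum(p.memory_info().rss for p in procs) / 1024 ** 3


print(f"total RSS across parent + workers: {total_rss_gb():.2f} GB")
```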
```diff
@@ -55,6 +57,45 @@ def get_smooth_info(sensors, _SMOOTHERS):
         smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE)
     return smoothers


+def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
```
this method and `generate_and_export_for_parent_geo()` are EXTREMELY similar but not exactly the same (in more than just the name of the `generate_sensor...()` methods they use) -- can you annotate those differences in the code? similarly for the blocks under `for geo_res in NONPARENT_GEO_RESOLUTIONS:` and `for geo_res in PARENT_GEO_RESOLUTIONS:`.
@dshemetov @melange396 applied your suggested changes, re-requesting review!
this is looking pretty good. do you have the logging output from a test run?
@melange396 yep, here's an example of a run from yesterday:
Previously the timestamps followed the behavior you mentioned (reflecting when the job was queued, not when it actually ran), but that changed when the logging was moved to the multiprocessing workers.
that logging output looks like it had a previous run in it that must have been from before you were using a lock.

if we merge this now, will you promise to come back and update it when #1885 is also merged? 🙏
@melange396 Oh yeah, looks like that caught a bit of an older run as well :) the first run failed. And sure, I can work on updating the various loggers in the indicators when that PR is merged!
good stuff!
Description

Parallelizes the long-running `generate_sensor_for_*_geo` functions in the Quidel indicator - see #1547.

In a local test run, this change made the total runtime for the `pull` step of this indicator 4.2x faster, and the runtime of the `generate` functions 6x faster:

Indicator runtime before the parallelization - note the `generate_sensor_for_parent_geo` call...

Indicator runtime after the parallelization - the `generate` call is replaced by the multiprocessing functions.

Changelog