Skip to content

Use spawn start method in multiprocessing programs #11391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 53 additions & 26 deletions sorts/odd_even_transposition_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
synchronization could be used.
"""

from multiprocessing import Lock, Pipe, Process
import multiprocessing as mp

# lock used to ensure that two processes do not access a pipe at the same time
# NOTE This breaks testing on build runner. May work better locally
# process_lock = Lock()
# process_lock = mp.Lock()

"""
The function run by the processes that sorts the list
Expand All @@ -29,36 +29,41 @@
"""


def oe_process(position, value, l_send, r_send, lr_cv, rr_cv, result_pipe):
process_lock = Lock()
def oe_process(
position,
value,
l_send,
r_send,
lr_cv,
rr_cv,
result_pipe,
multiprocessing_context,
):
process_lock = multiprocessing_context.Lock()

# we perform n swaps since after n swaps we know we are sorted
# we *could* stop early if we are sorted already, but it takes as long to
# find out we are sorted as it does to sort the list with this algorithm
for i in range(10):
if (i + position) % 2 == 0 and r_send is not None:
# send your value to your right neighbor
process_lock.acquire()
r_send[1].send(value)
process_lock.release()
with process_lock:
r_send[1].send(value)

# receive your right neighbor's value
process_lock.acquire()
temp = rr_cv[0].recv()
process_lock.release()
with process_lock:
temp = rr_cv[0].recv()

# take the lower value since you are on the left
value = min(value, temp)
elif (i + position) % 2 != 0 and l_send is not None:
# send your value to your left neighbor
process_lock.acquire()
l_send[1].send(value)
process_lock.release()
with process_lock:
l_send[1].send(value)

# receive your left neighbor's value
process_lock.acquire()
temp = lr_cv[0].recv()
process_lock.release()
with process_lock:
temp = lr_cv[0].recv()

# take the higher value since you are on the right
value = max(value, temp)
Expand Down Expand Up @@ -94,39 +99,60 @@ def odd_even_transposition(arr):
>>> odd_even_transposition(unsorted_list) == sorted(unsorted_list + [1])
False
"""
# spawn method is considered safer than fork
multiprocessing_context = mp.get_context("spawn")

process_array_ = []
result_pipe = []
# initialize the list of pipes where the values will be retrieved
for _ in arr:
result_pipe.append(Pipe())
result_pipe.append(multiprocessing_context.Pipe())
# creates the processes
# the first and last process only have one neighbor so they are made outside
# of the loop
temp_rs = Pipe()
temp_rr = Pipe()
temp_rs = multiprocessing_context.Pipe()
temp_rr = multiprocessing_context.Pipe()
process_array_.append(
Process(
multiprocessing_context.Process(
Copy link
Contributor Author

@XuehaiPan XuehaiPan May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a commit to access the multiprocessing context explicitly. The context instance is set to spawn above separating from the global default context.

import multiprocessing as mp

mp.set_start_method("spawn")  # set global default context

# Use the global default context
process = mp.Process(...)
pipe = mp.Pipe(...)
lock = mp.Lock(...)

vs.

import multiprocessing as mp

ctx = mp.get_context("spawn")

# Use the spawn context explicitly
process = ctx.Process(...)
pipe = ctx.Pipe(...)
lock = ctx.Lock(...)

If this change is not desired or makes the code too complex, I can remove the last commit in the PR. cc @cclauss

target=oe_process,
args=(0, arr[0], None, temp_rs, None, temp_rr, result_pipe[0]),
args=(
0,
arr[0],
None,
temp_rs,
None,
temp_rr,
result_pipe[0],
multiprocessing_context,
),
)
)
temp_lr = temp_rs
temp_ls = temp_rr

for i in range(1, len(arr) - 1):
temp_rs = Pipe()
temp_rr = Pipe()
temp_rs = multiprocessing_context.Pipe()
temp_rr = multiprocessing_context.Pipe()
process_array_.append(
Process(
multiprocessing_context.Process(
target=oe_process,
args=(i, arr[i], temp_ls, temp_rs, temp_lr, temp_rr, result_pipe[i]),
args=(
i,
arr[i],
temp_ls,
temp_rs,
temp_lr,
temp_rr,
result_pipe[i],
multiprocessing_context,
),
)
)
temp_lr = temp_rs
temp_ls = temp_rr

process_array_.append(
Process(
multiprocessing_context.Process(
target=oe_process,
args=(
len(arr) - 1,
Expand All @@ -136,6 +162,7 @@ def odd_even_transposition(arr):
temp_lr,
None,
result_pipe[len(arr) - 1],
multiprocessing_context,
),
)
)
Expand Down