From fd05c6029a69eabe8c734352689bfe90cce5f57c Mon Sep 17 00:00:00 2001 From: Xuehai Pan Date: Thu, 2 May 2024 11:57:09 +0000 Subject: [PATCH 1/4] Use `spawn` start method in multiprocessing programs --- sorts/odd_even_transposition_parallel.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sorts/odd_even_transposition_parallel.py b/sorts/odd_even_transposition_parallel.py index 9d2bcdbd7576..74ca2e5ca07e 100644 --- a/sorts/odd_even_transposition_parallel.py +++ b/sorts/odd_even_transposition_parallel.py @@ -11,7 +11,7 @@ synchronization could be used. """ -from multiprocessing import Lock, Pipe, Process +from multiprocessing import Lock, Pipe, Process, set_start_method # lock used to ensure that two processes do not access a pipe at the same time # NOTE This breaks testing on build runner. May work better locally @@ -153,6 +153,8 @@ def odd_even_transposition(arr): # creates a reverse sorted list and sorts it def main(): + set_start_method("spawn") # spawn method is considered safer than fork + arr = list(range(10, 0, -1)) print("Initial List") print(*arr) From cccd92a5478fa4f3404aed8585d72f78df6a3d52 Mon Sep 17 00:00:00 2001 From: Xuehai Pan Date: Thu, 2 May 2024 14:50:19 +0000 Subject: [PATCH 2/4] Set `spawn` start method in doctest --- sorts/odd_even_transposition_parallel.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sorts/odd_even_transposition_parallel.py b/sorts/odd_even_transposition_parallel.py index 74ca2e5ca07e..c82b407a71ca 100644 --- a/sorts/odd_even_transposition_parallel.py +++ b/sorts/odd_even_transposition_parallel.py @@ -75,6 +75,8 @@ def oe_process(position, value, l_send, r_send, lr_cv, rr_cv, result_pipe): def odd_even_transposition(arr): """ + >>> from multiprocessing import set_start_method + >>> set_start_method("spawn") >>> odd_even_transposition(list(range(10)[::-1])) == sorted(list(range(10)[::-1])) True >>> odd_even_transposition(["a", "x", "c"]) == sorted(["x", "a", "c"]) From 56092ab7c8f5f7c3343b6ccf04bcde5592b21244 Mon Sep 17 00:00:00 2001 From: Xuehai Pan Date: Thu, 2 May 2024 16:08:22 +0000 Subject: [PATCH 3/4] Use `with` statement for locks --- sorts/odd_even_transposition_parallel.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/sorts/odd_even_transposition_parallel.py b/sorts/odd_even_transposition_parallel.py index c82b407a71ca..bd8c88fe7490 100644 --- a/sorts/odd_even_transposition_parallel.py +++ b/sorts/odd_even_transposition_parallel.py @@ -38,27 +38,23 @@ def oe_process(position, value, l_send, r_send, lr_cv, rr_cv, result_pipe): for i in range(10): if (i + position) % 2 == 0 and r_send is not None: # send your value to your right neighbor - process_lock.acquire() - r_send[1].send(value) - process_lock.release() + with process_lock: + r_send[1].send(value) # receive your right neighbor's value - process_lock.acquire() - temp = rr_cv[0].recv() - process_lock.release() + with process_lock: + temp = rr_cv[0].recv() # take the lower value since you are on the left value = min(value, temp) elif (i + position) % 2 != 0 and l_send is not None: # send your value to your left neighbor - process_lock.acquire() - l_send[1].send(value) - process_lock.release() + with process_lock: + l_send[1].send(value) # receive your left neighbor's value - process_lock.acquire() - temp = lr_cv[0].recv() - process_lock.release() + with process_lock: + temp = lr_cv[0].recv() # take the higher value since you are on the right value = max(value, temp) From 59634c21b3dfb424faf4aeeb0f18ecdfddd074c2 Mon Sep 17 00:00:00 2001 From: Xuehai Pan Date: Thu, 2 May 2024 16:10:48 +0000 Subject: [PATCH 4/4] Pass multiprocessing context explicitly --- sorts/odd_even_transposition_parallel.py | 63 +++++++++++++++++------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/sorts/odd_even_transposition_parallel.py b/sorts/odd_even_transposition_parallel.py index bd8c88fe7490..5d4e09b211c0 100644 --- a/sorts/odd_even_transposition_parallel.py +++ b/sorts/odd_even_transposition_parallel.py @@ -11,11 +11,11 @@ synchronization could be used. """ -from multiprocessing import Lock, Pipe, Process, set_start_method +import multiprocessing as mp # lock used to ensure that two processes do not access a pipe at the same time # NOTE This breaks testing on build runner. May work better locally -# process_lock = Lock() +# process_lock = mp.Lock() """ The function run by the processes that sorts the list @@ -29,8 +29,17 @@ """ -def oe_process(position, value, l_send, r_send, lr_cv, rr_cv, result_pipe): - process_lock = Lock() +def oe_process( + position, + value, + l_send, + r_send, + lr_cv, + rr_cv, + result_pipe, + multiprocessing_context, +): + process_lock = multiprocessing_context.Lock() # we perform n swaps since after n swaps we know we are sorted # we *could* stop early if we are sorted already, but it takes as long to @@ -71,8 +80,6 @@ def oe_process(position, value, l_send, r_send, lr_cv, rr_cv, result_pipe): def odd_even_transposition(arr): """ - >>> from multiprocessing import set_start_method - >>> set_start_method("spawn") >>> odd_even_transposition(list(range(10)[::-1])) == sorted(list(range(10)[::-1])) True >>> odd_even_transposition(["a", "x", "c"]) == sorted(["x", "a", "c"]) @@ -92,39 +99,60 @@ def odd_even_transposition(arr): >>> odd_even_transposition(unsorted_list) == sorted(unsorted_list + [1]) False """ + # spawn method is considered safer than fork + multiprocessing_context = mp.get_context("spawn") + process_array_ = [] result_pipe = [] # initialize the list of pipes where the values will be retrieved for _ in arr: - result_pipe.append(Pipe()) + result_pipe.append(multiprocessing_context.Pipe()) # creates the processes # the first and last process only have one neighbor so they are made outside # of the loop - temp_rs = Pipe() - temp_rr = Pipe() + temp_rs = multiprocessing_context.Pipe() + temp_rr = multiprocessing_context.Pipe() process_array_.append( - Process( + multiprocessing_context.Process( target=oe_process, - args=(0, arr[0], None, temp_rs, None, temp_rr, result_pipe[0]), + args=( + 0, + arr[0], + None, + temp_rs, + None, + temp_rr, + result_pipe[0], + multiprocessing_context, + ), ) ) temp_lr = temp_rs temp_ls = temp_rr for i in range(1, len(arr) - 1): - temp_rs = Pipe() - temp_rr = Pipe() + temp_rs = multiprocessing_context.Pipe() + temp_rr = multiprocessing_context.Pipe() process_array_.append( - Process( + multiprocessing_context.Process( target=oe_process, - args=(i, arr[i], temp_ls, temp_rs, temp_lr, temp_rr, result_pipe[i]), + args=( + i, + arr[i], + temp_ls, + temp_rs, + temp_lr, + temp_rr, + result_pipe[i], + multiprocessing_context, + ), ) ) temp_lr = temp_rs temp_ls = temp_rr process_array_.append( - Process( + multiprocessing_context.Process( target=oe_process, args=( len(arr) - 1, @@ -134,6 +162,7 @@ def odd_even_transposition(arr): temp_lr, None, result_pipe[len(arr) - 1], + multiprocessing_context, ), ) ) @@ -151,8 +180,6 @@ def odd_even_transposition(arr): # creates a reverse sorted list and sorts it def main(): - set_start_method("spawn") # spawn method is considered safer than fork - arr = list(range(10, 0, -1)) print("Initial List") print(*arr)