Skip to content

Commit 2cc04ca

Browse files
author
Nathan Shreve
committed
Improved multiprocessing efficiency in script
1 parent dfab0a8 commit 2cc04ca

File tree

1 file changed

+67
-40
lines changed

1 file changed

+67
-40
lines changed

vtr_flow/scripts/profiling_utils/parse_lookahead_data.py

Lines changed: 67 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@
1212
import shutil
1313
import sys
1414
import argparse
15+
from collections import deque
1516
from enum import Enum
1617
from pathlib import Path
17-
from concurrent.futures import ThreadPoolExecutor
18-
from multiprocessing import Lock
18+
from multiprocessing import Lock, Process
1919
import pandas as pd
2020
from sklearn.metrics import mean_squared_error
2121
import matplotlib.pyplot as plt
2222
import distinctipy as dp
2323
from colour import Color
2424
import seaborn as sns
2525

26+
lock = Lock() # Used for multiprocessing
27+
2628

2729
# pylint: disable=too-many-instance-attributes
2830
# pylint: disable=too-few-public-methods
@@ -41,8 +43,7 @@ def __init__(self,
4143
no_replace: bool,
4244
should_print: bool,
4345
percent_error_threshold: float,
44-
exclusions: dict,
45-
pool: ThreadPoolExecutor):
46+
exclusions: dict):
4647
# Output directory
4748
self.output_dir = "./vtr_flow/tasks/lookahead_verifier_output"
4849
# The graph types (pie, heatmap, bar, scatter) that will be created
@@ -106,9 +107,8 @@ def __init__(self,
106107
"test name"
107108
]
108109

109-
# Lock and Pool for multithreading
110-
self.lock = Lock()
111-
self.pool = pool
110+
# Processes list for multiprocessing
111+
self.processes = []
112112

113113

114114
def check_valid_component(comp: str):
@@ -132,21 +132,21 @@ def check_valid_df(df: pd.DataFrame, gv: GlobalVars):
132132
raise Exception("IncompleteDataFrame")
133133

134134

135-
def make_dir(directory: str, clean: bool, gv: GlobalVars):
135+
def make_dir(directory: str, clean: bool):
136136
"""Create a directory"""
137137

138-
gv.lock.acquire()
138+
lock.acquire()
139139

140140
if os.path.exists(directory):
141141
if clean:
142142
shutil.rmtree(directory)
143143
else:
144-
gv.lock.release()
144+
lock.release()
145145
return
146146

147147
os.makedirs(directory)
148148

149-
gv.lock.release()
149+
lock.release()
150150

151151

152152
def column_file_name(column_name: str) -> str:
@@ -227,7 +227,7 @@ def __init__(
227227
os.path.join(self.__directory, "proportion_under_threshold"),
228228
]
229229
for directory in self.__sub_dirs:
230-
make_dir(directory, False, gv)
230+
make_dir(directory, False)
231231

232232
self.__test_name = test_name
233233

@@ -336,7 +336,7 @@ def make_scatter_plot(
336336
if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
337337
return
338338

339-
make_dir(curr_dir, False, gv)
339+
make_dir(curr_dir, False)
340340

341341
# Determine colors for legend
342342
num_colors = self.__df[legend_column].nunique() + 1
@@ -427,7 +427,11 @@ def make_standard_scatter_plots(self, test_name_plot: bool, gv: GlobalVars):
427427
if first_it and col == "iteration no.":
428428
continue
429429

430-
gv.pool.submit(self.make_scatter_plot, comp, plot_type, col, first_it, gv)
430+
gv.processes.append((self.make_scatter_plot, (comp,
431+
plot_type,
432+
col,
433+
first_it,
434+
gv)))
431435

432436
# pylint: disable=too-many-locals
433437
def make_bar_graph(self,
@@ -499,7 +503,7 @@ def make_bar_graph(self,
499503
if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
500504
return
501505

502-
make_dir(curr_dir, False, gv)
506+
make_dir(curr_dir, False)
503507

504508
# Get DF with average error for each "type" encountered in column
505509
avg_error = {}
@@ -548,7 +552,11 @@ def make_standard_bar_graphs(self, test_name_plot: bool, gv: GlobalVars):
548552
for col in columns:
549553
for use_abs in [True, False]:
550554
for first_it in [True, False]:
551-
gv.pool.submit(self.make_bar_graph, comp, col, use_abs, first_it, gv)
555+
gv.processes.append((self.make_bar_graph, (comp,
556+
col,
557+
use_abs,
558+
first_it,
559+
gv)))
552560

553561
# pylint: disable=too-many-locals
554562
def make_heatmap(
@@ -636,7 +644,7 @@ def make_heatmap(
636644
if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
637645
return
638646

639-
make_dir(curr_dir, False, gv)
647+
make_dir(curr_dir, False)
640648

641649
# Get DF with average error for each "coordinate" in the heatmap
642650
df_avgs = pd.DataFrame(columns=[x_column, y_column, scale_column])
@@ -683,20 +691,18 @@ def make_standard_heatmaps(self, gv: GlobalVars):
683691
for comp in gv.components:
684692
for first_it in [True, False]:
685693
for use_abs in [True, False]:
686-
gv.pool.submit(self.make_heatmap,
687-
comp,
688-
"sink cluster tile width",
689-
"sink cluster tile height",
690-
first_it,
691-
use_abs,
692-
gv)
693-
gv.pool.submit(self.make_heatmap,
694-
comp,
695-
"delta x",
696-
"delta y",
697-
first_it,
698-
use_abs,
699-
gv)
694+
gv.processes.append((self.make_heatmap, (comp,
695+
"sink cluster tile width",
696+
"sink cluster tile height",
697+
first_it,
698+
use_abs,
699+
gv)))
700+
gv.processes.append((self.make_heatmap, (comp,
701+
"delta x",
702+
"delta y",
703+
first_it,
704+
use_abs,
705+
gv)))
700706

701707
# pylint: disable=too-many-locals
702708
def make_pie_chart(
@@ -764,7 +770,7 @@ def make_pie_chart(
764770
if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
765771
return
766772

767-
make_dir(curr_dir, False, gv)
773+
make_dir(curr_dir, False)
768774

769775
# Constrict DF to columns whose error is under threshold
770776
curr_df = curr_df[curr_df[f"{comp} % error"] < gv.percent_error_threshold]
@@ -821,7 +827,11 @@ def make_standard_pie_charts(self, test_name_plot: bool, gv: GlobalVars):
821827
for col in columns:
822828
for first_it in [True, False]:
823829
for weighted in [True, False]:
824-
gv.pool.submit(self.make_pie_chart, comp, col, first_it, weighted, gv)
830+
gv.processes.append((self.make_pie_chart, (comp,
831+
col,
832+
first_it,
833+
weighted,
834+
gv)))
825835

826836
def make_standard_plots(self, test_name_plot: bool, gv: GlobalVars):
827837
"""
@@ -961,7 +971,7 @@ def make_csv(df_out: pd.DataFrame, file_name: str):
961971
gv.csv_data and
962972
(not os.path.exists(os.path.join(directory, "data.csv")) or not gv.no_replace)
963973
):
964-
gv.pool.submit(make_csv, df, os.path.join(directory, "data.csv"))
974+
gv.processes.append((make_csv, (df, os.path.join(directory, "data.csv"))))
965975

966976
if gv.should_print:
967977
print("Created ", os.path.join(directory, "data.csv"), sep="")
@@ -1111,7 +1121,6 @@ def parse_args():
11111121
)
11121122

11131123
args = parser.parse_args()
1114-
pool = ThreadPoolExecutor(max_workers=args.j)
11151124

11161125
if args.all:
11171126
graph_types = ["bar", "scatter", "heatmap", "pie"]
@@ -1162,7 +1171,6 @@ def parse_args():
11621171
args.print,
11631172
args.threshold,
11641173
{},
1165-
pool
11661174
)
11671175

11681176
for excl in args.exclude:
@@ -1202,7 +1210,7 @@ def create_complete_outputs(
12021210
if len(args.dir_app) > 0:
12031211
results_folder_path += f"{args.dir_app[0]}"
12041212

1205-
make_dir(results_folder_path, False, gv)
1213+
make_dir(results_folder_path, False)
12061214

12071215
df_complete = df_complete.reset_index(drop=True)
12081216

@@ -1228,7 +1236,7 @@ def create_benchmark_outputs(
12281236

12291237
results_folder = os.path.join(output_folder, "__all__")
12301238
results_folder_path = os.path.join(gv.output_dir, results_folder)
1231-
make_dir(results_folder_path, False, gv)
1239+
make_dir(results_folder_path, False)
12321240

12331241
df_benchmark = df_benchmark.reset_index(drop=True)
12341242

@@ -1270,7 +1278,7 @@ def create_circuit_outputs(
12701278

12711279
results_folder = os.path.join(output_folder, test_name)
12721280
results_folder_path = os.path.join(gv.output_dir, results_folder)
1273-
make_dir(results_folder_path, False, gv)
1281+
make_dir(results_folder_path, False)
12741282
# Read csv with lookahead data (or, a csv previously created by this script)
12751283
df = pd.read_csv(file_path)
12761284
df = df.reset_index(drop=True)
@@ -1329,7 +1337,7 @@ def create_circuit_outputs(
13291337
def main():
13301338
args, gv = parse_args()
13311339

1332-
make_dir(gv.output_dir, False, gv)
1340+
make_dir(gv.output_dir, False)
13331341

13341342
# The DF containing info across all given csv files
13351343
df_complete = pd.DataFrame(columns=gv.column_names)
@@ -1371,6 +1379,25 @@ def main():
13711379
if args.collect != "":
13721380
create_complete_outputs(args, df_complete, gv)
13731381

1382+
# Create output graphs simultaneously
1383+
# This is the best way I have found to do this that avoids having a while
1384+
# loop continually checking if any processes have finished while using an
1385+
# entire CPU. Pool, concurrent.futures.ProcessPoolExecutor, and
1386+
# concurrent.futures.ThreadPoolExecutor seem to have a lot of overhead.
1387+
# This loop assumes that the graph-creating functions take approximately
1388+
# the same amount of time, which is not the case. We could alternatively
1389+
# use a global variable which we increment when we start a process, and
1390+
# decrement at the end of every graph-creating function.
1391+
q = deque()
1392+
for func, params in gv.processes:
1393+
while len(q) >= args.j:
1394+
proc = q.popleft()
1395+
proc.join()
1396+
1397+
proc = Process(target=func, args=params)
1398+
proc.start()
1399+
q.append(proc)
1400+
13741401

13751402
if __name__ == "__main__":
13761403
main()

0 commit comments

Comments
 (0)