Skip to content

Commit 478ec8a

Browse files
committed
Add MIX_DEPS_COMPILE_PARALLEL_COUNT for parallel app compilation
1 parent 8e49286 commit 478ec8a

File tree

2 files changed

+259
-60
lines changed

2 files changed

+259
-60
lines changed

lib/mix/lib/mix/tasks/deps.compile.ex

Lines changed: 67 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -75,86 +75,82 @@ defmodule Mix.Tasks.Deps.Compile do
7575

7676
@doc false
7777
def compile(deps, options \\ []) do
78-
shell = Mix.shell()
79-
config = Mix.Project.deps_config()
8078
Mix.Task.run("deps.precompile")
79+
force? = Keyword.get(options, :force, false)
8180

82-
compiled =
81+
deps =
8382
deps
8483
|> reject_umbrella_children(options)
8584
|> reject_local_deps(options)
86-
|> Enum.map(fn %Mix.Dep{app: app, status: status, opts: opts, scm: scm} = dep ->
87-
check_unavailable!(app, scm, status)
88-
maybe_clean(dep, options)
8985

90-
compiled? =
91-
cond do
92-
not is_nil(opts[:compile]) ->
93-
do_compile(dep, config)
86+
count = System.get_env("MIX_DEPS_COMPILE_PARALLEL_COUNT", "0") |> String.to_integer()
9487

95-
Mix.Dep.mix?(dep) ->
96-
do_mix(dep, config)
88+
compiled? =
89+
if count > 1 and length(deps) > count do
90+
Mix.shell().info("mix deps.compile running in parallel with count=#{count}")
91+
Mix.Tasks.Deps.Parallel.server(deps, count, force?)
92+
else
93+
config = Mix.Project.deps_config()
94+
true in Enum.map(deps, &compile_single(&1, force?, config))
95+
end
9796

98-
Mix.Dep.make?(dep) ->
99-
do_make(dep, config)
97+
if compiled?, do: Mix.Task.run("will_recompile"), else: :ok
98+
end
10099

101-
dep.manager == :rebar3 ->
102-
do_rebar3(dep, config)
100+
@doc false
101+
def compile_single(%Mix.Dep{} = dep, force?, config) do
102+
%{app: app, status: status, opts: opts, scm: scm} = dep
103+
check_unavailable!(app, scm, status)
103104

104-
true ->
105-
shell.error(
106-
"Could not compile #{inspect(app)}, no \"mix.exs\", \"rebar.config\" or \"Makefile\" " <>
107-
"(pass :compile as an option to customize compilation, set it to \"false\" to do nothing)"
108-
)
105+
# If a dependency was marked as fetched or with an out of date lock
106+
# or missing the app file, we always compile it from scratch.
107+
if force? or Mix.Dep.compilable?(dep) do
108+
File.rm_rf!(Path.join([Mix.Project.build_path(), "lib", Atom.to_string(dep.app)]))
109+
end
109110

110-
false
111-
end
111+
compiled? =
112+
cond do
113+
not is_nil(opts[:compile]) ->
114+
do_compile(dep, config)
112115

113-
if compiled? do
114-
build_path = Mix.Project.build_path(config)
116+
Mix.Dep.mix?(dep) ->
117+
do_mix(dep, config)
115118

116-
lazy_message = fn ->
117-
info = %{
118-
app: dep.app,
119-
scm: dep.scm,
120-
manager: dep.manager,
121-
os_pid: System.pid()
122-
}
119+
Mix.Dep.make?(dep) ->
120+
do_make(dep, config)
123121

124-
{:dep_compiled, info}
125-
end
122+
dep.manager == :rebar3 ->
123+
do_rebar3(dep, config)
126124

127-
Mix.Sync.PubSub.broadcast(build_path, lazy_message)
128-
end
125+
true ->
126+
Mix.shell().error(
127+
"Could not compile #{inspect(app)}, no \"mix.exs\", \"rebar.config\" or \"Makefile\" " <>
128+
"(pass :compile as an option to customize compilation, set it to \"false\" to do nothing)"
129+
)
129130

130-
# We should touch fetchable dependencies even if they
131-
# did not compile otherwise they will always be marked
132-
# as stale, even when there is nothing to do.
133-
fetchable? = touch_fetchable(scm, opts[:build])
131+
false
132+
end
134133

135-
compiled? and fetchable?
134+
if compiled? do
135+
config
136+
|> Mix.Project.build_path()
137+
|> Mix.Sync.PubSub.broadcast(fn ->
138+
info = %{
139+
app: dep.app,
140+
scm: dep.scm,
141+
manager: dep.manager,
142+
os_pid: System.pid()
143+
}
144+
145+
{:dep_compiled, info}
136146
end)
137-
138-
if true in compiled, do: Mix.Task.run("will_recompile"), else: :ok
139-
end
140-
141-
defp maybe_clean(dep, opts) do
142-
# If a dependency was marked as fetched or with an out of date lock
143-
# or missing the app file, we always compile it from scratch.
144-
if Keyword.get(opts, :force, false) or Mix.Dep.compilable?(dep) do
145-
File.rm_rf!(Path.join([Mix.Project.build_path(), "lib", Atom.to_string(dep.app)]))
146147
end
147-
end
148148

149-
defp touch_fetchable(scm, path) do
150-
if scm.fetchable?() do
151-
path = Path.join(path, ".mix")
152-
File.mkdir_p!(path)
153-
File.touch!(Path.join(path, "compile.fetch"))
154-
true
155-
else
156-
false
157-
end
149+
# We should touch fetchable dependencies even if they
150+
# did not compile otherwise they will always be marked
151+
# as stale, even when there is nothing to do.
152+
fetchable? = touch_fetchable(scm, opts[:build])
153+
compiled? and fetchable?
158154
end
159155

160156
defp check_unavailable!(app, scm, {:unavailable, path}) do
@@ -176,6 +172,17 @@ defmodule Mix.Tasks.Deps.Compile do
176172
:ok
177173
end
178174

175+
defp touch_fetchable(scm, path) do
176+
if scm.fetchable?() do
177+
path = Path.join(path, ".mix")
178+
File.mkdir_p!(path)
179+
File.touch!(Path.join(path, "compile.fetch"))
180+
true
181+
else
182+
false
183+
end
184+
end
185+
179186
defp do_mix(dep, _config) do
180187
Mix.Dep.in_dependency(dep, fn _ ->
181188
config = Mix.Project.config()
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
defmodule Mix.Tasks.Deps.Parallel do
2+
@moduledoc false
3+
use Mix.Task
4+
5+
## Server
6+
7+
def server(deps, count, force?) do
8+
elixir =
9+
System.find_executable("elixir") ||
10+
raise "cannot find elixir executable for parallel compilation"
11+
12+
{:ok, socket} = :gen_tcp.listen(0, [:binary, packet: :line, active: true, reuseaddr: true])
13+
{:ok, {ip, port}} = :inet.sockname(socket)
14+
ansi_flag = if IO.ANSI.enabled?(), do: ~c"--color", else: ~c"--no-color"
15+
force_flag = if force?, do: ~c"--force", else: ~c"--no-force"
16+
17+
args = [
18+
ansi_flag,
19+
~c"-e",
20+
~c"Mix.CLI.main()",
21+
~c"deps.parallel",
22+
force_flag,
23+
~c"--port",
24+
Integer.to_charlist(port),
25+
~c"--host",
26+
:inet.ntoa(ip)
27+
]
28+
29+
options = [
30+
:binary,
31+
:hide,
32+
:use_stdio,
33+
:stderr_to_stdout,
34+
line: 1_000_000,
35+
args: args,
36+
env: [{~c"MIX_OS_CONCURRENCY_LOCK", ~c"false"}]
37+
]
38+
39+
clients =
40+
Enum.map(1..count//1, fn index ->
41+
if Mix.debug?() do
42+
IO.puts("-> Starting mix deps.parallel ##{index}")
43+
end
44+
45+
port = Port.open({:spawn_executable, String.to_charlist(elixir)}, options)
46+
47+
case :gen_tcp.accept(socket, 15000) do
48+
{:ok, client} ->
49+
%{port: port, index: index, socket: client}
50+
51+
error ->
52+
raise """
53+
could not start parallel dependency compiler, no connection made to TCP port: #{inspect(error)}
54+
55+
The spawned operating system process wrote the following output:
56+
#{collect_data(port, "")}
57+
"""
58+
end
59+
end)
60+
61+
send_deps_and_server_loop(clients, [], deps, [])
62+
end
63+
64+
defp send_deps_and_server_loop(available, busy, deps, completed) do
65+
{available, busy, deps} = send_deps(available, busy, deps, completed)
66+
server_loop(available, busy, deps, completed)
67+
end
68+
69+
defp send_deps([client | available], busy, deps, completed) do
70+
case pop_with(deps, fn dep -> Enum.all?(dep.deps, &Keyword.has_key?(completed, &1.app)) end) do
71+
:error ->
72+
{[client | available], busy, deps}
73+
74+
{dep, deps} ->
75+
if Mix.debug?() do
76+
Mix.shell().info("-- Sending #{dep.app} to mix deps.parallel #{client.index}")
77+
end
78+
79+
:gen_tcp.send(client.socket, "#{dep.app}\n")
80+
send_deps(available, [client | busy], deps, completed)
81+
end
82+
end
83+
84+
defp send_deps([], busy, deps, _completed) do
85+
{[], busy, deps}
86+
end
87+
88+
defp server_loop(available, _busy = [], _deps = [], completed) do
89+
shutdown_clients(available)
90+
Enum.any?(completed, &(elem(&1, 1) == true))
91+
end
92+
93+
defp server_loop(available, busy, deps, completed) do
94+
receive do
95+
{:tcp, socket, data} ->
96+
[app, status] = data |> String.trim() |> String.split(":") |> Enum.map(&String.to_atom/1)
97+
deps = Enum.reject(deps, &(&1.app == app))
98+
{client, busy} = pop_with(busy, &(&1.socket == socket))
99+
100+
if Mix.debug?() do
101+
Mix.shell().info("-- mix deps.parallel #{client.index} compiled #{app}")
102+
end
103+
104+
send_deps_and_server_loop([client | available], busy, deps, [{app, status} | completed])
105+
106+
{:tcp_closed, socket} ->
107+
shutdown_clients(available ++ busy)
108+
raise "socket #{inspect(socket)} closed unexpectedly"
109+
110+
{:tcp_error, socket, error} ->
111+
shutdown_clients(available ++ busy)
112+
raise "socket #{inspect(socket)} errored: #{inspect(error)}"
113+
114+
{port, {:data, {eol, data}}} ->
115+
with %{index: index} <-
116+
Enum.find(busy, &(&1.port == port)) || Enum.find(available, &(&1.port == port)) do
117+
terminator = if eol == :eol, do: "\n", else: ""
118+
IO.write([Integer.to_string(index), "> ", data, terminator])
119+
end
120+
121+
server_loop(available, busy, deps, completed)
122+
end
123+
end
124+
125+
defp pop_with(list, fun) do
126+
case Enum.split_while(list, &(not fun.(&1))) do
127+
{_, []} -> :error
128+
{pre, [result | post]} -> {result, pre ++ post}
129+
end
130+
end
131+
132+
defp shutdown_clients(clients) do
133+
Enum.each(clients, fn %{socket: socket, port: port, index: index} ->
134+
if Mix.debug?() do
135+
IO.puts("-> Closing mix deps.parallel ##{index}")
136+
end
137+
138+
_ = :gen_tcp.close(socket)
139+
IO.write(collect_data(port, "#{index}> "))
140+
end)
141+
end
142+
143+
defp collect_data(port, prefix) do
144+
receive do
145+
{^port, {:data, {:eol, data}}} -> [prefix, data, ?\n | collect_data(port, prefix)]
146+
{^port, {:data, {:noeol, data}}} -> [data | collect_data(port, prefix)]
147+
after
148+
0 -> []
149+
end
150+
end
151+
152+
## Client
153+
154+
@switches [port: :integer, host: :string, force: :boolean]
155+
156+
@impl true
157+
def run(args) do
158+
# If stdin closes, we shutdown the VM
159+
spawn(fn ->
160+
_ = IO.gets("")
161+
System.halt(0)
162+
end)
163+
164+
{opts, []} = OptionParser.parse!(args, strict: @switches)
165+
host = Keyword.fetch!(opts, :host)
166+
port = Keyword.fetch!(opts, :port)
167+
force? = Keyword.get(opts, :force, false)
168+
169+
{:ok, socket} =
170+
:gen_tcp.connect(String.to_charlist(host), port, [:binary, packet: :line, active: false])
171+
172+
deps = Mix.Dep.load_and_cache()
173+
client_loop(socket, deps, force?, Mix.Project.deps_config())
174+
end
175+
176+
def client_loop(socket, deps, force?, config) do
177+
case :gen_tcp.recv(socket, 0, :infinity) do
178+
{:ok, app} ->
179+
app = app |> String.trim() |> String.to_atom()
180+
181+
dep =
182+
Enum.find(deps, &(&1.app == app)) || raise "could not find dependency #{inspect(app)}"
183+
184+
compiled? = Mix.Tasks.Deps.Compile.compile_single(dep, force?, config)
185+
:ok = :gen_tcp.send(socket, "#{app}:#{compiled?}\n")
186+
client_loop(socket, deps, force?, config)
187+
188+
{:error, :closed} ->
189+
:ok
190+
end
191+
end
192+
end

0 commit comments

Comments
 (0)