Skip to content

Commit 7f46cab

Browse files
committed
Implemented parallel task runner
1 parent cdf8c10 commit 7f46cab

File tree

3 files changed

+187
-0
lines changed

3 files changed

+187
-0
lines changed

Diff for: internal/arduino/builder/internal/runner/runner.go

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// This file is part of arduino-cli.
2+
//
3+
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
4+
//
5+
// This software is released under the GNU General Public License version 3,
6+
// which covers the main part of arduino-cli.
7+
// The terms of this license can be found at:
8+
// https://www.gnu.org/licenses/gpl-3.0.en.html
9+
//
10+
// You can be released from the requirements of the above licenses by purchasing
11+
// a commercial license. Buying such a license is mandatory if you want to
12+
// modify or otherwise use the software for commercial activities involving the
13+
// Arduino software without disclosing the source code of your own applications.
14+
// To purchase a commercial license, send an email to [email protected].
15+
16+
package runner
17+
18+
import (
19+
"context"
20+
"runtime"
21+
"sync"
22+
)
23+
24+
// Runner is a helper to run commands in a queue, the commands are immediately exectuded
25+
// in a goroutine as they are enqueued. The runner can be stopped by calling Cancel.
26+
type Runner struct {
27+
lock sync.Mutex
28+
queue chan<- *enqueuedCommand
29+
results map[string]<-chan *Result
30+
ctx context.Context
31+
ctxCancel func()
32+
wg sync.WaitGroup
33+
}
34+
35+
type enqueuedCommand struct {
36+
task *Task
37+
accept func(*Result)
38+
}
39+
40+
func (cmd *enqueuedCommand) String() string {
41+
return cmd.task.String()
42+
}
43+
44+
func New(inCtx context.Context) *Runner {
45+
ctx, cancel := context.WithCancel(inCtx)
46+
queue := make(chan *enqueuedCommand, 1000)
47+
r := &Runner{
48+
ctx: ctx,
49+
ctxCancel: cancel,
50+
queue: queue,
51+
results: map[string]<-chan *Result{},
52+
}
53+
54+
// Spawn workers
55+
for i := 0; i < runtime.NumCPU(); i++ {
56+
r.wg.Add(1)
57+
go func() {
58+
worker(ctx, queue)
59+
r.wg.Done()
60+
}()
61+
}
62+
63+
return r
64+
}
65+
66+
func worker(ctx context.Context, queue <-chan *enqueuedCommand) {
67+
done := ctx.Done()
68+
for {
69+
select {
70+
case <-done:
71+
return
72+
default:
73+
}
74+
75+
select {
76+
case <-done:
77+
return
78+
case cmd := <-queue:
79+
result := cmd.task.Run(ctx)
80+
cmd.accept(result)
81+
}
82+
}
83+
}
84+
85+
func (r *Runner) Enqueue(task *Task) {
86+
r.lock.Lock()
87+
defer r.lock.Unlock()
88+
89+
result := make(chan *Result, 1)
90+
r.results[task.String()] = result
91+
r.queue <- &enqueuedCommand{
92+
task: task,
93+
accept: func(res *Result) {
94+
result <- res
95+
},
96+
}
97+
}
98+
99+
func (r *Runner) Results(task *Task) *Result {
100+
r.lock.Lock()
101+
result, ok := r.results[task.String()]
102+
r.lock.Unlock()
103+
if !ok {
104+
return nil
105+
}
106+
return <-result
107+
}
108+
109+
func (r *Runner) Cancel() {
110+
r.ctxCancel()
111+
r.wg.Wait()
112+
}
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package runner_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/arduino/arduino-cli/internal/arduino/builder/internal/runner"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestRunMultipleTask(t *testing.T) {
14+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
15+
defer cancel()
16+
r := runner.New(ctx)
17+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 1 ; echo -n 0"))
18+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 2 ; echo -n 1"))
19+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 3 ; echo -n 2"))
20+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 4 ; echo -n 3"))
21+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 5 ; echo -n 4"))
22+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 6 ; echo -n 5"))
23+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 7 ; echo -n 6"))
24+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 8 ; echo -n 7"))
25+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 9 ; echo -n 8"))
26+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 10 ; echo -n 9"))
27+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 11 ; echo -n 10"))
28+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 12 ; echo -n 11"))
29+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 13 ; echo -n 12"))
30+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 14 ; echo -n 13"))
31+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 15 ; echo -n 14"))
32+
r.Enqueue(runner.NewTask("bash", "-c", "sleep 16 ; echo -n 15"))
33+
require.Nil(t, r.Results(runner.NewTask("bash", "-c", "echo -n 5")))
34+
fmt.Println(string(r.Results(runner.NewTask("bash", "-c", "sleep 3 ; echo -n 2")).Stdout))
35+
fmt.Println("Cancelling")
36+
r.Cancel()
37+
fmt.Println("Runner completed")
38+
}

Diff for: internal/arduino/builder/internal/runner/task.go

+37
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,46 @@
1515

1616
package runner
1717

18+
import (
19+
"context"
20+
"fmt"
21+
"strings"
22+
23+
"github.com/arduino/go-paths-helper"
24+
)
25+
26+
// Task is a command to be executed
27+
type Task struct {
28+
Args []string `json:"args"`
29+
}
30+
31+
// NewTask creates a new Task
32+
func NewTask(args ...string) *Task {
33+
return &Task{Args: args}
34+
}
35+
36+
func (t *Task) String() string {
37+
return strings.Join(t.Args, " ")
38+
}
39+
1840
// Result contains the output of a command execution
1941
type Result struct {
2042
Args []string
2143
Stdout []byte
2244
Stderr []byte
45+
Error error
46+
}
47+
48+
// Run executes the command and returns the result
49+
func (t *Task) Run(ctx context.Context) *Result {
50+
proc, err := paths.NewProcess(nil, t.Args...)
51+
if err != nil {
52+
return &Result{Args: t.Args, Error: err}
53+
}
54+
stdout, stderr, err := proc.RunAndCaptureOutput(ctx)
55+
56+
// Append arguments to stdout
57+
stdout = append([]byte(fmt.Sprintln(t)), stdout...)
58+
59+
return &Result{Args: proc.GetArgs(), Stdout: stdout, Stderr: stderr, Error: err}
2360
}

0 commit comments

Comments
 (0)