Skip to content

Commit 58c17f6

Browse files
committed
🌱 addr.Suggest should lock a file instead of memory
Envtest is often running in parallel when using go test, which spins up multiple indipendent go test processes that cannot talk to each other. The address suggestion code, mostly used to find an open port, can cause port collisions and a race condition between different envtests running at the same time. This change switches the internal memory to use a file based system that creates a file. Signed-off-by: Vince Prignano <[email protected]>
1 parent 7d83250 commit 58c17f6

File tree

8 files changed

+166
-46
lines changed

8 files changed

+166
-46
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/prometheus/client_model v0.2.0
1717
go.uber.org/goleak v1.1.10
1818
go.uber.org/zap v1.17.0
19+
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40
1920
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
2021
gomodules.xyz/jsonpatch/v2 v2.2.0
2122
google.golang.org/appengine v1.6.7 // indirect

hack/check-everything.sh

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ source ${hack_dir}/common.sh
2424
tmp_root=/tmp
2525
kb_root_dir=$tmp_root/kubebuilder
2626

27-
ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION:-"1.21.2"}
27+
# Run verification scripts.
28+
${hack_dir}/verify.sh
2829

29-
# set up envtest tools if necessary
30+
# Envtest.
31+
ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION:-"1.21.2"}
3032

3133
header_text "installing envtest tools@${ENVTEST_K8S_VERSION} with setup-envtest if necessary"
3234
tmp_bin=/tmp/cr-tests-bin
@@ -35,9 +37,9 @@ tmp_bin=/tmp/cr-tests-bin
3537
cd ${hack_dir}/../tools/setup-envtest
3638
GOBIN=${tmp_bin} go install .
3739
)
38-
source <(${tmp_bin}/setup-envtest use --use-env -p env ${ENVTEST_K8S_VERSION})
40+
export KUBEBUILDER_ASSETS="$(${tmp_bin}/setup-envtest use --use-env -p path "${ENVTEST_K8S_VERSION}")"
3941

40-
${hack_dir}/verify.sh
42+
# Run tests.
4143
${hack_dir}/test-all.sh
4244

4345
header_text "confirming examples compile (via go install)"

hack/verify.sh

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,9 @@ make generate
2727
header_text "running golangci-lint"
2828
make lint
2929

30-
header_text "verifying modules"
31-
make modules verify-modules
30+
# Only run module verification in CI, otherwise updating
31+
# go module locally (which is a valid operation) causes `make test` to fail.
32+
if [[ -n ${CI} ]]; then
33+
header_text "verifying modules"
34+
make modules verify-modules
35+
fi

pkg/internal/flock/doc.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package flock is copied from k8s.io/kubernetes/pkg/util/flock to avoid
18+
// importing k8s.io/kubernetes as a dependency.
19+
//
20+
// Provides file locking functionalities on unix systems.
21+
package flock

pkg/internal/flock/flock_other.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// +build !linux,!darwin,!freebsd,!openbsd,!netbsd,!dragonfly
2+
3+
/*
4+
Copyright 2016 The Kubernetes Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package flock
20+
21+
// Acquire is not implemented on non-unix systems.
22+
func Acquire(path string) error {
23+
return nil
24+
}

pkg/internal/flock/flock_unix.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// +build linux darwin freebsd openbsd netbsd dragonfly
2+
3+
/*
4+
Copyright 2016 The Kubernetes Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package flock
20+
21+
import "golang.org/x/sys/unix"
22+
23+
// Acquire acquires a lock on a file for the duration of the process. This method
24+
// is reentrant.
25+
func Acquire(path string) error {
26+
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
27+
if err != nil {
28+
return err
29+
}
30+
31+
// We don't need to close the fd since we should hold
32+
// it until the process exits.
33+
34+
return unix.Flock(fd, unix.LOCK_EX)
35+
}

pkg/internal/testing/addr/manager.go

Lines changed: 67 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,78 +18,109 @@ package addr
1818

1919
import (
2020
"fmt"
21+
"io/fs"
2122
"net"
22-
"sync"
23+
"os"
24+
"path/filepath"
25+
"strings"
2326
"time"
27+
28+
"sigs.k8s.io/controller-runtime/pkg/internal/flock"
2429
)
2530

2631
// TODO(directxman12): interface / release functionality for external port managers
2732

2833
const (
29-
portReserveTime = 1 * time.Minute
34+
portReserveTime = 10 * time.Minute
3035
portConflictRetry = 100
36+
portFilePrefix = "port-"
37+
)
38+
39+
var (
40+
cacheDir string
3141
)
3242

33-
type portCache struct {
34-
lock sync.Mutex
35-
ports map[int]time.Time
43+
func init() {
44+
baseDir, err := os.UserCacheDir()
45+
if err != nil {
46+
baseDir = os.TempDir()
47+
}
48+
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
49+
if err := os.MkdirAll(cacheDir, 0750); err != nil {
50+
panic(err)
51+
}
3652
}
3753

38-
func (c *portCache) add(port int) bool {
39-
c.lock.Lock()
40-
defer c.lock.Unlock()
41-
// remove outdated port
42-
for p, t := range c.ports {
43-
if time.Since(t) > portReserveTime {
44-
delete(c.ports, p)
54+
type portCache struct{}
55+
56+
func (c *portCache) add(port int) (bool, error) {
57+
// Remove outdated ports.
58+
if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error {
59+
if err != nil {
60+
return err
61+
}
62+
if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) {
63+
return nil
64+
}
65+
info, err := d.Info()
66+
if err != nil {
67+
return err
4568
}
69+
if time.Since(info.ModTime()) > portReserveTime {
70+
if err := os.Remove(filepath.Join(cacheDir, path)); err != nil {
71+
return err
72+
}
73+
}
74+
return nil
75+
}); err != nil {
76+
return false, err
4677
}
47-
// try allocating new port
48-
if _, ok := c.ports[port]; ok {
49-
return false
78+
// Try allocating new port, by acquiring a file.
79+
if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) {
80+
return false, nil
81+
} else if err != nil {
82+
return false, err
5083
}
51-
c.ports[port] = time.Now()
52-
return true
84+
return true, nil
5385
}
5486

55-
var cache = &portCache{
56-
ports: make(map[int]time.Time),
57-
}
87+
var cache = &portCache{}
5888

59-
func suggest(listenHost string) (port int, resolvedHost string, err error) {
89+
func suggest(listenHost string) (int, string, error) {
6090
if listenHost == "" {
6191
listenHost = "localhost"
6292
}
6393
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
6494
if err != nil {
65-
return
95+
return -1, "", err
6696
}
6797
l, err := net.ListenTCP("tcp", addr)
6898
if err != nil {
69-
return
99+
return -1, "", err
100+
}
101+
if err := l.Close(); err != nil {
102+
return -1, "", err
70103
}
71-
port = l.Addr().(*net.TCPAddr).Port
72-
defer func() {
73-
err = l.Close()
74-
}()
75-
resolvedHost = addr.IP.String()
76-
return
104+
return l.Addr().(*net.TCPAddr).Port,
105+
addr.IP.String(),
106+
nil
77107
}
78108

79109
// Suggest suggests an address a process can listen on. It returns
80110
// a tuple consisting of a free port and the hostname resolved to its IP.
81111
// It makes sure that new port allocated does not conflict with old ports
82112
// allocated within 1 minute.
83-
func Suggest(listenHost string) (port int, resolvedHost string, err error) {
113+
func Suggest(listenHost string) (int, string, error) {
84114
for i := 0; i < portConflictRetry; i++ {
85-
port, resolvedHost, err = suggest(listenHost)
115+
port, resolvedHost, err := suggest(listenHost)
86116
if err != nil {
87-
return
117+
return -1, "", err
88118
}
89-
if cache.add(port) {
90-
return
119+
if ok, err := cache.add(port); ok {
120+
return port, resolvedHost, nil
121+
} else if err != nil {
122+
return -1, "", err
91123
}
92124
}
93-
err = fmt.Errorf("no free ports found after %d retries", portConflictRetry)
94-
return
125+
return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry)
95126
}

pkg/internal/testing/process/process.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,12 @@ func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh
248248
// Stop stops this process gracefully, waits for its termination, and cleans up
249249
// the CertDir if necessary.
250250
func (ps *State) Stop() error {
251+
// Always clear the directory if we need to.
252+
defer func() {
253+
if ps.DirNeedsCleaning {
254+
_ = os.RemoveAll(ps.Dir)
255+
}
256+
}()
251257
if ps.Cmd == nil {
252258
return nil
253259
}
@@ -267,9 +273,5 @@ func (ps *State) Stop() error {
267273
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
268274
}
269275
ps.ready = false
270-
if ps.DirNeedsCleaning {
271-
return os.RemoveAll(ps.Dir)
272-
}
273-
274276
return nil
275277
}

0 commit comments

Comments
 (0)