Skip to content

Commit d6a7193

Browse files
committed
add initial CSI leader election library
1 parent 549dd97 commit d6a7193

File tree

2 files changed

+221
-0
lines changed

2 files changed

+221
-0
lines changed

leaderelection/leader_election.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
"regexp"
24+
"time"
25+
26+
"k8s.io/api/core/v1"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/kubernetes/scheme"
29+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
30+
"k8s.io/client-go/tools/leaderelection"
31+
"k8s.io/client-go/tools/leaderelection/resourcelock"
32+
"k8s.io/client-go/tools/record"
33+
"k8s.io/klog"
34+
)
35+
36+
const (
37+
defaultLeaseDuration = 15 * time.Second
38+
defaultRenewDeadline = 10 * time.Second
39+
defaultRetryPeriod = 5 * time.Second
40+
)
41+
42+
// leaderElection is a convenience wrapper around client-go's leader election library.
43+
type leaderElection struct {
44+
runFunc func(ctx context.Context)
45+
46+
// the lockName identifies the leader election config and should be shared across all members
47+
lockName string
48+
// the identity is the unique identity of the currently running member
49+
identity string
50+
// the namespace to store the lock resource
51+
namespace string
52+
// resourceLock defines the type of leaderelection that should be used
53+
// valid options are resourcelock.ConfigMapsResourceLock and resourcelock.EndpointsResourceLock
54+
resourceLock string
55+
56+
leaseDuration time.Duration
57+
renewDeadline time.Duration
58+
retryPeriod time.Duration
59+
60+
clientset kubernetes.Interface
61+
}
62+
63+
func NewConfigMapLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
64+
return &leaderElection{
65+
runFunc: runFunc,
66+
lockName: lockName,
67+
namespace: lockNamespace,
68+
resourceLock: resourcelock.ConfigMapsResourceLock,
69+
leaseDuration: defaultLeaseDuration,
70+
renewDeadline: defaultRenewDeadline,
71+
retryPeriod: defaultRetryPeriod,
72+
clientset: clientset,
73+
}
74+
}
75+
76+
func NewEndpointsLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
77+
return &leaderElection{
78+
runFunc: runFunc,
79+
lockName: lockName,
80+
namespace: lockNamespace,
81+
resourceLock: resourcelock.EndpointsResourceLock,
82+
leaseDuration: defaultLeaseDuration,
83+
renewDeadline: defaultRenewDeadline,
84+
retryPeriod: defaultRetryPeriod,
85+
clientset: clientset,
86+
}
87+
}
88+
89+
func (l *leaderElection) WithIdentity(identity string) {
90+
l.identity = identity
91+
}
92+
93+
func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) {
94+
l.leaseDuration = leaseDuration
95+
}
96+
97+
func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) {
98+
l.renewDeadline = renewDeadline
99+
}
100+
101+
func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) {
102+
l.retryPeriod = retryPeriod
103+
}
104+
105+
func (l *leaderElection) Run() error {
106+
if l.identity == "" {
107+
id, err := defaultLeaderElectionIdentity()
108+
if err != nil {
109+
return fmt.Errorf("error getting the default leader identity: %v", err)
110+
}
111+
112+
l.identity = id
113+
}
114+
115+
broadcaster := record.NewBroadcaster()
116+
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
117+
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})
118+
119+
rlConfig := resourcelock.ResourceLockConfig{
120+
Identity: sanitizeName(l.identity),
121+
EventRecorder: eventRecorder,
122+
}
123+
124+
lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), rlConfig)
125+
if err != nil {
126+
return err
127+
}
128+
129+
leaderConfig := leaderelection.LeaderElectionConfig{
130+
Lock: lock,
131+
LeaseDuration: l.leaseDuration,
132+
RenewDeadline: l.renewDeadline,
133+
RetryPeriod: l.retryPeriod,
134+
Callbacks: leaderelection.LeaderCallbacks{
135+
OnStartedLeading: func(ctx context.Context) {
136+
klog.V(2).Info("became leader, starting")
137+
l.runFunc(ctx)
138+
},
139+
OnStoppedLeading: func() {
140+
klog.Fatal("stopped leading")
141+
},
142+
OnNewLeader: func(identity string) {
143+
klog.V(3).Infof("new leader detected, current leader: %s", identity)
144+
},
145+
},
146+
}
147+
148+
leaderelection.RunOrDie(context.TODO(), leaderConfig)
149+
return nil // should never reach here
150+
}
151+
152+
func defaultLeaderElectionIdentity() (string, error) {
153+
return os.Hostname()
154+
}
155+
156+
// sanitizeName sanitizes the provided string so it can be consumed by leader election library
157+
func sanitizeName(name string) string {
158+
re := regexp.MustCompile("[^a-zA-Z0-9-]")
159+
name = re.ReplaceAllString(name, "-")
160+
if name[len(name)-1] == '-' {
161+
// name must not end with '-'
162+
name = name + "X"
163+
}
164+
return name
165+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"testing"
21+
)
22+
23+
func Test_sanitizeName(t *testing.T) {
24+
tests := []struct {
25+
name string
26+
input string
27+
output string
28+
}{
29+
{
30+
"requires no change",
31+
"test-driver",
32+
"test-driver",
33+
},
34+
{
35+
"has characters that should be replaced",
36+
"test!driver/foo",
37+
"test-driver-foo",
38+
},
39+
{
40+
"has trailing space",
41+
"driver\\",
42+
"driver-X",
43+
},
44+
}
45+
46+
for _, test := range tests {
47+
t.Run(test.name, func(t *testing.T) {
48+
output := sanitizeName(test.input)
49+
if output != test.output {
50+
t.Logf("expected name: %q", test.output)
51+
t.Logf("actual name: %q", output)
52+
t.Errorf("unexpected santized name")
53+
}
54+
})
55+
}
56+
}

0 commit comments

Comments
 (0)