Skip to content

Commit ecb330a

Browse files
committed
add initial CSI leader election library
1 parent 690c21a commit ecb330a

File tree

2 files changed

+242
-0
lines changed

2 files changed

+242
-0
lines changed

leaderelection/leader_election.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
"regexp"
24+
"time"
25+
26+
"k8s.io/api/core/v1"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/kubernetes/scheme"
29+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
30+
"k8s.io/client-go/tools/leaderelection"
31+
"k8s.io/client-go/tools/leaderelection/resourcelock"
32+
"k8s.io/client-go/tools/record"
33+
"k8s.io/klog"
34+
)
35+
36+
const (
37+
defaultLeaseDuration = 15 * time.Second
38+
defaultRenewDeadline = 10 * time.Second
39+
defaultRetryPeriod = 5 * time.Second
40+
)
41+
42+
// leaderElection is a convenience wrapper around client-go's leader election library.
43+
type leaderElection struct {
44+
runFunc func(ctx context.Context)
45+
46+
// the lockName identifies the leader election config and should be shared across all members
47+
lockName string
48+
// the identity is the unique identity of the currently running member
49+
identity string
50+
// the namespace to store the lock resource
51+
namespace string
52+
// resourceLock defines the type of leaderelection that should be used
53+
// valid options are resourcelock.ConfigMapsResourceLock and resourcelock.EndpointsResourceLock
54+
resourceLock string
55+
56+
leaseDuration time.Duration
57+
renewDeadline time.Duration
58+
retryPeriod time.Duration
59+
60+
clientset kubernetes.Interface
61+
}
62+
63+
// NewLeaderElection returns the default & preferred leader election type
64+
func NewLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
65+
return NewLeasesLeaderElection(clientset, lockName, lockNamespace, runFunc)
66+
}
67+
68+
// NewLeasesLeaderElection returns the Leases implementation of leader election
69+
func NewLeasesLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
70+
return &leaderElection{
71+
runFunc: runFunc,
72+
lockName: lockName,
73+
namespace: lockNamespace,
74+
resourceLock: resourcelock.LeasesResourceLock,
75+
leaseDuration: defaultLeaseDuration,
76+
renewDeadline: defaultRenewDeadline,
77+
retryPeriod: defaultRetryPeriod,
78+
clientset: clientset,
79+
}
80+
}
81+
82+
// NewEndpointsLeaderElection returns the Endpoints implementation of leader election
83+
func NewEndpointsLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
84+
return &leaderElection{
85+
runFunc: runFunc,
86+
lockName: lockName,
87+
namespace: lockNamespace,
88+
resourceLock: resourcelock.EndpointsResourceLock,
89+
leaseDuration: defaultLeaseDuration,
90+
renewDeadline: defaultRenewDeadline,
91+
retryPeriod: defaultRetryPeriod,
92+
clientset: clientset,
93+
}
94+
}
95+
96+
// NewConfigMapLeaderElection returns the ConfigMaps implementation of leader election
97+
func NewConfigMapLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
98+
return &leaderElection{
99+
runFunc: runFunc,
100+
lockName: lockName,
101+
namespace: lockNamespace,
102+
resourceLock: resourcelock.ConfigMapsResourceLock,
103+
leaseDuration: defaultLeaseDuration,
104+
renewDeadline: defaultRenewDeadline,
105+
retryPeriod: defaultRetryPeriod,
106+
clientset: clientset,
107+
}
108+
}
109+
110+
func (l *leaderElection) WithIdentity(identity string) {
111+
l.identity = identity
112+
}
113+
114+
func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) {
115+
l.leaseDuration = leaseDuration
116+
}
117+
118+
func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) {
119+
l.renewDeadline = renewDeadline
120+
}
121+
122+
func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) {
123+
l.retryPeriod = retryPeriod
124+
}
125+
126+
func (l *leaderElection) Run() error {
127+
if l.identity == "" {
128+
id, err := defaultLeaderElectionIdentity()
129+
if err != nil {
130+
return fmt.Errorf("error getting the default leader identity: %v", err)
131+
}
132+
133+
l.identity = id
134+
}
135+
136+
broadcaster := record.NewBroadcaster()
137+
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
138+
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})
139+
140+
rlConfig := resourcelock.ResourceLockConfig{
141+
Identity: sanitizeName(l.identity),
142+
EventRecorder: eventRecorder,
143+
}
144+
145+
lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
146+
if err != nil {
147+
return err
148+
}
149+
150+
leaderConfig := leaderelection.LeaderElectionConfig{
151+
Lock: lock,
152+
LeaseDuration: l.leaseDuration,
153+
RenewDeadline: l.renewDeadline,
154+
RetryPeriod: l.retryPeriod,
155+
Callbacks: leaderelection.LeaderCallbacks{
156+
OnStartedLeading: func(ctx context.Context) {
157+
klog.V(2).Info("became leader, starting")
158+
l.runFunc(ctx)
159+
},
160+
OnStoppedLeading: func() {
161+
klog.Fatal("stopped leading")
162+
},
163+
OnNewLeader: func(identity string) {
164+
klog.V(3).Infof("new leader detected, current leader: %s", identity)
165+
},
166+
},
167+
}
168+
169+
leaderelection.RunOrDie(context.TODO(), leaderConfig)
170+
return nil // should never reach here
171+
}
172+
173+
func defaultLeaderElectionIdentity() (string, error) {
174+
return os.Hostname()
175+
}
176+
177+
// sanitizeName sanitizes the provided string so it can be consumed by leader election library
178+
func sanitizeName(name string) string {
179+
re := regexp.MustCompile("[^a-zA-Z0-9-]")
180+
name = re.ReplaceAllString(name, "-")
181+
if name[len(name)-1] == '-' {
182+
// name must not end with '-'
183+
name = name + "X"
184+
}
185+
return name
186+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"testing"
21+
)
22+
23+
func Test_sanitizeName(t *testing.T) {
24+
tests := []struct {
25+
name string
26+
input string
27+
output string
28+
}{
29+
{
30+
"requires no change",
31+
"test-driver",
32+
"test-driver",
33+
},
34+
{
35+
"has characters that should be replaced",
36+
"test!driver/foo",
37+
"test-driver-foo",
38+
},
39+
{
40+
"has trailing space",
41+
"driver\\",
42+
"driver-X",
43+
},
44+
}
45+
46+
for _, test := range tests {
47+
t.Run(test.name, func(t *testing.T) {
48+
output := sanitizeName(test.input)
49+
if output != test.output {
50+
t.Logf("expected name: %q", test.output)
51+
t.Logf("actual name: %q", output)
52+
t.Errorf("unexpected santized name")
53+
}
54+
})
55+
}
56+
}

0 commit comments

Comments
 (0)