@@ -20,6 +20,7 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"io/ioutil"
23
+ "net/http"
23
24
"os"
24
25
"regexp"
25
26
"strings"
@@ -39,6 +40,9 @@ const (
39
40
defaultLeaseDuration = 15 * time .Second
40
41
defaultRenewDeadline = 10 * time .Second
41
42
defaultRetryPeriod = 5 * time .Second
43
+ healthCheckTimeout = 20 * time .Second
44
+
45
+ HealthCheckerAddress = "/healthz/leader-election"
42
46
)
43
47
44
48
// leaderElection is a convenience wrapper around client-go's leader election library.
@@ -55,6 +59,9 @@ type leaderElection struct {
55
59
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
56
60
// and resourcelock.ConfigMapsResourceLock
57
61
resourceLock string
62
+ // healthCheck reports unhealthy if leader election fails to renew leadership
63
+ // within a timeout period.
64
+ healthCheck * leaderelection.HealthzAdaptor
58
65
59
66
leaseDuration time.Duration
60
67
renewDeadline time.Duration
@@ -76,6 +83,7 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string
76
83
runFunc : runFunc ,
77
84
lockName : lockName ,
78
85
resourceLock : resourcelock .LeasesResourceLock ,
86
+ healthCheck : leaderelection .NewLeaderHealthzAdaptor (healthCheckTimeout ),
79
87
leaseDuration : defaultLeaseDuration ,
80
88
renewDeadline : defaultRenewDeadline ,
81
89
retryPeriod : defaultRetryPeriod ,
@@ -89,6 +97,7 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str
89
97
runFunc : runFunc ,
90
98
lockName : lockName ,
91
99
resourceLock : resourcelock .EndpointsResourceLock ,
100
+ healthCheck : leaderelection .NewLeaderHealthzAdaptor (healthCheckTimeout ),
92
101
leaseDuration : defaultLeaseDuration ,
93
102
renewDeadline : defaultRenewDeadline ,
94
103
retryPeriod : defaultRetryPeriod ,
@@ -102,6 +111,7 @@ func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName st
102
111
runFunc : runFunc ,
103
112
lockName : lockName ,
104
113
resourceLock : resourcelock .ConfigMapsResourceLock ,
114
+ healthCheck : leaderelection .NewLeaderHealthzAdaptor (healthCheckTimeout ),
105
115
leaseDuration : defaultLeaseDuration ,
106
116
renewDeadline : defaultRenewDeadline ,
107
117
retryPeriod : defaultRetryPeriod ,
@@ -134,6 +144,19 @@ func (l *leaderElection) WithContext(ctx context.Context) {
134
144
l .ctx = ctx
135
145
}
136
146
147
+ // Server represents any type that could serve HTTP requests for the leader
148
+ // election health check endpoint.
149
+ type Server interface {
150
+ Handle (pattern string , handler http.Handler )
151
+ }
152
+
153
+ // RegisterHealthCheck creates a health check for this leader election object and
154
+ // registers its HTTP handler to the given server at the path specified by the
155
+ // constant "healthCheckerAddress".
156
+ func (l * leaderElection ) RegisterHealthCheck (s Server ) {
157
+ s .Handle (HealthCheckerAddress , adaptCheckToHandler (l .healthCheck .Check ))
158
+ }
159
+
137
160
func (l * leaderElection ) Run () error {
138
161
if l .identity == "" {
139
162
id , err := defaultLeaderElectionIdentity ()
@@ -179,6 +202,7 @@ func (l *leaderElection) Run() error {
179
202
klog .V (3 ).Infof ("new leader detected, current leader: %s" , identity )
180
203
},
181
204
},
205
+ WatchDog : l .healthCheck ,
182
206
}
183
207
184
208
ctx := l .ctx
@@ -220,3 +244,15 @@ func inClusterNamespace() string {
220
244
221
245
return "default"
222
246
}
247
+
248
+ // adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
249
+ func adaptCheckToHandler (c func (r * http.Request ) error ) http.HandlerFunc {
250
+ return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
251
+ err := c (r )
252
+ if err != nil {
253
+ http .Error (w , fmt .Sprintf ("internal server error: %v" , err ), http .StatusInternalServerError )
254
+ } else {
255
+ fmt .Fprint (w , "ok" )
256
+ }
257
+ })
258
+ }
0 commit comments