Skip to content

Commit bc2146c

Browse files
committed
Add health checker to leader election library
1 parent a833d13 commit bc2146c

File tree

3 files changed

+98
-39
lines changed

3 files changed

+98
-39
lines changed

leaderelection/leader_election.go

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"io/ioutil"
23+
"net/http"
2324
"os"
2425
"regexp"
2526
"strings"
@@ -39,6 +40,14 @@ const (
3940
defaultLeaseDuration = 15 * time.Second
4041
defaultRenewDeadline = 10 * time.Second
4142
defaultRetryPeriod = 5 * time.Second
43+
44+
DefaultHealthCheckTimeout = 20 * time.Second
45+
46+
// HealthCheckerAddress is the address at which the leader election health
47+
// checker reports status.
48+
// The caller sidecar should document this address in appropriate flag
49+
// descriptions.
50+
HealthCheckerAddress = "/healthz/leader-election"
4251
)
4352

4453
// leaderElection is a convenience wrapper around client-go's leader election library.
@@ -55,6 +64,9 @@ type leaderElection struct {
5564
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
5665
// and resourcelock.ConfigMapsResourceLock
5766
resourceLock string
67+
// healthCheck reports unhealthy if leader election fails to renew leadership
68+
// within a timeout period.
69+
healthCheck *leaderelection.HealthzAdaptor
5870

5971
leaseDuration time.Duration
6072
renewDeadline time.Duration
@@ -66,16 +78,33 @@ type leaderElection struct {
6678
}
6779

6880
// NewLeaderElection returns the default & preferred leader election type
69-
func NewLeaderElection(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
70-
return NewLeaderElectionWithLeases(clientset, lockName, runFunc)
81+
// healthCheckTimeout determines the max duration beyond lease expiration
82+
// allowed before reporting unhealthy.
83+
func NewLeaderElection(
84+
clientset kubernetes.Interface,
85+
lockName string,
86+
healthCheckTimeout time.Duration,
87+
runFunc func(ctx context.Context)) *leaderElection {
88+
return NewLeaderElectionWithLeases(
89+
clientset,
90+
lockName,
91+
healthCheckTimeout,
92+
runFunc)
7193
}
7294

7395
// NewLeaderElectionWithLeases returns an implementation of leader election using Leases
74-
func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
96+
// healthCheckTimeout determines the max duration beyond lease expiration
97+
// allowed before reporting unhealthy.
98+
func NewLeaderElectionWithLeases(
99+
clientset kubernetes.Interface,
100+
lockName string,
101+
healthCheckTimeout time.Duration,
102+
runFunc func(ctx context.Context)) *leaderElection {
75103
return &leaderElection{
76104
runFunc: runFunc,
77105
lockName: lockName,
78106
resourceLock: resourcelock.LeasesResourceLock,
107+
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
79108
leaseDuration: defaultLeaseDuration,
80109
renewDeadline: defaultRenewDeadline,
81110
retryPeriod: defaultRetryPeriod,
@@ -84,11 +113,18 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string
84113
}
85114

86115
// NewLeaderElectionWithEndpoints returns an implementation of leader election using Endpoints
87-
func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
116+
// healthCheckTimeout determines the max duration beyond lease expiration
117+
// allowed before reporting unhealthy.
118+
func NewLeaderElectionWithEndpoints(
119+
clientset kubernetes.Interface,
120+
lockName string,
121+
healthCheckTimeout time.Duration,
122+
runFunc func(ctx context.Context)) *leaderElection {
88123
return &leaderElection{
89124
runFunc: runFunc,
90125
lockName: lockName,
91126
resourceLock: resourcelock.EndpointsResourceLock,
127+
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
92128
leaseDuration: defaultLeaseDuration,
93129
renewDeadline: defaultRenewDeadline,
94130
retryPeriod: defaultRetryPeriod,
@@ -97,11 +133,18 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str
97133
}
98134

99135
// NewLeaderElectionWithConfigMaps returns an implementation of leader election using ConfigMaps
100-
func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
136+
// healthCheckTimeout determines the max duration beyond lease expiration
137+
// allowed before reporting unhealthy.
138+
func NewLeaderElectionWithConfigMaps(
139+
clientset kubernetes.Interface,
140+
lockName string,
141+
healthCheckTimeout time.Duration,
142+
runFunc func(ctx context.Context)) *leaderElection {
101143
return &leaderElection{
102144
runFunc: runFunc,
103145
lockName: lockName,
104146
resourceLock: resourcelock.ConfigMapsResourceLock,
147+
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
105148
leaseDuration: defaultLeaseDuration,
106149
renewDeadline: defaultRenewDeadline,
107150
retryPeriod: defaultRetryPeriod,
@@ -134,6 +177,21 @@ func (l *leaderElection) WithContext(ctx context.Context) {
134177
l.ctx = ctx
135178
}
136179

180+
// Server represents any type that could serve HTTP requests for the leader
181+
// election health check endpoint.
182+
type Server interface {
183+
Handle(pattern string, handler http.Handler)
184+
}
185+
186+
// RegisterHealthCheck creates a health check for this leader election object
187+
// and registers its HTTP handler to the given server at the path specified by
188+
// the constant "healthCheckerAddress".
189+
// The caller sidecar should document the handler address in appropriate flag
190+
// descriptions.
191+
func (l *leaderElection) RegisterHealthCheck(s Server) {
192+
s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check))
193+
}
194+
137195
func (l *leaderElection) Run() error {
138196
if l.identity == "" {
139197
id, err := defaultLeaderElectionIdentity()
@@ -179,6 +237,7 @@ func (l *leaderElection) Run() error {
179237
klog.V(3).Infof("new leader detected, current leader: %s", identity)
180238
},
181239
},
240+
WatchDog: l.healthCheck,
182241
}
183242

184243
ctx := l.ctx
@@ -220,3 +279,15 @@ func inClusterNamespace() string {
220279

221280
return "default"
222281
}
282+
283+
// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
284+
func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc {
285+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
286+
err := c(r)
287+
if err != nil {
288+
http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
289+
} else {
290+
fmt.Fprint(w, "ok")
291+
}
292+
})
293+
}

metrics/metrics.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
2929
"k8s.io/component-base/metrics"
30-
"k8s.io/klog/v2"
3130
)
3231

3332
const (
@@ -90,10 +89,15 @@ type CSIMetricsManager interface {
9089
// driverName - Name of the CSI driver against which this operation was executed.
9190
SetDriverName(driverName string)
9291

93-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
94-
// for this metrics manager.
95-
// If the metricsAddress is an empty string, this will be a no op.
96-
StartMetricsEndpoint(metricsAddress, metricsPath string)
92+
// RegisterToServer registers an HTTP handler for this metrics manager to the
93+
// given server at the specified address/path.
94+
RegisterToServer(s Server, metricsPath string)
95+
}
96+
97+
// Server represents any type that could serve HTTP requests for the metrics
98+
// endpoint.
99+
type Server interface {
100+
Handle(pattern string, handler http.Handler)
97101
}
98102

99103
// MetricsManagerOption is used to pass optional configuration to a
@@ -325,27 +329,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) {
325329
}
326330
}
327331

328-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
329-
// for this metrics manager on a new go routine.
330-
// If the metricsAddress is an empty string, this will be a no op.
331-
func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) {
332-
if metricsAddress == "" {
333-
klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.")
334-
return
335-
}
336-
337-
http.Handle(metricsPath, metrics.HandlerFor(
332+
// RegisterToServer registers an HTTP handler for this metrics manager to the
333+
// given server at the specified address/path.
334+
func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) {
335+
s.Handle(metricsPath, metrics.HandlerFor(
338336
cmm.GetRegistry(),
339337
metrics.HandlerOpts{
340338
ErrorHandling: metrics.ContinueOnError}))
341-
342-
// Spawn a new go routine to listen on specified endpoint
343-
go func() {
344-
err := http.ListenAndServe(metricsAddress, nil)
345-
if err != nil {
346-
klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err)
347-
}
348-
}()
349339
}
350340

351341
// VerifyMetricsMatch is a helper function that verifies that the expected and

metrics/metrics_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package metrics
1919
import (
2020
"io/ioutil"
2121
"net/http"
22+
"net/http/httptest"
2223
"strings"
2324
"testing"
2425
"time"
@@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) {
481482
}
482483
}
483484

484-
func TestStartMetricsEndPoint_Noop(t *testing.T) {
485+
func TestRegisterToServer_Noop(t *testing.T) {
485486
// Arrange
486487
cmm := NewCSIMetricsManagerForSidecar(
487488
"fake.csi.driver.io" /* driverName */)
488489
operationDuration, _ := time.ParseDuration("20s")
490+
mux := http.NewServeMux()
489491

490492
// Act
491-
cmm.StartMetricsEndpoint(":8080", "/metrics")
493+
cmm.RegisterToServer(mux, "/metrics")
492494
cmm.RecordMetrics(
493495
"/csi.v1.Controller/ControllerGetCapabilities", /* operationName */
494496
nil, /* operationErr */
495497
operationDuration /* operationDuration */)
496498

497499
// Assert
498-
request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader(""))
499-
if err != nil {
500-
t.Fatalf("Creating request for metrics endpoint failed: %v", err)
501-
}
502-
client := &http.Client{}
503-
resp, err := client.Do(request)
504-
if err != nil {
505-
t.Fatalf("Failed to GET metrics. Error: %v", err)
506-
}
500+
request := httptest.NewRequest("GET", "/metrics", strings.NewReader(""))
501+
rec := httptest.NewRecorder()
502+
mux.ServeHTTP(rec, request)
503+
resp := rec.Result()
504+
507505
if resp.StatusCode != 200 {
508506
t.Fatalf("/metrics response status not 200. Response was: %+v", resp)
509507
}

0 commit comments

Comments
 (0)