Skip to content

Commit a8e6d58

Browse files
committed
Add health checker to leader election library
1 parent a833d13 commit a8e6d58

File tree

3 files changed

+74
-34
lines changed

3 files changed

+74
-34
lines changed

leaderelection/leader_election.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"io/ioutil"
23+
"net/http"
2324
"os"
2425
"regexp"
2526
"strings"
@@ -39,6 +40,14 @@ const (
3940
defaultLeaseDuration = 15 * time.Second
4041
defaultRenewDeadline = 10 * time.Second
4142
defaultRetryPeriod = 5 * time.Second
43+
44+
DefaultHealthCheckTimeout = 20 * time.Second
45+
46+
// HealthCheckerAddress is the address at which the leader election health
47+
// checker reports status.
48+
// The caller sidecar should document this address in appropriate flag
49+
// descriptions.
50+
HealthCheckerAddress = "/healthz/leader-election"
4251
)
4352

4453
// leaderElection is a convenience wrapper around client-go's leader election library.
@@ -55,6 +64,9 @@ type leaderElection struct {
5564
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
5665
// and resourcelock.ConfigMapsResourceLock
5766
resourceLock string
67+
// healthCheck reports unhealthy if leader election fails to renew leadership
68+
// within a timeout period.
69+
healthCheck *leaderelection.HealthzAdaptor
5870

5971
leaseDuration time.Duration
6072
renewDeadline time.Duration
@@ -71,6 +83,8 @@ func NewLeaderElection(clientset kubernetes.Interface, lockName string, runFunc
7183
}
7284

7385
// NewLeaderElectionWithLeases returns an implementation of leader election using Leases
86+
// healthCheckTimeout determines the max duration beyond lease expiration
87+
// allowed before reporting unhealthy.
7488
func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
7589
return &leaderElection{
7690
runFunc: runFunc,
@@ -84,6 +98,8 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string
8498
}
8599

86100
// NewLeaderElectionWithEndpoints returns an implementation of leader election using Endpoints
101+
// healthCheckTimeout determines the max duration beyond lease expiration
102+
// allowed before reporting unhealthy.
87103
func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
88104
return &leaderElection{
89105
runFunc: runFunc,
@@ -97,6 +113,8 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str
97113
}
98114

99115
// NewLeaderElectionWithConfigMaps returns an implementation of leader election using ConfigMaps
116+
// healthCheckTimeout determines the max duration beyond lease expiration
117+
// allowed before reporting unhealthy.
100118
func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection {
101119
return &leaderElection{
102120
runFunc: runFunc,
@@ -134,6 +152,27 @@ func (l *leaderElection) WithContext(ctx context.Context) {
134152
l.ctx = ctx
135153
}
136154

155+
// Server represents any type that could serve HTTP requests for the leader
156+
// election health check endpoint.
157+
type Server interface {
158+
Handle(pattern string, handler http.Handler)
159+
}
160+
161+
// PrepareHealthCheck creates a health check for this leader election object
162+
// with the given healthCheckTimeout and registers its HTTP handler to the given
163+
// server at the path specified by the constant "healthCheckerAddress".
164+
// healthCheckTimeout determines the max duration beyond lease expiration
165+
// allowed before reporting unhealthy.
166+
// The caller sidecar should document the handler address in appropriate flag
167+
// descriptions.
168+
func (l *leaderElection) PrepareHealthCheck(
169+
s Server,
170+
healthCheckTimeout time.Duration) {
171+
172+
l.healthCheck = leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout)
173+
s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check))
174+
}
175+
137176
func (l *leaderElection) Run() error {
138177
if l.identity == "" {
139178
id, err := defaultLeaderElectionIdentity()
@@ -179,6 +218,7 @@ func (l *leaderElection) Run() error {
179218
klog.V(3).Infof("new leader detected, current leader: %s", identity)
180219
},
181220
},
221+
WatchDog: l.healthCheck,
182222
}
183223

184224
ctx := l.ctx
@@ -220,3 +260,15 @@ func inClusterNamespace() string {
220260

221261
return "default"
222262
}
263+
264+
// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
265+
func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc {
266+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
267+
err := c(r)
268+
if err != nil {
269+
http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
270+
} else {
271+
fmt.Fprint(w, "ok")
272+
}
273+
})
274+
}

metrics/metrics.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
2929
"k8s.io/component-base/metrics"
30-
"k8s.io/klog/v2"
3130
)
3231

3332
const (
@@ -90,10 +89,15 @@ type CSIMetricsManager interface {
9089
// driverName - Name of the CSI driver against which this operation was executed.
9190
SetDriverName(driverName string)
9291

93-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
94-
// for this metrics manager.
95-
// If the metricsAddress is an empty string, this will be a no op.
96-
StartMetricsEndpoint(metricsAddress, metricsPath string)
92+
// RegisterToServer registers an HTTP handler for this metrics manager to the
93+
// given server at the specified address/path.
94+
RegisterToServer(s Server, metricsPath string)
95+
}
96+
97+
// Server represents any type that could serve HTTP requests for the metrics
98+
// endpoint.
99+
type Server interface {
100+
Handle(pattern string, handler http.Handler)
97101
}
98102

99103
// MetricsManagerOption is used to pass optional configuration to a
@@ -325,27 +329,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) {
325329
}
326330
}
327331

328-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
329-
// for this metrics manager on a new go routine.
330-
// If the metricsAddress is an empty string, this will be a no op.
331-
func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) {
332-
if metricsAddress == "" {
333-
klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.")
334-
return
335-
}
336-
337-
http.Handle(metricsPath, metrics.HandlerFor(
332+
// RegisterToServer registers an HTTP handler for this metrics manager to the
333+
// given server at the specified address/path.
334+
func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) {
335+
s.Handle(metricsPath, metrics.HandlerFor(
338336
cmm.GetRegistry(),
339337
metrics.HandlerOpts{
340338
ErrorHandling: metrics.ContinueOnError}))
341-
342-
// Spawn a new go routine to listen on specified endpoint
343-
go func() {
344-
err := http.ListenAndServe(metricsAddress, nil)
345-
if err != nil {
346-
klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err)
347-
}
348-
}()
349339
}
350340

351341
// VerifyMetricsMatch is a helper function that verifies that the expected and

metrics/metrics_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package metrics
1919
import (
2020
"io/ioutil"
2121
"net/http"
22+
"net/http/httptest"
2223
"strings"
2324
"testing"
2425
"time"
@@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) {
481482
}
482483
}
483484

484-
func TestStartMetricsEndPoint_Noop(t *testing.T) {
485+
func TestRegisterToServer_Noop(t *testing.T) {
485486
// Arrange
486487
cmm := NewCSIMetricsManagerForSidecar(
487488
"fake.csi.driver.io" /* driverName */)
488489
operationDuration, _ := time.ParseDuration("20s")
490+
mux := http.NewServeMux()
489491

490492
// Act
491-
cmm.StartMetricsEndpoint(":8080", "/metrics")
493+
cmm.RegisterToServer(mux, "/metrics")
492494
cmm.RecordMetrics(
493495
"/csi.v1.Controller/ControllerGetCapabilities", /* operationName */
494496
nil, /* operationErr */
495497
operationDuration /* operationDuration */)
496498

497499
// Assert
498-
request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader(""))
499-
if err != nil {
500-
t.Fatalf("Creating request for metrics endpoint failed: %v", err)
501-
}
502-
client := &http.Client{}
503-
resp, err := client.Do(request)
504-
if err != nil {
505-
t.Fatalf("Failed to GET metrics. Error: %v", err)
506-
}
500+
request := httptest.NewRequest("GET", "/metrics", strings.NewReader(""))
501+
rec := httptest.NewRecorder()
502+
mux.ServeHTTP(rec, request)
503+
resp := rec.Result()
504+
507505
if resp.StatusCode != 200 {
508506
t.Fatalf("/metrics response status not 200. Response was: %+v", resp)
509507
}

0 commit comments

Comments
 (0)