Skip to content

Commit 22066b2

Browse files
committed
Add health checker to leader election library
1 parent a833d13 commit 22066b2

File tree

3 files changed

+58
-34
lines changed

3 files changed

+58
-34
lines changed

leaderelection/leader_election.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"io/ioutil"
23+
"net/http"
2324
"os"
2425
"regexp"
2526
"strings"
@@ -39,6 +40,9 @@ const (
3940
defaultLeaseDuration = 15 * time.Second
4041
defaultRenewDeadline = 10 * time.Second
4142
defaultRetryPeriod = 5 * time.Second
43+
healthCheckTimeout = 20 * time.Second
44+
45+
HealthCheckerAddress = "/healthz/leader-election"
4246
)
4347

4448
// leaderElection is a convenience wrapper around client-go's leader election library.
@@ -55,6 +59,9 @@ type leaderElection struct {
5559
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
5660
// and resourcelock.ConfigMapsResourceLock
5761
resourceLock string
62+
// healthCheck reports unhealthy if leader election fails to renew leadership
63+
// within a timeout period.
64+
healthCheck *leaderelection.HealthzAdaptor
5865

5966
leaseDuration time.Duration
6067
renewDeadline time.Duration
@@ -76,6 +83,7 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string
7683
runFunc: runFunc,
7784
lockName: lockName,
7885
resourceLock: resourcelock.LeasesResourceLock,
86+
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
7987
leaseDuration: defaultLeaseDuration,
8088
renewDeadline: defaultRenewDeadline,
8189
retryPeriod: defaultRetryPeriod,
@@ -89,6 +97,7 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str
8997
runFunc: runFunc,
9098
lockName: lockName,
9199
resourceLock: resourcelock.EndpointsResourceLock,
100+
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
92101
leaseDuration: defaultLeaseDuration,
93102
renewDeadline: defaultRenewDeadline,
94103
retryPeriod: defaultRetryPeriod,
@@ -102,6 +111,7 @@ func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName st
102111
runFunc: runFunc,
103112
lockName: lockName,
104113
resourceLock: resourcelock.ConfigMapsResourceLock,
114+
healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout),
105115
leaseDuration: defaultLeaseDuration,
106116
renewDeadline: defaultRenewDeadline,
107117
retryPeriod: defaultRetryPeriod,
@@ -134,6 +144,19 @@ func (l *leaderElection) WithContext(ctx context.Context) {
134144
l.ctx = ctx
135145
}
136146

147+
// Server represents any type that could serve HTTP requests for the leader
148+
// election health check endpoint.
149+
type Server interface {
150+
Handle(pattern string, handler http.Handler)
151+
}
152+
153+
// RegisterHealthCheck creates a health check for this leader election object and
154+
// registers its HTTP handler to the given server at the path specified by the
155+
// constant HealthCheckerAddress.
156+
func (l *leaderElection) RegisterHealthCheck(s Server) {
157+
s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check))
158+
}
159+
137160
func (l *leaderElection) Run() error {
138161
if l.identity == "" {
139162
id, err := defaultLeaderElectionIdentity()
@@ -179,6 +202,7 @@ func (l *leaderElection) Run() error {
179202
klog.V(3).Infof("new leader detected, current leader: %s", identity)
180203
},
181204
},
205+
WatchDog: l.healthCheck,
182206
}
183207

184208
ctx := l.ctx
@@ -220,3 +244,15 @@ func inClusterNamespace() string {
220244

221245
return "default"
222246
}
247+
248+
// adaptCheckToHandler returns an http.HandlerFunc that runs the provided check and writes "ok" on success or an HTTP 500 error on failure.
249+
func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc {
250+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
251+
err := c(r)
252+
if err != nil {
253+
http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
254+
} else {
255+
fmt.Fprint(w, "ok")
256+
}
257+
})
258+
}

metrics/metrics.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
2929
"k8s.io/component-base/metrics"
30-
"k8s.io/klog/v2"
3130
)
3231

3332
const (
@@ -90,10 +89,15 @@ type CSIMetricsManager interface {
9089
// driverName - Name of the CSI driver against which this operation was executed.
9190
SetDriverName(driverName string)
9291

93-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
94-
// for this metrics manager.
95-
// If the metricsAddress is an empty string, this will be a no op.
96-
StartMetricsEndpoint(metricsAddress, metricsPath string)
92+
// RegisterToServer registers an HTTP handler for this metrics manager to the
93+
// given server at the specified path.
94+
RegisterToServer(s Server, metricsPath string)
95+
}
96+
97+
// Server represents any type that could serve HTTP requests for the metrics
98+
// endpoint.
99+
type Server interface {
100+
Handle(pattern string, handler http.Handler)
97101
}
98102

99103
// MetricsManagerOption is used to pass optional configuration to a
@@ -325,27 +329,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) {
325329
}
326330
}
327331

328-
// StartMetricsEndpoint starts the metrics endpoint at the specified address/path
329-
// for this metrics manager on a new go routine.
330-
// If the metricsAddress is an empty string, this will be a no op.
331-
func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) {
332-
if metricsAddress == "" {
333-
klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.")
334-
return
335-
}
336-
337-
http.Handle(metricsPath, metrics.HandlerFor(
332+
// RegisterToServer registers an HTTP handler for this metrics manager to the
333+
// given server at the specified path.
334+
func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) {
335+
s.Handle(metricsPath, metrics.HandlerFor(
338336
cmm.GetRegistry(),
339337
metrics.HandlerOpts{
340338
ErrorHandling: metrics.ContinueOnError}))
341-
342-
// Spawn a new go routine to listen on specified endpoint
343-
go func() {
344-
err := http.ListenAndServe(metricsAddress, nil)
345-
if err != nil {
346-
klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err)
347-
}
348-
}()
349339
}
350340

351341
// VerifyMetricsMatch is a helper function that verifies that the expected and

metrics/metrics_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package metrics
1919
import (
2020
"io/ioutil"
2121
"net/http"
22+
"net/http/httptest"
2223
"strings"
2324
"testing"
2425
"time"
@@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) {
481482
}
482483
}
483484

484-
func TestStartMetricsEndPoint_Noop(t *testing.T) {
485+
func TestRegisterToServer_Noop(t *testing.T) {
485486
// Arrange
486487
cmm := NewCSIMetricsManagerForSidecar(
487488
"fake.csi.driver.io" /* driverName */)
488489
operationDuration, _ := time.ParseDuration("20s")
490+
mux := http.NewServeMux()
489491

490492
// Act
491-
cmm.StartMetricsEndpoint(":8080", "/metrics")
493+
cmm.RegisterToServer(mux, "/metrics")
492494
cmm.RecordMetrics(
493495
"/csi.v1.Controller/ControllerGetCapabilities", /* operationName */
494496
nil, /* operationErr */
495497
operationDuration /* operationDuration */)
496498

497499
// Assert
498-
request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader(""))
499-
if err != nil {
500-
t.Fatalf("Creating request for metrics endpoint failed: %v", err)
501-
}
502-
client := &http.Client{}
503-
resp, err := client.Do(request)
504-
if err != nil {
505-
t.Fatalf("Failed to GET metrics. Error: %v", err)
506-
}
500+
request := httptest.NewRequest("GET", "/metrics", strings.NewReader(""))
501+
rec := httptest.NewRecorder()
502+
mux.ServeHTTP(rec, request)
503+
resp := rec.Result()
504+
507505
if resp.StatusCode != 200 {
508506
t.Fatalf("/metrics response status not 200. Response was: %+v", resp)
509507
}

0 commit comments

Comments
 (0)