Skip to content

CP/DP split: Add leader election #3092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 10, 2025
18 changes: 14 additions & 4 deletions internal/mode/static/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,37 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
// graphBuiltHealthChecker is used to check if the initial graph is built, if the NGF Pod is leader, and if the
// NGF Pod is ready.
type graphBuiltHealthChecker struct {
// readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
readyCh chan struct{}
// eventCh is a channel that a NewLeaderEvent gets sent to when the NGF Pod becomes leader.
eventCh chan interface{}
// lock is read-locked by readyCheck while it inspects ready and leader.
lock sync.RWMutex
// ready is consulted by readyCheck; while false, readyCheck fails with "control plane is not yet ready".
ready bool
// leader is consulted by readyCheck; while false, readyCheck fails with "this NGF Pod is not currently leader".
leader bool
}

// readyHandler is the HTTP handler for the readiness endpoint. It replies with 200 OK when readyCheck
// reports no error and with 503 Service Unavailable otherwise. Only the status line is written; the
// response carries no body.
func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) {
	status := http.StatusOK
	if h.readyCheck(req) != nil {
		status = http.StatusServiceUnavailable
	}

	resp.WriteHeader(status)
}

// readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
// We are considered ready after the first graph is built and if the NGF Pod is leader.
//
// The request argument is ignored. A nil error means the Pod is ready; otherwise the error names the
// first unmet condition, with the ready flag evaluated before the leader flag. Both flags are read
// under the read lock.
func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
	h.lock.RLock()
	defer h.lock.RUnlock()

	if !h.ready {
		return errors.New("control plane is not yet ready")
	}

	if !h.leader {
		return errors.New("this NGF Pod is not currently leader")
	}

	return nil
}

Expand Down
24 changes: 23 additions & 1 deletion internal/mode/static/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package static

import (
"errors"
"net/http"
"net/http/httptest"
"testing"

. "github.com/onsi/gomega"
Expand All @@ -11,7 +13,8 @@ func TestReadyCheck(t *testing.T) {
t.Parallel()
g := NewWithT(t)
healthChecker := newGraphBuiltHealthChecker()
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("control plane is not yet ready")))

g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))

healthChecker.ready = true
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))
Expand All @@ -38,3 +41,22 @@ func TestSetAsLeader(t *testing.T) {
g.Expect(healthChecker.leader).To(BeTrue())
g.Expect(healthChecker.eventCh).Should(Receive())
}

// TestReadyHandler checks the status code readyHandler writes: 503 for a freshly constructed
// checker, and 200 once both the ready and leader flags are set.
func TestReadyHandler(t *testing.T) {
	t.Parallel()
	g := NewWithT(t)

	checker := newGraphBuiltHealthChecker()
	req := httptest.NewRequest(http.MethodGet, "/readyz", nil)

	// serve pushes the request through the handler with a fresh recorder and returns the status written.
	serve := func() int {
		rec := httptest.NewRecorder()
		checker.readyHandler(rec, req)
		return rec.Result().StatusCode
	}

	g.Expect(serve()).To(Equal(http.StatusServiceUnavailable))

	checker.leader = true
	checker.ready = true

	g.Expect(serve()).To(Equal(http.StatusOK))
}
49 changes: 43 additions & 6 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import (
"context"
"fmt"
"net"
"net/http"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -75,6 +77,8 @@
plusClientCertField = "tls.crt"
plusClientKeyField = "tls.key"
grpcServerPort = 8443
// defined in our deployment.yaml.
readinessEndpointName = "/readyz"
)

var scheme = runtime.NewScheme()
Expand Down Expand Up @@ -247,12 +251,12 @@

// the healthChecker needs the same eventCh as the event handler so it can send a NewLeaderEvent when
// the pod becomes leader, triggering HandleEventBatch to be called.
healthChecker.eventCh = eventCh
if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader(
groupStatusUpdater.Enable,
healthChecker.setAsLeader,
)); err != nil {
return fmt.Errorf("cannot register status updater or set pod as leader: %w", err)

Check warning on line 259 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L254-L259

Added lines #L254 - L259 were not covered by tests
}

if cfg.ProductTelemetryConfig.Enabled {
Expand Down Expand Up @@ -280,6 +284,7 @@
}

cfg.Logger.Info("Starting manager")
cfg.Logger.Info("Nginx Gateway Fabric Pod will be marked as unready until it has the leader lease")

Check warning on line 287 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L287

Added line #L287 was not covered by tests
go func() {
<-ctx.Done()
cfg.Logger.Info("Shutting down")
Expand Down Expand Up @@ -332,10 +337,6 @@
},
}

if cfg.HealthConfig.Enabled {
options.HealthProbeBindAddress = fmt.Sprintf(":%d", cfg.HealthConfig.Port)
}

clusterCfg := ctlr.GetConfigOrDie()
clusterCfg.Timeout = clusterTimeout

Expand All @@ -345,8 +346,13 @@
}

if cfg.HealthConfig.Enabled {
if err := mgr.AddReadyzCheck("readyz", healthChecker.readyCheck); err != nil {
return nil, fmt.Errorf("error adding ready check: %w", err)
healthProbeServer, err := createHealthProbe(cfg, healthChecker)
if err != nil {
return nil, fmt.Errorf("error creating health probe: %w", err)
}

Check warning on line 352 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L349-L352

Added lines #L349 - L352 were not covered by tests

if err := mgr.Add(&healthProbeServer); err != nil {
return nil, fmt.Errorf("error adding health probe: %w", err)

Check warning on line 355 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L354-L355

Added lines #L354 - L355 were not covered by tests
}
}

Expand Down Expand Up @@ -809,3 +815,34 @@

return metricsOptions
}

// createHealthProbe creates a Server runnable to serve as our health and readiness checker.
func createHealthProbe(cfg config.Config, healthChecker *graphBuiltHealthChecker) (manager.Server, error) {
// we chose to create our own health probe server instead of using the controller-runtime one because
// of an annoying log which would flood our logs on non-ready non-leader NGF Pods. This health probe is pretty
// similar to the controller-runtime's health probe.

mux := http.NewServeMux()

// copy of controller-runtime sane defaults for new http.Server
s := &http.Server{
Handler: mux,
MaxHeaderBytes: 1 << 20,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second,
}

mux.HandleFunc(readinessEndpointName, healthChecker.readyHandler)

ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.HealthConfig.Port))
if err != nil {
return manager.Server{},
fmt.Errorf("error listening on %s: %w", fmt.Sprintf(":%d", cfg.HealthConfig.Port), err)
}

Check warning on line 841 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L820-L841

Added lines #L820 - L841 were not covered by tests

return manager.Server{
Name: "health probe",
Server: s,
Listener: ln,
}, nil

Check warning on line 847 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L843-L847

Added lines #L843 - L847 were not covered by tests
}
Loading