Skip to content

CP/DP split: Add leader election #3092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 10, 2025
18 changes: 6 additions & 12 deletions internal/framework/runnables/runnables.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
// the leader.
type CallFunctionsAfterBecameLeader struct {
statusUpdaterEnable func(context.Context)
healthCheckEnableLeader func()
eventHandlerEnable func(context.Context)
enableFunctions []func(context.Context)
}

var (
Expand All @@ -49,21 +47,17 @@ var (

// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
func NewCallFunctionsAfterBecameLeader(
statusUpdaterEnable func(context.Context),
healthCheckEnableLeader func(),
eventHandlerEnable func(context.Context),
enableFunctions []func(context.Context),
) *CallFunctionsAfterBecameLeader {
return &CallFunctionsAfterBecameLeader{
statusUpdaterEnable: statusUpdaterEnable,
healthCheckEnableLeader: healthCheckEnableLeader,
eventHandlerEnable: eventHandlerEnable,
enableFunctions: enableFunctions,
}
}

func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
j.statusUpdaterEnable(ctx)
j.healthCheckEnableLeader()
j.eventHandlerEnable(ctx)
for _, function := range j.enableFunctions {
function(ctx)
}
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/framework/runnables/runnables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func TestCallFunctionsAfterBecameLeader(t *testing.T) {
healthCheckEnableLeader := false
eventHandlerEnabled := false

callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader(
callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader([]func(ctx context.Context){
func(_ context.Context) { statusUpdaterEnabled = true },
func() { healthCheckEnableLeader = true },
func(_ context.Context) { healthCheckEnableLeader = true },
func(_ context.Context) { eventHandlerEnabled = true },
)
})

g := NewWithT(t)
g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
Expand Down
5 changes: 3 additions & 2 deletions internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ var _ = Describe("eventHandler", func() {
})

It("should update nginx conf only when leader", func() {
ctx := context.Background()
handler.cfg.graphBuiltHealthChecker.leader = false

e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
Expand All @@ -505,8 +506,8 @@ var _ = Describe("eventHandler", func() {
Expect(readyChannel).ShouldNot(BeClosed())

// Once the pod becomes leader, these two functions will be called through the runnables we set in the manager
handler.cfg.graphBuiltHealthChecker.setAsLeader()
handler.eventHandlerEnable(context.Background())
handler.cfg.graphBuiltHealthChecker.setAsLeader(ctx)
handler.eventHandlerEnable(ctx)

// nginx conf has been set
dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
Expand Down
3 changes: 2 additions & 1 deletion internal/mode/static/health.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package static

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
}

// setAsLeader marks the health check as leader.
func (h *graphBuiltHealthChecker) setAsLeader() {
func (h *graphBuiltHealthChecker) setAsLeader(_ context.Context) {
h.lock.Lock()
defer h.lock.Unlock()

Expand Down
7 changes: 4 additions & 3 deletions internal/mode/static/health_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package static

import (
"context"
"errors"
"net"
"net/http"
Expand All @@ -17,10 +18,10 @@ func TestReadyCheck(t *testing.T) {
g := NewWithT(t)
healthChecker := newGraphBuiltHealthChecker()

g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this Pod is not currently leader")))

healthChecker.graphBuilt = true
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this Pod is not currently leader")))

healthChecker.graphBuilt = false
healthChecker.leader = true
Expand All @@ -39,7 +40,7 @@ func TestSetAsLeader(t *testing.T) {
g.Expect(healthChecker.leader).To(BeFalse())
g.Expect(healthChecker.readyCh).ShouldNot(BeClosed())

healthChecker.setAsLeader()
healthChecker.setAsLeader(context.Background())

g.Expect(healthChecker.leader).To(BeTrue())
g.Expect(healthChecker.readyCh).To(BeClosed())
Expand Down
4 changes: 2 additions & 2 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,12 @@
return fmt.Errorf("cannot register event loop: %w", err)
}

if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader(
if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader([]func(context.Context){
groupStatusUpdater.Enable,
healthChecker.setAsLeader,
eventHandler.eventHandlerEnable,
)); err != nil {
})); err != nil {
return fmt.Errorf("cannot register functions that get called after Pod becomes leader: %w", err)

Check warning on line 255 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L250-L255

Added lines #L250 - L255 were not covered by tests
}

if cfg.ProductTelemetryConfig.Enabled {
Expand Down Expand Up @@ -280,7 +280,7 @@
}

cfg.Logger.Info("Starting manager")
cfg.Logger.Info("NGINX Gateway Fabric Pod will be marked as unready until it has the leader lease")

Check warning on line 283 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L283

Added line #L283 was not covered by tests
go func() {
<-ctx.Done()
cfg.Logger.Info("Shutting down")
Expand Down Expand Up @@ -342,13 +342,13 @@
}

if cfg.HealthConfig.Enabled {
healthProbeServer, err := createHealthProbe(cfg, healthChecker)
if err != nil {
return nil, fmt.Errorf("error creating health probe: %w", err)
}

Check warning on line 348 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L345-L348

Added lines #L345 - L348 were not covered by tests

if err := mgr.Add(&healthProbeServer); err != nil {
return nil, fmt.Errorf("error adding health probe: %w", err)

Check warning on line 351 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L350-L351

Added lines #L350 - L351 were not covered by tests
}
}

Expand Down
Loading