Skip to content

Commit a5c989e

Browse files
bjee19 authored and sjberman committed
CP/DP split: Add leader election (#3092)
Add leader election so that data plane pods connect only to the leader NGF pod. If the control plane is scaled, only the leader is marked as ready; the backups are marked Unready so the data plane doesn't connect to them. Problem: We want the NGF control plane to fail over to another pod when the control plane pod goes down. Solution: Only the leader pod is marked as ready by Kubernetes, so all connections from data plane pods go to the leader pod.
1 parent 8e33268 commit a5c989e

File tree

8 files changed

+253
-52
lines changed

8 files changed

+253
-52
lines changed
Loading

internal/framework/runnables/runnables.go

+16-12
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,33 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
3434
return false
3535
}
3636

37-
// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes
37+
// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
3838
// the leader.
39-
type EnableAfterBecameLeader struct {
40-
enable func(context.Context)
39+
type CallFunctionsAfterBecameLeader struct {
40+
enableFunctions []func(context.Context)
4141
}
4242

4343
var (
44-
_ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{}
45-
_ manager.Runnable = &EnableAfterBecameLeader{}
44+
_ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{}
45+
_ manager.Runnable = &CallFunctionsAfterBecameLeader{}
4646
)
4747

48-
// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable.
49-
func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader {
50-
return &EnableAfterBecameLeader{
51-
enable: enable,
48+
// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
49+
func NewCallFunctionsAfterBecameLeader(
50+
enableFunctions []func(context.Context),
51+
) *CallFunctionsAfterBecameLeader {
52+
return &CallFunctionsAfterBecameLeader{
53+
enableFunctions: enableFunctions,
5254
}
5355
}
5456

55-
func (j *EnableAfterBecameLeader) Start(ctx context.Context) error {
56-
j.enable(ctx)
57+
func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
58+
for _, f := range j.enableFunctions {
59+
f(ctx)
60+
}
5761
return nil
5862
}
5963

60-
func (j *EnableAfterBecameLeader) NeedLeaderElection() bool {
64+
func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool {
6165
return true
6266
}

internal/framework/runnables/runnables_test.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,25 @@ func TestLeaderOrNonLeader(t *testing.T) {
2323
g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse())
2424
}
2525

26-
func TestEnableAfterBecameLeader(t *testing.T) {
26+
func TestCallFunctionsAfterBecameLeader(t *testing.T) {
2727
t.Parallel()
28-
enabled := false
29-
enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) {
30-
enabled = true
28+
statusUpdaterEnabled := false
29+
healthCheckEnableLeader := false
30+
eventHandlerEnabled := false
31+
32+
callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader([]func(ctx context.Context){
33+
func(_ context.Context) { statusUpdaterEnabled = true },
34+
func(_ context.Context) { healthCheckEnableLeader = true },
35+
func(_ context.Context) { eventHandlerEnabled = true },
3136
})
3237

3338
g := NewWithT(t)
34-
g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
35-
g.Expect(enabled).To(BeFalse())
39+
g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
3640

37-
err := enableAfterBecameLeader.Start(context.Background())
41+
err := callFunctionsAfterBecameLeader.Start(context.Background())
3842
g.Expect(err).ToNot(HaveOccurred())
3943

40-
g.Expect(enabled).To(BeTrue())
44+
g.Expect(statusUpdaterEnabled).To(BeTrue())
45+
g.Expect(healthCheckEnableLeader).To(BeTrue())
46+
g.Expect(eventHandlerEnabled).To(BeTrue())
4147
}

internal/mode/static/handler.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,33 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
167167

168168
changeType, gr := h.cfg.processor.Process()
169169

170-
// Once we've processed resources on startup and built our first graph, mark the Pod as ready.
171-
if !h.cfg.graphBuiltHealthChecker.ready {
172-
h.cfg.graphBuiltHealthChecker.setAsReady()
170+
// Once we've processed resources on startup and built our first graph, mark the Pod as having built the graph.
171+
if !h.cfg.graphBuiltHealthChecker.graphBuilt {
172+
h.cfg.graphBuiltHealthChecker.setGraphBuilt()
173173
}
174174

175-
// TODO(sberman): hardcode this deployment name until we support provisioning data planes
176-
// If no deployments exist, we should just return without doing anything.
175+
// if this Pod is not the leader or does not have the leader lease yet,
176+
// the nginx conf should not be updated.
177+
if !h.cfg.graphBuiltHealthChecker.leader {
178+
return
179+
}
180+
181+
h.sendNginxConfig(ctx, logger, gr, changeType)
182+
}
183+
184+
func (h *eventHandlerImpl) eventHandlerEnable(ctx context.Context) {
185+
// Latest graph is guaranteed to not be nil since the leader election process takes longer than
186+
// the initial call to HandleEventBatch when NGF starts up, and a GatewayClass will typically exist, which
187+
// triggers an event.
188+
h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange)
189+
}
190+
191+
func (h *eventHandlerImpl) sendNginxConfig(
192+
ctx context.Context,
193+
logger logr.Logger,
194+
gr *graph.Graph,
195+
changeType state.ChangeType,
196+
) {
177197
deploymentName := types.NamespacedName{
178198
Name: "tmp-nginx-deployment",
179199
Namespace: h.cfg.gatewayPodConfig.Namespace,

internal/mode/static/handler_test.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ var _ = Describe("eventHandler", func() {
126126
metricsCollector: collectors.NewControllerNoopCollector(),
127127
updateGatewayClassStatus: true,
128128
})
129-
Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse())
129+
Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeFalse())
130+
131+
handler.cfg.graphBuiltHealthChecker.leader = true
130132
})
131133

132134
AfterEach(func() {
@@ -161,7 +163,7 @@ var _ = Describe("eventHandler", func() {
161163
})
162164

163165
AfterEach(func() {
164-
Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeTrue())
166+
Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
165167
})
166168

167169
When("a batch has one event", func() {
@@ -484,22 +486,36 @@ var _ = Describe("eventHandler", func() {
484486
Expect(gr.LatestReloadResult.Error.Error()).To(Equal("status error"))
485487
})
486488

487-
It("should set the health checker status properly", func() {
489+
It("should update nginx conf only when leader", func() {
490+
ctx := context.Background()
491+
handler.cfg.graphBuiltHealthChecker.leader = false
492+
488493
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
489494
batch := []interface{}{e}
490495
readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh()
491496

492497
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
493498

494-
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
495499
handler.HandleEventBatch(context.Background(), logr.Discard(), batch)
496500

501+
// graph is built, but since the graphBuiltHealthChecker.leader is false, configuration isn't created and
502+
// the readyCheck fails
503+
Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
504+
Expect(handler.GetLatestConfiguration()).To(BeNil())
505+
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
506+
Expect(readyChannel).ShouldNot(BeClosed())
507+
508+
// Once the pod becomes leader, these two functions will be called through the runnables we set in the manager
509+
handler.cfg.graphBuiltHealthChecker.setAsLeader(ctx)
510+
handler.eventHandlerEnable(ctx)
511+
512+
// nginx conf has been set
497513
dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
498514
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
499515

500-
Expect(readyChannel).To(BeClosed())
501-
516+
// ready check is also set
502517
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed())
518+
Expect(handler.cfg.graphBuiltHealthChecker.getReadyCh()).To(BeClosed())
503519
})
504520

505521
It("should panic for an unknown event type", func() {

internal/mode/static/health.go

+76-11
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package static
22

33
import (
4+
"context"
45
"errors"
6+
"fmt"
7+
"net"
58
"net/http"
69
"sync"
10+
"time"
11+
12+
"sigs.k8s.io/controller-runtime/pkg/manager"
13+
14+
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/config"
715
)
816

917
// newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker.
@@ -13,37 +21,94 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
1321
}
1422
}
1523

16-
// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready.
24+
// graphBuiltHealthChecker is used to check if the NGF Pod is ready. The NGF Pod is ready if the initial graph has
25+
// been built and if it is leader.
1726
type graphBuiltHealthChecker struct {
1827
// readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
19-
readyCh chan struct{}
20-
lock sync.RWMutex
21-
ready bool
28+
readyCh chan struct{}
29+
lock sync.RWMutex
30+
graphBuilt bool
31+
leader bool
32+
}
33+
34+
// createHealthProbe creates a Server runnable to serve as our health and readiness checker.
35+
func createHealthProbe(cfg config.Config, healthChecker *graphBuiltHealthChecker) (manager.Server, error) {
36+
// we chose to create our own health probe server instead of using the controller-runtime one because
37+
// of repetitive log messages which would flood our logs on non-ready non-leader NGF Pods. This health probe is
38+
// similar to the controller-runtime's health probe.
39+
40+
mux := http.NewServeMux()
41+
42+
// copy of controller-runtime sane defaults for new http.Server
43+
s := &http.Server{
44+
Handler: mux,
45+
MaxHeaderBytes: 1 << 20,
46+
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
47+
ReadHeaderTimeout: 32 * time.Second,
48+
}
49+
50+
mux.HandleFunc(readinessEndpointName, healthChecker.readyHandler)
51+
52+
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.HealthConfig.Port))
53+
if err != nil {
54+
return manager.Server{},
55+
fmt.Errorf("error listening on %s: %w", fmt.Sprintf(":%d", cfg.HealthConfig.Port), err)
56+
}
57+
58+
return manager.Server{
59+
Name: "health probe",
60+
Server: s,
61+
Listener: ln,
62+
}, nil
63+
}
64+
65+
func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) {
66+
if err := h.readyCheck(req); err != nil {
67+
resp.WriteHeader(http.StatusServiceUnavailable)
68+
} else {
69+
resp.WriteHeader(http.StatusOK)
70+
}
2271
}
2372

2473
// readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
25-
// We are considered ready after the first graph is built.
74+
// We are considered ready once the first graph is built and the NGF Pod is the leader.
2675
func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
2776
h.lock.RLock()
2877
defer h.lock.RUnlock()
2978

30-
if !h.ready {
31-
return errors.New("control plane is not yet ready")
79+
if !h.leader {
80+
return errors.New("this Pod is not currently leader")
81+
}
82+
83+
if !h.graphBuilt {
84+
return errors.New("control plane initial graph has not been built")
3285
}
3386

3487
return nil
3588
}
3689

37-
// setAsReady marks the health check as ready.
38-
func (h *graphBuiltHealthChecker) setAsReady() {
90+
// setGraphBuilt marks the health check as having the initial graph built.
91+
func (h *graphBuiltHealthChecker) setGraphBuilt() {
3992
h.lock.Lock()
4093
defer h.lock.Unlock()
4194

42-
h.ready = true
43-
close(h.readyCh)
95+
h.graphBuilt = true
4496
}
4597

4698
// getReadyCh returns a read-only channel, which determines if the NGF Pod is ready.
4799
func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
48100
return h.readyCh
49101
}
102+
103+
// setAsLeader marks the health check as leader.
104+
func (h *graphBuiltHealthChecker) setAsLeader(_ context.Context) {
105+
h.lock.Lock()
106+
defer h.lock.Unlock()
107+
108+
h.leader = true
109+
110+
// setGraphBuilt should already have been called when processing the resources on startup because the leader
111+
// election process takes longer than the initial call to HandleEventBatch. Thus, the NGF Pod should be marked as
112+
// ready, so this channel is closed.
113+
close(h.readyCh)
114+
}

0 commit comments

Comments
 (0)