Skip to content

Commit 86dd3e6

Browse files
salonichf5 authored and sjberman committed
Revert "CP/DP split: Add leader election (#3092)"
This reverts commit a5c989e.
1 parent 0c7f47b commit 86dd3e6

File tree

8 files changed

+47
-246
lines changed

8 files changed

+47
-246
lines changed
Loading

internal/framework/runnables/runnables.go

+12-16
Original file line number | Diff line number | Diff line change
@@ -34,33 +34,29 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
3434
return false
3535
}
3636

37-
// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
37+
// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes
3838
// the leader.
39-
type CallFunctionsAfterBecameLeader struct {
40-
enableFunctions []func(context.Context)
39+
type EnableAfterBecameLeader struct {
40+
enable func(context.Context)
4141
}
4242

4343
var (
44-
_ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{}
45-
_ manager.Runnable = &CallFunctionsAfterBecameLeader{}
44+
_ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{}
45+
_ manager.Runnable = &EnableAfterBecameLeader{}
4646
)
4747

48-
// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
49-
func NewCallFunctionsAfterBecameLeader(
50-
enableFunctions []func(context.Context),
51-
) *CallFunctionsAfterBecameLeader {
52-
return &CallFunctionsAfterBecameLeader{
53-
enableFunctions: enableFunctions,
48+
// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable.
49+
func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader {
50+
return &EnableAfterBecameLeader{
51+
enable: enable,
5452
}
5553
}
5654

57-
func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
58-
for _, f := range j.enableFunctions {
59-
f(ctx)
60-
}
55+
func (j *EnableAfterBecameLeader) Start(ctx context.Context) error {
56+
j.enable(ctx)
6157
return nil
6258
}
6359

64-
func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool {
60+
func (j *EnableAfterBecameLeader) NeedLeaderElection() bool {
6561
return true
6662
}

internal/framework/runnables/runnables_test.go

+8-14
Original file line number | Diff line number | Diff line change
@@ -23,25 +23,19 @@ func TestLeaderOrNonLeader(t *testing.T) {
2323
g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse())
2424
}
2525

26-
func TestCallFunctionsAfterBecameLeader(t *testing.T) {
26+
func TestEnableAfterBecameLeader(t *testing.T) {
2727
t.Parallel()
28-
statusUpdaterEnabled := false
29-
healthCheckEnableLeader := false
30-
eventHandlerEnabled := false
31-
32-
callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader([]func(ctx context.Context){
33-
func(_ context.Context) { statusUpdaterEnabled = true },
34-
func(_ context.Context) { healthCheckEnableLeader = true },
35-
func(_ context.Context) { eventHandlerEnabled = true },
28+
enabled := false
29+
enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) {
30+
enabled = true
3631
})
3732

3833
g := NewWithT(t)
39-
g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
34+
g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
35+
g.Expect(enabled).To(BeFalse())
4036

41-
err := callFunctionsAfterBecameLeader.Start(context.Background())
37+
err := enableAfterBecameLeader.Start(context.Background())
4238
g.Expect(err).ToNot(HaveOccurred())
4339

44-
g.Expect(statusUpdaterEnabled).To(BeTrue())
45-
g.Expect(healthCheckEnableLeader).To(BeTrue())
46-
g.Expect(eventHandlerEnabled).To(BeTrue())
40+
g.Expect(enabled).To(BeTrue())
4741
}

internal/mode/static/handler.go

+3-16
Original file line number | Diff line number | Diff line change
@@ -161,27 +161,14 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
161161

162162
changeType, gr := h.cfg.processor.Process()
163163

164-
// Once we've processed resources on startup and built our first graph, mark the Pod as having built the graph.
165-
if !h.cfg.graphBuiltHealthChecker.graphBuilt {
166-
h.cfg.graphBuiltHealthChecker.setGraphBuilt()
167-
}
168-
169-
// if this Pod is not the leader or does not have the leader lease yet,
170-
// the nginx conf should not be updated.
171-
if !h.cfg.graphBuiltHealthChecker.leader {
172-
return
164+
// Once we've processed resources on startup and built our first graph, mark the Pod as ready.
165+
if !h.cfg.graphBuiltHealthChecker.ready {
166+
h.cfg.graphBuiltHealthChecker.setAsReady()
173167
}
174168

175169
h.sendNginxConfig(ctx, logger, gr, changeType)
176170
}
177171

178-
func (h *eventHandlerImpl) eventHandlerEnable(ctx context.Context) {
179-
// Latest graph is guaranteed to not be nil since the leader election process takes longer than
180-
// the initial call to HandleEventBatch when NGF starts up. And GatewayClass will typically always exist which
181-
// triggers an event.
182-
h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange)
183-
}
184-
185172
func (h *eventHandlerImpl) sendNginxConfig(
186173
ctx context.Context,
187174
logger logr.Logger,

internal/mode/static/handler_test.go

+5-21
Original file line number | Diff line number | Diff line change
@@ -141,9 +141,7 @@ var _ = Describe("eventHandler", func() {
141141
metricsCollector: collectors.NewControllerNoopCollector(),
142142
updateGatewayClassStatus: true,
143143
})
144-
Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeFalse())
145-
146-
handler.cfg.graphBuiltHealthChecker.leader = true
144+
Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse())
147145
})
148146

149147
AfterEach(func() {
@@ -177,7 +175,7 @@ var _ = Describe("eventHandler", func() {
177175
})
178176

179177
AfterEach(func() {
180-
Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
178+
Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeTrue())
181179
})
182180

183181
When("a batch has one event", func() {
@@ -458,35 +456,21 @@ var _ = Describe("eventHandler", func() {
458456
})
459457

460458
It("should update nginx conf only when leader", func() {
461-
ctx := context.Background()
462-
handler.cfg.graphBuiltHealthChecker.leader = false
463-
464459
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
465460
batch := []interface{}{e}
466461
readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh()
467462

468463
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{Gateway: &graph.Gateway{Valid: true}})
469464

470-
handler.HandleEventBatch(context.Background(), logr.Discard(), batch)
471-
472-
// graph is built, but since the graphBuiltHealthChecker.leader is false, configuration isn't created and
473-
// the readyCheck fails
474-
Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
475-
Expect(handler.GetLatestConfiguration()).To(BeNil())
476465
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
477-
Expect(readyChannel).ShouldNot(BeClosed())
478-
479-
// Once the pod becomes leader, these two functions will be called through the runnables we set in the manager
480-
handler.cfg.graphBuiltHealthChecker.setAsLeader(ctx)
481-
handler.eventHandlerEnable(ctx)
466+
handler.HandleEventBatch(context.Background(), logr.Discard(), batch)
482467

483-
// nginx conf has been set
484468
dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
485469
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
486470

487-
// ready check is also set
471+
Expect(readyChannel).To(BeClosed())
472+
488473
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed())
489-
Expect(handler.cfg.graphBuiltHealthChecker.getReadyCh()).To(BeClosed())
490474
})
491475

492476
It("should panic for an unknown event type", func() {

internal/mode/static/health.go

+11-76
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,9 @@
11
package static
22

33
import (
4-
"context"
54
"errors"
6-
"fmt"
7-
"net"
85
"net/http"
96
"sync"
10-
"time"
11-
12-
"sigs.k8s.io/controller-runtime/pkg/manager"
13-
14-
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/config"
157
)
168

179
// newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker.
@@ -21,94 +13,37 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
2113
}
2214
}
2315

24-
// graphBuiltHealthChecker is used to check if the NGF Pod is ready. The NGF Pod is ready if the initial graph has
25-
// been built and if it is leader.
16+
// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready.
2617
type graphBuiltHealthChecker struct {
2718
// readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
28-
readyCh chan struct{}
29-
lock sync.RWMutex
30-
graphBuilt bool
31-
leader bool
32-
}
33-
34-
// createHealthProbe creates a Server runnable to serve as our health and readiness checker.
35-
func createHealthProbe(cfg config.Config, healthChecker *graphBuiltHealthChecker) (manager.Server, error) {
36-
// we chose to create our own health probe server instead of using the controller-runtime one because
37-
// of repetitive log which would flood our logs on non-ready non-leader NGF Pods. This health probe is
38-
// similar to the controller-runtime's health probe.
39-
40-
mux := http.NewServeMux()
41-
42-
// copy of controller-runtime sane defaults for new http.Server
43-
s := &http.Server{
44-
Handler: mux,
45-
MaxHeaderBytes: 1 << 20,
46-
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
47-
ReadHeaderTimeout: 32 * time.Second,
48-
}
49-
50-
mux.HandleFunc(readinessEndpointName, healthChecker.readyHandler)
51-
52-
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.HealthConfig.Port))
53-
if err != nil {
54-
return manager.Server{},
55-
fmt.Errorf("error listening on %s: %w", fmt.Sprintf(":%d", cfg.HealthConfig.Port), err)
56-
}
57-
58-
return manager.Server{
59-
Name: "health probe",
60-
Server: s,
61-
Listener: ln,
62-
}, nil
63-
}
64-
65-
func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) {
66-
if err := h.readyCheck(req); err != nil {
67-
resp.WriteHeader(http.StatusServiceUnavailable)
68-
} else {
69-
resp.WriteHeader(http.StatusOK)
70-
}
19+
readyCh chan struct{}
20+
lock sync.RWMutex
21+
ready bool
7122
}
7223

7324
// readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
74-
// We are considered ready after the first graph is built and if the NGF Pod is leader.
25+
// We are considered ready after the first graph is built.
7526
func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
7627
h.lock.RLock()
7728
defer h.lock.RUnlock()
7829

79-
if !h.leader {
80-
return errors.New("this Pod is not currently leader")
81-
}
82-
83-
if !h.graphBuilt {
84-
return errors.New("control plane initial graph has not been built")
30+
if !h.ready {
31+
return errors.New("control plane is not yet ready")
8532
}
8633

8734
return nil
8835
}
8936

90-
// setGraphBuilt marks the health check as having the initial graph built.
91-
func (h *graphBuiltHealthChecker) setGraphBuilt() {
37+
// setAsReady marks the health check as ready.
38+
func (h *graphBuiltHealthChecker) setAsReady() {
9239
h.lock.Lock()
9340
defer h.lock.Unlock()
9441

95-
h.graphBuilt = true
42+
h.ready = true
43+
close(h.readyCh)
9644
}
9745

9846
// getReadyCh returns a read-only channel, which determines if the NGF Pod is ready.
9947
func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
10048
return h.readyCh
10149
}
102-
103-
// setAsLeader marks the health check as leader.
104-
func (h *graphBuiltHealthChecker) setAsLeader(_ context.Context) {
105-
h.lock.Lock()
106-
defer h.lock.Unlock()
107-
108-
h.leader = true
109-
110-
// setGraphBuilt should already have been called when processing the resources on startup because the leader
111-
// election process takes longer than the initial call to HandleEventBatch. Thus, the NGF Pod should be marked as
112-
// ready and have this channel be closed.
113-
close(h.readyCh)
114-
}

internal/mode/static/health_test.go

+2-84
Original file line number | Diff line number | Diff line change
@@ -1,99 +1,17 @@
11
package static
22

33
import (
4-
"context"
5-
"errors"
6-
"net"
7-
"net/http"
8-
"net/http/httptest"
94
"testing"
105

116
. "github.com/onsi/gomega"
12-
13-
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/config"
147
)
158

169
func TestReadyCheck(t *testing.T) {
1710
t.Parallel()
1811
g := NewWithT(t)
1912
healthChecker := newGraphBuiltHealthChecker()
13+
g.Expect(healthChecker.readyCheck(nil)).ToNot(Succeed())
2014

21-
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this Pod is not currently leader")))
22-
23-
healthChecker.graphBuilt = true
24-
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this Pod is not currently leader")))
25-
26-
healthChecker.graphBuilt = false
27-
healthChecker.leader = true
28-
g.Expect(healthChecker.readyCheck(nil)).
29-
To(MatchError(errors.New("control plane initial graph has not been built")))
30-
31-
healthChecker.graphBuilt = true
15+
healthChecker.ready = true
3216
g.Expect(healthChecker.readyCheck(nil)).To(Succeed())
3317
}
34-
35-
func TestSetAsLeader(t *testing.T) {
36-
t.Parallel()
37-
g := NewWithT(t)
38-
healthChecker := newGraphBuiltHealthChecker()
39-
40-
g.Expect(healthChecker.leader).To(BeFalse())
41-
g.Expect(healthChecker.readyCh).ShouldNot(BeClosed())
42-
43-
healthChecker.setAsLeader(context.Background())
44-
45-
g.Expect(healthChecker.leader).To(BeTrue())
46-
g.Expect(healthChecker.readyCh).To(BeClosed())
47-
}
48-
49-
func TestSetGraphBuilt(t *testing.T) {
50-
t.Parallel()
51-
g := NewWithT(t)
52-
healthChecker := newGraphBuiltHealthChecker()
53-
54-
g.Expect(healthChecker.graphBuilt).To(BeFalse())
55-
56-
healthChecker.setGraphBuilt()
57-
58-
g.Expect(healthChecker.graphBuilt).To(BeTrue())
59-
}
60-
61-
func TestReadyHandler(t *testing.T) {
62-
t.Parallel()
63-
g := NewWithT(t)
64-
healthChecker := newGraphBuiltHealthChecker()
65-
66-
r := httptest.NewRequest(http.MethodGet, "/readyz", nil)
67-
w := httptest.NewRecorder()
68-
69-
healthChecker.readyHandler(w, r)
70-
g.Expect(w.Result().StatusCode).To(Equal(http.StatusServiceUnavailable))
71-
72-
healthChecker.graphBuilt = true
73-
healthChecker.leader = true
74-
75-
w = httptest.NewRecorder()
76-
healthChecker.readyHandler(w, r)
77-
g.Expect(w.Result().StatusCode).To(Equal(http.StatusOK))
78-
}
79-
80-
func TestCreateHealthProbe(t *testing.T) {
81-
t.Parallel()
82-
g := NewWithT(t)
83-
84-
healthChecker := newGraphBuiltHealthChecker()
85-
86-
cfg := config.Config{HealthConfig: config.HealthConfig{Port: 100000}}
87-
_, err := createHealthProbe(cfg, healthChecker)
88-
g.Expect(err).To(MatchError("error listening on :100000: listen tcp: address 100000: invalid port"))
89-
90-
cfg = config.Config{HealthConfig: config.HealthConfig{Port: 8081}}
91-
hp, err := createHealthProbe(cfg, healthChecker)
92-
g.Expect(err).ToNot(HaveOccurred())
93-
94-
addr, ok := (hp.Listener.Addr()).(*net.TCPAddr)
95-
g.Expect(ok).To(BeTrue())
96-
97-
g.Expect(addr.Port).To(Equal(cfg.HealthConfig.Port))
98-
g.Expect(hp.Server).ToNot(BeNil())
99-
}

0 commit comments

Comments
 (0)