Skip to content

Commit d7f5f58

Browse files
committed
Support leader election
1 parent 8265e5d commit d7f5f58

File tree

4 files changed

+77
-11
lines changed

4 files changed

+77
-11
lines changed

internal/mode/static/manager.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func StartManager(cfg config.Config) error {
199199
return fmt.Errorf("cannot register grpc server: %w", err)
200200
}
201201

202-
prov, provLoop, err := provisioner.NewNginxProvisioner(
202+
nginxProvisioner, provLoop, err := provisioner.NewNginxProvisioner(
203203
ctx,
204204
mgr,
205205
provisioner.Config{
@@ -223,7 +223,7 @@ func StartManager(cfg config.Config) error {
223223
eventHandler := newEventHandlerImpl(eventHandlerConfig{
224224
ctx: ctx,
225225
nginxUpdater: nginxUpdater,
226-
nginxProvisioner: prov,
226+
nginxProvisioner: nginxProvisioner,
227227
metricsCollector: handlerCollector,
228228
statusUpdater: groupStatusUpdater,
229229
processor: processor,
@@ -266,6 +266,7 @@ func StartManager(cfg config.Config) error {
266266

267267
if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader([]func(context.Context){
268268
groupStatusUpdater.Enable,
269+
nginxProvisioner.Enable,
269270
healthChecker.setAsLeader,
270271
eventHandler.eventHandlerEnable,
271272
})); err != nil {

internal/mode/static/provisioner/handler.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger,
106106
}
107107
}
108108

109-
// updateOrDeletResources ensures that nginx resources are either:
109+
// updateOrDeleteResources ensures that nginx resources are either:
110110
// - deleted if the Gateway no longer exists (this is for when the controller first starts up)
111111
// - are updated to the proper state in case a user makes a change directly to the resource.
112112
func (h *eventHandler) updateOrDeleteResources(
@@ -115,6 +115,12 @@ func (h *eventHandler) updateOrDeleteResources(
115115
gatewayNSName types.NamespacedName,
116116
) error {
117117
if gw := h.store.getGateway(gatewayNSName); gw == nil {
118+
if !h.provisioner.isLeader() {
119+
h.provisioner.setResourceToDelete(gatewayNSName)
120+
121+
return nil
122+
}
123+
118124
if err := h.provisioner.deprovisionNginx(ctx, gatewayNSName); err != nil {
119125
return fmt.Errorf("error deprovisioning nginx resources: %w", err)
120126
}

internal/mode/static/provisioner/provisioner.go

+63-8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"strings"
7+
"sync"
78
"time"
89

910
"github.com/go-logr/logr"
@@ -51,10 +52,16 @@ type Config struct {
5152

5253
// NginxProvisioner handles provisioning nginx kubernetes resources.
5354
type NginxProvisioner struct {
54-
store *store
55-
k8sClient client.Client
56-
baseLabelSelector metav1.LabelSelector
57-
cfg Config
55+
store *store
56+
k8sClient client.Client
57+
// resourcesToDeleteOnStartup contains a list of Gateway names that no longer exist
58+
// but have nginx resources tied to them that need to be deleted.
59+
resourcesToDeleteOnStartup []types.NamespacedName
60+
baseLabelSelector metav1.LabelSelector
61+
cfg Config
62+
leader bool
63+
64+
lock sync.RWMutex
5865
}
5966

6067
// NewNginxProvisioner returns a new instance of a Provisioner that will deploy nginx resources.
@@ -76,10 +83,11 @@ func NewNginxProvisioner(
7683
}
7784

7885
provisioner := &NginxProvisioner{
79-
k8sClient: mgr.GetClient(),
80-
store: store,
81-
baseLabelSelector: selector,
82-
cfg: cfg,
86+
k8sClient: mgr.GetClient(),
87+
store: store,
88+
baseLabelSelector: selector,
89+
resourcesToDeleteOnStartup: []types.NamespacedName{},
90+
cfg: cfg,
8391
}
8492

8593
handler, err := newEventHandler(store, provisioner, selector, cfg.GCName)
@@ -95,13 +103,53 @@ func NewNginxProvisioner(
95103
return provisioner, eventLoop, nil
96104
}
97105

106+
// Enable is called when the Pod becomes leader and allows the provisioner to manage resources.
107+
func (p *NginxProvisioner) Enable(ctx context.Context) {
108+
p.lock.Lock()
109+
p.leader = true
110+
p.lock.Unlock()
111+
112+
p.lock.RLock()
113+
for _, gatewayNSName := range p.resourcesToDeleteOnStartup {
114+
if err := p.deprovisionNginx(ctx, gatewayNSName); err != nil {
115+
p.cfg.Logger.Error(err, "error deprovisioning nginx resources on startup")
116+
}
117+
}
118+
p.lock.RUnlock()
119+
120+
p.lock.Lock()
121+
p.resourcesToDeleteOnStartup = []types.NamespacedName{}
122+
p.lock.Unlock()
123+
}
124+
125+
// isLeader returns whether or not this provisioner is the leader.
126+
func (p *NginxProvisioner) isLeader() bool {
127+
p.lock.RLock()
128+
defer p.lock.RUnlock()
129+
130+
return p.leader
131+
}
132+
133+
// setResourceToDelete is called when there are resources to delete, but this pod is not leader.
134+
// Once it becomes leader, it will delete those resources.
135+
func (p *NginxProvisioner) setResourceToDelete(gatewayNSName types.NamespacedName) {
136+
p.lock.Lock()
137+
defer p.lock.Unlock()
138+
139+
p.resourcesToDeleteOnStartup = append(p.resourcesToDeleteOnStartup, gatewayNSName)
140+
}
141+
98142
//nolint:gocyclo // will refactor at some point
99143
func (p *NginxProvisioner) provisionNginx(
100144
ctx context.Context,
101145
resourceName string,
102146
gateway *gatewayv1.Gateway,
103147
nProxyCfg *graph.EffectiveNginxProxy,
104148
) error {
149+
if !p.isLeader() {
150+
return nil
151+
}
152+
105153
objects := p.buildNginxResourceObjects(resourceName, gateway, nProxyCfg)
106154

107155
p.cfg.Logger.Info(
@@ -208,6 +256,9 @@ func (p *NginxProvisioner) reprovisionNginx(
208256
gateway *gatewayv1.Gateway,
209257
nProxyCfg *graph.EffectiveNginxProxy,
210258
) error {
259+
if !p.isLeader() {
260+
return nil
261+
}
211262
objects := p.buildNginxResourceObjects(resourceName, gateway, nProxyCfg)
212263

213264
p.cfg.Logger.Info(
@@ -236,6 +287,10 @@ func (p *NginxProvisioner) reprovisionNginx(
236287
}
237288

238289
func (p *NginxProvisioner) deprovisionNginx(ctx context.Context, gatewayNSName types.NamespacedName) error {
290+
if !p.isLeader() {
291+
return nil
292+
}
293+
239294
p.cfg.Logger.Info(
240295
"Removing nginx resources for Gateway",
241296
"name", gatewayNSName.Name,

internal/mode/static/provisioner/store.go

+4
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ func (s *store) registerResourceInGatewayConfig(gatewayNSName types.NamespacedNa
130130
}
131131

132132
func gatewayChanged(original, updated *graph.Gateway) bool {
133+
if original == nil {
134+
return true
135+
}
136+
133137
if original.Valid != updated.Valid {
134138
return true
135139
}

0 commit comments

Comments
 (0)