Skip to content

Commit 863f515

Browse files
committed
Fix locking issues by using two locks for different purposes
1 parent 5b467b9 commit 863f515

File tree

3 files changed

+85
-69
lines changed

3 files changed

+85
-69
lines changed

internal/mode/static/handler.go

+4 -4
Original file line number | Diff line number | Diff line change
@@ -251,13 +251,13 @@ func (h *eventHandlerImpl) processStateAndBuildConfig(
251251

252252
h.setLatestConfiguration(&cfg)
253253

254-
deployment.Lock.Lock()
254+
deployment.FileLock.Lock()
255255
if h.cfg.plus {
256256
configApplied = h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, cfg)
257257
} else {
258258
configApplied = h.updateNginxConf(deployment, cfg)
259259
}
260-
deployment.Lock.Unlock()
260+
deployment.FileLock.Unlock()
261261
case state.ClusterStateChange:
262262
h.version++
263263
cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version, h.cfg.plus)
@@ -269,9 +269,9 @@ func (h *eventHandlerImpl) processStateAndBuildConfig(
269269

270270
h.setLatestConfiguration(&cfg)
271271

272-
deployment.Lock.Lock()
272+
deployment.FileLock.Lock()
273273
configApplied = h.updateNginxConf(deployment, cfg)
274-
deployment.Lock.Unlock()
274+
deployment.FileLock.Unlock()
275275
}
276276

277277
return configApplied

internal/mode/static/nginx/agent/command.go

+21 -10
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,9 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
143143
go msgr.Run(ctx)
144144

145145
// apply current config before starting event loop
146-
deployment.Lock.RLock()
147146
if err := cs.setInitialConfig(ctx, deployment, conn, msgr); err != nil {
148-
deployment.Lock.RUnlock()
149-
150147
return err
151148
}
152-
deployment.Lock.RUnlock()
153149

154150
// subscribe to the deployment broadcaster to get file updates
155151
broadcaster := deployment.GetBroadcaster()
@@ -254,13 +250,15 @@ func (cs *commandService) waitForConnection(
254250
}
255251

256252
// setInitialConfig gets the initial configuration for this connection and applies it.
257-
// The caller MUST lock the deployment before calling this.
258253
func (cs *commandService) setInitialConfig(
259254
ctx context.Context,
260255
deployment *Deployment,
261256
conn *agentgrpc.Connection,
262257
msgr messenger.Messenger,
263258
) error {
259+
deployment.FileLock.Lock()
260+
defer deployment.FileLock.Unlock()
261+
264262
fileOverviews, configVersion := deployment.GetFileOverviews()
265263
if err := msgr.Send(ctx, buildRequest(fileOverviews, conn.InstanceID, configVersion)); err != nil {
266264
cs.logAndSendErrorStatus(deployment, conn, err)
@@ -419,7 +417,7 @@ func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.Mana
419417
}
420418

421419
func (cs *commandService) getPodOwner(podName string) (types.NamespacedName, error) {
422-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
420+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
423421
defer cancel()
424422

425423
var pods v1.PodList
@@ -450,12 +448,25 @@ func (cs *commandService) getPodOwner(podName string) (types.NamespacedName, err
450448
}
451449

452450
var replicaSet appsv1.ReplicaSet
453-
if err := cs.k8sReader.Get(
451+
var replicaSetErr error
452+
if err := wait.PollUntilContextCancel(
454453
ctx,
455-
types.NamespacedName{Namespace: pod.Namespace, Name: podOwnerRefs[0].Name},
456-
&replicaSet,
454+
500*time.Millisecond,
455+
true, /* poll immediately */
456+
func(ctx context.Context) (bool, error) {
457+
if err := cs.k8sReader.Get(
458+
ctx,
459+
types.NamespacedName{Namespace: pod.Namespace, Name: podOwnerRefs[0].Name},
460+
&replicaSet,
461+
); err != nil {
462+
replicaSetErr = err
463+
return false, nil //nolint:nilerr // error is returned at the end
464+
}
465+
466+
return true, nil
467+
},
457468
); err != nil {
458-
return types.NamespacedName{}, fmt.Errorf("failed to get nginx Pod's ReplicaSet: %w", err)
469+
return types.NamespacedName{}, fmt.Errorf("failed to get nginx Pod's ReplicaSet: %w", replicaSetErr)
459470
}
460471

461472
replicaOwnerRefs := replicaSet.GetOwnerReferences()

internal/mode/static/nginx/agent/deployment.go

+60 -55
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ type Deployment struct {
5656
fileOverviews []*pb.File
5757
files []File
5858

59-
Lock sync.RWMutex
59+
FileLock sync.RWMutex
60+
errLock sync.RWMutex
6061
}
6162

6263
// newDeployment returns a new Deployment object.
@@ -72,56 +73,94 @@ func (d *Deployment) GetBroadcaster() broadcast.Broadcaster {
7273
return d.broadcaster
7374
}
7475

75-
// GetFileOverviews returns the current list of fileOverviews and configVersion for the deployment.
76-
func (d *Deployment) GetFileOverviews() ([]*pb.File, string) {
77-
d.Lock.RLock()
78-
defer d.Lock.RUnlock()
76+
// SetLatestConfigError sets the latest config apply error for the deployment.
77+
func (d *Deployment) SetLatestConfigError(err error) {
78+
d.errLock.Lock()
79+
defer d.errLock.Unlock()
7980

80-
return d.fileOverviews, d.configVersion
81+
d.latestConfigError = err
8182
}
8283

83-
// GetNGINXPlusActions returns the current NGINX Plus API Actions for the deployment.
84-
func (d *Deployment) GetNGINXPlusActions() []*pb.NGINXPlusAction {
85-
d.Lock.RLock()
86-
defer d.Lock.RUnlock()
84+
// SetLatestUpstreamError sets the latest upstream update error for the deployment.
85+
func (d *Deployment) SetLatestUpstreamError(err error) {
86+
d.errLock.Lock()
87+
defer d.errLock.Unlock()
8788

88-
return d.nginxPlusActions
89+
d.latestUpstreamError = err
8990
}
9091

9192
// GetLatestConfigError gets the latest config apply error for the deployment.
9293
func (d *Deployment) GetLatestConfigError() error {
93-
d.Lock.RLock()
94-
defer d.Lock.RUnlock()
94+
d.errLock.RLock()
95+
defer d.errLock.RUnlock()
9596

9697
return d.latestConfigError
9798
}
9899

99100
// GetLatestUpstreamError gets the latest upstream update error for the deployment.
100101
func (d *Deployment) GetLatestUpstreamError() error {
101-
d.Lock.RLock()
102-
defer d.Lock.RUnlock()
102+
d.errLock.RLock()
103+
defer d.errLock.RUnlock()
103104

104105
return d.latestUpstreamError
105106
}
106107

108+
// SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed.
109+
func (d *Deployment) SetPodErrorStatus(pod string, err error) {
110+
d.errLock.Lock()
111+
defer d.errLock.Unlock()
112+
113+
d.podStatuses[pod] = err
114+
}
115+
107116
// RemovePodStatus deletes a pod from the pod status map.
108117
func (d *Deployment) RemovePodStatus(podName string) {
109-
d.Lock.Lock()
110-
defer d.Lock.Unlock()
118+
d.errLock.Lock()
119+
defer d.errLock.Unlock()
111120

112121
delete(d.podStatuses, podName)
113122
}
114123

124+
// GetConfigurationStatus returns the current config status for this Deployment. It combines
125+
// the most recent errors (if they exist) for all Pods in the Deployment into a single error.
126+
func (d *Deployment) GetConfigurationStatus() error {
127+
d.errLock.RLock()
128+
defer d.errLock.RUnlock()
129+
130+
errs := make([]error, 0, len(d.podStatuses))
131+
for _, err := range d.podStatuses {
132+
errs = append(errs, err)
133+
}
134+
135+
if len(errs) == 1 {
136+
return errs[0]
137+
}
138+
139+
return errors.Join(errs...)
140+
}
141+
115142
/*
116143
The following functions for the Deployment object are UNLOCKED, meaning that they are unsafe.
117-
Callers of these functions MUST ensure the lock is set before calling.
144+
Callers of these functions MUST ensure the FileLock is set before calling.
118145
119146
These functions are called as part of the ConfigApply or APIRequest processes. These entire processes
120147
are locked by the caller, hence why the functions themselves do not set the locks.
121148
*/
122149

150+
// GetFileOverviews returns the current list of fileOverviews and configVersion for the deployment.
151+
// The deployment FileLock MUST already be locked before calling this function.
152+
func (d *Deployment) GetFileOverviews() ([]*pb.File, string) {
153+
return d.fileOverviews, d.configVersion
154+
}
155+
156+
// GetNGINXPlusActions returns the current NGINX Plus API Actions for the deployment.
157+
// The deployment FileLock MUST already be locked before calling this function.
158+
func (d *Deployment) GetNGINXPlusActions() []*pb.NGINXPlusAction {
159+
return d.nginxPlusActions
160+
}
161+
123162
// GetFile gets the requested file for the deployment and returns its contents.
124-
// The deployment MUST already be locked before calling this function.
163+
// The deployment FileLock MUST already be locked before calling this function.
125164
func (d *Deployment) GetFile(name, hash string) []byte {
126165
for _, file := range d.files {
127166
if name == file.Meta.GetName() && hash == file.Meta.GetHash() {
@@ -133,7 +172,7 @@ func (d *Deployment) GetFile(name, hash string) []byte {
133172
}
134173

135174
// SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send.
136-
// The deployment MUST already be locked before calling this function.
175+
// The deployment FileLock MUST already be locked before calling this function.
137176
func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage {
138177
d.files = files
139178

@@ -167,45 +206,11 @@ func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage {
167206

168207
// SetNGINXPlusActions updates the deployment's latest NGINX Plus Actions to perform if using NGINX Plus.
169208
// Used by a Subscriber when it first connects.
170-
// The deployment MUST already be locked before calling this function.
209+
// The deployment FileLock MUST already be locked before calling this function.
171210
func (d *Deployment) SetNGINXPlusActions(actions []*pb.NGINXPlusAction) {
172211
d.nginxPlusActions = actions
173212
}
174213

175-
// SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed.
176-
// The deployment MUST already be locked before calling this function.
177-
func (d *Deployment) SetPodErrorStatus(pod string, err error) {
178-
d.podStatuses[pod] = err
179-
}
180-
181-
// SetLatestConfigError sets the latest config apply error for the deployment.
182-
// The deployment MUST already be locked before calling this function.
183-
func (d *Deployment) SetLatestConfigError(err error) {
184-
d.latestConfigError = err
185-
}
186-
187-
// SetLatestUpstreamError sets the latest upstream update error for the deployment.
188-
// The deployment MUST already be locked before calling this function.
189-
func (d *Deployment) SetLatestUpstreamError(err error) {
190-
d.latestUpstreamError = err
191-
}
192-
193-
// GetConfigurationStatus returns the current config status for this Deployment. It combines
194-
// the most recent errors (if they exist) for all Pods in the Deployment into a single error.
195-
// The deployment MUST already be locked before calling this function.
196-
func (d *Deployment) GetConfigurationStatus() error {
197-
errs := make([]error, 0, len(d.podStatuses))
198-
for _, err := range d.podStatuses {
199-
errs = append(errs, err)
200-
}
201-
202-
if len(errs) == 1 {
203-
return errs[0]
204-
}
205-
206-
return errors.Join(errs...)
207-
}
208-
209214
//counterfeiter:generate . DeploymentStorer
210215

211216
// DeploymentStorer is an interface to store Deployments.

0 commit comments

Comments
 (0)