@@ -18,6 +18,7 @@ package leaderelection
18
18
19
19
import (
20
20
"context"
21
+ "reflect"
21
22
"time"
22
23
23
24
v1 "k8s.io/api/coordination/v1"
@@ -37,11 +38,15 @@ import (
37
38
38
39
const requeueInterval = 5 * time .Minute
39
40
41
+ type CacheSyncWaiter interface {
42
+ WaitForCacheSync (stopCh <- chan struct {}) map [reflect.Type ]bool
43
+ }
44
+
40
45
type LeaseCandidate struct {
41
- LeaseClient coordinationv1alpha1client.LeaseCandidateInterface
42
- LeaseCandidateInformer cache.SharedIndexInformer
43
- InformerFactory informers.SharedInformerFactory
44
- HasSynced cache.InformerSynced
46
+ leaseClient coordinationv1alpha1client.LeaseCandidateInterface
47
+ leaseCandidateInformer cache.SharedIndexInformer
48
+ informerFactory informers.SharedInformerFactory
49
+ hasSynced cache.InformerSynced
45
50
46
51
// At most there will be one item in this Queue (since we only watch one item)
47
52
queue workqueue.TypedRateLimitingInterface [int ]
@@ -52,7 +57,7 @@ type LeaseCandidate struct {
52
57
// controller lease
53
58
leaseName string
54
59
55
- Clock clock.Clock
60
+ clock clock.Clock
56
61
57
62
binaryVersion , emulationVersion string
58
63
preferredStrategies []v1.CoordinatedLeaseStrategy
@@ -62,10 +67,9 @@ func NewCandidate(clientset kubernetes.Interface,
62
67
candidateName string ,
63
68
candidateNamespace string ,
64
69
targetLease string ,
65
- clock clock.Clock ,
66
70
binaryVersion , emulationVersion string ,
67
71
preferredStrategies []v1.CoordinatedLeaseStrategy ,
68
- ) (* LeaseCandidate , error ) {
72
+ ) (* LeaseCandidate , CacheSyncWaiter , error ) {
69
73
fieldSelector := fields .OneTermEqualSelector ("metadata.name" , candidateName ).String ()
70
74
// A separate informer factory is required because this must start before informerFactories
71
75
// are started for leader elected components
@@ -78,20 +82,20 @@ func NewCandidate(clientset kubernetes.Interface,
78
82
leaseCandidateInformer := informerFactory .Coordination ().V1alpha1 ().LeaseCandidates ().Informer ()
79
83
80
84
lc := & LeaseCandidate {
81
- LeaseClient : clientset .CoordinationV1alpha1 ().LeaseCandidates (candidateNamespace ),
82
- LeaseCandidateInformer : leaseCandidateInformer ,
83
- InformerFactory : informerFactory ,
85
+ leaseClient : clientset .CoordinationV1alpha1 ().LeaseCandidates (candidateNamespace ),
86
+ leaseCandidateInformer : leaseCandidateInformer ,
87
+ informerFactory : informerFactory ,
84
88
name : candidateName ,
85
89
namespace : candidateNamespace ,
86
90
leaseName : targetLease ,
87
- Clock : clock ,
91
+ clock : clock. RealClock {} ,
88
92
binaryVersion : binaryVersion ,
89
93
emulationVersion : emulationVersion ,
90
94
preferredStrategies : preferredStrategies ,
91
95
}
92
96
lc .queue = workqueue .NewTypedRateLimitingQueueWithConfig (workqueue .DefaultTypedControllerRateLimiter [int ](), workqueue.TypedRateLimitingQueueConfig [int ]{Name : "leasecandidate" })
93
97
94
- synced , err := leaseCandidateInformer .AddEventHandler (cache.ResourceEventHandlerFuncs {
98
+ h , err := leaseCandidateInformer .AddEventHandler (cache.ResourceEventHandlerFuncs {
95
99
UpdateFunc : func (oldObj , newObj interface {}) {
96
100
if leasecandidate , ok := newObj .(* v1alpha1.LeaseCandidate ); ok {
97
101
if leasecandidate .Spec .PingTime != nil {
@@ -101,18 +105,18 @@ func NewCandidate(clientset kubernetes.Interface,
101
105
},
102
106
})
103
107
if err != nil {
104
- return nil , err
108
+ return nil , nil , err
105
109
}
106
- lc .HasSynced = synced .HasSynced
110
+ lc .hasSynced = h .HasSynced
107
111
108
- return lc , nil
112
+ return lc , informerFactory , nil
109
113
}
110
114
111
115
func (c * LeaseCandidate ) Run (ctx context.Context ) {
112
116
defer c .queue .ShutDown ()
113
117
114
- go c .InformerFactory .Start (ctx .Done ())
115
- if ! cache .WaitForNamedCacheSync ("leasecandidateclient" , ctx .Done (), c .HasSynced ) {
118
+ go c .informerFactory .Start (ctx .Done ())
119
+ if ! cache .WaitForNamedCacheSync ("leasecandidateclient" , ctx .Done (), c .hasSynced ) {
116
120
return
117
121
}
118
122
@@ -153,12 +157,12 @@ func (c *LeaseCandidate) enqueueLease() {
153
157
// ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and
154
158
// a bool (true if this call created the lease), or any error that occurs.
155
159
func (c * LeaseCandidate ) ensureLease (ctx context.Context ) error {
156
- lease , err := c .LeaseClient .Get (ctx , c .name , metav1.GetOptions {})
160
+ lease , err := c .leaseClient .Get (ctx , c .name , metav1.GetOptions {})
157
161
if apierrors .IsNotFound (err ) {
158
162
klog .V (2 ).Infof ("Creating lease candidate" )
159
163
// lease does not exist, create it.
160
164
leaseToCreate := c .newLease ()
161
- _ , err := c .LeaseClient .Create (ctx , leaseToCreate , metav1.CreateOptions {})
165
+ _ , err := c .leaseClient .Create (ctx , leaseToCreate , metav1.CreateOptions {})
162
166
if err != nil {
163
167
return err
164
168
}
@@ -169,9 +173,9 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
169
173
}
170
174
klog .V (2 ).Infof ("lease candidate exists.. renewing" )
171
175
clone := lease .DeepCopy ()
172
- clone .Spec .RenewTime = & metav1.MicroTime {Time : c .Clock .Now ()}
176
+ clone .Spec .RenewTime = & metav1.MicroTime {Time : c .clock .Now ()}
173
177
clone .Spec .PingTime = nil
174
- _ , err = c .LeaseClient .Update (ctx , clone , metav1.UpdateOptions {})
178
+ _ , err = c .leaseClient .Update (ctx , clone , metav1.UpdateOptions {})
175
179
if err != nil {
176
180
return err
177
181
}
@@ -191,6 +195,6 @@ func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate {
191
195
PreferredStrategies : c .preferredStrategies ,
192
196
},
193
197
}
194
- lease .Spec .RenewTime = & metav1.MicroTime {Time : c .Clock .Now ()}
198
+ lease .Spec .RenewTime = & metav1.MicroTime {Time : c .clock .Now ()}
195
199
return lease
196
200
}
0 commit comments