@@ -24,10 +24,10 @@ import (
24
24
kube "k8s.io/client-go/kubernetes"
25
25
"k8s.io/klog/v2"
26
26
27
- "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io /v1alpha1"
28
- buckets "sigs.k8s.io/container-object-storage-interface-api/clientset"
29
- bucketapi "sigs.k8s.io/container-object-storage-interface-api/clientset/typed/objectstorage.k8s.io /v1alpha1"
30
- "sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/const "
27
+ "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
28
+ buckets "sigs.k8s.io/container-object-storage-interface-api/client/ clientset/versioned "
29
+ bucketapi "sigs.k8s.io/container-object-storage-interface-api/client/ clientset/versioned/ typed/objectstorage/v1alpha1"
30
+ "sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/consts "
31
31
cosi "sigs.k8s.io/container-object-storage-interface-spec"
32
32
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
33
33
@@ -63,6 +63,8 @@ func NewBucketListener(driverName string, client cosi.ProvisionerClient) *Bucket
63
63
func (b * BucketListener ) Add (ctx context.Context , inputBucket * v1alpha1.Bucket ) error {
64
64
bucket := inputBucket .DeepCopy ()
65
65
66
+ var err error
67
+
66
68
klog .V (3 ).InfoS ("Add Bucket" ,
67
69
"name" , bucket .ObjectMeta .Name ,
68
70
"bucketclass" , bucket .Spec .BucketClassName ,
@@ -85,9 +87,10 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
85
87
return nil
86
88
}
87
89
88
- if bucket .Spec .ExistingBucketID != nil {
90
+ if bucket .Spec .ExistingBucketID != "" {
89
91
bucket .Status .BucketReady = true
90
92
bucket .Status .BucketID = bucket .Spec .ExistingBucketID
93
+
91
94
} else {
92
95
req := & cosi.DriverCreateBucketRequest {
93
96
Parameters : bucket .Spec .Parameters ,
@@ -122,20 +125,26 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
122
125
// Now we update the BucketReady status of BucketClaim
123
126
if bucket .Spec .BucketClaim != nil {
124
127
ref := bucket .Spec .BucketClaim
125
- bucketClaim , err := b .BucketClaims (ref .Namespace ).Get (ctx , ref .Name , metav1.GetOptions {})
128
+ bucketClaim , err := b .bucketClaims (ref .Namespace ).Get (ctx , ref .Name , metav1.GetOptions {})
126
129
if err != nil {
127
130
return err
128
131
}
129
132
130
133
bucketClaim .Status .BucketReady = true
131
- if _ , err = b .BucketClaims (bucketClaim .Namespace ).Update (ctx , bucketClaim , metav1.UpdateOptions {}); err != nil {
134
+ if _ , err = b .bucketClaims (bucketClaim .Namespace ).Update (ctx , bucketClaim , metav1.UpdateOptions {}); err != nil {
132
135
return err
133
136
}
134
137
}
135
138
}
136
139
140
+ controllerutil .AddFinalizer (bucket , consts .BucketFinalizer )
141
+ if _ , err = b .buckets ().Update (ctx , bucket , metav1.UpdateOptions {}); err != nil {
142
+ klog .ErrorS (err , "Failed to update bucket finalizers" , "bucket" , bucket .ObjectMeta .Name )
143
+ return errors .Wrap (err , "Failed to update bucket finalizers" )
144
+ }
145
+
137
146
// if this step fails, then controller will retry with backoff
138
- if _ , err = b .Buckets ().UpdateStatus (ctx , bucket , metav1.UpdateOptions {}); err != nil {
147
+ if _ , err = b .buckets ().UpdateStatus (ctx , bucket , metav1.UpdateOptions {}); err != nil {
139
148
klog .ErrorS (err , "Failed to update bucket status" ,
140
149
"bucket" , bucket .ObjectMeta .Name )
141
150
return errors .Wrap (err , "Failed to update bucket status" )
@@ -152,20 +161,33 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
152
161
klog .V (3 ).InfoS ("Update Bucket" ,
153
162
"name" , old .Name )
154
163
155
- if ! new .GetDeletionTimestamp ().IsZero () {
156
- if len (new .ObjectMeta .Finalizers ) > 0 {
157
- bucketClaimNs := new .Spec .BucketClaim .Namespace
158
- bucketClaimName := new .Spec .BucketClaim .Name
159
- bucketAccessList , err := b .BucketAccesses (bucketClaimNs ).List (ctx , ListOptions {})
164
+ bucket := new .DeepCopy ()
165
+
166
+ var err error
167
+
168
+ if ! bucket .GetDeletionTimestamp ().IsZero () {
169
+ if controllerutil .ContainsFinalizer (bucket , consts .BABucketFinalizer ) {
170
+ bucketClaimNs := bucket .Spec .BucketClaim .Namespace
171
+ bucketClaimName := bucket .Spec .BucketClaim .Name
172
+ bucketAccessList , err := b .bucketAccesses (bucketClaimNs ).List (ctx , metav1.ListOptions {})
160
173
161
174
for _ , bucketAccess := range bucketAccessList .Items {
162
175
if strings .EqualFold (bucketAccess .Spec .BucketClaimName , bucketClaimName ) {
163
- err = b .BucketAccesses (bucketClaimNs ).Delete (ctx , bucketAccess .Name , metav1.DeleteOptions {})
176
+ err = b .bucketAccesses (bucketClaimNs ).Delete (ctx , bucketAccess .Name , metav1.DeleteOptions {})
164
177
if err != nil {
165
178
return err
166
179
}
167
180
}
168
181
}
182
+
183
+ controllerutil .RemoveFinalizer (bucket , consts .BABucketFinalizer )
184
+ }
185
+
186
+ if controllerutil .ContainsFinalizer (bucket , consts .BucketFinalizer ) {
187
+ err = b .deleteBucketOp (ctx , bucket )
188
+ if err != nil {
189
+ return err
190
+ }
169
191
}
170
192
}
171
193
@@ -177,13 +199,33 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
177
199
// nil - Bucket successfully deleted
178
200
// non-nil err - Internal error [requeue'd with exponential backoff]
179
201
func (b * BucketListener ) Delete (ctx context.Context , inputBucket * v1alpha1.Bucket ) error {
180
- bucket := inputBucket .DeepCopy ()
181
-
182
202
klog .V (3 ).InfoS ("Delete Bucket" ,
183
- "name" , bucket .ObjectMeta .Name ,
184
- "bucketclass" , bucket .Spec .BucketClassName ,
203
+ "name" , inputBucket .ObjectMeta .Name ,
204
+ "bucketclass" , inputBucket .Spec .BucketClassName ,
185
205
)
186
206
207
+ return nil
208
+
209
+ }
210
+
211
+ // InitializeKubeClient initializes the kubernetes client
212
+ func (b * BucketListener ) InitializeKubeClient (k kube.Interface ) {
213
+ b .kubeClient = k
214
+
215
+ serverVersion , err := k .Discovery ().ServerVersion ()
216
+ if err != nil {
217
+ klog .ErrorS (err , "Cannot determine server version" )
218
+ } else {
219
+ b .kubeVersion = utilversion .MustParseSemantic (serverVersion .GitVersion )
220
+ }
221
+ }
222
+
223
+ // InitializeBucketClient initializes the object storage bucket client
224
+ func (b * BucketListener ) InitializeBucketClient (bc buckets.Interface ) {
225
+ b .bucketClient = bc
226
+ }
227
+
228
+ func (b * BucketListener ) deleteBucketOp (ctx context.Context , bucket * v1alpha1.Bucket ) error {
187
229
if ! strings .EqualFold (bucket .Spec .DriverName , b .driverName ) {
188
230
klog .V (5 ).InfoS ("Skipping bucket for provisiner" ,
189
231
"bucket" , bucket .ObjectMeta .Name ,
@@ -194,7 +236,7 @@ func (b *BucketListener) Delete(ctx context.Context, inputBucket *v1alpha1.Bucke
194
236
195
237
// We ask the driver to clean up the bucket from the storage provider
196
238
// only when the retain policy is set to Delete
197
- if bucket .Spec .DeletionPolicy == bucketapi .DeletionPolicyDelete {
239
+ if bucket .Spec .DeletionPolicy == v1alpha1 .DeletionPolicyDelete {
198
240
req := & cosi.DriverDeleteBucketRequest {
199
241
BucketId : bucket .Status .BucketID ,
200
242
}
@@ -211,13 +253,13 @@ func (b *BucketListener) Delete(ctx context.Context, inputBucket *v1alpha1.Bucke
211
253
212
254
if bucket .Spec .BucketClaim != nil {
213
255
ref := bucket .Spec .BucketClaim
214
- bucketClaim , err := b .BucketClaims (ref .Namespace ).Get (ctx , ref .Name , metav1.GetOptions {})
256
+ bucketClaim , err := b .bucketClaims (ref .ObjectMeta . Namespace ).Get (ctx , ref . ObjectMeta .Name , metav1.GetOptions {})
215
257
if err != nil {
216
258
return err
217
259
}
218
260
219
- if controllerutil .RemoveFinalizer (bucketClaim , const . BcFinalizer ) {
220
- if _ , err := b .BucketClaims (bucketClaim .Namespace ).Update (ctx , bucketClaim , metav1.UpdateOptions {}); err != nil {
261
+ if controllerutil .RemoveFinalizer (bucketClaim , consts . BCFinalizer ) {
262
+ if _ , err := b .bucketClaims (bucketClaim . ObjectMeta .Namespace ).Update (ctx , bucketClaim , metav1.UpdateOptions {}); err != nil {
221
263
return err
222
264
}
223
265
}
@@ -226,14 +268,14 @@ func (b *BucketListener) Delete(ctx context.Context, inputBucket *v1alpha1.Bucke
226
268
return nil
227
269
}
228
270
229
- func (b * BucketListener ) Buckets () bucketapi.BucketInterface {
271
+ func (b * BucketListener ) buckets () bucketapi.BucketInterface {
230
272
if b .bucketClient != nil {
231
273
return b .bucketClient .ObjectstorageV1alpha1 ().Buckets ()
232
274
}
233
275
panic ("uninitialized listener" )
234
276
}
235
277
236
- func (b * BucketListener ) BucketClaims (namespace string ) bucketapi.BucketClaimInterface {
278
+ func (b * BucketListener ) bucketClaims (namespace string ) bucketapi.BucketClaimInterface {
237
279
if b .bucketClient != nil {
238
280
return b .bucketClient .ObjectstorageV1alpha1 ().BucketClaims (namespace )
239
281
}
@@ -242,26 +284,10 @@ func (b *BucketListener) BucketClaims(namespace string) bucketapi.BucketClaimInt
242
284
}
243
285
244
286
245
- func (b * BucketListener ) BucketAccesses (namespace string ) bucketapi.BucketAccessInterface {
287
+ func (b * BucketListener ) bucketAccesses (namespace string ) bucketapi.BucketAccessInterface {
246
288
if b .bucketClient != nil {
247
289
return b .bucketClient .ObjectstorageV1alpha1 ().BucketAccesses (namespace )
248
290
}
249
291
panic ("uninitialized listener" )
250
292
}
251
293
252
- // InitializeKubeClient initializes the kubernetes client
253
- func (b * BucketListener ) InitializeKubeClient (k kube.Interface ) {
254
- b .kubeClient = k
255
-
256
- serverVersion , err := k .Discovery ().ServerVersion ()
257
- if err != nil {
258
- klog .ErrorS (err , "Cannot determine server version" )
259
- } else {
260
- b .kubeVersion = utilversion .MustParseSemantic (serverVersion .GitVersion )
261
- }
262
- }
263
-
264
- // InitializeBucketClient initializes the object storage bucket client
265
- func (b * BucketListener ) InitializeBucketClient (bc buckets.Interface ) {
266
- b .bucketClient = bc
267
- }
0 commit comments