@@ -20,31 +20,33 @@ import (
20
20
"fmt"
21
21
"strings"
22
22
23
+ "google.golang.org/grpc/codes"
24
+ "google.golang.org/grpc/status"
25
+ v1 "k8s.io/api/core/v1"
26
+ kubeerrors "k8s.io/apimachinery/pkg/api/errors"
23
27
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24
- utilversion "k8s.io/apimachinery/pkg/util/version "
28
+ "k8s.io/apimachinery/pkg/runtime "
25
29
kube "k8s.io/client-go/kubernetes"
30
+ "k8s.io/client-go/tools/record"
26
31
"k8s.io/klog/v2"
27
-
28
32
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
29
33
buckets "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
30
34
bucketapi "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"
35
+ "sigs.k8s.io/container-object-storage-interface-api/controller/events"
31
36
"sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/consts"
32
37
cosi "sigs.k8s.io/container-object-storage-interface-spec"
33
38
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
34
-
35
- "github.com/pkg/errors"
36
- "google.golang.org/grpc/codes"
37
- "google.golang.org/grpc/status"
38
39
)
39
40
40
41
// BucketListener manages Bucket objects
41
42
type BucketListener struct {
42
43
provisionerClient cosi.ProvisionerClient
43
44
driverName string
44
45
46
+ eventRecorder record.EventRecorder
47
+
45
48
kubeClient kube.Interface
46
49
bucketClient buckets.Interface
47
- kubeVersion * utilversion.Version
48
50
}
49
51
50
52
// NewBucketListener returns a resource handler for Bucket objects
@@ -58,9 +60,10 @@ func NewBucketListener(driverName string, client cosi.ProvisionerClient) *Bucket
58
60
}
59
61
60
62
// Add attempts to create a bucket for a given bucket. This function must be idempotent
63
+ //
61
64
// Return values
62
- // nil - Bucket successfully provisioned
63
- // non-nil err - Internal error [requeue'd with exponential backoff]
65
+ // - nil - Bucket successfully provisioned
66
+ // - non-nil err - Internal error [requeue'd with exponential backoff]
64
67
func (b * BucketListener ) Add (ctx context.Context , inputBucket * v1alpha1.Bucket ) error {
65
68
bucket := inputBucket .DeepCopy ()
66
69
@@ -70,9 +73,7 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
70
73
"name" , bucket .ObjectMeta .Name )
71
74
72
75
if bucket .Spec .BucketClassName == "" {
73
- err = errors .New (fmt .Sprintf ("BucketClassName not defined for bucket %s" , bucket .ObjectMeta .Name ))
74
- klog .V (3 ).ErrorS (err , "BucketClassName not defined" )
75
- return err
76
+ return b .recordError (inputBucket , v1 .EventTypeWarning , events .FailedCreateBucket , fmt .Errorf ("%w for Bucket %v" , consts .ErrUndefinedBucketClassName , bucket .Name ))
76
77
}
77
78
78
79
if ! strings .EqualFold (bucket .Spec .DriverName , b .driverName ) {
@@ -100,15 +101,17 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
100
101
bucketID = bucket .Spec .ExistingBucketID
101
102
if bucket .Spec .Parameters == nil {
102
103
bucketClass , err := b .bucketClasses ().Get (ctx , bucket .Spec .BucketClassName , metav1.GetOptions {})
103
- if err != nil {
104
+ if kubeerrors .IsNotFound (err ) {
105
+ return b .recordError (inputBucket , v1 .EventTypeWarning , events .FailedCreateBucket , err )
106
+ } else if err != nil {
104
107
klog .V (3 ).ErrorS (err , "Error fetching bucketClass" ,
105
108
"bucketClass" , bucket .Spec .BucketClassName ,
106
109
"bucket" , bucket .ObjectMeta .Name )
107
- return err
110
+ return b . recordError ( inputBucket , v1 . EventTypeWarning , events . FailedCreateBucket , err )
108
111
}
109
112
110
113
if bucketClass .Parameters != nil {
111
- var param map [string ]string
114
+ param := make ( map [string ]string )
112
115
for k , v := range bucketClass .Parameters {
113
116
param [k ] = v
114
117
}
@@ -125,18 +128,15 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
125
128
rsp , err := b .provisionerClient .DriverCreateBucket (ctx , req )
126
129
if err != nil {
127
130
if status .Code (err ) != codes .AlreadyExists {
128
- klog .V (3 ).ErrorS (err , "Driver failed to create bucket" ,
129
- "bucket" , bucket .ObjectMeta .Name )
130
- return errors .Wrap (err , "Failed to create bucket" )
131
+ return b .recordError (inputBucket , v1 .EventTypeWarning , events .FailedCreateBucket , fmt .Errorf ("failed to create bucket: %w" , err ))
131
132
}
132
-
133
133
}
134
134
135
135
if rsp == nil {
136
- err = errors . New ( fmt . Sprintf ( "DriverCreateBucket returned a nil response for bucket: %s" , bucket . ObjectMeta . Name ))
136
+ err = consts . ErrInternal
137
137
klog .V (3 ).ErrorS (err , "Internal Error from driver" ,
138
138
"bucket" , bucket .ObjectMeta .Name )
139
- return err
139
+ return fmt . Errorf ( "%w for Bucket %v" , err , bucket . Name )
140
140
}
141
141
142
142
if rsp .BucketId != "" {
@@ -145,8 +145,7 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
145
145
} else {
146
146
klog .V (3 ).ErrorS (err , "DriverCreateBucket returned an empty bucketID" ,
147
147
"bucket" , bucket .ObjectMeta .Name )
148
- err = errors .New (fmt .Sprintf ("DriverCreateBucket returned an empty bucketID for bucket: %s" , bucket .ObjectMeta .Name ))
149
- return err
148
+ return fmt .Errorf ("%w for Bucket %v" , consts .ErrEmptyBucketID , bucket .Name )
150
149
}
151
150
152
151
// Now we update the BucketReady status of BucketClaim
@@ -157,15 +156,15 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
157
156
klog .V (3 ).ErrorS (err , "Failed to get bucketClaim" ,
158
157
"bucketClaim" , ref .Name ,
159
158
"bucket" , bucket .ObjectMeta .Name )
160
- return err
159
+ return b . recordError ( bucket , v1 . EventTypeWarning , events . FailedCreateBucket , err )
161
160
}
162
161
163
162
bucketClaim .Status .BucketReady = true
164
163
if _ , err = b .bucketClaims (bucketClaim .Namespace ).UpdateStatus (ctx , bucketClaim , metav1.UpdateOptions {}); err != nil {
165
164
klog .V (3 ).ErrorS (err , "Failed to update bucketClaim" ,
166
165
"bucketClaim" , ref .Name ,
167
166
"bucket" , bucket .ObjectMeta .Name )
168
- return err
167
+ return b . recordError ( bucket , v1 . EventTypeWarning , events . FailedCreateBucket , err )
169
168
}
170
169
171
170
klog .V (5 ).Infof ("Successfully updated status of bucketClaim: %s, bucket: %s" , bucketClaim .ObjectMeta .Name , bucket .ObjectMeta .Name )
@@ -175,7 +174,8 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
175
174
controllerutil .AddFinalizer (bucket , consts .BucketFinalizer )
176
175
if bucket , err = b .buckets ().Update (ctx , bucket , metav1.UpdateOptions {}); err != nil {
177
176
klog .V (3 ).ErrorS (err , "Failed to update bucket finalizers" , "bucket" , bucket .ObjectMeta .Name )
178
- return errors .Wrap (err , "Failed to update bucket finalizers" )
177
+ return b .recordError (bucket , v1 .EventTypeWarning , events .FailedCreateBucket ,
178
+ fmt .Errorf ("failed to update bucket finalizers: %w" , err ))
179
179
}
180
180
181
181
klog .V (5 ).Infof ("Successfully added finalizer to bucket: %s" , bucket .ObjectMeta .Name )
@@ -188,7 +188,8 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
188
188
if _ , err = b .buckets ().UpdateStatus (ctx , bucket , metav1.UpdateOptions {}); err != nil {
189
189
klog .V (3 ).ErrorS (err , "Failed to update bucket status" ,
190
190
"bucket" , bucket .ObjectMeta .Name )
191
- return errors .Wrap (err , "Failed to update bucket status" )
191
+ return b .recordError (bucket , v1 .EventTypeWarning , events .FailedCreateBucket ,
192
+ fmt .Errorf ("failed to update bucket status: %w" , err ))
192
193
}
193
194
194
195
klog .V (3 ).InfoS ("Add Bucket success" ,
@@ -201,8 +202,8 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
201
202
202
203
// Update attempts to reconcile changes to a given bucket. This function must be idempotent
203
204
// Return values
204
- // nil - Bucket successfully reconciled
205
- // non-nil err - Internal error [requeue'd with exponential backoff]
205
+ // - nil - Bucket successfully reconciled
206
+ // - non-nil err - Internal error [requeue'd with exponential backoff]
206
207
func (b * BucketListener ) Update (ctx context.Context , old , new * v1alpha1.Bucket ) error {
207
208
klog .V (3 ).InfoS ("Update Bucket" ,
208
209
"name" , old .Name )
@@ -216,6 +217,11 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
216
217
bucketClaimNs := bucket .Spec .BucketClaim .Namespace
217
218
bucketClaimName := bucket .Spec .BucketClaim .Name
218
219
bucketAccessList , err := b .bucketAccesses (bucketClaimNs ).List (ctx , metav1.ListOptions {})
220
+ if err != nil {
221
+ klog .V (3 ).ErrorS (err , "Error fetching BucketAccessList" ,
222
+ "bucket" , bucket .ObjectMeta .Name )
223
+ return err
224
+ }
219
225
220
226
for _ , bucketAccess := range bucketAccessList .Items {
221
227
if strings .EqualFold (bucketAccess .Spec .BucketClaimName , bucketClaimName ) {
@@ -238,7 +244,7 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
238
244
if controllerutil .ContainsFinalizer (bucket , consts .BucketFinalizer ) {
239
245
err = b .deleteBucketOp (ctx , bucket )
240
246
if err != nil {
241
- return err
247
+ return b . recordError ( bucket , v1 . EventTypeWarning , events . FailedDeleteBucket , err )
242
248
}
243
249
244
250
controllerutil .RemoveFinalizer (bucket , consts .BucketFinalizer )
@@ -263,8 +269,8 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
263
269
// Delete function is called when the bucket was not able to add finalizers while creation.
264
270
// Hence we will take care of removing the BucketClaim finalizer before deleting the Bucket object.
265
271
// Return values
266
- // nil - Bucket successfully deleted
267
- // non-nil err - Internal error [requeue'd with exponential backoff]
272
+ // - nil - Bucket successfully deleted
273
+ // - non-nil err - Internal error [requeue'd with exponential backoff]
268
274
func (b * BucketListener ) Delete (ctx context.Context , inputBucket * v1alpha1.Bucket ) error {
269
275
klog .V (3 ).InfoS ("Delete Bucket" ,
270
276
"name" , inputBucket .ObjectMeta .Name ,
@@ -294,26 +300,23 @@ func (b *BucketListener) Delete(ctx context.Context, inputBucket *v1alpha1.Bucke
294
300
}
295
301
296
302
return nil
297
-
298
303
}
299
304
300
305
// InitializeKubeClient initializes the kubernetes client
301
306
func (b * BucketListener ) InitializeKubeClient (k kube.Interface ) {
302
307
b .kubeClient = k
303
-
304
- serverVersion , err := k .Discovery ().ServerVersion ()
305
- if err != nil {
306
- klog .V (3 ).ErrorS (err , "Cannot determine server version" )
307
- } else {
308
- b .kubeVersion = utilversion .MustParseSemantic (serverVersion .GitVersion )
309
- }
310
308
}
311
309
312
310
// InitializeBucketClient initializes the object storage bucket client
313
311
func (b * BucketListener ) InitializeBucketClient (bc buckets.Interface ) {
314
312
b .bucketClient = bc
315
313
}
316
314
315
+ // InitializeEventRecorder initializes the event recorder
316
+ func (b * BucketListener ) InitializeEventRecorder (er record.EventRecorder ) {
317
+ b .eventRecorder = er
318
+ }
319
+
317
320
func (b * BucketListener ) deleteBucketOp (ctx context.Context , bucket * v1alpha1.Bucket ) error {
318
321
if ! strings .EqualFold (bucket .Spec .DriverName , b .driverName ) {
319
322
klog .V (5 ).InfoS ("Skipping bucket for provisioner" ,
@@ -333,10 +336,7 @@ func (b *BucketListener) deleteBucketOp(ctx context.Context, bucket *v1alpha1.Bu
333
336
334
337
if _ , err := b .provisionerClient .DriverDeleteBucket (ctx , req ); err != nil {
335
338
if status .Code (err ) != codes .NotFound {
336
- klog .V (3 ).ErrorS (err , "Failed to delete bucket" ,
337
- "bucket" , bucket .ObjectMeta .Name ,
338
- )
339
- return err
339
+ return fmt .Errorf ("failed to delete bucket: %w" , err )
340
340
}
341
341
}
342
342
@@ -396,3 +396,21 @@ func (b *BucketListener) bucketAccesses(namespace string) bucketapi.BucketAccess
396
396
}
397
397
panic ("uninitialized listener" )
398
398
}
399
+
400
+ // recordError during the processing of the objects
401
+ func (b * BucketListener ) recordError (subject runtime.Object , eventtype , reason string , err error ) error {
402
+ if b .eventRecorder == nil {
403
+ return err
404
+ }
405
+ b .eventRecorder .Event (subject , eventtype , reason , err .Error ())
406
+
407
+ return err
408
+ }
409
+
410
+ // recordEvent during the processing of the objects
411
+ func (b * BucketListener ) recordEvent (subject runtime.Object , eventtype , reason , message string , args ... any ) {
412
+ if b .eventRecorder == nil {
413
+ return
414
+ }
415
+ b .eventRecorder .Event (subject , eventtype , reason , fmt .Sprintf (message , args ... ))
416
+ }
0 commit comments