@@ -2,33 +2,37 @@ package bucketclaim
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
6
7
v1 "k8s.io/api/core/v1"
7
- "k8s.io/apimachinery/pkg/api/errors"
8
+ kubeerrors "k8s.io/apimachinery/pkg/api/errors"
8
9
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10
+ "k8s.io/apimachinery/pkg/runtime"
9
11
kubeclientset "k8s.io/client-go/kubernetes"
12
+ "k8s.io/client-go/tools/record"
10
13
"k8s.io/klog/v2"
11
- "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
12
-
13
14
"sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
14
15
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
15
16
objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"
16
-
17
+ "sigs.k8s.io/container-object-storage-interface-api/controller/events"
17
18
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
19
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
18
20
)
19
21
20
- // bucketClaimListener is a resource handler for bucket requests objects
21
- type bucketClaimListener struct {
22
+ // BucketClaimListener is a resource handler for bucket requests objects
23
+ type BucketClaimListener struct {
24
+ eventRecorder record.EventRecorder
25
+
22
26
kubeClient kubeclientset.Interface
23
27
bucketClient bucketclientset.Interface
24
28
}
25
29
26
- func NewBucketClaimListener () * bucketClaimListener {
27
- return & bucketClaimListener {}
30
+ func NewBucketClaimListener () * BucketClaimListener {
31
+ return & BucketClaimListener {}
28
32
}
29
33
30
34
// Add creates a bucket in response to a bucketClaim
31
- func (b * bucketClaimListener ) Add (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
35
+ func (b * BucketClaimListener ) Add (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
32
36
klog .V (3 ).InfoS ("Add BucketClaim" ,
33
37
"name" , bucketClaim .ObjectMeta .Name ,
34
38
"ns" , bucketClaim .ObjectMeta .Namespace ,
@@ -39,7 +43,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
39
43
if err != nil {
40
44
switch err {
41
45
case util .ErrInvalidBucketClass :
42
- klog .V (3 ).ErrorS (util . ErrInvalidBucketClass ,
46
+ klog .V (3 ).ErrorS (err ,
43
47
"bucketClaim" , bucketClaim .ObjectMeta .Name ,
44
48
"ns" , bucketClaim .ObjectMeta .Namespace ,
45
49
"bucketClassName" , bucketClaim .Spec .BucketClassName )
@@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
65
69
}
66
70
67
71
// update processes any updates made to the bucket request
68
- func (b * bucketClaimListener ) Update (ctx context.Context , old , new * v1alpha1.BucketClaim ) error {
72
+ func (b * BucketClaimListener ) Update (ctx context.Context , old , new * v1alpha1.BucketClaim ) error {
69
73
klog .V (3 ).InfoS ("Update BucketClaim" ,
70
74
"name" , old .Name ,
71
75
"ns" , old .Namespace )
@@ -80,7 +84,7 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
80
84
klog .V (3 ).ErrorS (err , "Error deleting bucket" ,
81
85
"bucket" , bucketName ,
82
86
"bucketClaim" , bucketClaim .ObjectMeta .Name )
83
- return err
87
+ return b . recordError ( bucketClaim , v1 . EventTypeWarning , events . FailedDeleteBucket , err )
84
88
}
85
89
86
90
klog .V (5 ).Infof ("Successfully deleted bucket: %s from bucketClaim: %s" , bucketName , bucketClaim .ObjectMeta .Name )
@@ -94,22 +98,22 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
94
98
}
95
99
96
100
// Delete processes a bucket for which bucket request is deleted
97
- func (b * bucketClaimListener ) Delete (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
98
- klog .V (3 ).Info ("Delete BucketClaim" ,
101
+ func (b * BucketClaimListener ) Delete (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
102
+ klog .V (3 ).InfoS ("Delete BucketClaim" ,
99
103
"name" , bucketClaim .ObjectMeta .Name ,
100
104
"ns" , bucketClaim .ObjectMeta .Namespace )
101
105
102
106
return nil
103
107
}
104
108
105
109
// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
106
- // Return values
107
110
//
108
- // nil - BucketClaim successfully processed
109
- // ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
110
- // ErrBucketAlreadyExists - BucketClaim already processed
111
- // non-nil err - Internal error [requeue'd with exponential backoff]
112
- func (b * bucketClaimListener ) provisionBucketClaimOperation (ctx context.Context , inputBucketClaim * v1alpha1.BucketClaim ) error {
111
+ // Return values
112
+ // - nil - BucketClaim successfully processed
113
+ // - ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
114
+ // - ErrBucketAlreadyExists - BucketClaim already processed
115
+ // - non-nil err - Internal error [requeue'd with exponential backoff]
116
+ func (b * BucketClaimListener ) provisionBucketClaimOperation (ctx context.Context , inputBucketClaim * v1alpha1.BucketClaim ) error {
113
117
bucketClaim := inputBucketClaim .DeepCopy ()
114
118
if bucketClaim .Status .BucketReady {
115
119
return util .ErrBucketAlreadyExists
@@ -121,9 +125,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
121
125
if bucketClaim .Spec .ExistingBucketName != "" {
122
126
bucketName = bucketClaim .Spec .ExistingBucketName
123
127
bucket , err := b .buckets ().Get (ctx , bucketName , metav1.GetOptions {})
124
- if err != nil {
128
+ if kubeerrors .IsNotFound (err ) {
129
+ return b .recordError (inputBucketClaim , v1 .EventTypeWarning , events .FailedCreateBucket , err )
130
+ } else if err != nil {
125
131
klog .V (3 ).ErrorS (err , "Get Bucket with ExistingBucketName error" , "name" , bucketClaim .Spec .ExistingBucketName )
126
- return err
132
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
127
133
}
128
134
129
135
bucket .Spec .BucketClaim = & v1.ObjectReference {
@@ -141,21 +147,23 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
141
147
klog .V (3 ).ErrorS (err , "Error updating existing bucket" ,
142
148
"bucket" , bucket .ObjectMeta .Name ,
143
149
"bucketClaim" , bucketClaim .ObjectMeta .Name )
144
- return err
150
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
145
151
}
146
152
147
153
bucketClaim .Status .BucketName = bucketName
148
154
bucketClaim .Status .BucketReady = true
149
155
} else {
150
156
bucketClassName := bucketClaim .Spec .BucketClassName
151
157
if bucketClassName == "" {
152
- return util .ErrInvalidBucketClass
158
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , util .ErrInvalidBucketClass )
153
159
}
154
160
155
161
bucketClass , err := b .bucketClasses ().Get (ctx , bucketClassName , metav1.GetOptions {})
156
- if err != nil {
162
+ if kubeerrors .IsNotFound (err ) {
163
+ return b .recordError (inputBucketClaim , v1 .EventTypeWarning , events .FailedCreateBucket , err )
164
+ } else if err != nil {
157
165
klog .V (3 ).ErrorS (err , "Get Bucketclass Error" , "name" , bucketClassName )
158
- return util . ErrInvalidBucketClass
166
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
159
167
}
160
168
161
169
bucketName = bucketClassName + string (bucketClaim .ObjectMeta .UID )
@@ -180,11 +188,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
180
188
181
189
bucket .Spec .Protocols = protocolCopy
182
190
bucket , err = b .buckets ().Create (ctx , bucket , metav1.CreateOptions {})
183
- if err != nil && ! errors .IsAlreadyExists (err ) {
191
+ if err != nil && ! kubeerrors .IsAlreadyExists (err ) {
184
192
klog .V (3 ).ErrorS (err , "Error creationg bucket" ,
185
193
"bucket" , bucketName ,
186
194
"bucketClaim" , bucketClaim .ObjectMeta .Name )
187
- return err
195
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
188
196
}
189
197
190
198
bucketClaim .Status .BucketName = bucketName
@@ -196,7 +204,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
196
204
bucketClaim , err = b .bucketClaims (bucketClaim .ObjectMeta .Namespace ).UpdateStatus (ctx , bucketClaim , metav1.UpdateOptions {})
197
205
if err != nil {
198
206
klog .V (3 ).ErrorS (err , "Failed to update status of BucketClaim" , "name" , bucketClaim .ObjectMeta .Name )
199
- return err
207
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
200
208
}
201
209
202
210
// Add the finalizers so that bucketClaim is deleted
@@ -205,38 +213,63 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
205
213
_ , err = b .bucketClaims (bucketClaim .ObjectMeta .Namespace ).Update (ctx , bucketClaim , metav1.UpdateOptions {})
206
214
if err != nil {
207
215
klog .V (3 ).ErrorS (err , "Failed to add finalizer BucketClaim" , "name" , bucketClaim .ObjectMeta .Name )
208
- return err
216
+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
209
217
}
210
218
211
219
klog .V (3 ).Infof ("Finished creating Bucket %v" , bucketName )
212
220
return nil
213
221
}
214
222
215
- func (b * bucketClaimListener ) InitializeKubeClient (k kubeclientset.Interface ) {
223
+ // InitializeKubeClient initializes the kubernetes client
224
+ func (b * BucketClaimListener ) InitializeKubeClient (k kubeclientset.Interface ) {
216
225
b .kubeClient = k
217
226
}
218
227
219
- func (b * bucketClaimListener ) InitializeBucketClient (bc bucketclientset.Interface ) {
228
+ // InitializeBucketClient initializes the object storage bucket client
229
+ func (b * BucketClaimListener ) InitializeBucketClient (bc bucketclientset.Interface ) {
220
230
b .bucketClient = bc
221
231
}
222
232
223
- func (b * bucketClaimListener ) buckets () objectstoragev1alpha1.BucketInterface {
233
+ // InitializeEventRecorder initializes the event recorder
234
+ func (b * BucketClaimListener ) InitializeEventRecorder (er record.EventRecorder ) {
235
+ b .eventRecorder = er
236
+ }
237
+
238
+ func (b * BucketClaimListener ) buckets () objectstoragev1alpha1.BucketInterface {
224
239
if b .bucketClient != nil {
225
240
return b .bucketClient .ObjectstorageV1alpha1 ().Buckets ()
226
241
}
227
242
panic ("uninitialized listener" )
228
243
}
229
244
230
- func (b * bucketClaimListener ) bucketClasses () objectstoragev1alpha1.BucketClassInterface {
245
+ func (b * BucketClaimListener ) bucketClasses () objectstoragev1alpha1.BucketClassInterface {
231
246
if b .bucketClient != nil {
232
247
return b .bucketClient .ObjectstorageV1alpha1 ().BucketClasses ()
233
248
}
234
249
panic ("uninitialized listener" )
235
250
}
236
251
237
- func (b * bucketClaimListener ) bucketClaims (namespace string ) objectstoragev1alpha1.BucketClaimInterface {
252
+ func (b * BucketClaimListener ) bucketClaims (namespace string ) objectstoragev1alpha1.BucketClaimInterface {
238
253
if b .bucketClient != nil {
239
254
return b .bucketClient .ObjectstorageV1alpha1 ().BucketClaims (namespace )
240
255
}
241
256
panic ("uninitialized listener" )
242
257
}
258
+
259
+ // recordError during the processing of the objects
260
+ func (b * BucketClaimListener ) recordError (subject runtime.Object , eventtype , reason string , err error ) error {
261
+ if b .eventRecorder == nil {
262
+ return err
263
+ }
264
+ b .eventRecorder .Event (subject , eventtype , reason , err .Error ())
265
+
266
+ return err
267
+ }
268
+
269
+ // recordEvent during the processing of the objects
270
+ func (b * BucketClaimListener ) recordEvent (subject runtime.Object , eventtype , reason , message string , args ... any ) {
271
+ if b .eventRecorder == nil {
272
+ return
273
+ }
274
+ b .eventRecorder .Event (subject , eventtype , reason , fmt .Sprintf (message , args ... ))
275
+ }
0 commit comments