@@ -4,9 +4,11 @@ import (
4
4
"context"
5
5
6
6
v1 "k8s.io/api/core/v1"
7
- "k8s.io/apimachinery/pkg/api/errors"
7
+ kubeerrors "k8s.io/apimachinery/pkg/api/errors"
8
8
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9
+ "k8s.io/apimachinery/pkg/runtime"
9
10
kubeclientset "k8s.io/client-go/kubernetes"
11
+ "k8s.io/client-go/tools/record"
10
12
"k8s.io/klog/v2"
11
13
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
12
14
@@ -17,18 +19,20 @@ import (
17
19
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
18
20
)
19
21
20
- // bucketClaimListener is a resource handler for bucket requests objects
21
- type bucketClaimListener struct {
22
+ // BucketClaimListener is a resource handler for bucket requests objects
23
+ type BucketClaimListener struct {
24
+ eventRecorder record.EventRecorder
25
+
22
26
kubeClient kubeclientset.Interface
23
27
bucketClient bucketclientset.Interface
24
28
}
25
29
26
- func NewBucketClaimListener () * bucketClaimListener {
27
- return & bucketClaimListener {}
30
+ func NewBucketClaimListener () * BucketClaimListener {
31
+ return & BucketClaimListener {}
28
32
}
29
33
30
34
// Add creates a bucket in response to a bucketClaim
31
- func (b * bucketClaimListener ) Add (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
35
+ func (b * BucketClaimListener ) Add (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
32
36
klog .V (3 ).InfoS ("Add BucketClaim" ,
33
37
"name" , bucketClaim .ObjectMeta .Name ,
34
38
"ns" , bucketClaim .ObjectMeta .Namespace ,
@@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
65
69
}
66
70
67
71
// update processes any updates made to the bucket request
68
- func (b * bucketClaimListener ) Update (ctx context.Context , old , new * v1alpha1.BucketClaim ) error {
72
+ func (b * BucketClaimListener ) Update (ctx context.Context , old , new * v1alpha1.BucketClaim ) error {
69
73
klog .V (3 ).InfoS ("Update BucketClaim" ,
70
74
"name" , old .Name ,
71
75
"ns" , old .Namespace )
@@ -94,22 +98,28 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
94
98
}
95
99
96
100
// Delete processes a bucket for which bucket request is deleted
97
- func (b * bucketClaimListener ) Delete (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
98
- klog .V (3 ).Info ("Delete BucketClaim" ,
101
+ func (b * BucketClaimListener ) Delete (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
102
+ klog .V (3 ).InfoS ("Delete BucketClaim" ,
99
103
"name" , bucketClaim .ObjectMeta .Name ,
100
104
"ns" , bucketClaim .ObjectMeta .Namespace )
101
105
102
106
return nil
103
107
}
104
108
105
109
// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
110
+ //
111
+ // Recorded events
112
+ //
113
+ // InvalidBucket - Bucket provided in the BucketClaim does not exist
114
+ // InvalidBucketClass - BucketClass provided in the BucketClaim does not exist
115
+ //
106
116
// Return values
107
117
//
108
118
// nil - BucketClaim successfully processed
109
119
// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
110
120
// ErrBucketAlreadyExists - BucketClaim already processed
111
121
// non-nil err - Internal error [requeue'd with exponential backoff]
112
- func (b * bucketClaimListener ) provisionBucketClaimOperation (ctx context.Context , inputBucketClaim * v1alpha1.BucketClaim ) error {
122
+ func (b * BucketClaimListener ) provisionBucketClaimOperation (ctx context.Context , inputBucketClaim * v1alpha1.BucketClaim ) error {
113
123
bucketClaim := inputBucketClaim .DeepCopy ()
114
124
if bucketClaim .Status .BucketReady {
115
125
return util .ErrBucketAlreadyExists
@@ -121,7 +131,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
121
131
if bucketClaim .Spec .ExistingBucketName != "" {
122
132
bucketName = bucketClaim .Spec .ExistingBucketName
123
133
bucket , err := b .buckets ().Get (ctx , bucketName , metav1.GetOptions {})
124
- if err != nil {
134
+ if kubeerrors .IsNotFound (err ) {
135
+ b .recordEvent (inputBucketClaim , v1 .EventTypeWarning , "InvalidBucket" , "Bucket provided in the BucketClaim does not exist" )
136
+ return err
137
+ } else if err != nil {
125
138
klog .V (3 ).ErrorS (err , "Get Bucket with ExistingBucketName error" , "name" , bucketClaim .Spec .ExistingBucketName )
126
139
return err
127
140
}
@@ -153,7 +166,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
153
166
}
154
167
155
168
bucketClass , err := b .bucketClasses ().Get (ctx , bucketClassName , metav1.GetOptions {})
156
- if err != nil {
169
+ if kubeerrors .IsNotFound (err ) {
170
+ b .recordEvent (inputBucketClaim , v1 .EventTypeWarning , "InvalidBucketClass" , "BucketClass provided in the BucketClaim does not exist" )
171
+ return util .ErrInvalidBucketClass
172
+ } else if err != nil {
157
173
klog .V (3 ).ErrorS (err , "Get Bucketclass Error" , "name" , bucketClassName )
158
174
return util .ErrInvalidBucketClass
159
175
}
@@ -180,7 +196,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
180
196
181
197
bucket .Spec .Protocols = protocolCopy
182
198
bucket , err = b .buckets ().Create (ctx , bucket , metav1.CreateOptions {})
183
- if err != nil && ! errors .IsAlreadyExists (err ) {
199
+ if err != nil && ! kubeerrors .IsAlreadyExists (err ) {
184
200
klog .V (3 ).ErrorS (err , "Error creationg bucket" ,
185
201
"bucket" , bucketName ,
186
202
"bucketClaim" , bucketClaim .ObjectMeta .Name )
@@ -212,31 +228,46 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
212
228
return nil
213
229
}
214
230
215
- func (b * bucketClaimListener ) InitializeKubeClient (k kubeclientset.Interface ) {
231
+ // InitializeKubeClient initializes the kubernetes client
232
+ func (b * BucketClaimListener ) InitializeKubeClient (k kubeclientset.Interface ) {
216
233
b .kubeClient = k
217
234
}
218
235
219
- func (b * bucketClaimListener ) InitializeBucketClient (bc bucketclientset.Interface ) {
236
+ // InitializeBucketClient initializes the object storage bucket client
237
+ func (b * BucketClaimListener ) InitializeBucketClient (bc bucketclientset.Interface ) {
220
238
b .bucketClient = bc
221
239
}
222
240
223
- func (b * bucketClaimListener ) buckets () objectstoragev1alpha1.BucketInterface {
241
+ // InitializeEventRecorder initializes the event recorder
242
+ func (b * BucketClaimListener ) InitializeEventRecorder (er record.EventRecorder ) {
243
+ b .eventRecorder = er
244
+ }
245
+
246
+ func (b * BucketClaimListener ) buckets () objectstoragev1alpha1.BucketInterface {
224
247
if b .bucketClient != nil {
225
248
return b .bucketClient .ObjectstorageV1alpha1 ().Buckets ()
226
249
}
227
250
panic ("uninitialized listener" )
228
251
}
229
252
230
- func (b * bucketClaimListener ) bucketClasses () objectstoragev1alpha1.BucketClassInterface {
253
+ func (b * BucketClaimListener ) bucketClasses () objectstoragev1alpha1.BucketClassInterface {
231
254
if b .bucketClient != nil {
232
255
return b .bucketClient .ObjectstorageV1alpha1 ().BucketClasses ()
233
256
}
234
257
panic ("uninitialized listener" )
235
258
}
236
259
237
- func (b * bucketClaimListener ) bucketClaims (namespace string ) objectstoragev1alpha1.BucketClaimInterface {
260
+ func (b * BucketClaimListener ) bucketClaims (namespace string ) objectstoragev1alpha1.BucketClaimInterface {
238
261
if b .bucketClient != nil {
239
262
return b .bucketClient .ObjectstorageV1alpha1 ().BucketClaims (namespace )
240
263
}
241
264
panic ("uninitialized listener" )
242
265
}
266
+
267
+ // recordEvent during the processing of the objects
268
+ func (b * BucketClaimListener ) recordEvent (subject runtime.Object , eventtype , reason , message string ) {
269
+ if b .eventRecorder == nil {
270
+ return
271
+ }
272
+ b .eventRecorder .Event (subject , eventtype , reason , message )
273
+ }
0 commit comments