Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Commit 9770045

Browse files
authored
Merge pull request #67 from shanduur/feat-event-recorder
Added EventRecorder and EventBroadcaster
2 parents 2504944 + 45dcd3d commit 9770045

File tree

2 files changed

+40
-22
lines changed

2 files changed

+40
-22
lines changed

Diff for: controller/controller.go

+38-22
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package controller
33
import (
44
"context"
55
"fmt"
6-
"io/ioutil"
76
"os"
87
"reflect"
98
"regexp"
@@ -81,17 +80,20 @@ type ObjectStorageController struct {
8180
RenewDeadline time.Duration
8281
RetryPeriod time.Duration
8382

83+
eventBroadcaster record.EventBroadcaster
84+
eventRecorder record.EventRecorder
85+
8486
// Controller
8587
ResyncPeriod time.Duration
8688
queue workqueue.RateLimitingInterface
8789
threadiness int
8890

8991
// Listeners
90-
BucketListener BucketListener
91-
BucketClaimListener BucketClaimListener
92-
BucketAccessListener BucketAccessListener
93-
BucketClassListener BucketClassListener
94-
BucketAccessClassListener BucketAccessClassListener
92+
BucketListener BucketListener
93+
BucketClaimListener BucketClaimListener
94+
BucketAccessListener BucketAccessListener
95+
BucketClassListener BucketClassListener
96+
BucketAccessClassListener BucketAccessClassListener
9597

9698
// leader election
9799
leaderLock string
@@ -148,7 +150,17 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
148150
}
149151
}
150152

153+
rb := record.NewBroadcaster()
154+
155+
extendedScheme := scheme.Scheme
156+
if err := v1alpha1.AddToScheme(extendedScheme); err != nil {
157+
return nil, err
158+
}
159+
151160
return &ObjectStorageController{
161+
eventBroadcaster: rb,
162+
eventRecorder: rb.NewRecorder(extendedScheme, v1.EventSource{Component: id}),
163+
152164
identity: id,
153165
kubeClient: kubeClient,
154166
bucketClient: bucketClient,
@@ -178,39 +190,28 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
178190
return ns
179191
}
180192

181-
if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
193+
if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
182194
if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
183195
return ns
184196
}
185197
}
186198
return "default"
187199
}()
188200

189-
sanitize := func(n string) string {
190-
re := regexp.MustCompile("[^a-zA-Z0-9-]")
191-
name := strings.ToLower(re.ReplaceAllString(n, "-"))
192-
if name[len(name)-1] == '-' {
193-
// name must not end with '-'
194-
name = name + "X"
195-
}
196-
return name
197-
}
198-
199-
leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity))
200201
id, err := os.Hostname()
201202
if err != nil {
202203
return fmt.Errorf("error getting the default leader identity: %v", err)
203204
}
204205

205-
recorder := record.NewBroadcaster()
206-
recorder.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)})
207-
eRecorder := recorder.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader})
206+
c.eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)})
207+
defer c.eventBroadcaster.Shutdown()
208208

209209
rlConfig := resourcelock.ResourceLockConfig{
210210
Identity: sanitize(id),
211-
EventRecorder: eRecorder,
211+
EventRecorder: c.eventRecorder,
212212
}
213213

214+
leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity))
214215
l, err := resourcelock.New(resourcelock.LeasesResourceLock, ns, leader, c.kubeClient.CoreV1(), c.kubeClient.CoordinationV1(), rlConfig)
215216
if err != nil {
216217
return err
@@ -423,6 +424,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
423424
if c.BucketListener != nil {
424425
c.BucketListener.InitializeKubeClient(c.kubeClient)
425426
c.BucketListener.InitializeBucketClient(c.bucketClient)
427+
c.BucketListener.InitializeEventRecorder(c.eventRecorder)
426428
addFunc := func(ctx context.Context, obj interface{}) error {
427429
return c.BucketListener.Add(ctx, obj.(*v1alpha1.Bucket))
428430
}
@@ -437,6 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
437439
if c.BucketClaimListener != nil {
438440
c.BucketClaimListener.InitializeKubeClient(c.kubeClient)
439441
c.BucketClaimListener.InitializeBucketClient(c.bucketClient)
442+
c.BucketClaimListener.InitializeEventRecorder(c.eventRecorder)
440443
addFunc := func(ctx context.Context, obj interface{}) error {
441444
return c.BucketClaimListener.Add(ctx, obj.(*v1alpha1.BucketClaim))
442445
}
@@ -451,6 +454,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
451454
if c.BucketAccessListener != nil {
452455
c.BucketAccessListener.InitializeKubeClient(c.kubeClient)
453456
c.BucketAccessListener.InitializeBucketClient(c.bucketClient)
457+
c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder)
454458
addFunc := func(ctx context.Context, obj interface{}) error {
455459
return c.BucketAccessListener.Add(ctx, obj.(*v1alpha1.BucketAccess))
456460
}
@@ -465,6 +469,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
465469
if c.BucketClassListener != nil {
466470
c.BucketClassListener.InitializeKubeClient(c.kubeClient)
467471
c.BucketClassListener.InitializeBucketClient(c.bucketClient)
472+
c.BucketClassListener.InitializeEventRecorder(c.eventRecorder)
468473
addFunc := func(ctx context.Context, obj interface{}) error {
469474
return c.BucketClassListener.Add(ctx, obj.(*v1alpha1.BucketClass))
470475
}
@@ -479,6 +484,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
479484
if c.BucketAccessClassListener != nil {
480485
c.BucketAccessClassListener.InitializeKubeClient(c.kubeClient)
481486
c.BucketAccessClassListener.InitializeBucketClient(c.bucketClient)
487+
c.BucketAccessClassListener.InitializeEventRecorder(c.eventRecorder)
482488
addFunc := func(ctx context.Context, obj interface{}) error {
483489
return c.BucketAccessClassListener.Add(ctx, obj.(*v1alpha1.BucketAccessClass))
484490
}
@@ -493,3 +499,13 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
493499

494500
<-ctx.Done()
495501
}
502+
503+
func sanitize(n string) string {
504+
re := regexp.MustCompile("[^a-zA-Z0-9-]")
505+
name := strings.ToLower(re.ReplaceAllString(n, "-"))
506+
if name[len(name)-1] == '-' {
507+
// name must not end with '-'
508+
name = name + "X"
509+
}
510+
return name
511+
}

Diff for: controller/interfaces.go

+2
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import (
99

1010
// k8s client
1111
kubeclientset "k8s.io/client-go/kubernetes"
12+
"k8s.io/client-go/tools/record"
1213
)
1314

1415
// Set the clients for each of the listeners
1516
type GenericListener interface {
1617
InitializeKubeClient(kubeclientset.Interface)
1718
InitializeBucketClient(bucketclientset.Interface)
19+
InitializeEventRecorder(record.EventRecorder)
1820
}
1921

2022
type BucketListener interface {

0 commit comments

Comments
 (0)