From 05200f0ca393b33874ab166691c926a3d0d2a595 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Thu, 11 May 2023 15:04:36 +0200 Subject: [PATCH 1/4] feat(api): added EventRecorder n EventBroadcaster --- controller/controller.go | 53 +++++++++++++++++++++++++--------------- controller/interfaces.go | 2 ++ 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index c44b7875..0373b463 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -81,17 +81,20 @@ type ObjectStorageController struct { RenewDeadline time.Duration RetryPeriod time.Duration + eventBroadcaster record.EventBroadcaster + eventRecorder record.EventRecorder + // Controller ResyncPeriod time.Duration queue workqueue.RateLimitingInterface threadiness int // Listeners - BucketListener BucketListener - BucketClaimListener BucketClaimListener - BucketAccessListener BucketAccessListener - BucketClassListener BucketClassListener - BucketAccessClassListener BucketAccessClassListener + BucketListener BucketListener + BucketClaimListener BucketClaimListener + BucketAccessListener BucketAccessListener + BucketClassListener BucketClassListener + BucketAccessClassListener BucketAccessClassListener // leader election leaderLock string @@ -148,7 +151,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str } } + rb := record.NewBroadcaster() + leader := sanitize(fmt.Sprintf("%s/%s", leaderLockName, identity)) + return &ObjectStorageController{ + eventBroadcaster: rb, + eventRecorder: rb.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader}), + identity: id, kubeClient: kubeClient, bucketClient: bucketClient, @@ -186,31 +195,20 @@ func (c *ObjectStorageController) Run(ctx context.Context) error { return "default" }() - sanitize := func(n string) string { - re := regexp.MustCompile("[^a-zA-Z0-9-]") - name := strings.ToLower(re.ReplaceAllString(n, "-")) - if name[len(name)-1] == '-' { - // name must not end with '-' - name = name + "X" - } - return name - } - - leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity)) id, err := os.Hostname() if err != nil { return fmt.Errorf("error getting the default leader identity: %v", err) } - recorder := record.NewBroadcaster() - recorder.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)}) - eRecorder := recorder.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader}) + c.eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)}) + defer c.eventBroadcaster.Shutdown() rlConfig := resourcelock.ResourceLockConfig{ Identity: sanitize(id), - EventRecorder: eRecorder, + EventRecorder: c.eventRecorder, } + leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity)) l, err := resourcelock.New(resourcelock.LeasesResourceLock, ns, leader, c.kubeClient.CoreV1(), c.kubeClient.CoordinationV1(), rlConfig) if err != nil { return err @@ -423,6 +421,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketListener != nil { c.BucketListener.InitializeKubeClient(c.kubeClient) c.BucketListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketListener.Add(ctx, obj.(*v1alpha1.Bucket)) } @@ -437,6 +436,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketClaimListener != nil { c.BucketClaimListener.InitializeKubeClient(c.kubeClient) c.BucketClaimListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketClaimListener.Add(ctx, obj.(*v1alpha1.BucketClaim)) } @@ -451,6 +451,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketAccessListener != nil { c.BucketAccessListener.InitializeKubeClient(c.kubeClient) c.BucketAccessListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketAccessListener.Add(ctx, obj.(*v1alpha1.BucketAccess)) } @@ -465,6 +466,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketClassListener != nil { c.BucketClassListener.InitializeKubeClient(c.kubeClient) c.BucketClassListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketClassListener.Add(ctx, obj.(*v1alpha1.BucketClass)) } @@ -479,6 +481,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketAccessClassListener != nil { c.BucketAccessClassListener.InitializeKubeClient(c.kubeClient) c.BucketAccessClassListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketAccessClassListener.Add(ctx, obj.(*v1alpha1.BucketAccessClass)) } @@ -493,3 +496,13 @@ func (c *ObjectStorageController) runController(ctx context.Context) { <-ctx.Done() } + +func sanitize(n string) string { + re := regexp.MustCompile("[^a-zA-Z0-9-]") + name := strings.ToLower(re.ReplaceAllString(n, "-")) + if name[len(name)-1] == '-' { + // name must not end with '-' + name = name + "X" + } + return name +} diff --git a/controller/interfaces.go b/controller/interfaces.go index 366adb21..d3d34e88 100644 --- a/controller/interfaces.go +++ b/controller/interfaces.go @@ -9,12 +9,14 @@ import ( // k8s client kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" ) // Set the clients for each of the listeners type GenericListener interface { InitializeKubeClient(kubeclientset.Interface) InitializeBucketClient(bucketclientset.Interface) + InitializeEventRecorder(record.EventRecorder) } type BucketListener interface { From 45d4532cbbfcc4c8b9ce80a72c7b8a88b55fffc9 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Wed, 18 Oct 2023 14:45:08 +0200 Subject: [PATCH 2/4] feat: fix panic --- controller/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 0373b463..64860606 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -421,7 +421,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketListener != nil { c.BucketListener.InitializeKubeClient(c.kubeClient) c.BucketListener.InitializeBucketClient(c.bucketClient) - c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) + c.BucketListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketListener.Add(ctx, obj.(*v1alpha1.Bucket)) } @@ -436,7 +436,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketClaimListener != nil { c.BucketClaimListener.InitializeKubeClient(c.kubeClient) c.BucketClaimListener.InitializeBucketClient(c.bucketClient) - c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) + c.BucketClaimListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketClaimListener.Add(ctx, obj.(*v1alpha1.BucketClaim)) } @@ -466,7 +466,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketClassListener != nil { c.BucketClassListener.InitializeKubeClient(c.kubeClient) c.BucketClassListener.InitializeBucketClient(c.bucketClient) - c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) + c.BucketClassListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketClassListener.Add(ctx, obj.(*v1alpha1.BucketClass)) } @@ -481,7 +481,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketAccessClassListener != nil { c.BucketAccessClassListener.InitializeKubeClient(c.kubeClient) c.BucketAccessClassListener.InitializeBucketClient(c.bucketClient) - c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) + c.BucketAccessClassListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketAccessClassListener.Add(ctx, obj.(*v1alpha1.BucketAccessClass)) } From df1d34a6d9e94c13c85aa324978b1892f1afced9 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Wed, 18 Oct 2023 15:49:26 +0200 Subject: [PATCH 3/4] feat: register v1alpha1 in schema --- controller/controller.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/controller/controller.go b/controller/controller.go index 64860606..faefcef5 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -154,9 +154,14 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str rb := record.NewBroadcaster() leader := sanitize(fmt.Sprintf("%s/%s", leaderLockName, identity)) + extendedScheme := scheme.Scheme + if err := v1alpha1.AddToScheme(extendedScheme); err != nil { + return nil, err + } + return &ObjectStorageController{ eventBroadcaster: rb, - eventRecorder: rb.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader}), + eventRecorder: rb.NewRecorder(extendedScheme, v1.EventSource{Component: leader}), identity: id, kubeClient: kubeClient, From 45dcd3d6b22e75594b34a078315da3739e740944 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Wed, 18 Oct 2023 16:08:17 +0200 Subject: [PATCH 4/4] fix: id instead of leader --- controller/controller.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index faefcef5..5c698d4e 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -3,7 +3,6 @@ package controller import ( "context" "fmt" - "io/ioutil" "os" "reflect" "regexp" @@ -152,7 +151,6 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str } rb := record.NewBroadcaster() - leader := sanitize(fmt.Sprintf("%s/%s", leaderLockName, identity)) extendedScheme := scheme.Scheme if err := v1alpha1.AddToScheme(extendedScheme); err != nil { @@ -161,7 +159,7 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str return &ObjectStorageController{ eventBroadcaster: rb, - eventRecorder: rb.NewRecorder(extendedScheme, v1.EventSource{Component: leader}), + eventRecorder: rb.NewRecorder(extendedScheme, v1.EventSource{Component: id}), identity: id, kubeClient: kubeClient, @@ -192,7 +190,7 @@ func (c *ObjectStorageController) Run(ctx context.Context) error { return ns } - if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { if ns := strings.TrimSpace(string(data)); len(ns) > 0 { return ns }