diff --git a/controller/controller.go b/controller/controller.go index c44b7875..0373b463 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -81,17 +81,20 @@ type ObjectStorageController struct { RenewDeadline time.Duration RetryPeriod time.Duration + eventBroadcaster record.EventBroadcaster + eventRecorder record.EventRecorder + // Controller ResyncPeriod time.Duration queue workqueue.RateLimitingInterface threadiness int // Listeners - BucketListener BucketListener - BucketClaimListener BucketClaimListener - BucketAccessListener BucketAccessListener - BucketClassListener BucketClassListener - BucketAccessClassListener BucketAccessClassListener + BucketListener BucketListener + BucketClaimListener BucketClaimListener + BucketAccessListener BucketAccessListener + BucketClassListener BucketClassListener + BucketAccessClassListener BucketAccessClassListener // leader election leaderLock string @@ -148,7 +151,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str } } + rb := record.NewBroadcaster() + leader := sanitize(fmt.Sprintf("%s/%s", leaderLockName, identity)) + return &ObjectStorageController{ + eventBroadcaster: rb, + eventRecorder: rb.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader}), + identity: id, kubeClient: kubeClient, bucketClient: bucketClient, @@ -186,31 +195,20 @@ func (c *ObjectStorageController) Run(ctx context.Context) error { return "default" }() - sanitize := func(n string) string { - re := regexp.MustCompile("[^a-zA-Z0-9-]") - name := strings.ToLower(re.ReplaceAllString(n, "-")) - if name[len(name)-1] == '-' { - // name must not end with '-' - name = name + "X" - } - return name - } - - leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity)) id, err := os.Hostname() if err != nil { return fmt.Errorf("error getting the default leader identity: %v", err) } - recorder := record.NewBroadcaster() - recorder.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)}) - eRecorder := recorder.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader}) + c.eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)}) + defer c.eventBroadcaster.Shutdown() rlConfig := resourcelock.ResourceLockConfig{ Identity: sanitize(id), - EventRecorder: eRecorder, + EventRecorder: c.eventRecorder, } + leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity)) l, err := resourcelock.New(resourcelock.LeasesResourceLock, ns, leader, c.kubeClient.CoreV1(), c.kubeClient.CoordinationV1(), rlConfig) if err != nil { return err @@ -423,6 +421,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketListener != nil { c.BucketListener.InitializeKubeClient(c.kubeClient) c.BucketListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketListener.Add(ctx, obj.(*v1alpha1.Bucket)) } @@ -437,6 +436,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketClaimListener != nil { c.BucketClaimListener.InitializeKubeClient(c.kubeClient) c.BucketClaimListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketClaimListener.Add(ctx, obj.(*v1alpha1.BucketClaim)) } @@ -451,6 +451,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketAccessListener != nil { c.BucketAccessListener.InitializeKubeClient(c.kubeClient) c.BucketAccessListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketAccessListener.Add(ctx, obj.(*v1alpha1.BucketAccess)) } @@ -465,6 +466,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketClassListener != nil { c.BucketClassListener.InitializeKubeClient(c.kubeClient) c.BucketClassListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketClassListener.Add(ctx, obj.(*v1alpha1.BucketClass)) } @@ -479,6 +481,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { if c.BucketAccessClassListener != nil { c.BucketAccessClassListener.InitializeKubeClient(c.kubeClient) c.BucketAccessClassListener.InitializeBucketClient(c.bucketClient) + c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder) addFunc := func(ctx context.Context, obj interface{}) error { return c.BucketAccessClassListener.Add(ctx, obj.(*v1alpha1.BucketAccessClass)) } @@ -493,3 +496,13 @@ func (c *ObjectStorageController) runController(ctx context.Context) { <-ctx.Done() } + +func sanitize(n string) string { + re := regexp.MustCompile("[^a-zA-Z0-9-]") + name := strings.ToLower(re.ReplaceAllString(n, "-")) + if name[len(name)-1] == '-' { + // name must not end with '-' + name = name + "X" + } + return name +} diff --git a/controller/interfaces.go b/controller/interfaces.go index 366adb21..d3d34e88 100644 --- a/controller/interfaces.go +++ b/controller/interfaces.go @@ -9,12 +9,14 @@ import ( // k8s client kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" ) // Set the clients for each of the listeners type GenericListener interface { InitializeKubeClient(kubeclientset.Interface) InitializeBucketClient(bucketclientset.Interface) + InitializeEventRecorder(record.EventRecorder) } type BucketListener interface {