Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Added EventRecorder and EventBroadcaster #64

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,20 @@ type ObjectStorageController struct {
RenewDeadline time.Duration
RetryPeriod time.Duration

eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder

// Controller
ResyncPeriod time.Duration
queue workqueue.RateLimitingInterface
threadiness int

// Listeners
BucketListener BucketListener
BucketClaimListener BucketClaimListener
BucketAccessListener BucketAccessListener
BucketClassListener BucketClassListener
BucketAccessClassListener BucketAccessClassListener
BucketListener BucketListener
BucketClaimListener BucketClaimListener
BucketAccessListener BucketAccessListener
BucketClassListener BucketClassListener
BucketAccessClassListener BucketAccessClassListener

// leader election
leaderLock string
Expand Down Expand Up @@ -148,7 +151,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
}
}

rb := record.NewBroadcaster()
leader := sanitize(fmt.Sprintf("%s/%s", leaderLockName, identity))

return &ObjectStorageController{
eventBroadcaster: rb,
eventRecorder: rb.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader}),

identity: id,
kubeClient: kubeClient,
bucketClient: bucketClient,
Expand Down Expand Up @@ -186,31 +195,20 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
return "default"
}()

sanitize := func(n string) string {
re := regexp.MustCompile("[^a-zA-Z0-9-]")
name := strings.ToLower(re.ReplaceAllString(n, "-"))
if name[len(name)-1] == '-' {
// name must not end with '-'
name = name + "X"
}
return name
}

leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity))
id, err := os.Hostname()
if err != nil {
return fmt.Errorf("error getting the default leader identity: %v", err)
}

recorder := record.NewBroadcaster()
recorder.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)})
eRecorder := recorder.NewRecorder(scheme.Scheme, v1.EventSource{Component: leader})
c.eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events(ns)})
defer c.eventBroadcaster.Shutdown()

rlConfig := resourcelock.ResourceLockConfig{
Identity: sanitize(id),
EventRecorder: eRecorder,
EventRecorder: c.eventRecorder,
}

leader := sanitize(fmt.Sprintf("%s/%s", c.leaderLock, c.identity))
l, err := resourcelock.New(resourcelock.LeasesResourceLock, ns, leader, c.kubeClient.CoreV1(), c.kubeClient.CoordinationV1(), rlConfig)
if err != nil {
return err
Expand Down Expand Up @@ -423,6 +421,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
if c.BucketListener != nil {
c.BucketListener.InitializeKubeClient(c.kubeClient)
c.BucketListener.InitializeBucketClient(c.bucketClient)
c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder)
addFunc := func(ctx context.Context, obj interface{}) error {
return c.BucketListener.Add(ctx, obj.(*v1alpha1.Bucket))
}
Expand All @@ -437,6 +436,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
if c.BucketClaimListener != nil {
c.BucketClaimListener.InitializeKubeClient(c.kubeClient)
c.BucketClaimListener.InitializeBucketClient(c.bucketClient)
c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder)
addFunc := func(ctx context.Context, obj interface{}) error {
return c.BucketClaimListener.Add(ctx, obj.(*v1alpha1.BucketClaim))
}
Expand All @@ -451,6 +451,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
if c.BucketAccessListener != nil {
c.BucketAccessListener.InitializeKubeClient(c.kubeClient)
c.BucketAccessListener.InitializeBucketClient(c.bucketClient)
c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder)
addFunc := func(ctx context.Context, obj interface{}) error {
return c.BucketAccessListener.Add(ctx, obj.(*v1alpha1.BucketAccess))
}
Expand All @@ -465,6 +466,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
if c.BucketClassListener != nil {
c.BucketClassListener.InitializeKubeClient(c.kubeClient)
c.BucketClassListener.InitializeBucketClient(c.bucketClient)
c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder)
addFunc := func(ctx context.Context, obj interface{}) error {
return c.BucketClassListener.Add(ctx, obj.(*v1alpha1.BucketClass))
}
Expand All @@ -479,6 +481,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
if c.BucketAccessClassListener != nil {
c.BucketAccessClassListener.InitializeKubeClient(c.kubeClient)
c.BucketAccessClassListener.InitializeBucketClient(c.bucketClient)
c.BucketAccessListener.InitializeEventRecorder(c.eventRecorder)
addFunc := func(ctx context.Context, obj interface{}) error {
return c.BucketAccessClassListener.Add(ctx, obj.(*v1alpha1.BucketAccessClass))
}
Expand All @@ -493,3 +496,13 @@ func (c *ObjectStorageController) runController(ctx context.Context) {

<-ctx.Done()
}

func sanitize(n string) string {
re := regexp.MustCompile("[^a-zA-Z0-9-]")
name := strings.ToLower(re.ReplaceAllString(n, "-"))
if name[len(name)-1] == '-' {
// name must not end with '-'
name = name + "X"
}
return name
}
2 changes: 2 additions & 0 deletions controller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (

// k8s client
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)

// Set the clients for each of the listeners
type GenericListener interface {
InitializeKubeClient(kubeclientset.Interface)
InitializeBucketClient(bucketclientset.Interface)
InitializeEventRecorder(record.EventRecorder)
}

type BucketListener interface {
Expand Down