Skip to content
This repository was archived by the owner on Apr 17, 2025. It is now read-only.

Decouple reconcilers from each other #180

Merged
merged 1 commit into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func startControllers(mgr ctrl.Manager, certsReady chan struct{}) {
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
<-certsReady
setupLog.Info("Certs ready")

if testLog {
stats.StartLoggingActivity()
Expand All @@ -298,18 +299,12 @@ func startControllers(mgr ctrl.Manager, certsReady chan struct{}) {
// other components.
f := forest.NewForest()

// Create all validating and mutating admission controllers.
if !noWebhooks {
setupLog.Info("Registering validating webhook (won't work when running locally; use --no-webhooks)")
setup.CreateWebhooks(mgr, f)
}

// Create all reconciling controllers
setupLog.Info("Creating controllers", "maxReconciles", maxReconciles)
if err := setup.CreateReconcilers(mgr, f, maxReconciles, webhooksOnly, false); err != nil {
setupLog.Error(err, "cannot create controllers")
os.Exit(1)
opts := setup.Options{
NoWebhooks: noWebhooks,
MaxReconciles: maxReconciles,
ReadOnly: webhooksOnly,
}
setup.Create(setupLog, mgr, f, opts)

setupLog.Info("All controllers started; setup complete")
}
Expand Down
24 changes: 16 additions & 8 deletions internal/anchor/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type Reconciler struct {

Forest *forest.Forest

// Affected is a channel of event.GenericEvent (see "Watching Channels" in
// affected is a channel of event.GenericEvent (see "Watching Channels" in
// https://book-v1.book.kubebuilder.io/beyond_basics/controller_watches.html) that is used to
// enqueue additional objects that need updating.
Affected chan event.GenericEvent
affected chan event.GenericEvent

// ReadOnly disables writebacks
ReadOnly bool
Expand Down Expand Up @@ -319,16 +319,22 @@ func (r *Reconciler) updateState(log logr.Logger, inst *api.SubnamespaceAnchor,
}
}

// Enqueue enqueues a subnamespace anchor for later reconciliation. This occurs in a goroutine
// so the caller doesn't block; since the reconciler is never garbage-collected, this is safe.
func (r *Reconciler) Enqueue(log logr.Logger, nm, pnm, reason string) {
// OnChangeNamespace enqueues a subnamespace anchor for later reconciliation. This occurs in a
// goroutine so the caller doesn't block; since the reconciler is never garbage-collected, this is
// safe.
func (r *Reconciler) OnChangeNamespace(log logr.Logger, ns *forest.Namespace) {
if ns == nil || !ns.IsSub {
return
}
nm := ns.Name()
pnm := ns.Parent().Name()
go func() {
// The watch handler doesn't care about anything except the metadata.
inst := &api.SubnamespaceAnchor{}
inst.ObjectMeta.Name = nm
inst.ObjectMeta.Namespace = pnm
log.V(1).Info("Enqueuing for reconciliation", "affected", pnm+"/"+nm, "reason", reason)
r.Affected <- event.GenericEvent{Object: inst}
log.V(1).Info("Enqueuing for reconciliation", "affected", pnm+"/"+nm)
r.affected <- event.GenericEvent{Object: inst}
}()
}

Expand Down Expand Up @@ -423,6 +429,8 @@ func (r *Reconciler) deleteNamespace(ctx context.Context, log logr.Logger, inst
}

func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
r.affected = make(chan event.GenericEvent)

// Maps an subnamespace to its anchor in the parent namespace.
nsMapFn := func(obj client.Object) []reconcile.Request {
if obj.GetAnnotations()[api.SubnamespaceOf] == "" {
Expand All @@ -437,7 +445,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
}
return ctrl.NewControllerManagedBy(mgr).
For(&api.SubnamespaceAnchor{}).
Watches(&source.Channel{Source: r.Affected}, &handler.EnqueueRequestForObject{}).
Watches(&source.Channel{Source: r.affected}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(nsMapFn)).
Complete(r)
}
22 changes: 19 additions & 3 deletions internal/forest/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ type Forest struct {
// We can also move the lock out of the forest and pass it to all reconcilers that need the lock.
// In that way, we don't need to put the list in the forest.
types []TypeSyncer

// nsListeners is a list of listeners
listeners []NamespaceListener
}

type namedNamespaces map[string]*Namespace

// TypeSyncer syncs objects of a specific type. Reconcilers implement the interface so that they can be
// called by the HierarchyReconciler if the hierarchy changes.
type TypeSyncer interface {
// SyncNamespace syncs objects of a namespace for a specific type.
SyncNamespace(context.Context, logr.Logger, string) error

// Provides the GVK that is handled by the reconciler who implements the interface.
GetGVK() schema.GroupVersionKind

Expand All @@ -55,6 +55,12 @@ type TypeSyncer interface {
GetNumPropagatedObjects() int
}

// NamespaceListener has methods that get called whenever a namespace changes.
type NamespaceListener interface {
// OnChangeNamespace is called whenever a namespace changes.
OnChangeNamespace(logr.Logger, *Namespace)
}

func NewForest() *Forest {
return &Forest{
namespaces: namedNamespaces{},
Expand Down Expand Up @@ -148,3 +154,13 @@ func (f *Forest) GetTypeSyncers() []TypeSyncer {
copy(types, f.types)
return types
}

func (f *Forest) AddListener(l NamespaceListener) {
f.listeners = append(f.listeners, l)
}

func (f *Forest) OnChangeNamespace(log logr.Logger, ns *Namespace) {
for _, l := range f.listeners {
l.OnChangeNamespace(log, ns)
}
}
3 changes: 3 additions & 0 deletions internal/forest/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (ns *Namespace) Name() string {

// Parent returns a pointer to the parent namespace.
func (ns *Namespace) Parent() *Namespace {
if ns == nil {
return nil
}
return ns.parent
}

Expand Down
31 changes: 16 additions & 15 deletions internal/forest/namespaceobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package forest
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)

// SetSourceObject updates or creates the source object in forest.namespace.
Expand Down Expand Up @@ -35,11 +36,11 @@ func (ns *Namespace) DeleteSourceObject(gvk schema.GroupVersionKind, nm string)
}
}

// GetSourceObjects returns all source objects in the namespace.
func (ns *Namespace) GetSourceObjects(gvk schema.GroupVersionKind) []*unstructured.Unstructured {
o := []*unstructured.Unstructured{}
// GetSourceNames returns all source objects in the namespace.
func (ns *Namespace) GetSourceNames(gvk schema.GroupVersionKind) []types.NamespacedName {
o := []types.NamespacedName{}
for _, obj := range ns.sourceObjects[gvk] {
o = append(o, obj)
o = append(o, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()})
}
return o
}
Expand All @@ -50,10 +51,10 @@ func (ns *Namespace) GetNumSourceObjects(gvk schema.GroupVersionKind) int {
return len(ns.sourceObjects[gvk])
}

// GetAncestorSourceObjects returns all source objects with the specified name
// GetAncestorSourceNames returns all source objects with the specified name
// in the ancestors (including itself) from top down. If the name is not
// specified, all the source objects in the ancestors will be returned.
func (ns *Namespace) GetAncestorSourceObjects(gvk schema.GroupVersionKind, name string) []*unstructured.Unstructured {
func (ns *Namespace) GetAncestorSourceNames(gvk schema.GroupVersionKind, name string) []types.NamespacedName {
// The namespace could be nil when we use this function on "ns.Parent()" to
// get the source objects of the ancestors excluding itself without caring if
// the "ns.Parent()" is nil.
Expand All @@ -62,22 +63,22 @@ func (ns *Namespace) GetAncestorSourceObjects(gvk schema.GroupVersionKind, name
}

// Get the source objects in the ancestors from top down.
objs := []*unstructured.Unstructured{}
ans := ns.AncestryNames()
for _, n := range ans {
nsObjs := ns.forest.Get(n).GetSourceObjects(gvk)
allNNMs := []types.NamespacedName{}
ancs := ns.AncestryNames()
for _, anc := range ancs {
nnms := ns.forest.Get(anc).GetSourceNames(gvk)
if name == "" {
// Get all the source objects if the name is not specified.
objs = append(objs, ns.forest.Get(n).GetSourceObjects(gvk)...)
allNNMs = append(allNNMs, nnms...)
} else {
// If a name is specified, return the matching objects.
for _, o := range nsObjs {
if o.GetName() == name {
objs = append(objs, o)
for _, o := range nnms {
if o.Name == name {
allNNMs = append(allNNMs, o)
}
}
}
}

return objs
return allNNMs
}
Loading