Files
kubemirror/pkg/controller/namespace_reconciler.go
T
lukaszraczylo dfe08b35d1 fix(controller): stop self-triggered reconcile loops
C2: updateLastSyncStatus wrote the sync-status annotation on every
successful reconcile. Because the source's watch predicate is the
'enabled' label (server-side filter), that Update fires a watch event
that re-enters Reconcile. With reconciled/error counts varying across
cycles, the value differs each time, so the API server bumps RV and
the loop never quiesces. Now skips the Update when the value matches
the existing annotation.

C3: NamespaceReconciler's happy-path returned RequeueAfter=3s
unconditionally. Every namespace in the cluster re-reconciled every
3 seconds forever, generating constant List calls per source kind.
Now returns ctrl.Result{}; cache-staleness windows are handled by
the manager's resync period and source freshness verification.
2026-05-02 22:39:09 +01:00

344 lines
11 KiB
Go

// Package controller implements the kubemirror reconciliation logic.
package controller
import (
"context"
"fmt"
"slices"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/lukaszraczylo/kubemirror/pkg/config"
"github.com/lukaszraczylo/kubemirror/pkg/constants"
"github.com/lukaszraczylo/kubemirror/pkg/filter"
)
// NamespaceReconciler watches for namespace CREATE and UPDATE events
// and triggers reconciliation of source resources that match the new namespace.
//
// For every namespace event it walks ResourceTypes, lists the sources carrying
// the enabled label, and either creates/updates a mirror in the namespace or
// deletes a mirror that the namespace should no longer hold.
type NamespaceReconciler struct {
// Client is the embedded controller-runtime client. The methods in this file
// use it to Get namespaces and mirrors, List sources, and Delete stale mirrors.
client.Client
// NamespaceLister provides the namespace inventory (All, AllowMirrors, OptOut)
// consumed when a source's target-namespace patterns are resolved.
NamespaceLister NamespaceLister
// APIReader is not referenced by the methods visible in this file.
// NOTE(review): presumably an uncached reader for freshness checks — confirm.
APIReader client.Reader
// Scheme is handed to the temporary SourceReconciler built by reconcileMirror.
Scheme *runtime.Scheme
// Config may be nil. When set, MaxTargetsPerResource (> 0) caps the number of
// resolved target namespaces per source.
Config *config.Config
// Filter may be nil. When set, namespaces it does not allow are skipped by
// Reconcile, and it is also passed to target-namespace resolution.
Filter *filter.NamespaceFilter
// ResourceTypes is the set of resource kinds scanned on every namespace event.
ResourceTypes []config.ResourceType
}
// Reconcile handles one namespace event: it loads the namespace, skips it when
// the filter rejects it, and then evaluates every configured resource type so
// that mirrors in this namespace are created, updated or removed as needed.
//
// Returns an empty result with a nil error when the namespace no longer exists,
// when it is filtered out, or when every resource type reconciled cleanly.
// Returns an error (which makes controller-runtime retry) when at least one
// resource type or source failed.
func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx).WithValues(
		"namespace", req.Name,
		"reconciler", "namespace",
	)

	// Load the namespace. A NotFound means it was deleted in the meantime;
	// cleanup of its mirrors is left to the source reconcilers.
	var ns corev1.Namespace
	if err := r.Get(ctx, req.NamespacedName, &ns); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// A nil Filter allows everything; otherwise honour its verdict.
	if r.Filter != nil && !r.Filter.IsAllowed(ns.Name) {
		logger.V(1).Info("namespace filtered out, skipping")
		return ctrl.Result{}, nil
	}

	logger.Info("namespace event detected, reconciling source resources")

	// Walk each resource type. A failure of the type as a whole (e.g. the List
	// call) counts as a single error; per-source failures are reported by the
	// callee and added on top.
	var mirrored, failed int
	for i := range r.ResourceTypes {
		rt := r.ResourceTypes[i]
		done, bad, err := r.reconcileResourceType(ctx, rt, ns.Name)
		if err == nil {
			mirrored += done
			failed += bad
			continue
		}
		logger.Error(err, "failed to reconcile resource type",
			"group", rt.Group, "version", rt.Version, "kind", rt.Kind)
		failed++
	}

	logger.Info("namespace reconciliation complete",
		"reconciled", mirrored,
		"errors", failed,
		"resourceTypes", len(r.ResourceTypes))

	if failed > 0 {
		return ctrl.Result{}, fmt.Errorf("failed to reconcile %d source resources", failed)
	}

	// Deliberately no RequeueAfter: an unconditional periodic requeue made every
	// namespace in the cluster re-reconcile every few seconds, producing API
	// server load proportional to the namespace count. Staleness after label
	// changes is expected to be covered by the manager's resync period, by
	// source freshness verification in the SourceReconciler path, and by the
	// next real namespace event.
	//
	// NOTE(review): the UpdateFunc predicate registered in SetupWithManager only
	// passes updates whose allow-mirrors label value changed, so periodic resync
	// updates (old == new) may be filtered out before reaching this method —
	// confirm that the resync path actually provides the intended safety net.
	return ctrl.Result{}, nil
}
// reconcileResourceType lists every source of the given resource type that
// carries the enabled label and brings namespaceName in line with each of them:
//
//   - if namespaceName is one of the source's resolved targets, the mirror is
//     created or updated there;
//   - otherwise, a mirror of that source still present in namespaceName is
//     deleted, provided it is managed by us and points back at this source.
//
// Sources without the sync annotation set to "true", and objects that are
// themselves mirrors, are ignored.
//
// Returns (reconciled, errors, err). reconciled counts mirrors that were
// created/updated or deleted; errors counts per-source failures, which are
// logged and do not abort the loop. err is non-nil only when the initial List
// call fails, in which case both counters are zero.
func (r *NamespaceReconciler) reconcileResourceType(ctx context.Context, rt config.ResourceType, namespaceName string) (int, int, error) {
	logger := log.FromContext(ctx)
	gvk := rt.GroupVersionKind()

	// Label selector gives server-side filtering: only enabled objects come back.
	list := &unstructured.UnstructuredList{}
	list.SetGroupVersionKind(gvk)
	if err := r.List(ctx, list, client.HasLabels{constants.LabelEnabled}); err != nil {
		return 0, 0, fmt.Errorf("failed to list resources: %w", err)
	}

	var reconciledCount, errorCount int
	for i := range list.Items {
		source := &list.Items[i]

		// Only sources that opted in via the sync annotation are considered.
		// Indexing a nil annotation map is safe and yields "".
		if source.GetAnnotations()[constants.AnnotationSync] != "true" {
			continue
		}

		// Mirrors carry the same labels as sources; never mirror a mirror.
		if IsMirrorResource(source) {
			continue
		}

		targetNamespaces, err := r.resolveTargetNamespaces(ctx, source)
		if err != nil {
			logger.Error(err, "failed to resolve target namespaces",
				"source", source.GetName(), "namespace", source.GetNamespace())
			errorCount++
			continue
		}

		if slices.Contains(targetNamespaces, namespaceName) {
			// The namespace is a target: create or update the mirror in it.
			if err := r.reconcileMirror(ctx, source, namespaceName); err != nil {
				logger.Error(err, "failed to create mirror",
					"source", source.GetName(),
					"sourceNamespace", source.GetNamespace(),
					"targetNamespace", namespaceName)
				errorCount++
				continue
			}
			reconciledCount++
			logger.V(1).Info("mirror created/updated for namespace",
				"source", source.GetName(),
				"sourceNamespace", source.GetNamespace(),
				"targetNamespace", namespaceName,
				"resourceType", rt.String())
			continue
		}

		// The namespace is not (or no longer) a target: look for a leftover mirror.
		mirror := &unstructured.Unstructured{}
		mirror.SetGroupVersionKind(source.GroupVersionKind())
		mirror.SetNamespace(namespaceName)
		mirror.SetName(source.GetName())
		key := client.ObjectKey{Namespace: namespaceName, Name: source.GetName()}
		if err := r.Get(ctx, key, mirror); err != nil {
			if errors.IsNotFound(err) {
				// No mirror exists, nothing to clean up.
				continue
			}
			logger.Error(err, "failed to check for mirror",
				"source", source.GetName(),
				"namespace", namespaceName)
			errorCount++
			continue
		}

		// An object with the same name may belong to someone else; leave it alone.
		if !IsManagedByUs(mirror) {
			continue
		}

		// Only delete when the mirror's source reference matches this source.
		srcNs, srcName, _, found := GetSourceReference(mirror)
		if !found || srcNs != source.GetNamespace() || srcName != source.GetName() {
			continue
		}

		if err := r.Delete(ctx, mirror); err != nil {
			if errors.IsNotFound(err) {
				// The mirror vanished between the Get and the Delete (for example
				// removed concurrently). The desired state is already reached,
				// so this is neither an error nor work done by this reconcile.
				continue
			}
			logger.Error(err, "failed to delete orphaned mirror",
				"source", source.GetName(),
				"sourceNamespace", source.GetNamespace(),
				"targetNamespace", namespaceName)
			errorCount++
			continue
		}
		reconciledCount++
		logger.V(1).Info("deleted orphaned mirror due to namespace label change",
			"source", source.GetName(),
			"sourceNamespace", source.GetNamespace(),
			"targetNamespace", namespaceName,
			"resourceType", rt.String())
	}

	return reconciledCount, errorCount, nil
}
// resolveTargetNamespaces computes the namespaces that should hold a mirror of
// source, based on the patterns in its target-namespaces annotation.
// It is meant to follow the same logic as SourceReconciler.resolveTargetNamespaces.
//
// Returns (nil, nil) when the annotation is absent or empty, when it parses to
// no patterns, or when none of the patterns is valid. Invalid patterns are
// logged and dropped rather than treated as an error. A non-nil error is
// returned only when listing namespaces fails.
func (r *NamespaceReconciler) resolveTargetNamespaces(ctx context.Context, source *unstructured.Unstructured) ([]string, error) {
	// A lookup on a nil annotation map yields "", so a single emptiness check
	// covers both "no annotations at all" and "annotation not set".
	raw := source.GetAnnotations()[constants.AnnotationTargetNamespaces]
	if raw == "" {
		return nil, nil
	}

	patterns := filter.ParseTargetNamespaces(raw)
	if len(patterns) == 0 {
		return nil, nil
	}

	// Report every invalid glob, then continue with the valid remainder only.
	if results, ok := filter.ValidatePatterns(patterns); !ok {
		logger := log.FromContext(ctx)
		for _, bad := range filter.InvalidPatterns(results) {
			logger.Info("invalid glob pattern in target-namespaces annotation, pattern will be skipped",
				"pattern", bad.Pattern,
				"error", bad.Error.Error(),
				"source", source.GetName(),
				"namespace", source.GetNamespace(),
			)
		}

		var usable []string
		for _, res := range results {
			if res.Valid {
				usable = append(usable, res.Pattern)
			}
		}
		if len(usable) == 0 {
			// Nothing valid is left to match against.
			return nil, nil
		}
		patterns = usable
	}

	// One lister call returns all namespace categories needed below.
	nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list namespaces: %w", err)
	}

	targets := filter.ResolveTargetNamespaces(
		patterns,
		nsInfo.All,
		nsInfo.AllowMirrors,
		nsInfo.OptOut,
		source.GetNamespace(),
		r.Filter,
	)

	// Apply the per-resource cap when one is configured (Config may be nil).
	if cfg := r.Config; cfg != nil && cfg.MaxTargetsPerResource > 0 && len(targets) > cfg.MaxTargetsPerResource {
		targets = targets[:cfg.MaxTargetsPerResource]
	}

	return targets, nil
}
// reconcileMirror creates or updates the mirror of source in targetNamespace.
//
// Rather than duplicating the mirror logic, it builds a short-lived
// SourceReconciler from this reconciler's dependencies and delegates to its
// reconcileMirror method. The source object is passed for both object
// arguments of that method.
//
// NOTE(review): r.APIReader is not forwarded to the temporary SourceReconciler;
// confirm the delegated path does not rely on an equivalent field.
func (r *NamespaceReconciler) reconcileMirror(ctx context.Context, source *unstructured.Unstructured, targetNamespace string) error {
	delegate := new(SourceReconciler)
	delegate.Client = r.Client
	delegate.Scheme = r.Scheme
	delegate.Config = r.Config
	delegate.Filter = r.Filter
	delegate.NamespaceLister = r.NamespaceLister
	delegate.GVK = source.GroupVersionKind()

	return delegate.reconcileMirror(ctx, source, source, targetNamespace)
}
// SetupWithManager registers this reconciler with mgr as a controller for
// Namespace objects, restricted by an event filter:
//
//   - create events always trigger a reconcile;
//   - update events trigger one only when the allow-mirrors label value differs
//     between the old and new object;
//   - delete events never trigger one (cleanup is left to the source reconcilers).
//
// Returns whatever error the controller builder reports.
func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// allowMirrorsChanged reports whether the allow-mirrors label value differs
	// across an update. Objects that are not Namespaces are rejected.
	allowMirrorsChanged := func(e event.UpdateEvent) bool {
		before, ok := e.ObjectOld.(*corev1.Namespace)
		if !ok {
			return false
		}
		after, ok := e.ObjectNew.(*corev1.Namespace)
		if !ok {
			return false
		}
		// Reading a missing key (or a nil label map) yields "", so an absent
		// label and an empty-valued label compare as equal.
		return before.GetLabels()[constants.LabelAllowMirrors] != after.GetLabels()[constants.LabelAllowMirrors]
	}

	nsEvents := predicate.Funcs{
		// Every new namespace is a potential mirror target.
		CreateFunc: func(event.CreateEvent) bool { return true },
		UpdateFunc: allowMirrorsChanged,
		// Deletions are handled by the source reconcilers via finalizers.
		DeleteFunc: func(event.DeleteEvent) bool { return false },
	}

	builder := ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Namespace{}).
		WithEventFilter(nsEvents)
	return builder.Complete(r)
}