Files
kubemirror/pkg/controller/source_reconciler.go
T
lukaszraczylo dfe08b35d1 fix(controller): stop self-triggered reconcile loops
C2: updateLastSyncStatus wrote the sync-status annotation on every
successful reconcile. Because the source's watch predicate is the
'enabled' label (server-side filter), that Update fires a watch event
that re-enters Reconcile. With reconciled/error counts varying across
cycles, the value differs each time, so the API server bumps RV and
the loop never quiesces. Now skips the Update when the value matches
the existing annotation.

C3: NamespaceReconciler's happy-path returned RequeueAfter=3s
unconditionally. Every namespace in the cluster re-reconciled every
3 seconds forever, generating constant List calls per source kind.
Now returns ctrl.Result{}; cache-staleness windows are handled by
the manager's resync period and source freshness verification.
2026-05-02 22:39:09 +01:00

764 lines
26 KiB
Go

// Package controller implements the kubemirror reconciliation logic.
package controller
import (
"context"
stderrors "errors"
"fmt"
"slices"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/lukaszraczylo/kubemirror/pkg/circuitbreaker"
"github.com/lukaszraczylo/kubemirror/pkg/config"
"github.com/lukaszraczylo/kubemirror/pkg/constants"
"github.com/lukaszraczylo/kubemirror/pkg/filter"
"github.com/lukaszraczylo/kubemirror/pkg/hash"
)
// SourceReconciler reconciles source resources that need mirroring.
type SourceReconciler struct {
client.Client
NamespaceLister NamespaceLister
APIReader client.Reader
Scheme *runtime.Scheme
Config *config.Config
Filter *filter.NamespaceFilter
CircuitBreaker *circuitbreaker.CircuitBreaker
GVK schema.GroupVersionKind
}
// NamespaceLister provides a list of all namespaces in the cluster.
// This interface allows for testing with mocks.
type NamespaceLister interface {
ListNamespaces(ctx context.Context) ([]string, error)
ListAllowMirrorsNamespaces(ctx context.Context) ([]string, error)
ListOptOutNamespaces(ctx context.Context) ([]string, error)
// ListNamespacesWithLabels returns all namespace info in a single API call (preferred)
ListNamespacesWithLabels(ctx context.Context) (*NamespaceInfo, error)
}
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch
// getSourceWithFreshness fetches a source resource with optional freshness verification.
// This implements a hybrid caching strategy:
// 1. First read from informer cache (fast, local)
// 2. If VerifySourceFreshness is enabled, make direct API call via APIReader
// 3. If resourceVersions differ, cache is stale - return fresh version from API
// 4. If resourceVersions match, cache is current - return cached version
//
// This prevents the race condition where:
// - Watch event arrives: "Secret changed!"
// - Reconciliation starts immediately
// - Cache hasn't updated yet (5-20 second lag)
// - We read stale data and mirror it
//
// Trade-off: 2x API calls when cache is stale, but guarantees data freshness.
func (r *SourceReconciler) getSourceWithFreshness(ctx context.Context, key client.ObjectKey, gvk schema.GroupVersionKind) (*unstructured.Unstructured, error) {
logger := log.FromContext(ctx)
// First try: Read from cache (fast)
cached := &unstructured.Unstructured{}
cached.SetGroupVersionKind(gvk)
if err := r.Get(ctx, key, cached); err != nil {
return nil, err
}
// If freshness verification is disabled, return cached version immediately
if !r.Config.VerifySourceFreshness {
logger.V(2).Info("using cached source (freshness check disabled)", "resourceVersion", cached.GetResourceVersion())
return cached, nil
}
// If APIReader is not available (e.g., in tests), fall back to cached version
if r.APIReader == nil {
logger.V(2).Info("using cached source (no APIReader available)", "resourceVersion", cached.GetResourceVersion())
return cached, nil
}
cachedRV := cached.GetResourceVersion()
// Second try: Direct API read to verify freshness (bypasses cache)
fresh := &unstructured.Unstructured{}
fresh.SetGroupVersionKind(gvk)
if err := r.APIReader.Get(ctx, key, fresh); err != nil {
// If direct API read fails, fall back to cached version
logger.V(1).Info("direct API read failed, using cached version", "error", err, "cachedRV", cachedRV)
return cached, nil
}
freshRV := fresh.GetResourceVersion()
// Compare resource versions
if cachedRV != freshRV {
// Cache is stale - return fresh version from API
logger.V(1).Info("cache stale, using fresh API version",
"cachedRV", cachedRV,
"freshRV", freshRV)
return fresh, nil
}
// Cache is current - return cached version (saves memory allocation)
logger.V(2).Info("cache current", "resourceVersion", cachedRV)
return cached, nil
}
// Reconcile processes a single source resource.
func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues(
"namespace", req.Namespace,
"name", req.Name,
"kind", r.GVK.Kind,
"group", r.GVK.Group,
"version", r.GVK.Version,
)
// Fetch the source resource with optional freshness verification
source, err := r.getSourceWithFreshness(ctx, req.NamespacedName, r.GVK)
if err != nil {
if errors.IsNotFound(err) {
// Resource deleted - nothing to do
return ctrl.Result{}, nil
}
logger.Error(err, "failed to get resource")
return ctrl.Result{}, err
}
sourceObj := source
// Check if this is a mirror resource (shouldn't reconcile mirrors as sources)
if IsMirrorResource(sourceObj) {
// Silently skip - mirrors reconcile via watch, not as sources
return ctrl.Result{}, nil
}
// Check circuit breaker - skip if circuit is open (too many failures)
if r.CircuitBreaker != nil {
if !r.CircuitBreaker.AllowRequest(req.Namespace, req.Name, r.GVK.Kind) {
cbState := r.CircuitBreaker.GetState(req.Namespace, req.Name, r.GVK.Kind)
failCount := r.CircuitBreaker.GetFailureCount(req.Namespace, req.Name, r.GVK.Kind)
logger.Info("circuit breaker open, skipping reconciliation",
"state", cbState.String(),
"consecutiveFailures", failCount,
"lastError", r.CircuitBreaker.GetLastError(req.Namespace, req.Name, r.GVK.Kind))
// Requeue after circuit breaker reset timeout to try again
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
}
// Check if resource is enabled for mirroring
// Check if resource is being deleted
if !sourceObj.GetDeletionTimestamp().IsZero() {
// Resource is being deleted - clean up mirrors and remove finalizer
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
logger.Info("source being deleted, cleaning up all mirrors")
deleteErr := r.deleteAllMirrors(ctx, sourceObj)
if deleteErr != nil {
logger.Error(deleteErr, "failed to delete all mirrors during source deletion")
return ctrl.Result{}, deleteErr
}
// Remove finalizer to allow resource deletion
logger.Info("removing finalizer from source resource")
finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName)
sourceObj.SetFinalizers(finalizers)
updateErr := r.Update(ctx, source)
if updateErr != nil {
logger.Error(updateErr, "failed to remove finalizer")
return ctrl.Result{}, updateErr
}
logger.Info("finalizer removed, resource can now be deleted")
}
return ctrl.Result{}, nil
}
if !isEnabledForMirroring(sourceObj) {
// Resource is disabled - remove finalizer if present and delete all mirrors
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
return r.handleDisabled(ctx, sourceObj)
}
// No finalizer, just skip
return ctrl.Result{}, nil
}
// Refuse to mirror sensitive Secret types (service-account tokens, bootstrap
// tokens, helm release blobs). Mirroring these to other namespaces is a
// credential exposure path. If a finalizer is already set (the resource was
// enabled before we started enforcing this list), tear down via handleDisabled
// so any prior mirrors are cleaned up.
if isBlacklistedSecret(sourceObj) {
secretType, _, _ := unstructured.NestedString(sourceObj.Object, "type")
logger.Info("refusing to mirror blacklisted Secret type",
"type", secretType,
"reason", "credential exposure risk")
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
return r.handleDisabled(ctx, sourceObj)
}
return ctrl.Result{}, nil
}
// Add finalizer if not present
if !slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
logger.Info("adding finalizer to source resource")
finalizers := append(sourceObj.GetFinalizers(), constants.FinalizerName)
sourceObj.SetFinalizers(finalizers)
addFinalizerErr := r.Update(ctx, source)
if addFinalizerErr != nil {
logger.Error(addFinalizerErr, "failed to add finalizer")
return ctrl.Result{}, addFinalizerErr
}
logger.Info("finalizer added")
// Requeue to continue with reconciliation after finalizer is added
return ctrl.Result{Requeue: true}, nil
}
// Get target namespaces
targetNamespaces, err := r.resolveTargetNamespaces(ctx, sourceObj)
if err != nil {
logger.Error(err, "failed to resolve target namespaces")
if r.CircuitBreaker != nil {
r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err)
}
return ctrl.Result{}, err
}
if len(targetNamespaces) == 0 {
logger.V(1).Info("no target namespaces resolved")
return ctrl.Result{}, nil
}
logger.V(1).Info("reconciling mirrors", "targetCount", len(targetNamespaces))
// Reconcile each target namespace
var reconciledCount, errorCount int
for _, targetNs := range targetNamespaces {
reconcileErr := r.reconcileMirror(ctx, source, sourceObj, targetNs)
if reconcileErr != nil {
logger.Error(reconcileErr, "failed to reconcile mirror", "targetNamespace", targetNs)
errorCount++
} else {
reconciledCount++
}
}
// Clean up orphaned mirrors (namespaces that no longer match the target criteria)
orphanedCount, err := r.cleanupOrphanedMirrors(ctx, sourceObj, targetNamespaces)
if err != nil {
logger.Error(err, "failed to cleanup orphaned mirrors")
// Don't fail reconciliation for cleanup errors, just log them
} else if orphanedCount > 0 {
logger.Info("cleaned up orphaned mirrors", "count", orphanedCount)
}
// Update status annotation with last sync info
if err := r.updateLastSyncStatus(ctx, source, sourceObj, reconciledCount, errorCount); err != nil {
logger.Error(err, "failed to update sync status")
if r.CircuitBreaker != nil {
r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err)
}
return ctrl.Result{}, err
}
logger.Info("reconciliation complete",
"reconciled", reconciledCount,
"errors", errorCount,
"total", len(targetNamespaces))
// Return error if there were errors (controller-runtime will automatically requeue with exponential backoff)
if errorCount > 0 {
err := fmt.Errorf("failed to reconcile %d/%d mirrors", errorCount, len(targetNamespaces))
// Record failure with circuit breaker
if r.CircuitBreaker != nil {
state, justOpened := r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err)
if justOpened {
logger.Info("circuit breaker opened due to repeated failures",
"state", state.String(),
"consecutiveFailures", r.CircuitBreaker.GetFailureCount(req.Namespace, req.Name, r.GVK.Kind))
}
}
return ctrl.Result{}, err
}
// Record success with circuit breaker
if r.CircuitBreaker != nil {
r.CircuitBreaker.RecordSuccess(req.Namespace, req.Name, r.GVK.Kind)
}
return ctrl.Result{}, nil
}
// handleDisabled removes mirrors when a resource is disabled.
func (r *SourceReconciler) handleDisabled(ctx context.Context, sourceObj metav1.Object) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// Delete all mirrors for this disabled source
if err := r.deleteAllMirrors(ctx, sourceObj); err != nil {
logger.Error(err, "failed to delete mirrors for disabled resource")
return ctrl.Result{}, err
}
// Remove finalizer if present
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
logger.Info("removing finalizer from disabled resource")
finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName)
sourceObj.SetFinalizers(finalizers)
// Get the unstructured object to update - sourceObj is already *unstructured.Unstructured
source := sourceObj.(*unstructured.Unstructured)
if err := r.Update(ctx, source); err != nil {
logger.Error(err, "failed to remove finalizer from disabled resource")
return ctrl.Result{}, err
}
logger.V(1).Info("finalizer removed from disabled resource")
}
logger.V(1).Info("mirrors deleted for disabled resource")
return ctrl.Result{}, nil
}
// reconcileMirror creates or updates a mirror in the target namespace.
func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.Object, sourceObj metav1.Object, targetNs string) error {
logger := log.FromContext(ctx).WithValues("targetNamespace", targetNs)
// Try to get existing mirror as unstructured
sourceUnstructured := source.(*unstructured.Unstructured)
existing := &unstructured.Unstructured{}
existing.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
err := r.Get(ctx, client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}, existing)
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to get existing mirror: %w", err)
}
// If freshness verification is enabled and mirror exists, verify it's fresh too
if err == nil && r.Config.VerifySourceFreshness && r.APIReader != nil {
fresh := &unstructured.Unstructured{}
fresh.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
if apiErr := r.APIReader.Get(ctx, client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}, fresh); apiErr == nil {
if fresh.GetResourceVersion() != existing.GetResourceVersion() {
logger.V(2).Info("mirror cache stale, using fresh API version",
"cachedRV", existing.GetResourceVersion(),
"freshRV", fresh.GetResourceVersion())
existing = fresh
}
}
}
if err == nil {
// Mirror exists - check if it's managed by us
if !IsManagedByUs(existing) {
logger.V(1).Info("target resource exists but not managed by kubemirror, skipping")
return nil
}
// Check if update is needed
needsSync, syncCheckErr := hash.NeedsSync(source, existing, existing.GetAnnotations())
if syncCheckErr != nil {
return fmt.Errorf("failed to check if sync needed: %w", syncCheckErr)
}
if !needsSync {
logger.V(2).Info("mirror is up to date")
return nil
}
// Update mirror
updateErr := UpdateMirror(existing, source)
if updateErr != nil {
return fmt.Errorf("failed to update mirror: %w", updateErr)
}
clusterUpdateErr := r.Update(ctx, existing)
if clusterUpdateErr != nil {
return fmt.Errorf("failed to update mirror in cluster: %w", clusterUpdateErr)
}
logger.V(1).Info("mirror updated")
return nil
}
// Create new mirror
mirror, err := CreateMirror(source, targetNs)
if err != nil {
return fmt.Errorf("failed to create mirror: %w", err)
}
mirrorObj := mirror.(client.Object)
if err := r.Create(ctx, mirrorObj); err != nil {
return fmt.Errorf("failed to create mirror in cluster: %w", err)
}
// Verify mirror was actually created (catches webhook rejections, quota issues)
verifyMirror := &unstructured.Unstructured{}
verifyMirror.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
verifyKey := client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}
if verifyErr := r.Get(ctx, verifyKey, verifyMirror); verifyErr != nil {
logger.Error(verifyErr, "mirror creation verification failed - mirror may have been rejected")
return fmt.Errorf("mirror creation verification failed: %w", verifyErr)
}
logger.V(1).Info("mirror created and verified")
return nil
}
// deleteAllMirrors deletes all mirrors that this source owns across the cluster.
// It verifies ownership (managed-by label + source-reference annotation) before
// deleting anything to avoid destroying unrelated resources that happen to share
// the source's name. Per-namespace failures are aggregated so callers can defer
// finalizer removal until cleanup actually succeeds.
func (r *SourceReconciler) deleteAllMirrors(ctx context.Context, sourceObj metav1.Object) error {
logger := log.FromContext(ctx)
allNamespaces, err := r.NamespaceLister.ListNamespaces(ctx)
if err != nil {
return fmt.Errorf("failed to list namespaces: %w", err)
}
sourceUnstructured, ok := sourceObj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("source object is not unstructured")
}
var (
deleteCount int
deleteErrs []error
)
for _, ns := range allNamespaces {
if ns == sourceObj.GetNamespace() {
continue
}
existing := &unstructured.Unstructured{}
existing.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
getErr := r.Get(ctx, client.ObjectKey{Namespace: ns, Name: sourceObj.GetName()}, existing)
if errors.IsNotFound(getErr) {
continue
}
if getErr != nil {
logger.Error(getErr, "failed to fetch potential mirror", "namespace", ns)
deleteErrs = append(deleteErrs, fmt.Errorf("get mirror %s/%s: %w", ns, sourceObj.GetName(), getErr))
continue
}
if !IsManagedByUs(existing) {
continue
}
srcNs, srcName, _, found := GetSourceReference(existing)
if !found || srcNs != sourceObj.GetNamespace() || srcName != sourceObj.GetName() {
continue
}
if delErr := r.Delete(ctx, existing); delErr != nil && !errors.IsNotFound(delErr) {
logger.Error(delErr, "failed to delete mirror", "namespace", ns)
deleteErrs = append(deleteErrs, fmt.Errorf("delete mirror %s/%s: %w", ns, sourceObj.GetName(), delErr))
continue
}
deleteCount++
}
logger.Info("deleted mirrors", "count", deleteCount, "errors", len(deleteErrs))
if len(deleteErrs) > 0 {
return stderrors.Join(deleteErrs...)
}
return nil
}
// cleanupOrphanedMirrors removes mirrors that exist but are no longer in the target list.
// This handles cases where target-namespaces annotation changes (e.g., "all" → "all-labeled" or "app-*" → "prod-*").
func (r *SourceReconciler) cleanupOrphanedMirrors(ctx context.Context, sourceObj metav1.Object, targetNamespaces []string) (int, error) {
logger := log.FromContext(ctx)
// List all namespaces using unified method for consistency
nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx)
if err != nil {
return 0, fmt.Errorf("failed to list namespaces: %w", err)
}
allNamespaces := nsInfo.All
// Get GVK from source object
sourceUnstructured, ok := sourceObj.(*unstructured.Unstructured)
if !ok {
return 0, fmt.Errorf("source object is not unstructured")
}
// Create a set of target namespaces for quick lookup
targetSet := make(map[string]bool)
for _, ns := range targetNamespaces {
targetSet[ns] = true
}
var deletedCount int
for _, ns := range allNamespaces {
// Skip source namespace
if ns == sourceObj.GetNamespace() {
continue
}
// Skip if this namespace IS in the current target list
if targetSet[ns] {
continue
}
// Check if a mirror exists in this namespace
mirror := &unstructured.Unstructured{}
mirror.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
mirror.SetNamespace(ns)
mirror.SetName(sourceObj.GetName())
err := r.Get(ctx, client.ObjectKey{Namespace: ns, Name: sourceObj.GetName()}, mirror)
if errors.IsNotFound(err) {
// No mirror exists, nothing to clean up
continue
}
if err != nil {
logger.Error(err, "failed to check for mirror", "namespace", ns)
continue
}
// Verify this is actually our mirror (not someone else's resource with the same name)
if !IsManagedByUs(mirror) {
continue
}
// Verify this mirror points to our source
srcNs, srcName, _, found := GetSourceReference(mirror)
if !found || srcNs != sourceObj.GetNamespace() || srcName != sourceObj.GetName() {
continue
}
// This is an orphaned mirror - delete it
if err := r.Delete(ctx, mirror); err != nil {
logger.Error(err, "failed to delete orphaned mirror", "namespace", ns)
continue
}
deletedCount++
logger.V(1).Info("deleted orphaned mirror", "namespace", ns)
}
return deletedCount, nil
}
// resolveTargetNamespaces determines which namespaces should receive mirrors.
func (r *SourceReconciler) resolveTargetNamespaces(ctx context.Context, sourceObj metav1.Object) ([]string, error) {
annotations := sourceObj.GetAnnotations()
if annotations == nil {
return nil, nil
}
targetNsAnnotation := annotations[constants.AnnotationTargetNamespaces]
if targetNsAnnotation == "" {
return nil, nil
}
// Parse patterns
patterns := filter.ParseTargetNamespaces(targetNsAnnotation)
if len(patterns) == 0 {
return nil, nil
}
// Validate patterns and log warnings for invalid ones
validationResults, allValid := filter.ValidatePatterns(patterns)
if !allValid {
logger := log.FromContext(ctx)
invalidPatterns := filter.InvalidPatterns(validationResults)
for _, invalid := range invalidPatterns {
logger.Info("invalid glob pattern in target-namespaces annotation, pattern will be skipped",
"pattern", invalid.Pattern,
"error", invalid.Error.Error(),
"source", sourceObj.GetName(),
"namespace", sourceObj.GetNamespace(),
)
}
// Filter to only valid patterns
var validPatterns []string
for _, result := range validationResults {
if result.Valid {
validPatterns = append(validPatterns, result.Pattern)
}
}
patterns = validPatterns
// If no valid patterns remain, return empty
if len(patterns) == 0 {
return nil, nil
}
}
// Get all namespace info in a single API call (more efficient than 3 separate calls)
nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list namespaces: %w", err)
}
// Resolve target namespaces using the pre-categorized namespace info
targetNamespaces := filter.ResolveTargetNamespaces(
patterns,
nsInfo.All,
nsInfo.AllowMirrors,
nsInfo.OptOut,
sourceObj.GetNamespace(),
r.Filter,
)
// Enforce max targets limit
if r.Config != nil && r.Config.MaxTargetsPerResource > 0 && len(targetNamespaces) > r.Config.MaxTargetsPerResource {
targetNamespaces = targetNamespaces[:r.Config.MaxTargetsPerResource]
}
return targetNamespaces, nil
}
// updateLastSyncStatus updates the source resource's sync-status annotation.
// Skips the Update call entirely if the value is unchanged; otherwise the
// resulting watch event re-fires Reconcile and the controller spins on its
// own writes. This is a no-op for steady-state convergence.
func (r *SourceReconciler) updateLastSyncStatus(ctx context.Context, source runtime.Object, sourceObj metav1.Object, reconciledCount, errorCount int) error {
desired := fmt.Sprintf("reconciled:%d,errors:%d", reconciledCount, errorCount)
annotations := sourceObj.GetAnnotations()
if annotations[constants.AnnotationSyncStatus] == desired {
return nil
}
if annotations == nil {
annotations = make(map[string]string)
}
annotations[constants.AnnotationSyncStatus] = desired
sourceObj.SetAnnotations(annotations)
// source (*unstructured.Unstructured) already implements client.Object
return r.Update(ctx, source.(*unstructured.Unstructured))
}
// isBlacklistedSecret reports whether the given source is a core/v1 Secret with
// a Type that must never be mirrored across namespaces.
func isBlacklistedSecret(obj *unstructured.Unstructured) bool {
if obj.GetKind() != "Secret" || obj.GetAPIVersion() != "v1" {
return false
}
secretType, found, err := unstructured.NestedString(obj.Object, "type")
if err != nil || !found {
return false
}
return slices.Contains(constants.BlacklistedSecretTypes, secretType)
}
// isEnabledForMirroring checks if a resource has both the label and annotation for mirroring.
func isEnabledForMirroring(obj metav1.Object) bool {
// Check label
labels := obj.GetLabels()
if labels == nil || labels[constants.LabelEnabled] != "true" {
return false
}
// Check annotation
annotations := obj.GetAnnotations()
if annotations == nil || annotations[constants.AnnotationSync] != "true" {
return false
}
return true
}
// SetupWithManager sets up the controller with the Manager.
func (r *SourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Build predicate to only watch resources with enabled label
// This reduces API server load by ~90%
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Secret{}).
Complete(r)
}
// SetupWithManagerForResourceType sets up a controller for a specific resource type.
// This allows dynamic controller registration for any discovered resource type.
func (r *SourceReconciler) SetupWithManagerForResourceType(
mgr ctrl.Manager,
gvk schema.GroupVersionKind,
) error {
// Create an unstructured object for this GVK
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
// Create unique controller name including version and group to avoid collisions
// e.g., "HorizontalPodAutoscaler.v1.autoscaling" or "Secret.v1." (empty group for core resources)
// This matches the naming convention used by mirror reconcilers
controllerName := gvk.Kind + "." + gvk.Version + "." + gvk.Group
// Create mirror object for watching
mirrorObj := &unstructured.Unstructured{}
mirrorObj.SetGroupVersionKind(gvk)
// Create predicates to only watch mirror deletions
mirrorDeletePredicate := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return IsMirrorResource(e.Object) },
GenericFunc: func(e event.GenericEvent) bool { return false },
}
return ctrl.NewControllerManagedBy(mgr).
For(obj).
Named(controllerName).
// Watch mirror resources - when deleted, enqueue source for reconciliation
Watches(
mirrorObj,
handler.EnqueueRequestsFromMapFunc(r.mapMirrorToSource),
builder.WithPredicates(mirrorDeletePredicate),
).
Complete(r)
}
// mapMirrorToSource maps a mirror resource to its source for reconciliation.
func (r *SourceReconciler) mapMirrorToSource(ctx context.Context, obj client.Object) []reconcile.Request {
// Only process if this is a mirror
if !IsMirrorResource(obj) {
return nil
}
// Get source reference from annotations
sourceNs, sourceName, _, found := GetSourceReference(obj)
if !found {
return nil
}
// Enqueue reconciliation request for the source
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: sourceNs,
Name: sourceName,
},
},
}
}
// removeString removes a string from a slice.
func removeString(slice []string, s string) []string {
result := make([]string, 0, len(slice))
for _, item := range slice {
if item != s {
result = append(result, item)
}
}
return result
}