mirror of
https://github.com/lukaszraczylo/kubemirror.git
synced 2026-06-05 22:43:51 +00:00
4277c8ac39
C1: deleteAllMirrors used to issue a blind Delete on every namespace matching the source name+GVK, which would destroy unrelated resources (e.g. a 'default' SA, 'ca-bundle' ConfigMap) sharing the source name. Now reads each candidate, verifies managed-by label and source-reference annotation, and only deletes confirmed mirrors. M1: BlacklistedSecretTypes was declared but never enforced. Enabling mirroring on a service-account-token / bootstrap-token / helm release Secret would mirror credentials cluster-wide. Now refused at Reconcile. M3: deleteAllMirrors swallowed per-namespace errors and returned nil, so callers removed the finalizer even on partial failure (orphans). Errors are now joined and returned.
756 lines
26 KiB
Go
756 lines
26 KiB
Go
// Package controller implements the kubemirror reconciliation logic.
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
stderrors "errors"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/builder"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/event"
|
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
|
|
"github.com/lukaszraczylo/kubemirror/pkg/circuitbreaker"
|
|
"github.com/lukaszraczylo/kubemirror/pkg/config"
|
|
"github.com/lukaszraczylo/kubemirror/pkg/constants"
|
|
"github.com/lukaszraczylo/kubemirror/pkg/filter"
|
|
"github.com/lukaszraczylo/kubemirror/pkg/hash"
|
|
)
|
|
|
|
// SourceReconciler reconciles source resources that need mirroring.
|
|
type SourceReconciler struct {
|
|
client.Client
|
|
NamespaceLister NamespaceLister
|
|
APIReader client.Reader
|
|
Scheme *runtime.Scheme
|
|
Config *config.Config
|
|
Filter *filter.NamespaceFilter
|
|
CircuitBreaker *circuitbreaker.CircuitBreaker
|
|
GVK schema.GroupVersionKind
|
|
}
|
|
|
|
// NamespaceLister provides a list of all namespaces in the cluster.
|
|
// This interface allows for testing with mocks.
|
|
type NamespaceLister interface {
|
|
ListNamespaces(ctx context.Context) ([]string, error)
|
|
ListAllowMirrorsNamespaces(ctx context.Context) ([]string, error)
|
|
ListOptOutNamespaces(ctx context.Context) ([]string, error)
|
|
// ListNamespacesWithLabels returns all namespace info in a single API call (preferred)
|
|
ListNamespacesWithLabels(ctx context.Context) (*NamespaceInfo, error)
|
|
}
|
|
|
|
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch
|
|
|
|
// getSourceWithFreshness fetches a source resource with optional freshness verification.
|
|
// This implements a hybrid caching strategy:
|
|
// 1. First read from informer cache (fast, local)
|
|
// 2. If VerifySourceFreshness is enabled, make direct API call via APIReader
|
|
// 3. If resourceVersions differ, cache is stale - return fresh version from API
|
|
// 4. If resourceVersions match, cache is current - return cached version
|
|
//
|
|
// This prevents the race condition where:
|
|
// - Watch event arrives: "Secret changed!"
|
|
// - Reconciliation starts immediately
|
|
// - Cache hasn't updated yet (5-20 second lag)
|
|
// - We read stale data and mirror it
|
|
//
|
|
// Trade-off: 2x API calls when cache is stale, but guarantees data freshness.
|
|
func (r *SourceReconciler) getSourceWithFreshness(ctx context.Context, key client.ObjectKey, gvk schema.GroupVersionKind) (*unstructured.Unstructured, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
// First try: Read from cache (fast)
|
|
cached := &unstructured.Unstructured{}
|
|
cached.SetGroupVersionKind(gvk)
|
|
if err := r.Get(ctx, key, cached); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If freshness verification is disabled, return cached version immediately
|
|
if !r.Config.VerifySourceFreshness {
|
|
logger.V(2).Info("using cached source (freshness check disabled)", "resourceVersion", cached.GetResourceVersion())
|
|
return cached, nil
|
|
}
|
|
|
|
// If APIReader is not available (e.g., in tests), fall back to cached version
|
|
if r.APIReader == nil {
|
|
logger.V(2).Info("using cached source (no APIReader available)", "resourceVersion", cached.GetResourceVersion())
|
|
return cached, nil
|
|
}
|
|
|
|
cachedRV := cached.GetResourceVersion()
|
|
|
|
// Second try: Direct API read to verify freshness (bypasses cache)
|
|
fresh := &unstructured.Unstructured{}
|
|
fresh.SetGroupVersionKind(gvk)
|
|
if err := r.APIReader.Get(ctx, key, fresh); err != nil {
|
|
// If direct API read fails, fall back to cached version
|
|
logger.V(1).Info("direct API read failed, using cached version", "error", err, "cachedRV", cachedRV)
|
|
return cached, nil
|
|
}
|
|
|
|
freshRV := fresh.GetResourceVersion()
|
|
|
|
// Compare resource versions
|
|
if cachedRV != freshRV {
|
|
// Cache is stale - return fresh version from API
|
|
logger.V(1).Info("cache stale, using fresh API version",
|
|
"cachedRV", cachedRV,
|
|
"freshRV", freshRV)
|
|
return fresh, nil
|
|
}
|
|
|
|
// Cache is current - return cached version (saves memory allocation)
|
|
logger.V(2).Info("cache current", "resourceVersion", cachedRV)
|
|
return cached, nil
|
|
}
|
|
|
|
// Reconcile processes a single source resource.
|
|
func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx).WithValues(
|
|
"namespace", req.Namespace,
|
|
"name", req.Name,
|
|
"kind", r.GVK.Kind,
|
|
"group", r.GVK.Group,
|
|
"version", r.GVK.Version,
|
|
)
|
|
|
|
// Fetch the source resource with optional freshness verification
|
|
source, err := r.getSourceWithFreshness(ctx, req.NamespacedName, r.GVK)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
// Resource deleted - nothing to do
|
|
return ctrl.Result{}, nil
|
|
}
|
|
logger.Error(err, "failed to get resource")
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
sourceObj := source
|
|
|
|
// Check if this is a mirror resource (shouldn't reconcile mirrors as sources)
|
|
if IsMirrorResource(sourceObj) {
|
|
// Silently skip - mirrors reconcile via watch, not as sources
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// Check circuit breaker - skip if circuit is open (too many failures)
|
|
if r.CircuitBreaker != nil {
|
|
if !r.CircuitBreaker.AllowRequest(req.Namespace, req.Name, r.GVK.Kind) {
|
|
cbState := r.CircuitBreaker.GetState(req.Namespace, req.Name, r.GVK.Kind)
|
|
failCount := r.CircuitBreaker.GetFailureCount(req.Namespace, req.Name, r.GVK.Kind)
|
|
logger.Info("circuit breaker open, skipping reconciliation",
|
|
"state", cbState.String(),
|
|
"consecutiveFailures", failCount,
|
|
"lastError", r.CircuitBreaker.GetLastError(req.Namespace, req.Name, r.GVK.Kind))
|
|
// Requeue after circuit breaker reset timeout to try again
|
|
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
|
|
}
|
|
}
|
|
|
|
// Check if resource is enabled for mirroring
|
|
// Check if resource is being deleted
|
|
if !sourceObj.GetDeletionTimestamp().IsZero() {
|
|
// Resource is being deleted - clean up mirrors and remove finalizer
|
|
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
|
|
logger.Info("source being deleted, cleaning up all mirrors")
|
|
deleteErr := r.deleteAllMirrors(ctx, sourceObj)
|
|
if deleteErr != nil {
|
|
logger.Error(deleteErr, "failed to delete all mirrors during source deletion")
|
|
return ctrl.Result{}, deleteErr
|
|
}
|
|
|
|
// Remove finalizer to allow resource deletion
|
|
logger.Info("removing finalizer from source resource")
|
|
finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName)
|
|
sourceObj.SetFinalizers(finalizers)
|
|
updateErr := r.Update(ctx, source)
|
|
if updateErr != nil {
|
|
logger.Error(updateErr, "failed to remove finalizer")
|
|
return ctrl.Result{}, updateErr
|
|
}
|
|
logger.Info("finalizer removed, resource can now be deleted")
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
if !isEnabledForMirroring(sourceObj) {
|
|
// Resource is disabled - remove finalizer if present and delete all mirrors
|
|
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
|
|
return r.handleDisabled(ctx, sourceObj)
|
|
}
|
|
// No finalizer, just skip
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// Refuse to mirror sensitive Secret types (service-account tokens, bootstrap
|
|
// tokens, helm release blobs). Mirroring these to other namespaces is a
|
|
// credential exposure path. If a finalizer is already set (the resource was
|
|
// enabled before we started enforcing this list), tear down via handleDisabled
|
|
// so any prior mirrors are cleaned up.
|
|
if isBlacklistedSecret(sourceObj) {
|
|
secretType, _, _ := unstructured.NestedString(sourceObj.Object, "type")
|
|
logger.Info("refusing to mirror blacklisted Secret type",
|
|
"type", secretType,
|
|
"reason", "credential exposure risk")
|
|
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
|
|
return r.handleDisabled(ctx, sourceObj)
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// Add finalizer if not present
|
|
if !slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
|
|
logger.Info("adding finalizer to source resource")
|
|
finalizers := append(sourceObj.GetFinalizers(), constants.FinalizerName)
|
|
sourceObj.SetFinalizers(finalizers)
|
|
addFinalizerErr := r.Update(ctx, source)
|
|
if addFinalizerErr != nil {
|
|
logger.Error(addFinalizerErr, "failed to add finalizer")
|
|
return ctrl.Result{}, addFinalizerErr
|
|
}
|
|
logger.Info("finalizer added")
|
|
// Requeue to continue with reconciliation after finalizer is added
|
|
return ctrl.Result{Requeue: true}, nil
|
|
}
|
|
|
|
// Get target namespaces
|
|
targetNamespaces, err := r.resolveTargetNamespaces(ctx, sourceObj)
|
|
if err != nil {
|
|
logger.Error(err, "failed to resolve target namespaces")
|
|
if r.CircuitBreaker != nil {
|
|
r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err)
|
|
}
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
if len(targetNamespaces) == 0 {
|
|
logger.V(1).Info("no target namespaces resolved")
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
logger.V(1).Info("reconciling mirrors", "targetCount", len(targetNamespaces))
|
|
|
|
// Reconcile each target namespace
|
|
var reconciledCount, errorCount int
|
|
for _, targetNs := range targetNamespaces {
|
|
reconcileErr := r.reconcileMirror(ctx, source, sourceObj, targetNs)
|
|
if reconcileErr != nil {
|
|
logger.Error(reconcileErr, "failed to reconcile mirror", "targetNamespace", targetNs)
|
|
errorCount++
|
|
} else {
|
|
reconciledCount++
|
|
}
|
|
}
|
|
|
|
// Clean up orphaned mirrors (namespaces that no longer match the target criteria)
|
|
orphanedCount, err := r.cleanupOrphanedMirrors(ctx, sourceObj, targetNamespaces)
|
|
if err != nil {
|
|
logger.Error(err, "failed to cleanup orphaned mirrors")
|
|
// Don't fail reconciliation for cleanup errors, just log them
|
|
} else if orphanedCount > 0 {
|
|
logger.Info("cleaned up orphaned mirrors", "count", orphanedCount)
|
|
}
|
|
|
|
// Update status annotation with last sync info
|
|
if err := r.updateLastSyncStatus(ctx, source, sourceObj, reconciledCount, errorCount); err != nil {
|
|
logger.Error(err, "failed to update sync status")
|
|
if r.CircuitBreaker != nil {
|
|
r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err)
|
|
}
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
logger.Info("reconciliation complete",
|
|
"reconciled", reconciledCount,
|
|
"errors", errorCount,
|
|
"total", len(targetNamespaces))
|
|
|
|
// Return error if there were errors (controller-runtime will automatically requeue with exponential backoff)
|
|
if errorCount > 0 {
|
|
err := fmt.Errorf("failed to reconcile %d/%d mirrors", errorCount, len(targetNamespaces))
|
|
// Record failure with circuit breaker
|
|
if r.CircuitBreaker != nil {
|
|
state, justOpened := r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err)
|
|
if justOpened {
|
|
logger.Info("circuit breaker opened due to repeated failures",
|
|
"state", state.String(),
|
|
"consecutiveFailures", r.CircuitBreaker.GetFailureCount(req.Namespace, req.Name, r.GVK.Kind))
|
|
}
|
|
}
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
// Record success with circuit breaker
|
|
if r.CircuitBreaker != nil {
|
|
r.CircuitBreaker.RecordSuccess(req.Namespace, req.Name, r.GVK.Kind)
|
|
}
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// handleDisabled removes mirrors when a resource is disabled.
|
|
func (r *SourceReconciler) handleDisabled(ctx context.Context, sourceObj metav1.Object) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
// Delete all mirrors for this disabled source
|
|
if err := r.deleteAllMirrors(ctx, sourceObj); err != nil {
|
|
logger.Error(err, "failed to delete mirrors for disabled resource")
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
// Remove finalizer if present
|
|
if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) {
|
|
logger.Info("removing finalizer from disabled resource")
|
|
finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName)
|
|
sourceObj.SetFinalizers(finalizers)
|
|
|
|
// Get the unstructured object to update - sourceObj is already *unstructured.Unstructured
|
|
source := sourceObj.(*unstructured.Unstructured)
|
|
if err := r.Update(ctx, source); err != nil {
|
|
logger.Error(err, "failed to remove finalizer from disabled resource")
|
|
return ctrl.Result{}, err
|
|
}
|
|
logger.V(1).Info("finalizer removed from disabled resource")
|
|
}
|
|
|
|
logger.V(1).Info("mirrors deleted for disabled resource")
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// reconcileMirror creates or updates a mirror in the target namespace.
|
|
func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.Object, sourceObj metav1.Object, targetNs string) error {
|
|
logger := log.FromContext(ctx).WithValues("targetNamespace", targetNs)
|
|
|
|
// Try to get existing mirror as unstructured
|
|
sourceUnstructured := source.(*unstructured.Unstructured)
|
|
existing := &unstructured.Unstructured{}
|
|
existing.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
|
|
|
|
err := r.Get(ctx, client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}, existing)
|
|
if err != nil && !errors.IsNotFound(err) {
|
|
return fmt.Errorf("failed to get existing mirror: %w", err)
|
|
}
|
|
|
|
// If freshness verification is enabled and mirror exists, verify it's fresh too
|
|
if err == nil && r.Config.VerifySourceFreshness && r.APIReader != nil {
|
|
fresh := &unstructured.Unstructured{}
|
|
fresh.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
|
|
if apiErr := r.APIReader.Get(ctx, client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}, fresh); apiErr == nil {
|
|
if fresh.GetResourceVersion() != existing.GetResourceVersion() {
|
|
logger.V(2).Info("mirror cache stale, using fresh API version",
|
|
"cachedRV", existing.GetResourceVersion(),
|
|
"freshRV", fresh.GetResourceVersion())
|
|
existing = fresh
|
|
}
|
|
}
|
|
}
|
|
|
|
if err == nil {
|
|
// Mirror exists - check if it's managed by us
|
|
if !IsManagedByUs(existing) {
|
|
logger.V(1).Info("target resource exists but not managed by kubemirror, skipping")
|
|
return nil
|
|
}
|
|
|
|
// Check if update is needed
|
|
needsSync, syncCheckErr := hash.NeedsSync(source, existing, existing.GetAnnotations())
|
|
if syncCheckErr != nil {
|
|
return fmt.Errorf("failed to check if sync needed: %w", syncCheckErr)
|
|
}
|
|
|
|
if !needsSync {
|
|
logger.V(2).Info("mirror is up to date")
|
|
return nil
|
|
}
|
|
|
|
// Update mirror
|
|
updateErr := UpdateMirror(existing, source)
|
|
if updateErr != nil {
|
|
return fmt.Errorf("failed to update mirror: %w", updateErr)
|
|
}
|
|
|
|
clusterUpdateErr := r.Update(ctx, existing)
|
|
if clusterUpdateErr != nil {
|
|
return fmt.Errorf("failed to update mirror in cluster: %w", clusterUpdateErr)
|
|
}
|
|
|
|
logger.V(1).Info("mirror updated")
|
|
return nil
|
|
}
|
|
|
|
// Create new mirror
|
|
mirror, err := CreateMirror(source, targetNs)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create mirror: %w", err)
|
|
}
|
|
|
|
mirrorObj := mirror.(client.Object)
|
|
if err := r.Create(ctx, mirrorObj); err != nil {
|
|
return fmt.Errorf("failed to create mirror in cluster: %w", err)
|
|
}
|
|
|
|
// Verify mirror was actually created (catches webhook rejections, quota issues)
|
|
verifyMirror := &unstructured.Unstructured{}
|
|
verifyMirror.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
|
|
verifyKey := client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}
|
|
if verifyErr := r.Get(ctx, verifyKey, verifyMirror); verifyErr != nil {
|
|
logger.Error(verifyErr, "mirror creation verification failed - mirror may have been rejected")
|
|
return fmt.Errorf("mirror creation verification failed: %w", verifyErr)
|
|
}
|
|
|
|
logger.V(1).Info("mirror created and verified")
|
|
return nil
|
|
}
|
|
|
|
// deleteAllMirrors deletes all mirrors that this source owns across the cluster.
|
|
// It verifies ownership (managed-by label + source-reference annotation) before
|
|
// deleting anything to avoid destroying unrelated resources that happen to share
|
|
// the source's name. Per-namespace failures are aggregated so callers can defer
|
|
// finalizer removal until cleanup actually succeeds.
|
|
func (r *SourceReconciler) deleteAllMirrors(ctx context.Context, sourceObj metav1.Object) error {
|
|
logger := log.FromContext(ctx)
|
|
|
|
allNamespaces, err := r.NamespaceLister.ListNamespaces(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list namespaces: %w", err)
|
|
}
|
|
|
|
sourceUnstructured, ok := sourceObj.(*unstructured.Unstructured)
|
|
if !ok {
|
|
return fmt.Errorf("source object is not unstructured")
|
|
}
|
|
|
|
var (
|
|
deleteCount int
|
|
deleteErrs []error
|
|
)
|
|
for _, ns := range allNamespaces {
|
|
if ns == sourceObj.GetNamespace() {
|
|
continue
|
|
}
|
|
|
|
existing := &unstructured.Unstructured{}
|
|
existing.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
|
|
getErr := r.Get(ctx, client.ObjectKey{Namespace: ns, Name: sourceObj.GetName()}, existing)
|
|
if errors.IsNotFound(getErr) {
|
|
continue
|
|
}
|
|
if getErr != nil {
|
|
logger.Error(getErr, "failed to fetch potential mirror", "namespace", ns)
|
|
deleteErrs = append(deleteErrs, fmt.Errorf("get mirror %s/%s: %w", ns, sourceObj.GetName(), getErr))
|
|
continue
|
|
}
|
|
|
|
if !IsManagedByUs(existing) {
|
|
continue
|
|
}
|
|
srcNs, srcName, _, found := GetSourceReference(existing)
|
|
if !found || srcNs != sourceObj.GetNamespace() || srcName != sourceObj.GetName() {
|
|
continue
|
|
}
|
|
|
|
if delErr := r.Delete(ctx, existing); delErr != nil && !errors.IsNotFound(delErr) {
|
|
logger.Error(delErr, "failed to delete mirror", "namespace", ns)
|
|
deleteErrs = append(deleteErrs, fmt.Errorf("delete mirror %s/%s: %w", ns, sourceObj.GetName(), delErr))
|
|
continue
|
|
}
|
|
deleteCount++
|
|
}
|
|
|
|
logger.Info("deleted mirrors", "count", deleteCount, "errors", len(deleteErrs))
|
|
if len(deleteErrs) > 0 {
|
|
return stderrors.Join(deleteErrs...)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cleanupOrphanedMirrors removes mirrors that exist but are no longer in the target list.
|
|
// This handles cases where target-namespaces annotation changes (e.g., "all" → "all-labeled" or "app-*" → "prod-*").
|
|
func (r *SourceReconciler) cleanupOrphanedMirrors(ctx context.Context, sourceObj metav1.Object, targetNamespaces []string) (int, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
// List all namespaces using unified method for consistency
|
|
nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to list namespaces: %w", err)
|
|
}
|
|
allNamespaces := nsInfo.All
|
|
|
|
// Get GVK from source object
|
|
sourceUnstructured, ok := sourceObj.(*unstructured.Unstructured)
|
|
if !ok {
|
|
return 0, fmt.Errorf("source object is not unstructured")
|
|
}
|
|
|
|
// Create a set of target namespaces for quick lookup
|
|
targetSet := make(map[string]bool)
|
|
for _, ns := range targetNamespaces {
|
|
targetSet[ns] = true
|
|
}
|
|
|
|
var deletedCount int
|
|
for _, ns := range allNamespaces {
|
|
// Skip source namespace
|
|
if ns == sourceObj.GetNamespace() {
|
|
continue
|
|
}
|
|
|
|
// Skip if this namespace IS in the current target list
|
|
if targetSet[ns] {
|
|
continue
|
|
}
|
|
|
|
// Check if a mirror exists in this namespace
|
|
mirror := &unstructured.Unstructured{}
|
|
mirror.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
|
|
mirror.SetNamespace(ns)
|
|
mirror.SetName(sourceObj.GetName())
|
|
|
|
err := r.Get(ctx, client.ObjectKey{Namespace: ns, Name: sourceObj.GetName()}, mirror)
|
|
if errors.IsNotFound(err) {
|
|
// No mirror exists, nothing to clean up
|
|
continue
|
|
}
|
|
if err != nil {
|
|
logger.Error(err, "failed to check for mirror", "namespace", ns)
|
|
continue
|
|
}
|
|
|
|
// Verify this is actually our mirror (not someone else's resource with the same name)
|
|
if !IsManagedByUs(mirror) {
|
|
continue
|
|
}
|
|
|
|
// Verify this mirror points to our source
|
|
srcNs, srcName, _, found := GetSourceReference(mirror)
|
|
if !found || srcNs != sourceObj.GetNamespace() || srcName != sourceObj.GetName() {
|
|
continue
|
|
}
|
|
|
|
// This is an orphaned mirror - delete it
|
|
if err := r.Delete(ctx, mirror); err != nil {
|
|
logger.Error(err, "failed to delete orphaned mirror", "namespace", ns)
|
|
continue
|
|
}
|
|
|
|
deletedCount++
|
|
logger.V(1).Info("deleted orphaned mirror", "namespace", ns)
|
|
}
|
|
|
|
return deletedCount, nil
|
|
}
|
|
|
|
// resolveTargetNamespaces determines which namespaces should receive mirrors.
|
|
func (r *SourceReconciler) resolveTargetNamespaces(ctx context.Context, sourceObj metav1.Object) ([]string, error) {
|
|
annotations := sourceObj.GetAnnotations()
|
|
if annotations == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
targetNsAnnotation := annotations[constants.AnnotationTargetNamespaces]
|
|
if targetNsAnnotation == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
// Parse patterns
|
|
patterns := filter.ParseTargetNamespaces(targetNsAnnotation)
|
|
if len(patterns) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Validate patterns and log warnings for invalid ones
|
|
validationResults, allValid := filter.ValidatePatterns(patterns)
|
|
if !allValid {
|
|
logger := log.FromContext(ctx)
|
|
invalidPatterns := filter.InvalidPatterns(validationResults)
|
|
for _, invalid := range invalidPatterns {
|
|
logger.Info("invalid glob pattern in target-namespaces annotation, pattern will be skipped",
|
|
"pattern", invalid.Pattern,
|
|
"error", invalid.Error.Error(),
|
|
"source", sourceObj.GetName(),
|
|
"namespace", sourceObj.GetNamespace(),
|
|
)
|
|
}
|
|
|
|
// Filter to only valid patterns
|
|
var validPatterns []string
|
|
for _, result := range validationResults {
|
|
if result.Valid {
|
|
validPatterns = append(validPatterns, result.Pattern)
|
|
}
|
|
}
|
|
patterns = validPatterns
|
|
|
|
// If no valid patterns remain, return empty
|
|
if len(patterns) == 0 {
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
// Get all namespace info in a single API call (more efficient than 3 separate calls)
|
|
nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list namespaces: %w", err)
|
|
}
|
|
|
|
// Resolve target namespaces using the pre-categorized namespace info
|
|
targetNamespaces := filter.ResolveTargetNamespaces(
|
|
patterns,
|
|
nsInfo.All,
|
|
nsInfo.AllowMirrors,
|
|
nsInfo.OptOut,
|
|
sourceObj.GetNamespace(),
|
|
r.Filter,
|
|
)
|
|
|
|
// Enforce max targets limit
|
|
if r.Config != nil && r.Config.MaxTargetsPerResource > 0 && len(targetNamespaces) > r.Config.MaxTargetsPerResource {
|
|
targetNamespaces = targetNamespaces[:r.Config.MaxTargetsPerResource]
|
|
}
|
|
|
|
return targetNamespaces, nil
|
|
}
|
|
|
|
// updateLastSyncStatus updates the source resource's annotations with sync status.
|
|
func (r *SourceReconciler) updateLastSyncStatus(ctx context.Context, source runtime.Object, sourceObj metav1.Object, reconciledCount, errorCount int) error {
|
|
annotations := sourceObj.GetAnnotations()
|
|
if annotations == nil {
|
|
annotations = make(map[string]string)
|
|
}
|
|
|
|
annotations[constants.AnnotationSyncStatus] = fmt.Sprintf("reconciled:%d,errors:%d", reconciledCount, errorCount)
|
|
|
|
sourceObj.SetAnnotations(annotations)
|
|
// source (*unstructured.Unstructured) already implements client.Object
|
|
return r.Update(ctx, source.(*unstructured.Unstructured))
|
|
}
|
|
|
|
// isBlacklistedSecret reports whether the given source is a core/v1 Secret with
|
|
// a Type that must never be mirrored across namespaces.
|
|
func isBlacklistedSecret(obj *unstructured.Unstructured) bool {
|
|
if obj.GetKind() != "Secret" || obj.GetAPIVersion() != "v1" {
|
|
return false
|
|
}
|
|
secretType, found, err := unstructured.NestedString(obj.Object, "type")
|
|
if err != nil || !found {
|
|
return false
|
|
}
|
|
return slices.Contains(constants.BlacklistedSecretTypes, secretType)
|
|
}
|
|
|
|
// isEnabledForMirroring checks if a resource has both the label and annotation for mirroring.
|
|
func isEnabledForMirroring(obj metav1.Object) bool {
|
|
// Check label
|
|
labels := obj.GetLabels()
|
|
if labels == nil || labels[constants.LabelEnabled] != "true" {
|
|
return false
|
|
}
|
|
|
|
// Check annotation
|
|
annotations := obj.GetAnnotations()
|
|
if annotations == nil || annotations[constants.AnnotationSync] != "true" {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// SetupWithManager sets up the controller with the Manager.
|
|
func (r *SourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|
// Build predicate to only watch resources with enabled label
|
|
// This reduces API server load by ~90%
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(&corev1.Secret{}).
|
|
Complete(r)
|
|
}
|
|
|
|
// SetupWithManagerForResourceType sets up a controller for a specific resource type.
|
|
// This allows dynamic controller registration for any discovered resource type.
|
|
func (r *SourceReconciler) SetupWithManagerForResourceType(
|
|
mgr ctrl.Manager,
|
|
gvk schema.GroupVersionKind,
|
|
) error {
|
|
// Create an unstructured object for this GVK
|
|
obj := &unstructured.Unstructured{}
|
|
obj.SetGroupVersionKind(gvk)
|
|
|
|
// Create unique controller name including version and group to avoid collisions
|
|
// e.g., "HorizontalPodAutoscaler.v1.autoscaling" or "Secret.v1." (empty group for core resources)
|
|
// This matches the naming convention used by mirror reconcilers
|
|
controllerName := gvk.Kind + "." + gvk.Version + "." + gvk.Group
|
|
|
|
// Create mirror object for watching
|
|
mirrorObj := &unstructured.Unstructured{}
|
|
mirrorObj.SetGroupVersionKind(gvk)
|
|
|
|
// Create predicates to only watch mirror deletions
|
|
mirrorDeletePredicate := predicate.Funcs{
|
|
CreateFunc: func(e event.CreateEvent) bool { return false },
|
|
UpdateFunc: func(e event.UpdateEvent) bool { return false },
|
|
DeleteFunc: func(e event.DeleteEvent) bool { return IsMirrorResource(e.Object) },
|
|
GenericFunc: func(e event.GenericEvent) bool { return false },
|
|
}
|
|
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(obj).
|
|
Named(controllerName).
|
|
// Watch mirror resources - when deleted, enqueue source for reconciliation
|
|
Watches(
|
|
mirrorObj,
|
|
handler.EnqueueRequestsFromMapFunc(r.mapMirrorToSource),
|
|
builder.WithPredicates(mirrorDeletePredicate),
|
|
).
|
|
Complete(r)
|
|
}
|
|
|
|
// mapMirrorToSource maps a mirror resource to its source for reconciliation.
|
|
func (r *SourceReconciler) mapMirrorToSource(ctx context.Context, obj client.Object) []reconcile.Request {
|
|
// Only process if this is a mirror
|
|
if !IsMirrorResource(obj) {
|
|
return nil
|
|
}
|
|
|
|
// Get source reference from annotations
|
|
sourceNs, sourceName, _, found := GetSourceReference(obj)
|
|
if !found {
|
|
return nil
|
|
}
|
|
|
|
// Enqueue reconciliation request for the source
|
|
return []reconcile.Request{
|
|
{
|
|
NamespacedName: types.NamespacedName{
|
|
Namespace: sourceNs,
|
|
Name: sourceName,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// removeString removes a string from a slice.
|
|
func removeString(slice []string, s string) []string {
|
|
result := make([]string, 0, len(slice))
|
|
for _, item := range slice {
|
|
if item != s {
|
|
result = append(result, item)
|
|
}
|
|
}
|
|
return result
|
|
}
|