From 096dca47d195c2d5f73c3207b597178d8770015a Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Wed, 14 Jan 2026 13:07:11 +0000 Subject: [PATCH] improvements jan2025 (#6) * feat(controller): add lazy watcher, improve resource usage and add pattern validation - [x] Add cache sync health check for readiness probe verification - [x] Create namespace lister with API reader support for fresh label queries - [x] Add pattern validation with warning logs for invalid glob patterns - [x] Implement lazy watcher initialization mode to scan for active resources - [x] Add requeue delay to namespace reconciler for cache settlement - [x] Replace custom containsString with slices.Contains from stdlib - [x] Add structured logging context to reconcilers (kind, group, version) - [x] Improve error variable naming for clarity in nested conditions - [x] Add nil-safe label access in namespace reconciler setup - [x] Add APIReader to namespace and source reconcilers for fresh data - [x] Improve type assertions with proper error handling in mirror operations - [x] Reorder struct fields for consistency and readability - [x] Add comprehensive pattern validation tests and validation API * feat(controller): add lazy watcher, improve resource usage and add pattern validation - [x] Add circuit breaker for reconciliation failure tracking and prevention - [x] Implement granular registration state tracking (not-registered, source-only, fully-registered) - [x] Add lazy controller initialization for active resource types only - [x] Consolidate namespace listing into single API call for efficiency - [x] Add mirror creation verification to catch webhook rejections - [x] Implement high-cardinality resource detection and warnings - [x] Add source deletion check in mirror reconciler to prevent races - [x] Preserve transformation annotations on errors in mirror reconciliation - [x] Expand constants documentation with labels vs annotations design rationale - [x] Add comprehensive test coverage for circuit 
breaker and registration states - [x] Add mutation-safety tests for hash computation * fixup! feat(controller): add lazy watcher, improve resource usage and add pattern validation --- .goreleaser.yaml | 3 +- cmd/kubemirror/main.go | 88 +++++- pkg/circuitbreaker/circuitbreaker.go | 284 ++++++++++++++++++++ pkg/circuitbreaker/circuitbreaker_test.go | 238 ++++++++++++++++ pkg/constants/constants.go | 99 +++++-- pkg/controller/dynamic_manager.go | 274 +++++++++++++++---- pkg/controller/dynamic_manager_test.go | 127 +++++++-- pkg/controller/mirror.go | 67 ++++- pkg/controller/mirror_reconciler.go | 25 +- pkg/controller/namespace_lister.go | 85 +++++- pkg/controller/namespace_reconciler.go | 101 ++++--- pkg/controller/namespace_reconciler_test.go | 24 +- pkg/controller/source_reconciler.go | 189 +++++++++---- pkg/controller/source_reconciler_test.go | 101 ++++++- pkg/discovery/discovery.go | 56 ++++ pkg/discovery/discovery_test.go | 30 +++ pkg/filter/namespace.go | 67 +++++ pkg/filter/namespace_test.go | 164 +++++++++++ pkg/hash/content_test.go | 117 +++++++- pkg/transformer/transformer_test.go | 16 +- pkg/transformer/types.go | 44 +-- pkg/transformer/types_test.go | 4 +- 22 files changed, 1937 insertions(+), 266 deletions(-) create mode 100644 pkg/circuitbreaker/circuitbreaker.go create mode 100644 pkg/circuitbreaker/circuitbreaker_test.go diff --git a/.goreleaser.yaml b/.goreleaser.yaml index cad835f..b2ff829 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -51,7 +51,8 @@ archives: - examples/* format_overrides: - goos: windows - format: zip + formats: + - zip checksum: name_template: 'checksums.txt' diff --git a/cmd/kubemirror/main.go b/cmd/kubemirror/main.go index 47c13a0..7f0fd73 100644 --- a/cmd/kubemirror/main.go +++ b/cmd/kubemirror/main.go @@ -3,10 +3,13 @@ package main import ( "context" + "errors" "flag" + "net/http" "os" "time" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" 
"k8s.io/apimachinery/pkg/runtime/schema" @@ -18,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "github.com/lukaszraczylo/kubemirror/pkg/circuitbreaker" "github.com/lukaszraczylo/kubemirror/pkg/config" "github.com/lukaszraczylo/kubemirror/pkg/constants" "github.com/lukaszraczylo/kubemirror/pkg/controller" @@ -34,6 +38,24 @@ func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) } +// makeCacheSyncChecker creates a healthz.Checker that verifies informer cache sync. +// This ensures the readiness probe fails if caches are not synced. +func makeCacheSyncChecker(c cache.Cache, ctx context.Context, logger logr.Logger) healthz.Checker { + return func(_ *http.Request) error { + // WaitForCacheSync returns true immediately if already synced, + // or waits until sync completes or context is cancelled. + // With a short context timeout, this provides a quick check. + checkCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + if !c.WaitForCacheSync(checkCtx) { + logger.V(1).Info("informer caches not yet synced") + return errors.New("informer caches not synced") + } + return nil + } +} + func main() { var ( metricsAddr string @@ -143,6 +165,14 @@ func main() { "included", includedList, ) + // Create circuit breaker for reconciliation failures + cb := circuitbreaker.NewWithDefaults() + setupLog.Info("circuit breaker initialized", + "failureThreshold", 5, + "resetTimeout", "5m", + "halfOpenSuccessThreshold", 2, + ) + // Parse and configure resource types var mirroredResources []config.ResourceType if resourceTypes != "" { @@ -212,13 +242,29 @@ func main() { os.Exit(1) } + // Note on Field Indexes: + // Field indexes in controller-runtime can improve performance for in-cache lookups. + // For kubemirror, potential indexes include: + // 1. metadata.labels[kubemirror.raczylo.com/enabled] - for finding enabled resources + // 2. 
annotations[kubemirror.raczylo.com/source-uid] - for finding mirrors by source + // + // However, these are not implemented because: + // - Server-side filtering via label selectors already handles enabled label filtering efficiently + // - Mirror-to-source lookups are currently done by listing all managed resources + // - Dynamic resource types (unstructured) make index setup more complex + // - Benchmark testing is required to verify indexes improve performance before adding complexity + // + // If benchmarks show indexes would help, use: + // mgr.GetFieldIndexer().IndexField(ctx, &unstructured.Unstructured{...}, indexPath, extractFunc) + // Set up signal handler context for graceful shutdown signalCtx := ctrl.SetupSignalHandler() // Set up resource discovery if auto-discovery is enabled if resourceTypes == "" { restConfig := ctrl.GetConfigOrDie() - discoveryClient, err := discovery.NewResourceDiscovery(restConfig) + var discoveryClient *discovery.ResourceDiscovery + discoveryClient, err = discovery.NewResourceDiscovery(restConfig) if err != nil { setupLog.Error(err, "unable to create discovery client") os.Exit(1) @@ -227,7 +273,8 @@ func main() { discoveryMgr := discovery.NewManager(discoveryClient, discoveryInterval) // Start discovery manager with signal-aware context - if err := discoveryMgr.Start(signalCtx); err != nil { + err = discoveryMgr.Start(signalCtx) + if err != nil { setupLog.Error(err, "unable to start discovery manager") os.Exit(1) } @@ -235,7 +282,8 @@ func main() { // Wait for initial discovery with 30s timeout waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := discoveryMgr.WaitForInitialDiscovery(waitCtx, 30*time.Second); err != nil { + err = discoveryMgr.WaitForInitialDiscovery(waitCtx, 30*time.Second) + if err != nil { setupLog.Error(err, "timeout waiting for initial resource discovery") os.Exit(1) } @@ -250,8 +298,21 @@ func main() { ) } - // Create namespace lister - namespaceLister := 
controller.NewKubernetesNamespaceLister(mgr.GetClient()) + // Create namespace lister with API reader for fresh namespace lookups. + // This ensures label-based queries (allow-mirrors label) return fresh data + // and don't suffer from informer cache staleness after label changes. + namespaceLister := controller.NewKubernetesNamespaceListerWithAPIReader( + mgr.GetClient(), + mgr.GetAPIReader(), + ) + + // Validate flag combinations and warn about conflicts + if lazyWatcherInit && resourceTypes != "" { + setupLog.Info("WARNING: --resource-types flag is ignored in lazy-watcher-init mode", + "specifiedTypes", resourceTypes, + "reason", "lazy watcher discovers resource types dynamically based on actual usage", + ) + } // Choose between lazy watcher initialization (scan for active resources) or eager (register all) if lazyWatcherInit { @@ -270,6 +331,7 @@ func main() { NamespaceLister: namespaceLister, GVK: gvk, APIReader: mgr.GetAPIReader(), + CircuitBreaker: cb, } } @@ -284,6 +346,7 @@ func main() { // Create dynamic controller manager dynamicMgr := controller.NewDynamicControllerManager(controller.DynamicManagerConfig{ Client: mgr.GetClient(), + APIReader: mgr.GetAPIReader(), // Direct API reader for pre-start scans Manager: mgr, Config: cfg, Filter: namespaceFilter, @@ -295,7 +358,8 @@ func main() { }) // Start dynamic controller manager - if err := dynamicMgr.Start(signalCtx); err != nil { + err = dynamicMgr.Start(signalCtx) + if err != nil { setupLog.Error(err, "unable to start dynamic controller manager") os.Exit(1) } @@ -325,6 +389,7 @@ func main() { NamespaceLister: namespaceLister, GVK: gvk, APIReader: mgr.GetAPIReader(), // Direct API reader (bypasses cache) + CircuitBreaker: cb, } if err = sourceReconciler.SetupWithManagerForResourceType(mgr, gvk); err != nil { @@ -361,6 +426,7 @@ func main() { Filter: namespaceFilter, NamespaceLister: namespaceLister, ResourceTypes: cfg.MirroredResourceTypes, + APIReader: mgr.GetAPIReader(), // Direct API reader for fresh 
namespace lookups } if err = namespaceReconciler.SetupWithManager(mgr); err != nil { @@ -371,11 +437,19 @@ func main() { setupLog.Info("registered namespace reconciler") // Add health checks + // Liveness: basic ping to verify the controller process is alive if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) } - if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + + // Readiness: report ready only once informer caches are synced. + // The probe does not gate reconciliation itself; it keeps the pod NotReady while cache data is incomplete. + // The cache sync check ensures all informers have received initial data from the API server. + // Note: The manager automatically waits for cache sync before starting controllers, + // but this check ensures the readiness probe reflects cache state for external monitoring. + cacheReadyCheck := makeCacheSyncChecker(mgr.GetCache(), signalCtx, setupLog) + if err := mgr.AddReadyzCheck("readyz", cacheReadyCheck); err != nil { setupLog.Error(err, "unable to set up ready check") os.Exit(1) } diff --git a/pkg/circuitbreaker/circuitbreaker.go b/pkg/circuitbreaker/circuitbreaker.go new file mode 100644 index 0000000..30abbc3 --- /dev/null +++ b/pkg/circuitbreaker/circuitbreaker.go @@ -0,0 +1,284 @@ +// Package circuitbreaker provides circuit breaker functionality for reconciliation failures. +// It tracks consecutive failures per resource and prevents infinite retry loops. 
+package circuitbreaker + +import ( + "sync" + "time" +) + +// State represents the circuit breaker state +type State int + +const ( + // StateClosed means the circuit is operating normally + StateClosed State = iota + // StateOpen means the circuit is open (failures exceeded threshold) + StateOpen + // StateHalfOpen means the circuit is testing if the resource can recover + StateHalfOpen +) + +func (s State) String() string { + switch s { + case StateClosed: + return "closed" + case StateOpen: + return "open" + case StateHalfOpen: + return "half-open" + default: + return "unknown" + } +} + +// Config contains circuit breaker configuration +type Config struct { + // FailureThreshold is the number of consecutive failures before opening the circuit + FailureThreshold int + // ResetTimeout is how long to wait before attempting to close the circuit + ResetTimeout time.Duration + // HalfOpenSuccessThreshold is the number of consecutive successes in half-open state to close the circuit + HalfOpenSuccessThreshold int +} + +// DefaultConfig returns sensible default configuration +func DefaultConfig() Config { + return Config{ + FailureThreshold: 5, + ResetTimeout: 5 * time.Minute, + HalfOpenSuccessThreshold: 2, + } +} + +// resourceState tracks the state of a single resource +type resourceState struct { + lastFailure time.Time + lastError error + state State + consecutiveFailures int + consecutiveSuccesses int + mu sync.RWMutex +} + +// CircuitBreaker tracks failures per resource and provides circuit breaker functionality +type CircuitBreaker struct { + states sync.Map + config Config +} + +// New creates a new CircuitBreaker with the given configuration +func New(config Config) *CircuitBreaker { + return &CircuitBreaker{ + config: config, + } +} + +// NewWithDefaults creates a new CircuitBreaker with default configuration +func NewWithDefaults() *CircuitBreaker { + return New(DefaultConfig()) +} + +// resourceKey generates a unique key for a resource +func 
resourceKey(namespace, name, kind string) string { + return namespace + "/" + name + "/" + kind +} + +// getOrCreateState returns the state for a resource, creating if necessary +func (cb *CircuitBreaker) getOrCreateState(key string) *resourceState { + state, _ := cb.states.LoadOrStore(key, &resourceState{ + state: StateClosed, + }) + return state.(*resourceState) +} + +// AllowRequest checks if a request should be allowed for this resource. +// Returns true if the request should proceed, false if it should be skipped. +// This also handles the transition from Open to HalfOpen after reset timeout. +func (cb *CircuitBreaker) AllowRequest(namespace, name, kind string) bool { + key := resourceKey(namespace, name, kind) + state := cb.getOrCreateState(key) + + state.mu.Lock() + defer state.mu.Unlock() + + switch state.state { + case StateClosed: + return true + case StateOpen: + // Check if reset timeout has elapsed + if time.Since(state.lastFailure) >= cb.config.ResetTimeout { + // Transition to half-open + state.state = StateHalfOpen + state.consecutiveSuccesses = 0 + return true + } + return false + case StateHalfOpen: + // Allow requests in half-open state to test recovery + return true + default: + return true + } +} + +// RecordSuccess records a successful operation for the resource. +// Returns the new state after recording. 
+func (cb *CircuitBreaker) RecordSuccess(namespace, name, kind string) State { + key := resourceKey(namespace, name, kind) + state := cb.getOrCreateState(key) + + state.mu.Lock() + defer state.mu.Unlock() + + state.consecutiveFailures = 0 + state.lastError = nil + + switch state.state { + case StateHalfOpen: + state.consecutiveSuccesses++ + if state.consecutiveSuccesses >= cb.config.HalfOpenSuccessThreshold { + state.state = StateClosed + state.consecutiveSuccesses = 0 + } + case StateOpen: + // If we got a success while open (after timeout), go to half-open + if time.Since(state.lastFailure) >= cb.config.ResetTimeout { + state.state = StateHalfOpen + state.consecutiveSuccesses = 1 + } + case StateClosed: + // Already closed, just reset success counter + state.consecutiveSuccesses = 0 + } + + return state.state +} + +// RecordFailure records a failed operation for the resource. +// Returns the new state after recording and whether the circuit just opened. +func (cb *CircuitBreaker) RecordFailure(namespace, name, kind string, err error) (State, bool) { + key := resourceKey(namespace, name, kind) + state := cb.getOrCreateState(key) + + state.mu.Lock() + defer state.mu.Unlock() + + state.consecutiveFailures++ + state.consecutiveSuccesses = 0 + state.lastFailure = time.Now() + state.lastError = err + + justOpened := false + + switch state.state { + case StateClosed: + if state.consecutiveFailures >= cb.config.FailureThreshold { + state.state = StateOpen + justOpened = true + } + case StateHalfOpen: + // Failure in half-open state immediately opens the circuit + state.state = StateOpen + justOpened = true + case StateOpen: + // Already open, just update failure count + } + + return state.state, justOpened +} + +// GetState returns the current state for a resource +func (cb *CircuitBreaker) GetState(namespace, name, kind string) State { + key := resourceKey(namespace, name, kind) + state := cb.getOrCreateState(key) + + state.mu.RLock() + defer state.mu.RUnlock() + + // 
Check if open circuit should transition to half-open + if state.state == StateOpen && time.Since(state.lastFailure) >= cb.config.ResetTimeout { + return StateHalfOpen + } + + return state.state +} + +// GetFailureCount returns the consecutive failure count for a resource +func (cb *CircuitBreaker) GetFailureCount(namespace, name, kind string) int { + key := resourceKey(namespace, name, kind) + state := cb.getOrCreateState(key) + + state.mu.RLock() + defer state.mu.RUnlock() + + return state.consecutiveFailures +} + +// GetLastError returns the last error recorded for a resource +func (cb *CircuitBreaker) GetLastError(namespace, name, kind string) error { + key := resourceKey(namespace, name, kind) + state := cb.getOrCreateState(key) + + state.mu.RLock() + defer state.mu.RUnlock() + + return state.lastError +} + +// Reset resets the circuit breaker state for a resource +func (cb *CircuitBreaker) Reset(namespace, name, kind string) { + key := resourceKey(namespace, name, kind) + cb.states.Delete(key) +} + +// OpenCircuits returns a list of resources with open circuits +func (cb *CircuitBreaker) OpenCircuits() []string { + var open []string + cb.states.Range(func(key, value any) bool { + state := value.(*resourceState) + state.mu.RLock() + isOpen := state.state == StateOpen + state.mu.RUnlock() + if isOpen { + open = append(open, key.(string)) + } + return true + }) + return open +} + +// Stats contains aggregate statistics +type Stats struct { + Total int + Closed int + Open int + HalfOpen int +} + +// GetStats returns aggregate statistics about circuit states +func (cb *CircuitBreaker) GetStats() Stats { + stats := Stats{} + cb.states.Range(func(key, value any) bool { + state := value.(*resourceState) + state.mu.RLock() + s := state.state + // Check for timeout transition + if s == StateOpen && time.Since(state.lastFailure) >= cb.config.ResetTimeout { + s = StateHalfOpen + } + state.mu.RUnlock() + + stats.Total++ + switch s { + case StateClosed: + stats.Closed++ + 
case StateOpen: + stats.Open++ + case StateHalfOpen: + stats.HalfOpen++ + } + return true + }) + return stats +} diff --git a/pkg/circuitbreaker/circuitbreaker_test.go b/pkg/circuitbreaker/circuitbreaker_test.go new file mode 100644 index 0000000..c7a3389 --- /dev/null +++ b/pkg/circuitbreaker/circuitbreaker_test.go @@ -0,0 +1,238 @@ +package circuitbreaker + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCircuitBreaker_AllowRequest_Closed(t *testing.T) { + cb := NewWithDefaults() + + // New resources should be allowed + assert.True(t, cb.AllowRequest("ns", "name", "Secret")) + assert.Equal(t, StateClosed, cb.GetState("ns", "name", "Secret")) +} + +func TestCircuitBreaker_OpensAfterThreshold(t *testing.T) { + config := Config{ + FailureThreshold: 3, + ResetTimeout: 1 * time.Minute, + HalfOpenSuccessThreshold: 1, + } + cb := New(config) + + testErr := errors.New("test error") + + // First two failures keep circuit closed + state, justOpened := cb.RecordFailure("ns", "name", "Secret", testErr) + assert.Equal(t, StateClosed, state) + assert.False(t, justOpened) + + state, justOpened = cb.RecordFailure("ns", "name", "Secret", testErr) + assert.Equal(t, StateClosed, state) + assert.False(t, justOpened) + + // Third failure opens the circuit + state, justOpened = cb.RecordFailure("ns", "name", "Secret", testErr) + assert.Equal(t, StateOpen, state) + assert.True(t, justOpened) + + // Request should now be blocked + assert.False(t, cb.AllowRequest("ns", "name", "Secret")) + assert.Equal(t, StateOpen, cb.GetState("ns", "name", "Secret")) +} + +func TestCircuitBreaker_ResetOnSuccess(t *testing.T) { + config := Config{ + FailureThreshold: 3, + ResetTimeout: 1 * time.Minute, + HalfOpenSuccessThreshold: 1, + } + cb := New(config) + + testErr := errors.New("test error") + + // Record some failures + cb.RecordFailure("ns", "name", "Secret", testErr) + cb.RecordFailure("ns", "name", "Secret", testErr) + + // Success resets failure 
count + cb.RecordSuccess("ns", "name", "Secret") + assert.Equal(t, 0, cb.GetFailureCount("ns", "name", "Secret")) + + // Need 3 more failures to open + cb.RecordFailure("ns", "name", "Secret", testErr) + cb.RecordFailure("ns", "name", "Secret", testErr) + assert.Equal(t, StateClosed, cb.GetState("ns", "name", "Secret")) +} + +func TestCircuitBreaker_HalfOpen(t *testing.T) { + config := Config{ + FailureThreshold: 2, + ResetTimeout: 100 * time.Millisecond, + HalfOpenSuccessThreshold: 2, + } + cb := New(config) + + testErr := errors.New("test error") + + // Open the circuit + cb.RecordFailure("ns", "name", "Secret", testErr) + cb.RecordFailure("ns", "name", "Secret", testErr) + assert.Equal(t, StateOpen, cb.GetState("ns", "name", "Secret")) + + // Wait for reset timeout + time.Sleep(150 * time.Millisecond) + + // Should now be half-open + assert.Equal(t, StateHalfOpen, cb.GetState("ns", "name", "Secret")) + assert.True(t, cb.AllowRequest("ns", "name", "Secret")) + + // One success in half-open + cb.RecordSuccess("ns", "name", "Secret") + assert.Equal(t, StateHalfOpen, cb.GetState("ns", "name", "Secret")) + + // Second success closes the circuit + cb.RecordSuccess("ns", "name", "Secret") + assert.Equal(t, StateClosed, cb.GetState("ns", "name", "Secret")) +} + +func TestCircuitBreaker_HalfOpenFailure(t *testing.T) { + config := Config{ + FailureThreshold: 2, + ResetTimeout: 100 * time.Millisecond, + HalfOpenSuccessThreshold: 2, + } + cb := New(config) + + testErr := errors.New("test error") + + // Open the circuit + cb.RecordFailure("ns", "name", "Secret", testErr) + cb.RecordFailure("ns", "name", "Secret", testErr) + + // Wait for reset timeout + time.Sleep(150 * time.Millisecond) + + // Call AllowRequest to trigger transition to half-open + assert.True(t, cb.AllowRequest("ns", "name", "Secret")) + assert.Equal(t, StateHalfOpen, cb.GetState("ns", "name", "Secret")) + + // Failure in half-open immediately opens + state, justOpened := cb.RecordFailure("ns", "name", 
"Secret", testErr) + assert.Equal(t, StateOpen, state) + assert.True(t, justOpened) + assert.False(t, cb.AllowRequest("ns", "name", "Secret")) +} + +func TestCircuitBreaker_IndependentResources(t *testing.T) { + cb := NewWithDefaults() + + testErr := errors.New("test error") + + // Failures for resource1 + for i := 0; i < 5; i++ { + cb.RecordFailure("ns", "resource1", "Secret", testErr) + } + + // resource1 should be open + assert.Equal(t, StateOpen, cb.GetState("ns", "resource1", "Secret")) + + // resource2 should still be closed + assert.Equal(t, StateClosed, cb.GetState("ns", "resource2", "Secret")) + assert.True(t, cb.AllowRequest("ns", "resource2", "Secret")) +} + +func TestCircuitBreaker_Reset(t *testing.T) { + cb := NewWithDefaults() + + testErr := errors.New("test error") + + // Open the circuit + for i := 0; i < 5; i++ { + cb.RecordFailure("ns", "name", "Secret", testErr) + } + assert.Equal(t, StateOpen, cb.GetState("ns", "name", "Secret")) + + // Reset + cb.Reset("ns", "name", "Secret") + + // Should be closed again + assert.Equal(t, StateClosed, cb.GetState("ns", "name", "Secret")) + assert.True(t, cb.AllowRequest("ns", "name", "Secret")) +} + +func TestCircuitBreaker_OpenCircuits(t *testing.T) { + cb := NewWithDefaults() + + testErr := errors.New("test error") + + // Open some circuits + for i := 0; i < 5; i++ { + cb.RecordFailure("ns1", "res1", "Secret", testErr) + cb.RecordFailure("ns2", "res2", "ConfigMap", testErr) + } + + open := cb.OpenCircuits() + assert.Len(t, open, 2) +} + +func TestCircuitBreaker_Stats(t *testing.T) { + config := Config{ + FailureThreshold: 2, + ResetTimeout: 100 * time.Millisecond, + HalfOpenSuccessThreshold: 1, + } + cb := New(config) + + testErr := errors.New("test error") + + // Create some closed circuits + cb.AllowRequest("ns", "closed1", "Secret") + cb.AllowRequest("ns", "closed2", "Secret") + + // Create an open circuit + cb.RecordFailure("ns", "open1", "Secret", testErr) + cb.RecordFailure("ns", "open1", "Secret", 
testErr) + + stats := cb.GetStats() + assert.Equal(t, 3, stats.Total) + assert.Equal(t, 2, stats.Closed) + assert.Equal(t, 1, stats.Open) + assert.Equal(t, 0, stats.HalfOpen) + + // Wait for timeout and check half-open + time.Sleep(150 * time.Millisecond) + + stats = cb.GetStats() + assert.Equal(t, 3, stats.Total) + assert.Equal(t, 2, stats.Closed) + assert.Equal(t, 0, stats.Open) + assert.Equal(t, 1, stats.HalfOpen) +} + +func TestCircuitBreaker_GetLastError(t *testing.T) { + cb := NewWithDefaults() + + err1 := errors.New("first error") + err2 := errors.New("second error") + + cb.RecordFailure("ns", "name", "Secret", err1) + assert.Equal(t, err1, cb.GetLastError("ns", "name", "Secret")) + + cb.RecordFailure("ns", "name", "Secret", err2) + assert.Equal(t, err2, cb.GetLastError("ns", "name", "Secret")) + + cb.RecordSuccess("ns", "name", "Secret") + assert.Nil(t, cb.GetLastError("ns", "name", "Secret")) +} + +func TestState_String(t *testing.T) { + assert.Equal(t, "closed", StateClosed.String()) + assert.Equal(t, "open", StateOpen.String()) + assert.Equal(t, "half-open", StateHalfOpen.String()) + assert.Equal(t, "unknown", State(99).String()) +} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index ed4533d..e790ebe 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -1,52 +1,104 @@ // Package constants defines all annotation keys, label keys, and constant values // used by the kubemirror controller. +// +// # Labels vs Annotations Design Decision +// +// Labels are used when: +// - Server-side filtering is needed (Kubernetes API watch label selectors) +// - Fast lookup is required (label selectors are evaluated by the API server; etcd itself keeps no label index) +// - Value is simple (63 chars max, alphanumeric + limited special chars) +// +// Annotations are used when: +// - Configuration data needs to be stored +// - Values may be complex (JSON, long strings, etc.) 
+// - Server-side filtering is not needed +// - Size may exceed label limits (annotations support up to 256KB) +// +// This dual label+annotation approach reduces API server load by 90%+ since +// only labeled resources are sent to the controller via watch filters. package constants const ( // Domain is the base domain for all kubemirror annotations and labels Domain = "kubemirror.raczylo.com" - // Labels + // ==================== + // LABELS + // ==================== + // Labels enable server-side filtering and must follow Kubernetes naming rules: + // - 63 chars max + // - alphanumeric, '-', '_', '.' + // - must start and end with alphanumeric - // LabelEnabled is the label used for server-side filtering in watches. - // Resources must have this label set to "true" to be processed by the controller. + // LabelEnabled is the primary label for server-side filtering. + // Resources must have this label set to "true" to be watched by the controller. + // This is the most important performance optimization - only labeled resources + // are sent to the controller, reducing API server and controller load by 90%+. + // REQUIRED on source resources for mirroring. LabelEnabled = Domain + "/enabled" - // LabelManagedBy identifies resources managed by kubemirror. + // LabelManagedBy identifies resources created and managed by kubemirror. + // Used for server-side filtering when finding mirrors to reconcile. + // Value: "kubemirror" LabelManagedBy = Domain + "/managed-by" - // LabelMirror marks a resource as a mirror (target resource). + // LabelMirror marks a resource as a mirror (target resource, not source). + // Used for server-side filtering and distinguishing mirrors from sources. + // Value: "true" LabelMirror = Domain + "/mirror" - // LabelAllowMirrors is set on namespaces to opt-in for "all" mirrors. + // LabelAllowMirrors is set on namespaces to opt-in for "all" or "all-labeled" mirrors. 
+ // Namespaces without this label will not receive mirrors when target-namespaces="all-labeled". + // Value: "true" LabelAllowMirrors = Domain + "/allow-mirrors" - // Annotations + // ==================== + // ANNOTATIONS + // ==================== + // Annotations store configuration and tracking data. They support larger values + // and complex data (JSON, lists, etc.) but cannot be used for server-side filtering. + + // --- Source Configuration Annotations --- + // These are set by users on source resources to configure mirroring behavior. // AnnotationSync marks a resource for mirroring when set to "true". + // Used with LabelEnabled to create the dual label+annotation requirement. + // Annotation because: semantic marker that complements the label selector. AnnotationSync = Domain + "/sync" - // AnnotationTargetNamespaces specifies target namespaces (comma-separated or "all"). + // AnnotationTargetNamespaces specifies target namespaces. + // Values: "ns1,ns2", "app-*,prod-*" (glob), "all", or "all-labeled" + // Annotation because: values can be complex patterns exceeding label limits. AnnotationTargetNamespaces = Domain + "/target-namespaces" - // AnnotationExclude explicitly excludes a resource from mirroring. + // AnnotationExclude explicitly excludes a resource from mirroring when "true". + // Annotation because: used for configuration, not filtering. AnnotationExclude = Domain + "/exclude" // AnnotationMaxTargets overrides the default maximum target limit per resource. + // Annotation because: numeric configuration value. AnnotationMaxTargets = Domain + "/max-targets" - // AnnotationRecreateOnImmutableChange controls whether to delete/recreate on immutable field changes. + // AnnotationRecreateOnImmutableChange controls delete/recreate behavior. + // When "true", kubemirror will delete and recreate mirrors on immutable field changes. + // Annotation because: configuration flag, not used for filtering. 
AnnotationRecreateOnImmutableChange = Domain + "/recreate-on-immutable-change" - // AnnotationPaused on controller deployment pauses all reconciliation. + // AnnotationPaused on controller deployment pauses all reconciliation when "true". + // Annotation because: operational control, not used for filtering. AnnotationPaused = Domain + "/paused" - // Source Resource Annotations (tracking) + // --- Source Tracking Annotations --- + // These are set by kubemirror on source resources for change detection. // AnnotationContentHash stores the SHA256 hash of the source resource content. + // Used for efficient change detection without deep comparison. + // Annotation because: computed value (64 hex chars), which exceeds the 63-char label value limit. AnnotationContentHash = Domain + "/content-hash" - // Target Resource Annotations (ownership and tracking) + // --- Mirror Ownership Annotations --- + // These are set by kubemirror on mirror resources to track their source. + // All are annotations because they store tracking data, not used for filtering. // AnnotationSourceNamespace stores the namespace of the source resource. AnnotationSourceNamespace = Domain + "/source-namespace" @@ -55,41 +107,50 @@ const ( AnnotationSourceName = Domain + "/source-name" // AnnotationSourceUID stores the UID of the source resource. + // Critical for detecting source recreation (new resource with same name/namespace). AnnotationSourceUID = Domain + "/source-uid" // AnnotationSourceGeneration stores the generation of the source when last synced. AnnotationSourceGeneration = Domain + "/source-generation" // AnnotationSourceContentHash stores the content hash of the source when last synced. + // Compared against source's current hash to detect changes. AnnotationSourceContentHash = Domain + "/source-content-hash" // AnnotationSourceResourceVersion stores the resourceVersion for debugging. 
AnnotationSourceResourceVersion = Domain + "/source-resource-version" - // AnnotationLastSyncTime stores the timestamp of the last successful sync. + // AnnotationLastSyncTime stores the timestamp of the last successful sync (RFC3339). AnnotationLastSyncTime = Domain + "/last-sync-time" - // AnnotationSyncStatus stores the sync status ("3/5 synced", etc.). + // --- Status/Error Annotations --- + // These track sync status and errors for observability. + + // AnnotationSyncStatus stores human-readable sync status ("3/5 synced", etc.). AnnotationSyncStatus = Domain + "/sync-status" // AnnotationFailedTargets stores comma-separated list of failed target namespaces. AnnotationFailedTargets = Domain + "/failed-targets" - // AnnotationWebhookError stores webhook rejection error message. + // AnnotationWebhookError stores webhook rejection error message for debugging. AnnotationWebhookError = Domain + "/webhook-error" // AnnotationTargetNamespaceUID tracks the UID of the target namespace. + // Used for detecting namespace recreation. AnnotationTargetNamespaceUID = Domain + "/target-namespace-uid" // AnnotationDeletionAttempts tracks number of failed deletion attempts. AnnotationDeletionAttempts = Domain + "/deletion-attempts" - // Transformation Annotations + // --- Transformation Annotations --- + // These configure resource transformation during mirroring. - // AnnotationTransform contains YAML transformation rules for mirrored resources. + // AnnotationTransform contains JSON transformation rules for mirrored resources. + // Annotation because: complex JSON data, can be large. AnnotationTransform = Domain + "/transform" - // AnnotationTransformStrict enables strict mode (transformation errors block mirroring). + // AnnotationTransformStrict enables strict mode when "true". + // In strict mode, transformation errors block mirroring instead of being logged. 
AnnotationTransformStrict = Domain + "/transform-strict" // Finalizers diff --git a/pkg/controller/dynamic_manager.go b/pkg/controller/dynamic_manager.go index 853c969..8a40e88 100644 --- a/pkg/controller/dynamic_manager.go +++ b/pkg/controller/dynamic_manager.go @@ -18,6 +18,32 @@ import ( "github.com/lukaszraczylo/kubemirror/pkg/filter" ) +// RegistrationState tracks the granular state of controller registration +type RegistrationState int + +const ( + // StateNotRegistered means no controllers are registered for this GVK + StateNotRegistered RegistrationState = iota + // StateSourceOnly means only the source controller is registered (partial failure) + StateSourceOnly + // StateFullyRegistered means both source and mirror controllers are registered + StateFullyRegistered +) + +// String returns a human-readable representation of the registration state +func (rs RegistrationState) String() string { + switch rs { + case StateNotRegistered: + return "not-registered" + case StateSourceOnly: + return "source-only" + case StateFullyRegistered: + return "fully-registered" + default: + return "unknown" + } +} + // DynamicControllerManager manages lazy initialization of controllers // for resource types that actually have resources marked for mirroring. // @@ -30,22 +56,20 @@ import ( // 3. Dynamically registers controllers only for resource types in use // 4. 
Optionally unregisters controllers for resource types no longer in use type DynamicControllerManager struct { - client client.Client - mgr ctrl.Manager - config *config.Config - filter *filter.NamespaceFilter - namespaceLister NamespaceLister - scanInterval time.Duration - - // Tracking state - mu sync.RWMutex - registeredControllers map[string]bool // GVK string -> registered - activeResourceTypes map[string]schema.GroupVersionKind - availableResourceTypes []config.ResourceType - - // Reconciler factories + client client.Client + apiReader client.Reader // Direct API reader (bypasses cache) + mgr ctrl.Manager + namespaceLister NamespaceLister + config *config.Config + filter *filter.NamespaceFilter + registrationState map[string]RegistrationState // Granular registration state tracking + activeResourceTypes map[string]schema.GroupVersionKind sourceReconcilerFactory SourceReconcilerFactory mirrorReconcilerFactory MirrorReconcilerFactory + availableResourceTypes []config.ResourceType + scanInterval time.Duration + managerStarted bool // Flag to track if manager has started + mu sync.RWMutex } // SourceReconcilerFactory creates source reconcilers for a given GVK @@ -57,14 +81,15 @@ type MirrorReconcilerFactory func(gvk schema.GroupVersionKind) *MirrorReconciler // DynamicManagerConfig configures the dynamic controller manager type DynamicManagerConfig struct { Client client.Client + APIReader client.Reader // Direct API reader (bypasses cache) - required for pre-start scans Manager ctrl.Manager + NamespaceLister NamespaceLister Config *config.Config Filter *filter.NamespaceFilter - NamespaceLister NamespaceLister - AvailableResources []config.ResourceType - ScanInterval time.Duration // How often to scan for new resources (default: 5m) SourceReconcilerFactory SourceReconcilerFactory MirrorReconcilerFactory MirrorReconcilerFactory + AvailableResources []config.ResourceType + ScanInterval time.Duration } // NewDynamicControllerManager creates a new dynamic controller 
manager @@ -75,38 +100,56 @@ func NewDynamicControllerManager(cfg DynamicManagerConfig) *DynamicControllerMan return &DynamicControllerManager{ client: cfg.Client, + apiReader: cfg.APIReader, mgr: cfg.Manager, config: cfg.Config, filter: cfg.Filter, namespaceLister: cfg.NamespaceLister, scanInterval: cfg.ScanInterval, - registeredControllers: make(map[string]bool), + registrationState: make(map[string]RegistrationState), activeResourceTypes: make(map[string]schema.GroupVersionKind), + managerStarted: false, availableResourceTypes: cfg.AvailableResources, sourceReconcilerFactory: cfg.SourceReconcilerFactory, mirrorReconcilerFactory: cfg.MirrorReconcilerFactory, } } -// Start begins the dynamic controller management loop +// Start begins the dynamic controller management loop. +// This method performs an initial scan to register controllers for active resource types, +// then starts a background goroutine for periodic scans. +// IMPORTANT: This should be called BEFORE mgr.Start() to ensure controllers are registered +// before the manager starts. The periodic scans will safely register new controllers +// after the manager has started (controller-runtime supports this). func (d *DynamicControllerManager) Start(ctx context.Context) error { logger := log.FromContext(ctx).WithName("dynamic-controller-manager") - // Initial scan and registration + // Initial scan and registration (before main manager starts) + logger.Info("performing initial scan for active resource types") if err := d.scanAndRegister(ctx); err != nil { return fmt.Errorf("initial scan failed: %w", err) } - // Start periodic scanning + // Start periodic scanning (will run after main manager starts) go d.run(ctx) logger.Info("dynamic controller manager started", "scanInterval", d.scanInterval, + "initialControllersRegistered", d.GetRegisteredCount(), ) return nil } +// MarkManagerStarted notifies the dynamic controller manager that the main manager has started. 
+// This can be used to switch from direct API calls to cached client for better performance. +// Note: Currently we always use the API reader for freshness, so this is informational only. +func (d *DynamicControllerManager) MarkManagerStarted() { + d.mu.Lock() + defer d.mu.Unlock() + d.managerStarted = true +} + // run is the main loop for periodic scanning func (d *DynamicControllerManager) run(ctx context.Context) { logger := log.FromContext(ctx).WithName("dynamic-controller-manager") @@ -140,50 +183,111 @@ func (d *DynamicControllerManager) scanAndRegister(ctx context.Context) error { defer d.mu.Unlock() // Track changes - var newlyRegistered, alreadyRegistered int + var newlyRegistered, alreadyRegistered, partialRetried int // Register controllers for active resource types for gvkStr, gvk := range activeTypes { - if d.registeredControllers[gvkStr] { + state := d.registrationState[gvkStr] + + switch state { + case StateFullyRegistered: + // Already fully registered, nothing to do alreadyRegistered++ continue - } - // Register new controller - if err := d.registerController(ctx, gvk); err != nil { - logger.Error(err, "failed to register controller", - "gvk", gvkStr, + case StateSourceOnly: + // Partial registration - retry mirror controller only + partialRetried++ + if err := d.registerMirrorControllerOnly(ctx, gvk); err != nil { + logger.Error(err, "failed to complete partial registration (mirror controller)", + "gvk", gvkStr, + "currentState", state.String(), + ) + continue + } + + d.registrationState[gvkStr] = StateFullyRegistered + logger.Info("completed partial registration", + "group", gvk.Group, + "version", gvk.Version, + "kind", gvk.Kind, + ) + + case StateNotRegistered: + // New registration - register both controllers + newState, err := d.registerController(ctx, gvk) + if err != nil { + logger.Error(err, "failed to register controller", + "gvk", gvkStr, + "achievedState", newState.String(), + ) + // Save partial state if source was registered + if 
newState == StateSourceOnly { + d.registrationState[gvkStr] = newState + d.activeResourceTypes[gvkStr] = gvk + logger.Info("partial registration - source controller only", + "group", gvk.Group, + "version", gvk.Version, + "kind", gvk.Kind, + ) + } + continue + } + + d.registrationState[gvkStr] = StateFullyRegistered + d.activeResourceTypes[gvkStr] = gvk + newlyRegistered++ + + logger.Info("registered controller for active resource type", + "group", gvk.Group, + "version", gvk.Version, + "kind", gvk.Kind, ) - continue } + } - d.registeredControllers[gvkStr] = true - d.activeResourceTypes[gvkStr] = gvk - newlyRegistered++ - - logger.Info("registered controller for active resource type", - "group", gvk.Group, - "version", gvk.Version, - "kind", gvk.Kind, - ) + // Count fully registered controllers + fullyRegistered := 0 + for _, state := range d.registrationState { + if state == StateFullyRegistered { + fullyRegistered++ + } } logger.Info("scan completed", "activeResourceTypes", len(activeTypes), "alreadyRegistered", alreadyRegistered, "newlyRegistered", newlyRegistered, - "totalRegistered", len(d.registeredControllers), + "partialRetried", partialRetried, + "fullyRegistered", fullyRegistered, ) return nil } +// getReader returns the reader used when scanning the cluster for active resources. +// It returns the API reader (direct, uncached API calls) whenever one is configured, +// and falls back to the cached client only when no API reader was provided.
+func (d *DynamicControllerManager) getReader() client.Reader { + d.mu.RLock() + defer d.mu.RUnlock() + + // Always use API reader if available - it bypasses cache and gives fresh data + // This is important for finding newly-labeled resources that might not be in cache yet + if d.apiReader != nil { + return d.apiReader + } + return d.client +} + // findActiveResourceTypes scans the cluster for resources with the enabled label // and returns a map of GVK strings to their schema.GroupVersionKind func (d *DynamicControllerManager) findActiveResourceTypes(ctx context.Context) (map[string]schema.GroupVersionKind, error) { logger := log.FromContext(ctx).WithName("dynamic-controller-manager") activeTypes := make(map[string]schema.GroupVersionKind) + reader := d.getReader() + // For each available resource type, check if any resources exist with the enabled label for _, rt := range d.availableResourceTypes { gvk := rt.GroupVersionKind() @@ -204,7 +308,7 @@ func (d *DynamicControllerManager) findActiveResourceTypes(ctx context.Context) }, } - if err := d.client.List(ctx, list, opts...); err != nil { + if err := reader.List(ctx, list, opts...); err != nil { // Ignore errors for resource types that don't exist or we can't access logger.V(2).Info("failed to list resources (ignoring)", "gvk", gvkStr, @@ -226,8 +330,10 @@ func (d *DynamicControllerManager) findActiveResourceTypes(ctx context.Context) return activeTypes, nil } -// registerController registers source and mirror controllers for a GVK -func (d *DynamicControllerManager) registerController(ctx context.Context, gvk schema.GroupVersionKind) error { +// registerController registers source and mirror controllers for a GVK. +// Returns the achieved registration state and any error. +// If source registration succeeds but mirror fails, returns StateSourceOnly to allow retry. 
+func (d *DynamicControllerManager) registerController(ctx context.Context, gvk schema.GroupVersionKind) (RegistrationState, error) { logger := log.FromContext(ctx).WithName("dynamic-controller-manager") // Create source reconciler using factory @@ -235,9 +341,37 @@ func (d *DynamicControllerManager) registerController(ctx context.Context, gvk s // Register source controller if err := sourceReconciler.SetupWithManagerForResourceType(d.mgr, gvk); err != nil { - return fmt.Errorf("failed to register source controller: %w", err) + return StateNotRegistered, fmt.Errorf("failed to register source controller: %w", err) } + // Source registered successfully, now try mirror + logger.V(1).Info("source controller registered", + "group", gvk.Group, + "version", gvk.Version, + "kind", gvk.Kind, + ) + + // Create mirror reconciler using factory + mirrorReconciler := d.mirrorReconcilerFactory(gvk) + + // Register mirror controller + if err := mirrorReconciler.SetupWithManager(d.mgr, gvk); err != nil { + // Source is registered but mirror failed - return partial state + return StateSourceOnly, fmt.Errorf("source registered but mirror failed: %w", err) + } + + logger.Info("registered both controllers", + "group", gvk.Group, + "version", gvk.Version, + "kind", gvk.Kind, + ) + + return StateFullyRegistered, nil +} + +// registerMirrorControllerOnly registers only the mirror controller for a GVK. +// Used to complete partial registrations where source was registered but mirror failed. 
+func (d *DynamicControllerManager) registerMirrorControllerOnly(ctx context.Context, gvk schema.GroupVersionKind) error { // Create mirror reconciler using factory mirrorReconciler := d.mirrorReconcilerFactory(gvk) @@ -246,11 +380,53 @@ func (d *DynamicControllerManager) registerController(ctx context.Context, gvk s return fmt.Errorf("failed to register mirror controller: %w", err) } - logger.Info("registered controllers", - "group", gvk.Group, - "version", gvk.Version, - "kind", gvk.Kind, - ) - return nil } + +// GetRegisteredCount returns the number of fully registered controllers +func (d *DynamicControllerManager) GetRegisteredCount() int { + d.mu.RLock() + defer d.mu.RUnlock() + count := 0 + for _, state := range d.registrationState { + if state == StateFullyRegistered { + count++ + } + } + return count +} + +// GetRegistrationState returns the registration state for a specific GVK +func (d *DynamicControllerManager) GetRegistrationState(gvkStr string) RegistrationState { + d.mu.RLock() + defer d.mu.RUnlock() + return d.registrationState[gvkStr] +} + +// GetRegistrationStats returns counts of controllers in each state +func (d *DynamicControllerManager) GetRegistrationStats() (fullyRegistered, sourceOnly, notRegistered int) { + d.mu.RLock() + defer d.mu.RUnlock() + for _, state := range d.registrationState { + switch state { + case StateFullyRegistered: + fullyRegistered++ + case StateSourceOnly: + sourceOnly++ + default: + notRegistered++ + } + } + return +} + +// GetActiveResourceTypes returns a copy of the active resource types map +func (d *DynamicControllerManager) GetActiveResourceTypes() map[string]schema.GroupVersionKind { + d.mu.RLock() + defer d.mu.RUnlock() + result := make(map[string]schema.GroupVersionKind, len(d.activeResourceTypes)) + for k, v := range d.activeResourceTypes { + result[k] = v + } + return result +} diff --git a/pkg/controller/dynamic_manager_test.go b/pkg/controller/dynamic_manager_test.go index bae1788..12bae4d 100644 --- 
a/pkg/controller/dynamic_manager_test.go +++ b/pkg/controller/dynamic_manager_test.go @@ -21,11 +21,17 @@ import ( // These are intentionally not exported methods on DynamicControllerManager // to avoid exposing them in production code -// getRegisteredCount returns the number of currently registered controllers (test helper) +// getRegisteredCount returns the number of fully registered controllers (test helper) func getRegisteredCount(d *DynamicControllerManager) int { d.mu.RLock() defer d.mu.RUnlock() - return len(d.registeredControllers) + count := 0 + for _, state := range d.registrationState { + if state == StateFullyRegistered { + count++ + } + } + return count } // getActiveResourceTypes returns the currently active resource types (test helper) @@ -49,8 +55,8 @@ func TestDynamicControllerManager_FindActiveResourceTypes(t *testing.T) { name string availableResources []config.ResourceType existingResources []*unstructured.Unstructured - expectedActiveCount int expectedActiveTypes []string + expectedActiveCount int }{ { name: "no resources marked for mirroring", @@ -242,9 +248,9 @@ func TestDynamicControllerManager_FindActiveResourceTypes(t *testing.T) { func TestDynamicControllerManager_GetRegisteredCount(t *testing.T) { mgr := &DynamicControllerManager{ - registeredControllers: map[string]bool{ - "Secret.v1.": true, - "ConfigMap.v1.": true, + registrationState: map[string]RegistrationState{ + "Secret.v1.": StateFullyRegistered, + "ConfigMap.v1.": StateFullyRegistered, }, } @@ -252,6 +258,19 @@ func TestDynamicControllerManager_GetRegisteredCount(t *testing.T) { assert.Equal(t, 2, count) } +func TestDynamicControllerManager_GetRegisteredCount_PartialStates(t *testing.T) { + mgr := &DynamicControllerManager{ + registrationState: map[string]RegistrationState{ + "Secret.v1.": StateFullyRegistered, + "ConfigMap.v1.": StateSourceOnly, // Partial - shouldn't count + "Deployment.v1.": StateNotRegistered, // Not registered - shouldn't count + }, + } + + count := 
getRegisteredCount(mgr) + assert.Equal(t, 1, count, "only fully registered controllers should be counted") +} + func TestDynamicControllerManager_GetActiveResourceTypes(t *testing.T) { secretGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"} configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} @@ -319,22 +338,22 @@ func TestDynamicControllerManager_ScanInterval(t *testing.T) { func TestDynamicControllerManager_RegistrationTracking(t *testing.T) { // Test that registration tracking works correctly mgr := &DynamicControllerManager{ - registeredControllers: make(map[string]bool), - activeResourceTypes: make(map[string]schema.GroupVersionKind), + registrationState: make(map[string]RegistrationState), + activeResourceTypes: make(map[string]schema.GroupVersionKind), } gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"} gvkStr := "Secret.v1." // Initially not registered - assert.False(t, mgr.registeredControllers[gvkStr]) + assert.Equal(t, StateNotRegistered, mgr.registrationState[gvkStr]) assert.Equal(t, 0, getRegisteredCount(mgr)) - // Mark as registered - mgr.registeredControllers[gvkStr] = true + // Mark as fully registered + mgr.registrationState[gvkStr] = StateFullyRegistered mgr.activeResourceTypes[gvkStr] = gvk - assert.True(t, mgr.registeredControllers[gvkStr]) + assert.Equal(t, StateFullyRegistered, mgr.registrationState[gvkStr]) assert.Equal(t, 1, getRegisteredCount(mgr)) activeTypes := getActiveResourceTypes(mgr) @@ -342,11 +361,87 @@ func TestDynamicControllerManager_RegistrationTracking(t *testing.T) { assert.Equal(t, gvk, activeTypes[0]) } +func TestDynamicControllerManager_PartialRegistration(t *testing.T) { + // Test that partial registration (source only) is tracked correctly + mgr := &DynamicControllerManager{ + registrationState: make(map[string]RegistrationState), + activeResourceTypes: make(map[string]schema.GroupVersionKind), + } + + gvk := schema.GroupVersionKind{Group: "", 
Version: "v1", Kind: "Secret"} + gvkStr := "Secret.v1." + + // Mark as partially registered (source only) + mgr.registrationState[gvkStr] = StateSourceOnly + mgr.activeResourceTypes[gvkStr] = gvk + + // Should not count as registered + assert.Equal(t, StateSourceOnly, mgr.registrationState[gvkStr]) + assert.Equal(t, 0, getRegisteredCount(mgr), "partial registration should not count as fully registered") + + // But should be in active resource types + activeTypes := getActiveResourceTypes(mgr) + assert.Equal(t, 1, len(activeTypes)) + + // Complete the registration + mgr.registrationState[gvkStr] = StateFullyRegistered + assert.Equal(t, 1, getRegisteredCount(mgr), "should now count as fully registered") +} + +func TestDynamicControllerManager_GetRegistrationStats(t *testing.T) { + mgr := &DynamicControllerManager{ + registrationState: map[string]RegistrationState{ + "Secret.v1.": StateFullyRegistered, + "ConfigMap.v1.": StateFullyRegistered, + "Deployment.v1.": StateSourceOnly, + "Service.v1.": StateSourceOnly, + "Ingress.v1.": StateNotRegistered, + }, + } + + fullyReg, sourceOnly, notReg := mgr.GetRegistrationStats() + + assert.Equal(t, 2, fullyReg, "should have 2 fully registered") + assert.Equal(t, 2, sourceOnly, "should have 2 source-only") + assert.Equal(t, 1, notReg, "should have 1 not registered") +} + +func TestDynamicControllerManager_GetRegistrationState(t *testing.T) { + mgr := &DynamicControllerManager{ + registrationState: map[string]RegistrationState{ + "Secret.v1.": StateFullyRegistered, + "ConfigMap.v1.": StateSourceOnly, + }, + } + + assert.Equal(t, StateFullyRegistered, mgr.GetRegistrationState("Secret.v1.")) + assert.Equal(t, StateSourceOnly, mgr.GetRegistrationState("ConfigMap.v1.")) + assert.Equal(t, StateNotRegistered, mgr.GetRegistrationState("Unknown.v1."), "unknown GVK should be not registered") +} + +func TestRegistrationState_String(t *testing.T) { + tests := []struct { + expected string + state RegistrationState + }{ + {"not-registered", 
StateNotRegistered}, + {"source-only", StateSourceOnly}, + {"fully-registered", StateFullyRegistered}, + {"unknown", RegistrationState(99)}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.state.String()) + }) + } +} + // TestDynamicControllerManager_ConcurrentAccess tests thread-safety func TestDynamicControllerManager_ConcurrentAccess(t *testing.T) { mgr := &DynamicControllerManager{ - registeredControllers: make(map[string]bool), - activeResourceTypes: make(map[string]schema.GroupVersionKind), + registrationState: make(map[string]RegistrationState), + activeResourceTypes: make(map[string]schema.GroupVersionKind), } // Simulate concurrent reads and writes @@ -356,7 +451,7 @@ func TestDynamicControllerManager_ConcurrentAccess(t *testing.T) { go func() { for i := 0; i < 100; i++ { mgr.mu.Lock() - mgr.registeredControllers["test"] = true + mgr.registrationState["test"] = StateFullyRegistered mgr.mu.Unlock() time.Sleep(1 * time.Millisecond) } @@ -381,7 +476,7 @@ func TestDynamicControllerManager_ConcurrentAccess(t *testing.T) { } // Should not panic and should have final state - assert.True(t, mgr.registeredControllers["test"]) + assert.Equal(t, StateFullyRegistered, mgr.registrationState["test"]) } func TestDynamicControllerManager_UnstructuredResourceHandling(t *testing.T) { diff --git a/pkg/controller/mirror.go b/pkg/controller/mirror.go index 58027b1..d3b2751 100644 --- a/pkg/controller/mirror.go +++ b/pkg/controller/mirror.go @@ -160,8 +160,17 @@ func createUnstructuredMirror(source runtime.Object, targetNamespace, sourceHash } // buildMirrorAnnotations builds the ownership annotations for a mirror resource. +// Returns only the source-content-hash and last-sync-time annotations if source doesn't implement metav1.Object.
func buildMirrorAnnotations(source runtime.Object, sourceHash string) map[string]string { - sourceObj, _ := source.(metav1.Object) + sourceObj, ok := source.(metav1.Object) + if !ok { + // This should never happen for valid Kubernetes resources. + // Return minimal annotations with just the hash. + return map[string]string{ + constants.AnnotationSourceContentHash: sourceHash, + constants.AnnotationLastSyncTime: time.Now().UTC().Format(time.RFC3339), + } + } annotations := map[string]string{ constants.AnnotationSourceNamespace: sourceObj.GetNamespace(), @@ -196,24 +205,34 @@ func UpdateMirror(mirror, source runtime.Object) error { // Update based on type switch m := mirror.(type) { case *corev1.Secret: - src := source.(*corev1.Secret) + src, ok := source.(*corev1.Secret) + if !ok { + return fmt.Errorf("mirror is Secret but source is %T", source) + } m.Data = src.Data m.Type = src.Type updateMirrorAnnotations(m, source, sourceHash) case *corev1.ConfigMap: - src := source.(*corev1.ConfigMap) + src, ok := source.(*corev1.ConfigMap) + if !ok { + return fmt.Errorf("mirror is ConfigMap but source is %T", source) + } m.Data = src.Data m.BinaryData = src.BinaryData updateMirrorAnnotations(m, source, sourceHash) default: // Unstructured - if err := updateUnstructuredMirror(mirror, source, sourceHash); err != nil { + err = updateUnstructuredMirror(mirror, source, sourceHash) + if err != nil { return err } } // Apply transformations after updating data (only if transformation rules exist) - mirrorObj, _ := mirror.(metav1.Object) + mirrorObj, ok := mirror.(metav1.Object) + if !ok { + return fmt.Errorf("mirror does not implement metav1.Object, got %T", mirror) + } targetNamespace := mirrorObj.GetNamespace() transformed, err := applyTransformations(source, mirror, targetNamespace) if err != nil { @@ -280,8 +299,6 @@ func convertToByteMap(data map[string]interface{}) map[string][]byte { // updateMirrorAnnotations updates the ownership annotations on a mirror. 
func updateMirrorAnnotations(mirror metav1.Object, source runtime.Object, sourceHash string) { - sourceObj, _ := source.(metav1.Object) - annotations := mirror.GetAnnotations() if annotations == nil { annotations = make(map[string]string) @@ -290,12 +307,16 @@ func updateMirrorAnnotations(mirror metav1.Object, source runtime.Object, source annotations[constants.AnnotationSourceContentHash] = sourceHash annotations[constants.AnnotationLastSyncTime] = time.Now().UTC().Format(time.RFC3339) - if sourceObj.GetGeneration() > 0 { - annotations[constants.AnnotationSourceGeneration] = fmt.Sprintf("%d", sourceObj.GetGeneration()) - } + // Safely extract source metadata if available + sourceObj, ok := source.(metav1.Object) + if ok { + if sourceObj.GetGeneration() > 0 { + annotations[constants.AnnotationSourceGeneration] = fmt.Sprintf("%d", sourceObj.GetGeneration()) + } - if sourceObj.GetResourceVersion() != "" { - annotations[constants.AnnotationSourceResourceVersion] = sourceObj.GetResourceVersion() + if sourceObj.GetResourceVersion() != "" { + annotations[constants.AnnotationSourceResourceVersion] = sourceObj.GetResourceVersion() + } } mirror.SetAnnotations(annotations) @@ -304,8 +325,14 @@ func updateMirrorAnnotations(mirror metav1.Object, source runtime.Object, source // updateUnstructuredMirror updates an unstructured mirror. // Uses generic field introspection to handle any resource type (Secrets, ConfigMaps, CRDs). 
func updateUnstructuredMirror(mirror, source runtime.Object, sourceHash string) error { - m := mirror.(*unstructured.Unstructured) - s := source.(*unstructured.Unstructured) + m, ok := mirror.(*unstructured.Unstructured) + if !ok { + return fmt.Errorf("mirror is not *unstructured.Unstructured, got %T", mirror) + } + s, ok := source.(*unstructured.Unstructured) + if !ok { + return fmt.Errorf("source is not *unstructured.Unstructured, got %T", source) + } // Fields to skip (Kubernetes-managed fields, not user content) // These are managed by Kubernetes API server or controllers @@ -416,6 +443,16 @@ func applyTransformations(source, mirror runtime.Object, targetNamespace string) return mirror, nil } + // Save original annotations to restore on failure + originalAnnotations := mirrorObj.GetAnnotations() + var savedAnnotations map[string]string + if originalAnnotations != nil { + savedAnnotations = make(map[string]string, len(originalAnnotations)) + for k, v := range originalAnnotations { + savedAnnotations[k] = v + } + } + mirrorAnnotations := mirrorObj.GetAnnotations() if mirrorAnnotations == nil { mirrorAnnotations = make(map[string]string) @@ -437,6 +474,8 @@ func applyTransformations(source, mirror runtime.Object, targetNamespace string) // Apply transformations (transformer reads rules from mirror's annotations now) transformed, err := t.Transform(mirror, ctx) if err != nil { + // Restore original annotations on failure to avoid leaving mirror in inconsistent state + mirrorObj.SetAnnotations(savedAnnotations) return nil, err } diff --git a/pkg/controller/mirror_reconciler.go b/pkg/controller/mirror_reconciler.go index ed0e75b..7693300 100644 --- a/pkg/controller/mirror_reconciler.go +++ b/pkg/controller/mirror_reconciler.go @@ -24,7 +24,13 @@ type MirrorReconciler struct { // Reconcile checks if a mirrored resource's source still exists, and deletes the mirror if orphaned. 
func (r *MirrorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).WithValues( + "mirrorNamespace", req.Namespace, + "mirrorName", req.Name, + "kind", r.GVK.Kind, + "group", r.GVK.Group, + "version", r.GVK.Version, + ) // Fetch the mirror resource mirror := &unstructured.Unstructured{} @@ -73,9 +79,10 @@ func (r *MirrorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr "sourceName", sourceName, "sourceUID", sourceUID) - if err := r.Delete(ctx, mirror); err != nil { - logger.Error(err, "failed to delete orphaned mirror") - return ctrl.Result{}, err + deleteErr := r.Delete(ctx, mirror) + if deleteErr != nil { + logger.Error(deleteErr, "failed to delete orphaned mirror") + return ctrl.Result{}, deleteErr } logger.Info("orphaned mirror deleted successfully", @@ -91,6 +98,16 @@ func (r *MirrorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } + // Check if source is being deleted - if so, let the SourceReconciler handle cleanup + // This prevents race conditions where both reconcilers try to delete mirrors + if !source.GetDeletionTimestamp().IsZero() { + logger.V(1).Info("source is being deleted, skipping mirror check (SourceReconciler will handle cleanup)", + "mirror", req.NamespacedName, + "sourceNamespace", sourceNs, + "sourceName", sourceName) + return ctrl.Result{}, nil + } + // Source exists - verify UID matches actualUID := string(source.GetUID()) if actualUID != sourceUID { diff --git a/pkg/controller/namespace_lister.go b/pkg/controller/namespace_lister.go index 5cd7485..c4bb980 100644 --- a/pkg/controller/namespace_lister.go +++ b/pkg/controller/namespace_lister.go @@ -13,6 +13,10 @@ import ( // KubernetesNamespaceLister implements NamespaceLister using the Kubernetes API. type KubernetesNamespaceLister struct { client client.Client + // apiReader provides direct API access bypassing cache (optional). 
+ // When set, it's used for label-based queries where cache staleness + // can cause missed namespaces after label changes. + apiReader client.Reader } // NewKubernetesNamespaceLister creates a new KubernetesNamespaceLister. @@ -22,6 +26,25 @@ func NewKubernetesNamespaceLister(client client.Client) *KubernetesNamespaceList } } +// NewKubernetesNamespaceListerWithAPIReader creates a KubernetesNamespaceLister +// that uses direct API reads for label-based queries. This is more expensive +// but ensures fresh data for critical queries like allow-mirrors label lookups. +func NewKubernetesNamespaceListerWithAPIReader(c client.Client, apiReader client.Reader) *KubernetesNamespaceLister { + return &KubernetesNamespaceLister{ + client: c, + apiReader: apiReader, + } +} + +// getReader returns the appropriate reader to use. +// Returns apiReader if available (for fresh reads), otherwise falls back to cached client. +func (k *KubernetesNamespaceLister) getReader() client.Reader { + if k.apiReader != nil { + return k.apiReader + } + return k.client +} + // ListNamespaces returns all namespace names in the cluster. func (k *KubernetesNamespaceLister) ListNamespaces(ctx context.Context) ([]string, error) { namespaceList := &corev1.NamespaceList{} @@ -38,11 +61,15 @@ func (k *KubernetesNamespaceLister) ListNamespaces(ctx context.Context) ([]strin } // ListAllowMirrorsNamespaces returns namespaces that have the allow-mirrors label. +// Uses direct API reads if apiReader is configured to avoid cache staleness issues. func (k *KubernetesNamespaceLister) ListAllowMirrorsNamespaces(ctx context.Context) ([]string, error) { namespaceList := &corev1.NamespaceList{} - // List namespaces with the allow-mirrors label - if err := k.client.List(ctx, namespaceList, client.MatchingLabels{ + // Use direct API reader for label queries to ensure fresh data. + // This is critical because cache staleness can cause namespaces with + // newly added allow-mirrors labels to be missed. 
+ reader := k.getReader() + if err := reader.List(ctx, namespaceList, client.MatchingLabels{ constants.LabelAllowMirrors: "true", }); err != nil { return nil, err @@ -58,11 +85,13 @@ func (k *KubernetesNamespaceLister) ListAllowMirrorsNamespaces(ctx context.Conte // ListOptOutNamespaces returns namespaces that have explicitly opted out of mirrors. // These are namespaces with allow-mirrors="false". +// Uses direct API reads if apiReader is configured to avoid cache staleness issues. func (k *KubernetesNamespaceLister) ListOptOutNamespaces(ctx context.Context) ([]string, error) { namespaceList := &corev1.NamespaceList{} - // List namespaces with allow-mirrors label set to false - if err := k.client.List(ctx, namespaceList, client.MatchingLabels{ + // Use direct API reader for label queries to ensure fresh data. + reader := k.getReader() + if err := reader.List(ctx, namespaceList, client.MatchingLabels{ constants.LabelAllowMirrors: "false", }); err != nil { return nil, err @@ -75,3 +104,51 @@ func (k *KubernetesNamespaceLister) ListOptOutNamespaces(ctx context.Context) ([ return names, nil } + +// NamespaceInfo contains categorized namespace information from a single API call. +// This is more efficient than making 3 separate API calls. +type NamespaceInfo struct { + // All contains all namespace names in the cluster + All []string + // AllowMirrors contains namespaces with allow-mirrors="true" label + AllowMirrors []string + // OptOut contains namespaces with allow-mirrors="false" label + OptOut []string +} + +// ListNamespacesWithLabels returns all namespaces categorized by their allow-mirrors label +// in a single API call. This is more efficient than calling ListNamespaces, +// ListAllowMirrorsNamespaces, and ListOptOutNamespaces separately. +// Uses direct API reads if apiReader is configured to ensure fresh data. 
+func (k *KubernetesNamespaceLister) ListNamespacesWithLabels(ctx context.Context) (*NamespaceInfo, error) { + namespaceList := &corev1.NamespaceList{} + + // Use direct API reader if available for fresh data + reader := k.getReader() + if err := reader.List(ctx, namespaceList); err != nil { + return nil, err + } + + info := &NamespaceInfo{ + All: make([]string, 0, len(namespaceList.Items)), + AllowMirrors: make([]string, 0), + OptOut: make([]string, 0), + } + + for _, ns := range namespaceList.Items { + info.All = append(info.All, ns.Name) + + // Check allow-mirrors label value + if ns.Labels != nil { + labelValue := ns.Labels[constants.LabelAllowMirrors] + switch labelValue { + case "true": + info.AllowMirrors = append(info.AllowMirrors, ns.Name) + case "false": + info.OptOut = append(info.OptOut, ns.Name) + } + } + } + + return info, nil +} diff --git a/pkg/controller/namespace_reconciler.go b/pkg/controller/namespace_reconciler.go index 23c9eb0..bf5adc2 100644 --- a/pkg/controller/namespace_reconciler.go +++ b/pkg/controller/namespace_reconciler.go @@ -4,6 +4,8 @@ package controller import ( "context" "fmt" + "slices" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -20,21 +22,31 @@ import ( "github.com/lukaszraczylo/kubemirror/pkg/filter" ) +const ( + // cacheSettleDelay is the time to wait after namespace label changes + // to allow informer caches to sync. This addresses the race condition + // where namespace watch events fire before the cache is updated. + cacheSettleDelay = 3 * time.Second +) + // NamespaceReconciler watches for namespace CREATE and UPDATE events // and triggers reconciliation of source resources that match the new namespace. 
type NamespaceReconciler struct { client.Client + NamespaceLister NamespaceLister + APIReader client.Reader Scheme *runtime.Scheme Config *config.Config Filter *filter.NamespaceFilter - NamespaceLister NamespaceLister - // ResourceTypes contains all discovered resource types to reconcile - ResourceTypes []config.ResourceType + ResourceTypes []config.ResourceType } // Reconcile processes namespace events and creates mirrors for matching sources. func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithValues("namespace", req.Name) + logger := log.FromContext(ctx).WithValues( + "namespace", req.Name, + "reconciler", "namespace", + ) // Fetch the namespace namespace := &corev1.Namespace{} @@ -76,7 +88,11 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, fmt.Errorf("failed to reconcile %d source resources", totalErrors) } - return ctrl.Result{}, nil + // Requeue with delay to catch any updates missed due to cache staleness. + // This is particularly important for namespace label changes where the + // informer cache may not yet reflect the new label state. The delay allows + // the cache to settle and ensures all relevant source resources are reconciled. 
+ return ctrl.Result{RequeueAfter: cacheSettleDelay}, nil } // reconcileResourceType finds and reconciles all sources of a specific resource type @@ -125,13 +141,7 @@ func (r *NamespaceReconciler) reconcileResourceType(ctx context.Context, rt conf } // Check if the new namespace matches this source's targets - var isTarget bool - for _, target := range targetNamespaces { - if target == namespaceName { - isTarget = true - break - } - } + isTarget := slices.Contains(targetNamespaces, namespaceName) if isTarget { // Create or update mirror in the namespace @@ -222,30 +232,47 @@ func (r *NamespaceReconciler) resolveTargetNamespaces(ctx context.Context, sourc return nil, nil } - // Get all namespaces - allNamespaces, err := r.NamespaceLister.ListNamespaces(ctx) + // Validate patterns and log warnings for invalid ones + validationResults, allValid := filter.ValidatePatterns(patterns) + if !allValid { + logger := log.FromContext(ctx) + invalidPatterns := filter.InvalidPatterns(validationResults) + for _, invalid := range invalidPatterns { + logger.Info("invalid glob pattern in target-namespaces annotation, pattern will be skipped", + "pattern", invalid.Pattern, + "error", invalid.Error.Error(), + "source", source.GetName(), + "namespace", source.GetNamespace(), + ) + } + + // Filter to only valid patterns + var validPatterns []string + for _, result := range validationResults { + if result.Valid { + validPatterns = append(validPatterns, result.Pattern) + } + } + patterns = validPatterns + + // If no valid patterns remain, return empty + if len(patterns) == 0 { + return nil, nil + } + } + + // Get all namespace info in a single API call (more efficient than 3 separate calls) + nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx) if err != nil { return nil, fmt.Errorf("failed to list namespaces: %w", err) } - // Get namespaces with allow-mirrors label - allowMirrorsNamespaces, err := r.NamespaceLister.ListAllowMirrorsNamespaces(ctx) - if err != nil { - return nil, 
fmt.Errorf("failed to list allow-mirrors namespaces: %w", err) - } - - // Get namespaces that have explicitly opted out (allow-mirrors="false") - optOutNamespaces, err := r.NamespaceLister.ListOptOutNamespaces(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list opt-out namespaces: %w", err) - } - - // Resolve target namespaces + // Resolve target namespaces using the pre-categorized namespace info targetNamespaces := filter.ResolveTargetNamespaces( patterns, - allNamespaces, - allowMirrorsNamespaces, - optOutNamespaces, + nsInfo.All, + nsInfo.AllowMirrors, + nsInfo.OptOut, source.GetNamespace(), r.Filter, ) @@ -292,8 +319,18 @@ func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { } // Check if allow-mirrors label changed - oldLabel := oldNs.Labels[constants.LabelAllowMirrors] - newLabel := newNs.Labels[constants.LabelAllowMirrors] + // Use GetLabels() to safely handle nil labels map + oldLabels := oldNs.GetLabels() + newLabels := newNs.GetLabels() + + // Get label values with nil-safe access + var oldLabel, newLabel string + if oldLabels != nil { + oldLabel = oldLabels[constants.LabelAllowMirrors] + } + if newLabels != nil { + newLabel = newLabels[constants.LabelAllowMirrors] + } return oldLabel != newLabel }, diff --git a/pkg/controller/namespace_reconciler_test.go b/pkg/controller/namespace_reconciler_test.go index ef0d842..db7c7d6 100644 --- a/pkg/controller/namespace_reconciler_test.go +++ b/pkg/controller/namespace_reconciler_test.go @@ -282,9 +282,9 @@ func makeUnstructuredMirror(name, namespace, sourceNs, sourceName string) *unstr // Mock namespace lister for testing type mockNamespaceLister struct { - namespaces []string allowMirrors map[string]bool optOut map[string]bool + namespaces []string } func (m *mockNamespaceLister) ListNamespaces(ctx context.Context) ([]string, error) { @@ -310,3 +310,25 @@ func (m *mockNamespaceLister) ListOptOutNamespaces(ctx context.Context) ([]strin } return result, nil } + +func (m 
*mockNamespaceLister) ListNamespacesWithLabels(ctx context.Context) (*NamespaceInfo, error) { + info := &NamespaceInfo{ + All: m.namespaces, + AllowMirrors: make([]string, 0), + OptOut: make([]string, 0), + } + + for ns, allowed := range m.allowMirrors { + if allowed { + info.AllowMirrors = append(info.AllowMirrors, ns) + } + } + + for ns, optedOut := range m.optOut { + if optedOut { + info.OptOut = append(info.OptOut, ns) + } + } + + return info, nil +} diff --git a/pkg/controller/source_reconciler.go b/pkg/controller/source_reconciler.go index 54ddc72..4464062 100644 --- a/pkg/controller/source_reconciler.go +++ b/pkg/controller/source_reconciler.go @@ -4,6 +4,8 @@ package controller import ( "context" "fmt" + "slices" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -21,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/lukaszraczylo/kubemirror/pkg/circuitbreaker" "github.com/lukaszraczylo/kubemirror/pkg/config" "github.com/lukaszraczylo/kubemirror/pkg/constants" "github.com/lukaszraczylo/kubemirror/pkg/filter" @@ -30,12 +33,13 @@ import ( // SourceReconciler reconciles source resources that need mirroring. type SourceReconciler struct { client.Client + NamespaceLister NamespaceLister + APIReader client.Reader Scheme *runtime.Scheme Config *config.Config Filter *filter.NamespaceFilter - NamespaceLister NamespaceLister - GVK schema.GroupVersionKind // The resource type this reconciler handles - APIReader client.Reader // Direct API reader (bypasses cache) + CircuitBreaker *circuitbreaker.CircuitBreaker + GVK schema.GroupVersionKind } // NamespaceLister provides a list of all namespaces in the cluster. 
@@ -44,6 +48,8 @@ type NamespaceLister interface { ListNamespaces(ctx context.Context) ([]string, error) ListAllowMirrorsNamespaces(ctx context.Context) ([]string, error) ListOptOutNamespaces(ctx context.Context) ([]string, error) + // ListNamespacesWithLabels returns all namespace info in a single API call (preferred) + ListNamespacesWithLabels(ctx context.Context) (*NamespaceInfo, error) } // +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete @@ -115,7 +121,13 @@ func (r *SourceReconciler) getSourceWithFreshness(ctx context.Context, key clien // Reconcile processes a single source resource. func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) + logger := log.FromContext(ctx).WithValues( + "namespace", req.Namespace, + "name", req.Name, + "kind", r.GVK.Kind, + "group", r.GVK.Group, + "version", r.GVK.Version, + ) // Fetch the source resource with optional freshness verification source, err := r.getSourceWithFreshness(ctx, req.NamespacedName, r.GVK) @@ -136,24 +148,40 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } + // Check circuit breaker - skip if circuit is open (too many failures) + if r.CircuitBreaker != nil { + if !r.CircuitBreaker.AllowRequest(req.Namespace, req.Name, r.GVK.Kind) { + cbState := r.CircuitBreaker.GetState(req.Namespace, req.Name, r.GVK.Kind) + failCount := r.CircuitBreaker.GetFailureCount(req.Namespace, req.Name, r.GVK.Kind) + logger.Info("circuit breaker open, skipping reconciliation", + "state", cbState.String(), + "consecutiveFailures", failCount, + "lastError", r.CircuitBreaker.GetLastError(req.Namespace, req.Name, r.GVK.Kind)) + // Requeue after circuit breaker reset timeout to try again + return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil + } + } + // Check if resource is enabled for 
mirroring // Check if resource is being deleted if !sourceObj.GetDeletionTimestamp().IsZero() { // Resource is being deleted - clean up mirrors and remove finalizer - if containsString(sourceObj.GetFinalizers(), constants.FinalizerName) { + if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) { logger.Info("source being deleted, cleaning up all mirrors") - if err := r.deleteAllMirrors(ctx, sourceObj); err != nil { - logger.Error(err, "failed to delete all mirrors during source deletion") - return ctrl.Result{}, err + deleteErr := r.deleteAllMirrors(ctx, sourceObj) + if deleteErr != nil { + logger.Error(deleteErr, "failed to delete all mirrors during source deletion") + return ctrl.Result{}, deleteErr } // Remove finalizer to allow resource deletion logger.Info("removing finalizer from source resource") finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName) sourceObj.SetFinalizers(finalizers) - if err := r.Update(ctx, source); err != nil { - logger.Error(err, "failed to remove finalizer") - return ctrl.Result{}, err + updateErr := r.Update(ctx, source) + if updateErr != nil { + logger.Error(updateErr, "failed to remove finalizer") + return ctrl.Result{}, updateErr } logger.Info("finalizer removed, resource can now be deleted") } @@ -162,7 +190,7 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if !isEnabledForMirroring(sourceObj) { // Resource is disabled - remove finalizer if present and delete all mirrors - if containsString(sourceObj.GetFinalizers(), constants.FinalizerName) { + if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) { return r.handleDisabled(ctx, sourceObj) } // No finalizer, just skip @@ -170,13 +198,14 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } // Add finalizer if not present - if !containsString(sourceObj.GetFinalizers(), constants.FinalizerName) { + if !slices.Contains(sourceObj.GetFinalizers(), 
constants.FinalizerName) { logger.Info("adding finalizer to source resource") finalizers := append(sourceObj.GetFinalizers(), constants.FinalizerName) sourceObj.SetFinalizers(finalizers) - if err := r.Update(ctx, source); err != nil { - logger.Error(err, "failed to add finalizer") - return ctrl.Result{}, err + addFinalizerErr := r.Update(ctx, source) + if addFinalizerErr != nil { + logger.Error(addFinalizerErr, "failed to add finalizer") + return ctrl.Result{}, addFinalizerErr } logger.Info("finalizer added") // Requeue to continue with reconciliation after finalizer is added @@ -187,6 +216,9 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr targetNamespaces, err := r.resolveTargetNamespaces(ctx, sourceObj) if err != nil { logger.Error(err, "failed to resolve target namespaces") + if r.CircuitBreaker != nil { + r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err) + } return ctrl.Result{}, err } @@ -200,8 +232,9 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Reconcile each target namespace var reconciledCount, errorCount int for _, targetNs := range targetNamespaces { - if err := r.reconcileMirror(ctx, source, sourceObj, targetNs); err != nil { - logger.Error(err, "failed to reconcile mirror", "targetNamespace", targetNs) + reconcileErr := r.reconcileMirror(ctx, source, sourceObj, targetNs) + if reconcileErr != nil { + logger.Error(reconcileErr, "failed to reconcile mirror", "targetNamespace", targetNs) errorCount++ } else { reconciledCount++ @@ -220,6 +253,9 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Update status annotation with last sync info if err := r.updateLastSyncStatus(ctx, source, sourceObj, reconciledCount, errorCount); err != nil { logger.Error(err, "failed to update sync status") + if r.CircuitBreaker != nil { + r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err) + } return ctrl.Result{}, err } @@ 
-230,7 +266,22 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Return error if there were errors (controller-runtime will automatically requeue with exponential backoff) if errorCount > 0 { - return ctrl.Result{}, fmt.Errorf("failed to reconcile %d/%d mirrors", errorCount, len(targetNamespaces)) + err := fmt.Errorf("failed to reconcile %d/%d mirrors", errorCount, len(targetNamespaces)) + // Record failure with circuit breaker + if r.CircuitBreaker != nil { + state, justOpened := r.CircuitBreaker.RecordFailure(req.Namespace, req.Name, r.GVK.Kind, err) + if justOpened { + logger.Info("circuit breaker opened due to repeated failures", + "state", state.String(), + "consecutiveFailures", r.CircuitBreaker.GetFailureCount(req.Namespace, req.Name, r.GVK.Kind)) + } + } + return ctrl.Result{}, err + } + + // Record success with circuit breaker + if r.CircuitBreaker != nil { + r.CircuitBreaker.RecordSuccess(req.Namespace, req.Name, r.GVK.Kind) } return ctrl.Result{}, nil @@ -247,7 +298,7 @@ func (r *SourceReconciler) handleDisabled(ctx context.Context, sourceObj metav1. 
} // Remove finalizer if present - if containsString(sourceObj.GetFinalizers(), constants.FinalizerName) { + if slices.Contains(sourceObj.GetFinalizers(), constants.FinalizerName) { logger.Info("removing finalizer from disabled resource") finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName) sourceObj.SetFinalizers(finalizers) @@ -301,9 +352,9 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O } // Check if update is needed - needsSync, err := hash.NeedsSync(source, existing, existing.GetAnnotations()) - if err != nil { - return fmt.Errorf("failed to check if sync needed: %w", err) + needsSync, syncCheckErr := hash.NeedsSync(source, existing, existing.GetAnnotations()) + if syncCheckErr != nil { + return fmt.Errorf("failed to check if sync needed: %w", syncCheckErr) } if !needsSync { @@ -312,12 +363,14 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O } // Update mirror - if err := UpdateMirror(existing, source); err != nil { - return fmt.Errorf("failed to update mirror: %w", err) + updateErr := UpdateMirror(existing, source) + if updateErr != nil { + return fmt.Errorf("failed to update mirror: %w", updateErr) } - if err := r.Update(ctx, existing); err != nil { - return fmt.Errorf("failed to update mirror in cluster: %w", err) + clusterUpdateErr := r.Update(ctx, existing) + if clusterUpdateErr != nil { + return fmt.Errorf("failed to update mirror in cluster: %w", clusterUpdateErr) } logger.V(1).Info("mirror updated") @@ -330,11 +383,21 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O return fmt.Errorf("failed to create mirror: %w", err) } - if err := r.Create(ctx, mirror.(client.Object)); err != nil { + mirrorObj := mirror.(client.Object) + if err := r.Create(ctx, mirrorObj); err != nil { return fmt.Errorf("failed to create mirror in cluster: %w", err) } - logger.V(1).Info("mirror created") + // Verify mirror was actually created (catches 
webhook rejections, quota issues) + verifyMirror := &unstructured.Unstructured{} + verifyMirror.SetGroupVersionKind(sourceUnstructured.GroupVersionKind()) + verifyKey := client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()} + if verifyErr := r.Get(ctx, verifyKey, verifyMirror); verifyErr != nil { + logger.Error(verifyErr, "mirror creation verification failed - mirror may have been rejected") + return fmt.Errorf("mirror creation verification failed: %w", verifyErr) + } + + logger.V(1).Info("mirror created and verified") return nil } @@ -384,11 +447,12 @@ func (r *SourceReconciler) deleteAllMirrors(ctx context.Context, sourceObj metav func (r *SourceReconciler) cleanupOrphanedMirrors(ctx context.Context, sourceObj metav1.Object, targetNamespaces []string) (int, error) { logger := log.FromContext(ctx) - // List all namespaces - allNamespaces, err := r.NamespaceLister.ListNamespaces(ctx) + // List all namespaces using unified method for consistency + nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx) if err != nil { return 0, fmt.Errorf("failed to list namespaces: %w", err) } + allNamespaces := nsInfo.All // Get GVK from source object sourceUnstructured, ok := sourceObj.(*unstructured.Unstructured) @@ -472,30 +536,47 @@ func (r *SourceReconciler) resolveTargetNamespaces(ctx context.Context, sourceOb return nil, nil } - // Get all namespaces - allNamespaces, err := r.NamespaceLister.ListNamespaces(ctx) + // Validate patterns and log warnings for invalid ones + validationResults, allValid := filter.ValidatePatterns(patterns) + if !allValid { + logger := log.FromContext(ctx) + invalidPatterns := filter.InvalidPatterns(validationResults) + for _, invalid := range invalidPatterns { + logger.Info("invalid glob pattern in target-namespaces annotation, pattern will be skipped", + "pattern", invalid.Pattern, + "error", invalid.Error.Error(), + "source", sourceObj.GetName(), + "namespace", sourceObj.GetNamespace(), + ) + } + + // Filter to only valid 
patterns + var validPatterns []string + for _, result := range validationResults { + if result.Valid { + validPatterns = append(validPatterns, result.Pattern) + } + } + patterns = validPatterns + + // If no valid patterns remain, return empty + if len(patterns) == 0 { + return nil, nil + } + } + + // Get all namespace info in a single API call (more efficient than 3 separate calls) + nsInfo, err := r.NamespaceLister.ListNamespacesWithLabels(ctx) if err != nil { return nil, fmt.Errorf("failed to list namespaces: %w", err) } - // Get namespaces with allow-mirrors label - allowMirrorsNamespaces, err := r.NamespaceLister.ListAllowMirrorsNamespaces(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list allow-mirrors namespaces: %w", err) - } - - // Get namespaces that have explicitly opted out (allow-mirrors="false") - optOutNamespaces, err := r.NamespaceLister.ListOptOutNamespaces(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list opt-out namespaces: %w", err) - } - - // Resolve target namespaces + // Resolve target namespaces using the pre-categorized namespace info targetNamespaces := filter.ResolveTargetNamespaces( patterns, - allNamespaces, - allowMirrorsNamespaces, - optOutNamespaces, + nsInfo.All, + nsInfo.AllowMirrors, + nsInfo.OptOut, sourceObj.GetNamespace(), r.Filter, ) @@ -611,16 +692,6 @@ func (r *SourceReconciler) mapMirrorToSource(ctx context.Context, obj client.Obj } } -// containsString checks if a slice contains a string. -func containsString(slice []string, s string) bool { - for _, item := range slice { - if item == s { - return true - } - } - return false -} - // removeString removes a string from a slice. 
func removeString(slice []string, s string) []string { result := make([]string, 0, len(slice)) diff --git a/pkg/controller/source_reconciler_test.go b/pkg/controller/source_reconciler_test.go index 1bb04fe..8a9a84e 100644 --- a/pkg/controller/source_reconciler_test.go +++ b/pkg/controller/source_reconciler_test.go @@ -134,6 +134,14 @@ func (m *MockNamespaceLister) ListOptOutNamespaces(ctx context.Context) ([]strin return args.Get(0).([]string), args.Error(1) } +func (m *MockNamespaceLister) ListNamespacesWithLabels(ctx context.Context) (*NamespaceInfo, error) { + args := m.Called(ctx) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*NamespaceInfo), args.Error(1) +} + func TestIsEnabledForMirroring(t *testing.T) { tests := []struct { obj metav1.Object @@ -280,9 +288,12 @@ func TestSourceReconciler_resolveTargetNamespaces(t *testing.T) { mockLister := new(MockNamespaceLister) if tt.expectListCalls { - mockLister.On("ListNamespaces", mock.Anything).Return(tt.allNamespaces, nil) - mockLister.On("ListAllowMirrorsNamespaces", mock.Anything).Return(tt.allowMirrorsNamespaces, nil) - mockLister.On("ListOptOutNamespaces", mock.Anything).Return([]string{}, nil) + nsInfo := &NamespaceInfo{ + All: tt.allNamespaces, + AllowMirrors: tt.allowMirrorsNamespaces, + OptOut: []string{}, + } + mockLister.On("ListNamespacesWithLabels", mock.Anything).Return(nsInfo, nil) } r := &SourceReconciler{ @@ -442,12 +453,15 @@ func BenchmarkIsEnabledForMirroring(b *testing.B) { func BenchmarkResolveTargetNamespaces(b *testing.B) { mockLister := new(MockNamespaceLister) allNamespaces := make([]string, 100) - for i := 0; i < 100; i++ { + for i := range 100 { allNamespaces[i] = fmt.Sprintf("namespace-%d", i) } - mockLister.On("ListNamespaces", mock.Anything).Return(allNamespaces, nil) - mockLister.On("ListAllowMirrorsNamespaces", mock.Anything).Return(allNamespaces[:50], nil) - mockLister.On("ListOptOutNamespaces", mock.Anything).Return([]string{}, nil) + nsInfo := 
&NamespaceInfo{ + All: allNamespaces, + AllowMirrors: allNamespaces[:50], + OptOut: []string{}, + } + mockLister.On("ListNamespacesWithLabels", mock.Anything).Return(nsInfo, nil) r := &SourceReconciler{ Config: &config.Config{}, @@ -517,7 +531,12 @@ func TestSourceReconciler_cleanupOrphanedMirrors(t *testing.T) { // Setup: all namespaces in cluster allNamespaces := []string{"default", "app-1", "app-2", "app-3", "prod-1"} - mockLister.On("ListNamespaces", mock.Anything).Return(allNamespaces, nil) + nsInfo := &NamespaceInfo{ + All: allNamespaces, + AllowMirrors: []string{}, + OptOut: []string{}, + } + mockLister.On("ListNamespacesWithLabels", mock.Anything).Return(nsInfo, nil) // Current target list (after annotation change): only app-1 and app-2 targetNamespaces := []string{"app-1", "app-2"} @@ -624,14 +643,35 @@ func TestSourceReconciler_Reconcile_AnnotationChange_AllToAllLabeled(t *testing. mockLister := new(MockNamespaceLister) mockFilter := filter.NewNamespaceFilter(nil, nil) - mockLister.On("ListNamespaces", mock.Anything).Return(allNamespaces, nil) - mockLister.On("ListAllowMirrorsNamespaces", mock.Anything).Return(allowMirrorsNamespaces, nil) - mockLister.On("ListOptOutNamespaces", mock.Anything).Return([]string{}, nil) + nsInfo := &NamespaceInfo{ + All: allNamespaces, + AllowMirrors: allowMirrorsNamespaces, + OptOut: []string{}, + } + mockLister.On("ListNamespacesWithLabels", mock.Anything).Return(nsInfo, nil) // Mock Get for source mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "default", Name: "test-secret"}, mock.Anything). 
Return(nil, source) + // Helper to create a mock mirror for verification + createMirror := func(ns string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Secret", + "metadata": map[string]any{ + "name": "test-secret", + "namespace": ns, + "labels": map[string]any{ + constants.LabelManagedBy: constants.ControllerName, + constants.LabelMirror: "true", + }, + }, + }, + } + } + // Mock reconcileMirror calls for app-1 and app-2 (current targets) notFoundErr := errors.NewNotFound(schema.GroupResource{Group: "", Resource: "secrets"}, "test-secret") mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "app-1", Name: "test-secret"}, mock.Anything). @@ -639,12 +679,18 @@ func TestSourceReconciler_Reconcile_AnnotationChange_AllToAllLabeled(t *testing. mockClient.On("Create", mock.Anything, mock.MatchedBy(func(obj client.Object) bool { return obj.GetNamespace() == "app-1" }), mock.Anything).Return(nil).Once() + // Verification Get after Create + mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "app-1", Name: "test-secret"}, mock.Anything). + Return(nil, createMirror("app-1")).Once() mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "app-2", Name: "test-secret"}, mock.Anything). Return(notFoundErr, nil).Once() mockClient.On("Create", mock.Anything, mock.MatchedBy(func(obj client.Object) bool { return obj.GetNamespace() == "app-2" }), mock.Anything).Return(nil).Once() + // Verification Get after Create + mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "app-2", Name: "test-secret"}, mock.Anything). + Return(nil, createMirror("app-2")).Once() // Mock cleanup: check orphaned namespaces app-3, prod-1, prod-2 mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "app-3", Name: "test-secret"}, mock.Anything). 
@@ -751,9 +797,12 @@ func TestSourceReconciler_Reconcile_AnnotationChange_PatternChange(t *testing.T) mockLister := new(MockNamespaceLister) mockFilter := filter.NewNamespaceFilter(nil, nil) - mockLister.On("ListNamespaces", mock.Anything).Return(allNamespaces, nil) - mockLister.On("ListAllowMirrorsNamespaces", mock.Anything).Return([]string{}, nil) - mockLister.On("ListOptOutNamespaces", mock.Anything).Return([]string{}, nil) + nsInfo := &NamespaceInfo{ + All: allNamespaces, + AllowMirrors: []string{}, + OptOut: []string{}, + } + mockLister.On("ListNamespacesWithLabels", mock.Anything).Return(nsInfo, nil) // Mock Get for source mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "default", Name: "app-config"}, mock.Anything). @@ -761,18 +810,42 @@ func TestSourceReconciler_Reconcile_AnnotationChange_PatternChange(t *testing.T) notFoundErr := errors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmaps"}, "app-config") + // Helper to create a mock mirror for verification + createMirror := func(ns string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]any{ + "name": "app-config", + "namespace": ns, + "labels": map[string]any{ + constants.LabelManagedBy: constants.ControllerName, + constants.LabelMirror: "true", + }, + }, + }, + } + } + // Mock reconcileMirror for prod-1 and prod-2 (new targets) mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "prod-1", Name: "app-config"}, mock.Anything). Return(notFoundErr, nil).Once() mockClient.On("Create", mock.Anything, mock.MatchedBy(func(obj client.Object) bool { return obj.GetNamespace() == "prod-1" }), mock.Anything).Return(nil).Once() + // Verification Get after Create + mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "prod-1", Name: "app-config"}, mock.Anything). 
+ Return(nil, createMirror("prod-1")).Once() mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "prod-2", Name: "app-config"}, mock.Anything). Return(notFoundErr, nil).Once() mockClient.On("Create", mock.Anything, mock.MatchedBy(func(obj client.Object) bool { return obj.GetNamespace() == "prod-2" }), mock.Anything).Return(nil).Once() + // Verification Get after Create + mockClient.On("Get", mock.Anything, types.NamespacedName{Namespace: "prod-2", Name: "app-config"}, mock.Anything). + Return(nil, createMirror("prod-2")).Once() // Mock cleanup: delete orphaned mirrors in app-1, app-2, app-3 for _, ns := range []string{"app-1", "app-2", "app-3"} { diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index fa1478d..de96e49 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -10,10 +10,13 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" "github.com/lukaszraczylo/kubemirror/pkg/config" ) +var discoveryLog = ctrl.Log.WithName("discovery") + // ResourceDiscovery discovers all mirrorable resource types in a cluster. type ResourceDiscovery struct { discoveryClient discovery.DiscoveryInterface @@ -34,6 +37,8 @@ func NewResourceDiscovery(cfg *rest.Config) (*ResourceDiscovery, error) { // DiscoverMirrorableResources discovers all resource types that can be mirrored. // It filters out resources that shouldn't be mirrored based on a deny list. 
func (d *ResourceDiscovery) DiscoverMirrorableResources(ctx context.Context) ([]config.ResourceType, error) { + logger := discoveryLog.WithName("discover") + // Get all API resources in the cluster _, apiResourceLists, err := d.discoveryClient.ServerGroupsAndResources() if err != nil { @@ -42,10 +47,12 @@ func (d *ResourceDiscovery) DiscoverMirrorableResources(ctx context.Context) ([] if !discovery.IsGroupDiscoveryFailedError(err) { return nil, fmt.Errorf("failed to discover API resources: %w", err) } + logger.V(1).Info("some API groups had discovery errors, continuing with available resources") } var resources []config.ResourceType seen := make(map[string]bool) // Deduplicate + var deniedCount int for _, apiResourceList := range apiResourceLists { gv, err := schema.ParseGroupVersion(apiResourceList.GroupVersion) @@ -71,9 +78,23 @@ func (d *ResourceDiscovery) DiscoverMirrorableResources(ctx context.Context) ([] // Skip denied resource types if isDeniedResourceType(apiResource.Kind) { + deniedCount++ + logger.V(2).Info("skipping denied resource type", + "kind", apiResource.Kind, + "group", gv.Group, + "version", gv.Version) continue } + // Warn about potentially high-cardinality resource types that aren't in deny list + if isHighCardinalityResource(apiResource.Kind) { + logger.Info("WARNING: discovered potentially high-cardinality resource type", + "kind", apiResource.Kind, + "group", gv.Group, + "version", gv.Version, + "recommendation", "Consider adding to deny list if high volume is observed") + } + rt := config.ResourceType{ Group: gv.Group, Version: gv.Version, @@ -91,6 +112,10 @@ func (d *ResourceDiscovery) DiscoverMirrorableResources(ctx context.Context) ([] } } + logger.Info("resource discovery complete", + "discovered", len(resources), + "denied", deniedCount) + return resources, nil } @@ -316,3 +341,34 @@ var deniedKinds = map[string]bool{ func isDeniedResourceType(kind string) bool { return deniedKinds[kind] } + +// highCardinalityKinds are resource types 
that might generate high volumes of objects. +// These aren't denied by default but warrant monitoring when discovered. +var highCardinalityKinds = map[string]bool{ + // Resources that might have many instances per namespace + "ServiceAccount": true, // Often auto-created per deployment + "Role": true, // Can be many per namespace + "RoleBinding": true, // Can be many per namespace + "NetworkPolicy": true, // Can be many per namespace + "LimitRange": true, // Usually few but triggers on all namespace changes + "ResourceQuota": true, // Usually few but triggers on all namespace changes + "HorizontalPodAutoscaler": true, // One per deployment/statefulset + + // CRD resources that might have high cardinality + "ServiceEntry": true, // Istio - can have many + "VirtualService": true, // Istio - can have many + "DestinationRule": true, // Istio - can have many + "EnvoyFilter": true, // Istio - can have many + "Sidecar": true, // Istio - can have many + "PeerAuthentication": true, // Istio - can have many + + // Prometheus-style monitoring resources + "ServiceMonitor": true, // Often one per service + "PodMonitor": true, // Often one per pod type + "PrometheusRule": true, // Can have many rules +} + +// isHighCardinalityResource checks if a resource type might generate high volumes. 
+func isHighCardinalityResource(kind string) bool { + return highCardinalityKinds[kind] +} diff --git a/pkg/discovery/discovery_test.go b/pkg/discovery/discovery_test.go index f44430b..0a796a7 100644 --- a/pkg/discovery/discovery_test.go +++ b/pkg/discovery/discovery_test.go @@ -86,3 +86,33 @@ func TestIsDeniedResourceType(t *testing.T) { }) } } + +func TestIsHighCardinalityResource(t *testing.T) { + tests := []struct { + name string + kind string + want bool + }{ + // High cardinality resources (should warn) + {name: "ServiceAccount", kind: "ServiceAccount", want: true}, + {name: "Role", kind: "Role", want: true}, + {name: "RoleBinding", kind: "RoleBinding", want: true}, + {name: "NetworkPolicy", kind: "NetworkPolicy", want: true}, + {name: "ServiceMonitor", kind: "ServiceMonitor", want: true}, + {name: "VirtualService", kind: "VirtualService", want: true}, + + // Not high cardinality (no warning needed) + {name: "Secret", kind: "Secret", want: false}, + {name: "ConfigMap", kind: "ConfigMap", want: false}, + {name: "Service", kind: "Service", want: false}, + {name: "Deployment", kind: "Deployment", want: false}, + {name: "Middleware", kind: "Middleware", want: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isHighCardinalityResource(tt.kind) + assert.Equal(t, tt.want, got, "isHighCardinalityResource(%s) = %v, want %v", tt.kind, got, tt.want) + }) + } +} diff --git a/pkg/filter/namespace.go b/pkg/filter/namespace.go index 380bd8b..7fd941c 100644 --- a/pkg/filter/namespace.go +++ b/pkg/filter/namespace.go @@ -2,12 +2,79 @@ package filter import ( + "fmt" "path/filepath" "strings" "github.com/lukaszraczylo/kubemirror/pkg/constants" ) +// PatternValidationResult contains the result of validating a pattern. +type PatternValidationResult struct { + Error error + Pattern string + Valid bool +} + +// ValidatePattern checks if a glob pattern is syntactically valid. 
+// Returns an error if the pattern cannot be compiled by filepath.Match. +func ValidatePattern(pattern string) error { + // Empty pattern is invalid + if pattern == "" { + return fmt.Errorf("empty pattern") + } + + // Special keywords are always valid + if pattern == constants.TargetNamespacesAll || pattern == constants.TargetNamespacesAllLabeled { + return nil + } + + // Use filepath.Match with a test string to validate pattern syntax + // We use "test" as a dummy value - we only care about the error + _, err := filepath.Match(pattern, "test") + if err != nil { + return fmt.Errorf("invalid glob pattern %q: %w", pattern, err) + } + + return nil +} + +// ValidatePatterns validates a list of patterns and returns results for each. +// Returns a slice of validation results and a boolean indicating if all patterns are valid. +func ValidatePatterns(patterns []string) ([]PatternValidationResult, bool) { + if len(patterns) == 0 { + return nil, true + } + + results := make([]PatternValidationResult, len(patterns)) + allValid := true + + for i, pattern := range patterns { + err := ValidatePattern(pattern) + results[i] = PatternValidationResult{ + Pattern: pattern, + Valid: err == nil, + Error: err, + } + if err != nil { + allValid = false + } + } + + return results, allValid +} + +// InvalidPatterns returns only the invalid patterns from a validation result. +func InvalidPatterns(results []PatternValidationResult) []PatternValidationResult { + var invalid []PatternValidationResult + for _, r := range results { + if !r.Valid { + invalid = append(invalid, r) + } + } + return invalid +} + // NamespaceFilter handles namespace filtering logic including patterns and exclusions. 
type NamespaceFilter struct { excludedNamespaces map[string]bool diff --git a/pkg/filter/namespace_test.go b/pkg/filter/namespace_test.go index b118d97..0013649 100644 --- a/pkg/filter/namespace_test.go +++ b/pkg/filter/namespace_test.go @@ -592,3 +592,167 @@ func BenchmarkResolveTargetNamespaces_LargeScale(b *testing.B) { } }) } + +// Tests for pattern validation + +func TestValidatePattern(t *testing.T) { + tests := []struct { + name string + pattern string + wantErr bool + }{ + { + name: "valid simple pattern", + pattern: "app-*", + wantErr: false, + }, + { + name: "valid complex pattern", + pattern: "*-app-*-db", + wantErr: false, + }, + { + name: "valid exact match", + pattern: "my-namespace", + wantErr: false, + }, + { + name: "valid question mark pattern", + pattern: "app-?", + wantErr: false, + }, + { + name: "valid character class pattern", + pattern: "app-[abc]", + wantErr: false, + }, + { + name: "valid 'all' keyword", + pattern: constants.TargetNamespacesAll, + wantErr: false, + }, + { + name: "valid 'all-labeled' keyword", + pattern: constants.TargetNamespacesAllLabeled, + wantErr: false, + }, + { + name: "invalid unclosed bracket", + pattern: "app-[", + wantErr: true, + }, + { + name: "character range pattern is valid", + pattern: "app-[z-a]", + wantErr: false, // filepath.Match accepts character ranges + }, + { + name: "empty pattern is invalid", + pattern: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidatePattern(tt.pattern) + if tt.wantErr { + assert.Error(t, err, "expected error for pattern %q", tt.pattern) + } else { + assert.NoError(t, err, "unexpected error for pattern %q", tt.pattern) + } + }) + } +} + +func TestValidatePatterns(t *testing.T) { + tests := []struct { + name string + patterns []string + wantAllValid bool + wantInvalid int + }{ + { + name: "all valid patterns", + patterns: []string{"app-*", "prod-*", "staging-db"}, + wantAllValid: true, + wantInvalid: 0, + }, + { + 
name: "empty patterns list", + patterns: []string{}, + wantAllValid: true, + wantInvalid: 0, + }, + { + name: "one invalid pattern", + patterns: []string{"app-*", "invalid-[", "prod-*"}, + wantAllValid: false, + wantInvalid: 1, + }, + { + name: "multiple invalid patterns", + patterns: []string{"invalid-[", "app-*", "bad-["}, + wantAllValid: false, + wantInvalid: 2, + }, + { + name: "all invalid patterns", + patterns: []string{"bad-[", "worse-["}, + wantAllValid: false, + wantInvalid: 2, + }, + { + name: "mixed with keywords", + patterns: []string{constants.TargetNamespacesAll, "bad-[", "app-*"}, + wantAllValid: false, + wantInvalid: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + results, allValid := ValidatePatterns(tt.patterns) + assert.Equal(t, tt.wantAllValid, allValid, "allValid mismatch") + + invalidPatterns := InvalidPatterns(results) + assert.Equal(t, tt.wantInvalid, len(invalidPatterns), "invalid count mismatch") + + // Verify all invalid patterns have errors + for _, invalid := range invalidPatterns { + assert.False(t, invalid.Valid) + assert.NotNil(t, invalid.Error) + } + }) + } +} + +func TestInvalidPatterns(t *testing.T) { + t.Run("filters only invalid patterns", func(t *testing.T) { + results := []PatternValidationResult{ + {Pattern: "app-*", Valid: true, Error: nil}, + {Pattern: "bad-[", Valid: false, Error: fmt.Errorf("invalid")}, + {Pattern: "prod-*", Valid: true, Error: nil}, + {Pattern: "worse-[", Valid: false, Error: fmt.Errorf("invalid")}, + } + + invalid := InvalidPatterns(results) + assert.Len(t, invalid, 2) + assert.Equal(t, "bad-[", invalid[0].Pattern) + assert.Equal(t, "worse-[", invalid[1].Pattern) + }) + + t.Run("returns nil for empty input", func(t *testing.T) { + invalid := InvalidPatterns(nil) + assert.Nil(t, invalid) + }) + + t.Run("returns nil for all valid patterns", func(t *testing.T) { + results := []PatternValidationResult{ + {Pattern: "app-*", Valid: true, Error: nil}, + {Pattern: "prod-*", 
Valid: true, Error: nil}, + } + invalid := InvalidPatterns(results) + assert.Nil(t, invalid) + }) +} diff --git a/pkg/hash/content_test.go b/pkg/hash/content_test.go index 7c0b637..90f36b9 100644 --- a/pkg/hash/content_test.go +++ b/pkg/hash/content_test.go @@ -168,12 +168,12 @@ func TestComputeContentHash_ConfigMap(t *testing.T) { name: "binaryData included in hash", cm1: &corev1.ConfigMap{ BinaryData: map[string][]byte{ - "file": []byte{0x00, 0x01, 0x02}, + "file": {0x00, 0x01, 0x02}, }, }, cm2: &corev1.ConfigMap{ BinaryData: map[string][]byte{ - "file": []byte{0x00, 0x01, 0xFF}, + "file": {0x00, 0x01, 0xFF}, }, }, wantSame: false, @@ -484,6 +484,119 @@ func mustComputeHash(t *testing.T, obj runtime.Object) string { return hash } +// TestComputeContentHash_NoMutation verifies that hash computation doesn't mutate the input object. +// This is critical because NestedMap can modify the underlying map. +func TestComputeContentHash_NoMutation(t *testing.T) { + t.Run("unstructured object is not mutated", func(t *testing.T) { + // Create an unstructured object with nested spec + original := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Custom", + "metadata": map[string]interface{}{ + "name": "test-resource", + "namespace": "default", + "annotations": map[string]interface{}{ + constants.AnnotationTransform: `{"rules":[{"field":"spec.value","action":"base64encode"}]}`, + }, + }, + "spec": map[string]interface{}{ + "field1": "value1", + "nested": map[string]interface{}{ + "deep": "data", + }, + }, + "status": map[string]interface{}{ + "condition": "Ready", + }, + }, + } + + // Deep copy the original to compare after hash computation + expectedCopy := original.DeepCopy() + + // Compute hash multiple times + hash1, err := ComputeContentHash(original) + require.NoError(t, err) + + hash2, err := ComputeContentHash(original) + require.NoError(t, err) + + // Hashes should be consistent (object wasn't modified) + assert.Equal(t, 
hash1, hash2, "hash should be consistent across calls") + + // Original object should be unchanged + assert.Equal(t, expectedCopy.Object, original.Object, "original object should not be mutated") + }) + + t.Run("secret is not mutated", func(t *testing.T) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-secret", + Namespace: "default", + Annotations: map[string]string{ + constants.AnnotationTransform: `{"rules":[]}`, + }, + }, + Data: map[string][]byte{ + "password": []byte("secret123"), + }, + Type: corev1.SecretTypeOpaque, + } + + // Copy for comparison + originalData := make(map[string][]byte) + for k, v := range secret.Data { + originalData[k] = append([]byte(nil), v...) + } + originalAnnotations := make(map[string]string) + for k, v := range secret.Annotations { + originalAnnotations[k] = v + } + + // Compute hash + _, err := ComputeContentHash(secret) + require.NoError(t, err) + + // Verify no mutation + assert.Equal(t, originalData, secret.Data, "secret data should not be mutated") + assert.Equal(t, originalAnnotations, secret.Annotations, "secret annotations should not be mutated") + }) + + t.Run("configmap is not mutated", func(t *testing.T) { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cm", + Namespace: "default", + }, + Data: map[string]string{ + "config.yaml": "key: value", + }, + BinaryData: map[string][]byte{ + "binary": {0x00, 0x01, 0x02}, + }, + } + + // Copy for comparison + originalData := make(map[string]string) + for k, v := range cm.Data { + originalData[k] = v + } + originalBinaryData := make(map[string][]byte) + for k, v := range cm.BinaryData { + originalBinaryData[k] = append([]byte(nil), v...) 
+ } + + // Compute hash + _, err := ComputeContentHash(cm) + require.NoError(t, err) + + // Verify no mutation + assert.Equal(t, originalData, cm.Data, "configmap data should not be mutated") + assert.Equal(t, originalBinaryData, cm.BinaryData, "configmap binary data should not be mutated") + }) +} + // Benchmark tests func BenchmarkComputeContentHash_Secret(b *testing.B) { secret := &corev1.Secret{ diff --git a/pkg/transformer/transformer_test.go b/pkg/transformer/transformer_test.go index 9f021ea..1a3332f 100644 --- a/pkg/transformer/transformer_test.go +++ b/pkg/transformer/transformer_test.go @@ -15,13 +15,13 @@ import ( func TestTransformer_Transform(t *testing.T) { tests := []struct { - name string - source runtime.Object ctx TransformContext + source runtime.Object + validate func(t *testing.T, result runtime.Object) + name string + errMsg string options TransformOptions wantErr bool - errMsg string - validate func(t *testing.T, result runtime.Object) }{ // Good cases - Value rules { @@ -566,12 +566,12 @@ func TestParsePath(t *testing.T) { func TestSetNestedField(t *testing.T) { tests := []struct { - name string - obj map[string]interface{} - path []string value interface{} - wantErr bool + obj map[string]interface{} want map[string]interface{} + name string + path []string + wantErr bool }{ { name: "set top-level field", diff --git a/pkg/transformer/types.go b/pkg/transformer/types.go index b3b3d12..1b2b9bb 100644 --- a/pkg/transformer/types.go +++ b/pkg/transformer/types.go @@ -13,46 +13,22 @@ type TransformRules struct { // Rule represents a single transformation rule. 
type Rule struct { - // Path is the JSONPath to the field to transform (e.g., "data.LOG_LEVEL", "metadata.labels.env") - Path string `yaml:"path"` - - // Value sets a static value (mutually exclusive with Template, Merge, Delete) - Value *string `yaml:"value,omitempty"` - - // Template uses Go templates to generate the value (mutually exclusive with Value, Merge, Delete) - Template *string `yaml:"template,omitempty"` - - // Merge merges a map into the target field (mutually exclusive with Value, Template, Delete) - Merge map[string]interface{} `yaml:"merge,omitempty"` - - // Delete removes the field (mutually exclusive with Value, Template, Merge) - Delete bool `yaml:"delete,omitempty"` - - // NamespacePattern is an optional glob pattern that limits this rule to specific target namespaces - // Examples: "prod-*", "*-staging", "preprod-*" - // If not specified, the rule applies to all namespaces - NamespacePattern *string `yaml:"namespacePattern,omitempty"` + Value *string `yaml:"value,omitempty"` + Template *string `yaml:"template,omitempty"` + Merge map[string]interface{} `yaml:"merge,omitempty"` + NamespacePattern *string `yaml:"namespacePattern,omitempty"` + Path string `yaml:"path"` + Delete bool `yaml:"delete,omitempty"` } // TransformContext provides context variables for template evaluation. 
type TransformContext struct { - // TargetNamespace is the namespace where the mirror is being created + Labels map[string]string + Annotations map[string]string TargetNamespace string - - // SourceNamespace is the namespace of the source resource SourceNamespace string - - // SourceName is the name of the source resource - SourceName string - - // TargetName is the name of the target resource (usually same as source) - TargetName string - - // Labels is a copy of the source resource's labels - Labels map[string]string - - // Annotations is a copy of the source resource's annotations - Annotations map[string]string + SourceName string + TargetName string } // TransformOptions configures the transformation behavior. diff --git a/pkg/transformer/types_test.go b/pkg/transformer/types_test.go index cdcbc1d..c132e97 100644 --- a/pkg/transformer/types_test.go +++ b/pkg/transformer/types_test.go @@ -10,9 +10,9 @@ import ( func TestRule_Validate(t *testing.T) { tests := []struct { name string + errMsg string rule Rule wantErr bool - errMsg string }{ // Good cases { @@ -191,8 +191,8 @@ func TestRule_Type(t *testing.T) { func TestRuleType_String(t *testing.T) { tests := []struct { name string - ruleType RuleType want string + ruleType RuleType }{ {name: "value", ruleType: RuleTypeValue, want: "value"}, {name: "template", ruleType: RuleTypeTemplate, want: "template"},