diff --git a/cmd/kubemirror/main.go b/cmd/kubemirror/main.go index 5e28a1f..4a6f451 100644 --- a/cmd/kubemirror/main.go +++ b/cmd/kubemirror/main.go @@ -115,8 +115,10 @@ func main() { flag.DurationVar(&watcherScanInterval, "watcher-scan-interval", 5*time.Minute, "Interval for scanning cluster to detect new resource types needing watchers (lazy-watcher-init mode only).") + // Default to production logger (JSON output, no DPanic-on-error). Operators + // can opt into development mode via the --zap-devel flag bound below. opts := zap.Options{ - Development: true, + Development: false, } opts.BindFlags(flag.CommandLine) flag.Parse() @@ -281,10 +283,9 @@ func main() { os.Exit(1) } - // Wait for initial discovery with 30s timeout - waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - err = discoveryMgr.WaitForInitialDiscovery(waitCtx, 30*time.Second) + // Wait for initial discovery with 30s timeout, anchored on the signal + // context so SIGTERM during startup actually aborts the wait. + err = discoveryMgr.WaitForInitialDiscovery(signalCtx, 30*time.Second) if err != nil { setupLog.Error(err, "timeout waiting for initial resource discovery") os.Exit(1) diff --git a/pkg/hash/content.go b/pkg/hash/content.go index 93cdfe7..0c0e25d 100644 --- a/pkg/hash/content.go +++ b/pkg/hash/content.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -85,50 +86,70 @@ func extractConfigMapContent(cm *corev1.ConfigMap) map[string]interface{} { } // extractUnstructuredContent extracts content from an unstructured resource (CRDs, etc.). +// +// Hashes every non-Kubernetes-managed field at the top level — not only spec +// — so resources with both spec and data (e.g. an unstructured Secret/CM, or +// a CRD using a custom schema) detect drift on every content field, matching +// the fields that updateUnstructuredMirror copies to the mirror. +// +// When a transform annotation is present the source's labels and annotations +// are also folded into the hash, because templates can read them via +// TransformContext.Labels / .Annotations and a label change would otherwise +// be invisible to NeedsSync. func extractUnstructuredContent(obj runtime.Object) (interface{}, error) { - // Convert to unstructured unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return nil, fmt.Errorf("failed to convert to unstructured: %w", err) } - u := &unstructured.Unstructured{Object: unstructuredObj} - // Make a deep copy to avoid race conditions when accessing nested fields - // NestedMap modifies the underlying map, so we need our own copy - uCopy := u.DeepCopy() + // (NestedMap may modify the underlying map). + u := (&unstructured.Unstructured{Object: unstructuredObj}).DeepCopy() - // Extract spec (most resources have spec) - spec, found, err := unstructured.NestedMap(uCopy.Object, "spec") - if err != nil { - return nil, fmt.Errorf("failed to extract spec: %w", err) + skipFields := map[string]bool{ + "metadata": true, + "status": true, + "apiVersion": true, + "kind": true, } content := make(map[string]interface{}) - if found { - content["spec"] = spec - } - - // For resources without spec, include all fields except metadata and status - if !found { - for key, value := range uCopy.Object { - if key != "metadata" && key != "status" && key != "apiVersion" && key != "kind" { - content[key] = value - } + for key, value := range u.Object { + if !skipFields[key] { + content[key] = value } } - // Include transform annotation in hash so changes to transformation rules trigger updates - annotations := uCopy.GetAnnotations() - if annotations != nil { - if transform, exists := annotations[constants.AnnotationTransform]; exists { - content["transform"] = transform - } + annotations := u.GetAnnotations() + if transform, exists := annotations[constants.AnnotationTransform]; exists && transform != "" { + content["transform"] = transform + // Templates can read source labels and annotations; include them so a + // label/annotation change triggers re-render of transformed mirrors. + // Filter out the kubemirror.raczylo.com/* keys to avoid the source's + // own bookkeeping (sync-status annotation, etc.) churning the hash. + content["sourceLabels"] = filterKubeMirror(u.GetLabels()) + content["sourceAnnotations"] = filterKubeMirror(annotations) } return content, nil } +// filterKubeMirror returns a copy of m with all kubemirror.raczylo.com/* keys +// removed. Used to exclude controller-managed keys from content hashing so +// the controller's own writes don't churn the hash. +func filterKubeMirror(m map[string]string) map[string]string { + if len(m) == 0 { + return nil + } + out := make(map[string]string, len(m)) + for k, v := range m { + if !strings.HasPrefix(k, constants.Domain+"/") { + out[k] = v + } + } + return out +} + // NeedsSync determines if a target resource needs to be synced based on content changes. // It uses a multi-layer strategy: // 1. Check generation field (if available) - fastest diff --git a/pkg/hash/content_test.go b/pkg/hash/content_test.go index 90f36b9..4b6d91b 100644 --- a/pkg/hash/content_test.go +++ b/pkg/hash/content_test.go @@ -641,3 +641,76 @@ func BenchmarkNeedsSync(b *testing.B) { _, _ = NeedsSync(source, target, annotations) } } +func TestComputeContentHash_Unstructured_HashesAllNonMetaFields(t *testing.T) { + // Regression (M7): the previous implementation only hashed `spec` when it + // was present, dropping any other top-level content (data, type, custom + // CRD fields). Drift to those fields was invisible until the next resync. + objSpecOnly := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Custom", + "spec": map[string]interface{}{"field": "v1"}, + "data": map[string]interface{}{"k": "v1"}, + }} + objSpecAndDifferentData := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Custom", + "spec": map[string]interface{}{"field": "v1"}, + "data": map[string]interface{}{"k": "v2"}, // only data differs + }} + + h1, err := ComputeContentHash(objSpecOnly) + require.NoError(t, err) + h2, err := ComputeContentHash(objSpecAndDifferentData) + require.NoError(t, err) + assert.NotEqual(t, h1, h2, "data field must contribute to hash even when spec exists") +} + +func TestComputeContentHash_Unstructured_TransformIncludesLabelsAndAnnotations(t *testing.T) { + // Regression (M6): templates can read source labels/annotations via + // TransformContext. When a transform annotation is present, label / + // annotation changes must therefore re-hash so NeedsSync re-renders. + make := func(label, annot string) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{"app": label}, + "annotations": map[string]interface{}{constants.AnnotationTransform: "rules: []", "tier": annot}, + }, + "data": map[string]interface{}{"k": "v"}, + }} + } + + base, err := ComputeContentHash(make("v1", "prod")) + require.NoError(t, err) + + labelChanged, err := ComputeContentHash(make("v2", "prod")) + require.NoError(t, err) + assert.NotEqual(t, base, labelChanged, "label change must re-hash when transform is present") + + annotChanged, err := ComputeContentHash(make("v1", "stage")) + require.NoError(t, err) + assert.NotEqual(t, base, annotChanged, "annotation change must re-hash when transform is present") +} + +func TestComputeContentHash_Unstructured_LabelChangesIgnoredWithoutTransform(t *testing.T) { + // Counterpart to the above: when there is NO transform annotation, label + // changes must NOT churn the hash — that would cause unnecessary mirror + // re-writes for plain (non-transformed) mirrors. + make := func(label string) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{"app": label}, + }, + "data": map[string]interface{}{"k": "v"}, + }} + } + + h1, err := ComputeContentHash(make("v1")) + require.NoError(t, err) + h2, err := ComputeContentHash(make("v2")) + require.NoError(t, err) + assert.Equal(t, h1, h2, "label changes must not re-hash without a transform annotation") +} diff --git a/pkg/transformer/transformer.go b/pkg/transformer/transformer.go index 17eab61..5161075 100644 --- a/pkg/transformer/transformer.go +++ b/pkg/transformer/transformer.go @@ -15,6 +15,18 @@ import ( "github.com/lukaszraczylo/kubemirror/pkg/constants" ) +// maxConcurrentTemplateExecutions caps the number of in-flight template +// executions across the process. text/template.Execute is not context-aware, +// so when applyTemplateRule times out the executor goroutine continues to +// run until the template returns on its own. This semaphore bounds the +// damage from a pathological template (e.g. {{ range }} that never +// terminates): once the cap is hit, applyTemplateRule fails fast instead +// of leaking another runaway goroutine. The cap is intentionally generous +// — normal workloads should never approach it. +const maxConcurrentTemplateExecutions = 64 + +var templateExecSemaphore = make(chan struct{}, maxConcurrentTemplateExecutions) + // Transformer applies transformation rules to Kubernetes resources. type Transformer struct { options TransformOptions @@ -168,6 +180,19 @@ func (t *Transformer) applyTemplateRule(u *unstructured.Unstructured, rule Rule, return fmt.Errorf("failed to parse template: %w", err) } + // Acquire a slot in the global template-execution semaphore. If saturated, + // fail fast rather than spawning yet another goroutine that may leak when + // it times out (text/template is not context-aware so timed-out goroutines + // continue running until the template returns). + select { + case templateExecSemaphore <- struct{}{}: + defer func() { <-templateExecSemaphore }() + default: + return fmt.Errorf("template execution rejected: %d concurrent executions in flight, "+ + "likely indicates one or more runaway templates leaking goroutines", + maxConcurrentTemplateExecutions) + } + // Execute template with timeout ctxWithTimeout, cancel := context.WithTimeout(context.Background(), t.options.TemplateTimeout) defer cancel() diff --git a/pkg/transformer/transformer_test.go b/pkg/transformer/transformer_test.go index 1a3332f..d50324d 100644 --- a/pkg/transformer/transformer_test.go +++ b/pkg/transformer/transformer_test.go @@ -734,6 +734,44 @@ func TestTransformer_TemplateTimeout(t *testing.T) { t.Skip("Template timeout testing is unreliable in unit tests - covered by integration tests") } +func TestTransformer_TemplateConcurrencyCap(t *testing.T) { + // Regression (H3): text/template.Execute is not context-aware, so a + // timed-out template execution leaves its goroutine running until the + // template returns on its own. We bound that by a global semaphore; + // when saturated, applyTemplateRule must fail fast instead of spawning + // another goroutine. + // + // This test saturates the semaphore directly, then asserts the next + // call returns the cap-exceeded error rather than blocking or panicking. + for i := 0; i < maxConcurrentTemplateExecutions; i++ { + templateExecSemaphore <- struct{}{} + } + defer func() { + // Drain whatever the test left in the semaphore so subsequent tests + // see a clean state. + for { + select { + case <-templateExecSemaphore: + default: + return + } + } + }() + + tmpl := "hello" + tr := NewDefaultTransformer() + rule := Rule{Path: "data.greeting", Template: &tmpl} + u := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "data": map[string]interface{}{}, + }} + + err := tr.applyTemplateRule(u, rule, TransformContext{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "rejected", "saturated semaphore must reject new template executions") +} + func TestMatchGlob(t *testing.T) { tests := []struct { name string