mirror of
https://github.com/lukaszraczylo/kubemirror.git
synced 2026-06-05 22:43:51 +00:00
fix: hash drift, transformer leak guard, prod logger, ctx-aware wait
M7: extractUnstructuredContent only hashed 'spec' when present, dropping all other top-level content fields. Resources with both spec and data (or any non-spec content) silently drifted until the next 10m resync. Now hashes every non-Kubernetes-managed top-level field, matching the fields updateUnstructuredMirror copies. M6: when a source has a transform annotation, also hash the source's labels and annotations (filtered of kubemirror.raczylo.com/* keys to avoid the controller's own bookkeeping churning the hash). Templates read these via TransformContext; without this a label change wouldn't re-render the transformed mirror. H3: text/template.Execute is not context-aware, so applyTemplateRule's timeout cancels the select but leaks the executor goroutine. Added a process-wide semaphore (cap 64) so a runaway template can't spawn an unbounded number of stuck goroutines on every reconcile. M4: zap dev mode (DPanic-on-error, console output, stacktraces on warning) was hardcoded on. Defaulted to production; --zap-devel flag remains for opt-in. M5: WaitForInitialDiscovery was anchored on context.Background() with its own WithTimeout, so SIGTERM during startup couldn't abort the wait. Now anchors on signalCtx.
This commit is contained in:
+46
-25
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
@@ -85,50 +86,70 @@ func extractConfigMapContent(cm *corev1.ConfigMap) map[string]interface{} {
|
||||
}
|
||||
|
||||
// extractUnstructuredContent extracts content from an unstructured resource (CRDs, etc.).
|
||||
//
|
||||
// Hashes every non-Kubernetes-managed field at the top level — not only spec
|
||||
// — so resources with both spec and data (e.g. an unstructured Secret/CM, or
|
||||
// a CRD using a custom schema) detect drift on every content field, matching
|
||||
// the fields that updateUnstructuredMirror copies to the mirror.
|
||||
//
|
||||
// When a transform annotation is present the source's labels and annotations
|
||||
// are also folded into the hash, because templates can read them via
|
||||
// TransformContext.Labels / .Annotations and a label change would otherwise
|
||||
// be invisible to NeedsSync.
|
||||
func extractUnstructuredContent(obj runtime.Object) (interface{}, error) {
|
||||
// Convert to unstructured
|
||||
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to convert to unstructured: %w", err)
|
||||
}
|
||||
|
||||
u := &unstructured.Unstructured{Object: unstructuredObj}
|
||||
|
||||
// Make a deep copy to avoid race conditions when accessing nested fields
|
||||
// NestedMap modifies the underlying map, so we need our own copy
|
||||
uCopy := u.DeepCopy()
|
||||
// (NestedMap may modify the underlying map).
|
||||
u := (&unstructured.Unstructured{Object: unstructuredObj}).DeepCopy()
|
||||
|
||||
// Extract spec (most resources have spec)
|
||||
spec, found, err := unstructured.NestedMap(uCopy.Object, "spec")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to extract spec: %w", err)
|
||||
skipFields := map[string]bool{
|
||||
"metadata": true,
|
||||
"status": true,
|
||||
"apiVersion": true,
|
||||
"kind": true,
|
||||
}
|
||||
|
||||
content := make(map[string]interface{})
|
||||
if found {
|
||||
content["spec"] = spec
|
||||
}
|
||||
|
||||
// For resources without spec, include all fields except metadata and status
|
||||
if !found {
|
||||
for key, value := range uCopy.Object {
|
||||
if key != "metadata" && key != "status" && key != "apiVersion" && key != "kind" {
|
||||
content[key] = value
|
||||
}
|
||||
for key, value := range u.Object {
|
||||
if !skipFields[key] {
|
||||
content[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
// Include transform annotation in hash so changes to transformation rules trigger updates
|
||||
annotations := uCopy.GetAnnotations()
|
||||
if annotations != nil {
|
||||
if transform, exists := annotations[constants.AnnotationTransform]; exists {
|
||||
content["transform"] = transform
|
||||
}
|
||||
annotations := u.GetAnnotations()
|
||||
if transform, exists := annotations[constants.AnnotationTransform]; exists && transform != "" {
|
||||
content["transform"] = transform
|
||||
// Templates can read source labels and annotations; include them so a
|
||||
// label/annotation change triggers re-render of transformed mirrors.
|
||||
// Filter out the kubemirror.raczylo.com/* keys to avoid the source's
|
||||
// own bookkeeping (sync-status annotation, etc.) churning the hash.
|
||||
content["sourceLabels"] = filterKubeMirror(u.GetLabels())
|
||||
content["sourceAnnotations"] = filterKubeMirror(annotations)
|
||||
}
|
||||
|
||||
return content, nil
|
||||
}
|
||||
|
||||
// filterKubeMirror returns a copy of m with all kubemirror.raczylo.com/* keys
|
||||
// removed. Used to exclude controller-managed keys from content hashing so
|
||||
// the controller's own writes don't churn the hash.
|
||||
func filterKubeMirror(m map[string]string) map[string]string {
|
||||
if len(m) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make(map[string]string, len(m))
|
||||
for k, v := range m {
|
||||
if !strings.HasPrefix(k, constants.Domain+"/") {
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// NeedsSync determines if a target resource needs to be synced based on content changes.
|
||||
// It uses a multi-layer strategy:
|
||||
// 1. Check generation field (if available) - fastest
|
||||
|
||||
@@ -641,3 +641,76 @@ func BenchmarkNeedsSync(b *testing.B) {
|
||||
_, _ = NeedsSync(source, target, annotations)
|
||||
}
|
||||
}
|
||||
func TestComputeContentHash_Unstructured_HashesAllNonMetaFields(t *testing.T) {
|
||||
// Regression (M7): the previous implementation only hashed `spec` when it
|
||||
// was present, dropping any other top-level content (data, type, custom
|
||||
// CRD fields). Drift to those fields was invisible until the next resync.
|
||||
objSpecOnly := &unstructured.Unstructured{Object: map[string]interface{}{
|
||||
"apiVersion": "v1",
|
||||
"kind": "Custom",
|
||||
"spec": map[string]interface{}{"field": "v1"},
|
||||
"data": map[string]interface{}{"k": "v1"},
|
||||
}}
|
||||
objSpecAndDifferentData := &unstructured.Unstructured{Object: map[string]interface{}{
|
||||
"apiVersion": "v1",
|
||||
"kind": "Custom",
|
||||
"spec": map[string]interface{}{"field": "v1"},
|
||||
"data": map[string]interface{}{"k": "v2"}, // only data differs
|
||||
}}
|
||||
|
||||
h1, err := ComputeContentHash(objSpecOnly)
|
||||
require.NoError(t, err)
|
||||
h2, err := ComputeContentHash(objSpecAndDifferentData)
|
||||
require.NoError(t, err)
|
||||
assert.NotEqual(t, h1, h2, "data field must contribute to hash even when spec exists")
|
||||
}
|
||||
|
||||
func TestComputeContentHash_Unstructured_TransformIncludesLabelsAndAnnotations(t *testing.T) {
|
||||
// Regression (M6): templates can read source labels/annotations via
|
||||
// TransformContext. When a transform annotation is present, label /
|
||||
// annotation changes must therefore re-hash so NeedsSync re-renders.
|
||||
make := func(label, annot string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{Object: map[string]interface{}{
|
||||
"apiVersion": "v1",
|
||||
"kind": "ConfigMap",
|
||||
"metadata": map[string]interface{}{
|
||||
"labels": map[string]interface{}{"app": label},
|
||||
"annotations": map[string]interface{}{constants.AnnotationTransform: "rules: []", "tier": annot},
|
||||
},
|
||||
"data": map[string]interface{}{"k": "v"},
|
||||
}}
|
||||
}
|
||||
|
||||
base, err := ComputeContentHash(make("v1", "prod"))
|
||||
require.NoError(t, err)
|
||||
|
||||
labelChanged, err := ComputeContentHash(make("v2", "prod"))
|
||||
require.NoError(t, err)
|
||||
assert.NotEqual(t, base, labelChanged, "label change must re-hash when transform is present")
|
||||
|
||||
annotChanged, err := ComputeContentHash(make("v1", "stage"))
|
||||
require.NoError(t, err)
|
||||
assert.NotEqual(t, base, annotChanged, "annotation change must re-hash when transform is present")
|
||||
}
|
||||
|
||||
func TestComputeContentHash_Unstructured_LabelChangesIgnoredWithoutTransform(t *testing.T) {
|
||||
// Counterpart to the above: when there is NO transform annotation, label
|
||||
// changes must NOT churn the hash — that would cause unnecessary mirror
|
||||
// re-writes for plain (non-transformed) mirrors.
|
||||
make := func(label string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{Object: map[string]interface{}{
|
||||
"apiVersion": "v1",
|
||||
"kind": "ConfigMap",
|
||||
"metadata": map[string]interface{}{
|
||||
"labels": map[string]interface{}{"app": label},
|
||||
},
|
||||
"data": map[string]interface{}{"k": "v"},
|
||||
}}
|
||||
}
|
||||
|
||||
h1, err := ComputeContentHash(make("v1"))
|
||||
require.NoError(t, err)
|
||||
h2, err := ComputeContentHash(make("v2"))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, h1, h2, "label changes must not re-hash without a transform annotation")
|
||||
}
|
||||
|
||||
@@ -15,6 +15,18 @@ import (
|
||||
"github.com/lukaszraczylo/kubemirror/pkg/constants"
|
||||
)
|
||||
|
||||
// maxConcurrentTemplateExecutions caps the number of in-flight template
|
||||
// executions across the process. text/template.Execute is not context-aware,
|
||||
// so when applyTemplateRule times out the executor goroutine continues to
|
||||
// run until the template returns on its own. This semaphore bounds the
|
||||
// damage from a pathological template (e.g. {{ range }} that never
|
||||
// terminates): once the cap is hit, applyTemplateRule fails fast instead
|
||||
// of leaking another runaway goroutine. The cap is intentionally generous
|
||||
// — normal workloads should never approach it.
|
||||
const maxConcurrentTemplateExecutions = 64
|
||||
|
||||
var templateExecSemaphore = make(chan struct{}, maxConcurrentTemplateExecutions)
|
||||
|
||||
// Transformer applies transformation rules to Kubernetes resources.
|
||||
type Transformer struct {
|
||||
options TransformOptions
|
||||
@@ -168,6 +180,19 @@ func (t *Transformer) applyTemplateRule(u *unstructured.Unstructured, rule Rule,
|
||||
return fmt.Errorf("failed to parse template: %w", err)
|
||||
}
|
||||
|
||||
// Acquire a slot in the global template-execution semaphore. If saturated,
|
||||
// fail fast rather than spawning yet another goroutine that may leak when
|
||||
// it times out (text/template is not context-aware so timed-out goroutines
|
||||
// continue running until the template returns).
|
||||
select {
|
||||
case templateExecSemaphore <- struct{}{}:
|
||||
defer func() { <-templateExecSemaphore }()
|
||||
default:
|
||||
return fmt.Errorf("template execution rejected: %d concurrent executions in flight, "+
|
||||
"likely indicates one or more runaway templates leaking goroutines",
|
||||
maxConcurrentTemplateExecutions)
|
||||
}
|
||||
|
||||
// Execute template with timeout
|
||||
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), t.options.TemplateTimeout)
|
||||
defer cancel()
|
||||
|
||||
@@ -734,6 +734,44 @@ func TestTransformer_TemplateTimeout(t *testing.T) {
|
||||
t.Skip("Template timeout testing is unreliable in unit tests - covered by integration tests")
|
||||
}
|
||||
|
||||
func TestTransformer_TemplateConcurrencyCap(t *testing.T) {
|
||||
// Regression (H3): text/template.Execute is not context-aware, so a
|
||||
// timed-out template execution leaves its goroutine running until the
|
||||
// template returns on its own. We bound that by a global semaphore;
|
||||
// when saturated, applyTemplateRule must fail fast instead of spawning
|
||||
// another goroutine.
|
||||
//
|
||||
// This test saturates the semaphore directly, then asserts the next
|
||||
// call returns the cap-exceeded error rather than blocking or panicking.
|
||||
for i := 0; i < maxConcurrentTemplateExecutions; i++ {
|
||||
templateExecSemaphore <- struct{}{}
|
||||
}
|
||||
defer func() {
|
||||
// Drain whatever the test left in the semaphore so subsequent tests
|
||||
// see a clean state.
|
||||
for {
|
||||
select {
|
||||
case <-templateExecSemaphore:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
tmpl := "hello"
|
||||
tr := NewDefaultTransformer()
|
||||
rule := Rule{Path: "data.greeting", Template: &tmpl}
|
||||
u := &unstructured.Unstructured{Object: map[string]interface{}{
|
||||
"apiVersion": "v1",
|
||||
"kind": "ConfigMap",
|
||||
"data": map[string]interface{}{},
|
||||
}}
|
||||
|
||||
err := tr.applyTemplateRule(u, rule, TransformContext{})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "rejected", "saturated semaphore must reject new template executions")
|
||||
}
|
||||
|
||||
func TestMatchGlob(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
Reference in New Issue
Block a user