Reliabity improvements.

This commit is contained in:
2025-12-26 17:30:13 +00:00
parent ceff0ed67f
commit c8ebfe376b
9 changed files with 1550 additions and 210 deletions
+44 -1
View File
@@ -393,18 +393,61 @@ func GetSourceReference(mirror metav1.Object) (namespace, name, uid string, foun
// applyTransformations applies transformation rules from the source to the mirror.
// Returns the transformed mirror, or the original mirror if no rules are present.
func applyTransformations(source, mirror runtime.Object, targetNamespace string) (runtime.Object, error) {
// Get source annotations to check for transform rules
sourceObj, ok := source.(metav1.Object)
if !ok {
return mirror, nil
}
sourceAnnotations := sourceObj.GetAnnotations()
if sourceAnnotations == nil {
return mirror, nil
}
transformRules, hasTransform := sourceAnnotations[transformer.AnnotationTransform]
if !hasTransform || transformRules == "" {
return mirror, nil // No transformation rules
}
// Temporarily copy transform annotations to mirror for Transform to read
// The Transform function reads rules from the object being transformed
mirrorObj, ok := mirror.(metav1.Object)
if !ok {
return mirror, nil
}
mirrorAnnotations := mirrorObj.GetAnnotations()
if mirrorAnnotations == nil {
mirrorAnnotations = make(map[string]string)
}
// Copy transform annotations from source
mirrorAnnotations[transformer.AnnotationTransform] = transformRules
if strictMode, hasStrict := sourceAnnotations[transformer.AnnotationTransformStrict]; hasStrict {
mirrorAnnotations[transformer.AnnotationTransformStrict] = strictMode
}
mirrorObj.SetAnnotations(mirrorAnnotations)
// Build transformation context
ctx := buildTransformContext(source, mirror, targetNamespace)
// Create transformer with default options
t := transformer.NewDefaultTransformer()
// Apply transformations (transformer handles case of no rules gracefully)
// Apply transformations (transformer reads rules from mirror's annotations now)
transformed, err := t.Transform(mirror, ctx)
if err != nil {
return nil, err
}
// Remove transform annotations from result (they shouldn't persist on mirrors)
if transformedObj, ok := transformed.(metav1.Object); ok {
annotations := transformedObj.GetAnnotations()
delete(annotations, transformer.AnnotationTransform)
delete(annotations, transformer.AnnotationTransformStrict)
transformedObj.SetAnnotations(annotations)
}
return transformed, nil
}
+152
View File
@@ -0,0 +1,152 @@
package controller
import (
"context"
"github.com/lukaszraczylo/kubemirror/pkg/constants"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
// MirrorReconciler reconciles mirrored resources to detect and clean up orphans.
// This reconciler watches resources with the managed-by label and verifies their source still exists.
type MirrorReconciler struct {
client.Client
Scheme *runtime.Scheme
GVK schema.GroupVersionKind // The resource type this reconciler handles
}
// Reconcile checks if a mirrored resource's source still exists, and deletes the mirror if orphaned.
func (r *MirrorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// Fetch the mirror resource
mirror := &unstructured.Unstructured{}
gv := schema.GroupVersion{Group: r.GVK.Group, Version: r.GVK.Version}
mirror.SetGroupVersionKind(gv.WithKind(r.GVK.Kind))
if err := r.Get(ctx, req.NamespacedName, mirror); err != nil {
// Mirror already deleted or doesn't exist - nothing to do
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Extract annotations using unstructured helper methods
annotations := mirror.GetAnnotations()
if annotations == nil {
// No annotations - not a valid mirror, skip
return ctrl.Result{}, nil
}
// Extract source reference from annotations
sourceNs, hasSourceNs := annotations[constants.AnnotationSourceNamespace]
sourceName, hasSourceName := annotations[constants.AnnotationSourceName]
sourceUID, hasSourceUID := annotations[constants.AnnotationSourceUID]
if !hasSourceNs || !hasSourceName || !hasSourceUID {
// Missing source reference annotations - not a valid mirror or corrupted
logger.V(1).Info("mirror missing source reference annotations, skipping",
"namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
}
// Try to fetch the source resource
source := &unstructured.Unstructured{}
source.SetGroupVersionKind(gv.WithKind(r.GVK.Kind))
sourceKey := types.NamespacedName{
Namespace: sourceNs,
Name: sourceName,
}
err := r.Get(ctx, sourceKey, source)
if err != nil {
if client.IgnoreNotFound(err) == nil {
// Source not found - this is an orphaned mirror, delete it
logger.Info("orphaned mirror detected (source deleted), cleaning up",
"mirror", req.NamespacedName,
"sourceNamespace", sourceNs,
"sourceName", sourceName,
"sourceUID", sourceUID)
if err := r.Delete(ctx, mirror); err != nil {
logger.Error(err, "failed to delete orphaned mirror")
return ctrl.Result{}, err
}
logger.Info("orphaned mirror deleted successfully",
"mirror", req.NamespacedName,
"sourceNamespace", sourceNs,
"sourceName", sourceName)
return ctrl.Result{}, nil
}
// Some other error fetching source
logger.Error(err, "failed to fetch source resource for mirror",
"sourceNamespace", sourceNs, "sourceName", sourceName)
return ctrl.Result{}, err
}
// Source exists - verify UID matches
actualUID := string(source.GetUID())
if actualUID != sourceUID {
// Source was recreated with different UID - this is a stale mirror
logger.Info("stale mirror detected (source recreated with different UID), cleaning up",
"mirror", req.NamespacedName,
"sourceNamespace", sourceNs,
"sourceName", sourceName,
"expectedUID", sourceUID,
"actualUID", actualUID)
if err := r.Delete(ctx, mirror); err != nil {
logger.Error(err, "failed to delete stale mirror")
return ctrl.Result{}, err
}
logger.Info("stale mirror deleted successfully",
"mirror", req.NamespacedName,
"sourceNamespace", sourceNs,
"sourceName", sourceName)
return ctrl.Result{}, nil
}
// Source exists and UID matches - mirror is valid
logger.V(1).Info("mirror source verified",
"mirror", req.NamespacedName,
"sourceNamespace", sourceNs,
"sourceName", sourceName)
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *MirrorReconciler) SetupWithManager(mgr ctrl.Manager, gvk schema.GroupVersionKind) error {
// Create a predicate that only watches resources with the managed-by label
managedByPredicate := predicate.NewPredicateFuncs(func(obj client.Object) bool {
labels := obj.GetLabels()
if labels == nil {
return false
}
managedBy, exists := labels[constants.LabelManagedBy]
return exists && managedBy == "kubemirror"
})
// Convert GVK to resource object for watching
obj := &unstructured.Unstructured{}
gv := schema.GroupVersion{Group: gvk.Group, Version: gvk.Version}
obj.SetGroupVersionKind(gv.WithKind(gvk.Kind))
// Set custom controller name to avoid conflicts with source reconciler and multiple API versions
// Include group and version to make it truly unique
controllerName := gvk.Kind + "." + gvk.Version + "." + gvk.Group + "-mirror"
return ctrl.NewControllerManagedBy(mgr).
For(obj).
Named(controllerName).
WithEventFilter(managedByPredicate).
Complete(r)
}
+75 -57
View File
@@ -15,7 +15,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -138,25 +137,50 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
// Check if resource is enabled for mirroring
if !isEnabledForMirroring(sourceObj) {
// Silently skip - don't log as it would be too noisy
return r.handleDisabled(ctx, sourceObj)
// Check if resource is being deleted
if !sourceObj.GetDeletionTimestamp().IsZero() {
// Resource is being deleted - clean up mirrors and remove finalizer
if containsString(sourceObj.GetFinalizers(), constants.FinalizerName) {
logger.Info("source being deleted, cleaning up all mirrors")
if err := r.deleteAllMirrors(ctx, sourceObj); err != nil {
logger.Error(err, "failed to delete all mirrors during source deletion")
return ctrl.Result{}, err
}
// Remove finalizer to allow resource deletion
logger.Info("removing finalizer from source resource")
finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName)
sourceObj.SetFinalizers(finalizers)
if err := r.Update(ctx, source); err != nil {
logger.Error(err, "failed to remove finalizer")
return ctrl.Result{}, err
}
logger.Info("finalizer removed, resource can now be deleted")
}
return ctrl.Result{}, nil
}
// Handle deletion
if !sourceObj.GetDeletionTimestamp().IsZero() {
return r.handleDeletion(ctx, source, sourceObj)
if !isEnabledForMirroring(sourceObj) {
// Resource is disabled - remove finalizer if present and delete all mirrors
if containsString(sourceObj.GetFinalizers(), constants.FinalizerName) {
return r.handleDisabled(ctx, sourceObj)
}
// No finalizer, just skip
return ctrl.Result{}, nil
}
// Add finalizer if not present
// source (*unstructured.Unstructured) already implements client.Object
if !controllerutil.ContainsFinalizer(source, constants.FinalizerName) {
controllerutil.AddFinalizer(source, constants.FinalizerName)
if !containsString(sourceObj.GetFinalizers(), constants.FinalizerName) {
logger.Info("adding finalizer to source resource")
finalizers := append(sourceObj.GetFinalizers(), constants.FinalizerName)
sourceObj.SetFinalizers(finalizers)
if err := r.Update(ctx, source); err != nil {
logger.Error(err, "failed to add finalizer")
return ctrl.Result{}, err
}
logger.V(1).Info("added finalizer")
logger.Info("finalizer added")
// Requeue to continue with reconciliation after finalizer is added
return ctrl.Result{Requeue: true}, nil
}
// Get target namespaces
@@ -212,57 +236,32 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}
// handleDeletion removes finalizer after cleaning up all mirrors.
func (r *SourceReconciler) handleDeletion(ctx context.Context, source runtime.Object, sourceObj metav1.Object) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// source (*unstructured.Unstructured) already implements client.Object
sourceUnstructured := source.(*unstructured.Unstructured)
if !controllerutil.ContainsFinalizer(sourceUnstructured, constants.FinalizerName) {
return ctrl.Result{}, nil
}
// Delete all mirrors
if err := r.deleteAllMirrors(ctx, sourceObj); err != nil {
logger.Error(err, "failed to delete mirrors")
return ctrl.Result{}, err
}
// Remove finalizer
controllerutil.RemoveFinalizer(sourceUnstructured, constants.FinalizerName)
if err := r.Update(ctx, sourceUnstructured); err != nil {
logger.Error(err, "failed to remove finalizer")
return ctrl.Result{}, err
}
logger.Info("finalizer removed, mirrors deleted")
return ctrl.Result{}, nil
}
// handleDisabled removes mirrors when a resource is disabled.
func (r *SourceReconciler) handleDisabled(ctx context.Context, sourceObj metav1.Object) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// Source is already a client.Object (unstructured implements it)
sourceClient := sourceObj.(client.Object)
// Delete all mirrors for this disabled source
if err := r.deleteAllMirrors(ctx, sourceObj); err != nil {
logger.Error(err, "failed to delete mirrors for disabled resource")
return ctrl.Result{}, err
}
// If resource has finalizer, clean up mirrors and remove it
if controllerutil.ContainsFinalizer(sourceClient, constants.FinalizerName) {
if err := r.deleteAllMirrors(ctx, sourceObj); err != nil {
logger.Error(err, "failed to delete mirrors for disabled resource")
return ctrl.Result{}, err
}
// Remove finalizer if present
if containsString(sourceObj.GetFinalizers(), constants.FinalizerName) {
logger.Info("removing finalizer from disabled resource")
finalizers := removeString(sourceObj.GetFinalizers(), constants.FinalizerName)
sourceObj.SetFinalizers(finalizers)
// Remove finalizer
controllerutil.RemoveFinalizer(sourceClient, constants.FinalizerName)
if err := r.Update(ctx, sourceClient); err != nil {
// Get the unstructured object to update - sourceObj is already *unstructured.Unstructured
source := sourceObj.(*unstructured.Unstructured)
if err := r.Update(ctx, source); err != nil {
logger.Error(err, "failed to remove finalizer from disabled resource")
return ctrl.Result{}, err
}
logger.Info("mirrors deleted and finalizer removed for disabled resource")
logger.V(1).Info("finalizer removed from disabled resource")
}
logger.V(1).Info("mirrors deleted for disabled resource")
return ctrl.Result{}, nil
}
@@ -559,12 +558,10 @@ func (r *SourceReconciler) SetupWithManagerForResourceType(
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
// Create unique controller name including version to avoid collisions
// e.g., "HorizontalPodAutoscaler.v1.autoscaling"
controllerName := gvk.Kind + "." + gvk.Version
if gvk.Group != "" {
controllerName += "." + gvk.Group
}
// Create unique controller name including version and group to avoid collisions
// e.g., "HorizontalPodAutoscaler.v1.autoscaling" or "Secret.v1." (empty group for core resources)
// This matches the naming convention used by mirror reconcilers
controllerName := gvk.Kind + "." + gvk.Version + "." + gvk.Group
// Create mirror object for watching
mirrorObj := &unstructured.Unstructured{}
@@ -613,3 +610,24 @@ func (r *SourceReconciler) mapMirrorToSource(ctx context.Context, obj client.Obj
},
}
}
// containsString checks if a slice contains a string.
func containsString(slice []string, s string) bool {
for _, item := range slice {
if item == s {
return true
}
}
return false
}
// removeString removes a string from a slice.
func removeString(slice []string, s string) []string {
result := make([]string, 0, len(slice))
for _, item := range slice {
if item != s {
result = append(result, item)
}
}
return result
}
+39 -1
View File
@@ -3,6 +3,7 @@ package transformer
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"strings"
"text/template"
@@ -409,7 +410,20 @@ func setNestedField(obj map[string]interface{}, path []string, value interface{}
return fmt.Errorf("cannot set key %s on non-map %T", finalSegment, current)
}
currentMap[finalSegment] = value
// Special handling for Secret data fields
// Secrets require base64-encoded values in .data field
finalValue := value
if isSecretDataField(obj, path) {
// Convert value to string and base64-encode it
strValue, ok := value.(string)
if !ok {
// Try to convert to string
strValue = fmt.Sprintf("%v", value)
}
finalValue = base64Encode(strValue)
}
currentMap[finalSegment] = finalValue
return nil
}
@@ -512,3 +526,27 @@ func templateFuncs() template.FuncMap {
},
}
}
// isSecretDataField checks if the path points to a Secret's .data field.
func isSecretDataField(obj map[string]interface{}, path []string) bool {
// Check if this is a Secret by looking at apiVersion and kind
kind, hasKind := obj["kind"]
apiVersion, hasAPI := obj["apiVersion"]
if !hasKind || !hasAPI {
return false
}
// Check if it's a Secret (kind=Secret, apiVersion=v1)
if kind != "Secret" || apiVersion != "v1" {
return false
}
// Check if path starts with "data"
return len(path) >= 1 && path[0] == "data"
}
// base64Encode encodes a string to base64.
func base64Encode(s string) string {
return base64.StdEncoding.EncodeToString([]byte(s))
}