mirror of
https://github.com/lukaszraczylo/kubemirror.git
synced 2026-06-09 23:03:49 +00:00
CRD discovery, log noise reduction, e2e tests
This commit is contained in:
@@ -36,6 +36,7 @@ type SourceReconciler struct {
|
||||
Filter *filter.NamespaceFilter
|
||||
NamespaceLister NamespaceLister
|
||||
GVK schema.GroupVersionKind // The resource type this reconciler handles
|
||||
APIReader client.Reader // Direct API reader (bypasses cache)
|
||||
}
|
||||
|
||||
// NamespaceLister provides a list of all namespaces in the cluster.
|
||||
@@ -43,20 +44,83 @@ type SourceReconciler struct {
|
||||
type NamespaceLister interface {
|
||||
ListNamespaces(ctx context.Context) ([]string, error)
|
||||
ListAllowMirrorsNamespaces(ctx context.Context) ([]string, error)
|
||||
ListOptOutNamespaces(ctx context.Context) ([]string, error)
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch
|
||||
|
||||
// getSourceWithFreshness fetches a source resource with optional freshness verification.
|
||||
// This implements a hybrid caching strategy:
|
||||
// 1. First read from informer cache (fast, local)
|
||||
// 2. If VerifySourceFreshness is enabled, make direct API call via APIReader
|
||||
// 3. If resourceVersions differ, cache is stale - return fresh version from API
|
||||
// 4. If resourceVersions match, cache is current - return cached version
|
||||
//
|
||||
// This prevents the race condition where:
|
||||
// - Watch event arrives: "Secret changed!"
|
||||
// - Reconciliation starts immediately
|
||||
// - Cache hasn't updated yet (5-20 second lag)
|
||||
// - We read stale data and mirror it
|
||||
//
|
||||
// Trade-off: 2x API calls when cache is stale, but guarantees data freshness.
|
||||
func (r *SourceReconciler) getSourceWithFreshness(ctx context.Context, key client.ObjectKey, gvk schema.GroupVersionKind) (*unstructured.Unstructured, error) {
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
// First try: Read from cache (fast)
|
||||
cached := &unstructured.Unstructured{}
|
||||
cached.SetGroupVersionKind(gvk)
|
||||
if err := r.Get(ctx, key, cached); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If freshness verification is disabled, return cached version immediately
|
||||
if !r.Config.VerifySourceFreshness {
|
||||
logger.V(2).Info("using cached source (freshness check disabled)", "resourceVersion", cached.GetResourceVersion())
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
// If APIReader is not available (e.g., in tests), fall back to cached version
|
||||
if r.APIReader == nil {
|
||||
logger.V(2).Info("using cached source (no APIReader available)", "resourceVersion", cached.GetResourceVersion())
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
cachedRV := cached.GetResourceVersion()
|
||||
|
||||
// Second try: Direct API read to verify freshness (bypasses cache)
|
||||
fresh := &unstructured.Unstructured{}
|
||||
fresh.SetGroupVersionKind(gvk)
|
||||
if err := r.APIReader.Get(ctx, key, fresh); err != nil {
|
||||
// If direct API read fails, fall back to cached version
|
||||
logger.V(1).Info("direct API read failed, using cached version", "error", err, "cachedRV", cachedRV)
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
freshRV := fresh.GetResourceVersion()
|
||||
|
||||
// Compare resource versions
|
||||
if cachedRV != freshRV {
|
||||
// Cache is stale - return fresh version from API
|
||||
logger.V(1).Info("cache stale, using fresh API version",
|
||||
"cachedRV", cachedRV,
|
||||
"freshRV", freshRV)
|
||||
return fresh, nil
|
||||
}
|
||||
|
||||
// Cache is current - return cached version (saves memory allocation)
|
||||
logger.V(2).Info("cache current", "resourceVersion", cachedRV)
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
// Reconcile processes a single source resource.
|
||||
func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name)
|
||||
|
||||
// Fetch the source resource as unstructured (works for all resource types)
|
||||
source := &unstructured.Unstructured{}
|
||||
source.SetGroupVersionKind(r.GVK) // Set the GVK so the client knows what to fetch
|
||||
if err := r.Get(ctx, req.NamespacedName, source); err != nil {
|
||||
// Fetch the source resource with optional freshness verification
|
||||
source, err := r.getSourceWithFreshness(ctx, req.NamespacedName, r.GVK)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
// Resource deleted - nothing to do
|
||||
return ctrl.Result{}, nil
|
||||
@@ -216,10 +280,24 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O
|
||||
return fmt.Errorf("failed to get existing mirror: %w", err)
|
||||
}
|
||||
|
||||
// If freshness verification is enabled and mirror exists, verify it's fresh too
|
||||
if err == nil && r.Config.VerifySourceFreshness && r.APIReader != nil {
|
||||
fresh := &unstructured.Unstructured{}
|
||||
fresh.SetGroupVersionKind(sourceUnstructured.GroupVersionKind())
|
||||
if apiErr := r.APIReader.Get(ctx, client.ObjectKey{Namespace: targetNs, Name: sourceObj.GetName()}, fresh); apiErr == nil {
|
||||
if fresh.GetResourceVersion() != existing.GetResourceVersion() {
|
||||
logger.V(2).Info("mirror cache stale, using fresh API version",
|
||||
"cachedRV", existing.GetResourceVersion(),
|
||||
"freshRV", fresh.GetResourceVersion())
|
||||
existing = fresh
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
// Mirror exists - check if it's managed by us
|
||||
if !IsManagedByUs(existing) {
|
||||
logger.Info("target resource exists but not managed by kubemirror, skipping")
|
||||
logger.V(1).Info("target resource exists but not managed by kubemirror, skipping")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -230,7 +308,7 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O
|
||||
}
|
||||
|
||||
if !needsSync {
|
||||
logger.V(1).Info("mirror is up to date")
|
||||
logger.V(2).Info("mirror is up to date")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -243,7 +321,7 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O
|
||||
return fmt.Errorf("failed to update mirror in cluster: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("mirror updated")
|
||||
logger.V(1).Info("mirror updated")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -257,7 +335,7 @@ func (r *SourceReconciler) reconcileMirror(ctx context.Context, source runtime.O
|
||||
return fmt.Errorf("failed to create mirror in cluster: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("mirror created")
|
||||
logger.V(1).Info("mirror created")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -407,11 +485,18 @@ func (r *SourceReconciler) resolveTargetNamespaces(ctx context.Context, sourceOb
|
||||
return nil, fmt.Errorf("failed to list allow-mirrors namespaces: %w", err)
|
||||
}
|
||||
|
||||
// Get namespaces that have explicitly opted out (allow-mirrors="false")
|
||||
optOutNamespaces, err := r.NamespaceLister.ListOptOutNamespaces(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list opt-out namespaces: %w", err)
|
||||
}
|
||||
|
||||
// Resolve target namespaces
|
||||
targetNamespaces := filter.ResolveTargetNamespaces(
|
||||
patterns,
|
||||
allNamespaces,
|
||||
allowMirrorsNamespaces,
|
||||
optOutNamespaces,
|
||||
sourceObj.GetNamespace(),
|
||||
r.Filter,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user