mirror of
https://github.com/lukaszraczylo/kubemirror.git
synced 2026-06-10 23:09:14 +00:00
Add lazy watcher, improving resource usage; update website.
This commit is contained in:
+135
-37
@@ -7,10 +7,13 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/healthz"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log/zap"
|
||||
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
|
||||
@@ -47,6 +50,8 @@ func main() {
|
||||
rateLimitBurst int
|
||||
resyncPeriod time.Duration
|
||||
verifySourceFreshness bool
|
||||
lazyWatcherInit bool
|
||||
watcherScanInterval time.Duration
|
||||
)
|
||||
|
||||
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
|
||||
@@ -73,12 +78,18 @@ func main() {
|
||||
"QPS rate limit for API server requests.")
|
||||
flag.IntVar(&rateLimitBurst, "rate-limit-burst", 100,
|
||||
"Burst limit for API server requests.")
|
||||
flag.DurationVar(&resyncPeriod, "resync-period", 30*time.Second,
|
||||
flag.DurationVar(&resyncPeriod, "resync-period", 10*time.Minute,
|
||||
"Period for resyncing all resources (catches updates missed due to informer cache delays).")
|
||||
flag.BoolVar(&verifySourceFreshness, "verify-source-freshness", false,
|
||||
"Verify source resource freshness by comparing cache with direct API read. "+
|
||||
"Prevents mirroring stale data when cache lags behind watch events. "+
|
||||
"Trade-off: Extra API call when cache is stale.")
|
||||
flag.BoolVar(&lazyWatcherInit, "lazy-watcher-init", false,
|
||||
"Enable lazy watcher initialization - only create informers for resource types that have resources marked for mirroring. "+
|
||||
"Significantly reduces memory usage by avoiding watchers for unused resource types. "+
|
||||
"Recommended for production environments with many unused resource types.")
|
||||
flag.DurationVar(&watcherScanInterval, "watcher-scan-interval", 5*time.Minute,
|
||||
"Interval for scanning cluster to detect new resource types needing watchers (lazy-watcher-init mode only).")
|
||||
|
||||
opts := zap.Options{
|
||||
Development: true,
|
||||
@@ -150,7 +161,34 @@ func main() {
|
||||
|
||||
cfg.MirroredResourceTypes = mirroredResources
|
||||
|
||||
// Set up controller manager
|
||||
// Create cache transform function to strip unnecessary fields and reduce memory usage
|
||||
// This can reduce memory consumption by 50-70% by removing:
|
||||
// - managedFields (often several KB per resource)
|
||||
// - large annotations like kubectl.kubernetes.io/last-applied-configuration
|
||||
transformFunc := func(obj interface{}) (interface{}, error) {
|
||||
// Type assert to unstructured
|
||||
u, ok := obj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return obj, nil // Not unstructured, return as-is
|
||||
}
|
||||
|
||||
// Strip managedFields - can be several KB per resource
|
||||
u.SetManagedFields(nil)
|
||||
|
||||
// Strip large annotations that we don't need for reconciliation
|
||||
annotations := u.GetAnnotations()
|
||||
if annotations != nil {
|
||||
// Remove kubectl last-applied-configuration (can be very large)
|
||||
delete(annotations, "kubectl.kubernetes.io/last-applied-configuration")
|
||||
// Remove other large annotations we don't need
|
||||
delete(annotations, "deployment.kubernetes.io/revision")
|
||||
u.SetAnnotations(annotations)
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// Set up controller manager with cache configuration
|
||||
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
|
||||
Scheme: scheme,
|
||||
Metrics: metricsserver.Options{
|
||||
@@ -162,6 +200,12 @@ func main() {
|
||||
LeaseDuration: &cfg.LeaderElection.LeaseDuration,
|
||||
RenewDeadline: &cfg.LeaderElection.RenewDeadline,
|
||||
RetryPeriod: &cfg.LeaderElection.RetryPeriod,
|
||||
Cache: cache.Options{
|
||||
// Use the transform function to reduce memory usage
|
||||
DefaultTransform: transformFunc,
|
||||
// Increase the resync period to reduce memory churn
|
||||
SyncPeriod: &resyncPeriod,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
setupLog.Error(err, "unable to create manager")
|
||||
@@ -209,52 +253,106 @@ func main() {
|
||||
// Create namespace lister
|
||||
namespaceLister := controller.NewKubernetesNamespaceLister(mgr.GetClient())
|
||||
|
||||
// Dynamically register controllers for all discovered resource types
|
||||
// Create a separate reconciler instance for each resource type
|
||||
for _, rt := range cfg.MirroredResourceTypes {
|
||||
gvk := rt.GroupVersionKind()
|
||||
setupLog.Info("registering controller for resource type",
|
||||
"group", gvk.Group,
|
||||
"version", gvk.Version,
|
||||
"kind", gvk.Kind,
|
||||
// Choose between lazy watcher initialization (scan for active resources) or eager (register all)
|
||||
if lazyWatcherInit {
|
||||
setupLog.Info("using lazy watcher initialization",
|
||||
"availableResourceTypes", len(cfg.MirroredResourceTypes),
|
||||
"scanInterval", watcherScanInterval,
|
||||
)
|
||||
|
||||
// Create a source reconciler instance for this specific resource type
|
||||
sourceReconciler := &controller.SourceReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
Config: cfg,
|
||||
Filter: namespaceFilter,
|
||||
NamespaceLister: namespaceLister,
|
||||
GVK: gvk,
|
||||
APIReader: mgr.GetAPIReader(), // Direct API reader (bypasses cache)
|
||||
// Factory functions for creating reconcilers
|
||||
sourceFactory := func(gvk schema.GroupVersionKind) *controller.SourceReconciler {
|
||||
return &controller.SourceReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
Config: cfg,
|
||||
Filter: namespaceFilter,
|
||||
NamespaceLister: namespaceLister,
|
||||
GVK: gvk,
|
||||
APIReader: mgr.GetAPIReader(),
|
||||
}
|
||||
}
|
||||
|
||||
if err = sourceReconciler.SetupWithManagerForResourceType(mgr, gvk); err != nil {
|
||||
setupLog.Error(err, "unable to create source controller",
|
||||
"resourceType", rt.String(),
|
||||
)
|
||||
mirrorFactory := func(gvk schema.GroupVersionKind) *controller.MirrorReconciler {
|
||||
return &controller.MirrorReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
GVK: gvk,
|
||||
}
|
||||
}
|
||||
|
||||
// Create dynamic controller manager
|
||||
dynamicMgr := controller.NewDynamicControllerManager(controller.DynamicManagerConfig{
|
||||
Client: mgr.GetClient(),
|
||||
Manager: mgr,
|
||||
Config: cfg,
|
||||
Filter: namespaceFilter,
|
||||
NamespaceLister: namespaceLister,
|
||||
AvailableResources: cfg.MirroredResourceTypes,
|
||||
ScanInterval: watcherScanInterval,
|
||||
SourceReconcilerFactory: sourceFactory,
|
||||
MirrorReconcilerFactory: mirrorFactory,
|
||||
})
|
||||
|
||||
// Start dynamic controller manager
|
||||
if err := dynamicMgr.Start(signalCtx); err != nil {
|
||||
setupLog.Error(err, "unable to start dynamic controller manager")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Create a mirror reconciler instance for orphan detection
|
||||
// This watches mirrored resources (with managed-by label) and verifies their source still exists
|
||||
mirrorReconciler := &controller.MirrorReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
GVK: gvk,
|
||||
setupLog.Info("dynamic controller manager started - controllers will be registered on-demand")
|
||||
} else {
|
||||
setupLog.Info("using eager watcher initialization",
|
||||
"resourceTypes", len(cfg.MirroredResourceTypes),
|
||||
)
|
||||
|
||||
// Eager mode: Register controllers for all discovered resource types upfront
|
||||
// Create a separate reconciler instance for each resource type
|
||||
for _, rt := range cfg.MirroredResourceTypes {
|
||||
gvk := rt.GroupVersionKind()
|
||||
setupLog.Info("registering controller for resource type",
|
||||
"group", gvk.Group,
|
||||
"version", gvk.Version,
|
||||
"kind", gvk.Kind,
|
||||
)
|
||||
|
||||
// Create a source reconciler instance for this specific resource type
|
||||
sourceReconciler := &controller.SourceReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
Config: cfg,
|
||||
Filter: namespaceFilter,
|
||||
NamespaceLister: namespaceLister,
|
||||
GVK: gvk,
|
||||
APIReader: mgr.GetAPIReader(), // Direct API reader (bypasses cache)
|
||||
}
|
||||
|
||||
if err = sourceReconciler.SetupWithManagerForResourceType(mgr, gvk); err != nil {
|
||||
setupLog.Error(err, "unable to create source controller",
|
||||
"resourceType", rt.String(),
|
||||
)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Create a mirror reconciler instance for orphan detection
|
||||
// This watches mirrored resources (with managed-by label) and verifies their source still exists
|
||||
mirrorReconciler := &controller.MirrorReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
GVK: gvk,
|
||||
}
|
||||
|
||||
if err = mirrorReconciler.SetupWithManager(mgr, gvk); err != nil {
|
||||
setupLog.Error(err, "unable to create mirror controller",
|
||||
"resourceType", rt.String(),
|
||||
)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
if err = mirrorReconciler.SetupWithManager(mgr, gvk); err != nil {
|
||||
setupLog.Error(err, "unable to create mirror controller",
|
||||
"resourceType", rt.String(),
|
||||
)
|
||||
os.Exit(1)
|
||||
}
|
||||
setupLog.Info("registered source and mirror controllers", "count", len(cfg.MirroredResourceTypes))
|
||||
}
|
||||
|
||||
setupLog.Info("registered source and mirror controllers", "count", len(cfg.MirroredResourceTypes))
|
||||
|
||||
// Register namespace reconciler to watch for new namespaces and label changes
|
||||
namespaceReconciler := &controller.NamespaceReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
|
||||
Reference in New Issue
Block a user