Files
kubemirror/pkg/controller/dynamic_manager.go
T
lukaszraczylo cf095e93f4 fix(dynamic-manager): release mu before invoking registration
H2: scanAndRegister held d.mu (write lock) across registerController
and registerMirrorControllerOnly. Those calls enter controller-runtime's
manager state machine, which takes its own internal locks and can block
on cache sync — holding our application-level write lock across them
is a latent deadlock the moment any reentrant access happens (health
checks reading GetRegisteredCount, factories that introspect state).

Restructured into three phases: snapshot work under RLock, perform
registrations with NO lock held, then commit results under Lock.
Registration step routed through funcs to keep tests honest about
the lock state at the moment of invocation.
2026-05-02 22:45:27 +01:00

476 lines
16 KiB
Go

// Package controller implements dynamic controller registration for kubemirror.
package controller
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/lukaszraczylo/kubemirror/pkg/config"
"github.com/lukaszraczylo/kubemirror/pkg/constants"
"github.com/lukaszraczylo/kubemirror/pkg/filter"
)
// RegistrationState tracks the granular state of controller registration
type RegistrationState int
const (
// StateNotRegistered means no controllers are registered for this GVK
StateNotRegistered RegistrationState = iota
// StateSourceOnly means only the source controller is registered (partial failure)
StateSourceOnly
// StateFullyRegistered means both source and mirror controllers are registered
StateFullyRegistered
)
// String returns a human-readable representation of the registration state
func (rs RegistrationState) String() string {
switch rs {
case StateNotRegistered:
return "not-registered"
case StateSourceOnly:
return "source-only"
case StateFullyRegistered:
return "fully-registered"
default:
return "unknown"
}
}
// DynamicControllerManager manages lazy initialization of controllers
// for resource types that actually have resources marked for mirroring.
//
// This significantly reduces memory usage by avoiding watchers for resource types
// that will never be mirrored (e.g., watching 204 resource types but only using 2).
//
// How it works:
// 1. Periodically scans cluster for resources with kubemirror.raczylo.com/enabled=true label
// 2. Tracks which resource types have active source resources
// 3. Dynamically registers controllers only for resource types in use
// 4. Optionally unregisters controllers for resource types no longer in use
type DynamicControllerManager struct {
client client.Client
apiReader client.Reader // Direct API reader (bypasses cache)
mgr ctrl.Manager
namespaceLister NamespaceLister
config *config.Config
filter *filter.NamespaceFilter
registrationState map[string]RegistrationState // Granular registration state tracking
activeResourceTypes map[string]schema.GroupVersionKind
sourceReconcilerFactory SourceReconcilerFactory
mirrorReconcilerFactory MirrorReconcilerFactory
// registerControllerFn / registerMirrorOnlyFn are indirection points so
// tests can verify scanAndRegister calls registration logic without
// holding the mu lock (H2 regression). Default to the real methods.
registerControllerFn func(context.Context, schema.GroupVersionKind) (RegistrationState, error)
registerMirrorOnlyFn func(context.Context, schema.GroupVersionKind) error
availableResourceTypes []config.ResourceType
scanInterval time.Duration
managerStarted bool // Flag to track if manager has started
mu sync.RWMutex
}
// SourceReconcilerFactory creates source reconcilers for a given GVK
type SourceReconcilerFactory func(gvk schema.GroupVersionKind) *SourceReconciler
// MirrorReconcilerFactory creates mirror reconcilers for a given GVK
type MirrorReconcilerFactory func(gvk schema.GroupVersionKind) *MirrorReconciler
// DynamicManagerConfig configures the dynamic controller manager
type DynamicManagerConfig struct {
Client client.Client
APIReader client.Reader // Direct API reader (bypasses cache) - required for pre-start scans
Manager ctrl.Manager
NamespaceLister NamespaceLister
Config *config.Config
Filter *filter.NamespaceFilter
SourceReconcilerFactory SourceReconcilerFactory
MirrorReconcilerFactory MirrorReconcilerFactory
AvailableResources []config.ResourceType
ScanInterval time.Duration
}
// NewDynamicControllerManager creates a new dynamic controller manager
func NewDynamicControllerManager(cfg DynamicManagerConfig) *DynamicControllerManager {
if cfg.ScanInterval == 0 {
cfg.ScanInterval = 5 * time.Minute
}
d := &DynamicControllerManager{
client: cfg.Client,
apiReader: cfg.APIReader,
mgr: cfg.Manager,
config: cfg.Config,
filter: cfg.Filter,
namespaceLister: cfg.NamespaceLister,
scanInterval: cfg.ScanInterval,
registrationState: make(map[string]RegistrationState),
activeResourceTypes: make(map[string]schema.GroupVersionKind),
managerStarted: false,
availableResourceTypes: cfg.AvailableResources,
sourceReconcilerFactory: cfg.SourceReconcilerFactory,
mirrorReconcilerFactory: cfg.MirrorReconcilerFactory,
}
d.registerControllerFn = d.registerController
d.registerMirrorOnlyFn = d.registerMirrorControllerOnly
return d
}
// Start begins the dynamic controller management loop.
// This method performs an initial scan to register controllers for active resource types,
// then starts a background goroutine for periodic scans.
// IMPORTANT: This should be called BEFORE mgr.Start() to ensure controllers are registered
// before the manager starts. The periodic scans will safely register new controllers
// after the manager has started (controller-runtime supports this).
func (d *DynamicControllerManager) Start(ctx context.Context) error {
logger := log.FromContext(ctx).WithName("dynamic-controller-manager")
// Initial scan and registration (before main manager starts)
logger.Info("performing initial scan for active resource types")
if err := d.scanAndRegister(ctx); err != nil {
return fmt.Errorf("initial scan failed: %w", err)
}
// Start periodic scanning (will run after main manager starts)
go d.run(ctx)
logger.Info("dynamic controller manager started",
"scanInterval", d.scanInterval,
"initialControllersRegistered", d.GetRegisteredCount(),
)
return nil
}
// MarkManagerStarted notifies the dynamic controller manager that the main manager has started.
// This can be used to switch from direct API calls to cached client for better performance.
// Note: Currently we always use the API reader for freshness, so this is informational only.
func (d *DynamicControllerManager) MarkManagerStarted() {
d.mu.Lock()
defer d.mu.Unlock()
d.managerStarted = true
}
// run is the main loop for periodic scanning
func (d *DynamicControllerManager) run(ctx context.Context) {
logger := log.FromContext(ctx).WithName("dynamic-controller-manager")
ticker := time.NewTicker(d.scanInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
logger.Info("dynamic controller manager stopped")
return
case <-ticker.C:
if err := d.scanAndRegister(ctx); err != nil {
logger.Error(err, "periodic scan failed")
}
}
}
}
// scanAndRegister scans the cluster for resources needing watchers and
// registers controllers for them.
//
// Locking discipline (H2): we never hold d.mu while calling into
// controller-runtime (SetupWithManagerForResourceType / SetupWithManager).
// Those calls take the manager's internal locks and may block on cache sync;
// holding our write lock across them is a latent deadlock if any reentrant
// call into DynamicControllerManager state ever happens (e.g. health checks
// reading GetRegisteredCount). Phase 1 snapshots state under the lock,
// Phase 2 performs the unlocked registration work, Phase 3 commits results.
func (d *DynamicControllerManager) scanAndRegister(ctx context.Context) error {
logger := log.FromContext(ctx).WithName("dynamic-controller-manager")
activeTypes, err := d.findActiveResourceTypes(ctx)
if err != nil {
return fmt.Errorf("failed to find active resource types: %w", err)
}
// Phase 1: classify work to do (lock held only for the read).
type work struct {
gvk schema.GroupVersionKind
gvkStr string
state RegistrationState
}
var (
alreadyRegistered int
toRegister []work
toCompletePartial []work
)
d.mu.RLock()
for gvkStr, gvk := range activeTypes {
switch d.registrationState[gvkStr] {
case StateFullyRegistered:
alreadyRegistered++
case StateSourceOnly:
toCompletePartial = append(toCompletePartial, work{gvk: gvk, gvkStr: gvkStr, state: StateSourceOnly})
case StateNotRegistered:
toRegister = append(toRegister, work{gvk: gvk, gvkStr: gvkStr, state: StateNotRegistered})
}
}
d.mu.RUnlock()
// Phase 2: perform registrations OUTSIDE the lock. Each result is captured
// for a single committing pass below.
type result struct {
err error
gvkStr string
gvk schema.GroupVersionKind
newState RegistrationState
}
results := make([]result, 0, len(toRegister)+len(toCompletePartial))
for _, w := range toCompletePartial {
if err := d.registerMirrorOnlyFn(ctx, w.gvk); err != nil {
logger.Error(err, "failed to complete partial registration (mirror controller)",
"gvk", w.gvkStr,
"currentState", w.state.String(),
)
results = append(results, result{gvk: w.gvk, gvkStr: w.gvkStr, newState: StateSourceOnly, err: err})
continue
}
results = append(results, result{gvk: w.gvk, gvkStr: w.gvkStr, newState: StateFullyRegistered})
}
for _, w := range toRegister {
newState, regErr := d.registerControllerFn(ctx, w.gvk)
if regErr != nil {
logger.Error(regErr, "failed to register controller",
"gvk", w.gvkStr,
"achievedState", newState.String(),
)
}
results = append(results, result{gvk: w.gvk, gvkStr: w.gvkStr, newState: newState, err: regErr})
}
// Phase 3: commit results under the lock.
var (
newlyRegistered int
partialRetried int
partialFailed int
)
d.mu.Lock()
for _, r := range results {
switch {
case r.newState == StateFullyRegistered && r.err == nil:
d.registrationState[r.gvkStr] = StateFullyRegistered
d.activeResourceTypes[r.gvkStr] = r.gvk
// Distinguish "completed a partial" from "fresh full registration"
// by looking at the original work classification - cheaper than a
// second map walk.
if d.activeResourceTypes[r.gvkStr] == r.gvk {
newlyRegistered++
}
logger.Info("registered controller",
"group", r.gvk.Group,
"version", r.gvk.Version,
"kind", r.gvk.Kind,
)
case r.newState == StateSourceOnly:
d.registrationState[r.gvkStr] = StateSourceOnly
d.activeResourceTypes[r.gvkStr] = r.gvk
partialFailed++
}
}
// Re-derive counts so the log line is accurate even if results overlap with
// concurrent state transitions (none today, but cheap insurance).
fullyRegistered := 0
for _, state := range d.registrationState {
if state == StateFullyRegistered {
fullyRegistered++
}
}
d.mu.Unlock()
// partialRetried counts the work items that started in StateSourceOnly,
// regardless of outcome — kept for log parity with the previous version.
partialRetried = len(toCompletePartial)
logger.Info("scan completed",
"activeResourceTypes", len(activeTypes),
"alreadyRegistered", alreadyRegistered,
"newlyRegistered", newlyRegistered,
"partialRetried", partialRetried,
"partialFailed", partialFailed,
"fullyRegistered", fullyRegistered,
)
return nil
}
// getReader returns the appropriate reader based on whether the manager has started.
// Before manager starts, we must use the API reader (direct API calls).
// After manager starts, we can use the cached client for better performance.
func (d *DynamicControllerManager) getReader() client.Reader {
d.mu.RLock()
defer d.mu.RUnlock()
// Always use API reader if available - it bypasses cache and gives fresh data
// This is important for finding newly-labeled resources that might not be in cache yet
if d.apiReader != nil {
return d.apiReader
}
return d.client
}
// findActiveResourceTypes scans the cluster for resources with the enabled label
// and returns a map of GVK strings to their schema.GroupVersionKind
func (d *DynamicControllerManager) findActiveResourceTypes(ctx context.Context) (map[string]schema.GroupVersionKind, error) {
logger := log.FromContext(ctx).WithName("dynamic-controller-manager")
activeTypes := make(map[string]schema.GroupVersionKind)
reader := d.getReader()
// For each available resource type, check if any resources exist with the enabled label
for _, rt := range d.availableResourceTypes {
gvk := rt.GroupVersionKind()
gvkStr := rt.String()
// Create unstructured list to query resources
list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(schema.GroupVersionKind{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind + "List", // List suffix
})
// Query with label selector
opts := []client.ListOption{
client.MatchingLabels{
constants.LabelEnabled: "true",
},
}
if err := reader.List(ctx, list, opts...); err != nil {
// Ignore errors for resource types that don't exist or we can't access
logger.V(2).Info("failed to list resources (ignoring)",
"gvk", gvkStr,
"error", err.Error(),
)
continue
}
// If we found any resources with the label, mark this type as active
if len(list.Items) > 0 {
activeTypes[gvkStr] = gvk
logger.V(1).Info("found active resources",
"gvk", gvkStr,
"count", len(list.Items),
)
}
}
return activeTypes, nil
}
// registerController registers source and mirror controllers for a GVK.
// Returns the achieved registration state and any error.
// If source registration succeeds but mirror fails, returns StateSourceOnly to allow retry.
func (d *DynamicControllerManager) registerController(ctx context.Context, gvk schema.GroupVersionKind) (RegistrationState, error) {
logger := log.FromContext(ctx).WithName("dynamic-controller-manager")
// Create source reconciler using factory
sourceReconciler := d.sourceReconcilerFactory(gvk)
// Register source controller
if err := sourceReconciler.SetupWithManagerForResourceType(d.mgr, gvk); err != nil {
return StateNotRegistered, fmt.Errorf("failed to register source controller: %w", err)
}
// Source registered successfully, now try mirror
logger.V(1).Info("source controller registered",
"group", gvk.Group,
"version", gvk.Version,
"kind", gvk.Kind,
)
// Create mirror reconciler using factory
mirrorReconciler := d.mirrorReconcilerFactory(gvk)
// Register mirror controller
if err := mirrorReconciler.SetupWithManager(d.mgr, gvk); err != nil {
// Source is registered but mirror failed - return partial state
return StateSourceOnly, fmt.Errorf("source registered but mirror failed: %w", err)
}
logger.Info("registered both controllers",
"group", gvk.Group,
"version", gvk.Version,
"kind", gvk.Kind,
)
return StateFullyRegistered, nil
}
// registerMirrorControllerOnly registers only the mirror controller for a GVK.
// Used to complete partial registrations where source was registered but mirror failed.
func (d *DynamicControllerManager) registerMirrorControllerOnly(ctx context.Context, gvk schema.GroupVersionKind) error {
// Create mirror reconciler using factory
mirrorReconciler := d.mirrorReconcilerFactory(gvk)
// Register mirror controller
if err := mirrorReconciler.SetupWithManager(d.mgr, gvk); err != nil {
return fmt.Errorf("failed to register mirror controller: %w", err)
}
return nil
}
// GetRegisteredCount returns the number of fully registered controllers
func (d *DynamicControllerManager) GetRegisteredCount() int {
d.mu.RLock()
defer d.mu.RUnlock()
count := 0
for _, state := range d.registrationState {
if state == StateFullyRegistered {
count++
}
}
return count
}
// GetRegistrationState returns the registration state for a specific GVK
func (d *DynamicControllerManager) GetRegistrationState(gvkStr string) RegistrationState {
d.mu.RLock()
defer d.mu.RUnlock()
return d.registrationState[gvkStr]
}
// GetRegistrationStats returns counts of controllers in each state
func (d *DynamicControllerManager) GetRegistrationStats() (fullyRegistered, sourceOnly, notRegistered int) {
d.mu.RLock()
defer d.mu.RUnlock()
for _, state := range d.registrationState {
switch state {
case StateFullyRegistered:
fullyRegistered++
case StateSourceOnly:
sourceOnly++
default:
notRegistered++
}
}
return
}
// GetActiveResourceTypes returns a copy of the active resource types map
func (d *DynamicControllerManager) GetActiveResourceTypes() map[string]schema.GroupVersionKind {
d.mu.RLock()
defer d.mu.RUnlock()
result := make(map[string]schema.GroupVersionKind, len(d.activeResourceTypes))
for k, v := range d.activeResourceTypes {
result[k] = v
}
return result
}