package traefikoidc import ( "context" "fmt" "runtime" "strings" "sync" "sync/atomic" "time" ) // BackgroundTask provides a robust framework for running periodic background tasks // with proper lifecycle management, graceful shutdown, and logging capabilities. // It supports both internal and external WaitGroup coordination for complex cleanup scenarios. type BackgroundTask struct { stopChan chan struct{} doneChan chan struct{} // Signals when the task goroutine has completed taskFunc func() logger *Logger externalWG *sync.WaitGroup name string internalWG sync.WaitGroup interval time.Duration stopOnce sync.Once startOnce sync.Once // Use atomic fields to avoid race conditions stopped int32 // 1 = stopped, 0 = not stopped started int32 // 1 = started, 0 = not started doneClosed int32 // 1 = doneChan closed, 0 = not closed } // NewBackgroundTask creates a new background task with the specified configuration. // The task will execute taskFunc immediately when started, then at the specified interval. // Parameters: // - name: Human-readable name for the task (used in logging) // - interval: How often to execute the task function // - taskFunc: The function to execute periodically // - logger: Logger for task events (can be nil) // - wg: Optional external WaitGroup for coordinated shutdown // // Returns: // - A configured BackgroundTask ready to be started func NewBackgroundTask(name string, interval time.Duration, taskFunc func(), logger *Logger, wg ...*sync.WaitGroup) *BackgroundTask { var externalWG *sync.WaitGroup if len(wg) > 0 { externalWG = wg[0] } return &BackgroundTask{ name: name, interval: interval, stopChan: make(chan struct{}), doneChan: make(chan struct{}), taskFunc: taskFunc, logger: logger, externalWG: externalWG, } } // Start begins executing the background task in a separate goroutine. // The task function is executed immediately, then at the configured interval. // The task runs immediately upon start and then at the specified interval. // This method is safe to call multiple times - only the first call will start the task. func (bt *BackgroundTask) Start() { bt.startOnce.Do(func() { // Check if already stopped using atomic operation if atomic.LoadInt32(&bt.stopped) == 1 { if bt.logger != nil { bt.logger.Infof("Attempted to start already stopped task: %s", bt.name) } // Close doneChan since the task won't run if atomic.CompareAndSwapInt32(&bt.doneClosed, 0, 1) { close(bt.doneChan) } return } // Check with the global registry's circuit breaker before starting registry := GetGlobalTaskRegistry() if err := registry.cb.CanCreateTask(bt.name); err != nil { if bt.logger != nil { bt.logger.Debugf("Cannot start task %s: %v (circuit breaker protection working as expected)", bt.name, err) } // Close doneChan since the task won't run if atomic.CompareAndSwapInt32(&bt.doneClosed, 0, 1) { close(bt.doneChan) } return } // Reserve the task slot immediately when starting registry.cb.OnTaskStart(bt.name) atomic.StoreInt32(&bt.started, 1) bt.internalWG.Add(1) if bt.externalWG != nil { bt.externalWG.Add(1) } go bt.run() }) } // Stop gracefully shuts down the background task and waits for completion. // It signals the task to stop and waits for the goroutine to finish. // This method is safe to call multiple times. func (bt *BackgroundTask) Stop() { bt.stopOnce.Do(func() { // Set stopped flag atomically atomic.StoreInt32(&bt.stopped, 1) // Check if the task was actually started if atomic.LoadInt32(&bt.started) == 0 { // Task was never started, close doneChan to unblock any waiters if atomic.CompareAndSwapInt32(&bt.doneClosed, 0, 1) { close(bt.doneChan) } return } // Safe close with panic recovery func() { defer func() { if r := recover(); r != nil { // Channel was already closed, ignore the panic if bt.logger != nil { bt.logger.Debugf("Stop channel for task %s was already closed", bt.name) } } }() close(bt.stopChan) }() // Wait for the task goroutine to complete using doneChan // This avoids the race condition with WaitGroup select { case <-bt.doneChan: // Normal completion case <-time.After(5 * time.Second): if bt.logger != nil { bt.logger.Errorf("Timeout waiting for background task %s to stop", bt.name) } } // Wait for the internal WaitGroup synchronously after doneChan signals bt.internalWG.Wait() }) } // run is the main loop for the background task. // It executes the task function immediately, then periodically // until the stop signal is received. func (bt *BackgroundTask) run() { // Get registry for task completion tracking registry := GetGlobalTaskRegistry() defer func() { // Register task completion with circuit breaker registry.cb.OnTaskComplete(bt.name) // Close doneChan to signal that the task has completed if atomic.CompareAndSwapInt32(&bt.doneClosed, 0, 1) { close(bt.doneChan) } bt.internalWG.Done() if bt.externalWG != nil { bt.externalWG.Done() } }() ticker := time.NewTicker(bt.interval) defer ticker.Stop() if bt.logger != nil { if !isTestMode() { bt.logger.Debug("Starting background task: %s", bt.name) } } // Execute task function immediately, but check for stop signal first select { case <-bt.stopChan: if bt.logger != nil { if !isTestMode() { bt.logger.Debug("Stopping background task: %s (before initial execution)", bt.name) } } return default: bt.taskFunc() } for { select { case <-ticker.C: if bt.logger != nil { bt.logger.Debugf("Background task %s: executing periodic task", bt.name) } // Check for stop signal before executing task select { case <-bt.stopChan: if bt.logger != nil { if !isTestMode() { bt.logger.Debug("Stopping background task: %s (during periodic execution)", bt.name) } } return default: bt.taskFunc() } case <-bt.stopChan: if bt.logger != nil { if !isTestMode() { bt.logger.Debug("Stopping background task: %s (direct stop signal)", bt.name) } } return } } } // TaskCircuitBreaker implements circuit breaker pattern for background task creation // It limits concurrent task execution and tracks failures to prevent system overload type TaskCircuitBreaker struct { logger *Logger activeTasks map[string]struct{} lastFailureTime int64 timeout time.Duration tasksMu sync.RWMutex state int32 failureCount int32 failureThreshold int32 concurrentTasks int32 maxConcurrent int32 } // NewTaskCircuitBreaker creates a new circuit breaker for background tasks // with concurrency limiting capability func NewTaskCircuitBreaker(failureThreshold int32, timeout time.Duration, logger *Logger) *TaskCircuitBreaker { // SECURITY FIX: Strict resource limits to prevent DoS attacks maxConcurrent := int32(10) // Maximum 10 concurrent tasks per instance // In test mode, allow more concurrent tasks for stress testing if isTestMode() { maxConcurrent = int32(100) // Higher limit for tests } return &TaskCircuitBreaker{ state: int32(CircuitBreakerClosed), failureThreshold: failureThreshold, timeout: timeout, logger: logger, maxConcurrent: maxConcurrent, activeTasks: make(map[string]struct{}), } } // CanCreateTask checks if a new task can be created based on circuit breaker state // and concurrency limits func (cb *TaskCircuitBreaker) CanCreateTask(taskName string) error { state := CircuitBreakerState(atomic.LoadInt32(&cb.state)) // First check concurrency limits current := atomic.LoadInt32(&cb.concurrentTasks) max := atomic.LoadInt32(&cb.maxConcurrent) // For cleanup tasks, be more restrictive (singleton-like behavior) // However, allow distinct realm-specific tasks (e.g., singleton-metadata-refresh-abc123 vs singleton-metadata-refresh-def456) if strings.Contains(taskName, "cleanup") || strings.Contains(taskName, "singleton") { cb.tasksMu.RLock() hasSameTask := false for activeTask := range cb.activeTasks { // Only block if the EXACT same task is already running // This allows realm-specific tasks like singleton-metadata-refresh-{hash} to run concurrently if activeTask == taskName { hasSameTask = true break } } cb.tasksMu.RUnlock() if hasSameTask { return fmt.Errorf("cleanup/singleton task already running: %s", taskName) } } // Apply different limits based on task name patterns var effectiveLimit int32 switch { case strings.Contains(taskName, "circuit-breaker-test"): // For circuit breaker tests, use progressive limits if current < 5 { effectiveLimit = max // Allow initial tasks } else if current < 10 { effectiveLimit = 10 // First throttling level } else { effectiveLimit = 8 // More aggressive throttling } case strings.Contains(taskName, "exhaustion-test"): // SECURITY FIX: Limit exhaustion tests to prevent DoS effectiveLimit = 10 // Reduced from 100 to prevent resource exhaustion default: effectiveLimit = max } if current >= effectiveLimit { return fmt.Errorf("concurrent task limit reached (%d >= %d) for task: %s", current, effectiveLimit, taskName) } // Then check circuit breaker state switch state { case CircuitBreakerClosed: return nil case CircuitBreakerOpen: // Check if timeout has elapsed lastFailure := atomic.LoadInt64(&cb.lastFailureTime) if time.Now().Unix()-lastFailure > int64(cb.timeout.Seconds()) { atomic.StoreInt32(&cb.state, int32(CircuitBreakerHalfOpen)) if cb.logger != nil { cb.logger.Debug("Circuit breaker transitioning to half-open for task: %s", taskName) } return nil } return fmt.Errorf("circuit breaker is open for task: %s", taskName) case CircuitBreakerHalfOpen: return nil default: return fmt.Errorf("unknown circuit breaker state: %d", state) } } // OnTaskStart records a task starting execution func (cb *TaskCircuitBreaker) OnTaskStart(taskName string) { atomic.AddInt32(&cb.concurrentTasks, 1) cb.tasksMu.Lock() cb.activeTasks[taskName] = struct{}{} cb.tasksMu.Unlock() atomic.StoreInt32(&cb.failureCount, 0) atomic.StoreInt32(&cb.state, int32(CircuitBreakerClosed)) if cb.logger != nil { cb.logger.Debug("Task started, concurrent count: %d, task: %s", atomic.LoadInt32(&cb.concurrentTasks), taskName) } } // OnTaskComplete records a task completing execution func (cb *TaskCircuitBreaker) OnTaskComplete(taskName string) { atomic.AddInt32(&cb.concurrentTasks, -1) cb.tasksMu.Lock() delete(cb.activeTasks, taskName) cb.tasksMu.Unlock() if cb.logger != nil { cb.logger.Debug("Task completed, concurrent count: %d, task: %s", atomic.LoadInt32(&cb.concurrentTasks), taskName) } } // OnTaskSuccess records a successful task creation (legacy compatibility) func (cb *TaskCircuitBreaker) OnTaskSuccess(taskName string) { cb.OnTaskStart(taskName) } // OnTaskFailure records a task creation failure func (cb *TaskCircuitBreaker) OnTaskFailure(taskName string, err error) { failureCount := atomic.AddInt32(&cb.failureCount, 1) atomic.StoreInt64(&cb.lastFailureTime, time.Now().Unix()) if failureCount >= cb.failureThreshold { atomic.StoreInt32(&cb.state, int32(CircuitBreakerOpen)) if cb.logger != nil { cb.logger.Error("Circuit breaker opened for task %s after %d failures: %v", taskName, failureCount, err) } } } // TaskRegistry maintains a registry of all active background tasks to prevent duplicates type TaskRegistry struct { tasks map[string]*BackgroundTask cb *TaskCircuitBreaker logger *Logger mu sync.RWMutex } // GlobalTaskRegistry is the singleton instance for managing all background tasks var ( globalTaskRegistry *TaskRegistry globalTaskRegistryOnce sync.Once globalTaskRegistryMutex sync.Mutex // Protect reset operations ) // GetGlobalTaskRegistry returns the singleton task registry func GetGlobalTaskRegistry() *TaskRegistry { globalTaskRegistryMutex.Lock() defer globalTaskRegistryMutex.Unlock() globalTaskRegistryOnce.Do(func() { logger := GetSingletonNoOpLogger() circuitBreaker := NewTaskCircuitBreaker(3, 30*time.Second, logger) globalTaskRegistry = &TaskRegistry{ tasks: make(map[string]*BackgroundTask), cb: circuitBreaker, logger: logger, } }) return globalTaskRegistry } // ResetGlobalTaskRegistry resets the global task registry for testing // This should only be used in tests to prevent task exhaustion func ResetGlobalTaskRegistry() { globalTaskRegistryMutex.Lock() defer globalTaskRegistryMutex.Unlock() if globalTaskRegistry != nil { // Stop all existing tasks globalTaskRegistry.mu.Lock() for _, task := range globalTaskRegistry.tasks { if task != nil { task.Stop() } } globalTaskRegistry.tasks = make(map[string]*BackgroundTask) // Reset circuit breaker counters atomic.StoreInt32(&globalTaskRegistry.cb.concurrentTasks, 0) globalTaskRegistry.cb.tasksMu.Lock() globalTaskRegistry.cb.activeTasks = make(map[string]struct{}) globalTaskRegistry.cb.tasksMu.Unlock() globalTaskRegistry.mu.Unlock() } // Reset the singleton so next call creates fresh instance globalTaskRegistryOnce = sync.Once{} globalTaskRegistry = nil } // RegisterTask registers a new background task with the registry // and wraps the task function to track execution func (tr *TaskRegistry) RegisterTask(name string, task *BackgroundTask) error { if err := tr.cb.CanCreateTask(name); err != nil { return fmt.Errorf("circuit breaker prevented task creation: %w", err) } // Check if task already exists and get reference outside the lock var existingTask *BackgroundTask tr.mu.Lock() if existing, exists := tr.tasks[name]; exists { if tr.logger != nil { tr.logger.Error("Task %s already exists, stopping existing task", name) } existingTask = existing // Remove from tasks map immediately to prevent race conditions delete(tr.tasks, name) } tr.mu.Unlock() // Stop the existing task outside the lock to prevent deadlock if existingTask != nil { existingTask.Stop() } tr.mu.Lock() defer tr.mu.Unlock() // Task execution tracking is now handled in the run() method tr.tasks[name] = task tr.cb.OnTaskSuccess(name) if tr.logger != nil { tr.logger.Debug("Registered background task: %s", name) } return nil } // UnregisterTask removes a task from the registry func (tr *TaskRegistry) UnregisterTask(name string) { tr.mu.Lock() defer tr.mu.Unlock() if task, exists := tr.tasks[name]; exists { task.Stop() delete(tr.tasks, name) if tr.logger != nil { tr.logger.Debug("Unregistered background task: %s", name) } } } // GetTask returns a task from the registry func (tr *TaskRegistry) GetTask(name string) (*BackgroundTask, bool) { tr.mu.RLock() defer tr.mu.RUnlock() task, exists := tr.tasks[name] return task, exists } // StopAllTasks stops all registered background tasks func (tr *TaskRegistry) StopAllTasks() { // First, copy the tasks map to avoid deadlock with GetTaskCount() tr.mu.Lock() tasksCopy := make(map[string]*BackgroundTask, len(tr.tasks)) for name, task := range tr.tasks { tasksCopy[name] = task } // Clear the registry immediately to prevent new task lookups tr.tasks = make(map[string]*BackgroundTask) tr.mu.Unlock() // Now stop all tasks without holding the lock for name, task := range tasksCopy { task.Stop() if tr.logger != nil { tr.logger.Debug("Stopped background task during shutdown: %s", name) } } } // GetTaskCount returns the number of active tasks func (tr *TaskRegistry) GetTaskCount() int { tr.mu.RLock() defer tr.mu.RUnlock() return len(tr.tasks) } // CreateSingletonTask creates or returns existing singleton task with strict enforcement func (tr *TaskRegistry) CreateSingletonTask(name string, interval time.Duration, taskFunc func(), logger *Logger, wg *sync.WaitGroup) (*BackgroundTask, error) { // Delegate to the singleton resource manager instead rm := GetResourceManager() err := rm.RegisterBackgroundTask(name, interval, taskFunc) if err != nil { return nil, err } // Start the task if not already running if !rm.IsTaskRunning(name) { _ = rm.StartBackgroundTask(name) // Safe to ignore: task registration succeeded, start is best-effort } // Get the task from resource manager's internal registry rm.tasksMu.RLock() task := rm.tasks[name] rm.tasksMu.RUnlock() return task, nil } // TaskMemoryStats represents a snapshot of memory usage statistics for task registry type TaskMemoryStats struct { Timestamp time.Time Goroutines int HeapAlloc uint64 HeapSys uint64 NumGC uint32 AllocObjects uint64 FreeObjects uint64 ActiveTasks int } // Global memory monitor singleton var ( globalTaskMemoryMonitor *TaskMemoryMonitor globalTaskMemoryMonitorOnce sync.Once ) // TaskMemoryMonitor provides system memory monitoring and leak detection capabilities for task registry type TaskMemoryMonitor struct { ctx context.Context cancel context.CancelFunc task *BackgroundTask logger *Logger registry *TaskRegistry statsHistory []TaskMemoryStats mu sync.RWMutex maxHistory int started bool } // GetGlobalTaskMemoryMonitor returns the global singleton TaskMemoryMonitor instance func GetGlobalTaskMemoryMonitor(logger *Logger) *TaskMemoryMonitor { globalTaskMemoryMonitorOnce.Do(func() { registry := GetGlobalTaskRegistry() ctx, cancel := context.WithCancel(context.Background()) globalTaskMemoryMonitor = &TaskMemoryMonitor{ ctx: ctx, cancel: cancel, logger: logger, registry: registry, maxHistory: 100, // Keep last 100 snapshots started: false, } }) return globalTaskMemoryMonitor } // NewTaskMemoryMonitor creates a new memory monitor for task registry. // // Deprecated: Use GetGlobalTaskMemoryMonitor instead for singleton behavior. func NewTaskMemoryMonitor(logger *Logger, registry *TaskRegistry) *TaskMemoryMonitor { return GetGlobalTaskMemoryMonitor(logger) } // Start begins memory monitoring func (mm *TaskMemoryMonitor) Start(interval time.Duration) error { mm.mu.Lock() defer mm.mu.Unlock() // Check if already started if mm.started { if mm.logger != nil && !isTestMode() { mm.logger.Debug("TaskMemoryMonitor already started, skipping duplicate start") } return nil } task := NewBackgroundTask( "memory-monitor", interval, mm.collectStats, mm.logger, ) mm.task = task if err := mm.registry.RegisterTask("memory-monitor", task); err != nil { // Check if error is because task already exists if strings.Contains(err.Error(), "already exists") || strings.Contains(err.Error(), "already registered") { mm.started = true // Mark as started since monitor is already running if mm.logger != nil && !isTestMode() { mm.logger.Debug("Memory monitor task already registered, marking as started") } return nil } return fmt.Errorf("failed to register memory monitor: %w", err) } task.Start() mm.started = true if mm.logger != nil && !isTestMode() { mm.logger.Debug("Started global task memory monitoring with %v interval", interval) } return nil } // Stop stops memory monitoring func (mm *TaskMemoryMonitor) Stop() { mm.mu.Lock() defer mm.mu.Unlock() if mm.cancel != nil { mm.cancel() } if mm.task != nil { mm.task.Stop() } if mm.registry != nil { mm.registry.UnregisterTask("memory-monitor") } mm.started = false } // collectStats collects current memory statistics func (mm *TaskMemoryMonitor) collectStats() { select { case <-mm.ctx.Done(): return default: } var m runtime.MemStats runtime.ReadMemStats(&m) stats := TaskMemoryStats{ Timestamp: time.Now(), Goroutines: runtime.NumGoroutine(), HeapAlloc: m.HeapAlloc, HeapSys: m.HeapSys, NumGC: m.NumGC, AllocObjects: m.Mallocs, FreeObjects: m.Frees, ActiveTasks: 0, } if mm.registry != nil { stats.ActiveTasks = mm.registry.GetTaskCount() } mm.mu.Lock() mm.statsHistory = append(mm.statsHistory, stats) if len(mm.statsHistory) > mm.maxHistory { // Keep only the most recent entries to prevent unbounded growth mm.statsHistory = mm.statsHistory[len(mm.statsHistory)-mm.maxHistory:] } mm.mu.Unlock() // Log potential issues mm.checkForMemoryIssues(stats) } // checkForMemoryIssues analyzes stats and logs potential memory issues func (mm *TaskMemoryMonitor) checkForMemoryIssues(stats TaskMemoryStats) { if mm.logger == nil { return } // Check for goroutine leaks (arbitrary threshold) if stats.Goroutines > 100 { mm.logger.Debugf("High goroutine count detected: %d", stats.Goroutines) } // Check for heap growth without corresponding GC activity mm.mu.RLock() historyLen := len(mm.statsHistory) if historyLen >= 2 { prev := mm.statsHistory[historyLen-2] heapGrowth := float64(stats.HeapAlloc) / float64(prev.HeapAlloc) if heapGrowth > 2.0 && stats.NumGC == prev.NumGC { mm.logger.Infof("Potential memory leak: heap grew %.2fx without GC", heapGrowth) } } mm.mu.RUnlock() // Log memory usage periodically if stats.Timestamp.Unix()%60 == 0 { // Every minute mm.logger.Infof("Memory stats - Goroutines: %d, Heap: %d bytes, Tasks: %d", stats.Goroutines, stats.HeapAlloc, stats.ActiveTasks) } } // GetCurrentStats returns the latest memory statistics func (mm *TaskMemoryMonitor) GetCurrentStats() (TaskMemoryStats, error) { mm.mu.RLock() defer mm.mu.RUnlock() if len(mm.statsHistory) == 0 { return TaskMemoryStats{}, fmt.Errorf("no memory statistics available") } return mm.statsHistory[len(mm.statsHistory)-1], nil } // GetStatsHistory returns a copy of the memory statistics history func (mm *TaskMemoryMonitor) GetStatsHistory() []TaskMemoryStats { mm.mu.RLock() defer mm.mu.RUnlock() history := make([]TaskMemoryStats, len(mm.statsHistory)) copy(history, mm.statsHistory) return history } // ForceGC triggers garbage collection and returns stats before/after func (mm *TaskMemoryMonitor) ForceGC() (before, after TaskMemoryStats, err error) { var m runtime.MemStats // Capture before stats runtime.ReadMemStats(&m) before = TaskMemoryStats{ Timestamp: time.Now(), Goroutines: runtime.NumGoroutine(), HeapAlloc: m.HeapAlloc, HeapSys: m.HeapSys, NumGC: m.NumGC, AllocObjects: m.Mallocs, FreeObjects: m.Frees, } // Force garbage collection runtime.GC() runtime.GC() // Double GC to ensure finalization // Capture after stats runtime.ReadMemStats(&m) after = TaskMemoryStats{ Timestamp: time.Now(), Goroutines: runtime.NumGoroutine(), HeapAlloc: m.HeapAlloc, HeapSys: m.HeapSys, NumGC: m.NumGC, AllocObjects: m.Mallocs, FreeObjects: m.Frees, } if mm.logger != nil { // #nosec G115 -- heap allocation bytes fit in int64 for practical purposes freed := int64(before.HeapAlloc) - int64(after.HeapAlloc) mm.logger.Infof("Forced GC: freed %d bytes (%.2f MB)", freed, float64(freed)/(1024*1024)) } return before, after, nil } // ShutdownAllTasks gracefully shuts down all background tasks // CRITICAL FIX: Ensures proper termination of all goroutines in production func ShutdownAllTasks() { registry := GetGlobalTaskRegistry() registry.mu.Lock() tasks := make([]*BackgroundTask, 0, len(registry.tasks)) for _, task := range registry.tasks { tasks = append(tasks, task) } registry.mu.Unlock() // Stop all tasks in parallel var wg sync.WaitGroup for _, task := range tasks { wg.Add(1) go func(t *BackgroundTask) { defer wg.Done() if t != nil { t.Stop() } }(task) } // Wait with timeout done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: // All tasks stopped successfully case <-time.After(10 * time.Second): // Timeout - tasks may still be running if registry.logger != nil { registry.logger.Errorf("Timeout waiting for all background tasks to stop") } } }