From 0ccc8551235de367a5da247eb637c89c109c0ecc Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Wed, 6 May 2026 11:02:41 +0100 Subject: [PATCH] fix: Manager.Stop() is now idempotent Previously a second Stop() call would panic from a double-close on eventBus and re-stop the healthChecker/watchdog whose contexts had already been cancelled. Wrapping the body in sync.Once makes sequential and concurrent double-Stop safe. TestManager_Stop_Idempotent covers both paths. --- internal/forward/concurrency_test.go | 41 +++++++++++++++ internal/forward/manager.go | 75 +++++++++++++++------------- 2 files changed, 80 insertions(+), 36 deletions(-) diff --git a/internal/forward/concurrency_test.go b/internal/forward/concurrency_test.go index 9e63e80..6c543f0 100644 --- a/internal/forward/concurrency_test.go +++ b/internal/forward/concurrency_test.go @@ -165,3 +165,44 @@ func TestManager_CurrentConfig_RaceFree(t *testing.T) { close(stop) wg.Wait() } + +// TestManager_Stop_Idempotent verifies that calling Manager.Stop() multiple +// times — sequentially or concurrently — does not panic from a double-close +// of eventBus or a double Stop on healthChecker/watchdog. The body of Stop() +// is wrapped in sync.Once. +func TestManager_Stop_Idempotent(t *testing.T) { + manager, err := NewManager(false) + if err != nil { + t.Skip("Skipping test - no kubeconfig available") + } + if err := manager.Start(&config.Config{}); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Sequential double-stop must not panic. + manager.Stop() + manager.Stop() + + // Build a second manager and call Stop concurrently from many goroutines — + // any non-idempotent close path would panic at least one of them. + m2, err := NewManager(false) + if err != nil { + t.Skip("Skipping test - no kubeconfig available") + } + if err := m2.Start(&config.Config{}); err != nil { + t.Fatalf("Start failed: %v", err) + } + const callers = 16 + var wg sync.WaitGroup + wg.Add(callers) + start := make(chan struct{}) + for i := 0; i < callers; i++ { + go func() { + defer wg.Done() + <-start + m2.Stop() + }() + } + close(start) + wg.Wait() +} diff --git a/internal/forward/manager.go b/internal/forward/manager.go index 5ffb18a..c96f487 100644 --- a/internal/forward/manager.go +++ b/internal/forward/manager.go @@ -53,6 +53,7 @@ type Manager struct { // (registered in startWorker) and written by Start/Reload. currentConfig *config.Config workersMu sync.RWMutex + stopOnce sync.Once verbose bool } @@ -223,52 +224,54 @@ func (m *Manager) Start(cfg *config.Config) error { // Stop gracefully stops all port-forward workers. func (m *Manager) Stop() { - log.Printf("Stopping all port-forwards...") + m.stopOnce.Do(func() { + log.Printf("Stopping all port-forwards...") - // Stop health checker and watchdog first - m.healthChecker.Stop() - m.watchdog.Stop() + // Stop health checker and watchdog first + m.healthChecker.Stop() + m.watchdog.Stop() - // Close event bus - if m.eventBus != nil { - m.eventBus.Close() - } + // Close event bus + if m.eventBus != nil { + m.eventBus.Close() + } - // Stop mDNS publisher - if m.mdnsPublisher != nil { - m.mdnsPublisher.Stop() - } + // Stop mDNS publisher + if m.mdnsPublisher != nil { + m.mdnsPublisher.Stop() + } - m.workersMu.Lock() - workers := make([]*ForwardWorker, 0, len(m.workers)) - for _, worker := range m.workers { - workers = append(workers, worker) - } - m.workersMu.Unlock() + m.workersMu.Lock() + workers := make([]*ForwardWorker, 0, len(m.workers)) + for _, worker := range m.workers { + workers = append(workers, worker) + } + m.workersMu.Unlock() - // Stop all workers with limited concurrency to avoid unbounded goroutine creation - var wg sync.WaitGroup - sem := make(chan struct{}, 10) // Limit to 10 concurrent stops + // Stop all workers with limited concurrency to avoid unbounded goroutine creation + var wg sync.WaitGroup + sem := make(chan struct{}, 10) // Limit to 10 concurrent stops - for _, worker := range workers { - wg.Add(1) - sem <- struct{}{} // Acquire semaphore + for _, worker := range workers { + wg.Add(1) + sem <- struct{}{} // Acquire semaphore - go func(w *ForwardWorker) { - defer wg.Done() - defer func() { <-sem }() // Release semaphore - w.Stop() - }(worker) - } + go func(w *ForwardWorker) { + defer wg.Done() + defer func() { <-sem }() // Release semaphore + w.Stop() + }(worker) + } - wg.Wait() + wg.Wait() - // Clear workers map - m.workersMu.Lock() - m.workers = make(map[string]*ForwardWorker) - m.workersMu.Unlock() + // Clear workers map + m.workersMu.Lock() + m.workers = make(map[string]*ForwardWorker) + m.workersMu.Unlock() - log.Printf("All port-forwards stopped") + log.Printf("All port-forwards stopped") + }) } // Reload applies a new configuration with hot-reload logic.