fix: Manager.Stop() is now idempotent

Previously a second Stop() call would panic from a double-close on
eventBus and re-stop the healthChecker/watchdog whose contexts had
already been cancelled. Wrapping the body in sync.Once makes
sequential and concurrent double-Stop safe.

TestManager_Stop_Idempotent covers both paths.
This commit is contained in:
2026-05-06 11:02:41 +01:00
parent 0a8c872b01
commit 0ccc855123
2 changed files with 80 additions and 36 deletions
+41
View File
@@ -165,3 +165,44 @@ func TestManager_CurrentConfig_RaceFree(t *testing.T) {
close(stop)
wg.Wait()
}
// TestManager_Stop_Idempotent verifies that calling Manager.Stop() multiple
// times — sequentially or concurrently — does not panic from a double-close
// of eventBus or a double Stop on healthChecker/watchdog. The body of Stop()
// is wrapped in sync.Once.
func TestManager_Stop_Idempotent(t *testing.T) {
manager, err := NewManager(false)
if err != nil {
t.Skip("Skipping test - no kubeconfig available")
}
if err := manager.Start(&config.Config{}); err != nil {
t.Fatalf("Start failed: %v", err)
}
// Sequential double-stop must not panic.
manager.Stop()
manager.Stop()
// Build a second manager and call Stop concurrently from many goroutines —
// any non-idempotent close path would panic at least one of them.
m2, err := NewManager(false)
if err != nil {
t.Skip("Skipping test - no kubeconfig available")
}
if err := m2.Start(&config.Config{}); err != nil {
t.Fatalf("Start failed: %v", err)
}
const callers = 16
var wg sync.WaitGroup
wg.Add(callers)
start := make(chan struct{})
for i := 0; i < callers; i++ {
go func() {
defer wg.Done()
<-start
m2.Stop()
}()
}
close(start)
wg.Wait()
}
+39 -36
View File
@@ -53,6 +53,7 @@ type Manager struct {
// (registered in startWorker) and written by Start/Reload.
currentConfig *config.Config
workersMu sync.RWMutex
stopOnce sync.Once
verbose bool
}
@@ -223,52 +224,54 @@ func (m *Manager) Start(cfg *config.Config) error {
// Stop gracefully stops all port-forward workers.
func (m *Manager) Stop() {
log.Printf("Stopping all port-forwards...")
m.stopOnce.Do(func() {
log.Printf("Stopping all port-forwards...")
// Stop health checker and watchdog first
m.healthChecker.Stop()
m.watchdog.Stop()
// Stop health checker and watchdog first
m.healthChecker.Stop()
m.watchdog.Stop()
// Close event bus
if m.eventBus != nil {
m.eventBus.Close()
}
// Close event bus
if m.eventBus != nil {
m.eventBus.Close()
}
// Stop mDNS publisher
if m.mdnsPublisher != nil {
m.mdnsPublisher.Stop()
}
// Stop mDNS publisher
if m.mdnsPublisher != nil {
m.mdnsPublisher.Stop()
}
m.workersMu.Lock()
workers := make([]*ForwardWorker, 0, len(m.workers))
for _, worker := range m.workers {
workers = append(workers, worker)
}
m.workersMu.Unlock()
m.workersMu.Lock()
workers := make([]*ForwardWorker, 0, len(m.workers))
for _, worker := range m.workers {
workers = append(workers, worker)
}
m.workersMu.Unlock()
// Stop all workers with limited concurrency to avoid unbounded goroutine creation
var wg sync.WaitGroup
sem := make(chan struct{}, 10) // Limit to 10 concurrent stops
// Stop all workers with limited concurrency to avoid unbounded goroutine creation
var wg sync.WaitGroup
sem := make(chan struct{}, 10) // Limit to 10 concurrent stops
for _, worker := range workers {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
for _, worker := range workers {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(w *ForwardWorker) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
w.Stop()
}(worker)
}
go func(w *ForwardWorker) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
w.Stop()
}(worker)
}
wg.Wait()
wg.Wait()
// Clear workers map
m.workersMu.Lock()
m.workers = make(map[string]*ForwardWorker)
m.workersMu.Unlock()
// Clear workers map
m.workersMu.Lock()
m.workers = make(map[string]*ForwardWorker)
m.workersMu.Unlock()
log.Printf("All port-forwards stopped")
log.Printf("All port-forwards stopped")
})
}
// Reload applies a new configuration with hot-reload logic.