diff --git a/internal/forward/concurrency_test.go b/internal/forward/concurrency_test.go new file mode 100644 index 0000000..9e63e80 --- /dev/null +++ b/internal/forward/concurrency_test.go @@ -0,0 +1,167 @@ +package forward + +import ( + "sync" + "testing" + "time" + + "github.com/lukaszraczylo/kportal/internal/config" + "github.com/lukaszraczylo/kportal/internal/events" + "github.com/stretchr/testify/assert" +) + +// TestForwardWorker_Stop_Concurrent verifies that concurrent calls to Stop() +// are safe and do not panic from a double-close of stopChan (Bug 4). +// Run under -race to catch the underlying issue. +func TestForwardWorker_Stop_Concurrent(t *testing.T) { + fwd := config.Forward{ + Resource: "pod/my-app", + LocalPort: 18080, + Port: 80, + } + + worker := NewForwardWorker(fwd, nil, false, nil, nil, nil) + + // Pretend the run loop has finished so Stop() does not block on doneChan. + close(worker.doneChan) + + const callers = 16 + var wg sync.WaitGroup + wg.Add(callers) + start := make(chan struct{}) + + for i := 0; i < callers; i++ { + go func() { + defer wg.Done() + <-start + // Each call must complete without panicking. + worker.Stop() + }() + } + + close(start) // Release all goroutines simultaneously. + wg.Wait() + + // stopChan must be closed exactly once and observable as closed. + select { + case <-worker.stopChan: + // closed — expected + default: + t.Fatal("stopChan should be closed after Stop()") + } +} + +// TestForwardWorker_Stop_Idempotent verifies sequential repeated Stop calls +// also do not panic. +func TestForwardWorker_Stop_Idempotent(t *testing.T) { + fwd := config.Forward{ + Resource: "pod/my-app", + LocalPort: 18081, + Port: 80, + } + + worker := NewForwardWorker(fwd, nil, false, nil, nil, nil) + close(worker.doneChan) + + worker.Stop() + worker.Stop() + worker.Stop() +} + +// TestManager_Reload_EmptyKeepsInfraAlive verifies Bug 2 fix: a Reload that +// drops to zero forwards must NOT tear down healthChecker / watchdog / +// eventBus, so subsequent reloads with forwards continue to work. +func TestManager_Reload_EmptyKeepsInfraAlive(t *testing.T) { + manager, err := NewManager(false) + if err != nil { + t.Skip("Skipping test - no kubeconfig available") + } + defer manager.Stop() + + // Start with an empty config (Start tolerates this without errors). + emptyCfg := &config.Config{} + if err := manager.Start(emptyCfg); err != nil { + t.Fatalf("Start(empty) failed: %v", err) + } + + // Capture references to long-lived components. + hcBefore := manager.healthChecker + wdBefore := manager.watchdog + busBefore := manager.eventBus + + // Reload with another empty config - must not destroy these. + if err := manager.Reload(&config.Config{}); err != nil { + t.Fatalf("Reload(empty) failed: %v", err) + } + + assert.Same(t, hcBefore, manager.healthChecker, "healthChecker must be preserved across empty reload") + assert.Same(t, wdBefore, manager.watchdog, "watchdog must be preserved across empty reload") + assert.Same(t, busBefore, manager.eventBus, "eventBus must be preserved across empty reload") + + // Event bus must still accept subscribers (would panic / fail if Close was called). + manager.eventBus.SubscribeAll(func(_ events.Event) {}) +} + +// TestManager_CurrentConfig_RaceFree exercises Bug 1: concurrent Reload and +// reads of currentConfig (as performed by the health-checker callback path) +// must be race-free under -race. +func TestManager_CurrentConfig_RaceFree(t *testing.T) { + manager, err := NewManager(false) + if err != nil { + t.Skip("Skipping test - no kubeconfig available") + } + defer manager.Stop() + + cfgA := &config.Config{} + cfgB := &config.Config{} + if err := manager.Start(cfgA); err != nil { + t.Fatalf("Start failed: %v", err) + } + + stop := make(chan struct{}) + var wg sync.WaitGroup + + // Writer goroutine: alternates between two configs via Reload. + wg.Add(1) + go func() { + defer wg.Done() + toggle := false + for { + select { + case <-stop: + return + default: + } + if toggle { + _ = manager.Reload(cfgA) + } else { + _ = manager.Reload(cfgB) + } + toggle = !toggle + } + }() + + // Reader goroutines: emulate health-checker callback's read of + // currentConfig. Use the same locking discipline as the production code. + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + manager.workersMu.RLock() + cfg := manager.currentConfig + _ = cfg + manager.workersMu.RUnlock() + } + }() + } + + time.Sleep(150 * time.Millisecond) + close(stop) + wg.Wait() +} diff --git a/internal/forward/manager.go b/internal/forward/manager.go index ae51630..5ffb18a 100644 --- a/internal/forward/manager.go +++ b/internal/forward/manager.go @@ -48,6 +48,9 @@ type Manager struct { watchdog *Watchdog mdnsPublisher *mdns.Publisher eventBus *events.Bus + // currentConfig holds the active configuration. Access MUST be guarded by + // workersMu — it is read from the health-checker callback goroutine + // (registered in startWorker) and written by Start/Reload. currentConfig *config.Config workersMu sync.RWMutex verbose bool @@ -159,7 +162,9 @@ func (m *Manager) Start(cfg *config.Config) error { return fmt.Errorf("configuration is nil") } + m.workersMu.Lock() m.currentConfig = cfg + m.workersMu.Unlock() // Configure health checker with settings from config m.configureHealthChecker(cfg) @@ -284,9 +289,27 @@ func (m *Manager) Reload(newCfg *config.Config) error { newForwards := newCfg.GetAllForwards() if len(newForwards) == 0 { - log.Printf("New configuration has no forwards, stopping all") - m.Stop() + log.Printf("New configuration has no forwards, stopping all workers") + // Do NOT call m.Stop() here: it tears down healthChecker, watchdog + // and eventBus, which must remain alive so subsequent + // EnableForward / Reload calls can register against them. + // Only stop currently-running workers and update currentConfig. + m.workersMu.RLock() + ids := make([]string, 0, len(m.workers)) + for id := range m.workers { + ids = append(ids, id) + } + m.workersMu.RUnlock() + + for _, id := range ids { + if err := m.stopWorkerInternal(id, true); err != nil { + log.Printf("Failed to stop worker %s: %v", id, err) + } + } + + m.workersMu.Lock() m.currentConfig = newCfg + m.workersMu.Unlock() return nil } @@ -369,7 +392,9 @@ func (m *Manager) Reload(newCfg *config.Config) error { } // Update current config + m.workersMu.Lock() m.currentConfig = newCfg + m.workersMu.Unlock() log.Printf("Configuration reloaded successfully") return nil @@ -424,20 +449,24 @@ func (m *Manager) startWorker(fwd config.Forward) error { } } - // Handle stale connections: trigger reconnection if retryOnStale is enabled - if status == healthcheck.StatusStale && m.currentConfig.GetRetryOnStale() { - logger.Info("Stale connection detected, triggering reconnection", map[string]interface{}{ - "forward_id": forwardID, - "reason": errorMsg, - }) - - // Find and notify the worker to reconnect + // Handle stale connections: trigger reconnection if retryOnStale is enabled. + // Read currentConfig and worker map under a single lock acquisition + // to avoid racing with Reload/Start writes. + if status == healthcheck.StatusStale { m.workersMu.RLock() + retryOnStale := m.currentConfig != nil && m.currentConfig.GetRetryOnStale() staleWorker, exists := m.workers[forwardID] m.workersMu.RUnlock() - if exists { - staleWorker.TriggerReconnect("stale connection") + if retryOnStale { + logger.Info("Stale connection detected, triggering reconnection", map[string]interface{}{ + "forward_id": forwardID, + "reason": errorMsg, + }) + + if exists { + staleWorker.TriggerReconnect("stale connection") + } } } }) @@ -545,12 +574,16 @@ func (m *Manager) DisableForward(id string) error { // EnableForward re-enables a previously disabled forward func (m *Manager) EnableForward(id string) error { - // Find the forward configuration in current config - if m.currentConfig == nil { + // Find the forward configuration in current config (read under lock) + m.workersMu.RLock() + cfg := m.currentConfig + m.workersMu.RUnlock() + + if cfg == nil { return fmt.Errorf("no configuration available") } - forwards := m.currentConfig.GetAllForwards() + forwards := cfg.GetAllForwards() var targetFwd *config.Forward for _, fwd := range forwards { if fwd.ID() == id { diff --git a/internal/forward/worker.go b/internal/forward/worker.go index d4b18f4..dec1d99 100644 --- a/internal/forward/worker.go +++ b/internal/forward/worker.go @@ -39,6 +39,7 @@ type ForwardWorker struct { lastPod string forward config.Forward forwardCancelMu sync.Mutex + stopOnce sync.Once // Guards close(stopChan) against concurrent Stop() calls verbose bool } @@ -97,9 +98,12 @@ func (w *ForwardWorker) Start() { } // Stop gracefully stops the port-forward worker. +// Safe to call concurrently and multiple times — stopChan is closed exactly once. func (w *ForwardWorker) Stop() { w.cancel() - close(w.stopChan) + w.stopOnce.Do(func() { + close(w.stopChan) + }) // Wait for worker to finish with timeout to prevent blocking forever select { diff --git a/internal/k8s/portforward.go b/internal/k8s/portforward.go index 2b42c80..2a424ce 100644 --- a/internal/k8s/portforward.go +++ b/internal/k8s/portforward.go @@ -185,9 +185,15 @@ func (pf *PortForwarder) forwardToService(ctx context.Context, req *ForwardReque // executePortForward performs the actual port-forward operation. func (pf *PortForwarder) executePortForward(config *rest.Config, url *url.URL, req *ForwardRequest) error { + // Clone the rest.Config before mutating. ClientPool.GetRestConfig returns a + // cached pointer shared across all forwards on the same context; mutating + // config.Dial directly causes a write-write race when multiple forwards + // run concurrently against the same context. + cfg := rest.CopyConfig(config) + // Configure TCP settings on the underlying connection // This is set in the rest.Config which will be used by the SPDY transport - if config.Dial == nil { + if cfg.Dial == nil { // Create a custom dialer with configurable timeout and keepalive // - Timeout: How long to wait for connection to establish // - KeepAlive: TCP keepalive helps OS detect dead connections at network layer @@ -195,11 +201,11 @@ func (pf *PortForwarder) executePortForward(config *rest.Config, url *url.URL, r Timeout: pf.dialTimeout, // Configurable dial timeout KeepAlive: pf.tcpKeepalive, // Configurable keepalive interval } - config.Dial = dialer.DialContext + cfg.Dial = dialer.DialContext } // Create SPDY roundtripper - transport, upgrader, err := spdy.RoundTripperFor(config) + transport, upgrader, err := spdy.RoundTripperFor(cfg) if err != nil { return fmt.Errorf("failed to create round tripper: %w", err) }