From 7a33e01863e4815bbd3be0f81a1836ba587fb33e Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Wed, 6 May 2026 10:45:10 +0100 Subject: [PATCH] fix: 4 P0 concurrency races in forward + k8s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0 #2 — currentConfig data race Manager.currentConfig was written without locking in Start/Reload but read from the health-checker callback goroutine. All accesses now go through workersMu (read or write as appropriate). P0 #3 — Reload kills health checker permanently Reload's zero-forward branch called m.Stop() which tore down the health checker, watchdog, and event bus. After that, EnableForward silently registered callbacks against dead components. Now the branch stops only the running workers; the supervisory infrastructure stays alive across config changes. P0 #4 — rest.Config write-write race executePortForward was mutating .Dial on the cached *rest.Config shared by all forwards in the same kube context. Cloning the config with rest.CopyConfig before mutation isolates per-forward dialers. P0 #5 — ForwardWorker.Stop() double-close panic close(w.stopChan) is now wrapped in sync.Once, so concurrent Stop calls (Manager.Stop racing stopWorkerInternal) are safe. New tests in internal/forward/concurrency_test.go exercise each fix under -race: 16 concurrent worker Stops, repeated sequential Stops, empty-Reload preserves infra pointers, and concurrent currentConfig read/write. --- internal/forward/concurrency_test.go | 167 +++++++++++++++++++++++++++ internal/forward/manager.go | 63 +++++++--- internal/forward/worker.go | 6 +- internal/k8s/portforward.go | 12 +- 4 files changed, 229 insertions(+), 19 deletions(-) create mode 100644 internal/forward/concurrency_test.go diff --git a/internal/forward/concurrency_test.go b/internal/forward/concurrency_test.go new file mode 100644 index 0000000..9e63e80 --- /dev/null +++ b/internal/forward/concurrency_test.go @@ -0,0 +1,167 @@ +package forward + +import ( + "sync" + "testing" + "time" + + "github.com/lukaszraczylo/kportal/internal/config" + "github.com/lukaszraczylo/kportal/internal/events" + "github.com/stretchr/testify/assert" +) + +// TestForwardWorker_Stop_Concurrent verifies that concurrent calls to Stop() +// are safe and do not panic from a double-close of stopChan (Bug 4). +// Run under -race to catch the underlying issue. +func TestForwardWorker_Stop_Concurrent(t *testing.T) { + fwd := config.Forward{ + Resource: "pod/my-app", + LocalPort: 18080, + Port: 80, + } + + worker := NewForwardWorker(fwd, nil, false, nil, nil, nil) + + // Pretend the run loop has finished so Stop() does not block on doneChan. + close(worker.doneChan) + + const callers = 16 + var wg sync.WaitGroup + wg.Add(callers) + start := make(chan struct{}) + + for i := 0; i < callers; i++ { + go func() { + defer wg.Done() + <-start + // Each call must complete without panicking. + worker.Stop() + }() + } + + close(start) // Release all goroutines simultaneously. + wg.Wait() + + // stopChan must be closed exactly once and observable as closed. + select { + case <-worker.stopChan: + // closed — expected + default: + t.Fatal("stopChan should be closed after Stop()") + } +} + +// TestForwardWorker_Stop_Idempotent verifies sequential repeated Stop calls +// also do not panic. +func TestForwardWorker_Stop_Idempotent(t *testing.T) { + fwd := config.Forward{ + Resource: "pod/my-app", + LocalPort: 18081, + Port: 80, + } + + worker := NewForwardWorker(fwd, nil, false, nil, nil, nil) + close(worker.doneChan) + + worker.Stop() + worker.Stop() + worker.Stop() +} + +// TestManager_Reload_EmptyKeepsInfraAlive verifies Bug 2 fix: a Reload that +// drops to zero forwards must NOT tear down healthChecker / watchdog / +// eventBus, so subsequent reloads with forwards continue to work. +func TestManager_Reload_EmptyKeepsInfraAlive(t *testing.T) { + manager, err := NewManager(false) + if err != nil { + t.Skip("Skipping test - no kubeconfig available") + } + defer manager.Stop() + + // Start with an empty config (Start tolerates this without errors). + emptyCfg := &config.Config{} + if err := manager.Start(emptyCfg); err != nil { + t.Fatalf("Start(empty) failed: %v", err) + } + + // Capture references to long-lived components. + hcBefore := manager.healthChecker + wdBefore := manager.watchdog + busBefore := manager.eventBus + + // Reload with another empty config - must not destroy these. + if err := manager.Reload(&config.Config{}); err != nil { + t.Fatalf("Reload(empty) failed: %v", err) + } + + assert.Same(t, hcBefore, manager.healthChecker, "healthChecker must be preserved across empty reload") + assert.Same(t, wdBefore, manager.watchdog, "watchdog must be preserved across empty reload") + assert.Same(t, busBefore, manager.eventBus, "eventBus must be preserved across empty reload") + + // Event bus must still accept subscribers (would panic / fail if Close was called). + manager.eventBus.SubscribeAll(func(_ events.Event) {}) +} + +// TestManager_CurrentConfig_RaceFree exercises Bug 1: concurrent Reload and +// reads of currentConfig (as performed by the health-checker callback path) +// must be race-free under -race. +func TestManager_CurrentConfig_RaceFree(t *testing.T) { + manager, err := NewManager(false) + if err != nil { + t.Skip("Skipping test - no kubeconfig available") + } + defer manager.Stop() + + cfgA := &config.Config{} + cfgB := &config.Config{} + if err := manager.Start(cfgA); err != nil { + t.Fatalf("Start failed: %v", err) + } + + stop := make(chan struct{}) + var wg sync.WaitGroup + + // Writer goroutine: alternates between two configs via Reload. + wg.Add(1) + go func() { + defer wg.Done() + toggle := false + for { + select { + case <-stop: + return + default: + } + if toggle { + _ = manager.Reload(cfgA) + } else { + _ = manager.Reload(cfgB) + } + toggle = !toggle + } + }() + + // Reader goroutines: emulate health-checker callback's read of + // currentConfig. Use the same locking discipline as the production code. + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + manager.workersMu.RLock() + cfg := manager.currentConfig + _ = cfg + manager.workersMu.RUnlock() + } + }() + } + + time.Sleep(150 * time.Millisecond) + close(stop) + wg.Wait() +} diff --git a/internal/forward/manager.go b/internal/forward/manager.go index ae51630..5ffb18a 100644 --- a/internal/forward/manager.go +++ b/internal/forward/manager.go @@ -48,6 +48,9 @@ type Manager struct { watchdog *Watchdog mdnsPublisher *mdns.Publisher eventBus *events.Bus + // currentConfig holds the active configuration. Access MUST be guarded by + // workersMu — it is read from the health-checker callback goroutine + // (registered in startWorker) and written by Start/Reload. currentConfig *config.Config workersMu sync.RWMutex verbose bool @@ -159,7 +162,9 @@ func (m *Manager) Start(cfg *config.Config) error { return fmt.Errorf("configuration is nil") } + m.workersMu.Lock() m.currentConfig = cfg + m.workersMu.Unlock() // Configure health checker with settings from config m.configureHealthChecker(cfg) @@ -284,9 +289,27 @@ func (m *Manager) Reload(newCfg *config.Config) error { newForwards := newCfg.GetAllForwards() if len(newForwards) == 0 { - log.Printf("New configuration has no forwards, stopping all") - m.Stop() + log.Printf("New configuration has no forwards, stopping all workers") + // Do NOT call m.Stop() here: it tears down healthChecker, watchdog + // and eventBus, which must remain alive so subsequent + // EnableForward / Reload calls can register against them. + // Only stop currently-running workers and update currentConfig. + m.workersMu.RLock() + ids := make([]string, 0, len(m.workers)) + for id := range m.workers { + ids = append(ids, id) + } + m.workersMu.RUnlock() + + for _, id := range ids { + if err := m.stopWorkerInternal(id, true); err != nil { + log.Printf("Failed to stop worker %s: %v", id, err) + } + } + + m.workersMu.Lock() m.currentConfig = newCfg + m.workersMu.Unlock() return nil } @@ -369,7 +392,9 @@ func (m *Manager) Reload(newCfg *config.Config) error { } // Update current config + m.workersMu.Lock() m.currentConfig = newCfg + m.workersMu.Unlock() log.Printf("Configuration reloaded successfully") return nil @@ -424,20 +449,24 @@ func (m *Manager) startWorker(fwd config.Forward) error { } } - // Handle stale connections: trigger reconnection if retryOnStale is enabled - if status == healthcheck.StatusStale && m.currentConfig.GetRetryOnStale() { - logger.Info("Stale connection detected, triggering reconnection", map[string]interface{}{ - "forward_id": forwardID, - "reason": errorMsg, - }) - - // Find and notify the worker to reconnect + // Handle stale connections: trigger reconnection if retryOnStale is enabled. + // Read currentConfig and worker map under a single lock acquisition + // to avoid racing with Reload/Start writes. + if status == healthcheck.StatusStale { m.workersMu.RLock() + retryOnStale := m.currentConfig != nil && m.currentConfig.GetRetryOnStale() staleWorker, exists := m.workers[forwardID] m.workersMu.RUnlock() - if exists { - staleWorker.TriggerReconnect("stale connection") + if retryOnStale { + logger.Info("Stale connection detected, triggering reconnection", map[string]interface{}{ + "forward_id": forwardID, + "reason": errorMsg, + }) + + if exists { + staleWorker.TriggerReconnect("stale connection") + } } } }) @@ -545,12 +574,16 @@ func (m *Manager) DisableForward(id string) error { // EnableForward re-enables a previously disabled forward func (m *Manager) EnableForward(id string) error { - // Find the forward configuration in current config - if m.currentConfig == nil { + // Find the forward configuration in current config (read under lock) + m.workersMu.RLock() + cfg := m.currentConfig + m.workersMu.RUnlock() + + if cfg == nil { return fmt.Errorf("no configuration available") } - forwards := m.currentConfig.GetAllForwards() + forwards := cfg.GetAllForwards() var targetFwd *config.Forward for _, fwd := range forwards { if fwd.ID() == id { diff --git a/internal/forward/worker.go b/internal/forward/worker.go index d4b18f4..dec1d99 100644 --- a/internal/forward/worker.go +++ b/internal/forward/worker.go @@ -39,6 +39,7 @@ type ForwardWorker struct { lastPod string forward config.Forward forwardCancelMu sync.Mutex + stopOnce sync.Once // Guards close(stopChan) against concurrent Stop() calls verbose bool } @@ -97,9 +98,12 @@ func (w *ForwardWorker) Start() { } // Stop gracefully stops the port-forward worker. +// Safe to call concurrently and multiple times — stopChan is closed exactly once. func (w *ForwardWorker) Stop() { w.cancel() - close(w.stopChan) + w.stopOnce.Do(func() { + close(w.stopChan) + }) // Wait for worker to finish with timeout to prevent blocking forever select { diff --git a/internal/k8s/portforward.go b/internal/k8s/portforward.go index 2b42c80..2a424ce 100644 --- a/internal/k8s/portforward.go +++ b/internal/k8s/portforward.go @@ -185,9 +185,15 @@ func (pf *PortForwarder) forwardToService(ctx context.Context, req *ForwardReque // executePortForward performs the actual port-forward operation. func (pf *PortForwarder) executePortForward(config *rest.Config, url *url.URL, req *ForwardRequest) error { + // Clone the rest.Config before mutating. ClientPool.GetRestConfig returns a + // cached pointer shared across all forwards on the same context; mutating + // config.Dial directly causes a write-write race when multiple forwards + // run concurrently against the same context. + cfg := rest.CopyConfig(config) + // Configure TCP settings on the underlying connection // This is set in the rest.Config which will be used by the SPDY transport - if config.Dial == nil { + if cfg.Dial == nil { // Create a custom dialer with configurable timeout and keepalive // - Timeout: How long to wait for connection to establish // - KeepAlive: TCP keepalive helps OS detect dead connections at network layer @@ -195,11 +201,11 @@ func (pf *PortForwarder) executePortForward(config *rest.Config, url *url.URL, r Timeout: pf.dialTimeout, // Configurable dial timeout KeepAlive: pf.tcpKeepalive, // Configurable keepalive interval } - config.Dial = dialer.DialContext + cfg.Dial = dialer.DialContext } // Create SPDY roundtripper - transport, upgrader, err := spdy.RoundTripperFor(config) + transport, upgrader, err := spdy.RoundTripperFor(cfg) if err != nil { return fmt.Errorf("failed to create round tripper: %w", err) }