fix: 4 P0 concurrency races in forward + k8s

P0 #2 — currentConfig data race
  Manager.currentConfig was written without locking in Start/Reload but
  read from the health-checker callback goroutine. All accesses now go
  through workersMu (read or write as appropriate).

P0 #3 — Reload kills health checker permanently
  Reload's zero-forward branch called m.Stop() which tore down the
  health checker, watchdog, and event bus. After that, EnableForward
  silently registered callbacks against dead components. Now the branch
  stops only the running workers; the supervisory infrastructure stays
  alive across config changes.

P0 #4 — rest.Config write-write race
  executePortForward was mutating .Dial on the cached *rest.Config
  shared by all forwards in the same kube context. Cloning the config
  with rest.CopyConfig before mutation isolates per-forward dialers.

P0 #5 — ForwardWorker.Stop() double-close panic
  close(w.stopChan) is now wrapped in sync.Once, so concurrent Stop
  calls (Manager.Stop racing stopWorkerInternal) are safe.

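New tests in internal/forward/concurrency_test.go exercise three of the
four fixes under -race: 16 concurrent worker Stops and repeated
sequential Stops (P0 #5), an empty Reload that must preserve the infra
pointers (P0 #3), and concurrent currentConfig reads and writes (P0 #2).
The rest.Config clone (P0 #4) is not covered by these tests.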
This commit is contained in:
2026-05-06 10:45:10 +01:00
parent 614b6e6396
commit 7a33e01863
4 changed files with 229 additions and 19 deletions
+167
View File
@@ -0,0 +1,167 @@
package forward
import (
"sync"
"testing"
"time"
"github.com/lukaszraczylo/kportal/internal/config"
"github.com/lukaszraczylo/kportal/internal/events"
"github.com/stretchr/testify/assert"
)
// TestForwardWorker_Stop_Concurrent verifies that concurrent calls to Stop()
// are safe and do not panic from a double-close of stopChan (Bug 4).
// Run under -race to catch the underlying issue.
func TestForwardWorker_Stop_Concurrent(t *testing.T) {
fwd := config.Forward{
Resource: "pod/my-app",
LocalPort: 18080,
Port: 80,
}
worker := NewForwardWorker(fwd, nil, false, nil, nil, nil)
// Pretend the run loop has finished so Stop() does not block on doneChan.
close(worker.doneChan)
const callers = 16
var wg sync.WaitGroup
wg.Add(callers)
start := make(chan struct{})
for i := 0; i < callers; i++ {
go func() {
defer wg.Done()
<-start
// Each call must complete without panicking.
worker.Stop()
}()
}
close(start) // Release all goroutines simultaneously.
wg.Wait()
// stopChan must be closed exactly once and observable as closed.
select {
case <-worker.stopChan:
// closed — expected
default:
t.Fatal("stopChan should be closed after Stop()")
}
}
// TestForwardWorker_Stop_Idempotent verifies sequential repeated Stop calls
// also do not panic.
func TestForwardWorker_Stop_Idempotent(t *testing.T) {
fwd := config.Forward{
Resource: "pod/my-app",
LocalPort: 18081,
Port: 80,
}
worker := NewForwardWorker(fwd, nil, false, nil, nil, nil)
close(worker.doneChan)
worker.Stop()
worker.Stop()
worker.Stop()
}
// TestManager_Reload_EmptyKeepsInfraAlive covers the "Reload with zero
// forwards" fix: reloading into an empty configuration must leave the
// manager's healthChecker, watchdog and eventBus in place, so later reloads
// that bring forwards back have something to register against.
//
// The test is skipped when NewManager fails (no kubeconfig available).
func TestManager_Reload_EmptyKeepsInfraAlive(t *testing.T) {
	mgr, err := NewManager(false)
	if err != nil {
		t.Skip("Skipping test - no kubeconfig available")
	}
	defer mgr.Stop()

	// Bring the manager up with a configuration that has no forwards.
	if err = mgr.Start(&config.Config{}); err != nil {
		t.Fatalf("Start(empty) failed: %v", err)
	}

	// Remember the identity of each long-lived component before reloading.
	var (
		origHealth   = mgr.healthChecker
		origWatchdog = mgr.watchdog
		origBus      = mgr.eventBus
	)

	// A second, equally empty configuration goes through the zero-forward
	// branch of Reload.
	if err = mgr.Reload(&config.Config{}); err != nil {
		t.Fatalf("Reload(empty) failed: %v", err)
	}

	// Pointer identity: the very same instances must still be installed.
	assert.Same(t, origHealth, mgr.healthChecker, "healthChecker must be preserved across empty reload")
	assert.Same(t, origWatchdog, mgr.watchdog, "watchdog must be preserved across empty reload")
	assert.Same(t, origBus, mgr.eventBus, "eventBus must be preserved across empty reload")

	// Smoke check that the bus is still usable after the reload.
	// NOTE(review): presumably SubscribeAll misbehaves on a closed bus — confirm against events.Bus.
	mgr.eventBus.SubscribeAll(func(_ events.Event) {})
}
// TestManager_CurrentConfig_RaceFree covers the currentConfig data-race fix.
// One goroutine keeps calling Reload (which writes currentConfig) while
// several others read the field under workersMu.RLock, mirroring how the
// health-checker callback reads it. The test makes no assertions of its own;
// it is meaningful only under -race, where an unsynchronised access fails it.
//
// The test is skipped when NewManager fails (no kubeconfig available).
func TestManager_CurrentConfig_RaceFree(t *testing.T) {
	const readers = 4

	mgr, err := NewManager(false)
	if err != nil {
		t.Skip("Skipping test - no kubeconfig available")
	}
	defer mgr.Stop()

	// Two distinct (empty) configs so each Reload stores a different pointer.
	cfgA, cfgB := &config.Config{}, &config.Config{}
	if err = mgr.Start(cfgA); err != nil {
		t.Fatalf("Start failed: %v", err)
	}

	quit := make(chan struct{})
	// halted reports, without blocking, whether quit has been closed.
	halted := func() bool {
		select {
		case <-quit:
			return true
		default:
			return false
		}
	}

	var running sync.WaitGroup

	// Writer: flip between cfgB and cfgA on every iteration until told to quit.
	running.Add(1)
	go func() {
		defer running.Done()
		for useA := false; !halted(); useA = !useA {
			next := cfgB
			if useA {
				next = cfgA
			}
			// Reload errors are irrelevant here; only the memory accesses matter.
			_ = mgr.Reload(next)
		}
	}()

	// Readers: take the read lock exactly as production code does and load
	// the field, giving the race detector something to observe.
	for r := 0; r < readers; r++ {
		running.Add(1)
		go func() {
			defer running.Done()
			for !halted() {
				mgr.workersMu.RLock()
				snapshot := mgr.currentConfig
				_ = snapshot
				mgr.workersMu.RUnlock()
			}
		}()
	}

	// Let readers and the writer overlap for a short while, then wind down.
	time.Sleep(150 * time.Millisecond)
	close(quit)
	running.Wait()
}
+48 -15
View File
@@ -48,6 +48,9 @@ type Manager struct {
watchdog *Watchdog
mdnsPublisher *mdns.Publisher
eventBus *events.Bus
// currentConfig holds the active configuration. Access MUST be guarded by
// workersMu — it is read from the health-checker callback goroutine
// (registered in startWorker) and written by Start/Reload.
currentConfig *config.Config
workersMu sync.RWMutex
verbose bool
@@ -159,7 +162,9 @@ func (m *Manager) Start(cfg *config.Config) error {
return fmt.Errorf("configuration is nil")
}
m.workersMu.Lock()
m.currentConfig = cfg
m.workersMu.Unlock()
// Configure health checker with settings from config
m.configureHealthChecker(cfg)
@@ -284,9 +289,27 @@ func (m *Manager) Reload(newCfg *config.Config) error {
newForwards := newCfg.GetAllForwards()
if len(newForwards) == 0 {
log.Printf("New configuration has no forwards, stopping all")
m.Stop()
log.Printf("New configuration has no forwards, stopping all workers")
// Do NOT call m.Stop() here: it tears down healthChecker, watchdog
// and eventBus, which must remain alive so subsequent
// EnableForward / Reload calls can register against them.
// Only stop currently-running workers and update currentConfig.
m.workersMu.RLock()
ids := make([]string, 0, len(m.workers))
for id := range m.workers {
ids = append(ids, id)
}
m.workersMu.RUnlock()
for _, id := range ids {
if err := m.stopWorkerInternal(id, true); err != nil {
log.Printf("Failed to stop worker %s: %v", id, err)
}
}
m.workersMu.Lock()
m.currentConfig = newCfg
m.workersMu.Unlock()
return nil
}
@@ -369,7 +392,9 @@ func (m *Manager) Reload(newCfg *config.Config) error {
}
// Update current config
m.workersMu.Lock()
m.currentConfig = newCfg
m.workersMu.Unlock()
log.Printf("Configuration reloaded successfully")
return nil
@@ -424,20 +449,24 @@ func (m *Manager) startWorker(fwd config.Forward) error {
}
}
// Handle stale connections: trigger reconnection if retryOnStale is enabled
if status == healthcheck.StatusStale && m.currentConfig.GetRetryOnStale() {
logger.Info("Stale connection detected, triggering reconnection", map[string]interface{}{
"forward_id": forwardID,
"reason": errorMsg,
})
// Find and notify the worker to reconnect
// Handle stale connections: trigger reconnection if retryOnStale is enabled.
// Read currentConfig and worker map under a single lock acquisition
// to avoid racing with Reload/Start writes.
if status == healthcheck.StatusStale {
m.workersMu.RLock()
retryOnStale := m.currentConfig != nil && m.currentConfig.GetRetryOnStale()
staleWorker, exists := m.workers[forwardID]
m.workersMu.RUnlock()
if exists {
staleWorker.TriggerReconnect("stale connection")
if retryOnStale {
logger.Info("Stale connection detected, triggering reconnection", map[string]interface{}{
"forward_id": forwardID,
"reason": errorMsg,
})
if exists {
staleWorker.TriggerReconnect("stale connection")
}
}
}
})
@@ -545,12 +574,16 @@ func (m *Manager) DisableForward(id string) error {
// EnableForward re-enables a previously disabled forward
func (m *Manager) EnableForward(id string) error {
// Find the forward configuration in current config
if m.currentConfig == nil {
// Find the forward configuration in current config (read under lock)
m.workersMu.RLock()
cfg := m.currentConfig
m.workersMu.RUnlock()
if cfg == nil {
return fmt.Errorf("no configuration available")
}
forwards := m.currentConfig.GetAllForwards()
forwards := cfg.GetAllForwards()
var targetFwd *config.Forward
for _, fwd := range forwards {
if fwd.ID() == id {
+5 -1
View File
@@ -39,6 +39,7 @@ type ForwardWorker struct {
lastPod string
forward config.Forward
forwardCancelMu sync.Mutex
stopOnce sync.Once // Guards close(stopChan) against concurrent Stop() calls
verbose bool
}
@@ -97,9 +98,12 @@ func (w *ForwardWorker) Start() {
}
// Stop gracefully stops the port-forward worker.
// Safe to call concurrently and multiple times — stopChan is closed exactly once.
func (w *ForwardWorker) Stop() {
w.cancel()
close(w.stopChan)
w.stopOnce.Do(func() {
close(w.stopChan)
})
// Wait for worker to finish with timeout to prevent blocking forever
select {
+9 -3
View File
@@ -185,9 +185,15 @@ func (pf *PortForwarder) forwardToService(ctx context.Context, req *ForwardReque
// executePortForward performs the actual port-forward operation.
func (pf *PortForwarder) executePortForward(config *rest.Config, url *url.URL, req *ForwardRequest) error {
// Clone the rest.Config before mutating. ClientPool.GetRestConfig returns a
// cached pointer shared across all forwards on the same context; mutating
// config.Dial directly causes a write-write race when multiple forwards
// run concurrently against the same context.
cfg := rest.CopyConfig(config)
// Configure TCP settings on the underlying connection
// This is set in the rest.Config which will be used by the SPDY transport
if config.Dial == nil {
if cfg.Dial == nil {
// Create a custom dialer with configurable timeout and keepalive
// - Timeout: How long to wait for connection to establish
// - KeepAlive: TCP keepalive helps OS detect dead connections at network layer
@@ -195,11 +201,11 @@ func (pf *PortForwarder) executePortForward(config *rest.Config, url *url.URL, r
Timeout: pf.dialTimeout, // Configurable dial timeout
KeepAlive: pf.tcpKeepalive, // Configurable keepalive interval
}
config.Dial = dialer.DialContext
cfg.Dial = dialer.DialContext
}
// Create SPDY roundtripper
transport, upgrader, err := spdy.RoundTripperFor(config)
transport, upgrader, err := spdy.RoundTripperFor(cfg)
if err != nil {
return fmt.Errorf("failed to create round tripper: %w", err)
}