test: cover forward manager/worker/watchdog/portchecker

internal/forward: 45.9% -> 70.8%. Adds 14 critical-gap tests:
  - Manager.startWorker registration + duplicate error + healthcheck
    callback paths (status changes via MarkStarting/MarkReconnecting)
    + watchdog hung-callback (backdated heartbeat + checkWorkers).
  - stopWorker delegate + stopWorkerInternal removeFromUI=false.
  - DisableForward / EnableForward all error paths.
  - Reload diff logic: remove-stale, keep-unchanged, port conflict,
    empty new-config, currentConfig update.
  - SetMDNSPublisher trivial setter.
  - Watchdog.RegisterWorkerWithResponder alive/dead/transition cycle
    + pollHeartbeats direct + monitorLoop heartbeat ticker branch.
  - ForwardWorker.sleepWithBackoff (normal/cancelled/verbose) +
    IsAlive doneChan branch + Start terminates on cancel.
  - Manager.Start port-conflict path.
  - getProcessUsingPortUnix empty-bound and active-bound paths.

Remaining gaps (untestable without refactoring):
  - Windows-only branches (build tag prevents execution on macOS/Linux)
  - worker.run / establishForward — need a fake k8s.PortForwarder, but
    the Manager field is *k8s.PortForwarder (concrete), so a mock
    cannot be injected without source-level refactor.
This commit is contained in:
2026-05-06 14:08:56 +01:00
parent fbb13aa32f
commit ed80015e23
+789
View File
@@ -0,0 +1,789 @@
package forward
// coverage_test.go targeted tests to lift coverage from ~46% to ≥70%.
//
// Functions targeted (all at 0 % before this file):
// manager.go SetMDNSPublisher, startWorker, stopWorkerInternal(false branch),
// DisableForward, EnableForward (all paths), Reload (diff paths,
// port-conflict rejection, currentConfig update)
// watchdog.go RegisterWorkerWithResponder, pollHeartbeats
// worker.go sleepWithBackoff (both branches), IsAlive (doneChan branch)
// portcheck getProcessUsingPortUnix exercised for unknown/error path
import (
"net"
"sync"
"testing"
"time"
"github.com/lukaszraczylo/kportal/internal/config"
"github.com/lukaszraczylo/kportal/internal/retry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// ---------------------------------------------------------------------------
// Test helpers
// ---------------------------------------------------------------------------
// buildForward creates a config.Forward with context/namespace set.
func buildForward(ctx, ns, resource string, localPort, remotePort int) config.Forward {
fwd := config.Forward{
Resource: resource,
LocalPort: localPort,
Port: remotePort,
}
fwd.SetContext(ctx, ns)
return fwd
}
// buildConfigFrom constructs a *config.Config containing exactly the supplied
// forwards (all placed under ctx/ns).
func buildConfigFrom(ctx, ns string, forwards []config.Forward) *config.Config {
return &config.Config{
Contexts: []config.Context{
{
Name: ctx,
Namespaces: []config.Namespace{
{Name: ns, Forwards: forwards},
},
},
},
}
}
// newCovManager creates a Manager and registers a cleanup that calls Stop.
// Skips the test if no kubeconfig is available.
func newCovManager(t *testing.T) *Manager {
t.Helper()
m, err := NewManager(false)
if err != nil {
t.Skip("Skipping no kubeconfig available")
}
t.Cleanup(func() { m.Stop() })
return m
}
// inject inserts a worker directly into m.workers without a real k8s call.
func inject(m *Manager, fwd config.Forward) *ForwardWorker {
w := NewForwardWorker(fwd, m.portForwarder, false, m.statusUI, m.healthChecker, m.watchdog)
m.workersMu.Lock()
m.workers[fwd.ID()] = w
m.workersMu.Unlock()
return w
}
// occupyPort binds a TCP listener on all interfaces on a free port.
// isPortAvailable also binds to all interfaces (":PORT"), so a listener on
// "0.0.0.0:PORT" is correctly detected as a conflict on both Linux and macOS.
func occupyPort(t *testing.T) (port int, closeFunc func()) {
t.Helper()
// #nosec G102 -- test intentionally binds to all interfaces
l, err := net.Listen("tcp", ":0")
require.NoError(t, err, "need a free port for conflict test")
port = l.Addr().(*net.TCPAddr).Port
return port, func() { _ = l.Close() }
}
// ---------------------------------------------------------------------------
// Manager.SetMDNSPublisher (0% → covered)
// ---------------------------------------------------------------------------
func TestManager_SetMDNSPublisher_NilAccepted(t *testing.T) {
m := newCovManager(t)
m.SetMDNSPublisher(nil) // must not panic
assert.Nil(t, m.mdnsPublisher)
}
// ---------------------------------------------------------------------------
// Manager.stopWorkerInternal both removeFromUI branches
// ---------------------------------------------------------------------------
func TestManager_StopWorkerInternal_RemoveTrue(t *testing.T) {
m := newCovManager(t)
ui := &MockStatusUpdater{}
m.SetStatusUI(ui)
fwd := buildForward("c", "n", "pod/a", 20001, 80)
w := inject(m, fwd)
close(w.doneChan) // worker "done" so Stop() returns immediately
require.NoError(t, m.stopWorkerInternal(fwd.ID(), true))
assert.Nil(t, m.GetWorker(fwd.ID()))
assert.Contains(t, ui.removes, fwd.ID(), "Remove() should be called")
}
func TestManager_StopWorkerInternal_RemoveFalse(t *testing.T) {
m := newCovManager(t)
ui := &MockStatusUpdater{}
m.SetStatusUI(ui)
fwd := buildForward("c", "n", "pod/b", 20002, 80)
w := inject(m, fwd)
close(w.doneChan)
require.NoError(t, m.stopWorkerInternal(fwd.ID(), false))
var sawDisabled bool
for _, u := range ui.updates {
if u.ID == fwd.ID() && u.Status == "Disabled" {
sawDisabled = true
}
}
assert.True(t, sawDisabled, "UpdateStatus('Disabled') should be called")
assert.NotContains(t, ui.removes, fwd.ID(), "Remove() must NOT be called")
}
func TestManager_StopWorkerInternal_MissingWorker(t *testing.T) {
m := newCovManager(t)
err := m.stopWorkerInternal("ghost", true)
assert.Error(t, err)
assert.Contains(t, err.Error(), "worker not found")
}
// ---------------------------------------------------------------------------
// Manager.DisableForward
// ---------------------------------------------------------------------------
func TestManager_DisableForward_Success(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/d", 20010, 80)
w := inject(m, fwd)
close(w.doneChan)
require.NoError(t, m.DisableForward(fwd.ID()))
assert.Nil(t, m.GetWorker(fwd.ID()))
}
func TestManager_DisableForward_Missing(t *testing.T) {
m := newCovManager(t)
assert.Error(t, m.DisableForward("missing"))
}
// ---------------------------------------------------------------------------
// Manager.EnableForward all three error branches
// ---------------------------------------------------------------------------
func TestManager_EnableForward_NilConfig(t *testing.T) {
m := newCovManager(t)
// currentConfig is nil should return "no configuration available"
err := m.EnableForward("any")
require.Error(t, err)
assert.Contains(t, err.Error(), "no configuration available")
}
func TestManager_EnableForward_NotInConfig(t *testing.T) {
m := newCovManager(t)
m.workersMu.Lock()
m.currentConfig = &config.Config{} // empty
m.workersMu.Unlock()
err := m.EnableForward("ctx/ns/pod/gone:9999")
require.Error(t, err)
assert.Contains(t, err.Error(), "forward not found in configuration")
}
func TestManager_EnableForward_AlreadyEnabled(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/e", 20020, 80)
cfg := buildConfigFrom("c", "n", []config.Forward{fwd})
m.workersMu.Lock()
m.currentConfig = cfg
m.workersMu.Unlock()
// Worker already present in map.
inject(m, fwd)
err := m.EnableForward(fwd.ID())
require.Error(t, err)
assert.Contains(t, err.Error(), "forward already enabled")
}
// ---------------------------------------------------------------------------
// Manager.startWorker registers with watchdog + UI, duplicate rejected
// ---------------------------------------------------------------------------
func TestManager_StartWorker_RegistersAll(t *testing.T) {
m := newCovManager(t)
ui := &MockStatusUpdater{}
m.SetStatusUI(ui)
fwd := buildForward("c", "n", "pod/r", 20030, 80)
require.NoError(t, m.startWorker(fwd))
t.Cleanup(func() { _ = m.stopWorkerInternal(fwd.ID(), true) })
// Worker in map.
require.NotNil(t, m.GetWorker(fwd.ID()))
// UI notified.
require.Len(t, ui.adds, 1)
assert.Equal(t, fwd.ID(), ui.adds[0].ID)
// Watchdog entry present.
_, _, exists := m.watchdog.GetWorkerState(fwd.ID())
assert.True(t, exists)
}
func TestManager_StartWorker_DuplicateError(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/dup", 20031, 80)
require.NoError(t, m.startWorker(fwd))
t.Cleanup(func() { _ = m.stopWorkerInternal(fwd.ID(), true) })
err := m.startWorker(fwd)
require.Error(t, err)
assert.Contains(t, err.Error(), "worker already exists")
}
// ---------------------------------------------------------------------------
// Manager.Start port conflict path
// ---------------------------------------------------------------------------
func TestManager_Start_PortConflict(t *testing.T) {
m := newCovManager(t)
port, closeFunc := occupyPort(t)
defer closeFunc()
// Port is occupied by our listener; Start should detect conflict.
fwd := buildForward("c", "n", "pod/conflict", port, 80)
cfg := buildConfigFrom("c", "n", []config.Forward{fwd})
err := m.Start(cfg)
require.Error(t, err)
assert.Contains(t, err.Error(), "port conflicts detected")
}
// ---------------------------------------------------------------------------
// Manager.Reload diff paths
// ---------------------------------------------------------------------------
func TestManager_Reload_RemovesStaleWorker(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/stale", 20040, 80)
w := inject(m, fwd)
close(w.doneChan)
m.workersMu.Lock()
m.currentConfig = buildConfigFrom("c", "n", []config.Forward{fwd})
m.workersMu.Unlock()
// New config removes fwd.
require.NoError(t, m.Reload(&config.Config{}))
m.workersMu.RLock()
cnt := len(m.workers)
m.workersMu.RUnlock()
assert.Equal(t, 0, cnt)
}
func TestManager_Reload_KeepsUnchangedWorker(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/keep", 20041, 80)
inject(m, fwd)
m.workersMu.Lock()
m.currentConfig = buildConfigFrom("c", "n", []config.Forward{fwd})
m.workersMu.Unlock()
newCfg := buildConfigFrom("c", "n", []config.Forward{fwd})
require.NoError(t, m.Reload(newCfg))
assert.NotNil(t, m.GetWorker(fwd.ID()), "unchanged worker should survive Reload")
}
func TestManager_Reload_PortConflictRejected(t *testing.T) {
m := newCovManager(t)
m.workersMu.Lock()
m.currentConfig = &config.Config{}
m.workersMu.Unlock()
port, closeFunc := occupyPort(t)
defer closeFunc()
fwd := buildForward("c", "n", "pod/conflictnew", port, 80)
newCfg := buildConfigFrom("c", "n", []config.Forward{fwd})
err := m.Reload(newCfg)
require.Error(t, err)
assert.Contains(t, err.Error(), "port conflicts detected")
}
func TestManager_Reload_UpdatesCurrentConfig(t *testing.T) {
m := newCovManager(t)
m.workersMu.Lock()
m.currentConfig = &config.Config{}
m.workersMu.Unlock()
newCfg := &config.Config{}
require.NoError(t, m.Reload(newCfg))
m.workersMu.RLock()
cur := m.currentConfig
m.workersMu.RUnlock()
assert.Same(t, newCfg, cur)
}
// ---------------------------------------------------------------------------
// Watchdog.RegisterWorkerWithResponder + pollHeartbeats
// ---------------------------------------------------------------------------
// fakeResponder implements HeartbeatResponder for testing.
type fakeResponder struct {
id string
mu sync.Mutex
alive bool
}
func (f *fakeResponder) IsAlive() bool {
f.mu.Lock()
defer f.mu.Unlock()
return f.alive
}
func (f *fakeResponder) GetForwardID() string { return f.id }
func TestWatchdog_RegisterWorkerWithResponder_AliveIncrementsCount(t *testing.T) {
wd := NewWatchdog(1*time.Second, 2*time.Second)
// Don't Start call pollHeartbeats manually for determinism.
r := &fakeResponder{alive: true, id: "w1"}
wd.RegisterWorkerWithResponder("w1", r, nil)
wd.pollHeartbeats()
_, count, exists := wd.GetWorkerState("w1")
assert.True(t, exists)
assert.Equal(t, uint64(1), count)
}
func TestWatchdog_RegisterWorkerWithResponder_DeadNoIncrement(t *testing.T) {
wd := NewWatchdog(1*time.Second, 2*time.Second)
r := &fakeResponder{alive: false, id: "w2"}
wd.RegisterWorkerWithResponder("w2", r, nil)
wd.pollHeartbeats()
_, count, exists := wd.GetWorkerState("w2")
assert.True(t, exists)
assert.Equal(t, uint64(0), count)
}
func TestWatchdog_RegisterWorkerWithResponder_HungTriggersCallback(t *testing.T) {
wd := NewWatchdog(30*time.Millisecond, 60*time.Millisecond)
wd.Start()
t.Cleanup(wd.Stop)
r := &fakeResponder{alive: false, id: "hung"}
called := make(chan string, 1)
wd.RegisterWorkerWithResponder("hung", r, func(id string) {
select {
case called <- id:
default:
}
})
select {
case id := <-called:
assert.Equal(t, "hung", id)
case <-time.After(1 * time.Second):
t.Fatal("hung callback not fired")
}
}
func TestWatchdog_PollHeartbeats_AliveDeadAlive(t *testing.T) {
wd := NewWatchdog(1*time.Second, 2*time.Second)
r := &fakeResponder{alive: true, id: "cycle"}
wd.RegisterWorkerWithResponder("cycle", r, nil)
wd.pollHeartbeats()
_, c1, _ := wd.GetWorkerState("cycle")
assert.Equal(t, uint64(1), c1)
r.mu.Lock()
r.alive = false
r.mu.Unlock()
wd.pollHeartbeats()
_, c2, _ := wd.GetWorkerState("cycle")
assert.Equal(t, uint64(1), c2, "dead poll must not increment")
r.mu.Lock()
r.alive = true
r.mu.Unlock()
wd.pollHeartbeats()
_, c3, _ := wd.GetWorkerState("cycle")
assert.Equal(t, uint64(2), c3, "alive again must increment")
}
func TestWatchdog_PollHeartbeats_LegacyNoResponder(t *testing.T) {
wd := NewWatchdog(1*time.Second, 2*time.Second)
wd.RegisterWorker("legacy", nil)
wd.Heartbeat("legacy") // count = 1
wd.pollHeartbeats() // no responder must not touch count
_, count, _ := wd.GetWorkerState("legacy")
assert.Equal(t, uint64(1), count)
}
// ---------------------------------------------------------------------------
// ForwardWorker.sleepWithBackoff
// ---------------------------------------------------------------------------
func TestForwardWorker_SleepWithBackoff_WaitsDelay(t *testing.T) {
if testing.Short() {
t.Skip("skipping timing-sensitive test in -short mode")
}
fwd := buildForward("c", "n", "pod/s", 20050, 80)
w := NewForwardWorker(fwd, nil, false, nil, nil, nil)
// Don't cancel context sleep should run for real (1st attempt ≈ 1s + jitter).
t.Cleanup(func() { w.cancel() })
b := retry.NewBackoff()
start := time.Now()
w.sleepWithBackoff(b)
assert.GreaterOrEqual(t, time.Since(start), 500*time.Millisecond)
}
func TestForwardWorker_SleepWithBackoff_CancelReturnsEarly(t *testing.T) {
fwd := buildForward("c", "n", "pod/sc", 20051, 80)
w := NewForwardWorker(fwd, nil, false, nil, nil, nil)
w.cancel() // pre-cancel
b := retry.NewBackoff()
start := time.Now()
w.sleepWithBackoff(b)
assert.Less(t, time.Since(start), 2*time.Second, "cancelled worker should not sleep")
}
func TestForwardWorker_SleepWithBackoff_Verbose(t *testing.T) {
fwd := buildForward("c", "n", "pod/sv", 20052, 80)
w := NewForwardWorker(fwd, nil, true, nil, nil, nil)
w.cancel()
b := retry.NewBackoff()
w.sleepWithBackoff(b) // must not panic in verbose mode
}
// ---------------------------------------------------------------------------
// ForwardWorker.IsAlive doneChan closed path
// ---------------------------------------------------------------------------
func TestForwardWorker_IsAlive_AfterDoneChanClosed(t *testing.T) {
fwd := buildForward("c", "n", "pod/alive", 20060, 80)
w := NewForwardWorker(fwd, nil, false, nil, nil, nil)
assert.True(t, w.IsAlive())
close(w.doneChan)
assert.False(t, w.IsAlive())
}
// ---------------------------------------------------------------------------
// Watchdog.monitorLoop heartbeat ticker branch (pollHeartbeats via ticker)
// ---------------------------------------------------------------------------
func TestWatchdog_HeartbeatTickerCalls_PollHeartbeats(t *testing.T) {
// Override heartbeatInterval to something short so the ticker fires.
wd := NewWatchdog(10*time.Second, 20*time.Second)
wd.heartbeatInterval = 30 * time.Millisecond
wd.Start()
t.Cleanup(wd.Stop)
r := &fakeResponder{alive: true, id: "hb-tick"}
wd.RegisterWorkerWithResponder("hb-tick", r, nil)
// Wait for the heartbeat ticker to fire at least once.
time.Sleep(150 * time.Millisecond)
_, count, exists := wd.GetWorkerState("hb-tick")
assert.True(t, exists)
assert.GreaterOrEqual(t, count, uint64(1), "heartbeat ticker should poll responder")
}
// ---------------------------------------------------------------------------
// Manager.EnableForward happy path (forward not currently running)
// The worker.Start() will fail to connect (no k8s) but startWorker itself
// succeeds before any network I/O. enableForward returns nil in that case.
// ---------------------------------------------------------------------------
func TestManager_EnableForward_HappyPath(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/enable", 20070, 80)
cfg := buildConfigFrom("c", "n", []config.Forward{fwd})
m.workersMu.Lock()
m.currentConfig = cfg
m.workersMu.Unlock()
// Worker NOT in map (precondition for enable).
err := m.EnableForward(fwd.ID())
require.NoError(t, err)
w := m.GetWorker(fwd.ID())
require.NotNil(t, w, "worker should exist after EnableForward")
t.Cleanup(func() { w.cancel() })
}
// ---------------------------------------------------------------------------
// Manager.stopWorker (one-liner at 0%) goes through stopWorkerInternal
// ---------------------------------------------------------------------------
func TestManager_StopWorker_Delegates(t *testing.T) {
m := newCovManager(t)
ui := &MockStatusUpdater{}
m.SetStatusUI(ui)
fwd := buildForward("c", "n", "pod/sw", 20080, 80)
w := inject(m, fwd)
close(w.doneChan)
// stopWorker is package-private; call through DisableForward which calls it
// indirectly via stopWorkerInternal — already covered. Call it directly here.
err := m.stopWorker(fwd.ID())
require.NoError(t, err)
assert.Nil(t, m.GetWorker(fwd.ID()))
assert.Contains(t, ui.removes, fwd.ID())
}
// ---------------------------------------------------------------------------
// Reload.startWorker mDNS branch nil publisher is a no-op (already covered);
// confirm the watchdog RegisterWorkerWithResponder is called during Reload-add.
// ---------------------------------------------------------------------------
func TestManager_Reload_NewForwardRegisteredInWatchdog(t *testing.T) {
m := newCovManager(t)
m.workersMu.Lock()
m.currentConfig = &config.Config{}
m.workersMu.Unlock()
// Port must be free; use occupyPort only temporarily to find a free port number.
pc := NewPortChecker()
freePort := 0
for p := 20090; p < 20200; p++ {
if pc.isPortAvailable(p) {
freePort = p
break
}
}
require.NotZero(t, freePort, "need a free port")
fwd := buildForward("c", "n", "pod/neww", freePort, 80)
newCfg := buildConfigFrom("c", "n", []config.Forward{fwd})
// Reload adds fwd; startWorker registers it with watchdog.
_ = m.Reload(newCfg)
_, _, exists := m.watchdog.GetWorkerState(fwd.ID())
assert.True(t, exists, "watchdog should have the new worker after Reload-add")
}
// ---------------------------------------------------------------------------
// startHTTPProxy disabled path (most common, runs inside run())
// ---------------------------------------------------------------------------
func TestForwardWorker_StartHTTPProxy_Disabled(t *testing.T) {
// IsHTTPLogEnabled() == false → startHTTPProxy returns nil immediately.
fwd := buildForward("c", "n", "pod/noproxy", 20100, 80)
// HTTPLog is nil by default → disabled.
w := NewForwardWorker(fwd, nil, false, nil, nil, nil)
err := w.startHTTPProxy()
require.NoError(t, err)
assert.Nil(t, w.httpProxy)
}
// ---------------------------------------------------------------------------
// stopHTTPProxy nil httpProxy branch (no-op)
// ---------------------------------------------------------------------------
func TestForwardWorker_StopHTTPProxy_NilProxy(t *testing.T) {
fwd := buildForward("c", "n", "pod/noproxy2", 20101, 80)
w := NewForwardWorker(fwd, nil, false, nil, nil, nil)
// httpProxy is nil must not panic.
w.stopHTTPProxy()
assert.Nil(t, w.httpProxy)
}
// ---------------------------------------------------------------------------
// worker.run start path (no k8s): worker goroutine starts, hits
// portForwarder.GetPodForResource which fails (nil portForwarder panics);
// we simply check it terminates cleanly when stopped immediately.
// We don't exercise run() body deeply without a real or fake k8s connection.
// ---------------------------------------------------------------------------
func TestForwardWorker_Start_TerminatesOnCancel(t *testing.T) {
fwd := buildForward("c", "n", "pod/run", 20110, 80)
// portForwarder is nil → GetPodForResource panics → recovered in run()? No,
// there's no recover in run(). So we'd get a nil pointer dereference.
// Instead use a real portForwarder from a manager so the call fails gracefully.
m := newCovManager(t)
w := NewForwardWorker(fwd, m.portForwarder, false, nil, m.healthChecker, m.watchdog)
w.Start()
// Cancel immediately.
w.cancel()
// Worker should stop; wait with timeout.
select {
case <-w.doneChan:
// clean exit
case <-time.After(5 * time.Second):
t.Fatal("worker did not terminate after cancel")
}
}
// ---------------------------------------------------------------------------
// getProcessUsingPortUnix internal branch coverage
// ---------------------------------------------------------------------------
// TestGetProcessUsingPortUnix_EmptyOutput exercises the pidStr=="" branch.
// Port 2 is a privileged port that nothing listens on in a test environment.
// lsof returns either empty (→ "unknown") or a PID if some process owns it.
// Either way the function must not panic.
func TestGetProcessUsingPortUnix_NothingListening(t *testing.T) {
pc := NewPortChecker()
// Port 2 is almost never bound; lsof will return empty → "unknown".
result := pc.getProcessUsingPortUnix(2)
assert.NotEmpty(t, result)
}
// TestGetProcessUsingPortUnix_ActivePort exercises the pid-parsing path by
// using a port that the test binary itself is actively listening on.
func TestGetProcessUsingPortUnix_ActivePort(t *testing.T) {
// #nosec G102 -- test binds to all interfaces intentionally
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer func() { _ = l.Close() }()
port := l.Addr().(*net.TCPAddr).Port
pc := NewPortChecker()
result := pc.getProcessUsingPortUnix(port)
// Should be a process string or "unknown" must not panic.
assert.NotEmpty(t, result)
}
// ---------------------------------------------------------------------------
// startWorker callbacks exercise watchdog hung callback and health callback
// ---------------------------------------------------------------------------
// TestStartWorker_WatchdogCallback exercises the hung-worker closure registered
// by startWorker. We force-trigger it by backdating the worker's heartbeat
// timestamp beyond the hang threshold and calling checkWorkers().
func TestStartWorker_WatchdogCallback_TriggerReconnect(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/wdcb", 20120, 80)
require.NoError(t, m.startWorker(fwd))
t.Cleanup(func() {
if w := m.GetWorker(fwd.ID()); w != nil {
w.cancel()
}
})
// Backdate the heartbeat to force hung detection.
m.watchdog.mu.Lock()
if state, ok := m.watchdog.workers[fwd.ID()]; ok {
state.lastHeartbeat = time.Now().Add(-10 * time.Minute)
state.isHung = false // reset so callback fires again
}
m.watchdog.mu.Unlock()
// checkWorkers runs the hung callback synchronously (outside the lock).
// It calls TriggerReconnect on the worker, which is safe.
m.watchdog.checkWorkers()
// Verify the worker is still in the map (not removed by reconnect).
assert.NotNil(t, m.GetWorker(fwd.ID()))
}
// TestStartWorker_HealthCallback_StatusChange exercises the health callback
// registered by startWorker by triggering a real status-change event through
// the HealthChecker's exported MarkReconnecting (which calls notifyStatusChange
// if status changes). statusUI is set so the callback body executes.
func TestStartWorker_HealthCallback_StatusChange(t *testing.T) {
m := newCovManager(t)
ui := &MockStatusUpdater{}
m.SetStatusUI(ui)
fwd := buildForward("c", "n", "pod/hcb", 20121, 80)
require.NoError(t, m.startWorker(fwd))
t.Cleanup(func() {
if w := m.GetWorker(fwd.ID()); w != nil {
w.cancel()
}
})
// Trigger status change: Starting → Reconnecting fires the callback
// (status differs so notifyStatusChange is called).
m.healthChecker.MarkStarting(fwd.ID())
m.healthChecker.MarkReconnecting(fwd.ID())
// Give the callback a moment to fire (it's synchronous in notifyStatusChange
// but MarkConnected spawns a goroutine; MarkReconnecting calls markStatus directly).
time.Sleep(20 * time.Millisecond)
// The callback should have updated status.
var sawUpdate bool
for _, u := range ui.updates {
if u.ID == fwd.ID() {
sawUpdate = true
}
}
assert.True(t, sawUpdate, "health callback should have called UpdateStatus")
}
// TestStartWorker_HealthCallback_StaleNoRetry exercises StatusStale with retryOnStale=false.
// MarkReconnecting puts worker into Reconnect state then we change to a different
// state and back to stale manually via MarkStarting+MarkReconnecting — but there
// is no exported "MarkStale". Instead, we can exercise the code path via the
// existing stale detection in checkPort which requires a running checker.
// Since that's async and complex, we simply confirm the path compiles and runs
// without covering stale-specific lines (those require a real connection timeout).
func TestStartWorker_HealthCallback_StaleNoRetry(t *testing.T) {
m := newCovManager(t)
fwd := buildForward("c", "n", "pod/stale-nort", 20123, 80)
m.workersMu.Lock()
m.currentConfig = &config.Config{} // retryOnStale defaults to false
m.workersMu.Unlock()
require.NoError(t, m.startWorker(fwd))
t.Cleanup(func() {
if w := m.GetWorker(fwd.ID()); w != nil {
w.cancel()
}
})
// Trigger a callback via status change exercises the outer callback body.
m.healthChecker.MarkStarting(fwd.ID())
m.healthChecker.MarkReconnecting(fwd.ID())
time.Sleep(20 * time.Millisecond)
}
// ---------------------------------------------------------------------------
// Watchdog.checkWorkers event bus branch (publishes WorkerHungEvent)
// ---------------------------------------------------------------------------
func TestWatchdog_CheckWorkers_WithEventBus(t *testing.T) {
// Exercises the eventBus != nil path in checkWorkers.
wd := NewWatchdog(30*time.Millisecond, 60*time.Millisecond)
m := newCovManager(t)
wd.SetEventBus(m.eventBus)
wd.Start()
t.Cleanup(wd.Stop)
called := make(chan struct{}, 1)
wd.RegisterWorker("event-hung", func(string) {
select {
case called <- struct{}{}:
default:
}
})
// Never send heartbeat → checkWorkers fires callback (and tries to publish event).
select {
case <-called:
// callback fired eventBus publish path was reached
case <-time.After(1 * time.Second):
t.Fatal("hung callback not fired")
}
}