mirror of
https://github.com/lukaszraczylo/kportal.git
synced 2026-06-15 02:45:33 +00:00
Fix the watchdog being too aggressive.
This commit is contained in:
@@ -92,14 +92,15 @@ func (w *ForwardWorker) Stop() {
|
|||||||
func (w *ForwardWorker) run() {
|
func (w *ForwardWorker) run() {
|
||||||
defer close(w.doneChan)
|
defer close(w.doneChan)
|
||||||
|
|
||||||
|
// Start heartbeat goroutine to continuously send heartbeats to watchdog
|
||||||
|
// This prevents false "hung worker" detection when connections are long-lived
|
||||||
|
if w.watchdog != nil {
|
||||||
|
go w.heartbeatLoop()
|
||||||
|
}
|
||||||
|
|
||||||
backoff := retry.NewBackoff()
|
backoff := retry.NewBackoff()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Send heartbeat to watchdog to indicate we're alive
|
|
||||||
if w.watchdog != nil {
|
|
||||||
w.watchdog.Heartbeat(w.forward.ID())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we should stop
|
// Check if we should stop
|
||||||
select {
|
select {
|
||||||
case <-w.ctx.Done():
|
case <-w.ctx.Done():
|
||||||
@@ -202,6 +203,26 @@ func (w *ForwardWorker) run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// heartbeatLoop sends periodic heartbeats to the watchdog to prove the worker is alive
|
||||||
|
// This runs in a separate goroutine and continues throughout the worker's lifetime
|
||||||
|
func (w *ForwardWorker) heartbeatLoop() {
|
||||||
|
// Send heartbeats every 15 seconds (well within typical 60s watchdog timeout)
|
||||||
|
ticker := time.NewTicker(15 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Send immediate heartbeat
|
||||||
|
w.watchdog.Heartbeat(w.forward.ID())
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
w.watchdog.Heartbeat(w.forward.ID())
|
||||||
|
case <-w.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// establishForward establishes a port-forward connection.
|
// establishForward establishes a port-forward connection.
|
||||||
// This blocks until the connection is closed or an error occurs.
|
// This blocks until the connection is closed or an error occurs.
|
||||||
func (w *ForwardWorker) establishForward(podName string) error {
|
func (w *ForwardWorker) establishForward(podName string) error {
|
||||||
|
|||||||
@@ -313,6 +313,12 @@ func (c *Checker) checkPort(forwardID string) {
|
|||||||
health.Status = newStatus
|
health.Status = newStatus
|
||||||
health.LastCheck = now
|
health.LastCheck = now
|
||||||
health.ErrorMessage = errorMsg
|
health.ErrorMessage = errorMsg
|
||||||
|
|
||||||
|
// Successful health check indicates connection is active
|
||||||
|
// This prevents false positives where healthy connections are marked as idle
|
||||||
|
if newStatus == StatusHealthy {
|
||||||
|
health.LastActivity = now
|
||||||
|
}
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
|||||||
@@ -288,7 +288,8 @@ func (s *HealthCheckTestSuite) TestConnectionAgeDetection() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestIdleTimeDetection tests max idle time detection
|
// TestIdleTimeDetection tests that connections with passing health checks are NOT marked as stale
|
||||||
|
// This verifies that successful health checks update LastActivity, preventing false idle detection
|
||||||
func (s *HealthCheckTestSuite) TestIdleTimeDetection() {
|
func (s *HealthCheckTestSuite) TestIdleTimeDetection() {
|
||||||
statusChanges := make(chan Status, 10)
|
statusChanges := make(chan Status, 10)
|
||||||
callback := func(forwardID string, status Status, errorMsg string) {
|
callback := func(forwardID string, status Status, errorMsg string) {
|
||||||
@@ -307,26 +308,23 @@ func (s *HealthCheckTestSuite) TestIdleTimeDetection() {
|
|||||||
|
|
||||||
checker.Register("test-forward", s.port, callback)
|
checker.Register("test-forward", s.port, callback)
|
||||||
|
|
||||||
// Wait for initial healthy status
|
// Wait long enough that idle time WOULD be exceeded if health checks didn't update LastActivity
|
||||||
var gotHealthy, gotStale bool
|
time.Sleep(500 * time.Millisecond)
|
||||||
timeout := time.After(1 * time.Second)
|
|
||||||
|
|
||||||
for {
|
// Verify connection is still healthy, not stale
|
||||||
select {
|
// This proves that successful health checks are updating LastActivity
|
||||||
case status := <-statusChanges:
|
status, exists := checker.GetStatus("test-forward")
|
||||||
if status == StatusHealthy || status == StatusStarting {
|
require.True(s.T(), exists)
|
||||||
gotHealthy = true
|
assert.Equal(s.T(), StatusHealthy, status, "Connection with passing health checks should NOT be marked as stale")
|
||||||
}
|
|
||||||
if status == StatusStale {
|
// Verify we never received a StatusStale callback
|
||||||
gotStale = true
|
select {
|
||||||
}
|
case status := <-statusChanges:
|
||||||
if gotHealthy && gotStale {
|
if status == StatusStale {
|
||||||
return // Test passed
|
s.T().Fatal("Connection should NOT be marked as stale when health checks are passing")
|
||||||
}
|
|
||||||
case <-timeout:
|
|
||||||
s.T().Fatalf("Expected StatusStale after max idle time exceeded. gotHealthy=%v, gotStale=%v",
|
|
||||||
gotHealthy, gotStale)
|
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
// No stale status - this is correct
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user