From 112969b82aabbd545e419173a36aa62e3e606c25 Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Sun, 23 Nov 2025 15:56:57 +0000 Subject: [PATCH] Add active healthchecks to verify the connectivity. --- internal/forward/manager.go | 22 +++- internal/forward/worker.go | 23 ++-- internal/healthcheck/checker.go | 221 ++++++++++++++++++++++++++++++++ 3 files changed, 254 insertions(+), 12 deletions(-) create mode 100644 internal/healthcheck/checker.go diff --git a/internal/forward/manager.go b/internal/forward/manager.go index 89f8290..bca7e3d 100644 --- a/internal/forward/manager.go +++ b/internal/forward/manager.go @@ -4,8 +4,10 @@ import ( "fmt" "log" "sync" + "time" "github.com/nvm/kportal/internal/config" + "github.com/nvm/kportal/internal/healthcheck" "github.com/nvm/kportal/internal/k8s" ) @@ -25,6 +27,7 @@ type Manager struct { resolver *k8s.ResourceResolver portForwarder *k8s.PortForwarder portChecker *PortChecker + healthChecker *healthcheck.Checker verbose bool currentConfig *config.Config statusUI StatusUpdater @@ -40,12 +43,16 @@ func NewManager(verbose bool) *Manager { resolver := k8s.NewResourceResolver(clientPool) portForwarder := k8s.NewPortForwarder(clientPool, resolver) + // Create health checker: check every 5 seconds with 2 second timeout + healthChecker := healthcheck.NewChecker(5*time.Second, 2*time.Second) + return &Manager{ workers: make(map[string]*ForwardWorker), clientPool: clientPool, resolver: resolver, portForwarder: portForwarder, portChecker: NewPortChecker(), + healthChecker: healthChecker, verbose: verbose, } } @@ -99,6 +106,9 @@ func (m *Manager) Start(cfg *config.Config) error { func (m *Manager) Stop() { log.Printf("Stopping all port-forwards...") + // Stop health checker first + m.healthChecker.Stop() + m.workersMu.Lock() workers := make([]*ForwardWorker, 0, len(m.workers)) for _, worker := range m.workers { @@ -248,8 +258,15 @@ func (m *Manager) startWorker(fwd config.Forward) error { m.statusUI.AddForward(fwd.ID(), &fwd) } + // 
Register with health checker + m.healthChecker.Register(fwd.ID(), fwd.LocalPort, func(forwardID string, status healthcheck.Status) { + if m.statusUI != nil { + m.statusUI.UpdateStatus(forwardID, string(status)) + } + }) + // Create and start worker - worker := NewForwardWorker(fwd, m.portForwarder, m.verbose, m.statusUI) + worker := NewForwardWorker(fwd, m.portForwarder, m.verbose, m.statusUI, m.healthChecker) worker.Start() // Store worker @@ -269,6 +286,9 @@ func (m *Manager) stopWorker(id string) error { delete(m.workers, id) m.workersMu.Unlock() + // Unregister from health checker + m.healthChecker.Unregister(id) + // Stop the worker worker.Stop() diff --git a/internal/forward/worker.go b/internal/forward/worker.go index 69542f8..2e110fc 100644 --- a/internal/forward/worker.go +++ b/internal/forward/worker.go @@ -8,6 +8,7 @@ import ( "time" "github.com/nvm/kportal/internal/config" + "github.com/nvm/kportal/internal/healthcheck" "github.com/nvm/kportal/internal/k8s" "github.com/nvm/kportal/internal/retry" ) @@ -23,10 +24,11 @@ type ForwardWorker struct { verbose bool lastPod string // Track the last pod we connected to statusUI StatusUpdater + healthChecker *healthcheck.Checker } // NewForwardWorker creates a new ForwardWorker for a single forward configuration. 
-func NewForwardWorker(fwd config.Forward, portForwarder *k8s.PortForwarder, verbose bool, statusUI StatusUpdater) *ForwardWorker { +func NewForwardWorker(fwd config.Forward, portForwarder *k8s.PortForwarder, verbose bool, statusUI StatusUpdater, healthChecker *healthcheck.Checker) *ForwardWorker { ctx, cancel := context.WithCancel(context.Background()) return &ForwardWorker{ @@ -38,6 +40,7 @@ func NewForwardWorker(fwd config.Forward, portForwarder *k8s.PortForwarder, verb doneChan: make(chan struct{}), verbose: verbose, statusUI: statusUI, + healthChecker: healthChecker, } } @@ -88,22 +91,20 @@ func (w *ForwardWorker) run() { // Check if pod changed (restart detected) if w.lastPod != "" && w.lastPod != podName { - if w.statusUI != nil { - w.statusUI.UpdateStatus(w.forward.ID(), "Reconnecting") + if w.healthChecker != nil { + w.healthChecker.MarkReconnecting(w.forward.ID()) } log.Printf("[%s] Switched to new pod: %s → %s", w.forward.ID(), w.lastPod, podName) } else if w.lastPod == "" { log.Printf("[%s] Forwarding %s → localhost:%d", w.forward.ID(), w.forward.String(), w.forward.LocalPort) + if w.healthChecker != nil { + w.healthChecker.MarkStarting(w.forward.ID()) + } } w.lastPod = podName - // Update status to active - if w.statusUI != nil { - w.statusUI.UpdateStatus(w.forward.ID(), "Active") - } - // Establish port-forward connection err = w.establishForward(podName) @@ -114,9 +115,9 @@ func (w *ForwardWorker) run() { return } - // Update status to error - if w.statusUI != nil { - w.statusUI.UpdateStatus(w.forward.ID(), "Reconnecting") + // Update status to reconnecting + if w.healthChecker != nil { + w.healthChecker.MarkReconnecting(w.forward.ID()) } // Log the error diff --git a/internal/healthcheck/checker.go b/internal/healthcheck/checker.go new file mode 100644 index 0000000..519017b --- /dev/null +++ b/internal/healthcheck/checker.go @@ -0,0 +1,221 @@ +package healthcheck + +import ( + "context" + "fmt" + "net" + "sync" + "time" +) + +// Status represents 
the health status of a port forward
+type Status string
+
+const (
+	StatusHealthy   Status = "Active"
+	StatusUnhealthy Status = "Error"
+	StatusStarting  Status = "Starting"
+	StatusReconnect Status = "Reconnecting"
+)
+
+// PortHealth represents the health status of a single port
+type PortHealth struct {
+	Port         int
+	LastCheck    time.Time
+	Status       Status
+	ErrorMessage string
+}
+
+// StatusCallback is called when a port's health status changes
+type StatusCallback func(forwardID string, status Status)
+
+// Checker performs periodic health checks on local ports
+type Checker struct {
+	mu        sync.RWMutex
+	ports     map[string]*PortHealth // key: forward ID
+	callbacks map[string]StatusCallback
+	cancels   map[string]context.CancelFunc // key: forward ID; stops its check loop
+	interval  time.Duration
+	timeout   time.Duration
+	ctx       context.Context
+	cancel    context.CancelFunc
+	wg        sync.WaitGroup
+}
+
+// NewChecker creates a new health checker that probes every interval with a per-probe timeout
+func NewChecker(interval, timeout time.Duration) *Checker {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &Checker{
+		ports:     make(map[string]*PortHealth),
+		callbacks: make(map[string]StatusCallback),
+		cancels:   make(map[string]context.CancelFunc),
+		interval:  interval,
+		timeout:   timeout,
+		ctx:       ctx,
+		cancel:    cancel,
+	}
+}
+
+// Register adds a port to monitor and starts its check loop. Re-registering an ID
+// replaces the entry and stops the old loop, so a forward never has two loops.
+func (c *Checker) Register(forwardID string, port int, callback StatusCallback) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if stop, ok := c.cancels[forwardID]; ok {
+		stop()
+	}
+
+	health := &PortHealth{Port: port, Status: StatusStarting}
+	loopCtx, stop := context.WithCancel(c.ctx)
+	c.ports[forwardID] = health
+	c.callbacks[forwardID] = callback
+	c.cancels[forwardID] = stop
+
+	// Start checking this port
+	c.wg.Add(1)
+	go c.checkLoop(loopCtx, forwardID, health)
+}
+
+// Unregister removes a port from monitoring and cancels its check loop
+func (c *Checker) Unregister(forwardID string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if stop, ok := c.cancels[forwardID]; ok {
+		stop()
+	}
+	delete(c.ports, forwardID)
+	delete(c.callbacks, forwardID)
+	delete(c.cancels, forwardID)
+}
+
+// MarkReconnecting marks a forward as reconnecting (called by worker)
+func (c *Checker) MarkReconnecting(forwardID string) {
+	c.mark(forwardID, StatusReconnect)
+}
+
+// MarkStarting marks a forward as starting (called by worker)
+func (c *Checker) MarkStarting(forwardID string) {
+	c.mark(forwardID, StatusStarting)
+}
+
+// mark records a status reported by the worker and notifies the callback if
+// the status changed. The lock is released before the callback runs, so no
+// manual unlock/relock is needed and a panicking callback cannot corrupt it.
+func (c *Checker) mark(forwardID string, status Status) {
+	c.mu.Lock()
+	health, exists := c.ports[forwardID]
+	if !exists {
+		c.mu.Unlock()
+		return
+	}
+	changed := health.Status != status
+	health.Status = status
+	health.LastCheck = time.Now()
+	c.mu.Unlock()
+
+	if changed {
+		c.notifyStatusChange(forwardID, status)
+	}
+}
+
+// GetStatus returns the current health status of a forward. The second
+// result is false when the forward is not registered.
+func (c *Checker) GetStatus(forwardID string) (Status, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	if health, exists := c.ports[forwardID]; exists {
+		return health.Status, true
+	}
+	return StatusUnhealthy, false
+}
+
+// Stop stops all health checking and waits for the check loops to exit
+func (c *Checker) Stop() {
+	c.cancel()
+	c.wg.Wait()
+}
+
+// checkLoop probes one forward until ctx is cancelled, which happens on
+// Unregister, on re-Register of the same ID, and on Stop.
+func (c *Checker) checkLoop(ctx context.Context, forwardID string, health *PortHealth) {
+	defer c.wg.Done()
+
+	ticker := time.NewTicker(c.interval)
+	defer ticker.Stop()
+
+	// Initial check after a short delay to let port-forward establish. A
+	// timer is used instead of time.Sleep so cancellation is not delayed.
+	delay := time.NewTimer(2 * time.Second)
+	defer delay.Stop()
+	select {
+	case <-ctx.Done():
+		return
+	case <-delay.C:
+	}
+	c.checkPort(ctx, forwardID, health)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			c.checkPort(ctx, forwardID, health)
+		}
+	}
+}
+
+// checkPort performs a single health check: it dials the forward's local
+// port and records the result.
+func (c *Checker) checkPort(ctx context.Context, forwardID string, health *PortHealth) {
+	// Attempt to connect to the local port
+	dialCtx, cancel := context.WithTimeout(ctx, c.timeout)
+	defer cancel()
+
+	var d net.Dialer
+	conn, err := d.DialContext(dialCtx, "tcp", fmt.Sprintf("127.0.0.1:%d", health.Port))
+
+	newStatus := StatusHealthy
+	errorMsg := ""
+	if err != nil {
+		newStatus = StatusUnhealthy
+		errorMsg = err.Error()
+	} else {
+		_ = conn.Close() // probe only; a close error says nothing about health
+	}
+	if ctx.Err() != nil {
+		return // cancelled mid-probe; the result is not a verdict on the port
+	}
+
+	// Update health status. The change is judged against the status at write
+	// time, not a pre-dial snapshot: the worker may have called Mark* during
+	// the dial, and a stale snapshot would suppress the notification.
+	c.mu.Lock()
+	if c.ports[forwardID] != health {
+		c.mu.Unlock()
+		return // entry was removed or replaced while dialing
+	}
+	changed := health.Status != newStatus
+	health.Status = newStatus
+	health.LastCheck = time.Now()
+	health.ErrorMessage = errorMsg
+	c.mu.Unlock()
+
+	// Notify if status changed
+	if changed {
+		c.notifyStatusChange(forwardID, newStatus)
+	}
+}
+
+// notifyStatusChange calls the callback for a forward
+func (c *Checker) notifyStatusChange(forwardID string, status Status) {
+	c.mu.RLock()
+	callback, exists := c.callbacks[forwardID]
+	c.mu.RUnlock()
+
+	if exists && callback != nil {
+		callback(forwardID, status)
+	}
+}