Add active healtchecks to verify the connectivity.

This commit is contained in:
2025-11-23 15:56:57 +00:00
parent 9b57431f59
commit 112969b82a
3 changed files with 254 additions and 12 deletions
+21 -1
View File
@@ -4,8 +4,10 @@ import (
"fmt"
"log"
"sync"
"time"
"github.com/nvm/kportal/internal/config"
"github.com/nvm/kportal/internal/healthcheck"
"github.com/nvm/kportal/internal/k8s"
)
@@ -25,6 +27,7 @@ type Manager struct {
resolver *k8s.ResourceResolver
portForwarder *k8s.PortForwarder
portChecker *PortChecker
healthChecker *healthcheck.Checker
verbose bool
currentConfig *config.Config
statusUI StatusUpdater
@@ -40,12 +43,16 @@ func NewManager(verbose bool) *Manager {
resolver := k8s.NewResourceResolver(clientPool)
portForwarder := k8s.NewPortForwarder(clientPool, resolver)
// Create health checker: check every 5 seconds with 2 second timeout
healthChecker := healthcheck.NewChecker(5*time.Second, 2*time.Second)
return &Manager{
workers: make(map[string]*ForwardWorker),
clientPool: clientPool,
resolver: resolver,
portForwarder: portForwarder,
portChecker: NewPortChecker(),
healthChecker: healthChecker,
verbose: verbose,
}
}
@@ -99,6 +106,9 @@ func (m *Manager) Start(cfg *config.Config) error {
func (m *Manager) Stop() {
log.Printf("Stopping all port-forwards...")
// Stop health checker first
m.healthChecker.Stop()
m.workersMu.Lock()
workers := make([]*ForwardWorker, 0, len(m.workers))
for _, worker := range m.workers {
@@ -248,8 +258,15 @@ func (m *Manager) startWorker(fwd config.Forward) error {
m.statusUI.AddForward(fwd.ID(), &fwd)
}
// Register with health checker
m.healthChecker.Register(fwd.ID(), fwd.LocalPort, func(forwardID string, status healthcheck.Status) {
if m.statusUI != nil {
m.statusUI.UpdateStatus(forwardID, string(status))
}
})
// Create and start worker
worker := NewForwardWorker(fwd, m.portForwarder, m.verbose, m.statusUI)
worker := NewForwardWorker(fwd, m.portForwarder, m.verbose, m.statusUI, m.healthChecker)
worker.Start()
// Store worker
@@ -269,6 +286,9 @@ func (m *Manager) stopWorker(id string) error {
delete(m.workers, id)
m.workersMu.Unlock()
// Unregister from health checker
m.healthChecker.Unregister(id)
// Stop the worker
worker.Stop()
+12 -11
View File
@@ -8,6 +8,7 @@ import (
"time"
"github.com/nvm/kportal/internal/config"
"github.com/nvm/kportal/internal/healthcheck"
"github.com/nvm/kportal/internal/k8s"
"github.com/nvm/kportal/internal/retry"
)
@@ -23,10 +24,11 @@ type ForwardWorker struct {
verbose bool
lastPod string // Track the last pod we connected to
statusUI StatusUpdater
healthChecker *healthcheck.Checker
}
// NewForwardWorker creates a new ForwardWorker for a single forward configuration.
func NewForwardWorker(fwd config.Forward, portForwarder *k8s.PortForwarder, verbose bool, statusUI StatusUpdater) *ForwardWorker {
func NewForwardWorker(fwd config.Forward, portForwarder *k8s.PortForwarder, verbose bool, statusUI StatusUpdater, healthChecker *healthcheck.Checker) *ForwardWorker {
ctx, cancel := context.WithCancel(context.Background())
return &ForwardWorker{
@@ -38,6 +40,7 @@ func NewForwardWorker(fwd config.Forward, portForwarder *k8s.PortForwarder, verb
doneChan: make(chan struct{}),
verbose: verbose,
statusUI: statusUI,
healthChecker: healthChecker,
}
}
@@ -88,22 +91,20 @@ func (w *ForwardWorker) run() {
// Check if pod changed (restart detected)
if w.lastPod != "" && w.lastPod != podName {
if w.statusUI != nil {
w.statusUI.UpdateStatus(w.forward.ID(), "Reconnecting")
if w.healthChecker != nil {
w.healthChecker.MarkReconnecting(w.forward.ID())
}
log.Printf("[%s] Switched to new pod: %s → %s", w.forward.ID(), w.lastPod, podName)
} else if w.lastPod == "" {
log.Printf("[%s] Forwarding %s → localhost:%d",
w.forward.ID(), w.forward.String(), w.forward.LocalPort)
if w.healthChecker != nil {
w.healthChecker.MarkStarting(w.forward.ID())
}
}
w.lastPod = podName
// Update status to active
if w.statusUI != nil {
w.statusUI.UpdateStatus(w.forward.ID(), "Active")
}
// Establish port-forward connection
err = w.establishForward(podName)
@@ -114,9 +115,9 @@ func (w *ForwardWorker) run() {
return
}
// Update status to error
if w.statusUI != nil {
w.statusUI.UpdateStatus(w.forward.ID(), "Reconnecting")
// Update status to reconnecting
if w.healthChecker != nil {
w.healthChecker.MarkReconnecting(w.forward.ID())
}
// Log the error
+221
View File
@@ -0,0 +1,221 @@
package healthcheck
import (
"context"
"fmt"
"net"
"sync"
"time"
)
// Status represents the health status of a port forward
type Status string
const (
StatusHealthy Status = "Active"
StatusUnhealthy Status = "Error"
StatusStarting Status = "Starting"
StatusReconnect Status = "Reconnecting"
)
// PortHealth represents the health status of a single port
type PortHealth struct {
Port int
LastCheck time.Time
Status Status
ErrorMessage string
}
// StatusCallback is called when a port's health status changes
type StatusCallback func(forwardID string, status Status)
// Checker performs periodic health checks on local ports
type Checker struct {
mu sync.RWMutex
ports map[string]*PortHealth // key: forward ID
callbacks map[string]StatusCallback
interval time.Duration
timeout time.Duration
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewChecker creates a new health checker
func NewChecker(interval, timeout time.Duration) *Checker {
ctx, cancel := context.WithCancel(context.Background())
return &Checker{
ports: make(map[string]*PortHealth),
callbacks: make(map[string]StatusCallback),
interval: interval,
timeout: timeout,
ctx: ctx,
cancel: cancel,
}
}
// Register adds a port to monitor
func (c *Checker) Register(forwardID string, port int, callback StatusCallback) {
c.mu.Lock()
defer c.mu.Unlock()
c.ports[forwardID] = &PortHealth{
Port: port,
LastCheck: time.Time{},
Status: StatusStarting,
}
c.callbacks[forwardID] = callback
// Start checking this port
c.wg.Add(1)
go c.checkLoop(forwardID)
}
// Unregister removes a port from monitoring
func (c *Checker) Unregister(forwardID string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.ports, forwardID)
delete(c.callbacks, forwardID)
}
// MarkReconnecting marks a forward as reconnecting (called by worker)
func (c *Checker) MarkReconnecting(forwardID string) {
c.mu.Lock()
defer c.mu.Unlock()
if health, exists := c.ports[forwardID]; exists {
oldStatus := health.Status
health.Status = StatusReconnect
health.LastCheck = time.Now()
// Notify if status changed
if oldStatus != StatusReconnect {
c.mu.Unlock()
c.notifyStatusChange(forwardID, StatusReconnect)
c.mu.Lock()
}
}
}
// MarkStarting marks a forward as starting (called by worker)
func (c *Checker) MarkStarting(forwardID string) {
c.mu.Lock()
defer c.mu.Unlock()
if health, exists := c.ports[forwardID]; exists {
oldStatus := health.Status
health.Status = StatusStarting
health.LastCheck = time.Now()
// Notify if status changed
if oldStatus != StatusStarting {
c.mu.Unlock()
c.notifyStatusChange(forwardID, StatusStarting)
c.mu.Lock()
}
}
}
// GetStatus returns the current health status of a forward
func (c *Checker) GetStatus(forwardID string) (Status, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if health, exists := c.ports[forwardID]; exists {
return health.Status, true
}
return StatusUnhealthy, false
}
// Stop stops all health checking
func (c *Checker) Stop() {
c.cancel()
c.wg.Wait()
}
// checkLoop continuously checks a single port's health
func (c *Checker) checkLoop(forwardID string) {
defer c.wg.Done()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
// Initial check after a short delay to let port-forward establish
time.Sleep(2 * time.Second)
c.checkPort(forwardID)
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
// Check if this forward still exists
c.mu.RLock()
_, exists := c.ports[forwardID]
c.mu.RUnlock()
if !exists {
return
}
c.checkPort(forwardID)
}
}
}
// checkPort performs a single health check on a port
func (c *Checker) checkPort(forwardID string) {
c.mu.RLock()
health, exists := c.ports[forwardID]
if !exists {
c.mu.RUnlock()
return
}
port := health.Port
oldStatus := health.Status
c.mu.RUnlock()
// Attempt to connect to the local port
ctx, cancel := context.WithTimeout(c.ctx, c.timeout)
defer cancel()
var d net.Dialer
conn, err := d.DialContext(ctx, "tcp", fmt.Sprintf("127.0.0.1:%d", port))
newStatus := StatusHealthy
errorMsg := ""
if err != nil {
newStatus = StatusUnhealthy
errorMsg = err.Error()
} else {
conn.Close()
}
// Update health status
c.mu.Lock()
if health, exists := c.ports[forwardID]; exists {
health.Status = newStatus
health.LastCheck = time.Now()
health.ErrorMessage = errorMsg
}
c.mu.Unlock()
// Notify if status changed
if oldStatus != newStatus {
c.notifyStatusChange(forwardID, newStatus)
}
}
// notifyStatusChange calls the callback for a forward
func (c *Checker) notifyStatusChange(forwardID string, status Status) {
c.mu.RLock()
callback, exists := c.callbacks[forwardID]
c.mu.RUnlock()
if exists && callback != nil {
callback(forwardID, status)
}
}