From 6cb4f91eceb64b747f14fda0f3c45088bdf505fa Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Sun, 14 Dec 2025 18:17:20 +0000 Subject: [PATCH] Cleanup and refactor. --- cmd/kportal/main.go | 10 ++++++++ internal/benchmark/runner.go | 10 -------- internal/benchmark/runner_test.go | 9 ------- internal/config/validator.go | 5 ---- internal/config/validator_test.go | 9 ------- internal/config/watcher.go | 8 ++++-- internal/events/bus.go | 9 ------- internal/events/bus_test.go | 10 -------- internal/forward/manager.go | 26 ------------------- internal/forward/manager_test.go | 42 +------------------------------ internal/forward/worker.go | 13 ++++++++++ internal/healthcheck/checker.go | 11 ++++---- internal/mdns/publisher.go | 17 ------------- internal/mdns/publisher_test.go | 41 ++++++++++++++++++------------ internal/retry/backoff.go | 11 +++++++- internal/version/checker.go | 6 ----- internal/version/checker_test.go | 13 ---------- 17 files changed, 70 insertions(+), 180 deletions(-) diff --git a/cmd/kportal/main.go b/cmd/kportal/main.go index 4cc9b8b..1e6d07e 100644 --- a/cmd/kportal/main.go +++ b/cmd/kportal/main.go @@ -309,16 +309,26 @@ func main() { bubbleTeaUI.SetHTTPLogSubscriber(func(forwardID string, callback func(entry ui.HTTPLogEntry)) func() { worker := manager.GetWorker(forwardID) if worker == nil { + logger.Debug("HTTP log subscription failed: worker not found", map[string]interface{}{ + "forward_id": forwardID, + }) return func() {} // No-op cleanup } proxy := worker.GetHTTPProxy() if proxy == nil { + // This is expected for forwards without httpLog enabled - not an error + logger.Debug("HTTP log subscription skipped: proxy not enabled", map[string]interface{}{ + "forward_id": forwardID, + }) return func() {} // HTTP logging not enabled for this forward } proxyLogger := proxy.GetLogger() if proxyLogger == nil { + logger.Debug("HTTP log subscription failed: logger not available", map[string]interface{}{ + "forward_id": forwardID, + }) return func() {} } diff --git a/internal/benchmark/runner.go b/internal/benchmark/runner.go index 41dad45..780224b 100644 --- a/internal/benchmark/runner.go +++ b/internal/benchmark/runner.go @@ -27,16 +27,6 @@ type Config struct { ProgressCallback ProgressCallback // Optional callback for progress updates } -// DefaultConfig returns a default benchmark configuration -func DefaultConfig() Config { - return Config{ - Method: "GET", - Concurrency: 10, - Requests: 100, - Timeout: 30 * time.Second, - } -} - // Runner executes HTTP benchmarks type Runner struct { client *http.Client diff --git a/internal/benchmark/runner_test.go b/internal/benchmark/runner_test.go index 65a6342..dff401c 100644 --- a/internal/benchmark/runner_test.go +++ b/internal/benchmark/runner_test.go @@ -206,15 +206,6 @@ func TestRunnerWithBody(t *testing.T) { assert.Equal(t, int64(15), results.BytesWritten) } -func TestDefaultConfig(t *testing.T) { - cfg := DefaultConfig() - - assert.Equal(t, "GET", cfg.Method) - assert.Equal(t, 10, cfg.Concurrency) - assert.Equal(t, 100, cfg.Requests) - assert.Equal(t, 30*time.Second, cfg.Timeout) -} - func TestRunnerWithProgressCallback(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(10 * time.Millisecond) // Add small delay so progress ticker can fire diff --git a/internal/config/validator.go b/internal/config/validator.go index e2f2708..32bb6cf 100644 --- a/internal/config/validator.go +++ b/internal/config/validator.go @@ -22,11 +22,6 @@ type ValidationError struct { Context map[string]string // Additional context information } -// Error implements the error interface. -func (e ValidationError) Error() string { - return e.Message -} - // Validator validates configuration files. type Validator struct{} diff --git a/internal/config/validator_test.go b/internal/config/validator_test.go index 822c941..82edbd7 100644 --- a/internal/config/validator_test.go +++ b/internal/config/validator_test.go @@ -621,15 +621,6 @@ func TestFormatValidationErrors(t *testing.T) { } } -func TestValidationError_Error(t *testing.T) { - err := ValidationError{ - Field: "port", - Message: "Invalid port 0", - } - - assert.Equal(t, "Invalid port 0", err.Error(), "Error() should return the message") -} - func TestValidator_ValidateStructure(t *testing.T) { validator := NewValidator() diff --git a/internal/config/watcher.go b/internal/config/watcher.go index 6a4c963..4b29f88 100644 --- a/internal/config/watcher.go +++ b/internal/config/watcher.go @@ -22,6 +22,7 @@ type Watcher struct { done chan struct{} verbose bool wg sync.WaitGroup // Ensures watch goroutine exits before Stop returns + stopOnce sync.Once // Ensures Stop is safe to call multiple times } // NewWatcher creates a new file watcher for the given config file. @@ -61,9 +62,12 @@ func (w *Watcher) Start() { } // Stop stops watching the configuration file and waits for the watch goroutine to exit. +// Safe to call multiple times. func (w *Watcher) Stop() { - close(w.done) - _ = w.watcher.Close() + w.stopOnce.Do(func() { + close(w.done) + _ = w.watcher.Close() + }) w.wg.Wait() // Wait for watch goroutine to exit } diff --git a/internal/events/bus.go b/internal/events/bus.go index a0fb835..04b2174 100644 --- a/internal/events/bus.go +++ b/internal/events/bus.go @@ -135,15 +135,6 @@ func (b *Bus) Close() { // Helper functions for creating common events -// NewForwardEvent creates a forward-related event -func NewForwardEvent(eventType EventType, forwardID string, data map[string]interface{}) Event { - return Event{ - Type: eventType, - ForwardID: forwardID, - Data: data, - } -} - // NewHealthEvent creates a health status change event func NewHealthEvent(forwardID string, status string, errorMsg string) Event { return Event{ diff --git a/internal/events/bus_test.go b/internal/events/bus_test.go index 5377cf3..bb4e4a0 100644 --- a/internal/events/bus_test.go +++ b/internal/events/bus_test.go @@ -149,16 +149,6 @@ func TestBus_ConcurrentAccess(t *testing.T) { assert.Equal(t, int64(100), atomic.LoadInt64(&count)) } -func TestNewForwardEvent(t *testing.T) { - event := NewForwardEvent(EventForwardStarting, "test-id", map[string]interface{}{ - "pod": "my-pod", - }) - - assert.Equal(t, EventForwardStarting, event.Type) - assert.Equal(t, "test-id", event.ForwardID) - assert.Equal(t, "my-pod", event.Data["pod"]) -} - func TestNewHealthEvent(t *testing.T) { event := NewHealthEvent("test-id", "Active", "") diff --git a/internal/forward/manager.go b/internal/forward/manager.go index 9f168b5..b2a8a29 100644 --- a/internal/forward/manager.go +++ b/internal/forward/manager.go @@ -139,11 +139,6 @@ func (m *Manager) SetMDNSPublisher(publisher *mdns.Publisher) { m.mdnsPublisher = publisher } -// GetEventBus returns the event bus for subscribing to manager events -func (m *Manager) GetEventBus() *events.Bus { - return m.eventBus -} - // Start initializes and starts all port-forwards from the configuration. func (m *Manager) Start(cfg *config.Config) error { if cfg == nil { @@ -493,27 +488,6 @@ func (m *Manager) stopWorkerInternal(id string, removeFromUI bool) error { return nil } -// GetActiveForwards returns a list of all active forward IDs. -func (m *Manager) GetActiveForwards() []string { - m.workersMu.RLock() - defer m.workersMu.RUnlock() - - ids := make([]string, 0, len(m.workers)) - for id := range m.workers { - ids = append(ids, id) - } - - return ids -} - -// GetWorkerCount returns the number of active workers. -func (m *Manager) GetWorkerCount() int { - m.workersMu.RLock() - defer m.workersMu.RUnlock() - - return len(m.workers) -} - // GetWorker returns a worker by ID, or nil if not found. func (m *Manager) GetWorker(id string) *ForwardWorker { m.workersMu.RLock() diff --git a/internal/forward/manager_test.go b/internal/forward/manager_test.go index a3cb636..0ee1b78 100644 --- a/internal/forward/manager_test.go +++ b/internal/forward/manager_test.go @@ -7,7 +7,6 @@ import ( "github.com/nvm/kportal/internal/config" "github.com/nvm/kportal/internal/events" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // TestNewManager tests manager creation @@ -53,41 +52,6 @@ func TestManager_SetStatusUI(t *testing.T) { assert.Equal(t, mockUI, manager.statusUI) } -// TestManager_GetEventBus tests getting the event bus -func TestManager_GetEventBus(t *testing.T) { - manager, err := NewManager(false) - if err != nil { - t.Skip("Skipping test - no kubeconfig available") - } - defer manager.Stop() - - bus := manager.GetEventBus() - assert.NotNil(t, bus) -} - -// TestManager_GetWorkerCount tests worker count tracking -func TestManager_GetWorkerCount(t *testing.T) { - manager, err := NewManager(false) - if err != nil { - t.Skip("Skipping test - no kubeconfig available") - } - defer manager.Stop() - - assert.Equal(t, 0, manager.GetWorkerCount()) -} - -// TestManager_GetActiveForwards tests getting active forwards -func TestManager_GetActiveForwards(t *testing.T) { - manager, err := NewManager(false) - if err != nil { - t.Skip("Skipping test - no kubeconfig available") - } - defer manager.Stop() - - forwards := manager.GetActiveForwards() - assert.Empty(t, forwards) -} - // TestManager_GetWorker tests getting a worker by ID func TestManager_GetWorker(t *testing.T) { manager, err := NewManager(false) @@ -362,12 +326,8 @@ func TestManager_EventBusIntegration(t *testing.T) { // Event bus should be wired to health checker and watchdog assert.NotNil(t, manager.eventBus) - // Get event bus - bus := manager.GetEventBus() - require.NotNil(t, bus) - // SubscribeAll should work (no return value in this API) - bus.SubscribeAll(func(event events.Event) { + manager.eventBus.SubscribeAll(func(event events.Event) { // Handler }) } diff --git a/internal/forward/worker.go b/internal/forward/worker.go index 04b06e1..52a7fab 100644 --- a/internal/forward/worker.go +++ b/internal/forward/worker.go @@ -336,6 +336,11 @@ func (w *ForwardWorker) establishForward(podName string) error { // Start port forwarding in a goroutine errChan := make(chan error, 1) go func() { + defer func() { + if r := recover(); r != nil { + errChan <- fmt.Errorf("port forward panicked: %v", r) + } + }() errChan <- w.portForwarder.Forward(forwardCtx, req) }() @@ -409,6 +414,14 @@ func (w *ForwardWorker) startHTTPProxy() error { // Calculate internal port for k8s tunnel targetPort := w.forward.LocalPort + httpLogPortOffset + // Validate that the target port is available before attempting to bind + portChecker := NewPortChecker() + if !portChecker.isPortAvailable(targetPort) { + usedBy := portChecker.getProcessUsingPort(targetPort) + return fmt.Errorf("HTTP proxy target port %d is already in use by %s (forward port %d + offset %d)", + targetPort, usedBy, w.forward.LocalPort, httpLogPortOffset) + } + proxy, err := httplog.NewProxy(&w.forward, targetPort) if err != nil { return fmt.Errorf("failed to create HTTP proxy: %w", err) diff --git a/internal/healthcheck/checker.go b/internal/healthcheck/checker.go index 55b8b29..3fd39f0 100644 --- a/internal/healthcheck/checker.go +++ b/internal/healthcheck/checker.go @@ -365,7 +365,8 @@ func (c *Checker) checkPort(forwardID string) { } } - // Update health status + // Update health status and capture eventBus while holding lock + var bus *events.Bus c.mu.Lock() if health, exists := c.ports[forwardID]; exists { health.Status = newStatus @@ -378,17 +379,15 @@ func (c *Checker) checkPort(forwardID string) { health.LastActivity = now } } + // Capture eventBus while we have the lock to avoid race condition + bus = c.eventBus c.mu.Unlock() // Notify if status changed if oldStatus != newStatus { c.notifyStatusChange(forwardID, newStatus, errorMsg) - // Publish to event bus if available - c.mu.RLock() - bus := c.eventBus - c.mu.RUnlock() - + // Publish to event bus if available (captured while holding lock above) if bus != nil { if newStatus == StatusStale { bus.Publish(events.NewStaleEvent(forwardID, errorMsg)) diff --git a/internal/mdns/publisher.go b/internal/mdns/publisher.go index 729e01b..26c8a98 100644 --- a/internal/mdns/publisher.go +++ b/internal/mdns/publisher.go @@ -195,29 +195,12 @@ func shutdownWithTimeout(server *zeroconf.Server, forwardID string) { } } -// IsEnabled returns whether mDNS publishing is enabled. -func (p *Publisher) IsEnabled() bool { - return p.enabled -} - -// GetDomain returns the mDNS domain being used (always "local" per RFC 6762). -func (p *Publisher) GetDomain() string { - return mdnsDomain -} - // GetHostname returns the full mDNS hostname for an alias. // Example: GetHostname("myapp") returns "myapp.local" func GetHostname(alias string) string { return alias + "." + mdnsDomain } -// GetRegisteredCount returns the number of currently registered hostnames. -func (p *Publisher) GetRegisteredCount() int { - p.mu.RLock() - defer p.mu.RUnlock() - return len(p.servers) -} - // getLocalIPs returns the local IP addresses for logging purposes. func getLocalIPs() []string { var ips []string diff --git a/internal/mdns/publisher_test.go b/internal/mdns/publisher_test.go index 3935360..e086007 100644 --- a/internal/mdns/publisher_test.go +++ b/internal/mdns/publisher_test.go @@ -13,15 +13,17 @@ import ( func TestNewPublisher_Disabled(t *testing.T) { p := NewPublisher(false) - assert.False(t, p.IsEnabled()) - assert.Equal(t, 0, p.GetRegisteredCount()) + // When disabled, Register should succeed but be a no-op + err := p.Register("forward-1", "test-alias", 8080) + assert.NoError(t, err) } func TestNewPublisher_Enabled(t *testing.T) { p := NewPublisher(true) + defer p.Stop() - assert.True(t, p.IsEnabled()) - assert.Equal(t, 0, p.GetRegisteredCount()) + // Enabled publisher should be created successfully + assert.NotNil(t, p) } func TestRegister_WhenDisabled_NoOp(t *testing.T) { @@ -30,16 +32,17 @@ func TestRegister_WhenDisabled_NoOp(t *testing.T) { err := p.Register("forward-1", "test-alias", 8080) assert.NoError(t, err) - assert.Equal(t, 0, p.GetRegisteredCount()) + // Unregister should also be safe when disabled + p.Unregister("forward-1") } func TestRegister_EmptyAlias_NoOp(t *testing.T) { p := NewPublisher(true) + defer p.Stop() err := p.Register("forward-1", "", 8080) assert.NoError(t, err) - assert.Equal(t, 0, p.GetRegisteredCount()) } func TestUnregister_WhenDisabled_NoOp(t *testing.T) { @@ -51,10 +54,10 @@ func TestUnregister_WhenDisabled_NoOp(t *testing.T) { func TestUnregister_NotRegistered_NoOp(t *testing.T) { p := NewPublisher(true) + defer p.Stop() // Should not panic p.Unregister("non-existent") - assert.Equal(t, 0, p.GetRegisteredCount()) } func TestStop_WhenDisabled_NoOp(t *testing.T) { @@ -69,7 +72,6 @@ func TestStop_WhenNoRegistrations(t *testing.T) { // Should not panic p.Stop() - assert.Equal(t, 0, p.GetRegisteredCount()) } func TestGetLocalIPs(t *testing.T) { @@ -84,6 +86,11 @@ func TestGetLocalIPs(t *testing.T) { } } +func TestGetHostname(t *testing.T) { + hostname := GetHostname("myapp") + assert.Equal(t, "myapp.local", hostname) +} + // Integration tests - only run when explicitly requested // These tests actually register mDNS services and require network access @@ -96,9 +103,10 @@ func TestRegister_Integration(t *testing.T) { defer p.Stop() err := p.Register("forward-1", "test-service", 8080) - assert.NoError(t, err) - assert.Equal(t, 1, p.GetRegisteredCount()) + + // Verify by checking that unregister doesn't panic + p.Unregister("forward-1") } func TestRegister_Duplicate_Idempotent_Integration(t *testing.T) { @@ -112,12 +120,10 @@ func TestRegister_Duplicate_Idempotent_Integration(t *testing.T) { // First registration err := p.Register("forward-1", "test-service", 8080) assert.NoError(t, err) - assert.Equal(t, 1, p.GetRegisteredCount()) // Second registration with same ID should be idempotent err = p.Register("forward-1", "test-service", 8080) assert.NoError(t, err) - assert.Equal(t, 1, p.GetRegisteredCount()) } func TestRegister_MultipleForwards_Integration(t *testing.T) { @@ -135,7 +141,6 @@ func TestRegister_MultipleForwards_Integration(t *testing.T) { assert.NoError(t, err1) assert.NoError(t, err2) assert.NoError(t, err3) - assert.Equal(t, 3, p.GetRegisteredCount()) } func TestUnregister_Success_Integration(t *testing.T) { @@ -146,9 +151,13 @@ func TestUnregister_Success_Integration(t *testing.T) { p := NewPublisher(true) defer p.Stop() - p.Register("forward-1", "test-service", 8080) - assert.Equal(t, 1, p.GetRegisteredCount()) + err := p.Register("forward-1", "test-service", 8080) + assert.NoError(t, err) + // Unregister should not panic and should handle it gracefully p.Unregister("forward-1") - assert.Equal(t, 0, p.GetRegisteredCount()) + + // Re-registering should work after unregister + err = p.Register("forward-1", "test-service-2", 8080) + assert.NoError(t, err) } diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go index 55fef52..65dac49 100644 --- a/internal/retry/backoff.go +++ b/internal/retry/backoff.go @@ -11,6 +11,9 @@ const ( initialDelay = 1 * time.Second maxDelay = 10 * time.Second jitterPct = 0.1 // 10% jitter + // maxAttempt caps the exponent to prevent math.Pow overflow + // 2^30 seconds is ~34 years, well above maxDelay, so this is safe + maxAttempt = 30 ) // Backoff implements exponential backoff with jitter for retry logic. @@ -33,8 +36,14 @@ func NewBackoff() *Backoff { // The duration follows exponential backoff: 1s → 2s → 4s → 8s → 10s (max). // A 10% jitter is added to prevent thundering herd effects. func (b *Backoff) Next() time.Duration { + // Cap attempt to prevent overflow in math.Pow + attempt := b.attempt + if attempt > maxAttempt { + attempt = maxAttempt + } + // Calculate base delay: 2^attempt seconds - exp := math.Pow(2, float64(b.attempt)) + exp := math.Pow(2, float64(attempt)) delay := time.Duration(exp) * time.Second // Cap at max delay diff --git a/internal/version/checker.go b/internal/version/checker.go index 10aea73..7da0f83 100644 --- a/internal/version/checker.go +++ b/internal/version/checker.go @@ -150,9 +150,3 @@ func parseVersion(v string) []int { return result } - -// FormatUpdateMessage formats a user-friendly update notification -func (u *UpdateInfo) FormatUpdateMessage() string { - return fmt.Sprintf("New version available: %s (current: %s) - %s", - u.LatestVersion, u.CurrentVersion, u.ReleaseURL) -} diff --git a/internal/version/checker_test.go b/internal/version/checker_test.go index 7d34f5a..9fdd388 100644 --- a/internal/version/checker_test.go +++ b/internal/version/checker_test.go @@ -75,16 +75,3 @@ func TestIsNewerVersion(t *testing.T) { }) } } - -func TestUpdateInfo_FormatUpdateMessage(t *testing.T) { - info := &UpdateInfo{ - CurrentVersion: "0.1.0", - LatestVersion: "0.2.0", - ReleaseURL: "https://github.com/nvm/kportal/releases/tag/v0.2.0", - } - - msg := info.FormatUpdateMessage() - assert.Contains(t, msg, "0.2.0") - assert.Contains(t, msg, "0.1.0") - assert.Contains(t, msg, "https://github.com/nvm/kportal/releases/tag/v0.2.0") -}