package forward

import (
	"fmt"
	"log"
	"sync"

	"github.com/nvm/kportal/internal/config"
	"github.com/nvm/kportal/internal/k8s"
)

// Manager owns the set of running port-forward workers together with the
// Kubernetes helpers used to build them. It is the entry point for starting,
// stopping, and hot-reloading forwards.
type Manager struct {
	workers       map[string]*ForwardWorker // keyed by forward.ID()
	workersMu     sync.RWMutex              // guards workers
	clientPool    *k8s.ClientPool
	resolver      *k8s.ResourceResolver
	portForwarder *k8s.PortForwarder
	portChecker   *PortChecker
	verbose       bool
	currentConfig *config.Config // configuration recorded by Start and Reload
}

// NewManager builds a Manager with an empty worker set. The verbose flag is
// stored on the Manager and handed to every worker it creates.
//
// If the Kubernetes client pool cannot be created the process is terminated
// through log.Fatalf, so this function never returns nil.
func NewManager(verbose bool) *Manager {
	pool, err := k8s.NewClientPool()
	if err != nil {
		log.Fatalf("Failed to create client pool: %v", err)
	}

	// The resolver and the port forwarder share the same client pool.
	res := k8s.NewResourceResolver(pool)
	pf := k8s.NewPortForwarder(pool, res)

	mgr := new(Manager)
	mgr.workers = map[string]*ForwardWorker{}
	mgr.clientPool = pool
	mgr.resolver = res
	mgr.portForwarder = pf
	mgr.portChecker = NewPortChecker()
	mgr.verbose = verbose
	return mgr
}

// Start initializes and starts all port-forwards from the configuration.
func (m *Manager) Start(cfg *config.Config) error { if cfg == nil { return fmt.Errorf("configuration is nil") } m.currentConfig = cfg // Get all forwards from config forwards := cfg.GetAllForwards() if len(forwards) == 0 { return fmt.Errorf("no forwards configured") } // Check port availability before starting ports := m.extractPorts(forwards) conflicts := m.portChecker.CheckAvailability(ports, nil) if len(conflicts) > 0 { // Add resource information to conflicts for i := range conflicts { conflicts[i].Resource = m.getResourceForPort(forwards, conflicts[i].Port) } return fmt.Errorf("port conflicts detected:\n%s", FormatConflicts(conflicts)) } // Start all workers log.Printf("Starting %d port-forward(s)...", len(forwards)) for _, fwd := range forwards { if err := m.startWorker(fwd); err != nil { log.Printf("Failed to start worker for %s: %v", fwd.ID(), err) // Continue with other workers } } log.Printf("All port-forwards started") return nil } // Stop gracefully stops all port-forward workers. func (m *Manager) Stop() { log.Printf("Stopping all port-forwards...") m.workersMu.Lock() workers := make([]*ForwardWorker, 0, len(m.workers)) for _, worker := range m.workers { workers = append(workers, worker) } m.workersMu.Unlock() // Stop all workers var wg sync.WaitGroup for _, worker := range workers { wg.Add(1) go func(w *ForwardWorker) { defer wg.Done() w.Stop() }(worker) } wg.Wait() // Clear workers map m.workersMu.Lock() m.workers = make(map[string]*ForwardWorker) m.workersMu.Unlock() log.Printf("All port-forwards stopped") } // Reload applies a new configuration with hot-reload logic. 
// It diffs the new config against the current one and: // - Stops removed forwards // - Keeps unchanged forwards running // - Starts new forwards func (m *Manager) Reload(newCfg *config.Config) error { if newCfg == nil { return fmt.Errorf("new configuration is nil") } log.Printf("Reloading configuration...") // Get all forwards from new config newForwards := newCfg.GetAllForwards() if len(newForwards) == 0 { log.Printf("New configuration has no forwards, stopping all") m.Stop() m.currentConfig = newCfg return nil } // Create maps for easier comparison newForwardsMap := make(map[string]config.Forward) for _, fwd := range newForwards { newForwardsMap[fwd.ID()] = fwd } m.workersMu.RLock() currentForwardsMap := make(map[string]config.Forward) for id, worker := range m.workers { currentForwardsMap[id] = worker.GetForward() } m.workersMu.RUnlock() // Determine changes var toAdd []config.Forward var toRemove []string var toKeep []string // Find forwards to add and keep for id, fwd := range newForwardsMap { if _, exists := currentForwardsMap[id]; exists { toKeep = append(toKeep, id) } else { toAdd = append(toAdd, fwd) } } // Find forwards to remove for id := range currentForwardsMap { if _, exists := newForwardsMap[id]; !exists { toRemove = append(toRemove, id) } } // Check port availability for new forwards if len(toAdd) > 0 { // Get currently managed ports to skip in availability check managedPorts := make(map[int]bool) for _, id := range toKeep { managedPorts[currentForwardsMap[id].LocalPort] = true } // Check new ports newPorts := m.extractPorts(toAdd) conflicts := m.portChecker.CheckAvailability(newPorts, managedPorts) if len(conflicts) > 0 { // Add resource information to conflicts for i := range conflicts { conflicts[i].Resource = m.getResourceForPort(toAdd, conflicts[i].Port) } log.Printf("Config change rejected due to port conflicts:\n%s", FormatConflicts(conflicts)) log.Printf("Keeping previous configuration active") return fmt.Errorf("port conflicts detected") 
} } // Apply changes log.Printf("Configuration diff: %d to add, %d to remove, %d to keep", len(toAdd), len(toRemove), len(toKeep)) // Stop removed forwards for _, id := range toRemove { if err := m.stopWorker(id); err != nil { log.Printf("Failed to stop worker %s: %v", id, err) } else { log.Printf("Stopped: %s", id) } } // Start new forwards for _, fwd := range toAdd { if err := m.startWorker(fwd); err != nil { log.Printf("Failed to start worker for %s: %v", fwd.ID(), err) } else { log.Printf("Started: %s", fwd.ID()) } } // Update current config m.currentConfig = newCfg log.Printf("Configuration reloaded successfully") return nil } // startWorker creates and starts a new forward worker. func (m *Manager) startWorker(fwd config.Forward) error { m.workersMu.Lock() defer m.workersMu.Unlock() // Check if worker already exists if _, exists := m.workers[fwd.ID()]; exists { return fmt.Errorf("worker already exists for %s", fwd.ID()) } // Create and start worker worker := NewForwardWorker(fwd, m.portForwarder, m.verbose) worker.Start() // Store worker m.workers[fwd.ID()] = worker return nil } // stopWorker stops and removes a forward worker. func (m *Manager) stopWorker(id string) error { m.workersMu.Lock() worker, exists := m.workers[id] if !exists { m.workersMu.Unlock() return fmt.Errorf("worker not found: %s", id) } delete(m.workers, id) m.workersMu.Unlock() // Stop the worker worker.Stop() return nil } // GetActiveForwards returns a list of all active forward IDs. func (m *Manager) GetActiveForwards() []string { m.workersMu.RLock() defer m.workersMu.RUnlock() ids := make([]string, 0, len(m.workers)) for id := range m.workers { ids = append(ids, id) } return ids } // GetWorkerCount returns the number of active workers. func (m *Manager) GetWorkerCount() int { m.workersMu.RLock() defer m.workersMu.RUnlock() return len(m.workers) } // extractPorts extracts all local ports from a list of forwards. 
func (m *Manager) extractPorts(forwards []config.Forward) []int { ports := make([]int, len(forwards)) for i, fwd := range forwards { ports[i] = fwd.LocalPort } return ports } // getResourceForPort finds the resource (forward ID) that uses a given port. func (m *Manager) getResourceForPort(forwards []config.Forward, port int) string { for _, fwd := range forwards { if fwd.LocalPort == port { return fwd.ID() } } return "unknown" }