mirror of
https://github.com/lukaszraczylo/kportal.git
synced 2026-07-05 06:05:39 +00:00
improvements nov2025 (#10)
* Add benchmark and httplog modules, update UI for modals artefacts
This commit is contained in:
@@ -483,6 +483,14 @@ func (m *Manager) GetWorkerCount() int {
|
||||
return len(m.workers)
|
||||
}
|
||||
|
||||
// GetWorker returns a worker by ID, or nil if not found.
|
||||
func (m *Manager) GetWorker(id string) *ForwardWorker {
|
||||
m.workersMu.RLock()
|
||||
defer m.workersMu.RUnlock()
|
||||
|
||||
return m.workers[id]
|
||||
}
|
||||
|
||||
// extractPorts extracts all local ports from a list of forwards.
|
||||
func (m *Manager) extractPorts(forwards []config.Forward) []int {
|
||||
ports := make([]int, len(forwards))
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/nvm/kportal/internal/config"
|
||||
"github.com/nvm/kportal/internal/healthcheck"
|
||||
"github.com/nvm/kportal/internal/httplog"
|
||||
"github.com/nvm/kportal/internal/k8s"
|
||||
"github.com/nvm/kportal/internal/logger"
|
||||
"github.com/nvm/kportal/internal/retry"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
|
||||
const (
|
||||
portForwardReadyTimeout = 30 * time.Second
|
||||
httpLogPortOffset = 10000 // Offset for internal port when HTTP logging is enabled
|
||||
)
|
||||
|
||||
// ForwardWorker manages a single port-forward connection with automatic retry.
|
||||
@@ -36,6 +38,7 @@ type ForwardWorker struct {
|
||||
startTime time.Time // Track when the worker started
|
||||
forwardCancel context.CancelFunc // Cancel function for current forward attempt
|
||||
forwardCancelMu sync.Mutex // Protects forwardCancel
|
||||
httpProxy *httplog.Proxy // HTTP logging proxy (nil if not enabled)
|
||||
}
|
||||
|
||||
// NewForwardWorker creates a new ForwardWorker for a single forward configuration.
|
||||
@@ -100,6 +103,7 @@ func (w *ForwardWorker) Stop() {
|
||||
// run is the main worker loop that handles retries.
|
||||
func (w *ForwardWorker) run() {
|
||||
defer close(w.doneChan)
|
||||
defer w.stopHTTPProxy() // Ensure proxy is stopped on exit
|
||||
|
||||
// Start heartbeat goroutine to continuously send heartbeats to watchdog
|
||||
// This prevents false "hung worker" detection when connections are long-lived
|
||||
@@ -107,6 +111,15 @@ func (w *ForwardWorker) run() {
|
||||
go w.heartbeatLoop()
|
||||
}
|
||||
|
||||
// Start HTTP logging proxy if enabled
|
||||
if err := w.startHTTPProxy(); err != nil {
|
||||
logger.Error("Failed to start HTTP logging proxy", map[string]interface{}{
|
||||
"forward_id": w.forward.ID(),
|
||||
"error": err.Error(),
|
||||
})
|
||||
// Continue without HTTP logging
|
||||
}
|
||||
|
||||
backoff := retry.NewBackoff()
|
||||
|
||||
for {
|
||||
@@ -276,13 +289,20 @@ func (w *ForwardWorker) establishForward(podName string) error {
|
||||
errOut = io.Discard
|
||||
}
|
||||
|
||||
// Determine local port for k8s port-forward
|
||||
// If HTTP logging is enabled, we bind to an internal port and the proxy listens on the user-facing port
|
||||
localPort := w.forward.LocalPort
|
||||
if w.httpProxy != nil {
|
||||
localPort = w.httpProxy.GetTargetPort()
|
||||
}
|
||||
|
||||
// Create forward request
|
||||
req := &k8s.ForwardRequest{
|
||||
ContextName: w.forward.GetContext(),
|
||||
Namespace: w.forward.GetNamespace(),
|
||||
Resource: w.forward.Resource,
|
||||
Selector: w.forward.Selector,
|
||||
LocalPort: w.forward.LocalPort,
|
||||
LocalPort: localPort,
|
||||
RemotePort: w.forward.Port,
|
||||
StopChan: stopChan,
|
||||
ReadyChan: readyChan,
|
||||
@@ -355,6 +375,53 @@ func (w *ForwardWorker) IsRunning() bool {
|
||||
}
|
||||
}
|
||||
|
||||
// startHTTPProxy starts the HTTP logging proxy if enabled
|
||||
func (w *ForwardWorker) startHTTPProxy() error {
|
||||
if !w.forward.IsHTTPLogEnabled() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculate internal port for k8s tunnel
|
||||
targetPort := w.forward.LocalPort + httpLogPortOffset
|
||||
|
||||
proxy, err := httplog.NewProxy(&w.forward, targetPort)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create HTTP proxy: %w", err)
|
||||
}
|
||||
|
||||
if err := proxy.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start HTTP proxy: %w", err)
|
||||
}
|
||||
|
||||
w.httpProxy = proxy
|
||||
|
||||
logger.Info("HTTP logging proxy started", map[string]interface{}{
|
||||
"forward_id": w.forward.ID(),
|
||||
"local_port": w.forward.LocalPort,
|
||||
"target_port": targetPort,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// stopHTTPProxy stops the HTTP logging proxy if running
|
||||
func (w *ForwardWorker) stopHTTPProxy() {
|
||||
if w.httpProxy != nil {
|
||||
if err := w.httpProxy.Stop(); err != nil {
|
||||
logger.Warn("Failed to stop HTTP proxy", map[string]interface{}{
|
||||
"forward_id": w.forward.ID(),
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
w.httpProxy = nil
|
||||
}
|
||||
}
|
||||
|
||||
// GetHTTPProxy returns the HTTP logging proxy if active
|
||||
func (w *ForwardWorker) GetHTTPProxy() *httplog.Proxy {
|
||||
return w.httpProxy
|
||||
}
|
||||
|
||||
// logWriter implements io.Writer to write log messages with a prefix.
|
||||
type logWriter struct {
|
||||
prefix string
|
||||
|
||||
Reference in New Issue
Block a user