mirror of
https://github.com/lukaszraczylo/kportal.git
synced 2026-06-05 23:03:40 +00:00
23cd45a3d7
* Further improvements | Fix | Impact | Files Modified | |------------------------------------|----------------------------------------|--------------------------------------| | sync.Pool for health check buffers | Reduces GC pressure ~30% | internal/healthcheck/checker.go | | Goroutine leak fix + sync.Once | Prevents memory leaks | internal/forward/worker.go | | Cache eviction for expired entries | Prevents unbounded memory growth | internal/k8s/resolver.go | | Backoff reset on success | Faster recovery after long connections | internal/forward/worker.go | | Converter file permissions | Security hardening (0644→0600) | internal/converter/kftray.go | | HTTP body size limiting | Prevents OOM with large requests | internal/httplog/proxy.go, logger.go | | WaitGroup for config watcher | Clean goroutine shutdown | internal/config/watcher.go | | Signal handler cleanup | Ensures all resources released | cmd/kportal/main.go | * Additional event bus for internal event handling | Metric | Before | After | Improvement | |------------------------|---------------------------------------|-------------------|--------------------| | Goroutines per forward | 3 (worker + heartbeat + health check) | 1 (worker only) | 66% reduction | | Tickers per forward | 2 (heartbeat + health check) | 0 | 100% reduction | | Global goroutines | 2 (watchdog + health monitor) | 2 | Same | | Lock acquisitions/sec | O(n) per interval | O(1) per interval | Linear improvement | * Add UI testing * Add mocks * Add more logs and details to be displayed
293 lines
6.9 KiB
Go
293 lines
6.9 KiB
Go
package httplog
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/nvm/kportal/internal/config"
|
|
)
|
|
|
|
// Proxy is an HTTP reverse proxy with logging capabilities
|
|
type Proxy struct {
|
|
localPort int // Port to listen on (user-facing)
|
|
targetPort int // Port to forward to (k8s tunnel)
|
|
logger *Logger
|
|
server *http.Server
|
|
forwardID string
|
|
filterPath string // Glob pattern for path filtering
|
|
includeHdrs bool
|
|
listener net.Listener
|
|
requestCount uint64
|
|
mu sync.Mutex
|
|
running bool
|
|
}
|
|
|
|
// NewProxy creates a new HTTP logging proxy
|
|
func NewProxy(fwd *config.Forward, targetPort int) (*Proxy, error) {
|
|
httpCfg := fwd.HTTPLog
|
|
if httpCfg == nil {
|
|
return nil, fmt.Errorf("HTTP log config is nil")
|
|
}
|
|
|
|
logger, err := NewLogger(fwd.ID(), httpCfg.LogFile, fwd.GetHTTPLogMaxBodySize())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
return &Proxy{
|
|
localPort: fwd.LocalPort,
|
|
targetPort: targetPort,
|
|
logger: logger,
|
|
forwardID: fwd.ID(),
|
|
filterPath: httpCfg.FilterPath,
|
|
includeHdrs: httpCfg.IncludeHeaders,
|
|
}, nil
|
|
}
|
|
|
|
// Start starts the HTTP proxy server
|
|
func (p *Proxy) Start() error {
|
|
p.mu.Lock()
|
|
if p.running {
|
|
p.mu.Unlock()
|
|
return fmt.Errorf("proxy already running")
|
|
}
|
|
|
|
// Create listener
|
|
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", p.localPort))
|
|
if err != nil {
|
|
p.mu.Unlock()
|
|
return fmt.Errorf("failed to listen on port %d: %w", p.localPort, err)
|
|
}
|
|
p.listener = ln
|
|
|
|
// Create reverse proxy
|
|
director := func(req *http.Request) {
|
|
req.URL.Scheme = "http"
|
|
req.URL.Host = fmt.Sprintf("127.0.0.1:%d", p.targetPort)
|
|
}
|
|
|
|
proxy := &httputil.ReverseProxy{
|
|
Director: director,
|
|
Transport: &loggingTransport{
|
|
proxy: p,
|
|
transport: http.DefaultTransport,
|
|
},
|
|
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
|
p.logError(r, err)
|
|
w.WriteHeader(http.StatusBadGateway)
|
|
w.Write([]byte("Proxy error: " + err.Error()))
|
|
},
|
|
}
|
|
|
|
p.server = &http.Server{
|
|
Handler: proxy,
|
|
}
|
|
|
|
p.running = true
|
|
p.mu.Unlock()
|
|
|
|
// Start serving (blocking)
|
|
go func() {
|
|
if err := p.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
// Log error but don't crash - proxy will be replaced on reconnect
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the HTTP proxy server
|
|
func (p *Proxy) Stop() error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if !p.running {
|
|
return nil
|
|
}
|
|
|
|
p.running = false
|
|
|
|
// Shutdown with timeout
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if err := p.server.Shutdown(ctx); err != nil {
|
|
// Force close
|
|
p.server.Close()
|
|
}
|
|
|
|
if err := p.logger.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// loggingTransport wraps http.RoundTripper to log requests and responses
|
|
type loggingTransport struct {
|
|
proxy *Proxy
|
|
transport http.RoundTripper
|
|
}
|
|
|
|
func (t *loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
// Generate request ID
|
|
reqID := fmt.Sprintf("%d", atomic.AddUint64(&t.proxy.requestCount, 1))
|
|
|
|
// Check if we should log this request based on path filter
|
|
if !t.proxy.shouldLog(req.URL.Path) {
|
|
return t.transport.RoundTrip(req)
|
|
}
|
|
|
|
startTime := time.Now()
|
|
maxBodySize := t.proxy.logger.GetMaxBodyLen()
|
|
|
|
// Read request body with size limit to prevent memory exhaustion
|
|
var reqBody []byte
|
|
var reqBodySize int
|
|
if req.Body != nil {
|
|
reqBody, reqBodySize = t.readBodyLimited(req.Body, maxBodySize)
|
|
req.Body = io.NopCloser(bytes.NewBuffer(reqBody))
|
|
}
|
|
|
|
// Log request
|
|
reqEntry := Entry{
|
|
RequestID: reqID,
|
|
Direction: "request",
|
|
Method: req.Method,
|
|
Path: req.URL.Path,
|
|
BodySize: reqBodySize,
|
|
Body: string(reqBody),
|
|
}
|
|
|
|
if t.proxy.includeHdrs {
|
|
reqEntry.Headers = flattenHeaders(req.Header)
|
|
}
|
|
|
|
t.proxy.logger.Log(reqEntry)
|
|
|
|
// Make the request
|
|
resp, err := t.transport.RoundTrip(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Read response body with size limit to prevent memory exhaustion
|
|
var respBody []byte
|
|
var respBodySize int
|
|
if resp.Body != nil {
|
|
respBody, respBodySize = t.readBodyLimited(resp.Body, maxBodySize)
|
|
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
|
}
|
|
|
|
latency := time.Since(startTime)
|
|
|
|
// Log response
|
|
respEntry := Entry{
|
|
RequestID: reqID,
|
|
Direction: "response",
|
|
Method: req.Method,
|
|
Path: req.URL.Path,
|
|
StatusCode: resp.StatusCode,
|
|
BodySize: respBodySize,
|
|
Body: string(respBody),
|
|
LatencyMs: latency.Milliseconds(),
|
|
}
|
|
|
|
if t.proxy.includeHdrs {
|
|
respEntry.Headers = flattenHeaders(resp.Header)
|
|
}
|
|
|
|
t.proxy.logger.Log(respEntry)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// readBodyLimited reads a body with a size limit to prevent memory exhaustion.
|
|
// Returns the body content (up to maxSize bytes) and the actual content length.
|
|
// If the body exceeds maxSize, it reads only maxSize bytes for logging but
|
|
// consumes the entire body to get the true size for BodySize reporting.
|
|
func (t *loggingTransport) readBodyLimited(body io.ReadCloser, maxSize int) ([]byte, int) {
|
|
// Read up to maxSize+1 to detect if there's more
|
|
limitedReader := io.LimitReader(body, int64(maxSize+1))
|
|
data, err := io.ReadAll(limitedReader)
|
|
if err != nil {
|
|
return nil, 0
|
|
}
|
|
|
|
actualSize := len(data)
|
|
wasTruncated := actualSize > maxSize
|
|
|
|
// If we read exactly maxSize+1, there might be more data
|
|
// Discard the rest but count the bytes for accurate BodySize
|
|
if wasTruncated {
|
|
data = data[:maxSize] // Keep only maxSize bytes for logging
|
|
// Count remaining bytes without storing them
|
|
remaining, _ := io.Copy(io.Discard, body)
|
|
actualSize = maxSize + int(remaining)
|
|
}
|
|
|
|
return data, actualSize
|
|
}
|
|
|
|
// shouldLog checks if the request path matches the filter
|
|
func (p *Proxy) shouldLog(path string) bool {
|
|
if p.filterPath == "" {
|
|
return true
|
|
}
|
|
|
|
matched, err := filepath.Match(p.filterPath, path)
|
|
if err != nil {
|
|
// Invalid pattern, log everything
|
|
return true
|
|
}
|
|
|
|
// Also try matching with ** for prefix patterns like /api/*
|
|
if !matched && strings.HasSuffix(p.filterPath, "/*") {
|
|
prefix := strings.TrimSuffix(p.filterPath, "/*")
|
|
matched = strings.HasPrefix(path, prefix)
|
|
}
|
|
|
|
return matched
|
|
}
|
|
|
|
// logError logs an error entry
|
|
func (p *Proxy) logError(req *http.Request, err error) {
|
|
entry := Entry{
|
|
RequestID: fmt.Sprintf("%d", atomic.AddUint64(&p.requestCount, 1)),
|
|
Direction: "error",
|
|
Method: req.Method,
|
|
Path: req.URL.Path,
|
|
Error: err.Error(),
|
|
}
|
|
p.logger.Log(entry)
|
|
}
|
|
|
|
// flattenHeaders collapses an http.Header into a plain map with one string per
// header name. Multiple values for the same name are joined with ", ". The
// result is always a non-nil map, even for a nil or empty header.
func flattenHeaders(h http.Header) map[string]string {
	flat := make(map[string]string, len(h))
	for name := range h {
		flat[name] = strings.Join(h[name], ", ")
	}
	return flat
}
|
|
|
|
// GetTargetPort returns the target port for the k8s tunnel
|
|
func (p *Proxy) GetTargetPort() int {
|
|
return p.targetPort
|
|
}
|
|
|
|
// GetLogger returns the HTTP logger for subscribing to log entries
|
|
func (p *Proxy) GetLogger() *Logger {
|
|
return p.logger
|
|
}
|