mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-05 23:03:48 +00:00
3aa83d4480
* chore(security,refactor): extract sanitization and improve code quality
- [x] Extract sanitization functions to dedicated sanitization.go module
- [x] Add comprehensive golangci-lint v2 configuration with security rules
- [x] Replace interface{} with any type throughout codebase
- [x] Add admin API authentication security warning
- [x] Extract WebSocket and stats streaming constants
- [x] Add best-effort error handling comments for resource cleanup
- [x] Expand sensitive field patterns for improved PII redaction
- [x] Simplify safety checks and remove redundant nil validations
- [x] Improve test coverage for password field redaction patterns
* refactor: replace interface{} with any type alias
- [x] Replace all `map[string]interface{}` with `map[string]any`
- [x] Replace all `interface{}` with `any` in function signatures and type definitions
- [x] Update sync.Pool New function returns from `interface{}` to `any`
- [x] Add package documentation comments to 8 package files
- [x] Update type assertions and casts to work with `any` type
320 lines
8.1 KiB
Go
320 lines
8.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
"github.com/valyala/fasthttp"
|
|
)
|
|
|
|
// ConnectionPoolManager manages HTTP client connections
|
|
type ConnectionPoolManager struct {
|
|
lastRecoveryAttempt time.Time
|
|
ctx context.Context
|
|
client *fasthttp.Client
|
|
cancel context.CancelFunc
|
|
logger *libpack_logging.Logger
|
|
cleanupInterval time.Duration
|
|
keepAliveInterval time.Duration
|
|
recoveryCheckInterval time.Duration
|
|
activeConnections atomic.Int64
|
|
totalConnections atomic.Int64
|
|
connectionFailures atomic.Int64
|
|
mu sync.RWMutex
|
|
recoveryMutex sync.Mutex
|
|
}
|
|
|
|
// NewConnectionPoolManager creates a new connection pool manager
|
|
func NewConnectionPoolManager(client *fasthttp.Client) *ConnectionPoolManager {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cpm := &ConnectionPoolManager{
|
|
client: client,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
keepAliveInterval: 45 * time.Second, // Reduced frequency to lower backend load
|
|
cleanupInterval: 30 * time.Second,
|
|
recoveryCheckInterval: 60 * time.Second,
|
|
}
|
|
|
|
// Set logger if available
|
|
if cfg != nil && cfg.Logger != nil {
|
|
cpm.logger = cfg.Logger
|
|
}
|
|
|
|
// Start periodic maintenance tasks
|
|
cpm.startPeriodicMaintenance()
|
|
|
|
return cpm
|
|
}
|
|
|
|
// startPeriodicMaintenance starts background maintenance tasks
|
|
func (cpm *ConnectionPoolManager) startPeriodicMaintenance() {
|
|
// Start cleanup task
|
|
go cpm.runCleanupTask()
|
|
|
|
// Start keep-alive task
|
|
go cpm.runKeepAliveTask()
|
|
|
|
// Start recovery monitoring
|
|
go cpm.runRecoveryTask()
|
|
}
|
|
|
|
// runCleanupTask runs periodic connection cleanup
|
|
func (cpm *ConnectionPoolManager) runCleanupTask() {
|
|
ticker := time.NewTicker(cpm.cleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cpm.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
cpm.cleanIdleConnections()
|
|
}
|
|
}
|
|
}
|
|
|
|
// runKeepAliveTask sends periodic keep-alive requests to maintain connections
|
|
func (cpm *ConnectionPoolManager) runKeepAliveTask() {
|
|
ticker := time.NewTicker(cpm.keepAliveInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cpm.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
cpm.performKeepAlive()
|
|
}
|
|
}
|
|
}
|
|
|
|
// runRecoveryTask monitors connection health and triggers recovery when needed
|
|
func (cpm *ConnectionPoolManager) runRecoveryTask() {
|
|
ticker := time.NewTicker(cpm.recoveryCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cpm.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
cpm.checkAndRecover()
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanIdleConnections closes idle connections
|
|
func (cpm *ConnectionPoolManager) cleanIdleConnections() {
|
|
cpm.mu.Lock()
|
|
defer cpm.mu.Unlock()
|
|
|
|
if cpm.client != nil {
|
|
cpm.client.CloseIdleConnections()
|
|
if cpm.logger != nil {
|
|
cpm.logger.Debug(&libpack_logging.LogMessage{
|
|
Message: "Cleaned idle HTTP connections",
|
|
Pairs: map[string]any{
|
|
"active_connections": cpm.activeConnections.Load(),
|
|
"total_connections": cpm.totalConnections.Load(),
|
|
},
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// performKeepAlive sends a lightweight request to keep connections alive
|
|
func (cpm *ConnectionPoolManager) performKeepAlive() {
|
|
if cpm.client == nil {
|
|
return
|
|
}
|
|
|
|
// Only perform keep-alive if we have a backend URL configured
|
|
if cfg == nil || cfg.Server.HostGraphQL == "" {
|
|
return
|
|
}
|
|
|
|
// Skip keep-alive if we have recent successful connections
|
|
// This reduces unnecessary load when the system is actively processing requests
|
|
if cpm.connectionFailures.Load() == 0 && cpm.totalConnections.Load() > 0 {
|
|
// No recent failures and we have active connections, skip this keep-alive
|
|
return
|
|
}
|
|
|
|
// Use HEAD request for minimal overhead
|
|
req := fasthttp.AcquireRequest()
|
|
resp := fasthttp.AcquireResponse()
|
|
defer fasthttp.ReleaseRequest(req)
|
|
defer fasthttp.ReleaseResponse(resp)
|
|
|
|
// Try to use health check endpoint if available, otherwise use base URL
|
|
healthURL := cfg.Server.HealthcheckGraphQL
|
|
if healthURL == "" {
|
|
// Use base URL with proper path separator
|
|
baseURL := cfg.Server.HostGraphQL
|
|
if !strings.HasSuffix(baseURL, "/") {
|
|
baseURL += "/"
|
|
}
|
|
healthURL = baseURL + "healthz"
|
|
}
|
|
|
|
req.SetRequestURI(healthURL)
|
|
req.Header.SetMethod("HEAD") // HEAD is lighter than POST with body
|
|
|
|
// Short timeout for keep-alive
|
|
err := cpm.client.DoTimeout(req, resp, 3*time.Second)
|
|
if err != nil {
|
|
cpm.connectionFailures.Add(1)
|
|
if cpm.logger != nil {
|
|
cpm.logger.Debug(&libpack_logging.LogMessage{
|
|
Message: "Keep-alive request failed",
|
|
Pairs: map[string]any{
|
|
"error": err.Error(),
|
|
},
|
|
})
|
|
}
|
|
} else {
|
|
// Reset failure count on success
|
|
cpm.connectionFailures.Store(0)
|
|
}
|
|
}
|
|
|
|
// checkAndRecover monitors connection health and performs recovery if needed
|
|
func (cpm *ConnectionPoolManager) checkAndRecover() {
|
|
cpm.recoveryMutex.Lock()
|
|
defer cpm.recoveryMutex.Unlock()
|
|
|
|
failures := cpm.connectionFailures.Load()
|
|
|
|
// If we have too many failures, trigger recovery
|
|
if failures > 5 {
|
|
// Don't attempt recovery too frequently
|
|
if time.Since(cpm.lastRecoveryAttempt) < 30*time.Second {
|
|
return
|
|
}
|
|
|
|
cpm.lastRecoveryAttempt = time.Now()
|
|
|
|
if cpm.logger != nil {
|
|
cpm.logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Connection pool health degraded, attempting recovery",
|
|
Pairs: map[string]any{
|
|
"consecutive_failures": failures,
|
|
},
|
|
})
|
|
}
|
|
|
|
cpm.performRecovery()
|
|
}
|
|
}
|
|
|
|
// performRecovery attempts to recover the connection pool
|
|
func (cpm *ConnectionPoolManager) performRecovery() {
|
|
cpm.mu.Lock()
|
|
defer cpm.mu.Unlock()
|
|
|
|
if cpm.client != nil {
|
|
// Close all idle connections to force new ones
|
|
cpm.client.CloseIdleConnections()
|
|
|
|
// Reset failure counter
|
|
cpm.connectionFailures.Store(0)
|
|
|
|
if cpm.logger != nil {
|
|
cpm.logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Connection pool recovery completed",
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// RecordConnectionSuccess records a successful connection
|
|
func (cpm *ConnectionPoolManager) RecordConnectionSuccess() {
|
|
cpm.activeConnections.Add(1)
|
|
cpm.totalConnections.Add(1)
|
|
// Reset failures on success
|
|
cpm.connectionFailures.Store(0)
|
|
}
|
|
|
|
// RecordConnectionFailure records a failed connection
|
|
func (cpm *ConnectionPoolManager) RecordConnectionFailure() {
|
|
cpm.connectionFailures.Add(1)
|
|
}
|
|
|
|
// GetConnectionStats returns current connection statistics
|
|
func (cpm *ConnectionPoolManager) GetConnectionStats() map[string]any {
|
|
return map[string]any{
|
|
"active_connections": cpm.activeConnections.Load(),
|
|
"total_connections": cpm.totalConnections.Load(),
|
|
"connection_failures": cpm.connectionFailures.Load(),
|
|
"last_recovery_attempt": cpm.lastRecoveryAttempt,
|
|
}
|
|
}
|
|
|
|
// GetClient returns the HTTP client
|
|
func (cpm *ConnectionPoolManager) GetClient() *fasthttp.Client {
|
|
cpm.mu.RLock()
|
|
defer cpm.mu.RUnlock()
|
|
return cpm.client
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the connection pool
|
|
func (cpm *ConnectionPoolManager) Shutdown() error {
|
|
if cpm == nil {
|
|
return nil
|
|
}
|
|
|
|
cpm.cancel()
|
|
|
|
cpm.mu.Lock()
|
|
defer cpm.mu.Unlock()
|
|
|
|
if cpm.client != nil {
|
|
cpm.client.CloseIdleConnections()
|
|
if cfg != nil && cfg.Logger != nil {
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "HTTP connection pool shut down",
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Global connection pool manager
|
|
var (
|
|
connectionPoolManager *ConnectionPoolManager
|
|
connectionPoolMutex sync.RWMutex
|
|
)
|
|
|
|
// InitializeConnectionPool initializes the global connection pool
|
|
func InitializeConnectionPool(client *fasthttp.Client) {
|
|
connectionPoolMutex.Lock()
|
|
defer connectionPoolMutex.Unlock()
|
|
if connectionPoolManager != nil {
|
|
_ = connectionPoolManager.Shutdown() // Best-effort cleanup
|
|
}
|
|
connectionPoolManager = NewConnectionPoolManager(client)
|
|
}
|
|
|
|
// ShutdownConnectionPool safely shuts down the global connection pool
|
|
func ShutdownConnectionPool() {
|
|
connectionPoolMutex.Lock()
|
|
defer connectionPoolMutex.Unlock()
|
|
if connectionPoolManager != nil {
|
|
_ = connectionPoolManager.Shutdown() // Best-effort cleanup
|
|
connectionPoolManager = nil
|
|
}
|
|
}
|
|
|
|
// GetConnectionPoolManager returns the global connection pool manager
|
|
func GetConnectionPoolManager() *ConnectionPoolManager {
|
|
connectionPoolMutex.RLock()
|
|
defer connectionPoolMutex.RUnlock()
|
|
return connectionPoolManager
|
|
}
|