graphql-monitoring-proxy/main.go
lukaszraczylo e3e9f7d181 improvements mid apr 2025 (#27)
* General improvements and bug fixes.

* Improve tests coverage.

* fixup! Improve tests coverage.

* Update README.md with latest changes.

* Fix the uint32

* Resolve issue with race condition for logging.

* fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* Fix the test of the rate limiter

* Add default ratelimit.json file

* Update dependencies.

* Significant refactor.

* fixup! Significant refactor.

* Enhance admin dashboard with real-time WebSocket streaming and charts

Dashboard improvements:
- Added Chart.js for time-series visualization
- Created real-time graphs for RPS and cache hit rate
- Added new statistics displays:
  * System uptime with human-readable format
  * Total requests with success/failure breakdown
  * Current and average RPS
  * Success rate with progress bars
  * Cache hit rate and memory usage with visual indicators
  * Detailed cache statistics
- Implemented WebSocket streaming endpoint (/admin/ws/stats)
- Real-time updates every 2 seconds via WebSocket
- Automatic fallback to polling if WebSocket unavailable
- Connection status indicator
- Progress bars for success rate, cache hit rate, and memory usage

Backend enhancements:
- New WebSocket handler for streaming all statistics
- gatherAllStats() method to collect comprehensive metrics
- Streams data every 2 seconds to connected clients
- Automatic reconnection handling
- Maintains up to 60 data points per chart

The dashboard now provides comprehensive real-time monitoring with:
- Live metrics streaming
- Historical trend visualization
- Responsive design with visual indicators
- Graceful degradation to polling mode
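
The "up to 60 data points per chart" limit can be sketched as a bounded series. The dashboard itself is JavaScript with Chart.js; the Go snippet below only illustrates the idea, and `appendPoint` and `maxPoints` are hypothetical names, not code from this repository.

```go
package main

import "fmt"

// maxPoints mirrors the "up to 60 data points per chart" limit from the
// commit message. The name itself is an assumption.
const maxPoints = 60

// appendPoint adds v to the series and drops the oldest entries once the
// series grows beyond maxPoints.
func appendPoint(series []float64, v float64) []float64 {
	series = append(series, v)
	if len(series) > maxPoints {
		series = series[len(series)-maxPoints:]
	}
	return series
}

func main() {
	var rps []float64
	for i := 0; i < 100; i++ {
		rps = appendPoint(rps, float64(i))
	}
	// Only the newest 60 samples (40 through 99) remain.
	fmt.Println(len(rps), rps[0], rps[len(rps)-1]) // 60 40 99
}
```

At the 2-second update interval described above, 60 points would cover roughly two minutes of history.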

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix WebSocket message handling for real-time stats streaming

Issues fixed:
- Removed blocking default case that prevented ticker from firing
- Separated read and write operations into proper goroutines
- Added proper ping/pong handlers with read deadlines
- Implemented done channel for clean disconnection signaling
- Send initial stats immediately on connection

The WebSocket now properly:
- Streams stats every 2 seconds via ticker
- Handles client disconnections gracefully
- Maintains connection with ping/pong
- Detects connection drops via read goroutine
- Non-blocking message handling

* Add Redis-based cluster mode for distributed metrics aggregation

When using Redis for caching, proxies now automatically form a cluster
and aggregate metrics across all instances for unified monitoring.

Features:
- Metrics Aggregator: Publishes instance metrics to Redis every 5s
- Cluster Mode API: /admin/api/cluster/stats and /admin/api/cluster/instances
- Dashboard Cluster View: Toggle between single instance and cluster view
- Auto-discovery: Detects cluster mode automatically via Redis
- Instance Management: Each instance gets unique ID (hostname + UUID)
- Graceful Cleanup: Removes metrics from Redis on shutdown
- TTL-based expiration: Stale instances auto-expire after 30s

Cluster metrics include:
- Aggregated requests (total, succeeded, failed, success rate)
- Combined RPS across all instances
- Total cache hits/misses with cluster-wide hit rate
- Per-instance health status and uptime
- Active connections and WebSocket stats
- Request coalescing backend savings

Dashboard improvements:
- Cluster Status section showing total/healthy instances
- Instance Details section with per-node metrics
- Cluster View toggle in header
- Automatic detection of cluster availability
- Individual instance cards with health indicators
- "Current" badge for the instance you're viewing

Architecture:
- Uses Redis SET to track active instances
- Each instance publishes to redis key: graphql-proxy:metrics:instances:{id}
- 30s TTL ensures stale instances are removed
- Aggregator started automatically when Redis cache enabled
- Registered with shutdown manager for graceful cleanup

Environment: Automatically enabled when ENABLE_REDIS_CACHE=true

* Fix cluster instance details using correct JSON field names

JavaScript was using Go struct field names (PascalCase) instead of
JSON field names (snake_case), causing all instance metrics to show
as 0 or undefined.

Fixed references:
- instance.InstanceID → instance.instance_id
- instance.Hostname → instance.hostname
- instance.UptimeSeconds → instance.uptime_seconds
- instance.Stats → instance.stats
- instance.Health → instance.health

Also added fallback to check both instance.cache_summary and
stats.cache_summary for better compatibility.
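
To see why the dashboard must use the snake_case names, here is a small sketch of how Go struct tags decide the keys that reach the browser. The JSON names match the list above; the struct itself and its field types are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// instanceMetrics is a hypothetical struct. Only the JSON key names are taken
// from the commit message; the Go field types are assumed.
type instanceMetrics struct {
	InstanceID    string                 `json:"instance_id"`
	Hostname      string                 `json:"hostname"`
	UptimeSeconds int64                  `json:"uptime_seconds"`
	Stats         map[string]interface{} `json:"stats"`
	Health        string                 `json:"health"`
}

// encodeInstance marshals the struct; the output uses the tag names, never
// the Go field names.
func encodeInstance(m instanceMetrics) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encodeInstance(instanceMetrics{
		InstanceID:    "host-a-1234",
		Hostname:      "host-a",
		UptimeSeconds: 42,
		Stats:         map[string]interface{}{},
		Health:        "healthy",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

A client reading `instance.InstanceID` from this payload gets `undefined`, which is consistent with the zeros and undefined values described above.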

* Enhance Cluster View toggle visibility and styling

Improvements:
- Replaced basic checkbox with custom toggle switch
- Added prominent card-style container with backdrop blur
- Positioned toggle in header next to dashboard title
- Toggle switch with smooth animation (slides left/right)
- Green color when enabled (#10b981)
- Hover effects with slight lift
- Better typography with font weights and spacing
- Info text positioned below toggle label
- Disabled state with reduced opacity
- Responsive layout with flexbox

The toggle is now much more visible and professional-looking,
making it clear when cluster mode is available and active.

* Add comprehensive debugging for cluster mode metrics issues

Backend improvements:
- Fixed metrics structure: ensure Stats always has correct inner structure
- Added defensive nil checks for instance.Stats in aggregation
- Added debug logging in publishMetrics to verify data being sent
- Added warning logging in aggregateStats when data is missing
- Log actual keys present in Stats when 'requests' is missing
- Initialize empty maps instead of leaving fields nil

Frontend improvements:
- Added console.log statements to trace cluster data flow
- Log cluster data structure on receive
- Log stats keys and structure
- Log instances array and count
- Warn when expected data is missing
- Added fallback values (|| 0) for display fields

This will help diagnose why cluster view shows zeros by logging:
1. What data is being published to Redis
2. What data is being retrieved from Redis
3. What structure the data has at each step
4. What keys are present vs expected

Check browser console and server logs to see where the data flow breaks.

* Add cluster debug endpoint for troubleshooting

New endpoint: GET /admin/api/cluster/debug

Returns comprehensive debug information:
- Whether metrics aggregator is initialized
- Redis cache enabled status
- Current instance ID
- Cluster mode status
- Total/healthy instance counts
- Sample instance structure with keys
- Sample requests data structure
- Error messages if any

This helps diagnose cluster mode issues by showing:
1. If Redis is actually enabled
2. If aggregator is initialized
3. What data structure is being stored
4. What keys are present in Stats
5. Sample of actual data being aggregated

Visit http://localhost:8181/admin/api/cluster/debug to see
what's happening under the hood.

* Fix cluster mode initialization and improve Redis error visibility

Critical fixes:
1. Move metrics aggregator initialization BEFORE cache initialization
   - Runs independently even if CacheEnable=false
   - Only requires CacheRedisEnable=true
   - This was causing aggregator to not initialize when cache was disabled

2. Promote Redis errors from Warning to Error level
   - Changed "Failed to publish" from Warning to Error with CRITICAL prefix
   - Added detailed error context (instance_id, keys, error message)
   - Added success logging with ✓ confirmation
   - Log command count and data size on success

3. Enhanced startup logging
   - Log "Initializing metrics aggregator" with Redis URL/DB
   - Log "✓ Successfully initialized" with instance ID
   - Log "FAILED to initialize" as ERROR (was Warning)

Why this matters:
- If Redis cache is disabled but Redis is available, cluster mode should still work
- Previous code only initialized aggregator inside cache initialization block
- Redis publish errors were being silently logged as warnings
- No visibility into whether metrics were actually being stored

After this fix:
- Cluster mode works even with ENABLE_GLOBAL_CACHE=false + ENABLE_REDIS_CACHE=true
- Redis errors are immediately visible in logs
- Clear success/failure indicators
- Better troubleshooting information

* Add force-publish endpoint for cluster mode debugging

New endpoint: POST /admin/api/cluster/force-publish

Forces an immediate metrics publish to Redis and reports results.
This helps diagnose why instances aren't appearing:

Response includes:
- success: true/false
- publish_done: confirmation publish was attempted
- instances_found: count after publish
- error: if retrieval failed
- check_logs: reminder to look for log messages

Use this to test:
curl -X POST http://localhost:8181/admin/api/cluster/force-publish

Then check server logs for:
✓ Successfully published metrics to Redis
OR
CRITICAL: Failed to publish metrics to Redis

This bypasses the 5-second timer and publishes immediately,
making it easier to test without waiting.

* Fix cluster metrics aggregation and dashboard display issues

- Fix metric names: use correct Prometheus metric names (requests_succesful, requests_failed, requests_skipped)
- Add automatic stale instance cleanup (>1 minute inactive)
- Implement 10-second moving average smoothing for RPS, success rate, and cache hit rate
- Add trend indicators (↑ ↗ → ↘ ↓) to show metric direction
- Add compact number formatting (1.2M, 3.4K) with full-value tooltips
- Add retry budget aggregation (allowed/denied retries, denial rate)
- Add circuit breaker aggregation (state counts, per-instance breakdown)
- Add coalescing stats aggregation (backend savings percentage)
- Fix memory display to show "N/A" for Redis cache (memory tracking not available)
- Fix JavaScript error: change hitRate to smoothedHitRate in chart update call
- Change Redis operations to use context.Background() instead of parent context
- Fix staticcheck warning: omit nil check for map len()

This resolves cluster view showing zeros and prevents metrics from disappearing.
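
The smoothing and trend indicators listed above can be sketched as follows. The dashboard implements them in JavaScript; this Go version is only an illustration. The window of 5 samples assumes one sample every 2 seconds (10 seconds in total), and the 2% and 10% trend thresholds are made-up values, not taken from the source.

```go
package main

import "fmt"

// movingAverage keeps the last size samples and reports their mean.
type movingAverage struct {
	size    int
	samples []float64
}

func newMovingAverage(size int) *movingAverage {
	if size < 1 {
		size = 1
	}
	return &movingAverage{size: size}
}

// add records a sample and returns the mean of the samples in the window.
func (m *movingAverage) add(v float64) float64 {
	m.samples = append(m.samples, v)
	if len(m.samples) > m.size {
		m.samples = m.samples[1:]
	}
	sum := 0.0
	for _, s := range m.samples {
		sum += s
	}
	return sum / float64(len(m.samples))
}

// trend maps the relative change between two smoothed values to an arrow.
// The thresholds are hypothetical.
func trend(prev, curr float64) string {
	const strong, weak = 0.10, 0.02
	if prev == 0 {
		if curr > 0 {
			return "↑"
		}
		return "→"
	}
	change := (curr - prev) / prev
	switch {
	case change > strong:
		return "↑"
	case change > weak:
		return "↗"
	case change < -strong:
		return "↓"
	case change < -weak:
		return "↘"
	default:
		return "→"
	}
}

func main() {
	ma := newMovingAverage(5) // 5 samples at 2s each is a 10-second window
	prev := 0.0
	for _, rps := range []float64{10, 20, 30, 40, 50, 60} {
		curr := ma.add(rps)
		fmt.Printf("raw=%.0f smoothed=%.1f %s\n", rps, curr, trend(prev, curr))
		prev = curr
	}
}
```

Smoothing before computing the trend keeps a single noisy sample from flipping the arrow on every update.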

* Fix cluster instance count not updating in upper right corner

The cluster-info element (showing instance count next to "Cluster View" toggle)
was only updated during initial page load. Now it updates in real-time whenever
cluster stats are received via WebSocket.

* Clean up verbose debug logging

Removed debug console.log statements from dashboard JavaScript:
- WebSocket connection/disconnection logs
- Cluster mode availability checks
- Cluster stats update debug logs

Removed verbose Info logs from Go code that ran frequently:
- "Publishing metrics to Redis" (every 5s)
- "Metrics gathered successfully" (every 5s)
- "Successfully published metrics to Redis" (every 5s)
- "Aggregating stats from instances" (frequently)
- "Successfully aggregated cluster metrics" (frequently)
- "Aggregation complete" (frequently)

Kept important logs:
- Error and warning logs
- Initialization and shutdown logs
- Conditional logs (stale instance cleanup, failures)

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-01 00:25:32 +01:00

826 lines
28 KiB
Go

package main

import (
	"context"
	"flag"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/gofiber/fiber/v2/middleware/proxy"
	"github.com/gookit/goutil/envutil"
	graphql "github.com/lukaszraczylo/go-simple-graphql"
	libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
	libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
	libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
	libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
	libpack_tracing "github.com/lukaszraczylo/graphql-monitoring-proxy/tracing"
)

var (
	cfg             *config
	cfgMutex        sync.RWMutex
	once            sync.Once
	tracer          *libpack_tracing.TracingSetup
	shutdownManager *ShutdownManager
)

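// getDetailsFromEnv retrieves the value from the environment or returns the default.
// It first checks for a prefixed environment variable (GMP_KEY), then falls back to the unprefixed version.
// Supported types are string, int, bool, and float64; any other type returns defaultValue unchanged.
func getDetailsFromEnv[T any](key string, defaultValue T) T {
	prefixedKey := "GMP_" + key
	switch v := any(defaultValue).(type) {
	case string:
		if val, ok := os.LookupEnv(prefixedKey); ok {
			return any(val).(T)
		}
		return any(envutil.Getenv(key, v)).(T)
	case int:
		if val, ok := os.LookupEnv(prefixedKey); ok {
			if intVal, err := strconv.Atoi(val); err == nil {
				return any(intVal).(T)
			}
		}
		return any(envutil.GetInt(key, v)).(T)
	case bool:
		if val, ok := os.LookupEnv(prefixedKey); ok {
			boolVal := strings.ToLower(val) == "true" || val == "1"
			return any(boolVal).(T)
		}
		return any(envutil.GetBool(key, v)).(T)
	case float64:
		// Float defaults (e.g. CIRCUIT_FAILURE_RATIO) would otherwise fall through
		// to the default branch, so the environment would never be consulted.
		for _, k := range []string{prefixedKey, key} {
			if val, ok := os.LookupEnv(k); ok {
				if floatVal, err := strconv.ParseFloat(val, 64); err == nil {
					return any(floatVal).(T)
				}
			}
		}
		return defaultValue
	default:
		return defaultValue
	}
}
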
// validateJWTClaimPath validates JWT claim paths to prevent injection attacks.
func validateJWTClaimPath(path string) error {
	if path == "" {
		return nil // Empty path is valid (feature disabled)
	}
	// Prevent path traversal attempts
	if strings.Contains(path, "..") {
		return fmt.Errorf("invalid JWT claim path (contains '..'): %s", path)
	}
	// Prevent absolute paths
	if strings.HasPrefix(path, "/") {
		return fmt.Errorf("invalid JWT claim path (absolute path not allowed): %s", path)
	}
	// Limit depth to prevent DoS from deeply nested claims
	parts := strings.Split(path, ".")
	if len(parts) > 10 {
		return fmt.Errorf("invalid JWT claim path (too deep, max 10 levels): %s", path)
	}
	// Validate each part contains only allowed characters
	for _, part := range parts {
		if part == "" {
			return fmt.Errorf("invalid JWT claim path (empty part): %s", path)
		}
		// Allow alphanumeric, underscore, and hyphen
		for _, ch := range part {
			if !((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') ||
				(ch >= '0' && ch <= '9') || ch == '_' || ch == '-') {
				return fmt.Errorf("invalid JWT claim path (invalid character '%c'): %s", ch, path)
			}
		}
	}
	return nil
}

// parseConfig loads and parses the configuration.
func parseConfig() {
libpack_config.PKG_NAME = "graphql_proxy"
c := config{}
// Server configurations
c.Server.PortGraphQL = getDetailsFromEnv("PORT_GRAPHQL", 8080)
c.Server.PortMonitoring = getDetailsFromEnv("MONITORING_PORT", 9393)
c.Server.HostGraphQL = getDetailsFromEnv("HOST_GRAPHQL", "http://localhost/")
c.Server.HostGraphQLReadOnly = getDetailsFromEnv("HOST_GRAPHQL_READONLY", "")
// Client configurations
c.Client.JWTUserClaimPath = getDetailsFromEnv("JWT_USER_CLAIM_PATH", "")
c.Client.JWTRoleClaimPath = getDetailsFromEnv("JWT_ROLE_CLAIM_PATH", "")
// Validate JWT claim paths for security
if err := validateJWTClaimPath(c.Client.JWTUserClaimPath); err != nil {
fmt.Fprintf(os.Stderr, "❌ CRITICAL ERROR: Invalid JWT_USER_CLAIM_PATH: %v\n", err)
os.Exit(1)
}
if err := validateJWTClaimPath(c.Client.JWTRoleClaimPath); err != nil {
fmt.Fprintf(os.Stderr, "❌ CRITICAL ERROR: Invalid JWT_ROLE_CLAIM_PATH: %v\n", err)
os.Exit(1)
}
c.Client.RoleFromHeader = getDetailsFromEnv("ROLE_FROM_HEADER", "")
c.Client.RoleRateLimit = getDetailsFromEnv("ROLE_RATE_LIMIT", false)
// In-memory cache
c.Cache.CacheEnable = getDetailsFromEnv("ENABLE_GLOBAL_CACHE", false)
c.Cache.CacheTTL = getDetailsFromEnv("CACHE_TTL", 60)
c.Cache.CacheMaxMemorySize = getDetailsFromEnv("CACHE_MAX_MEMORY_SIZE", 100) // Default 100MB
c.Cache.CacheMaxEntries = getDetailsFromEnv("CACHE_MAX_ENTRIES", 10000) // Default 10000 entries
// GraphQL query parsing cache - auto-calculate based on CPU cores if not set
c.Cache.GraphQLQueryCacheSize = getDetailsFromEnv("GRAPHQL_QUERY_CACHE_SIZE", runtime.GOMAXPROCS(0)*250)
// Redis cache
c.Cache.CacheRedisEnable = getDetailsFromEnv("ENABLE_REDIS_CACHE", false)
c.Cache.CacheRedisURL = getDetailsFromEnv("CACHE_REDIS_URL", "localhost:6379")
c.Cache.CacheRedisPassword = getDetailsFromEnv("CACHE_REDIS_PASSWORD", "")
c.Cache.CacheRedisDB = getDetailsFromEnv("CACHE_REDIS_DB", 0)
// Security configurations
c.Security.BlockIntrospection = getDetailsFromEnv("BLOCK_SCHEMA_INTROSPECTION", false)
c.Security.IntrospectionAllowed = func() []string {
urls := getDetailsFromEnv("ALLOWED_INTROSPECTION", "")
if urls == "" {
return nil
}
return strings.Split(urls, ",")
}()
c.LogLevel = strings.ToUpper(getDetailsFromEnv("LOG_LEVEL", "info"))
// Logger setup
c.Logger = libpack_logging.New().SetMinLogLevel(libpack_logging.GetLogLevel(c.LogLevel)).
SetFieldName("timestamp", "ts").SetFieldName("message", "msg").SetShowCaller(false)
// Health check
c.Server.HealthcheckGraphQL = getDetailsFromEnv("HEALTHCHECK_GRAPHQL_URL", "")
c.Client.GQLClient = graphql.NewConnection()
c.Client.GQLClient.SetEndpoint(c.Server.HealthcheckGraphQL)
// Server modes
c.Server.AccessLog = getDetailsFromEnv("ENABLE_ACCESS_LOG", false)
c.Server.ReadOnlyMode = getDetailsFromEnv("READ_ONLY_MODE", false)
c.Server.AllowURLs = func() []string {
urls := getDetailsFromEnv("ALLOWED_URLS", "")
if urls == "" {
return nil
}
return strings.Split(urls, ",")
}()
// Client timeout and connection configurations with bounds checking
clientTimeout := getDetailsFromEnv("PROXIED_CLIENT_TIMEOUT", 120)
if clientTimeout < 1 || clientTimeout > 3600 { // 1 second to 1 hour max
c.Logger.Warning(&libpack_logging.LogMessage{
Message: "Invalid client timeout, using default",
Pairs: map[string]interface{}{"requested": clientTimeout, "default": 120},
})
clientTimeout = 120
}
c.Client.ClientTimeout = clientTimeout
// Configure HTTP connection pool and timeouts with sensible defaults
// MaxConnsPerHost limits parallel connections to prevent overwhelming backends
maxConns := getDetailsFromEnv("MAX_CONNS_PER_HOST", 1024)
if maxConns < 1 || maxConns > 10000 { // Reasonable bounds
c.Logger.Warning(&libpack_logging.LogMessage{
Message: "Invalid max connections per host, using default",
Pairs: map[string]interface{}{"requested": maxConns, "default": 1024},
})
maxConns = 1024
}
c.Client.MaxConnsPerHost = maxConns
// Configure distinct timeout values for more granular control with bounds checking
readTimeout := getDetailsFromEnv("CLIENT_READ_TIMEOUT", c.Client.ClientTimeout)
if readTimeout < 1 || readTimeout > 3600 {
readTimeout = c.Client.ClientTimeout
}
c.Client.ReadTimeout = readTimeout
writeTimeout := getDetailsFromEnv("CLIENT_WRITE_TIMEOUT", c.Client.ClientTimeout)
if writeTimeout < 1 || writeTimeout > 3600 {
writeTimeout = c.Client.ClientTimeout
}
c.Client.WriteTimeout = writeTimeout
// MaxIdleConnDuration controls how long connections stay in the pool
idleDuration := getDetailsFromEnv("CLIENT_MAX_IDLE_CONN_DURATION", 300)
if idleDuration < 1 || idleDuration > 7200 { // 1 second to 2 hours max
idleDuration = 300
}
c.Client.MaxIdleConnDuration = idleDuration
// Secure by default: TLS verification is enabled unless explicitly disabled
c.Client.DisableTLSVerify = getDetailsFromEnv("CLIENT_DISABLE_TLS_VERIFY", false)
// Warn if TLS verification is disabled (security risk)
if c.Client.DisableTLSVerify {
// The logger is already configured above; the warning is deferred so it is emitted when parseConfig returns
defer func() {
if c.Logger != nil {
c.Logger.Warning(&libpack_logging.LogMessage{
Message: "⚠️ TLS certificate verification is DISABLED - This is a security risk in production!",
Pairs: map[string]interface{}{
"recommendation": "Enable TLS verification by removing CLIENT_DISABLE_TLS_VERIFY or setting it to false",
},
})
}
}()
}
// Create HTTP client with the optimized parameters
c.Client.FastProxyClient = createFasthttpClient(&c)
proxy.WithClient(c.Client.FastProxyClient) // Setting the global proxy client
// API configurations
c.Server.EnableApi = getDetailsFromEnv("ENABLE_API", false)
c.Server.ApiPort = getDetailsFromEnv("API_PORT", 9090)
// Validate and sanitize banned users file path to prevent path traversal
bannedUsersFile := getDetailsFromEnv("BANNED_USERS_FILE", "/go/src/app/banned_users.json")
if validatedPath, err := validateFilePath(bannedUsersFile); err != nil {
c.Logger.Error(&libpack_logging.LogMessage{
Message: "Invalid banned users file path, using default",
Pairs: map[string]interface{}{"requested": bannedUsersFile, "error": err.Error()},
})
c.Api.BannedUsersFile = "/go/src/app/banned_users.json"
} else {
c.Api.BannedUsersFile = validatedPath
}
c.Server.PurgeOnCrawl = getDetailsFromEnv("PURGE_METRICS_ON_CRAWL", false)
c.Server.PurgeEvery = getDetailsFromEnv("PURGE_METRICS_ON_TIMER", 0)
// Hasura event cleaner
c.HasuraEventCleaner.Enable = getDetailsFromEnv("HASURA_EVENT_CLEANER", false)
c.HasuraEventCleaner.ClearOlderThan = getDetailsFromEnv("HASURA_EVENT_CLEANER_OLDER_THAN", 1)
c.HasuraEventCleaner.EventMetadataDb = getDetailsFromEnv("HASURA_EVENT_METADATA_DB", "")
// Tracing configuration
c.Tracing.Enable = getDetailsFromEnv("ENABLE_TRACE", false)
c.Tracing.Endpoint = getDetailsFromEnv("TRACE_ENDPOINT", "localhost:4317")
// Circuit Breaker configuration - optimized for high-traffic production environments
c.CircuitBreaker.Enable = getDetailsFromEnv("ENABLE_CIRCUIT_BREAKER", false)
c.CircuitBreaker.MaxFailures = getDetailsFromEnv("CIRCUIT_MAX_FAILURES", 10) // Higher tolerance for transient failures
c.CircuitBreaker.FailureRatio = getDetailsFromEnv("CIRCUIT_FAILURE_RATIO", 0.5) // Trip at 50% failure rate
c.CircuitBreaker.SampleSize = getDetailsFromEnv("CIRCUIT_SAMPLE_SIZE", 100) // Statistically significant sample
c.CircuitBreaker.Timeout = getDetailsFromEnv("CIRCUIT_TIMEOUT_SECONDS", 60) // Longer recovery time for stability
c.CircuitBreaker.MaxRequestsInHalfOpen = getDetailsFromEnv("CIRCUIT_MAX_HALF_OPEN_REQUESTS", 5) // More probe requests
c.CircuitBreaker.ReturnCachedOnOpen = getDetailsFromEnv("CIRCUIT_RETURN_CACHED_ON_OPEN", true)
c.CircuitBreaker.TripOnTimeouts = getDetailsFromEnv("CIRCUIT_TRIP_ON_TIMEOUTS", true)
c.CircuitBreaker.TripOn5xx = getDetailsFromEnv("CIRCUIT_TRIP_ON_5XX", true)
c.CircuitBreaker.TripOn4xx = getDetailsFromEnv("CIRCUIT_TRIP_ON_4XX", false) // 4xx are usually client errors
c.CircuitBreaker.BackoffMultiplier = getDetailsFromEnv("CIRCUIT_BACKOFF_MULTIPLIER", 1.0) // No backoff by default
c.CircuitBreaker.MaxBackoffTimeout = getDetailsFromEnv("CIRCUIT_MAX_BACKOFF_TIMEOUT", 300) // 5 minutes max
// Initialize endpoint configs map
c.CircuitBreaker.EndpointConfigs = make(map[string]*EndpointCBConfig)
// Retry budget configuration
c.RetryBudget.Enable = getDetailsFromEnv("RETRY_BUDGET_ENABLE", true)
c.RetryBudget.TokensPerSecond = getDetailsFromEnv("RETRY_BUDGET_TOKENS_PER_SEC", 10.0)
c.RetryBudget.MaxTokens = getDetailsFromEnv("RETRY_BUDGET_MAX_TOKENS", 100)
// Request coalescing configuration
c.RequestCoalescing.Enable = getDetailsFromEnv("REQUEST_COALESCING_ENABLE", true)
// WebSocket configuration
c.WebSocket.Enable = getDetailsFromEnv("WEBSOCKET_ENABLE", false)
c.WebSocket.PingInterval = getDetailsFromEnv("WEBSOCKET_PING_INTERVAL", 30)
c.WebSocket.PongTimeout = getDetailsFromEnv("WEBSOCKET_PONG_TIMEOUT", 60)
c.WebSocket.MaxMessageSize = int64(getDetailsFromEnv("WEBSOCKET_MAX_MESSAGE_SIZE", 524288)) // 512KB
// Admin dashboard configuration
c.AdminDashboard.Enable = getDetailsFromEnv("ADMIN_DASHBOARD_ENABLE", true)
cfgMutex.Lock()
cfg = &c
cfgMutex.Unlock()
// Initialize tracing if enabled
if cfg.Tracing.Enable {
if cfg.Tracing.Endpoint == "" {
cfg.Logger.Warning(&libpack_logging.LogMessage{
Message: "Tracing endpoint not configured, using default localhost:4317",
})
cfg.Tracing.Endpoint = "localhost:4317"
}
var err error
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
tracer, err = libpack_tracing.NewTracing(ctx, cfg.Tracing.Endpoint)
if err != nil {
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "Failed to initialize tracing",
Pairs: map[string]interface{}{"error": err.Error()},
})
} else {
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Tracing initialized",
Pairs: map[string]interface{}{"endpoint": cfg.Tracing.Endpoint},
})
}
}
// Initialize metrics aggregator FIRST if Redis is enabled (even if cache is disabled)
// This allows cluster mode monitoring even when cache is off
if cfg.Cache.CacheRedisEnable {
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Initializing metrics aggregator for cluster mode",
Pairs: map[string]interface{}{
"redis_url": cfg.Cache.CacheRedisURL,
"redis_db": cfg.Cache.CacheRedisDB,
},
})
if err := InitializeMetricsAggregator(
cfg.Cache.CacheRedisURL,
cfg.Cache.CacheRedisPassword,
cfg.Cache.CacheRedisDB,
cfg.Logger,
); err != nil {
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "FAILED to initialize metrics aggregator - cluster mode will not work",
Pairs: map[string]interface{}{
"error": err.Error(),
},
})
} else {
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "✓ Metrics aggregator successfully initialized",
Pairs: map[string]interface{}{
"instance_id": GetMetricsAggregator().GetInstanceID(),
},
})
}
}
// Initialize cache if enabled
if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
cacheConfig := &libpack_cache.CacheConfig{
Logger: cfg.Logger,
TTL: cfg.Cache.CacheTTL,
}
// Redis cache configurations
if cfg.Cache.CacheRedisEnable {
cacheConfig.Redis.Enable = true
cacheConfig.Redis.URL = cfg.Cache.CacheRedisURL
cacheConfig.Redis.Password = cfg.Cache.CacheRedisPassword
cacheConfig.Redis.DB = cfg.Cache.CacheRedisDB
} else {
// Memory cache configurations
cacheConfig.Memory.MaxMemorySize = int64(cfg.Cache.CacheMaxMemorySize) * 1024 * 1024 // Convert MB to bytes
cacheConfig.Memory.MaxEntries = int64(cfg.Cache.CacheMaxEntries)
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Configuring memory cache with limits",
Pairs: map[string]interface{}{
"max_memory_mb": cfg.Cache.CacheMaxMemorySize,
"max_entries": cfg.Cache.CacheMaxEntries,
},
})
}
libpack_cache.EnableCache(cacheConfig)
// Start memory monitoring for in-memory cache if it's not Redis
// Will be started with context in main()
}
// Initialize circuit breaker if enabled
if cfg.CircuitBreaker.Enable {
initCircuitBreaker(cfg)
}
// Initialize retry budget
if cfg.RetryBudget.Enable {
retryBudgetConfig := RetryBudgetConfig{
TokensPerSecond: cfg.RetryBudget.TokensPerSecond,
MaxTokens: cfg.RetryBudget.MaxTokens,
Enabled: cfg.RetryBudget.Enable,
}
InitializeRetryBudget(retryBudgetConfig, cfg.Logger)
}
// Initialize request coalescer
if cfg.RequestCoalescing.Enable {
InitializeRequestCoalescer(true, cfg.Logger, cfg.Monitoring)
}
// Initialize WebSocket proxy
if cfg.WebSocket.Enable {
wsConfig := WebSocketConfig{
Enabled: cfg.WebSocket.Enable,
PingInterval: time.Duration(cfg.WebSocket.PingInterval) * time.Second,
PongTimeout: time.Duration(cfg.WebSocket.PongTimeout) * time.Second,
MaxMessageSize: cfg.WebSocket.MaxMessageSize,
}
InitializeWebSocketProxy(cfg.Server.HostGraphQL, wsConfig, cfg.Logger, cfg.Monitoring)
}
// Initialize backend health manager
if cfg.Server.HostGraphQL != "" {
healthMgr := InitializeBackendHealth(cfg.Client.FastProxyClient, cfg.Server.HostGraphQL, cfg.Logger)
// Start health checking in background
healthMgr.StartHealthChecking()
}
// Initialize RPS tracker for real-time requests per second monitoring
InitializeRPSTracker()
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "RPS tracker initialized",
})
// Load rate limit configuration with improved error handling
if err := loadRatelimitConfig(); err != nil {
// Log the error with clear guidance
detailedError := err.Error()
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "Failed to start service due to rate limit configuration error",
Pairs: map[string]interface{}{
"error": detailedError,
},
})
// If we're not in a test environment, print to stderr and exit if config error
if ifNotInTest() {
fmt.Fprintln(os.Stderr, "⚠️ CRITICAL ERROR: Rate limit configuration problem detected")
fmt.Fprintln(os.Stderr, detailedError)
os.Exit(1)
}
}
// API and event cleaner will be started with context in main()
prepareQueriesAndExemptions()
// Initialize GraphQL parsing optimizations
initGraphQLParsing()
}
func main() {
// Parse configuration
parseConfig()
// Setup graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize shutdown manager
shutdownManager = NewShutdownManager(ctx)
// Create a wait group to manage goroutines
var wg sync.WaitGroup
// Setup signal handling for graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Shutdown signal received, stopping services...",
})
cancel()
}()
// Start background services with context
once.Do(func() {
// Start API server
shutdownManager.RunGoroutine("api-server", func(ctx context.Context) {
if err := enableApi(ctx); err != nil {
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "API server error",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
})
// Start event cleaner
shutdownManager.RunGoroutine("event-cleaner", func(ctx context.Context) {
if err := enableHasuraEventCleaner(ctx); err != nil {
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "Event cleaner error",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
})
// Start cache memory monitoring if not using Redis
if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable {
shutdownManager.RunGoroutine("cache-memory-monitoring", startCacheMemoryMonitoring)
}
})
// Register connection pool for cleanup
shutdownManager.RegisterComponent("http-connection-pool", func(ctx context.Context) error {
if connectionPoolManager != nil {
return connectionPoolManager.Shutdown()
}
return nil
})
// Register backend health manager for cleanup
shutdownManager.RegisterComponent("backend-health-manager", func(ctx context.Context) error {
if healthMgr := GetBackendHealthManager(); healthMgr != nil {
healthMgr.Shutdown()
}
return nil
})
// Register metrics aggregator for cleanup
shutdownManager.RegisterComponent("metrics-aggregator", func(ctx context.Context) error {
if aggregator := GetMetricsAggregator(); aggregator != nil {
aggregator.Shutdown()
}
return nil
})
// Cache shutdown is handled internally by the cache implementation
// Start monitoring server
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Starting monitoring server...",
Pairs: map[string]interface{}{"port": cfg.Server.PortMonitoring},
})
// Start monitoring server in a goroutine
wg.Add(1)
monitoringErrCh := make(chan error, 1)
go func() {
defer wg.Done()
if err := StartMonitoringServer(); err != nil {
monitoringErrCh <- err
}
}()
// Wait up to 2 seconds for an immediate startup error from the monitoring server
select {
case err := <-monitoringErrCh:
cfg.Logger.Critical(&libpack_logging.LogMessage{
Message: "Failed to start monitoring server",
Pairs: map[string]interface{}{
"error": err.Error(),
"port": cfg.Server.PortMonitoring,
},
})
os.Exit(1)
case <-time.After(2 * time.Second):
// Continue if no error received within timeout
}
// Wait for GraphQL backend to be ready before starting proxy
if healthMgr := GetBackendHealthManager(); healthMgr != nil {
startupTimeout := time.Duration(getDetailsFromEnv("BACKEND_STARTUP_TIMEOUT", 300)) * time.Second
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Waiting for GraphQL backend to be ready",
Pairs: map[string]interface{}{
"timeout_seconds": int(startupTimeout.Seconds()),
},
})
if err := healthMgr.WaitForBackendReady(startupTimeout); err != nil {
cfg.Logger.Critical(&libpack_logging.LogMessage{
Message: "GraphQL backend did not become ready in time",
Pairs: map[string]interface{}{
"error": err.Error(),
"timeout": startupTimeout.String(),
},
})
// Do not exit: start the proxy anyway and warn that the backend is not ready
cfg.Logger.Warning(&libpack_logging.LogMessage{
Message: "Starting proxy anyway - requests will fail until backend becomes available",
})
}
}
// Start HTTP proxy
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Starting HTTP proxy server...",
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
})
// Start HTTP proxy in a goroutine
wg.Add(1)
proxyErrCh := make(chan error, 1)
go func() {
defer wg.Done()
if err := StartHTTPProxy(); err != nil {
proxyErrCh <- err
}
}()
// Wait up to 1 second for an immediate startup error from the HTTP proxy
select {
case err := <-proxyErrCh:
cfg.Logger.Critical(&libpack_logging.LogMessage{
Message: "Failed to start HTTP proxy server",
Pairs: map[string]interface{}{
"error": err.Error(),
"port": cfg.Server.PortGraphQL,
},
})
os.Exit(1)
case <-time.After(1 * time.Second):
// Continue if no error received within timeout
}
// Wait for context cancellation
<-ctx.Done()
// Perform cleanup
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Shutting down services...",
})
// Register tracer shutdown
if tracer != nil {
shutdownManager.RegisterComponent("tracer", func(ctx context.Context) error {
return tracer.Shutdown(ctx)
})
}
// Perform graceful shutdown of all components
if err := shutdownManager.Shutdown(30 * time.Second); err != nil {
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "Error during shutdown",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
// Wait for all goroutines to finish (with timeout)
waitCh := make(chan struct{})
go func() {
wg.Wait()
close(waitCh)
}()
select {
case <-waitCh:
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "All services shut down gracefully",
})
case <-time.After(10 * time.Second):
cfg.Logger.Warning(&libpack_logging.LogMessage{
Message: "Some services didn't shut down gracefully within timeout",
})
}
}
// startCacheMemoryMonitoring polls memory cache usage and updates metrics
func startCacheMemoryMonitoring(ctx context.Context) {
// Poll every 15 seconds (more frequent than the cleanup routine)
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Starting memory cache monitoring",
})
// Publish the configured memory limit up front so the gauge exists before the
// first tick. cfg.Monitoring may not be initialized yet, so guard against nil
// here as the loop below does. A function-local mutex would only ever be
// locked by this goroutine and protects nothing, so none is used.
if cfg.Monitoring != nil {
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryLimit, nil,
float64(libpack_cache.GetCacheMaxMemorySize()))
}
for {
select {
case <-ctx.Done():
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Stopping cache memory monitoring",
})
return
case <-ticker.C:
// Skip if monitoring not initialized or cache not initialized
if cfg.Monitoring == nil || !libpack_cache.IsCacheInitialized() {
continue
}
// Read current memory usage and the configured limit
memoryUsage := libpack_cache.GetCacheMemoryUsage()
memoryLimit := libpack_cache.GetCacheMaxMemorySize()
// Update usage and limit gauges
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryUsage, nil,
float64(memoryUsage))
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryLimit, nil,
float64(memoryLimit))
// Calculate percentage (protect against division by zero)
var percentUsed float64
if memoryLimit > 0 {
percentUsed = float64(memoryUsage) / float64(memoryLimit) * 100.0
}
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryPercent, nil,
percentUsed)
// Log if memory usage is high (over 80%)
if percentUsed > 80.0 {
cfg.Logger.Warning(&libpack_logging.LogMessage{
Message: "Memory cache usage is high",
Pairs: map[string]interface{}{
"memory_usage_bytes": memoryUsage,
"memory_limit_bytes": memoryLimit,
"percent_used": percentUsed,
},
})
}
}
}
}
// validateFilePath validates and sanitizes file paths to prevent path traversal attacks
func validateFilePath(path string) (string, error) {
if path == "" {
return "", fmt.Errorf("empty path not allowed")
}
// Reject bare current directory for security
if path == "." {
return "", fmt.Errorf("bare current directory not allowed")
}
// URL-decode the path to detect encoded traversal attempts
decodedPath := path
if strings.Contains(path, "%") {
// Decode up to three nested levels of URL encoding (e.g. %252e -> %2e -> .)
for i := 0; i < 3; i++ {
decoded, err := url.QueryUnescape(decodedPath)
if err != nil || decoded == decodedPath {
break // Invalid escape sequence, or nothing left to decode
}
decodedPath = decoded
}
}
// Check for path traversal patterns (in both original and decoded)
checkPaths := []string{path, decodedPath}
for _, checkPath := range checkPaths {
if strings.Contains(checkPath, "..") {
return "", fmt.Errorf("path traversal attempt detected")
}
}
// Check for dangerous characters
dangerousChars := []string{";", "|", "\n", "\r"}
for _, char := range dangerousChars {
if strings.Contains(path, char) {
return "", fmt.Errorf("dangerous character detected in path")
}
}
// Clean and normalize the path
cleaned := filepath.Clean(path)
// Get absolute path
absPath, err := filepath.Abs(cleaned)
if err != nil {
return "", fmt.Errorf("invalid file path: %w", err)
}
// Get working directory as base
workDir, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("cannot determine working directory: %w", err)
}
// Define allowed directories
allowedDirs := []string{
workDir, // Current working directory
"/tmp", // Temporary files
"/var/tmp", // System temporary files
"/go/src/app", // Docker container default
}
// Check if the path is within any allowed directory
isAllowed := false
for _, allowedDir := range allowedDirs {
// Match on a directory boundary so that e.g. /tmpfoo is not treated as inside /tmp
cleanedAllowed := filepath.Clean(allowedDir)
if strings.HasPrefix(absPath, cleanedAllowed+string(filepath.Separator)) || absPath == cleanedAllowed {
isAllowed = true
break
}
}
if !isAllowed {
return "", fmt.Errorf("path not in allowed directories")
}
// Additional security checks
if strings.Contains(absPath, "\x00") {
return "", fmt.Errorf("null byte in path")
}
// Return the original path if it's within the current working directory and is relative
if strings.HasPrefix(absPath, workDir) && !filepath.IsAbs(path) {
return path, nil
}
return absPath, nil
}
// ifNotInTest checks if the program is not running in a test environment.
func ifNotInTest() bool {
return flag.Lookup("test.v") == nil
}