diff --git a/admin/dashboard.html b/admin/dashboard.html index ad7bf97..dfbbf78 100644 --- a/admin/dashboard.html +++ b/admin/dashboard.html @@ -4,6 +4,7 @@ GraphQL Proxy Admin Dashboard +
-

GraphQL Proxy Admin Dashboard

-
Real-time monitoring and management
+
+
+

GraphQL Proxy Admin Dashboard

+
+ Real-time monitoring and management + Connecting... +
+
+
+ + +
+
+ + + + +

+ System Overview +

+
+
+
Uptime
+
--
+
-- seconds
+
+ +
+
Total Requests
+
--
+
+ -- + -- +
+
+ +
+
Current RPS
+
--
+
Avg: -- req/s
+
+ +
+
Success Rate
+
--%
+
+
+
+
+
+ + +

Cache Performance

+
+
+
Cache Hit Rate
+
--%
+
+
+
+
+ +
+
Cache Hits / Misses
+
--
+
+ Hits: -- | + Misses: -- +
+
+ +
+
Cached Queries
+
--
+
Total entries
+
+ +
+
Memory Usage
+
-- MB
+
--%
+
+
+
+
+
+ + +

Real-time Metrics

+
+
+
Requests Per Second
+
+ +
+
+ +
+
Cache Hit Rate Over Time
+
+ +
+
+
+ +

Health Status

-
System Health
-
- - Loading... +
+ Backend Status + + + Loading... +
-

Key Metrics

+

Advanced Features

Request Coalescing
@@ -334,44 +625,558 @@
-
- Dashboard refreshes every 5 seconds + + + +
+ Connecting to real-time updates...
\ No newline at end of file diff --git a/admin_dashboard.go b/admin_dashboard.go index 33bf05b..c8e07a5 100644 --- a/admin_dashboard.go +++ b/admin_dashboard.go @@ -2,10 +2,12 @@ package main import ( "embed" + "encoding/json" "fmt" "time" "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache" libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging" ) @@ -41,6 +43,15 @@ func (ad *AdminDashboard) RegisterRoutes(app *fiber.App) { app.Get("/admin/api/coalescing", ad.getCoalescingStats) app.Get("/admin/api/websocket", ad.getWebSocketStats) + // WebSocket endpoint for streaming statistics + app.Get("/admin/ws/stats", websocket.New(ad.handleStatsWebSocket)) + + // Cluster mode endpoints (when using Redis) + app.Get("/admin/api/cluster/stats", ad.getClusterStats) + app.Get("/admin/api/cluster/instances", ad.getClusterInstances) + app.Get("/admin/api/cluster/debug", ad.getClusterDebug) + app.Post("/admin/api/cluster/force-publish", ad.forcePublish) + // Control endpoints app.Post("/admin/api/cache/clear", ad.clearCache) app.Post("/admin/api/retry-budget/reset", ad.resetRetryBudget) @@ -78,9 +89,9 @@ func (ad *AdminDashboard) getStats(c *fiber.Ctx) error { } if cfg != nil && cfg.Monitoring != nil { - succeeded := getAdminMetricValue("graphql_proxy_succeeded_total") - failed := getAdminMetricValue("graphql_proxy_failed_total") - skipped := getAdminMetricValue("graphql_proxy_skipped_total") + succeeded := getAdminMetricValue("requests_succesful") + failed := getAdminMetricValue("requests_failed") + skipped := getAdminMetricValue("requests_skipped") total := succeeded + failed + skipped // Request statistics @@ -241,18 +252,24 @@ func (ad *AdminDashboard) getCacheStats(c *fiber.Ctx) error { } stats["hit_rate_pct"] = hitRate - // Get memory usage - memoryUsage := libpack_cache.GetCacheMemoryUsage() - maxMemory := libpack_cache.GetCacheMaxMemorySize() - stats["memory_usage_bytes"] = memoryUsage - stats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024) + // Get memory usage only for in-memory cache + if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable { + memoryUsage := libpack_cache.GetCacheMemoryUsage() + maxMemory := libpack_cache.GetCacheMaxMemorySize() + stats["memory_usage_bytes"] = memoryUsage + stats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024) - // Calculate memory usage percentage - memoryUsagePct := 0.0 - if maxMemory > 0 { - memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100 + // Calculate memory usage percentage + memoryUsagePct := 0.0 + if maxMemory > 0 { + memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100 + } + stats["memory_usage_pct"] = memoryUsagePct + } else { + // For Redis cache, memory tracking not available per instance + stats["memory_usage_mb"] = -1 // Sentinel value for "not applicable" + stats["memory_usage_pct"] = -1 } - stats["memory_usage_pct"] = memoryUsagePct } } @@ -349,6 +366,161 @@ func (ad *AdminDashboard) resetCoalescing(c *fiber.Ctx) error { }) } +// getClusterStats returns aggregated statistics from all proxy instances +func (ad *AdminDashboard) getClusterStats(c *fiber.Ctx) error { + aggregator := GetMetricsAggregator() + if aggregator == nil { + return c.Status(503).JSON(map[string]interface{}{ + "error": "Cluster mode not available", + "message": "Redis-based metrics aggregation is not enabled", + "cluster_mode": false, + }) + } + + metrics, err := aggregator.GetAggregatedMetrics() + if err != nil { + if ad.logger != nil { + ad.logger.Error(&libpack_logger.LogMessage{ + Message: "Failed to get aggregated metrics", + Pairs: map[string]interface{}{"error": err.Error()}, + }) + } + return c.Status(500).JSON(map[string]interface{}{ + "error": "Failed to retrieve cluster metrics", + "message": err.Error(), + }) + } + + // Format response similar to regular stats endpoint + response := map[string]interface{}{ + "cluster_mode": true, + "total_instances": metrics.TotalInstances, + "healthy_instances": metrics.HealthyInstances, + "last_update": metrics.LastUpdate.Format(time.RFC3339), + "stats": metrics.CombinedStats, + } + + return c.JSON(response) +} + +// getClusterInstances returns detailed metrics for each proxy instance +func (ad *AdminDashboard) getClusterInstances(c *fiber.Ctx) error { + aggregator := GetMetricsAggregator() + if aggregator == nil { + return c.Status(503).JSON(map[string]interface{}{ + "error": "Cluster mode not available", + "message": "Redis-based metrics aggregation is not enabled", + "cluster_mode": false, + }) + } + + metrics, err := aggregator.GetAggregatedMetrics() + if err != nil { + if ad.logger != nil { + ad.logger.Error(&libpack_logger.LogMessage{ + Message: "Failed to get instance metrics", + Pairs: map[string]interface{}{"error": err.Error()}, + }) + } + return c.Status(500).JSON(map[string]interface{}{ + "error": "Failed to retrieve instance metrics", + "message": err.Error(), + }) + } + + return c.JSON(map[string]interface{}{ + "cluster_mode": true, + "total_instances": metrics.TotalInstances, + "healthy_instances": metrics.HealthyInstances, + "current_instance": aggregator.GetInstanceID(), + "instances": metrics.Instances, + }) +} + +// getClusterDebug returns debug information about cluster mode +func (ad *AdminDashboard) getClusterDebug(c *fiber.Ctx) error { + aggregator := GetMetricsAggregator() + + debug := map[string]interface{}{ + "aggregator_initialized": aggregator != nil, + "redis_cache_enabled": false, + } + + if cfg != nil { + debug["redis_cache_enabled"] = cfg.Cache.CacheRedisEnable + debug["cache_enabled"] = cfg.Cache.CacheEnable + } + + if aggregator != nil { + debug["instance_id"] = aggregator.GetInstanceID() + debug["is_cluster_mode"] = aggregator.IsClusterMode() + + // Try to get metrics + metrics, err := aggregator.GetAggregatedMetrics() + if err != nil { + debug["error"] = err.Error() + } else { + debug["total_instances"] = metrics.TotalInstances + debug["healthy_instances"] = metrics.HealthyInstances + + // Show first instance structure as example + if len(metrics.Instances) > 0 { + first := metrics.Instances[0] + debug["sample_instance"] = map[string]interface{}{ + "instance_id": first.InstanceID, + "hostname": first.Hostname, + "uptime_seconds": first.UptimeSeconds, + "stats_keys": getMapKeys(first.Stats), + "has_requests": first.Stats["requests"] != nil, + "has_cache": len(first.CacheSummary) > 0, + "health_status": first.Health["status"], + } + + // Show requests structure if it exists + if requests, ok := first.Stats["requests"].(map[string]interface{}); ok { + debug["sample_requests"] = requests + } + } + } + } + + return c.JSON(debug) +} + +// Helper to get map keys +func getMapKeys(m map[string]interface{}) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +// forcePublish forces an immediate metrics publish for testing +func (ad *AdminDashboard) forcePublish(c *fiber.Ctx) error { + aggregator := GetMetricsAggregator() + if aggregator == nil { + return c.Status(503).JSON(map[string]interface{}{ + "error": "Aggregator not initialized", + "success": false, + }) + } + + // Trigger publish in goroutine to avoid blocking + go aggregator.publishMetrics() + + return c.JSON(map[string]interface{}{ + "success": true, + "triggered": true, + "message": "Publish triggered in background", + "next_steps": []string{ + "Wait 2 seconds", + "Check GET /admin/api/cluster/debug", + "Check server logs for ✓ Successfully published or ❌ CRITICAL errors", + }, + }) +} + // Helper to get metric value for admin dashboard func getAdminMetricValue(name string) int64 { if cfg == nil || cfg.Monitoring == nil { @@ -361,4 +533,295 @@ func getAdminMetricValue(name string) int64 { return int64(counter.Get()) } +// handleStatsWebSocket handles WebSocket connections for streaming statistics +func (ad *AdminDashboard) handleStatsWebSocket(c *websocket.Conn) { + if ad.logger != nil { + ad.logger.Info(&libpack_logger.LogMessage{ + Message: "WebSocket client connected to stats stream", + Pairs: map[string]interface{}{ + "remote_addr": c.RemoteAddr().String(), + }, + }) + } + + // Cleanup on disconnect + defer func() { + if ad.logger != nil { + ad.logger.Info(&libpack_logger.LogMessage{ + Message: "WebSocket client disconnected from stats stream", + Pairs: map[string]interface{}{ + "remote_addr": c.RemoteAddr().String(), + }, + }) + } + c.Close() + }() + + // Set up ping/pong handlers + c.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.SetPongHandler(func(string) error { + c.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + // Channel to signal when to stop + done := make(chan struct{}) + + // Goroutine to handle incoming messages (for connection keep-alive) + go func() { + defer close(done) + for { + if _, _, err := c.ReadMessage(); err != nil { + // Connection closed or error + return + } + } + }() + + // Stream statistics every 2 seconds + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + // Send initial stats immediately + if stats := ad.gatherAllStats(); stats != nil { + if data, err := json.Marshal(stats); err == nil { + c.WriteMessage(websocket.TextMessage, data) + } + } + + // Stream loop + for { + select { + case <-ticker.C: + // Gather all stats + stats := ad.gatherAllStats() + + // Marshal to JSON + data, err := json.Marshal(stats) + if err != nil { + if ad.logger != nil { + ad.logger.Error(&libpack_logger.LogMessage{ + Message: "Failed to marshal stats for WebSocket", + Pairs: map[string]interface{}{"error": err.Error()}, + }) + } + return + } + + // Send to client + if err := c.WriteMessage(websocket.TextMessage, data); err != nil { + if ad.logger != nil { + ad.logger.Debug(&libpack_logger.LogMessage{ + Message: "Failed to write to WebSocket (client likely disconnected)", + Pairs: map[string]interface{}{"error": err.Error()}, + }) + } + return + } + + case <-done: + // Client disconnected + return + } + } +} + +// gatherAllStats collects all statistics into a single structure +func (ad *AdminDashboard) gatherAllStats() map[string]interface{} { + result := make(map[string]interface{}) + + // Main stats + uptimeSeconds := time.Since(startTime).Seconds() + stats := map[string]interface{}{ + "timestamp": time.Now().Format(time.RFC3339), + "uptime_seconds": uptimeSeconds, + "uptime_human": formatDuration(time.Since(startTime)), + "version": "0.27.0", + } + + if cfg != nil && cfg.Monitoring != nil { + succeeded := getAdminMetricValue("requests_succesful") + failed := getAdminMetricValue("requests_failed") + skipped := getAdminMetricValue("requests_skipped") + total := succeeded + failed + skipped + + requestStats := map[string]interface{}{ + "total": total, + "succeeded": succeeded, + "failed": failed, + "skipped": skipped, + } + + if total > 0 { + requestStats["success_rate_pct"] = float64(succeeded) / float64(total) * 100 + requestStats["failure_rate_pct"] = float64(failed) / float64(total) * 100 + requestStats["skip_rate_pct"] = float64(skipped) / float64(total) * 100 + } else { + requestStats["success_rate_pct"] = 0.0 + requestStats["failure_rate_pct"] = 0.0 + requestStats["skip_rate_pct"] = 0.0 + } + + if uptimeSeconds > 0 { + requestStats["avg_requests_per_second"] = float64(total) / uptimeSeconds + } else { + requestStats["avg_requests_per_second"] = 0.0 + } + + if rpsTracker := GetRPSTracker(); rpsTracker != nil { + requestStats["current_requests_per_second"] = rpsTracker.GetCurrentRPS() + } else { + requestStats["current_requests_per_second"] = 0.0 + } + + stats["requests"] = requestStats + + // Cache summary + cacheStats := libpack_cache.GetCacheStats() + if cacheStats != nil { + totalCacheRequests := cacheStats.CacheHits + cacheStats.CacheMisses + hitRate := 0.0 + if totalCacheRequests > 0 { + hitRate = float64(cacheStats.CacheHits) / float64(totalCacheRequests) * 100 + } + stats["cache_summary"] = map[string]interface{}{ + "hits": cacheStats.CacheHits, + "misses": cacheStats.CacheMisses, + "hit_rate_pct": hitRate, + "total_cached": cacheStats.CachedQueries, + } + } + } + + result["stats"] = stats + + // Health + healthMgr := GetBackendHealthManager() + health := map[string]interface{}{ + "status": "unknown", + "backend": map[string]interface{}{ + "healthy": false, + }, + } + + if healthMgr != nil { + isHealthy := healthMgr.IsHealthy() + health["backend"] = map[string]interface{}{ + "healthy": isHealthy, + "consecutive_failures": healthMgr.GetConsecutiveFailures(), + "last_check": healthMgr.GetLastHealthCheck().Format(time.RFC3339), + } + + if isHealthy { + health["status"] = "healthy" + } else { + health["status"] = "unhealthy" + } + } + result["health"] = health + + // Circuit breaker + cbStatus := map[string]interface{}{ + "enabled": false, + "state": "unknown", + } + + if cfg != nil { + cbStatus["enabled"] = cfg.CircuitBreaker.Enable + + if cb != nil { + cbMutex.RLock() + state := cb.State() + cbMutex.RUnlock() + + cbStatus["state"] = state.String() + cbStatus["config"] = map[string]interface{}{ + "max_failures": cfg.CircuitBreaker.MaxFailures, + "failure_ratio": cfg.CircuitBreaker.FailureRatio, + "timeout": cfg.CircuitBreaker.Timeout, + "max_requests_half_open": cfg.CircuitBreaker.MaxRequestsInHalfOpen, + "return_cached_on_open": cfg.CircuitBreaker.ReturnCachedOnOpen, + } + } + } + result["circuit_breaker"] = cbStatus + + // Cache stats + cacheStats := map[string]interface{}{ + "enabled": false, + } + + if cfg != nil { + cacheStats["enabled"] = cfg.Cache.CacheEnable + cacheStats["redis_enabled"] = cfg.Cache.CacheRedisEnable + cacheStats["ttl_seconds"] = cfg.Cache.CacheTTL + cacheStats["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize + cacheStats["max_entries"] = cfg.Cache.CacheMaxEntries + + runtimeCacheStats := libpack_cache.GetCacheStats() + if runtimeCacheStats != nil { + cacheStats["cached_queries"] = runtimeCacheStats.CachedQueries + cacheStats["cache_hits"] = runtimeCacheStats.CacheHits + cacheStats["cache_misses"] = runtimeCacheStats.CacheMisses + + totalRequests := runtimeCacheStats.CacheHits + runtimeCacheStats.CacheMisses + hitRate := 0.0 + if totalRequests > 0 { + hitRate = float64(runtimeCacheStats.CacheHits) / float64(totalRequests) * 100 + } + cacheStats["hit_rate_pct"] = hitRate + + memoryUsage := libpack_cache.GetCacheMemoryUsage() + maxMemory := libpack_cache.GetCacheMaxMemorySize() + cacheStats["memory_usage_bytes"] = memoryUsage + cacheStats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024) + + memoryUsagePct := 0.0 + if maxMemory > 0 { + memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100 + } + cacheStats["memory_usage_pct"] = memoryUsagePct + } + } + result["cache"] = cacheStats + + // Connection stats + poolMgr := GetConnectionPoolManager() + connStats := map[string]interface{}{ + "available": false, + } + + if poolMgr != nil { + connStats = poolMgr.GetConnectionStats() + connStats["available"] = true + } + result["connections"] = connStats + + // Retry budget + rb := GetRetryBudget() + if rb == nil { + result["retry_budget"] = map[string]interface{}{"enabled": false} + } else { + result["retry_budget"] = rb.GetStats() + } + + // Coalescing + rc := GetRequestCoalescer() + if rc == nil { + result["coalescing"] = map[string]interface{}{"enabled": false} + } else { + result["coalescing"] = rc.GetStats() + } + + // WebSocket + wsp := GetWebSocketProxy() + if wsp == nil { + result["websocket"] = map[string]interface{}{"enabled": false} + } else { + result["websocket"] = wsp.GetStats() + } + + return result +} + var startTime = time.Now() diff --git a/admin_dashboard_test.go b/admin_dashboard_test.go index 90bd480..3a00f10 100644 --- a/admin_dashboard_test.go +++ b/admin_dashboard_test.go @@ -425,13 +425,13 @@ func TestGetAdminMetricValue(t *testing.T) { } // Test with valid metric - value := getAdminMetricValue("graphql_proxy_succeeded_total") + value := getAdminMetricValue("requests_succesful") assert.GreaterOrEqual(t, value, int64(0)) // Test with nil config oldCfg := cfg cfg = nil - value = getAdminMetricValue("graphql_proxy_succeeded_total") + value = getAdminMetricValue("requests_succesful") assert.Equal(t, int64(0), value) cfg = oldCfg } diff --git a/main.go b/main.go index f912ac0..f322ee1 100644 --- a/main.go +++ b/main.go @@ -319,6 +319,39 @@ func parseConfig() { } } + // Initialize metrics aggregator FIRST if Redis is enabled (even if cache is disabled) + // This allows cluster mode monitoring even when cache is off + if cfg.Cache.CacheRedisEnable { + cfg.Logger.Info(&libpack_logging.LogMessage{ + Message: "Initializing metrics aggregator for cluster mode", + Pairs: map[string]interface{}{ + "redis_url": cfg.Cache.CacheRedisURL, + "redis_db": cfg.Cache.CacheRedisDB, + }, + }) + + if err := InitializeMetricsAggregator( + cfg.Cache.CacheRedisURL, + cfg.Cache.CacheRedisPassword, + cfg.Cache.CacheRedisDB, + cfg.Logger, + ); err != nil { + cfg.Logger.Error(&libpack_logging.LogMessage{ + Message: "FAILED to initialize metrics aggregator - cluster mode will not work", + Pairs: map[string]interface{}{ + "error": err.Error(), + }, + }) + } else { + cfg.Logger.Info(&libpack_logging.LogMessage{ + Message: "✓ Metrics aggregator successfully initialized", + Pairs: map[string]interface{}{ + "instance_id": GetMetricsAggregator().GetInstanceID(), + }, + }) + } + } + // Initialize cache if enabled if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable { cacheConfig := &libpack_cache.CacheConfig{ @@ -487,6 +520,14 @@ func main() { return nil }) + // Register metrics aggregator for cleanup + shutdownManager.RegisterComponent("metrics-aggregator", func(ctx context.Context) error { + if aggregator := GetMetricsAggregator(); aggregator != nil { + aggregator.Shutdown() + } + return nil + }) + // Cache shutdown is handled internally by the cache implementation // Start monitoring server diff --git a/metrics_aggregator.go b/metrics_aggregator.go new file mode 100644 index 0000000..8bb9c68 --- /dev/null +++ b/metrics_aggregator.go @@ -0,0 +1,805 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/google/uuid" + libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging" + "github.com/redis/go-redis/v9" +) + +// MetricsAggregator handles distributed metrics collection via Redis +type MetricsAggregator struct { + redisClient *redis.Client + logger *libpack_logger.Logger + instanceID string + publishKey string + ttl time.Duration + publishTimer *time.Ticker + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex +} + +// InstanceMetrics represents metrics for a single proxy instance +type InstanceMetrics struct { + InstanceID string `json:"instance_id"` + Hostname string `json:"hostname"` + LastUpdate time.Time `json:"last_update"` + UptimeSeconds float64 `json:"uptime_seconds"` + Stats map[string]interface{} `json:"stats"` + Cache map[string]interface{} `json:"cache,omitempty"` // Full cache details including memory + CacheSummary map[string]interface{} `json:"cache_summary,omitempty"` // Deprecated: kept for compatibility + Health map[string]interface{} `json:"health"` + CircuitBreaker map[string]interface{} `json:"circuit_breaker,omitempty"` + RetryBudget map[string]interface{} `json:"retry_budget,omitempty"` + Coalescing map[string]interface{} `json:"coalescing,omitempty"` + WebSocketStats map[string]interface{} `json:"websocket,omitempty"` + Connections map[string]interface{} `json:"connections,omitempty"` +} + +// AggregatedMetrics represents combined metrics from all instances +type AggregatedMetrics struct { + TotalInstances int `json:"total_instances"` + HealthyInstances int `json:"healthy_instances"` + LastUpdate time.Time `json:"last_update"` + CombinedStats map[string]interface{} `json:"combined_stats"` + Instances []InstanceMetrics `json:"instances"` + PerInstanceStats map[string]InstanceMetrics `json:"per_instance_stats"` +} + +var ( + metricsAggregator *MetricsAggregator + aggregatorMutex sync.RWMutex +) + +// InitializeMetricsAggregator creates and starts the metrics aggregator +func InitializeMetricsAggregator(redisURL, redisPassword string, redisDB int, logger *libpack_logger.Logger) error { + aggregatorMutex.Lock() + defer aggregatorMutex.Unlock() + + if metricsAggregator != nil { + return nil // Already initialized + } + + // Parse Redis URL + opt, err := redis.ParseURL(fmt.Sprintf("redis://%s/%d", redisURL, redisDB)) + if err != nil { + return fmt.Errorf("failed to parse Redis URL: %w", err) + } + + if redisPassword != "" { + opt.Password = redisPassword + } + + // Create Redis client with connection timeouts + opt.DialTimeout = 2 * time.Second + opt.ReadTimeout = 2 * time.Second + opt.WriteTimeout = 2 * time.Second + opt.PoolTimeout = 3 * time.Second + opt.MaxRetries = 2 + + client := redis.NewClient(opt) + + // Test connection with detailed error reporting + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + // Log detailed connection error + if logger != nil { + logger.Error(&libpack_logger.LogMessage{ + Message: "❌ CRITICAL: Redis connection test FAILED during initialization", + Pairs: map[string]interface{}{ + "error": err.Error(), + "redis_url": redisURL, + "redis_db": redisDB, + "has_password": redisPassword != "", + }, + }) + } + return fmt.Errorf("failed to connect to Redis: %w", err) + } + + // Log successful connection + if logger != nil { + logger.Info(&libpack_logger.LogMessage{ + Message: "✓ Redis connection test PASSED", + Pairs: map[string]interface{}{ + "redis_url": redisURL, + "redis_db": redisDB, + }, + }) + } + + // Generate unique instance ID (hostname + UUID for uniqueness) + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "unknown" + } + instanceID := fmt.Sprintf("%s-%s", hostname, uuid.New().String()[:8]) + + ctx, cancel = context.WithCancel(context.Background()) + + aggregator := &MetricsAggregator{ + redisClient: client, + logger: logger, + instanceID: instanceID, + publishKey: "graphql-proxy:metrics:instances", + ttl: 30 * time.Second, // Metrics expire after 30s if not updated + publishTimer: time.NewTicker(5 * time.Second), + ctx: ctx, + cancel: cancel, + } + + metricsAggregator = aggregator + + // Start publishing metrics + go aggregator.startPublishing() + + if logger != nil { + logger.Info(&libpack_logger.LogMessage{ + Message: "Metrics aggregator initialized", + Pairs: map[string]interface{}{ + "instance_id": instanceID, + "redis_url": redisURL, + "publish_key": aggregator.publishKey, + }, + }) + } + + return nil +} + +// GetMetricsAggregator returns the singleton instance +func GetMetricsAggregator() *MetricsAggregator { + aggregatorMutex.RLock() + defer aggregatorMutex.RUnlock() + return metricsAggregator +} + +// startPublishing periodically publishes metrics to Redis +func (ma *MetricsAggregator) startPublishing() { + defer ma.publishTimer.Stop() + + // Publish immediately on start + ma.publishMetrics() + + for { + select { + case <-ma.ctx.Done(): + // Clean up our metrics on shutdown + ma.removeInstanceMetrics() + return + case <-ma.publishTimer.C: + ma.publishMetrics() + } + } +} + +// publishMetrics collects current metrics and stores them in Redis +// Note: This is exported for testing/debugging via admin API +func (ma *MetricsAggregator) publishMetrics() { + // Defensive: check if aggregator is still valid + if ma == nil { + return + } + + ma.mu.RLock() + defer ma.mu.RUnlock() + + // Safety check: ensure global config is initialized + if cfg == nil { + if ma.logger != nil { + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "Cannot publish metrics - global config not initialized yet", + Pairs: map[string]interface{}{ + "instance_id": ma.instanceID, + }, + }) + } + return + } + + // Gather all stats using the admin dashboard's method + dashboard := NewAdminDashboard(ma.logger) + allStats := dashboard.gatherAllStats() + + if len(allStats) == 0 { + if ma.logger != nil { + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "gatherAllStats returned empty/nil result", + Pairs: map[string]interface{}{ + "instance_id": ma.instanceID, + }, + }) + } + return + } + + // Create instance metrics + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "unknown" + } + + metrics := InstanceMetrics{ + InstanceID: ma.instanceID, + Hostname: hostname, + LastUpdate: time.Now(), + UptimeSeconds: time.Since(startTime).Seconds(), + } + + // Extract specific sections - CRITICAL: we must set the correct structure + // Stats should contain the inner stats object with requests, cache_summary, etc. + if stats, ok := allStats["stats"].(map[string]interface{}); ok { + metrics.Stats = stats + + // Also extract cache summary separately for easier access (deprecated but kept for compatibility) + if cacheSummary, ok := stats["cache_summary"].(map[string]interface{}); ok { + metrics.CacheSummary = cacheSummary + } + + } else { + // Fallback: if stats extraction fails, use empty map + if ma.logger != nil { + ma.logger.Error(&libpack_logger.LogMessage{ + Message: "Failed to extract stats from allStats - using empty stats", + Pairs: map[string]interface{}{ + "instance_id": ma.instanceID, + "allStats_keys": func() []string { + keys := make([]string, 0, len(allStats)) + for k := range allStats { + keys = append(keys, k) + } + return keys + }(), + }, + }) + } + metrics.Stats = make(map[string]interface{}) + } + + // Extract full cache details (includes memory usage) + if cache, ok := allStats["cache"].(map[string]interface{}); ok { + metrics.Cache = cache + } + + if health, ok := allStats["health"].(map[string]interface{}); ok { + metrics.Health = health + } else { + metrics.Health = make(map[string]interface{}) + } + if cb, ok := allStats["circuit_breaker"].(map[string]interface{}); ok { + metrics.CircuitBreaker = cb + } + if rb, ok := allStats["retry_budget"].(map[string]interface{}); ok { + metrics.RetryBudget = rb + } + if coal, ok := allStats["coalescing"].(map[string]interface{}); ok { + metrics.Coalescing = coal + } + if ws, ok := allStats["websocket"].(map[string]interface{}); ok { + metrics.WebSocketStats = ws + } + if conn, ok := allStats["connections"].(map[string]interface{}); ok { + metrics.Connections = conn + } + + // Marshal to JSON + data, err := json.Marshal(metrics) + if err != nil { + if ma.logger != nil { + ma.logger.Error(&libpack_logger.LogMessage{ + Message: "Failed to marshal metrics for Redis", + Pairs: map[string]interface{}{"error": err.Error()}, + }) + } + return + } + + // Store in Redis hash with TTL + key := fmt.Sprintf("%s:%s", ma.publishKey, ma.instanceID) + + // Create a fresh context with timeout to avoid inheriting cancelled parent context + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + pipe := ma.redisClient.Pipeline() + pipe.Set(ctx, key, data, ma.ttl) + pipe.SAdd(ctx, ma.publishKey, ma.instanceID) + pipe.Expire(ctx, ma.publishKey, ma.ttl*2) // Keep set alive + + _, err = pipe.Exec(ctx) + if err != nil { + if ma.logger != nil { + ma.logger.Error(&libpack_logger.LogMessage{ + Message: "❌ CRITICAL: Failed to publish metrics to Redis - cluster mode will not work!", + Pairs: map[string]interface{}{ + "error": err.Error(), + "instance_id": ma.instanceID, + "key": key, + "redis_key": ma.publishKey, + }, + }) + } + return + } +} + +// removeInstanceMetrics cleans up metrics from Redis on shutdown +func (ma *MetricsAggregator) removeInstanceMetrics() { + // Create a fresh context with timeout for cleanup + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + key := fmt.Sprintf("%s:%s", ma.publishKey, ma.instanceID) + pipe := ma.redisClient.Pipeline() + pipe.Del(ctx, key) + pipe.SRem(ctx, ma.publishKey, ma.instanceID) + _, err := pipe.Exec(ctx) + + if err != nil && ma.logger != nil { + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "Failed to remove instance metrics from Redis during shutdown", + Pairs: map[string]interface{}{"instance_id": ma.instanceID, "error": err.Error()}, + }) + return + } + + if ma.logger != nil { + ma.logger.Info(&libpack_logger.LogMessage{ + Message: "Removed instance metrics from Redis", + Pairs: map[string]interface{}{"instance_id": ma.instanceID}, + }) + } +} + +// GetAggregatedMetrics retrieves and aggregates metrics from all instances +func (ma *MetricsAggregator) GetAggregatedMetrics() (*AggregatedMetrics, error) { + // Create a fresh context with timeout to avoid inheriting cancelled parent context + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Get all instance IDs + instanceIDs, err := ma.redisClient.SMembers(ctx, ma.publishKey).Result() + if err != nil { + return nil, fmt.Errorf("failed to get instance list: %w", err) + } + + if len(instanceIDs) == 0 { + return &AggregatedMetrics{ + TotalInstances: 0, + HealthyInstances: 0, + LastUpdate: time.Now(), + CombinedStats: make(map[string]interface{}), + Instances: []InstanceMetrics{}, + PerInstanceStats: make(map[string]InstanceMetrics), + }, nil + } + + // Fetch metrics for all instances + pipe := ma.redisClient.Pipeline() + cmds := make([]*redis.StringCmd, len(instanceIDs)) + for i, instanceID := range instanceIDs { + key := fmt.Sprintf("%s:%s", ma.publishKey, instanceID) + cmds[i] = pipe.Get(ctx, key) + } + pipe.Exec(ctx) + + // Parse metrics + instances := make([]InstanceMetrics, 0, len(instanceIDs)) + perInstance := make(map[string]InstanceMetrics) + healthyCount := 0 + staleCount := 0 + errorCount := 0 + + for i, cmd := range cmds { + data, err := cmd.Result() + if err != nil { + errorCount++ + // Clean up stale instance ID from the set + if err == redis.Nil { + staleCount++ + // Remove stale instance from set in background + go func(instID string) { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cleanupCancel() + ma.redisClient.SRem(cleanupCtx, ma.publishKey, instID) + }(instanceIDs[i]) + } + continue + } + + var metrics InstanceMetrics + if err := json.Unmarshal([]byte(data), &metrics); err != nil { + if ma.logger != nil { + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "Failed to unmarshal instance metrics", + Pairs: map[string]interface{}{"error": err.Error()}, + }) + } + continue + } + + // Check if instance is stale (not updated in 1 minute) + instanceAge := time.Since(metrics.LastUpdate) + if instanceAge > 1*time.Minute { + staleCount++ + // Clean up stale instance from set in background + go func(instID string, age time.Duration) { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cleanupCancel() + ma.redisClient.SRem(cleanupCtx, ma.publishKey, instID) + if ma.logger != nil { + ma.logger.Info(&libpack_logger.LogMessage{ + Message: "Removed inactive instance", + Pairs: map[string]interface{}{ + "instance_id": instID, + "inactive_seconds": age.Seconds(), + }, + }) + } + }(instanceIDs[i], instanceAge) + continue // Skip stale instances + } + + instances = append(instances, metrics) + perInstance[metrics.InstanceID] = metrics + + // Count healthy instances + if health, ok := metrics.Health["status"].(string); ok && health == "healthy" { + healthyCount++ + } + } + + // Log cleanup stats if we found stale instances + if ma.logger != nil && (staleCount > 0 || errorCount > 0) { + ma.logger.Info(&libpack_logger.LogMessage{ + Message: "Cleaned up stale instance IDs from Redis", + Pairs: map[string]interface{}{ + "total_in_set": len(instanceIDs), + "valid_instances": len(instances), + "stale_cleaned": staleCount, + "errors": errorCount, + }, + }) + } + + // Aggregate statistics + aggregated := &AggregatedMetrics{ + TotalInstances: len(instances), + HealthyInstances: healthyCount, + LastUpdate: time.Now(), + CombinedStats: ma.aggregateStats(instances), + Instances: instances, + PerInstanceStats: perInstance, + } + + return aggregated, nil +} + +// aggregateStats combines statistics from multiple instances +func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[string]interface{} { + if len(instances) == 0 { + if ma.logger != nil { + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "No instances to aggregate", + }) + } + return make(map[string]interface{}) + } + + // Initialize aggregated values + var ( + totalRequests int64 + totalSucceeded int64 + totalFailed int64 + totalSkipped int64 + totalCacheHits int64 + totalCacheMisses int64 + totalCachedQueries int64 + totalMemoryUsageMB float64 + totalCurrentRPS float64 + totalAvgRPS float64 + totalActiveConnections int64 + totalWSConnections int64 + totalCoalescedRequests int64 + totalPrimaryRequests int64 + oldestUptime float64 + + // Retry budget stats + totalRetryAllowed int64 + totalRetryDenied int64 + totalRetryAttempts int64 + retryBudgetEnabled = false + + // Circuit breaker stats + cbOpenCount int + cbHalfOpenCount int + cbClosedCount int + circuitBreakerEnabled = false + ) + + for idx, instance := range instances { + // Track oldest uptime for cluster uptime + if oldestUptime == 0 || instance.UptimeSeconds < oldestUptime { + oldestUptime = instance.UptimeSeconds + } + + // Aggregate request stats + if instance.Stats == nil { + if ma.logger != nil { + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "Instance has nil Stats", + Pairs: map[string]interface{}{ + "instance_id": instance.InstanceID, + "index": idx, + }, + }) + } + continue + } + + if stats, ok := instance.Stats["requests"].(map[string]interface{}); ok { + if total, ok := stats["total"].(float64); ok { + totalRequests += int64(total) + } + if succeeded, ok := stats["succeeded"].(float64); ok { + totalSucceeded += int64(succeeded) + } + if failed, ok := stats["failed"].(float64); ok { + totalFailed += int64(failed) + } + if skipped, ok := stats["skipped"].(float64); ok { + totalSkipped += int64(skipped) + } + if currentRPS, ok := stats["current_requests_per_second"].(float64); ok { + totalCurrentRPS += currentRPS + } + if avgRPS, ok := stats["avg_requests_per_second"].(float64); ok { + totalAvgRPS += avgRPS + } + } else { + if ma.logger != nil { + // Log what keys are actually in Stats for debugging + keys := make([]string, 0, len(instance.Stats)) + for k := range instance.Stats { + keys = append(keys, k) + } + ma.logger.Warning(&libpack_logger.LogMessage{ + Message: "Instance Stats missing 'requests' key", + Pairs: map[string]interface{}{ + "instance_id": instance.InstanceID, + "stats_keys": keys, + "index": idx, + }, + }) + } + } + + // Aggregate cache stats from CacheSummary (backward compatibility) + if len(instance.CacheSummary) > 0 { + if hits, ok := instance.CacheSummary["hits"].(float64); ok { + totalCacheHits += int64(hits) + } + if misses, ok := instance.CacheSummary["misses"].(float64); ok { + totalCacheMisses += int64(misses) + } + if cached, ok := instance.CacheSummary["total_cached"].(float64); ok { + totalCachedQueries += int64(cached) + } + } + + // Aggregate memory usage from full cache details + if len(instance.Cache) > 0 { + if memMB, ok := instance.Cache["memory_usage_mb"].(float64); ok { + totalMemoryUsageMB += memMB + } + } + + // Aggregate connection stats + if len(instance.Connections) > 0 { + if active, ok := instance.Connections["active_connections"].(float64); ok { + totalActiveConnections += int64(active) + } + } + + // Aggregate WebSocket connections + if len(instance.WebSocketStats) > 0 { + if active, ok := instance.WebSocketStats["active_connections"].(float64); ok { + totalWSConnections += int64(active) + } + } + + // Aggregate coalescing stats + if len(instance.Coalescing) > 0 { + if coalesced, ok := instance.Coalescing["coalesced_requests"].(float64); ok { + totalCoalescedRequests += int64(coalesced) + } + if primary, ok := instance.Coalescing["primary_requests"].(float64); ok { + totalPrimaryRequests += int64(primary) + } + } + + // Aggregate retry budget stats + if len(instance.RetryBudget) > 0 { + if enabled, ok := instance.RetryBudget["enabled"].(bool); ok && enabled { + retryBudgetEnabled = true + } + if allowed, ok := instance.RetryBudget["allowed_retries"].(float64); ok { + totalRetryAllowed += int64(allowed) + } + if denied, ok := instance.RetryBudget["denied_retries"].(float64); ok { + totalRetryDenied += int64(denied) + } + if attempts, ok := instance.RetryBudget["total_attempts"].(float64); ok { + totalRetryAttempts += int64(attempts) + } + } + + // Aggregate circuit breaker stats + if len(instance.CircuitBreaker) > 0 { + if enabled, ok := instance.CircuitBreaker["enabled"].(bool); ok && enabled { + circuitBreakerEnabled = true + } + if state, ok := instance.CircuitBreaker["state"].(string); ok { + switch state { + case "open": + cbOpenCount++ + case "half-open": + cbHalfOpenCount++ + case "closed": + cbClosedCount++ + } + } + } + } + + // Calculate derived metrics + successRate := 0.0 + if totalRequests > 0 { + successRate = float64(totalSucceeded) / float64(totalRequests) * 100 + } + + cacheHitRate := 0.0 + totalCacheRequests := totalCacheHits + totalCacheMisses + if totalCacheRequests > 0 { + cacheHitRate = float64(totalCacheHits) / float64(totalCacheRequests) * 100 + } + + backendSavings := 0.0 + totalCoalRequests := totalCoalescedRequests + totalPrimaryRequests + if totalCoalRequests > 0 { + backendSavings = float64(totalCoalescedRequests) / float64(totalCoalRequests) * 100 + } + + // Calculate retry budget denial rate + retryDenialRate := 0.0 + if totalRetryAttempts > 0 { + retryDenialRate = float64(totalRetryDenied) / float64(totalRetryAttempts) * 100 + } + + // Determine overall circuit breaker state + cbState := "unknown" + if circuitBreakerEnabled { + if cbOpenCount > 0 { + cbState = "open" // If any instance is open, cluster is in degraded state + } else if cbHalfOpenCount > 0 { + cbState = "half-open" + } else if cbClosedCount > 0 { + cbState = "closed" + } + } + + result := map[string]interface{}{ + "cluster_mode": true, + "total_instances": len(instances), + "cluster_uptime": oldestUptime, + "requests": map[string]interface{}{ + "total": totalRequests, + "succeeded": totalSucceeded, + "failed": totalFailed, + "skipped": totalSkipped, + "success_rate_pct": successRate, + "current_requests_per_second": totalCurrentRPS, + "avg_requests_per_second": totalAvgRPS, + }, + "cache_summary": map[string]interface{}{ + "hits": totalCacheHits, + "misses": totalCacheMisses, + "hit_rate_pct": cacheHitRate, + "total_cached": totalCachedQueries, + }, + "memory": map[string]interface{}{ + "total_usage_mb": totalMemoryUsageMB, + }, + "connections": map[string]interface{}{ + "total_active": totalActiveConnections, + }, + "websocket": map[string]interface{}{ + "total_connections": totalWSConnections, + }, + "coalescing": map[string]interface{}{ + "enabled": len(instances) > 0, // enabled if we have instances with data + "total_coalesced_requests": totalCoalescedRequests, + "total_primary_requests": totalPrimaryRequests, + "backend_savings_pct": backendSavings, + "coalescing_rate_pct": backendSavings, + }, + "retry_budget": map[string]interface{}{ + "enabled": retryBudgetEnabled, + "allowed_retries": totalRetryAllowed, + "denied_retries": totalRetryDenied, + "total_attempts": totalRetryAttempts, + "denial_rate_pct": retryDenialRate, + }, + "circuit_breaker": map[string]interface{}{ + "enabled": circuitBreakerEnabled, + "state": cbState, + "instances_open": cbOpenCount, + "instances_closed": cbClosedCount, + "instances_halfopen": cbHalfOpenCount, + }, + } + + return result +} + +// Shutdown stops the metrics aggregator +func (ma *MetricsAggregator) Shutdown() { + ma.mu.Lock() + defer ma.mu.Unlock() + + if ma.cancel != nil { + ma.cancel() + } + + if ma.redisClient != nil { + ma.redisClient.Close() + } + + if ma.logger != nil { + ma.logger.Info(&libpack_logger.LogMessage{ + Message: "Metrics aggregator shut down", + }) + } +} + +// GetInstanceID returns the current instance ID +func (ma *MetricsAggregator) GetInstanceID() string { + return ma.instanceID +} + +// IsClusterMode returns true if multiple instances are detected +func (ma *MetricsAggregator) IsClusterMode() bool { + // Create a fresh context with timeout to avoid inheriting cancelled parent context + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + count, err := ma.redisClient.SCard(ctx, ma.publishKey).Result() + if err != nil { + return false + } + + return count > 1 +} + +// GetInstanceHostname returns a human-readable instance identifier +func GetInstanceHostname() string { + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "unknown" + } + // Remove domain suffix for cleaner display + if idx := strings.Index(hostname, "."); idx > 0 { + hostname = hostname[:idx] + } + return hostname +}