mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-05 23:03:48 +00:00
c2c75d69c0
Performance / resource usage: - circuit_breaker_metrics: fix data race on failCounters map (RWMutex + double-checked locking) - server.go: drop user_id and op_name metric labels (Prometheus cardinality bound); de-duplicate extractUserInfo - graphql.go: gate runtime.ReadMemStats per-request behind ENABLE_ALLOCATION_TRACKING flag (default off) - graphql.go: collapse two-pass AST scan into single pass; lower-case once - sanitization.go: cache compiled redaction regexes per pattern via sync.Map; hoist inner constants to pkg vars - proxy.go: hoist connection/timeout substrings to pkg vars; sentinel errors for static error paths; drop dead Headers map alloc - metrics_aggregator.go: log-field allocation guarded by Logger.IsLevelEnabled - logging/logger.go: add IsLevelEnabled helper - lru_cache.go: 16-shard sharding, FNV-1a routing (concurrent throughput +22%) - cache/memory/lru_memory_cache.go: gzip compress/decompress moved outside mu.Lock - rps_tracker.go: RWMutex+uint64 -> atomic.Uint64 - retry_budget.go: drop unused mutex - api.go: bannedUsersIDs map+RWMutex -> sync.Map (+ snapshot/replace helpers) - tracing/tracing.go: pkg-level constSpanAttrs, copy-then-append in StartSpanWithAttributes - admin_dashboard.go: handleStatsWebSocket reuses bytes.Buffer + json.Encoder per connection Build / runtime: - Makefile: -ldflags="-s -w" -trimpath, CGO_ENABLED=0 for build (=1 for test recipes) - Dockerfile + Dockerfile.goreleaser: ENV GOMEMLIMIT=512MiB - main.go: blank import go.uber.org/automaxprocs (cgroup-aware GOMAXPROCS) - main.go: PPROF_PORT env var wires net/http/pprof on 127.0.0.1 only with full server timeouts - README.md: env-var docs + metric-label docs updated; cardinality note Test coverage push (per package): - main 51.2% -> 74.7% - cache 66.3% -> 93.7% - cache/redis 45.5% -> 98.2% - tracing 66.7% -> 72.9% - (cache/memory 91.6%, logging 91.9%, monitoring 77.6%, pkg/pools 100% unchanged) New test files: coverage_micro_test, coverage_extras_test, server_handlers_test, 
api_health_test, admin_dashboard_cluster_test, metrics_aggregator_test, concerns_test, cache/cache_coverage_test, cache/redis/redis_coverage_test, tracing/tracing_coverage_test. Bug fix: connection_resilience_test.go TestIntegratedHealthManagement.health_manager_startup was sync.Once-coupled to InitializeBackendHealth and panicked when another test (e.g. via parseConfig) had already triggered Once. Use NewBackendHealthManager directly.
1007 lines
29 KiB
Go
1007 lines
29 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"embed"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2"
|
|
"github.com/gofiber/websocket/v2"
|
|
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
|
|
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
// Timing parameters for the admin dashboard's WebSocket stats stream.
const (
	// StatsStreamInterval is the period between two stats snapshots pushed
	// to a connected WebSocket client.
	StatsStreamInterval = 2 * time.Second

	// WebSocketReadDeadline is how far into the future the read deadline of
	// a stats WebSocket connection is placed each time it is (re)armed.
	WebSocketReadDeadline = time.Minute
)
|
|
|
|
//go:embed admin/dashboard.html
|
|
var dashboardHTML embed.FS
|
|
|
|
// AdminDashboard provides monitoring and management interface
|
|
type AdminDashboard struct {
|
|
logger *libpack_logger.Logger
|
|
}
|
|
|
|
// NewAdminDashboard creates a new admin dashboard
|
|
func NewAdminDashboard(logger *libpack_logger.Logger) *AdminDashboard {
|
|
return &AdminDashboard{
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// RegisterRoutes registers dashboard routes
|
|
func (ad *AdminDashboard) RegisterRoutes(app *fiber.App) {
|
|
// Dashboard UI
|
|
app.Get("/admin", ad.serveDashboard)
|
|
app.Get("/admin/dashboard", ad.serveDashboard)
|
|
|
|
// API endpoints for dashboard data
|
|
app.Get("/admin/api/stats", ad.getStats)
|
|
app.Get("/admin/api/health", ad.getHealth)
|
|
app.Get("/admin/api/circuit-breaker", ad.getCircuitBreakerStatus)
|
|
app.Get("/admin/api/cache", ad.getCacheStats)
|
|
app.Get("/admin/api/connections", ad.getConnectionStats)
|
|
app.Get("/admin/api/retry-budget", ad.getRetryBudgetStats)
|
|
app.Get("/admin/api/coalescing", ad.getCoalescingStats)
|
|
app.Get("/admin/api/websocket", ad.getWebSocketStats)
|
|
|
|
// WebSocket endpoint for streaming statistics
|
|
app.Get("/admin/ws/stats", websocket.New(ad.handleStatsWebSocket))
|
|
|
|
// Cluster mode endpoints (when using Redis)
|
|
app.Get("/admin/api/cluster/stats", ad.getClusterStats)
|
|
app.Get("/admin/api/cluster/instances", ad.getClusterInstances)
|
|
app.Get("/admin/api/cluster/debug", ad.getClusterDebug)
|
|
app.Post("/admin/api/cluster/force-publish", ad.forcePublish)
|
|
|
|
// Control endpoints
|
|
app.Post("/admin/api/cache/clear", ad.clearCache)
|
|
app.Post("/admin/api/retry-budget/reset", ad.resetRetryBudget)
|
|
app.Post("/admin/api/coalescing/reset", ad.resetCoalescing)
|
|
|
|
if ad.logger != nil {
|
|
ad.logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Admin dashboard routes registered",
|
|
Pairs: map[string]any{
|
|
"path": "/admin",
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// serveDashboard serves the dashboard HTML
|
|
func (ad *AdminDashboard) serveDashboard(c *fiber.Ctx) error {
|
|
data, err := dashboardHTML.ReadFile("admin/dashboard.html")
|
|
if err != nil {
|
|
return c.Status(500).SendString("Failed to load dashboard")
|
|
}
|
|
|
|
c.Set("Content-Type", "text/html; charset=utf-8")
|
|
return c.Send(data)
|
|
}
|
|
|
|
// getStats returns overall proxy statistics
|
|
// In cluster mode (when metrics aggregator is available), returns aggregated stats from all instances
|
|
func (ad *AdminDashboard) getStats(c *fiber.Ctx) error {
|
|
// Check if cluster mode is enabled - if so, return aggregated stats
|
|
if aggregator := GetMetricsAggregator(); aggregator != nil {
|
|
metrics, err := aggregator.GetAggregatedMetrics()
|
|
if err != nil {
|
|
if ad.logger != nil {
|
|
ad.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to get aggregated metrics, falling back to local stats",
|
|
Pairs: map[string]any{"error": err.Error()},
|
|
})
|
|
}
|
|
// Fall through to local stats on error
|
|
} else {
|
|
// Return aggregated cluster stats
|
|
response := map[string]any{
|
|
"cluster_mode": true,
|
|
"total_instances": metrics.TotalInstances,
|
|
"healthy_instances": metrics.HealthyInstances,
|
|
"timestamp": metrics.LastUpdate.Format(time.RFC3339),
|
|
"version": libpack_config.PKG_VERSION,
|
|
}
|
|
|
|
// Add combined stats from aggregation
|
|
if metrics.CombinedStats != nil {
|
|
for k, v := range metrics.CombinedStats {
|
|
response[k] = v
|
|
}
|
|
}
|
|
|
|
return c.JSON(response)
|
|
}
|
|
}
|
|
|
|
// Local instance stats (fallback or non-cluster mode)
|
|
uptimeSeconds := time.Since(startTime).Seconds()
|
|
stats := map[string]any{
|
|
"cluster_mode": false,
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
"uptime_seconds": uptimeSeconds,
|
|
"uptime_human": formatDuration(time.Since(startTime)),
|
|
"version": libpack_config.PKG_VERSION,
|
|
}
|
|
|
|
if cfg != nil && cfg.Monitoring != nil {
|
|
succeeded := getAdminMetricValue("requests_succesful")
|
|
failed := getAdminMetricValue("requests_failed")
|
|
skipped := getAdminMetricValue("requests_skipped")
|
|
total := succeeded + failed + skipped
|
|
|
|
// Request statistics
|
|
requestStats := map[string]any{
|
|
"total": total,
|
|
"succeeded": succeeded,
|
|
"failed": failed,
|
|
"skipped": skipped,
|
|
}
|
|
|
|
// Calculate rates and percentages
|
|
if total > 0 {
|
|
requestStats["success_rate_pct"] = float64(succeeded) / float64(total) * 100
|
|
requestStats["failure_rate_pct"] = float64(failed) / float64(total) * 100
|
|
requestStats["skip_rate_pct"] = float64(skipped) / float64(total) * 100
|
|
} else {
|
|
requestStats["success_rate_pct"] = 0.0
|
|
requestStats["failure_rate_pct"] = 0.0
|
|
requestStats["skip_rate_pct"] = 0.0
|
|
}
|
|
|
|
// Calculate average requests per second (lifetime)
|
|
if uptimeSeconds > 0 {
|
|
requestStats["avg_requests_per_second"] = float64(total) / uptimeSeconds
|
|
} else {
|
|
requestStats["avg_requests_per_second"] = 0.0
|
|
}
|
|
|
|
// Get current requests per second (last 1 second)
|
|
if rpsTracker := GetRPSTracker(); rpsTracker != nil {
|
|
requestStats["current_requests_per_second"] = rpsTracker.GetCurrentRPS()
|
|
} else {
|
|
requestStats["current_requests_per_second"] = 0.0
|
|
}
|
|
|
|
stats["requests"] = requestStats
|
|
|
|
// Get cache statistics summary
|
|
cacheStats := libpack_cache.GetCacheStats()
|
|
if cacheStats != nil {
|
|
totalCacheRequests := cacheStats.CacheHits + cacheStats.CacheMisses
|
|
hitRate := 0.0
|
|
if totalCacheRequests > 0 {
|
|
hitRate = float64(cacheStats.CacheHits) / float64(totalCacheRequests) * 100
|
|
}
|
|
stats["cache_summary"] = map[string]any{
|
|
"hits": cacheStats.CacheHits,
|
|
"misses": cacheStats.CacheMisses,
|
|
"hit_rate_pct": hitRate,
|
|
"total_cached": cacheStats.CachedQueries,
|
|
}
|
|
}
|
|
}
|
|
|
|
return c.JSON(stats)
|
|
}
|
|
|
|
// formatDuration formats a duration into human-readable format
|
|
func formatDuration(d time.Duration) string {
|
|
days := int(d.Hours() / 24)
|
|
hours := int(d.Hours()) % 24
|
|
minutes := int(d.Minutes()) % 60
|
|
seconds := int(d.Seconds()) % 60
|
|
|
|
if days > 0 {
|
|
return fmt.Sprintf("%dd %dh %dm %ds", days, hours, minutes, seconds)
|
|
} else if hours > 0 {
|
|
return fmt.Sprintf("%dh %dm %ds", hours, minutes, seconds)
|
|
} else if minutes > 0 {
|
|
return fmt.Sprintf("%dm %ds", minutes, seconds)
|
|
}
|
|
return fmt.Sprintf("%ds", seconds)
|
|
}
|
|
|
|
// getHealth returns health status
|
|
func (ad *AdminDashboard) getHealth(c *fiber.Ctx) error {
|
|
healthMgr := GetBackendHealthManager()
|
|
|
|
health := map[string]any{
|
|
"status": "unknown",
|
|
"backend": map[string]any{
|
|
"healthy": false,
|
|
},
|
|
}
|
|
|
|
if healthMgr != nil {
|
|
isHealthy := healthMgr.IsHealthy()
|
|
health["backend"] = map[string]any{
|
|
"healthy": isHealthy,
|
|
"consecutive_failures": healthMgr.GetConsecutiveFailures(),
|
|
"last_check": healthMgr.GetLastHealthCheck().Format(time.RFC3339),
|
|
}
|
|
|
|
if isHealthy {
|
|
health["status"] = "healthy"
|
|
} else {
|
|
health["status"] = "unhealthy"
|
|
}
|
|
}
|
|
|
|
return c.JSON(health)
|
|
}
|
|
|
|
// getCircuitBreakerStatus returns circuit breaker status
|
|
func (ad *AdminDashboard) getCircuitBreakerStatus(c *fiber.Ctx) error {
|
|
status := map[string]any{
|
|
"enabled": false,
|
|
"state": "unknown",
|
|
}
|
|
|
|
if cfg != nil {
|
|
status["enabled"] = cfg.CircuitBreaker.Enable
|
|
|
|
if cb != nil {
|
|
cbMutex.RLock()
|
|
state := cb.State()
|
|
counts := cb.Counts()
|
|
cbMutex.RUnlock()
|
|
|
|
status["state"] = state.String()
|
|
status["counts"] = map[string]any{
|
|
"requests": counts.Requests,
|
|
"total_successes": counts.TotalSuccesses,
|
|
"total_failures": counts.TotalFailures,
|
|
"consecutive_successes": counts.ConsecutiveSuccesses,
|
|
"consecutive_failures": counts.ConsecutiveFailures,
|
|
}
|
|
status["config"] = map[string]any{
|
|
"max_failures": cfg.CircuitBreaker.MaxFailures,
|
|
"failure_ratio": cfg.CircuitBreaker.FailureRatio,
|
|
"timeout": cfg.CircuitBreaker.Timeout,
|
|
"max_requests_half_open": cfg.CircuitBreaker.MaxRequestsInHalfOpen,
|
|
"return_cached_on_open": cfg.CircuitBreaker.ReturnCachedOnOpen,
|
|
}
|
|
}
|
|
}
|
|
|
|
return c.JSON(status)
|
|
}
|
|
|
|
// getCacheStats returns cache statistics
|
|
// In cluster mode, returns aggregated cache stats from all instances
|
|
func (ad *AdminDashboard) getCacheStats(c *fiber.Ctx) error {
|
|
// Check if cluster mode is enabled - if so, return aggregated cache stats
|
|
if aggregator := GetMetricsAggregator(); aggregator != nil {
|
|
metrics, err := aggregator.GetAggregatedMetrics()
|
|
if err != nil {
|
|
if ad.logger != nil {
|
|
ad.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to get aggregated cache metrics, falling back to local stats",
|
|
Pairs: map[string]any{"error": err.Error()},
|
|
})
|
|
}
|
|
// Fall through to local stats on error
|
|
} else {
|
|
// Build aggregated cache stats from combined stats
|
|
response := map[string]any{
|
|
"cluster_mode": true,
|
|
"total_instances": metrics.TotalInstances,
|
|
}
|
|
|
|
// Add cache config from local config
|
|
if cfg != nil {
|
|
response["enabled"] = cfg.Cache.CacheEnable
|
|
response["redis_enabled"] = cfg.Cache.CacheRedisEnable
|
|
response["ttl_seconds"] = cfg.Cache.CacheTTL
|
|
response["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize
|
|
response["max_entries"] = cfg.Cache.CacheMaxEntries
|
|
}
|
|
|
|
// Extract aggregated cache stats from combined stats
|
|
if metrics.CombinedStats != nil {
|
|
if cacheHits, ok := metrics.CombinedStats["cache_hits"]; ok {
|
|
response["cache_hits"] = cacheHits
|
|
}
|
|
if cacheMisses, ok := metrics.CombinedStats["cache_misses"]; ok {
|
|
response["cache_misses"] = cacheMisses
|
|
}
|
|
if cachedQueries, ok := metrics.CombinedStats["cached_queries"]; ok {
|
|
response["cached_queries"] = cachedQueries
|
|
}
|
|
if hitRate, ok := metrics.CombinedStats["cache_hit_rate_pct"]; ok {
|
|
response["hit_rate_pct"] = hitRate
|
|
}
|
|
if memoryMB, ok := metrics.CombinedStats["memory_usage_mb"]; ok {
|
|
response["memory_usage_mb"] = memoryMB
|
|
}
|
|
}
|
|
|
|
return c.JSON(response)
|
|
}
|
|
}
|
|
|
|
// Local instance stats (fallback or non-cluster mode)
|
|
stats := map[string]any{
|
|
"cluster_mode": false,
|
|
"enabled": false,
|
|
}
|
|
|
|
if cfg != nil {
|
|
stats["enabled"] = cfg.Cache.CacheEnable
|
|
stats["redis_enabled"] = cfg.Cache.CacheRedisEnable
|
|
stats["ttl_seconds"] = cfg.Cache.CacheTTL
|
|
stats["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize
|
|
stats["max_entries"] = cfg.Cache.CacheMaxEntries
|
|
|
|
// Get runtime cache statistics
|
|
cacheStats := libpack_cache.GetCacheStats()
|
|
if cacheStats != nil {
|
|
stats["cached_queries"] = cacheStats.CachedQueries
|
|
stats["cache_hits"] = cacheStats.CacheHits
|
|
stats["cache_misses"] = cacheStats.CacheMisses
|
|
|
|
// Calculate hit rate
|
|
totalRequests := cacheStats.CacheHits + cacheStats.CacheMisses
|
|
hitRate := 0.0
|
|
if totalRequests > 0 {
|
|
hitRate = float64(cacheStats.CacheHits) / float64(totalRequests) * 100
|
|
}
|
|
stats["hit_rate_pct"] = hitRate
|
|
|
|
// Get memory usage only for in-memory cache
|
|
if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable {
|
|
memoryUsage := libpack_cache.GetCacheMemoryUsage()
|
|
maxMemory := libpack_cache.GetCacheMaxMemorySize()
|
|
stats["memory_usage_bytes"] = memoryUsage
|
|
stats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
|
|
|
|
// Calculate memory usage percentage
|
|
memoryUsagePct := 0.0
|
|
if maxMemory > 0 {
|
|
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
|
|
}
|
|
stats["memory_usage_pct"] = memoryUsagePct
|
|
} else {
|
|
// For Redis cache, memory tracking not available per instance
|
|
stats["memory_usage_mb"] = -1 // Sentinel value for "not applicable"
|
|
stats["memory_usage_pct"] = -1
|
|
}
|
|
}
|
|
}
|
|
|
|
return c.JSON(stats)
|
|
}
|
|
|
|
// getConnectionStats returns connection pool statistics
|
|
func (ad *AdminDashboard) getConnectionStats(c *fiber.Ctx) error {
|
|
poolMgr := GetConnectionPoolManager()
|
|
|
|
stats := map[string]any{
|
|
"available": false,
|
|
}
|
|
|
|
if poolMgr != nil {
|
|
stats = poolMgr.GetConnectionStats()
|
|
stats["available"] = true
|
|
}
|
|
|
|
return c.JSON(stats)
|
|
}
|
|
|
|
// getRetryBudgetStats returns retry budget statistics
|
|
func (ad *AdminDashboard) getRetryBudgetStats(c *fiber.Ctx) error {
|
|
rb := GetRetryBudget()
|
|
|
|
if rb == nil {
|
|
return c.JSON(map[string]any{
|
|
"enabled": false,
|
|
})
|
|
}
|
|
|
|
return c.JSON(rb.GetStats())
|
|
}
|
|
|
|
// getCoalescingStats returns request coalescing statistics
|
|
func (ad *AdminDashboard) getCoalescingStats(c *fiber.Ctx) error {
|
|
rc := GetRequestCoalescer()
|
|
|
|
if rc == nil {
|
|
return c.JSON(map[string]any{
|
|
"enabled": false,
|
|
})
|
|
}
|
|
|
|
return c.JSON(rc.GetStats())
|
|
}
|
|
|
|
// getWebSocketStats returns WebSocket statistics
|
|
func (ad *AdminDashboard) getWebSocketStats(c *fiber.Ctx) error {
|
|
wsp := GetWebSocketProxy()
|
|
|
|
if wsp == nil {
|
|
return c.JSON(map[string]any{
|
|
"enabled": false,
|
|
})
|
|
}
|
|
|
|
return c.JSON(wsp.GetStats())
|
|
}
|
|
|
|
// clearCache clears the cache
|
|
func (ad *AdminDashboard) clearCache(c *fiber.Ctx) error {
|
|
libpack_cache.CacheClear()
|
|
return c.JSON(map[string]any{
|
|
"success": true,
|
|
"message": "Cache cleared successfully",
|
|
})
|
|
}
|
|
|
|
// resetRetryBudget resets retry budget statistics
|
|
func (ad *AdminDashboard) resetRetryBudget(c *fiber.Ctx) error {
|
|
rb := GetRetryBudget()
|
|
if rb != nil {
|
|
rb.Reset()
|
|
}
|
|
|
|
return c.JSON(map[string]any{
|
|
"success": true,
|
|
"message": "Retry budget statistics reset",
|
|
})
|
|
}
|
|
|
|
// resetCoalescing resets coalescing statistics
|
|
func (ad *AdminDashboard) resetCoalescing(c *fiber.Ctx) error {
|
|
rc := GetRequestCoalescer()
|
|
if rc != nil {
|
|
rc.Reset()
|
|
}
|
|
|
|
return c.JSON(map[string]any{
|
|
"success": true,
|
|
"message": "Coalescing statistics reset",
|
|
})
|
|
}
|
|
|
|
// getClusterStats returns aggregated statistics from all proxy instances
|
|
func (ad *AdminDashboard) getClusterStats(c *fiber.Ctx) error {
|
|
aggregator := GetMetricsAggregator()
|
|
if aggregator == nil {
|
|
return c.Status(503).JSON(map[string]any{
|
|
"error": "Cluster mode not available",
|
|
"message": "Redis-based metrics aggregation is not enabled",
|
|
"cluster_mode": false,
|
|
})
|
|
}
|
|
|
|
metrics, err := aggregator.GetAggregatedMetrics()
|
|
if err != nil {
|
|
if ad.logger != nil {
|
|
ad.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to get aggregated metrics",
|
|
Pairs: map[string]any{"error": err.Error()},
|
|
})
|
|
}
|
|
return c.Status(500).JSON(map[string]any{
|
|
"error": "Failed to retrieve cluster metrics",
|
|
"message": err.Error(),
|
|
})
|
|
}
|
|
|
|
// Format response similar to regular stats endpoint
|
|
response := map[string]any{
|
|
"cluster_mode": true,
|
|
"total_instances": metrics.TotalInstances,
|
|
"healthy_instances": metrics.HealthyInstances,
|
|
"last_update": metrics.LastUpdate.Format(time.RFC3339),
|
|
"stats": metrics.CombinedStats,
|
|
}
|
|
|
|
return c.JSON(response)
|
|
}
|
|
|
|
// getClusterInstances returns detailed metrics for each proxy instance
|
|
func (ad *AdminDashboard) getClusterInstances(c *fiber.Ctx) error {
|
|
aggregator := GetMetricsAggregator()
|
|
if aggregator == nil {
|
|
return c.Status(503).JSON(map[string]any{
|
|
"error": "Cluster mode not available",
|
|
"message": "Redis-based metrics aggregation is not enabled",
|
|
"cluster_mode": false,
|
|
})
|
|
}
|
|
|
|
metrics, err := aggregator.GetAggregatedMetrics()
|
|
if err != nil {
|
|
if ad.logger != nil {
|
|
ad.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to get instance metrics",
|
|
Pairs: map[string]any{"error": err.Error()},
|
|
})
|
|
}
|
|
return c.Status(500).JSON(map[string]any{
|
|
"error": "Failed to retrieve instance metrics",
|
|
"message": err.Error(),
|
|
})
|
|
}
|
|
|
|
return c.JSON(map[string]any{
|
|
"cluster_mode": true,
|
|
"total_instances": metrics.TotalInstances,
|
|
"healthy_instances": metrics.HealthyInstances,
|
|
"current_instance": aggregator.GetInstanceID(),
|
|
"instances": metrics.Instances,
|
|
})
|
|
}
|
|
|
|
// getClusterDebug returns debug information about cluster mode
|
|
func (ad *AdminDashboard) getClusterDebug(c *fiber.Ctx) error {
|
|
aggregator := GetMetricsAggregator()
|
|
|
|
debug := map[string]any{
|
|
"aggregator_initialized": aggregator != nil,
|
|
"redis_cache_enabled": false,
|
|
}
|
|
|
|
if cfg != nil {
|
|
debug["redis_cache_enabled"] = cfg.Cache.CacheRedisEnable
|
|
debug["cache_enabled"] = cfg.Cache.CacheEnable
|
|
}
|
|
|
|
if aggregator != nil {
|
|
debug["instance_id"] = aggregator.GetInstanceID()
|
|
debug["is_cluster_mode"] = aggregator.IsClusterMode()
|
|
|
|
// Try to get metrics
|
|
metrics, err := aggregator.GetAggregatedMetrics()
|
|
if err != nil {
|
|
debug["error"] = err.Error()
|
|
} else {
|
|
debug["total_instances"] = metrics.TotalInstances
|
|
debug["healthy_instances"] = metrics.HealthyInstances
|
|
|
|
// Show first instance structure as example
|
|
if len(metrics.Instances) > 0 {
|
|
first := metrics.Instances[0]
|
|
debug["sample_instance"] = map[string]any{
|
|
"instance_id": first.InstanceID,
|
|
"hostname": first.Hostname,
|
|
"uptime_seconds": first.UptimeSeconds,
|
|
"stats_keys": getMapKeys(first.Stats),
|
|
"has_requests": first.Stats["requests"] != nil,
|
|
"has_cache": len(first.CacheSummary) > 0,
|
|
"health_status": first.Health["status"],
|
|
}
|
|
|
|
// Show requests structure if it exists
|
|
if requests, ok := first.Stats["requests"].(map[string]any); ok {
|
|
debug["sample_requests"] = requests
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return c.JSON(debug)
|
|
}
|
|
|
|
// getMapKeys returns the keys of m in unspecified (map iteration) order.
// The result is always a non-nil slice, empty when m is nil or empty.
func getMapKeys(m map[string]any) []string {
	keys := make([]string, len(m))
	next := 0
	for key := range m {
		keys[next] = key
		next++
	}
	return keys
}
|
|
|
|
// forcePublish forces an immediate metrics publish for testing
|
|
func (ad *AdminDashboard) forcePublish(c *fiber.Ctx) error {
|
|
aggregator := GetMetricsAggregator()
|
|
if aggregator == nil {
|
|
return c.Status(503).JSON(map[string]any{
|
|
"error": "Aggregator not initialized",
|
|
"success": false,
|
|
})
|
|
}
|
|
|
|
// Trigger publish in goroutine to avoid blocking
|
|
go aggregator.publishMetrics()
|
|
|
|
return c.JSON(map[string]any{
|
|
"success": true,
|
|
"triggered": true,
|
|
"message": "Publish triggered in background",
|
|
"next_steps": []string{
|
|
"Wait 2 seconds",
|
|
"Check GET /admin/api/cluster/debug",
|
|
"Check server logs for ✓ Successfully published or ❌ CRITICAL errors",
|
|
},
|
|
})
|
|
}
|
|
|
|
// Helper to get metric value for admin dashboard
|
|
func getAdminMetricValue(name string) int64 {
|
|
if cfg == nil || cfg.Monitoring == nil {
|
|
return 0
|
|
}
|
|
counter := cfg.Monitoring.RegisterMetricsCounter(name, nil)
|
|
if counter == nil {
|
|
return 0
|
|
}
|
|
return int64(counter.Get())
|
|
}
|
|
|
|
// handleStatsWebSocket handles WebSocket connections for streaming statistics
|
|
func (ad *AdminDashboard) handleStatsWebSocket(c *websocket.Conn) {
|
|
if ad.logger != nil {
|
|
ad.logger.Info(&libpack_logger.LogMessage{
|
|
Message: "WebSocket client connected to stats stream",
|
|
Pairs: map[string]any{
|
|
"remote_addr": c.RemoteAddr().String(),
|
|
},
|
|
})
|
|
}
|
|
|
|
// Cleanup on disconnect
|
|
defer func() {
|
|
if ad.logger != nil {
|
|
ad.logger.Info(&libpack_logger.LogMessage{
|
|
Message: "WebSocket client disconnected from stats stream",
|
|
Pairs: map[string]any{
|
|
"remote_addr": c.RemoteAddr().String(),
|
|
},
|
|
})
|
|
}
|
|
_ = c.Close() // Best-effort cleanup
|
|
}()
|
|
|
|
// Set up ping/pong handlers
|
|
_ = c.SetReadDeadline(time.Now().Add(WebSocketReadDeadline))
|
|
c.SetPongHandler(func(string) error {
|
|
_ = c.SetReadDeadline(time.Now().Add(WebSocketReadDeadline))
|
|
return nil
|
|
})
|
|
|
|
// Channel to signal when to stop
|
|
done := make(chan struct{})
|
|
|
|
// Goroutine to handle incoming messages (for connection keep-alive)
|
|
go func() {
|
|
defer close(done)
|
|
for {
|
|
if _, _, err := c.ReadMessage(); err != nil {
|
|
// Connection closed or error
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Stream statistics at configured interval
|
|
ticker := time.NewTicker(StatsStreamInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Per-connection encoder + buffer reused across ticks to avoid
|
|
// a fresh json.Marshal allocation every 2s per connection.
|
|
var buf bytes.Buffer
|
|
enc := json.NewEncoder(&buf)
|
|
enc.SetEscapeHTML(false)
|
|
|
|
// Send initial stats immediately (cluster-aware for dashboard)
|
|
if stats := ad.gatherAllStatsClusterAware(); stats != nil {
|
|
buf.Reset()
|
|
if err := enc.Encode(stats); err == nil {
|
|
// json.Encoder.Encode appends a trailing newline; strip it
|
|
// so the wire format matches the previous json.Marshal output.
|
|
_ = c.WriteMessage(websocket.TextMessage, bytes.TrimRight(buf.Bytes(), "\n"))
|
|
}
|
|
}
|
|
|
|
// Stream loop
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Gather all stats (cluster-aware for dashboard)
|
|
stats := ad.gatherAllStatsClusterAware()
|
|
|
|
// Encode into reused buffer (no per-tick allocation churn)
|
|
buf.Reset()
|
|
if err := enc.Encode(stats); err != nil {
|
|
if ad.logger != nil {
|
|
ad.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to marshal stats for WebSocket",
|
|
Pairs: map[string]any{"error": err.Error()},
|
|
})
|
|
}
|
|
return
|
|
}
|
|
|
|
// Send to client (strip trailing newline from Encoder to match prior format)
|
|
if err := c.WriteMessage(websocket.TextMessage, bytes.TrimRight(buf.Bytes(), "\n")); err != nil {
|
|
if ad.logger != nil {
|
|
ad.logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Failed to write to WebSocket (client likely disconnected)",
|
|
Pairs: map[string]any{"error": err.Error()},
|
|
})
|
|
}
|
|
return
|
|
}
|
|
|
|
case <-done:
|
|
// Client disconnected
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// gatherAllStats collects all statistics into a single structure
|
|
// This always returns LOCAL stats for this instance (used by metrics aggregator)
|
|
func (ad *AdminDashboard) gatherAllStats() map[string]any {
|
|
return ad.gatherAllStatsWithMode(false)
|
|
}
|
|
|
|
// gatherAllStatsClusterAware collects statistics with cluster awareness
|
|
// If cluster mode is available, returns aggregated stats from all instances
|
|
func (ad *AdminDashboard) gatherAllStatsClusterAware() map[string]any {
|
|
return ad.gatherAllStatsWithMode(true)
|
|
}
|
|
|
|
// gatherAllStatsWithMode collects statistics with optional cluster mode
|
|
func (ad *AdminDashboard) gatherAllStatsWithMode(useClusterMode bool) map[string]any {
|
|
// Check if cluster mode is requested and available
|
|
if useClusterMode {
|
|
if aggregator := GetMetricsAggregator(); aggregator != nil {
|
|
metrics, err := aggregator.GetAggregatedMetrics()
|
|
if err == nil && metrics != nil {
|
|
// Return aggregated cluster stats
|
|
result := map[string]any{
|
|
"cluster_mode": true,
|
|
"total_instances": metrics.TotalInstances,
|
|
"healthy_instances": metrics.HealthyInstances,
|
|
}
|
|
|
|
// Build stats section from combined stats
|
|
stats := map[string]any{
|
|
"timestamp": metrics.LastUpdate.Format(time.RFC3339),
|
|
"version": libpack_config.PKG_VERSION,
|
|
}
|
|
|
|
// Copy all combined stats
|
|
if metrics.CombinedStats != nil {
|
|
for k, v := range metrics.CombinedStats {
|
|
stats[k] = v
|
|
}
|
|
}
|
|
result["stats"] = stats
|
|
|
|
// Add per-instance details
|
|
result["instances"] = metrics.Instances
|
|
|
|
return result
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fall back to local stats
|
|
result := make(map[string]any)
|
|
result["cluster_mode"] = false
|
|
|
|
// Main stats
|
|
uptimeSeconds := time.Since(startTime).Seconds()
|
|
stats := map[string]any{
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
"uptime_seconds": uptimeSeconds,
|
|
"uptime_human": formatDuration(time.Since(startTime)),
|
|
"version": libpack_config.PKG_VERSION,
|
|
}
|
|
|
|
if cfg != nil && cfg.Monitoring != nil {
|
|
succeeded := getAdminMetricValue("requests_succesful")
|
|
failed := getAdminMetricValue("requests_failed")
|
|
skipped := getAdminMetricValue("requests_skipped")
|
|
total := succeeded + failed + skipped
|
|
|
|
requestStats := map[string]any{
|
|
"total": total,
|
|
"succeeded": succeeded,
|
|
"failed": failed,
|
|
"skipped": skipped,
|
|
}
|
|
|
|
if total > 0 {
|
|
requestStats["success_rate_pct"] = float64(succeeded) / float64(total) * 100
|
|
requestStats["failure_rate_pct"] = float64(failed) / float64(total) * 100
|
|
requestStats["skip_rate_pct"] = float64(skipped) / float64(total) * 100
|
|
} else {
|
|
requestStats["success_rate_pct"] = 0.0
|
|
requestStats["failure_rate_pct"] = 0.0
|
|
requestStats["skip_rate_pct"] = 0.0
|
|
}
|
|
|
|
if uptimeSeconds > 0 {
|
|
requestStats["avg_requests_per_second"] = float64(total) / uptimeSeconds
|
|
} else {
|
|
requestStats["avg_requests_per_second"] = 0.0
|
|
}
|
|
|
|
if rpsTracker := GetRPSTracker(); rpsTracker != nil {
|
|
requestStats["current_requests_per_second"] = rpsTracker.GetCurrentRPS()
|
|
} else {
|
|
requestStats["current_requests_per_second"] = 0.0
|
|
}
|
|
|
|
stats["requests"] = requestStats
|
|
|
|
// Cache summary
|
|
cacheStats := libpack_cache.GetCacheStats()
|
|
if cacheStats != nil {
|
|
totalCacheRequests := cacheStats.CacheHits + cacheStats.CacheMisses
|
|
hitRate := 0.0
|
|
if totalCacheRequests > 0 {
|
|
hitRate = float64(cacheStats.CacheHits) / float64(totalCacheRequests) * 100
|
|
}
|
|
stats["cache_summary"] = map[string]any{
|
|
"hits": cacheStats.CacheHits,
|
|
"misses": cacheStats.CacheMisses,
|
|
"hit_rate_pct": hitRate,
|
|
"total_cached": cacheStats.CachedQueries,
|
|
}
|
|
}
|
|
}
|
|
|
|
result["stats"] = stats
|
|
|
|
// Health
|
|
healthMgr := GetBackendHealthManager()
|
|
health := map[string]any{
|
|
"status": "unknown",
|
|
"backend": map[string]any{
|
|
"healthy": false,
|
|
},
|
|
}
|
|
|
|
if healthMgr != nil {
|
|
isHealthy := healthMgr.IsHealthy()
|
|
health["backend"] = map[string]any{
|
|
"healthy": isHealthy,
|
|
"consecutive_failures": healthMgr.GetConsecutiveFailures(),
|
|
"last_check": healthMgr.GetLastHealthCheck().Format(time.RFC3339),
|
|
}
|
|
|
|
if isHealthy {
|
|
health["status"] = "healthy"
|
|
} else {
|
|
health["status"] = "unhealthy"
|
|
}
|
|
}
|
|
result["health"] = health
|
|
|
|
// Circuit breaker
|
|
cbStatus := map[string]any{
|
|
"enabled": false,
|
|
"state": "unknown",
|
|
}
|
|
|
|
if cfg != nil {
|
|
cbStatus["enabled"] = cfg.CircuitBreaker.Enable
|
|
|
|
if cb != nil {
|
|
cbMutex.RLock()
|
|
state := cb.State()
|
|
counts := cb.Counts()
|
|
cbMutex.RUnlock()
|
|
|
|
cbStatus["state"] = state.String()
|
|
cbStatus["counts"] = map[string]any{
|
|
"requests": counts.Requests,
|
|
"total_successes": counts.TotalSuccesses,
|
|
"total_failures": counts.TotalFailures,
|
|
"consecutive_successes": counts.ConsecutiveSuccesses,
|
|
"consecutive_failures": counts.ConsecutiveFailures,
|
|
}
|
|
cbStatus["config"] = map[string]any{
|
|
"max_failures": cfg.CircuitBreaker.MaxFailures,
|
|
"failure_ratio": cfg.CircuitBreaker.FailureRatio,
|
|
"timeout": cfg.CircuitBreaker.Timeout,
|
|
"max_requests_half_open": cfg.CircuitBreaker.MaxRequestsInHalfOpen,
|
|
"return_cached_on_open": cfg.CircuitBreaker.ReturnCachedOnOpen,
|
|
}
|
|
}
|
|
}
|
|
result["circuit_breaker"] = cbStatus
|
|
|
|
// Cache stats
|
|
cacheStats := map[string]any{
|
|
"enabled": false,
|
|
}
|
|
|
|
if cfg != nil {
|
|
cacheStats["enabled"] = cfg.Cache.CacheEnable
|
|
cacheStats["redis_enabled"] = cfg.Cache.CacheRedisEnable
|
|
cacheStats["ttl_seconds"] = cfg.Cache.CacheTTL
|
|
cacheStats["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize
|
|
cacheStats["max_entries"] = cfg.Cache.CacheMaxEntries
|
|
|
|
runtimeCacheStats := libpack_cache.GetCacheStats()
|
|
if runtimeCacheStats != nil {
|
|
cacheStats["cached_queries"] = runtimeCacheStats.CachedQueries
|
|
cacheStats["cache_hits"] = runtimeCacheStats.CacheHits
|
|
cacheStats["cache_misses"] = runtimeCacheStats.CacheMisses
|
|
|
|
totalRequests := runtimeCacheStats.CacheHits + runtimeCacheStats.CacheMisses
|
|
hitRate := 0.0
|
|
if totalRequests > 0 {
|
|
hitRate = float64(runtimeCacheStats.CacheHits) / float64(totalRequests) * 100
|
|
}
|
|
cacheStats["hit_rate_pct"] = hitRate
|
|
|
|
// Only get memory usage for in-memory cache (not Redis)
|
|
if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable {
|
|
memoryUsage := libpack_cache.GetCacheMemoryUsage()
|
|
maxMemory := libpack_cache.GetCacheMaxMemorySize()
|
|
cacheStats["memory_usage_bytes"] = memoryUsage
|
|
cacheStats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
|
|
|
|
memoryUsagePct := 0.0
|
|
if maxMemory > 0 {
|
|
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
|
|
}
|
|
cacheStats["memory_usage_pct"] = memoryUsagePct
|
|
} else {
|
|
// For Redis cache, memory tracking is not available per instance
|
|
cacheStats["memory_usage_bytes"] = int64(-1)
|
|
cacheStats["memory_usage_mb"] = float64(-1)
|
|
cacheStats["memory_usage_pct"] = float64(-1)
|
|
}
|
|
}
|
|
}
|
|
result["cache"] = cacheStats
|
|
|
|
// Connection stats
|
|
poolMgr := GetConnectionPoolManager()
|
|
connStats := map[string]any{
|
|
"available": false,
|
|
}
|
|
|
|
if poolMgr != nil {
|
|
connStats = poolMgr.GetConnectionStats()
|
|
connStats["available"] = true
|
|
}
|
|
result["connections"] = connStats
|
|
|
|
// Retry budget
|
|
rb := GetRetryBudget()
|
|
if rb == nil {
|
|
result["retry_budget"] = map[string]any{"enabled": false}
|
|
} else {
|
|
result["retry_budget"] = rb.GetStats()
|
|
}
|
|
|
|
// Coalescing
|
|
rc := GetRequestCoalescer()
|
|
if rc == nil {
|
|
result["coalescing"] = map[string]any{"enabled": false}
|
|
} else {
|
|
result["coalescing"] = rc.GetStats()
|
|
}
|
|
|
|
// WebSocket
|
|
wsp := GetWebSocketProxy()
|
|
if wsp == nil {
|
|
result["websocket"] = map[string]any{"enabled": false}
|
|
} else {
|
|
result["websocket"] = wsp.GetStats()
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// startTime is captured once at package initialization; the dashboard's
// uptime figures are measured from it.
var startTime = time.Now()
|