Files
graphql-monitoring-proxy/api.go
T
lukaszraczylo c2c75d69c0 perf+coverage: optimisation pass + coverage push to ≥70%
Performance / resource usage:
- circuit_breaker_metrics: fix data race on failCounters map (RWMutex + double-checked locking)
- server.go: drop user_id and op_name metric labels (Prometheus cardinality bound); de-duplicate extractUserInfo
- graphql.go: gate runtime.ReadMemStats per-request behind ENABLE_ALLOCATION_TRACKING flag (default off)
- graphql.go: collapse two-pass AST scan into single pass; lower-case once
- sanitization.go: cache compiled redaction regexes per pattern via sync.Map; hoist inner constants to pkg vars
- proxy.go: hoist connection/timeout substrings to pkg vars; sentinel errors for static error paths; drop dead Headers map alloc
- metrics_aggregator.go: log-field allocation guarded by Logger.IsLevelEnabled
- logging/logger.go: add IsLevelEnabled helper
- lru_cache.go: 16-shard sharding, FNV-1a routing (concurrent throughput +22%)
- cache/memory/lru_memory_cache.go: gzip compress/decompress moved outside mu.Lock
- rps_tracker.go: RWMutex+uint64 -> atomic.Uint64
- retry_budget.go: drop unused mutex
- api.go: bannedUsersIDs map+RWMutex -> sync.Map (+ snapshot/replace helpers)
- tracing/tracing.go: pkg-level constSpanAttrs, copy-then-append in StartSpanWithAttributes
- admin_dashboard.go: handleStatsWebSocket reuses bytes.Buffer + json.Encoder per connection

Build / runtime:
- Makefile: -ldflags="-s -w" -trimpath, CGO_ENABLED=0 for build (=1 for test recipes)
- Dockerfile + Dockerfile.goreleaser: ENV GOMEMLIMIT=512MiB
- main.go: blank import go.uber.org/automaxprocs (cgroup-aware GOMAXPROCS)
- main.go: PPROF_PORT env var wires net/http/pprof on 127.0.0.1 only with full server timeouts
- README.md: env-var docs + metric-label docs updated; cardinality note

Test coverage push (per package):
- main 51.2% -> 74.7%
- cache 66.3% -> 93.7%
- cache/redis 45.5% -> 98.2%
- tracing 66.7% -> 72.9%
- (cache/memory 91.6%, logging 91.9%, monitoring 77.6%, pkg/pools 100% unchanged)

New test files: coverage_micro_test, coverage_extras_test, server_handlers_test,
api_health_test, admin_dashboard_cluster_test, metrics_aggregator_test, concerns_test,
cache/cache_coverage_test, cache/redis/redis_coverage_test, tracing/tracing_coverage_test.

Bug fix: connection_resilience_test.go TestIntegratedHealthManagement.health_manager_startup
was sync.Once-coupled to InitializeBackendHealth and panicked when another test (e.g. via
parseConfig) had already triggered Once. Use NewBackendHealthManager directly.
2026-04-19 19:49:24 +01:00

539 lines
15 KiB
Go

package main
import (
"context"
"crypto/subtle"
"fmt"
"os"
"sync"
"time"
"github.com/goccy/go-json"
fiber "github.com/gofiber/fiber/v2"
"github.com/gofrs/flock"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
"github.com/sony/gobreaker"
)
// bannedUsersIDs is the in-memory set of banned users. It is read by
// checkIfUserIsBanned and mutated by the ban/unban API handlers and by the
// periodic reload from the banned-users file.
var bannedUsersIDs sync.Map // key: userID string, value: reason string
// authMiddleware protects the admin routes with a shared secret carried in
// the X-API-Key request header.
//
// The expected secret is read from GMP_ADMIN_API_KEY, falling back to
// ADMIN_API_KEY. When neither is set the request is let through unchecked
// (only a debug line is logged). Otherwise a mismatching header is answered
// with 401 and a JSON error body.
func authMiddleware(c *fiber.Ctx) error {
	want := os.Getenv("GMP_ADMIN_API_KEY")
	if want == "" {
		want = os.Getenv("ADMIN_API_KEY")
	}

	switch {
	case want == "":
		// No key configured: authentication is switched off entirely.
		cfg.Logger.Debug(&libpack_logger.LogMessage{
			Message: "Admin API authentication disabled - endpoints protected by network segmentation",
			Pairs:   map[string]any{"endpoint": c.Path()},
		})

	case subtle.ConstantTimeCompare([]byte(c.Get("X-API-Key")), []byte(want)) != 1:
		// Constant-time comparison so response timing does not leak how much
		// of the key matched.
		cfg.Logger.Warning(&libpack_logger.LogMessage{
			Message: "Unauthorized API access attempt",
			Pairs:   map[string]any{"endpoint": c.Path(), "ip": c.IP()},
		})
		return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{
			"error": "Unauthorized",
		})
	}

	return c.Next()
}
// enableApi starts the admin API server and blocks until it stops.
//
// It returns nil immediately when cfg.Server.EnableApi is false. Otherwise it
// registers the /api routes behind authMiddleware, starts the periodic reload
// of banned users, and listens on cfg.Server.ApiPort. It returns the listen
// error if the server fails, or the result of Shutdown once ctx is cancelled.
func enableApi(ctx context.Context) error {
	if !cfg.Server.EnableApi {
		return nil
	}

	// Warn loudly when neither admin key variable is set: every admin
	// endpoint is then reachable without credentials.
	if os.Getenv("GMP_ADMIN_API_KEY") == "" && os.Getenv("ADMIN_API_KEY") == "" {
		cfg.Logger.Warning(&libpack_logger.LogMessage{
			Message: "⚠️ Admin API enabled WITHOUT authentication - all endpoints are publicly accessible!",
			Pairs: map[string]any{
				"security_risk":  "HIGH - Admin API endpoints can be accessed without credentials",
				"affected_ops":   "user-ban, user-unban, cache-clear, circuit-breaker controls",
				"recommendation": "Set GMP_ADMIN_API_KEY environment variable or use network segmentation",
				"api_port":       cfg.Server.ApiPort,
			},
		})
	}

	app := fiber.New(fiber.Config{
		DisableStartupMessage: true,
		AppName:               fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION),
	})

	// All admin routes live under /api and pass through authMiddleware first.
	admin := app.Group("/api")
	admin.Use(authMiddleware)
	admin.Post("/user-ban", apiBanUser)
	admin.Post("/user-unban", apiUnbanUser)
	admin.Post("/cache-clear", apiClearCache)
	admin.Get("/cache-stats", apiCacheStats)
	admin.Get("/circuit-breaker/health", apiCircuitBreakerHealth)
	admin.Get("/backend/health", apiBackendHealth)
	admin.Get("/connection-pool/health", apiConnectionPoolHealth)

	// The reload loop stops on its own when ctx is cancelled.
	go periodicallyReloadBannedUsers(ctx)

	// Buffered so the listener goroutine can always deliver its error and
	// exit, even if nobody is receiving any more.
	listenFailed := make(chan error, 1)
	go func() {
		if listenErr := app.Listen(fmt.Sprintf(":%d", cfg.Server.ApiPort)); listenErr != nil {
			listenFailed <- listenErr
		}
	}()

	select {
	case listenErr := <-listenFailed:
		return listenErr
	case <-ctx.Done():
	}

	cfg.Logger.Info(&libpack_logger.LogMessage{
		Message: "Shutting down API server",
	})
	return app.Shutdown()
}
func periodicallyReloadBannedUsers(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Stopping banned users reload",
})
return
case <-ticker.C:
loadBannedUsers()
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Banned users reloaded",
Pairs: map[string]any{"users": snapshotBannedUsers()},
})
}
}
}
// checkIfUserIsBanned reports whether userID is in the banned set.
//
// For a banned user it also writes a 403 "User is banned" response to c; a
// failure to send that response is logged and does not change the result.
func checkIfUserIsBanned(c *fiber.Ctx, userID string) bool {
	_, banned := bannedUsersIDs.Load(userID)

	cfg.Logger.Debug(&libpack_logger.LogMessage{
		Message: "Checking if user is banned",
		Pairs:   map[string]any{"user_id": userID, "banned": banned},
	})

	if !banned {
		return false
	}

	cfg.Logger.Info(&libpack_logger.LogMessage{
		Message: "User is banned",
		Pairs:   map[string]any{"user_id": userID},
	})

	sendErr := c.Status(fiber.StatusForbidden).SendString("User is banned")
	if sendErr != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Failed to send banned user response",
			Pairs:   map[string]any{"error": sendErr.Error()},
		})
	}
	return true
}
// apiClearCache handles POST /api/cache-clear: it clears the cache through
// libpack_cache.CacheClear and replies with a plain-text confirmation.
func apiClearCache(c *fiber.Ctx) error {
	cfg.Logger.Debug(&libpack_logger.LogMessage{Message: "Clearing cache via API"})

	libpack_cache.CacheClear()

	cfg.Logger.Info(&libpack_logger.LogMessage{Message: "Cache cleared via API"})

	return c.SendString("OK: cache cleared")
}
// apiCacheStats handles GET /api/cache-stats: it serialises the value returned
// by libpack_cache.GetCacheStats as the JSON response body.
func apiCacheStats(c *fiber.Ctx) error {
	stats := libpack_cache.GetCacheStats()
	return c.JSON(stats)
}
// apiCircuitBreakerHealth reports the circuit breaker's state, counters and
// configuration as JSON.
//
// Responses:
//   - 503 with status "disabled" when no circuit breaker is set (cb is nil)
//   - 200 with status "healthy" (closed) or "recovering" (half-open)
//   - 503 with status "unhealthy" (open)
//   - 503 with status "unknown" for any state this handler does not recognise
func apiCircuitBreakerHealth(c *fiber.Ctx) error {
	// Take the read lock before looking at cb at all, so the nil check and the
	// reads below observe the same breaker. Checking cb outside the lock left
	// a gap in which it could change between the check and the use.
	cbMutex.RLock()
	if cb == nil {
		cbMutex.RUnlock()
		return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
			"status":  "disabled",
			"message": "Circuit breaker is not enabled",
		})
	}
	state := cb.State()
	counts := cb.Counts()
	cbMutex.RUnlock()

	// Map the breaker state to a status label and HTTP code.
	var status string
	var httpStatus int
	switch state {
	case gobreaker.StateClosed:
		status = "healthy"
		httpStatus = fiber.StatusOK
	case gobreaker.StateHalfOpen:
		status = "recovering"
		httpStatus = fiber.StatusOK
	case gobreaker.StateOpen:
		status = "unhealthy"
		httpStatus = fiber.StatusServiceUnavailable
	default:
		// Without this branch an unrecognised state would be sent with an
		// empty status label and HTTP status code 0.
		status = "unknown"
		httpStatus = fiber.StatusServiceUnavailable
	}

	response := fiber.Map{
		"status": status,
		"state":  state.String(),
		"counts": fiber.Map{
			"requests":              counts.Requests,
			"total_successes":       counts.TotalSuccesses,
			"total_failures":        counts.TotalFailures,
			"consecutive_successes": counts.ConsecutiveSuccesses,
			"consecutive_failures":  counts.ConsecutiveFailures,
		},
		"configuration": fiber.Map{
			"max_failures":       cfg.CircuitBreaker.MaxFailures,
			"failure_ratio":      cfg.CircuitBreaker.FailureRatio,
			"sample_size":        cfg.CircuitBreaker.SampleSize,
			"timeout_seconds":    cfg.CircuitBreaker.Timeout,
			"max_half_open_reqs": cfg.CircuitBreaker.MaxRequestsInHalfOpen,
			"backoff_multiplier": cfg.CircuitBreaker.BackoffMultiplier,
		},
	}
	return c.Status(httpStatus).JSON(response)
}
// apiBanUserRequest is the request body parsed by the user-ban and user-unban
// handlers via c.BodyParser.
type apiBanUserRequest struct {
// UserID identifies the user to ban or unban; both handlers reject an empty value.
UserID string `json:"user_id"`
// Reason is stored as the ban reason; apiBanUser requires it, apiUnbanUser does not read it.
Reason string `json:"reason"`
}
// apiBanUser handles POST /api/user-ban.
//
// It expects a body with non-empty user_id and reason, records the ban in
// memory and then persists the whole ban list. It replies 400 for an
// unparsable or incomplete body, 500 when persisting fails (the in-memory
// entry is kept), and a plain-text confirmation otherwise.
func apiBanUser(c *fiber.Ctx) error {
	var body apiBanUserRequest

	parseErr := c.BodyParser(&body)
	if parseErr != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't parse the ban user request",
			Pairs:   map[string]any{"error": parseErr.Error()},
		})
		return c.Status(fiber.StatusBadRequest).SendString("Invalid request payload")
	}

	incomplete := body.UserID == "" || body.Reason == ""
	if incomplete {
		return c.Status(fiber.StatusBadRequest).SendString("user_id and reason are required")
	}

	bannedUsersIDs.Store(body.UserID, body.Reason)
	cfg.Logger.Info(&libpack_logger.LogMessage{
		Message: "Banned user",
		Pairs:   map[string]any{"user_id": body.UserID, "reason": body.Reason},
	})

	if storeBannedUsers() != nil {
		return c.Status(fiber.StatusInternalServerError).SendString("Failed to store banned users")
	}
	return c.SendString("OK: user banned")
}
// apiUnbanUser handles POST /api/user-unban.
//
// It expects a body with a non-empty user_id, removes that user from the
// in-memory ban list and then persists the list. It replies 400 for an
// unparsable body or a missing user_id, 500 when persisting fails, and a
// plain-text confirmation otherwise.
func apiUnbanUser(c *fiber.Ctx) error {
	var body apiBanUserRequest

	parseErr := c.BodyParser(&body)
	if parseErr != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't parse the unban user request",
			Pairs:   map[string]any{"error": parseErr.Error()},
		})
		return c.Status(fiber.StatusBadRequest).SendString("Invalid request payload")
	}

	if len(body.UserID) == 0 {
		return c.Status(fiber.StatusBadRequest).SendString("user_id is required")
	}

	bannedUsersIDs.Delete(body.UserID)
	cfg.Logger.Info(&libpack_logger.LogMessage{
		Message: "Unbanned user",
		Pairs:   map[string]any{"user_id": body.UserID},
	})

	if storeBannedUsers() != nil {
		return c.Status(fiber.StatusInternalServerError).SendString("Failed to store banned users")
	}
	return c.SendString("OK: user unbanned")
}
// storeBannedUsers writes the current in-memory ban list to
// cfg.Api.BannedUsersFile as a JSON object.
//
// The write happens while holding the exclusive lock on "<file>.lock"; the
// lock is released when the function returns. It returns the locking,
// marshalling or write error, or nil on success.
func storeBannedUsers() error {
	guard := flock.New(cfg.Api.BannedUsersFile + ".lock")
	if lockErr := lockFile(guard); lockErr != nil {
		return lockErr
	}
	defer func() {
		unlockErr := guard.Unlock()
		if unlockErr == nil {
			return
		}
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Failed to unlock file",
			Pairs:   map[string]any{"error": unlockErr.Error()},
		})
	}()

	payload, marshalErr := json.Marshal(snapshotBannedUsers())
	if marshalErr != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't marshal banned users",
			Pairs:   map[string]any{"error": marshalErr.Error()},
		})
		return marshalErr
	}

	writeErr := os.WriteFile(cfg.Api.BannedUsersFile, payload, 0o644)
	if writeErr == nil {
		return nil
	}
	cfg.Logger.Error(&libpack_logger.LogMessage{
		Message: "Can't write banned users to file",
		Pairs:   map[string]any{"error": writeErr.Error()},
	})
	return writeErr
}
// loadBannedUsers refreshes the in-memory ban list from
// cfg.Api.BannedUsersFile.
//
// If the file does not exist it is first created containing an empty JSON
// object. The file is then read while holding the shared lock on
// "<file>.lock" and decoded as a JSON object of user ID -> reason, which
// replaces the in-memory set via replaceBannedUsers. Every failure is logged
// and leaves the in-memory set untouched.
func loadBannedUsers() {
	bannedFile := cfg.Api.BannedUsersFile

	if _, err := os.Stat(bannedFile); os.IsNotExist(err) {
		cfg.Logger.Info(&libpack_logger.LogMessage{
			Message: "Banned users file doesn't exist - creating it",
			Pairs:   map[string]any{"file": bannedFile},
		})

		// Create with O_EXCL rather than os.WriteFile: WriteFile truncates, so
		// if another writer created the file after the Stat above, its freshly
		// persisted bans would be overwritten with "{}". O_EXCL fails with
		// "already exists" instead and the existing content is kept.
		f, createErr := os.OpenFile(bannedFile, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
		switch {
		case createErr == nil:
			_, writeErr := f.Write([]byte("{}"))
			if closeErr := f.Close(); writeErr == nil {
				writeErr = closeErr
			}
			if writeErr != nil {
				cfg.Logger.Error(&libpack_logger.LogMessage{
					Message: "Can't create and write to the file",
					Pairs:   map[string]any{"error": writeErr.Error()},
				})
				return
			}
		case os.IsExist(createErr):
			// Someone else created the file in the meantime; just read it.
		default:
			cfg.Logger.Error(&libpack_logger.LogMessage{
				Message: "Can't create and write to the file",
				Pairs:   map[string]any{"error": createErr.Error()},
			})
			return
		}
	}

	fileLock := flock.New(fmt.Sprintf("%s.lock", bannedFile))
	if err := lockFileRead(fileLock); err != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't lock the file [load]",
			Pairs:   map[string]any{"error": err.Error()},
		})
		return
	}
	defer func() {
		if err := fileLock.Unlock(); err != nil {
			cfg.Logger.Error(&libpack_logger.LogMessage{
				Message: "Failed to unlock file",
				Pairs:   map[string]any{"error": err.Error()},
			})
		}
	}()

	data, err := os.ReadFile(bannedFile)
	if err != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't read banned users from file",
			Pairs:   map[string]any{"error": err.Error()},
		})
		return
	}

	var newBannedUsers map[string]string
	if err := json.Unmarshal(data, &newBannedUsers); err != nil {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't unmarshal banned users",
			Pairs:   map[string]any{"error": err.Error()},
		})
		return
	}

	replaceBannedUsers(newBannedUsers)
}
// snapshotBannedUsers copies the banned-users set into a fresh plain map of
// user ID -> reason. Entries whose key or value is not a string are skipped.
// The result is never nil.
func snapshotBannedUsers() map[string]string {
	snapshot := map[string]string{}

	bannedUsersIDs.Range(func(key, value any) bool {
		id, isID := key.(string)
		reason, isReason := value.(string)
		if !isID || !isReason {
			return true // skip malformed entry, keep iterating
		}
		snapshot[id] = reason
		return true
	})

	return snapshot
}
// replaceBannedUsers makes the banned-users set equal to newUsers.
//
// New and updated entries are stored first and stale entries are pruned
// afterwards, so a user present in both the old and the new set stays banned
// throughout. Clearing the whole set before re-inserting would let banned
// users through checkIfUserIsBanned for the duration of each reload.
//
// A nil or empty newUsers empties the set.
func replaceBannedUsers(newUsers map[string]string) {
	for id, reason := range newUsers {
		bannedUsersIDs.Store(id, reason)
	}

	bannedUsersIDs.Range(func(k, _ any) bool {
		// Non-string keys cannot appear in newUsers, so they are stale too.
		id, isString := k.(string)
		if !isString {
			bannedUsersIDs.Delete(k)
			return true
		}
		if _, keep := newUsers[id]; !keep {
			bannedUsersIDs.Delete(k)
		}
		return true
	})
}
// lockFile takes the exclusive lock on fileLock, giving up after 30 seconds.
//
// It polls with TryLockContext instead of parking a goroutine in the blocking
// Lock call. A blocked Lock cannot be cancelled, so after a timeout that
// goroutine would linger and could later acquire the lock with nobody left to
// release it, stalling every subsequent locker.
//
// It returns nil once the lock is held (the caller must Unlock it), the
// underlying error when locking fails, or a timeout error after 30 seconds.
func lockFile(fileLock *flock.Flock) error {
	// Delay between acquisition attempts while the lock is held elsewhere.
	const retryDelay = 50 * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	locked, err := fileLock.TryLockContext(ctx, retryDelay)
	switch {
	case locked:
		return nil
	case ctx.Err() != nil:
		// The deadline passed before the lock became available.
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "File lock timeout",
			Pairs:   map[string]any{"timeout": "30s"},
		})
		return fmt.Errorf("file lock timeout after 30 seconds")
	case err != nil:
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't lock the file",
			Pairs:   map[string]any{"error": err.Error()},
		})
		return err
	default:
		// Not expected: no lock, no error and no timeout. Fail closed rather
		// than let the caller proceed without holding the lock.
		return fmt.Errorf("file lock not acquired")
	}
}
// lockFileRead takes the shared (read) lock on fileLock, giving up after 30
// seconds.
//
// It polls with TryRLockContext instead of parking a goroutine in the
// blocking RLock call. A blocked RLock cannot be cancelled, so after a
// timeout that goroutine would linger and could later acquire the lock with
// nobody left to release it, stalling later writers.
//
// It returns nil once the lock is held (the caller must Unlock it), the
// underlying error when locking fails, or a timeout error after 30 seconds.
func lockFileRead(fileLock *flock.Flock) error {
	// Delay between acquisition attempts while the lock is held elsewhere.
	const retryDelay = 50 * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	locked, err := fileLock.TryRLockContext(ctx, retryDelay)
	switch {
	case locked:
		return nil
	case ctx.Err() != nil:
		// The deadline passed before the lock became available.
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "File read lock timeout",
			Pairs:   map[string]any{"timeout": "30s"},
		})
		return fmt.Errorf("file read lock timeout after 30 seconds")
	case err != nil:
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Can't lock the file for reading",
			Pairs:   map[string]any{"error": err.Error()},
		})
		return err
	default:
		// Not expected: no lock, no error and no timeout. Fail closed rather
		// than let the caller read without holding the lock.
		return fmt.Errorf("file read lock not acquired")
	}
}
// apiBackendHealth handles GET /api/backend/health.
//
// It replies 503 with status "unknown" when no backend health manager is
// available. Otherwise it reports the manager's view of the GraphQL backend:
// 200 "healthy" or 503 "unhealthy", together with the backend URL, the last
// health check and the consecutive failure count.
func apiBackendHealth(c *fiber.Ctx) error {
	mgr := GetBackendHealthManager()
	if mgr == nil {
		return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
			"status":  "unknown",
			"message": "Backend health manager not initialized",
		})
	}

	healthy := mgr.IsHealthy()
	lastCheck := mgr.GetLastHealthCheck()
	failures := mgr.GetConsecutiveFailures()

	// Start from the unhealthy verdict and upgrade it when the manager agrees.
	status, code := "unhealthy", fiber.StatusServiceUnavailable
	if healthy {
		status, code = "healthy", fiber.StatusOK
	}

	return c.Status(code).JSON(fiber.Map{
		"status":               status,
		"backend_url":          cfg.Server.HostGraphQL,
		"last_health_check":    lastCheck,
		"consecutive_failures": failures,
		"check_interval":       "5s",
	})
}
// apiConnectionPoolHealth handles GET /api/connection-pool/health and reports
// connection pool statistics as JSON.
//
// Responses:
//   - 503 with status "unknown" when no connection pool manager is available
//   - 500 with status "unknown" when the stats carry no int64
//     "connection_failures" entry
//   - 200 with status "healthy" below 10 connection failures, otherwise 200
//     with status "degraded"
func apiConnectionPoolHealth(c *fiber.Ctx) error {
	poolMgr := GetConnectionPoolManager()
	if poolMgr == nil {
		return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
			"status":  "unknown",
			"message": "Connection pool manager not initialized",
		})
	}

	stats := poolMgr.GetConnectionStats()

	// Checked assertion: an unchecked one panics inside the handler when the
	// key is absent or holds a different type.
	connectionFailures, ok := stats["connection_failures"].(int64)
	if !ok {
		cfg.Logger.Error(&libpack_logger.LogMessage{
			Message: "Connection pool stats are missing the connection_failures counter",
			Pairs:   map[string]any{"value": stats["connection_failures"]},
		})
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"status":  "unknown",
			"message": "Connection pool statistics unavailable",
		})
	}

	// Consider the pool healthy if it has not had too many failures. A
	// degraded pool is still functional, so both verdicts are sent with 200.
	status := "healthy"
	if connectionFailures >= 10 {
		status = "degraded"
	}

	response := fiber.Map{
		"status":                  status,
		"active_connections":      stats["active_connections"],
		"total_connections":       stats["total_connections"],
		"connection_failures":     connectionFailures,
		"last_recovery_attempt":   stats["last_recovery_attempt"],
		"cleanup_interval":        "30s",
		"keepalive_interval":      "15s",
		"recovery_check_interval": "60s",
	}
	return c.Status(fiber.StatusOK).JSON(response)
}