mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-04 22:59:26 +00:00
cedee416a8
* General improvements and bug fixes. * Improve tests coverage. * fixup! Improve tests coverage. * Update README.md with latest changes. * Fix the uint32 * Resolve issue with race condition for logging. * fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * Fix the test of the rate limiter * Add default ratelimit.json file * Update dependencies. * Significant refactor. * fixup! Significant refactor. * fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025 * fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025
5739 lines
244 KiB
HTML
5739 lines
244 KiB
HTML
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
|
|
<title>graphql-monitoring-proxy: Go Coverage Report</title>
|
|
<style>
|
|
body {
|
|
background: black;
|
|
color: rgb(80, 80, 80);
|
|
}
|
|
body, pre, #legend span {
|
|
font-family: Menlo, monospace;
|
|
font-weight: bold;
|
|
}
|
|
#topbar {
|
|
background: black;
|
|
position: fixed;
|
|
top: 0; left: 0; right: 0;
|
|
height: 42px;
|
|
border-bottom: 1px solid rgb(80, 80, 80);
|
|
}
|
|
#content {
|
|
margin-top: 50px;
|
|
}
|
|
#nav, #legend {
|
|
float: left;
|
|
margin-left: 10px;
|
|
}
|
|
#legend {
|
|
margin-top: 12px;
|
|
}
|
|
#nav {
|
|
margin-top: 10px;
|
|
}
|
|
#legend span {
|
|
margin: 0 5px;
|
|
}
|
|
.cov0 { color: rgb(192, 0, 0) }
|
|
.cov1 { color: rgb(128, 128, 128) }
|
|
.cov2 { color: rgb(116, 140, 131) }
|
|
.cov3 { color: rgb(104, 152, 134) }
|
|
.cov4 { color: rgb(92, 164, 137) }
|
|
.cov5 { color: rgb(80, 176, 140) }
|
|
.cov6 { color: rgb(68, 188, 143) }
|
|
.cov7 { color: rgb(56, 200, 146) }
|
|
.cov8 { color: rgb(44, 212, 149) }
|
|
.cov9 { color: rgb(32, 224, 152) }
|
|
.cov10 { color: rgb(20, 236, 155) }
|
|
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div id="topbar">
|
|
<div id="nav">
|
|
<select id="files">
|
|
|
|
<option value="file0">github.com/lukaszraczylo/graphql-monitoring-proxy/api.go (63.3%)</option>
|
|
|
|
<option value="file1">github.com/lukaszraczylo/graphql-monitoring-proxy/buffer_pool.go (70.0%)</option>
|
|
|
|
<option value="file2">github.com/lukaszraczylo/graphql-monitoring-proxy/cache/cache.go (66.7%)</option>
|
|
|
|
<option value="file3">github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory/buffer_pool.go (0.0%)</option>
|
|
|
|
<option value="file4">github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory/lru_memory_cache.go (0.0%)</option>
|
|
|
|
<option value="file5">github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory/memory.go (92.1%)</option>
|
|
|
|
<option value="file6">github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis/redis.go (73.5%)</option>
|
|
|
|
<option value="file7">github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis/wrapper.go (0.0%)</option>
|
|
|
|
<option value="file8">github.com/lukaszraczylo/graphql-monitoring-proxy/circuit_breaker_metrics.go (93.8%)</option>
|
|
|
|
<option value="file9">github.com/lukaszraczylo/graphql-monitoring-proxy/connection_pool.go (69.4%)</option>
|
|
|
|
<option value="file10">github.com/lukaszraczylo/graphql-monitoring-proxy/details.go (83.0%)</option>
|
|
|
|
<option value="file11">github.com/lukaszraczylo/graphql-monitoring-proxy/events.go (18.9%)</option>
|
|
|
|
<option value="file12">github.com/lukaszraczylo/graphql-monitoring-proxy/graphql.go (85.4%)</option>
|
|
|
|
<option value="file13">github.com/lukaszraczylo/graphql-monitoring-proxy/logging/logger.go (93.4%)</option>
|
|
|
|
<option value="file14">github.com/lukaszraczylo/graphql-monitoring-proxy/lru_cache.go (28.2%)</option>
|
|
|
|
<option value="file15">github.com/lukaszraczylo/graphql-monitoring-proxy/main.go (45.9%)</option>
|
|
|
|
<option value="file16">github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring.go (100.0%)</option>
|
|
|
|
<option value="file17">github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring/defaults.go (100.0%)</option>
|
|
|
|
<option value="file18">github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring/helpers.go (98.8%)</option>
|
|
|
|
<option value="file19">github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring/monitoring.go (58.8%)</option>
|
|
|
|
<option value="file20">github.com/lukaszraczylo/graphql-monitoring-proxy/proxy.go (81.2%)</option>
|
|
|
|
<option value="file21">github.com/lukaszraczylo/graphql-monitoring-proxy/ratelimit.go (88.2%)</option>
|
|
|
|
<option value="file22">github.com/lukaszraczylo/graphql-monitoring-proxy/ratelimit_errors.go (6.2%)</option>
|
|
|
|
<option value="file23">github.com/lukaszraczylo/graphql-monitoring-proxy/server.go (3.6%)</option>
|
|
|
|
<option value="file24">github.com/lukaszraczylo/graphql-monitoring-proxy/shutdown.go (0.0%)</option>
|
|
|
|
<option value="file25">github.com/lukaszraczylo/graphql-monitoring-proxy/tracing/tracing.go (66.7%)</option>
|
|
|
|
</select>
|
|
</div>
|
|
<div id="legend">
|
|
<span>not tracked</span>
|
|
|
|
<span class="cov0">not covered</span>
|
|
<span class="cov8">covered</span>
|
|
|
|
</div>
|
|
</div>
|
|
<div id="content">
|
|
|
|
<pre class="file" id="file0" style="display: none">package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/goccy/go-json"
|
|
fiber "github.com/gofiber/fiber/v2"
|
|
"github.com/gofrs/flock"
|
|
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
|
|
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
var (
|
|
bannedUsersIDs = make(map[string]string)
|
|
bannedUsersIDsMutex sync.RWMutex
|
|
)
|
|
|
|
func enableApi(ctx context.Context) error <span class="cov8" title="1">{
|
|
if !cfg.Server.EnableApi </span><span class="cov8" title="1">{
|
|
return nil
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">apiserver := fiber.New(fiber.Config{
|
|
DisableStartupMessage: true,
|
|
AppName: fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION),
|
|
})
|
|
|
|
api := apiserver.Group("/api")
|
|
api.Post("/user-ban", apiBanUser)
|
|
api.Post("/user-unban", apiUnbanUser)
|
|
api.Post("/cache-clear", apiClearCache)
|
|
api.Get("/cache-stats", apiCacheStats)
|
|
|
|
// Start banned users reload in a separate goroutine with context
|
|
go periodicallyReloadBannedUsers(ctx)
|
|
|
|
// Start server in a goroutine and handle shutdown
|
|
errCh := make(chan error, 1)
|
|
go func() </span><span class="cov0" title="0">{
|
|
if err := apiserver.Listen(fmt.Sprintf(":%d", cfg.Server.ApiPort)); err != nil </span><span class="cov0" title="0">{
|
|
errCh <- err
|
|
}</span>
|
|
}()
|
|
|
|
// Wait for context cancellation or error
|
|
<span class="cov0" title="0">select </span>{
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Shutting down API server",
|
|
})
|
|
return apiserver.Shutdown()</span>
|
|
case err := <-errCh:<span class="cov0" title="0">
|
|
return err</span>
|
|
}
|
|
}
|
|
|
|
func periodicallyReloadBannedUsers(ctx context.Context) <span class="cov0" title="0">{
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for </span><span class="cov0" title="0">{
|
|
select </span>{
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Stopping banned users reload",
|
|
})
|
|
return</span>
|
|
case <-ticker.C:<span class="cov0" title="0">
|
|
loadBannedUsers()
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Banned users reloaded",
|
|
Pairs: map[string]interface{}{"users": bannedUsersIDs},
|
|
})</span>
|
|
}
|
|
}
|
|
}
|
|
|
|
func checkIfUserIsBanned(c *fiber.Ctx, userID string) bool <span class="cov8" title="1">{
|
|
bannedUsersIDsMutex.RLock()
|
|
_, found := bannedUsersIDs[userID]
|
|
bannedUsersIDsMutex.RUnlock()
|
|
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Checking if user is banned",
|
|
Pairs: map[string]interface{}{"user_id": userID, "banned": found},
|
|
})
|
|
|
|
if found </span><span class="cov8" title="1">{
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "User is banned",
|
|
Pairs: map[string]interface{}{"user_id": userID},
|
|
})
|
|
if err := c.Status(fiber.StatusForbidden).SendString("User is banned"); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to send banned user response",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span>
|
|
}
|
|
<span class="cov8" title="1">return found</span>
|
|
}
|
|
|
|
func apiClearCache(c *fiber.Ctx) error <span class="cov8" title="1">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Clearing cache via API",
|
|
})
|
|
libpack_cache.CacheClear()
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Cache cleared via API",
|
|
})
|
|
return c.SendString("OK: cache cleared")
|
|
}</span>
|
|
|
|
func apiCacheStats(c *fiber.Ctx) error <span class="cov8" title="1">{
|
|
return c.JSON(libpack_cache.GetCacheStats())
|
|
}</span>
|
|
|
|
type apiBanUserRequest struct {
|
|
UserID string `json:"user_id"`
|
|
Reason string `json:"reason"`
|
|
}
|
|
|
|
func apiBanUser(c *fiber.Ctx) error <span class="cov8" title="1">{
|
|
var req apiBanUserRequest
|
|
if err := c.BodyParser(&req); err != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't parse the ban user request",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return c.Status(fiber.StatusBadRequest).SendString("Invalid request payload")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if req.UserID == "" || req.Reason == "" </span><span class="cov8" title="1">{
|
|
return c.Status(fiber.StatusBadRequest).SendString("user_id and reason are required")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">bannedUsersIDsMutex.Lock()
|
|
bannedUsersIDs[req.UserID] = req.Reason
|
|
bannedUsersIDsMutex.Unlock()
|
|
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Banned user",
|
|
Pairs: map[string]interface{}{"user_id": req.UserID, "reason": req.Reason},
|
|
})
|
|
|
|
if err := storeBannedUsers(); err != nil </span><span class="cov0" title="0">{
|
|
return c.Status(fiber.StatusInternalServerError).SendString("Failed to store banned users")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return c.SendString("OK: user banned")</span>
|
|
}
|
|
|
|
func apiUnbanUser(c *fiber.Ctx) error <span class="cov8" title="1">{
|
|
var req apiBanUserRequest
|
|
if err := c.BodyParser(&req); err != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't parse the unban user request",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return c.Status(fiber.StatusBadRequest).SendString("Invalid request payload")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if req.UserID == "" </span><span class="cov8" title="1">{
|
|
return c.Status(fiber.StatusBadRequest).SendString("user_id is required")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">bannedUsersIDsMutex.Lock()
|
|
delete(bannedUsersIDs, req.UserID)
|
|
bannedUsersIDsMutex.Unlock()
|
|
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Unbanned user",
|
|
Pairs: map[string]interface{}{"user_id": req.UserID},
|
|
})
|
|
|
|
if err := storeBannedUsers(); err != nil </span><span class="cov0" title="0">{
|
|
return c.Status(fiber.StatusInternalServerError).SendString("Failed to store banned users")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return c.SendString("OK: user unbanned")</span>
|
|
}
|
|
|
|
func storeBannedUsers() error <span class="cov8" title="1">{
|
|
fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile))
|
|
if err := lockFile(fileLock); err != nil </span><span class="cov0" title="0">{
|
|
return err
|
|
}</span>
|
|
<span class="cov8" title="1">defer func() </span><span class="cov8" title="1">{
|
|
if err := fileLock.Unlock(); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to unlock file",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span>
|
|
}()
|
|
|
|
<span class="cov8" title="1">bannedUsersIDsMutex.RLock()
|
|
data, err := json.Marshal(bannedUsersIDs)
|
|
bannedUsersIDsMutex.RUnlock()
|
|
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't marshal banned users",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if err := os.WriteFile(cfg.Api.BannedUsersFile, data, 0o644); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't write banned users to file",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return nil</span>
|
|
}
|
|
|
|
func loadBannedUsers() <span class="cov8" title="1">{
|
|
if _, err := os.Stat(cfg.Api.BannedUsersFile); os.IsNotExist(err) </span><span class="cov8" title="1">{
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Banned users file doesn't exist - creating it",
|
|
Pairs: map[string]interface{}{"file": cfg.Api.BannedUsersFile},
|
|
})
|
|
if err := os.WriteFile(cfg.Api.BannedUsersFile, []byte("{}"), 0o644); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't create and write to the file",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile))
|
|
if err := lockFileRead(fileLock); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't lock the file [load]",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return
|
|
}</span>
|
|
<span class="cov8" title="1">defer func() </span><span class="cov8" title="1">{
|
|
if err := fileLock.Unlock(); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to unlock file",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span>
|
|
}()
|
|
|
|
<span class="cov8" title="1">data, err := os.ReadFile(cfg.Api.BannedUsersFile)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't read banned users from file",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">var newBannedUsers map[string]string
|
|
if err := json.Unmarshal(data, &newBannedUsers); err != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't unmarshal banned users",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">bannedUsersIDsMutex.Lock()
|
|
bannedUsersIDs = newBannedUsers
|
|
bannedUsersIDsMutex.Unlock()</span>
|
|
}
|
|
|
|
func lockFile(fileLock *flock.Flock) error <span class="cov8" title="1">{
|
|
// Add timeout to prevent indefinite blocking
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Try to acquire lock with timeout
|
|
lockChan := make(chan error, 1)
|
|
go func() </span><span class="cov8" title="1">{
|
|
lockChan <- fileLock.Lock()
|
|
}</span>()
|
|
|
|
<span class="cov8" title="1">select </span>{
|
|
case err := <-lockChan:<span class="cov8" title="1">
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't lock the file",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
<span class="cov8" title="1">return nil</span>
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "File lock timeout",
|
|
Pairs: map[string]interface{}{"timeout": "30s"},
|
|
})
|
|
return fmt.Errorf("file lock timeout after 30 seconds")</span>
|
|
}
|
|
}
|
|
|
|
func lockFileRead(fileLock *flock.Flock) error <span class="cov8" title="1">{
|
|
// Add timeout to prevent indefinite blocking
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Try to acquire read lock with timeout
|
|
lockChan := make(chan error, 1)
|
|
go func() </span><span class="cov8" title="1">{
|
|
lockChan <- fileLock.RLock()
|
|
}</span>()
|
|
|
|
<span class="cov8" title="1">select </span>{
|
|
case err := <-lockChan:<span class="cov8" title="1">
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't lock the file for reading",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
<span class="cov8" title="1">return nil</span>
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "File read lock timeout",
|
|
Pairs: map[string]interface{}{"timeout": "30s"},
|
|
})
|
|
return fmt.Errorf("file read lock timeout after 30 seconds")</span>
|
|
}
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file1" style="display: none">package main
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
// BufferPool manages reusable buffers for HTTP operations
|
|
type BufferPool struct {
|
|
pool sync.Pool
|
|
}
|
|
|
|
// NewBufferPool creates a new buffer pool
|
|
func NewBufferPool() *BufferPool <span class="cov8" title="1">{
|
|
return &BufferPool{
|
|
pool: sync.Pool{
|
|
New: func() interface{} </span><span class="cov8" title="1">{
|
|
// Create a buffer with 4KB initial capacity
|
|
return bytes.NewBuffer(make([]byte, 0, 4096))
|
|
}</span>,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Get retrieves a buffer from the pool
|
|
func (bp *BufferPool) Get() *bytes.Buffer <span class="cov8" title="1">{
|
|
buf := bp.pool.Get().(*bytes.Buffer)
|
|
buf.Reset()
|
|
return buf
|
|
}</span>
|
|
|
|
// Put returns a buffer to the pool
|
|
func (bp *BufferPool) Put(buf *bytes.Buffer) <span class="cov8" title="1">{
|
|
// Only return buffers that aren't too large (avoid memory bloat)
|
|
if buf.Cap() > 1024*1024 </span><span class="cov0" title="0">{ // 1MB limit
|
|
return
|
|
}</span>
|
|
<span class="cov8" title="1">buf.Reset()
|
|
bp.pool.Put(buf)</span>
|
|
}
|
|
|
|
// GzipWriterPool manages reusable gzip writers
|
|
type GzipWriterPool struct {
|
|
pool sync.Pool
|
|
}
|
|
|
|
// NewGzipWriterPool creates a new gzip writer pool
|
|
func NewGzipWriterPool() *GzipWriterPool <span class="cov8" title="1">{
|
|
return &GzipWriterPool{
|
|
pool: sync.Pool{
|
|
New: func() interface{} </span><span class="cov0" title="0">{
|
|
// Create a gzip writer with default compression
|
|
return gzip.NewWriter(nil)
|
|
}</span>,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Get retrieves a gzip writer from the pool
|
|
func (gp *GzipWriterPool) Get(w io.Writer) *gzip.Writer <span class="cov0" title="0">{
|
|
gz := gp.pool.Get().(*gzip.Writer)
|
|
gz.Reset(w)
|
|
return gz
|
|
}</span>
|
|
|
|
// Put returns a gzip writer to the pool
|
|
func (gp *GzipWriterPool) Put(gz *gzip.Writer) <span class="cov0" title="0">{
|
|
gz.Reset(nil)
|
|
gp.pool.Put(gz)
|
|
}</span>
|
|
|
|
// GzipReaderPool manages reusable gzip readers
|
|
type GzipReaderPool struct {
|
|
pool sync.Pool
|
|
}
|
|
|
|
// NewGzipReaderPool creates a new gzip reader pool
|
|
func NewGzipReaderPool() *GzipReaderPool <span class="cov8" title="1">{
|
|
return &GzipReaderPool{
|
|
pool: sync.Pool{
|
|
New: func() interface{} </span><span class="cov8" title="1">{
|
|
// We'll reset the reader when getting from pool
|
|
return &gzip.Reader{}
|
|
}</span>,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Get retrieves a gzip reader from the pool
|
|
func (gp *GzipReaderPool) Get(r io.Reader) (*gzip.Reader, error) <span class="cov8" title="1">{
|
|
gr := gp.pool.Get().(*gzip.Reader)
|
|
if err := gr.Reset(r); err != nil </span><span class="cov8" title="1">{
|
|
// If reset fails, create a new reader
|
|
return gzip.NewReader(r)
|
|
}</span>
|
|
<span class="cov8" title="1">return gr, nil</span>
|
|
}
|
|
|
|
// Put returns a gzip reader to the pool
|
|
func (gp *GzipReaderPool) Put(gr *gzip.Reader) <span class="cov8" title="1">{
|
|
gr.Close()
|
|
gp.pool.Put(gr)
|
|
}</span>
|
|
|
|
// Global buffer pools
|
|
var (
|
|
httpBufferPool = NewBufferPool()
|
|
gzipWriterPool = NewGzipWriterPool()
|
|
gzipReaderPool = NewGzipReaderPool()
|
|
)
|
|
|
|
// GetHTTPBuffer gets a buffer from the global pool
|
|
func GetHTTPBuffer() *bytes.Buffer <span class="cov8" title="1">{
|
|
return httpBufferPool.Get()
|
|
}</span>
|
|
|
|
// PutHTTPBuffer returns a buffer to the global pool
|
|
func PutHTTPBuffer(buf *bytes.Buffer) <span class="cov8" title="1">{
|
|
httpBufferPool.Put(buf)
|
|
}</span>
|
|
|
|
// GetGzipWriter gets a gzip writer from the global pool
|
|
func GetGzipWriter(w io.Writer) *gzip.Writer <span class="cov0" title="0">{
|
|
return gzipWriterPool.Get(w)
|
|
}</span>
|
|
|
|
// PutGzipWriter returns a gzip writer to the global pool
|
|
func PutGzipWriter(gz *gzip.Writer) <span class="cov0" title="0">{
|
|
gzipWriterPool.Put(gz)
|
|
}</span>
|
|
|
|
// GetGzipReader gets a gzip reader from the global pool
|
|
func GetGzipReader(r io.Reader) (*gzip.Reader, error) <span class="cov8" title="1">{
|
|
return gzipReaderPool.Get(r)
|
|
}</span>
|
|
|
|
// PutGzipReader returns a gzip reader to the global pool
|
|
func PutGzipReader(gr *gzip.Reader) <span class="cov8" title="1">{
|
|
gzipReaderPool.Put(gr)
|
|
}</pre>
|
|
|
|
<pre class="file" id="file2" style="display: none">package libpack_cache
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"io"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
fiber "github.com/gofiber/fiber/v2"
|
|
"github.com/gookit/goutil/strutil"
|
|
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
|
|
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
type CacheConfig struct {
|
|
Logger *libpack_logger.Logger
|
|
Client CacheClient
|
|
Redis struct {
|
|
URL string `json:"url"`
|
|
Password string `json:"password"`
|
|
DB int `json:"db"`
|
|
Enable bool `json:"enable"`
|
|
}
|
|
Memory struct {
|
|
MaxMemorySize int64 `json:"max_memory_size"` // Maximum memory size in bytes
|
|
MaxEntries int64 `json:"max_entries"` // Maximum number of entries
|
|
}
|
|
TTL int `json:"ttl"`
|
|
}
|
|
|
|
type CacheStats struct {
|
|
CachedQueries int64 `json:"cached_queries"`
|
|
CacheHits int64 `json:"cache_hits"`
|
|
CacheMisses int64 `json:"cache_misses"`
|
|
}
|
|
|
|
type CacheClient interface {
|
|
Set(key string, value []byte, ttl time.Duration)
|
|
Get(key string) ([]byte, bool)
|
|
Delete(key string)
|
|
Clear()
|
|
CountQueries() int64
|
|
// Memory usage reporting methods
|
|
GetMemoryUsage() int64 // Returns current memory usage in bytes
|
|
GetMaxMemorySize() int64 // Returns max memory size in bytes
|
|
}
|
|
|
|
var (
|
|
cacheStats *CacheStats
|
|
config *CacheConfig
|
|
)
|
|
|
|
func CalculateHash(c *fiber.Ctx) string <span class="cov8" title="1">{
|
|
return strutil.Md5(c.Body())
|
|
}</span>
|
|
|
|
func EnableCache(cfg *CacheConfig) <span class="cov8" title="1">{
|
|
if cfg.Logger == nil </span><span class="cov0" title="0">{
|
|
cfg.Logger = libpack_logger.New()
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Initializing in-module logger",
|
|
})
|
|
}</span>
|
|
<span class="cov8" title="1">cacheStats = &CacheStats{}
|
|
if ShouldUseRedisCache(cfg) </span><span class="cov8" title="1">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Using Redis cache",
|
|
})
|
|
redisClient, err := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
|
|
RedisDB: cfg.Redis.DB,
|
|
RedisServer: cfg.Redis.URL,
|
|
RedisPassword: cfg.Redis.Password,
|
|
})
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to create Redis client",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
// Fall back to memory cache
|
|
cfg.Client = libpack_cache_memory.New(time.Duration(cfg.TTL) * time.Second)
|
|
}</span> else<span class="cov8" title="1"> {
|
|
cfg.Client = libpack_cache_redis.NewCacheWrapper(redisClient, cfg.Logger)
|
|
}</span>
|
|
} else<span class="cov0" title="0"> {
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Using in-memory cache",
|
|
Pairs: map[string]interface{}{
|
|
"max_memory_size_bytes": cfg.Memory.MaxMemorySize,
|
|
"max_entries": cfg.Memory.MaxEntries,
|
|
},
|
|
})
|
|
|
|
// Use memory size and entry limits if configured, otherwise use defaults
|
|
if cfg.Memory.MaxMemorySize > 0 || cfg.Memory.MaxEntries > 0 </span><span class="cov0" title="0">{
|
|
maxMemory := cfg.Memory.MaxMemorySize
|
|
if maxMemory <= 0 </span><span class="cov0" title="0">{
|
|
maxMemory = libpack_cache_memory.DefaultMaxMemorySize
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">maxEntries := cfg.Memory.MaxEntries
|
|
if maxEntries <= 0 </span><span class="cov0" title="0">{
|
|
maxEntries = libpack_cache_memory.DefaultMaxCacheSize
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">cfg.Client = libpack_cache_memory.NewWithSize(
|
|
time.Duration(cfg.TTL)*time.Second,
|
|
maxMemory,
|
|
maxEntries,
|
|
)</span>
|
|
} else<span class="cov0" title="0"> {
|
|
// Backward compatibility
|
|
cfg.Client = libpack_cache_memory.New(time.Duration(cfg.TTL) * time.Second)
|
|
}</span>
|
|
}
|
|
<span class="cov8" title="1">config = cfg</span>
|
|
}
|
|
|
|
func CacheLookup(hash string) []byte <span class="cov8" title="1">{
|
|
if !IsCacheInitialized() </span><span class="cov0" title="0">{
|
|
return nil
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">obj, found := config.Client.Get(hash)
|
|
if found </span><span class="cov8" title="1">{
|
|
atomic.AddInt64(&cacheStats.CacheHits, 1)
|
|
// If the cached data is compressed, decompress it
|
|
if len(obj) > 2 && obj[0] == 0x1f && obj[1] == 0x8b </span><span class="cov8" title="1">{
|
|
reader, err := gzip.NewReader(bytes.NewReader(obj))
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
config.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to create gzip reader for cached data",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "hash": hash},
|
|
})
|
|
return nil
|
|
}</span>
|
|
// Ensure reader is always closed, even on error
|
|
<span class="cov8" title="1">defer func() </span><span class="cov8" title="1">{
|
|
if closeErr := reader.Close(); closeErr != nil </span><span class="cov0" title="0">{
|
|
config.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to close gzip reader",
|
|
Pairs: map[string]interface{}{"error": closeErr.Error(), "hash": hash},
|
|
})
|
|
}</span>
|
|
}()
|
|
|
|
<span class="cov8" title="1">decompressed, err := io.ReadAll(reader)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
config.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to decompress cached data",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "hash": hash},
|
|
})
|
|
return nil
|
|
}</span>
|
|
<span class="cov8" title="1">return decompressed</span>
|
|
}
|
|
<span class="cov8" title="1">return obj</span>
|
|
}
|
|
<span class="cov8" title="1">atomic.AddInt64(&cacheStats.CacheMisses, 1)
|
|
return nil</span>
|
|
}
|
|
|
|
func CacheDelete(hash string) <span class="cov8" title="1">{
|
|
if !IsCacheInitialized() </span><span class="cov8" title="1">{
|
|
return
|
|
}</span>
|
|
<span class="cov8" title="1">config.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Deleting data from cache",
|
|
Pairs: map[string]interface{}{"hash": hash},
|
|
})
|
|
// Use atomic operations with validation to prevent inconsistent statistics
|
|
for </span><span class="cov8" title="1">{
|
|
current := atomic.LoadInt64(&cacheStats.CachedQueries)
|
|
if current <= 0 </span><span class="cov8" title="1">{
|
|
break</span> // Don't go below zero
|
|
}
|
|
<span class="cov8" title="1">if atomic.CompareAndSwapInt64(&cacheStats.CachedQueries, current, current-1) </span><span class="cov8" title="1">{
|
|
break</span>
|
|
}
|
|
// Retry if CAS failed due to concurrent modification
|
|
}
|
|
<span class="cov8" title="1">config.Client.Delete(hash)</span>
|
|
}
|
|
|
|
func CacheStore(hash string, data []byte) <span class="cov8" title="1">{
|
|
if !IsCacheInitialized() </span><span class="cov0" title="0">{
|
|
config.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Cache not initialized",
|
|
})
|
|
return
|
|
}</span>
|
|
<span class="cov8" title="1">config.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Storing data in cache",
|
|
Pairs: map[string]interface{}{"hash": hash},
|
|
})
|
|
atomic.AddInt64(&cacheStats.CachedQueries, 1)
|
|
config.Client.Set(hash, data, time.Duration(config.TTL)*time.Second)</span>
|
|
}
|
|
|
|
func CacheStoreWithTTL(hash string, data []byte, ttl time.Duration) <span class="cov8" title="1">{
|
|
if !IsCacheInitialized() </span><span class="cov8" title="1">{
|
|
return
|
|
}</span>
|
|
<span class="cov8" title="1">config.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Storing data in cache with TTL",
|
|
Pairs: map[string]interface{}{"hash": hash, "ttl": ttl},
|
|
})
|
|
atomic.AddInt64(&cacheStats.CachedQueries, 1)
|
|
config.Client.Set(hash, data, ttl)</span>
|
|
}
|
|
|
|
func CacheGetQueries() int64 <span class="cov8" title="1">{
|
|
if !IsCacheInitialized() </span><span class="cov8" title="1">{
|
|
return 0
|
|
}</span>
|
|
<span class="cov8" title="1">config.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Counting cache queries",
|
|
})
|
|
return config.Client.CountQueries()</span>
|
|
}
|
|
|
|
func CacheClear() <span class="cov8" title="1">{
|
|
config.Client.Clear()
|
|
cacheStats = &CacheStats{}
|
|
}</span>
|
|
|
|
func GetCacheStats() *CacheStats <span class="cov8" title="1">{
|
|
if !IsCacheInitialized() </span><span class="cov8" title="1">{
|
|
return &CacheStats{}
|
|
}</span>
|
|
<span class="cov8" title="1">config.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Getting cache stats",
|
|
})
|
|
cacheStats.CachedQueries = CacheGetQueries()
|
|
return cacheStats</span>
|
|
}
|
|
|
|
// GetCacheMemoryUsage returns the current memory usage of the cache in bytes
|
|
func GetCacheMemoryUsage() int64 <span class="cov0" title="0">{
|
|
if !IsCacheInitialized() </span><span class="cov0" title="0">{
|
|
return 0
|
|
}</span>
|
|
<span class="cov0" title="0">return config.Client.GetMemoryUsage()</span>
|
|
}
|
|
|
|
// GetCacheMaxMemorySize returns the maximum memory size allowed for the cache in bytes
|
|
func GetCacheMaxMemorySize() int64 <span class="cov0" title="0">{
|
|
if !IsCacheInitialized() </span><span class="cov0" title="0">{
|
|
return 0
|
|
}</span>
|
|
<span class="cov0" title="0">return config.Client.GetMaxMemorySize()</span>
|
|
}
|
|
|
|
func ShouldUseRedisCache(cfg *CacheConfig) bool <span class="cov8" title="1">{
|
|
return cfg.Redis.Enable
|
|
}</span>
|
|
|
|
func IsCacheInitialized() bool <span class="cov8" title="1">{
|
|
return config != nil && config.Client != nil
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file3" style="display: none">package libpack_cache_memory
|
|
|
|
import (
|
|
"bytes"
|
|
"sync"
|
|
)
|
|
|
|
// bufferPool hands out reusable byte buffers. A freshly created buffer is
// empty and starts with 4096 bytes of capacity.
var bufferPool = sync.Pool{
	New: func() interface{} {
		backing := make([]byte, 0, 4096)
		return bytes.NewBuffer(backing)
	},
}
|
|
|
|
// GetBuffer gets a buffer from the pool
|
|
func GetBuffer() *bytes.Buffer <span class="cov0" title="0">{
|
|
buf := bufferPool.Get().(*bytes.Buffer)
|
|
buf.Reset()
|
|
return buf
|
|
}</span>
|
|
|
|
// PutBuffer returns a buffer to the pool
|
|
func PutBuffer(buf *bytes.Buffer) <span class="cov0" title="0">{
|
|
if buf.Cap() > 1024*1024 </span><span class="cov0" title="0">{ // Don't pool buffers larger than 1MB
|
|
return
|
|
}</span>
|
|
<span class="cov0" title="0">buf.Reset()
|
|
bufferPool.Put(buf)</span>
|
|
}</pre>
|
|
|
|
<pre class="file" id="file4" style="display: none">package libpack_cache_memory
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"container/list"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// LRUMemoryCache is an efficient LRU-based memory cache implementation
|
|
type LRUMemoryCache struct {
|
|
maxMemorySize int64
|
|
maxEntries int64
|
|
currentMemory int64
|
|
currentCount int64
|
|
|
|
mu sync.RWMutex
|
|
entries map[string]*lruEntry
|
|
evictList *list.List
|
|
|
|
gzipWriterPool *sync.Pool
|
|
gzipReaderPool *sync.Pool
|
|
cancel func()
|
|
}
|
|
|
|
// lruEntry is a single cached item together with its bookkeeping data.
type lruEntry struct {
	key        string
	value      []byte // stored payload; gzip data when compressed is true
	compressed bool
	size       int64 // accounted size in bytes
	expiresAt  time.Time
	element    *list.Element // this entry's node in the eviction list
}
|
|
|
|
// NewLRUMemoryCache creates a new LRU memory cache
|
|
func NewLRUMemoryCache(maxMemorySize, maxEntries int64) *LRUMemoryCache <span class="cov0" title="0">{
|
|
return &LRUMemoryCache{
|
|
maxMemorySize: maxMemorySize,
|
|
maxEntries: maxEntries,
|
|
entries: make(map[string]*lruEntry),
|
|
evictList: list.New(),
|
|
gzipWriterPool: &sync.Pool{
|
|
New: func() interface{} </span><span class="cov0" title="0">{
|
|
return gzip.NewWriter(nil)
|
|
}</span>,
|
|
},
|
|
gzipReaderPool: &sync.Pool{
|
|
New: func() interface{} <span class="cov0" title="0">{
|
|
return &gzip.Reader{}
|
|
}</span>,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Set adds or updates an entry in the cache
|
|
func (c *LRUMemoryCache) Set(key string, value []byte, ttl time.Duration) <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Calculate expiry time
|
|
expiresAt := time.Now().Add(ttl)
|
|
|
|
// Check if we should compress
|
|
compressed := false
|
|
finalValue := value
|
|
if len(value) > 1024 </span><span class="cov0" title="0">{ // Compress if larger than 1KB
|
|
if compressedData, err := c.compress(value); err == nil && len(compressedData) < len(value) </span><span class="cov0" title="0">{
|
|
compressed = true
|
|
finalValue = compressedData
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">entrySize := int64(len(key) + len(finalValue) + 64) // 64 bytes overhead estimate
|
|
|
|
// Check if key exists
|
|
if existing, exists := c.entries[key]; exists </span><span class="cov0" title="0">{
|
|
// Update existing entry
|
|
c.evictList.MoveToFront(existing.element)
|
|
atomic.AddInt64(&c.currentMemory, -existing.size)
|
|
atomic.AddInt64(&c.currentMemory, entrySize)
|
|
|
|
existing.value = finalValue
|
|
existing.compressed = compressed
|
|
existing.size = entrySize
|
|
existing.expiresAt = expiresAt
|
|
|
|
c.evictIfNeeded()
|
|
return
|
|
}</span>
|
|
|
|
// Create new entry
|
|
<span class="cov0" title="0">entry := &lruEntry{
|
|
key: key,
|
|
value: finalValue,
|
|
compressed: compressed,
|
|
size: entrySize,
|
|
expiresAt: expiresAt,
|
|
}
|
|
|
|
element := c.evictList.PushFront(entry)
|
|
entry.element = element
|
|
c.entries[key] = entry
|
|
|
|
atomic.AddInt64(&c.currentMemory, entrySize)
|
|
atomic.AddInt64(&c.currentCount, 1)
|
|
|
|
c.evictIfNeeded()</span>
|
|
}
|
|
|
|
// Get retrieves a value from the cache
|
|
func (c *LRUMemoryCache) Get(key string) ([]byte, bool) <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
entry, exists := c.entries[key]
|
|
if !exists </span><span class="cov0" title="0">{
|
|
return nil, false
|
|
}</span>
|
|
|
|
// Check if expired
|
|
<span class="cov0" title="0">if time.Now().After(entry.expiresAt) </span><span class="cov0" title="0">{
|
|
c.removeEntry(entry)
|
|
return nil, false
|
|
}</span>
|
|
|
|
// Move to front (most recently used)
|
|
<span class="cov0" title="0">c.evictList.MoveToFront(entry.element)
|
|
|
|
// Decompress if needed
|
|
if entry.compressed </span><span class="cov0" title="0">{
|
|
if decompressed, err := c.decompress(entry.value); err == nil </span><span class="cov0" title="0">{
|
|
return decompressed, true
|
|
}</span>
|
|
// If decompression fails, remove the entry
|
|
<span class="cov0" title="0">c.removeEntry(entry)
|
|
return nil, false</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">return entry.value, true</span>
|
|
}
|
|
|
|
// Delete removes an entry from the cache
|
|
func (c *LRUMemoryCache) Delete(key string) <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if entry, exists := c.entries[key]; exists </span><span class="cov0" title="0">{
|
|
c.removeEntry(entry)
|
|
}</span>
|
|
}
|
|
|
|
// Clear removes all entries
|
|
func (c *LRUMemoryCache) Clear() <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
c.entries = make(map[string]*lruEntry)
|
|
c.evictList = list.New()
|
|
atomic.StoreInt64(&c.currentMemory, 0)
|
|
atomic.StoreInt64(&c.currentCount, 0)
|
|
}</span>
|
|
|
|
// evictIfNeeded removes entries when limits are exceeded
|
|
func (c *LRUMemoryCache) evictIfNeeded() <span class="cov0" title="0">{
|
|
// Evict based on entry count
|
|
for atomic.LoadInt64(&c.currentCount) > c.maxEntries && c.evictList.Len() > 0 </span><span class="cov0" title="0">{
|
|
c.evictOldest()
|
|
}</span>
|
|
|
|
// Evict based on memory
|
|
<span class="cov0" title="0">for atomic.LoadInt64(&c.currentMemory) > c.maxMemorySize && c.evictList.Len() > 0 </span><span class="cov0" title="0">{
|
|
c.evictOldest()
|
|
}</span>
|
|
}
|
|
|
|
// evictOldest removes the least recently used entry
|
|
func (c *LRUMemoryCache) evictOldest() <span class="cov0" title="0">{
|
|
element := c.evictList.Back()
|
|
if element == nil </span><span class="cov0" title="0">{
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">entry := element.Value.(*lruEntry)
|
|
c.removeEntry(entry)</span>
|
|
}
|
|
|
|
// removeEntry removes an entry from all data structures
|
|
func (c *LRUMemoryCache) removeEntry(entry *lruEntry) <span class="cov0" title="0">{
|
|
c.evictList.Remove(entry.element)
|
|
delete(c.entries, entry.key)
|
|
atomic.AddInt64(&c.currentMemory, -entry.size)
|
|
atomic.AddInt64(&c.currentCount, -1)
|
|
}</span>
|
|
|
|
// CleanExpiredEntries removes all expired entries
|
|
func (c *LRUMemoryCache) CleanExpiredEntries() <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
for element := c.evictList.Back(); element != nil; </span><span class="cov0" title="0">{
|
|
entry := element.Value.(*lruEntry)
|
|
|
|
if now.After(entry.expiresAt) </span><span class="cov0" title="0">{
|
|
next := element.Prev()
|
|
c.removeEntry(entry)
|
|
element = next
|
|
}</span> else<span class="cov0" title="0"> {
|
|
element = element.Prev()
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
// compress compresses data using gzip
|
|
func (c *LRUMemoryCache) compress(data []byte) ([]byte, error) <span class="cov0" title="0">{
|
|
buf := GetBuffer()
|
|
defer PutBuffer(buf)
|
|
|
|
gz := c.gzipWriterPool.Get().(*gzip.Writer)
|
|
gz.Reset(buf)
|
|
defer c.gzipWriterPool.Put(gz)
|
|
|
|
if _, err := gz.Write(data); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
<span class="cov0" title="0">if err := gz.Close(); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">compressed := make([]byte, buf.Len())
|
|
copy(compressed, buf.Bytes())
|
|
return compressed, nil</span>
|
|
}
|
|
|
|
// decompress decompresses gzip data
|
|
func (c *LRUMemoryCache) decompress(data []byte) ([]byte, error) <span class="cov0" title="0">{
|
|
buf := GetBuffer()
|
|
defer PutBuffer(buf)
|
|
|
|
buf.Write(data)
|
|
|
|
gr := c.gzipReaderPool.Get().(*gzip.Reader)
|
|
defer c.gzipReaderPool.Put(gr)
|
|
|
|
if err := gr.Reset(buf); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">result := GetBuffer()
|
|
defer PutBuffer(result)
|
|
|
|
if _, err := result.ReadFrom(gr); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">decompressed := make([]byte, result.Len())
|
|
copy(decompressed, result.Bytes())
|
|
return decompressed, nil</span>
|
|
}
|
|
|
|
// GetStats returns cache statistics
|
|
func (c *LRUMemoryCache) GetStats() map[string]interface{} <span class="cov0" title="0">{
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"entries": atomic.LoadInt64(&c.currentCount),
|
|
"memory_bytes": atomic.LoadInt64(&c.currentMemory),
|
|
"max_entries": c.maxEntries,
|
|
"max_memory": c.maxMemorySize,
|
|
"fill_percent": float64(atomic.LoadInt64(&c.currentMemory)) / float64(c.maxMemorySize) * 100,
|
|
}
|
|
}</span>
|
|
|
|
// GetMemoryUsage returns current memory usage in bytes
|
|
func (c *LRUMemoryCache) GetMemoryUsage() int64 <span class="cov0" title="0">{
|
|
return atomic.LoadInt64(&c.currentMemory)
|
|
}</span>
|
|
|
|
// GetMaxMemorySize returns the maximum memory size
|
|
func (c *LRUMemoryCache) GetMaxMemorySize() int64 <span class="cov0" title="0">{
|
|
return c.maxMemorySize
|
|
}</pre>
|
|
|
|
<pre class="file" id="file5" style="display: none">package libpack_cache_memory
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"io"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
const (
	// CompressionThreshold is the value size in bytes (1KB) above which
	// compression is attempted.
	CompressionThreshold = 1 << 10

	// DefaultMaxMemorySize is the default memory limit in bytes (100MB).
	DefaultMaxMemorySize = 100 << 20

	// DefaultMaxCacheSize is the default maximum number of entries in the
	// cache. It is kept for backward compatibility.
	DefaultMaxCacheSize = 10_000

	// approxEntryOverhead is the estimated per-entry overhead in bytes. It
	// accounts for the CacheEntry struct, the map entry and synchronization.
	approxEntryOverhead = 64
)
|
|
|
|
// CacheEntry is the value stored in the cache for a single key.
type CacheEntry struct {
	ExpiresAt  time.Time // moment after which the entry is treated as expired
	Value      []byte    // stored payload; gzip data when Compressed is true
	Compressed bool      // whether Value must be decompressed before use
	MemorySize int64     // estimated memory usage of this entry in bytes
}
|
|
|
|
// Cache is an in-memory cache backed by a sync.Map, bounded by a memory
// budget and an entry count.
type Cache struct {
	compressPool   sync.Pool // reusable *gzip.Writer values
	decompressPool sync.Pool // reusable *gzip.Reader values
	entries        sync.Map  // key (string) -> CacheEntry
	globalTTL      time.Duration
	entryCount     int64 // number of stored entries, updated atomically
	memoryUsage    int64 // total memory usage in bytes, updated atomically
	maxMemorySize  int64 // maximum memory usage in bytes
	maxCacheSize   int64 // maximum number of entries (for backward compatibility)

	// ctx and cancel stop the cleanup goroutine on Shutdown.
	ctx    context.Context
	cancel context.CancelFunc

	sync.RWMutex
}
|
|
|
|
func New(globalTTL time.Duration) *Cache <span class="cov8" title="1">{
|
|
return NewWithSize(globalTTL, DefaultMaxMemorySize, DefaultMaxCacheSize)
|
|
}</span>
|
|
|
|
// NewWithSize creates a new cache with the specified memory size limit and entry count limit
|
|
func NewWithSize(globalTTL time.Duration, maxMemorySize int64, maxCacheSize int64) *Cache <span class="cov8" title="1">{
|
|
// Create context for graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
cache := &Cache{
|
|
globalTTL: globalTTL,
|
|
maxMemorySize: maxMemorySize,
|
|
maxCacheSize: maxCacheSize,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
compressPool: sync.Pool{
|
|
New: func() interface{} </span><span class="cov8" title="1">{
|
|
return gzip.NewWriter(nil)
|
|
}</span>,
|
|
},
|
|
decompressPool: sync.Pool{
|
|
New: func() interface{} <span class="cov8" title="1">{
|
|
r, _ := gzip.NewReader(bytes.NewReader([]byte{}))
|
|
return r
|
|
}</span>,
|
|
},
|
|
}
|
|
|
|
// Start cleanup routine with context cancellation
|
|
<span class="cov8" title="1">go cache.cleanupRoutine(globalTTL)
|
|
return cache</span>
|
|
}
|
|
|
|
func (c *Cache) cleanupRoutine(globalTTL time.Duration) <span class="cov8" title="1">{
|
|
// Clean up more frequently when the cache is large
|
|
ticker := time.NewTicker(globalTTL / 4)
|
|
defer ticker.Stop()
|
|
|
|
for </span><span class="cov8" title="1">{
|
|
select </span>{
|
|
case <-c.ctx.Done():<span class="cov0" title="0">
|
|
// Context cancelled, exit gracefully
|
|
return</span>
|
|
case <-ticker.C:<span class="cov8" title="1">
|
|
c.CleanExpiredEntries()</span>
|
|
|
|
// Note: Removed aggressive GC trigger that was causing performance issues
|
|
// The Go runtime GC is already optimized and will run when needed
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown gracefully stops the cache cleanup routine
|
|
func (c *Cache) Shutdown() <span class="cov0" title="0">{
|
|
if c.cancel != nil </span><span class="cov0" title="0">{
|
|
c.cancel()
|
|
}</span>
|
|
}
|
|
|
|
func (c *Cache) Set(key string, value []byte, ttl time.Duration) <span class="cov8" title="1">{
|
|
// Calculate the memory size of this entry
|
|
entrySize := int64(len(key) + len(value) + approxEntryOverhead)
|
|
|
|
// Check if we need to evict entries based on memory or count limits
|
|
currentMemory := atomic.LoadInt64(&c.memoryUsage)
|
|
if currentMemory+entrySize > c.maxMemorySize </span><span class="cov8" title="1">{
|
|
// Need to evict based on memory
|
|
memoryToFree := (currentMemory + entrySize) - c.maxMemorySize + (c.maxMemorySize / 10)
|
|
c.evictToFreeMemory(memoryToFree)
|
|
}</span> else<span class="cov8" title="1"> if atomic.LoadInt64(&c.entryCount) >= c.maxCacheSize </span><span class="cov8" title="1">{
|
|
// Fall back to count-based eviction for backward compatibility
|
|
c.evictOldest(int(c.maxCacheSize / 10)) // Evict 10% of entries
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">expiresAt := time.Now().Add(ttl)
|
|
|
|
// Only compress if the value is larger than the threshold
|
|
var entry CacheEntry
|
|
if len(value) > CompressionThreshold </span><span class="cov8" title="1">{
|
|
compressedValue, err := c.compress(value)
|
|
if err == nil && len(compressedValue) < len(value) </span><span class="cov8" title="1">{
|
|
entry = CacheEntry{
|
|
Value: compressedValue,
|
|
ExpiresAt: expiresAt,
|
|
Compressed: true,
|
|
}
|
|
}</span> else<span class="cov0" title="0"> {
|
|
// If compression failed or didn't reduce size, store uncompressed
|
|
entry = CacheEntry{
|
|
Value: value,
|
|
ExpiresAt: expiresAt,
|
|
Compressed: false,
|
|
}
|
|
}</span>
|
|
} else<span class="cov8" title="1"> {
|
|
entry = CacheEntry{
|
|
Value: value,
|
|
ExpiresAt: expiresAt,
|
|
Compressed: false,
|
|
}
|
|
}</span>
|
|
|
|
// Update the entry memory size based on compression status
|
|
<span class="cov8" title="1">if entry.Compressed </span><span class="cov8" title="1">{
|
|
entry.MemorySize = int64(len(key) + len(entry.Value) + approxEntryOverhead)
|
|
}</span> else<span class="cov8" title="1"> {
|
|
entry.MemorySize = int64(len(key) + len(entry.Value) + approxEntryOverhead)
|
|
}</span>
|
|
|
|
// Check if this is a new entry or an update
|
|
<span class="cov8" title="1">oldEntry, exists := c.entries.Load(key)
|
|
if exists </span><span class="cov0" title="0">{
|
|
// Update memory usage: subtract old entry size, add new entry size
|
|
oldCacheEntry := oldEntry.(CacheEntry)
|
|
atomic.AddInt64(&c.memoryUsage, -oldCacheEntry.MemorySize)
|
|
}</span> else<span class="cov8" title="1"> {
|
|
// New entry
|
|
atomic.AddInt64(&c.entryCount, 1)
|
|
}</span>
|
|
|
|
// Add new entry's memory size to total
|
|
<span class="cov8" title="1">atomic.AddInt64(&c.memoryUsage, entry.MemorySize)
|
|
c.entries.Store(key, entry)</span>
|
|
}
|
|
|
|
func (c *Cache) Get(key string) ([]byte, bool) <span class="cov8" title="1">{
|
|
entry, ok := c.entries.Load(key)
|
|
if !ok </span><span class="cov8" title="1">{
|
|
return nil, false
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">cacheEntry := entry.(CacheEntry)
|
|
if cacheEntry.ExpiresAt.Before(time.Now()) </span><span class="cov8" title="1">{
|
|
c.entries.Delete(key)
|
|
atomic.AddInt64(&c.entryCount, -1)
|
|
atomic.AddInt64(&c.memoryUsage, -cacheEntry.MemorySize)
|
|
return nil, false
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if cacheEntry.Compressed </span><span class="cov8" title="1">{
|
|
value, err := c.decompress(cacheEntry.Value)
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
return nil, false
|
|
}</span>
|
|
<span class="cov8" title="1">return value, true</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">return cacheEntry.Value, true</span>
|
|
}
|
|
|
|
func (c *Cache) Delete(key string) <span class="cov8" title="1">{
|
|
if entry, exists := c.entries.LoadAndDelete(key); exists </span><span class="cov8" title="1">{
|
|
cacheEntry := entry.(CacheEntry)
|
|
atomic.AddInt64(&c.entryCount, -1)
|
|
atomic.AddInt64(&c.memoryUsage, -cacheEntry.MemorySize)
|
|
}</span>
|
|
}
|
|
|
|
func (c *Cache) Clear() <span class="cov8" title="1">{
|
|
c.entries.Range(func(key, value interface{}) bool </span><span class="cov8" title="1">{
|
|
c.entries.Delete(key)
|
|
return true
|
|
}</span>)
|
|
<span class="cov8" title="1">atomic.StoreInt64(&c.entryCount, 0)
|
|
atomic.StoreInt64(&c.memoryUsage, 0)</span>
|
|
}
|
|
|
|
func (c *Cache) CountQueries() int64 <span class="cov8" title="1">{
|
|
return atomic.LoadInt64(&c.entryCount)
|
|
}</span>
|
|
|
|
func (c *Cache) compress(data []byte) ([]byte, error) <span class="cov8" title="1">{
|
|
var buf bytes.Buffer
|
|
w := c.compressPool.Get().(*gzip.Writer)
|
|
defer c.compressPool.Put(w)
|
|
|
|
w.Reset(&buf)
|
|
if _, err := w.Write(data); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
<span class="cov8" title="1">if err := w.Close(); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
<span class="cov8" title="1">return buf.Bytes(), nil</span>
|
|
}
|
|
|
|
func (c *Cache) decompress(data []byte) ([]byte, error) <span class="cov8" title="1">{
|
|
r, ok := c.decompressPool.Get().(*gzip.Reader)
|
|
defer c.decompressPool.Put(r)
|
|
|
|
if !ok || r == nil </span><span class="cov8" title="1">{
|
|
var err error
|
|
r, err = gzip.NewReader(bytes.NewReader(data))
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
return nil, err
|
|
}</span>
|
|
} else<span class="cov0" title="0"> {
|
|
if err := r.Reset(bytes.NewReader(data)); err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">defer func() </span><span class="cov8" title="1">{
|
|
_ = r.Close() // Ignore error in defer cleanup
|
|
}</span>()
|
|
<span class="cov8" title="1">return io.ReadAll(r)</span>
|
|
}
|
|
|
|
func (c *Cache) CleanExpiredEntries() <span class="cov8" title="1">{
|
|
now := time.Now()
|
|
c.entries.Range(func(key, value interface{}) bool </span><span class="cov8" title="1">{
|
|
entry := value.(CacheEntry)
|
|
if entry.ExpiresAt.Before(now) </span><span class="cov8" title="1">{
|
|
if _, exists := c.entries.LoadAndDelete(key); exists </span><span class="cov8" title="1">{
|
|
atomic.AddInt64(&c.entryCount, -1)
|
|
atomic.AddInt64(&c.memoryUsage, -entry.MemorySize)
|
|
}</span>
|
|
}
|
|
<span class="cov8" title="1">return true</span>
|
|
})
|
|
}
|
|
|
|
// evictOldest removes the oldest n entries from the cache
|
|
func (c *Cache) evictOldest(n int) <span class="cov8" title="1">{
|
|
type keyExpiry struct {
|
|
key string
|
|
expiresAt time.Time
|
|
}
|
|
|
|
// Collect all entries with their expiry times
|
|
entries := make([]keyExpiry, 0, n*2)
|
|
c.entries.Range(func(k, v interface{}) bool </span><span class="cov8" title="1">{
|
|
key := k.(string)
|
|
entry := v.(CacheEntry)
|
|
entries = append(entries, keyExpiry{key, entry.ExpiresAt})
|
|
return len(entries) < cap(entries)
|
|
}</span>)
|
|
|
|
// Sort by expiry time (oldest first)
|
|
// Using a simple selection sort since we only need to find the n oldest
|
|
<span class="cov8" title="1">for i := 0; i < n && i < len(entries); i++ </span><span class="cov8" title="1">{
|
|
oldest := i
|
|
for j := i + 1; j < len(entries); j++ </span><span class="cov8" title="1">{
|
|
if entries[j].expiresAt.Before(entries[oldest].expiresAt) </span><span class="cov8" title="1">{
|
|
oldest = j
|
|
}</span>
|
|
}
|
|
// Swap
|
|
<span class="cov8" title="1">if oldest != i </span><span class="cov8" title="1">{
|
|
entries[i], entries[oldest] = entries[oldest], entries[i]
|
|
}</span>
|
|
|
|
// Delete this entry
|
|
<span class="cov8" title="1">if entry, exists := c.entries.LoadAndDelete(entries[i].key); exists </span><span class="cov8" title="1">{
|
|
cacheEntry := entry.(CacheEntry)
|
|
atomic.AddInt64(&c.entryCount, -1)
|
|
atomic.AddInt64(&c.memoryUsage, -cacheEntry.MemorySize)
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
// evictToFreeMemory removes entries until the specified amount of memory is freed
|
|
func (c *Cache) evictToFreeMemory(bytesToFree int64) <span class="cov8" title="1">{
|
|
type keyMemorySize struct {
|
|
key string
|
|
memorySize int64
|
|
expiresAt time.Time
|
|
}
|
|
|
|
// Collect entries to consider for eviction
|
|
entries := make([]keyMemorySize, 0, int(c.maxCacheSize/5))
|
|
c.entries.Range(func(k, v interface{}) bool </span><span class="cov8" title="1">{
|
|
key := k.(string)
|
|
entry := v.(CacheEntry)
|
|
entries = append(entries, keyMemorySize{key, entry.MemorySize, entry.ExpiresAt})
|
|
return len(entries) < cap(entries)
|
|
}</span>)
|
|
|
|
// Sort entries by expiry time (oldest first)
|
|
// Simple selection sort since we only need to find the oldest entries
|
|
<span class="cov8" title="1">var freedBytes int64
|
|
for i := 0; i < len(entries) && freedBytes < bytesToFree; i++ </span><span class="cov8" title="1">{
|
|
oldest := i
|
|
for j := i + 1; j < len(entries); j++ </span><span class="cov8" title="1">{
|
|
if entries[j].expiresAt.Before(entries[oldest].expiresAt) </span><span class="cov8" title="1">{
|
|
oldest = j
|
|
}</span>
|
|
}
|
|
// Swap
|
|
<span class="cov8" title="1">if oldest != i </span><span class="cov8" title="1">{
|
|
entries[i], entries[oldest] = entries[oldest], entries[i]
|
|
}</span>
|
|
|
|
// Delete this entry
|
|
<span class="cov8" title="1">if entry, exists := c.entries.LoadAndDelete(entries[i].key); exists </span><span class="cov8" title="1">{
|
|
cacheEntry := entry.(CacheEntry)
|
|
atomic.AddInt64(&c.entryCount, -1)
|
|
atomic.AddInt64(&c.memoryUsage, -cacheEntry.MemorySize)
|
|
freedBytes += cacheEntry.MemorySize
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
// GetMemoryUsage returns the current memory usage of the cache in bytes
|
|
func (c *Cache) GetMemoryUsage() int64 <span class="cov8" title="1">{
|
|
return atomic.LoadInt64(&c.memoryUsage)
|
|
}</span>
|
|
|
|
// GetMaxMemorySize returns the maximum memory size allowed for the cache in bytes
|
|
func (c *Cache) GetMaxMemorySize() int64 <span class="cov0" title="0">{
|
|
return c.maxMemorySize
|
|
}</span>
|
|
|
|
// SetMaxMemorySize updates the maximum memory size allowed for the cache
|
|
func (c *Cache) SetMaxMemorySize(maxBytes int64) <span class="cov8" title="1">{
|
|
c.maxMemorySize = maxBytes
|
|
|
|
// Check if we need to evict entries due to the new limit
|
|
currentMemory := atomic.LoadInt64(&c.memoryUsage)
|
|
if currentMemory > maxBytes </span><span class="cov8" title="1">{
|
|
memoryToFree := currentMemory - maxBytes + (maxBytes / 10)
|
|
c.evictToFreeMemory(memoryToFree)
|
|
}</span>
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file6" style="display: none">package libpack_cache_redis
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
redis "github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type RedisConfig struct {
|
|
ctx context.Context
|
|
client *redis.Client
|
|
builderPool *sync.Pool
|
|
prefix string
|
|
}
|
|
|
|
func (c *RedisConfig) prependKeyName(key string) string <span class="cov8" title="1">{
|
|
builder := c.builderPool.Get().(*strings.Builder)
|
|
defer c.builderPool.Put(builder)
|
|
builder.Reset()
|
|
builder.WriteString(c.prefix)
|
|
builder.WriteString(key)
|
|
return builder.String()
|
|
}</span>
|
|
|
|
// RedisClientConfig holds the settings used by New to build a Redis client.
type RedisClientConfig struct {
	RedisServer   string // used as the client's Addr option
	RedisPassword string // used as the client's Password option
	Prefix        string // prepended to every key
	RedisDB       int    // used as the client's DB option
}
|
|
|
|
func New(redisClientConfig *RedisClientConfig) (*RedisConfig, error) <span class="cov8" title="1">{
|
|
c := &RedisConfig{
|
|
client: redis.NewClient(&redis.Options{
|
|
Addr: redisClientConfig.RedisServer,
|
|
Password: redisClientConfig.RedisPassword,
|
|
DB: redisClientConfig.RedisDB,
|
|
}),
|
|
ctx: context.Background(),
|
|
prefix: redisClientConfig.Prefix,
|
|
builderPool: &sync.Pool{
|
|
New: func() interface{} </span><span class="cov8" title="1">{
|
|
return &strings.Builder{}
|
|
}</span>,
|
|
},
|
|
}
|
|
|
|
<span class="cov8" title="1">_, err := c.client.Ping(c.ctx).Result()
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return nil, err
|
|
}</span>
|
|
<span class="cov8" title="1">return c, nil</span>
|
|
}
|
|
|
|
func (c *RedisConfig) Set(key string, value []byte, ttl time.Duration) error <span class="cov8" title="1">{
|
|
return c.client.Set(c.ctx, c.prependKeyName(key), value, ttl).Err()
|
|
}</span>
|
|
|
|
func (c *RedisConfig) Get(key string) ([]byte, bool, error) <span class="cov8" title="1">{
|
|
val, err := c.client.Get(c.ctx, c.prependKeyName(key)).Result()
|
|
if err == redis.Nil </span><span class="cov8" title="1">{
|
|
return nil, false, nil
|
|
}</span>
|
|
<span class="cov8" title="1">if err != nil </span><span class="cov0" title="0">{
|
|
return nil, false, err
|
|
}</span>
|
|
<span class="cov8" title="1">return []byte(val), true, nil</span>
|
|
}
|
|
|
|
func (c *RedisConfig) Delete(key string) error <span class="cov8" title="1">{
|
|
return c.client.Del(c.ctx, c.prependKeyName(key)).Err()
|
|
}</span>
|
|
|
|
func (c *RedisConfig) Clear() error <span class="cov8" title="1">{
|
|
return c.client.FlushDB(c.ctx).Err()
|
|
}</span>
|
|
|
|
func (c *RedisConfig) CountQueries() (int64, error) <span class="cov8" title="1">{
|
|
keys, err := c.client.Keys(c.ctx, c.prependKeyName("*")).Result()
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return 0, err
|
|
}</span>
|
|
<span class="cov8" title="1">return int64(len(keys)), nil</span>
|
|
}
|
|
|
|
func (c *RedisConfig) CountQueriesWithPattern(pattern string) (int, error) <span class="cov8" title="1">{
|
|
keys, err := c.client.Keys(c.ctx, c.prependKeyName(pattern)).Result()
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return 0, err
|
|
}</span>
|
|
<span class="cov8" title="1">return len(keys), nil</span>
|
|
}
|
|
|
|
// GetMemoryUsage returns an approximation of memory usage for Redis
|
|
// For Redis, this is not as accurate as the memory cache implementation
|
|
// as actual memory is managed by Redis server
|
|
func (c *RedisConfig) GetMemoryUsage() int64 <span class="cov0" title="0">{
|
|
// We could attempt to get memory usage from Redis info
|
|
// but for now, we'll just return 0 since Redis manages its own memory
|
|
// and this information would require parsing the INFO command output
|
|
_, err := c.client.Info(c.ctx, "memory").Result()
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return 0
|
|
}</span>
|
|
|
|
// Just return 0 as a placeholder since Redis manages its own memory
|
|
// In a production environment, you could parse the Redis INFO command result
|
|
// to extract actual "used_memory" value
|
|
<span class="cov0" title="0">return 0</span>
|
|
}
|
|
|
|
// GetMaxMemorySize returns the configured max memory for Redis
|
|
// In Redis, this would be the 'maxmemory' configuration value
|
|
func (c *RedisConfig) GetMaxMemorySize() int64 <span class="cov0" title="0">{
|
|
// Return a default value as Redis manages its own memory limits
|
|
// In a production environment, you could get this from Redis config
|
|
return 0
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file7" style="display: none">package libpack_cache_redis
|
|
|
|
import (
|
|
"time"
|
|
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
// CacheWrapper wraps RedisConfig to implement the CacheClient interface
|
|
// without returning errors, for backward compatibility
|
|
type CacheWrapper struct {
|
|
redis *RedisConfig
|
|
logger *libpack_logger.Logger
|
|
}
|
|
|
|
// NewCacheWrapper creates a new cache wrapper
|
|
func NewCacheWrapper(config *RedisConfig, logger *libpack_logger.Logger) *CacheWrapper <span class="cov0" title="0">{
|
|
if logger == nil </span><span class="cov0" title="0">{
|
|
logger = &libpack_logger.Logger{}
|
|
}</span>
|
|
<span class="cov0" title="0">return &CacheWrapper{
|
|
redis: config,
|
|
logger: logger,
|
|
}</span>
|
|
}
|
|
|
|
// Set stores a value with the given TTL
|
|
func (w *CacheWrapper) Set(key string, value []byte, ttl time.Duration) <span class="cov0" title="0">{
|
|
if err := w.redis.Set(key, value, ttl); err != nil </span><span class="cov0" title="0">{
|
|
w.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Redis set error",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"key": key,
|
|
},
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
// Get retrieves a value
|
|
func (w *CacheWrapper) Get(key string) ([]byte, bool) <span class="cov0" title="0">{
|
|
value, found, err := w.redis.Get(key)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
w.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Redis get error",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"key": key,
|
|
},
|
|
})
|
|
return nil, false
|
|
}</span>
|
|
<span class="cov0" title="0">return value, found</span>
|
|
}
|
|
|
|
// Delete removes a key
|
|
func (w *CacheWrapper) Delete(key string) <span class="cov0" title="0">{
|
|
if err := w.redis.Delete(key); err != nil </span><span class="cov0" title="0">{
|
|
w.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Redis delete error",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"key": key,
|
|
},
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
// Clear removes all keys
|
|
func (w *CacheWrapper) Clear() <span class="cov0" title="0">{
|
|
if err := w.redis.Clear(); err != nil </span><span class="cov0" title="0">{
|
|
w.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Redis clear error",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
},
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
// CountQueries returns the number of queries
|
|
func (w *CacheWrapper) CountQueries() int64 <span class="cov0" title="0">{
|
|
count, err := w.redis.CountQueries()
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
w.logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Redis count queries error",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
},
|
|
})
|
|
return 0
|
|
}</span>
|
|
<span class="cov0" title="0">return count</span>
|
|
}
|
|
|
|
// GetMemoryUsage returns 0 for Redis (not applicable)
|
|
func (w *CacheWrapper) GetMemoryUsage() int64 <span class="cov0" title="0">{
|
|
return 0
|
|
}</span>
|
|
|
|
// GetMaxMemorySize returns 0 for Redis (not applicable)
|
|
func (w *CacheWrapper) GetMaxMemorySize() int64 <span class="cov0" title="0">{
|
|
return 0
|
|
}</pre>
|
|
|
|
<pre class="file" id="file8" style="display: none">package main
|
|
|
|
import (
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
)
|
|
|
|
// CircuitBreakerMetrics manages circuit breaker metrics without recreating gauges
|
|
type CircuitBreakerMetrics struct {
|
|
stateValue atomic.Value // stores float64
|
|
stateGauge *metrics.Gauge
|
|
failCounters map[string]*metrics.Counter
|
|
}
|
|
|
|
// NewCircuitBreakerMetrics creates a new circuit breaker metrics manager
|
|
func NewCircuitBreakerMetrics(monitoring *libpack_monitoring.MetricsSetup) *CircuitBreakerMetrics <span class="cov8" title="1">{
|
|
cbm := &CircuitBreakerMetrics{
|
|
failCounters: make(map[string]*metrics.Counter),
|
|
}
|
|
|
|
// Initialize state value
|
|
cbm.stateValue.Store(float64(0))
|
|
|
|
// Create gauge with callback that reads the atomic value
|
|
cbm.stateGauge = monitoring.RegisterMetricsGauge(
|
|
libpack_monitoring.MetricsCircuitState,
|
|
nil,
|
|
0, // Initial value doesn't matter as callback will be used
|
|
)
|
|
|
|
// Override the gauge callback to read from atomic value
|
|
cbm.stateGauge = monitoring.RegisterMetricsGauge(
|
|
libpack_monitoring.MetricsCircuitState,
|
|
nil,
|
|
cbm.GetState(),
|
|
)
|
|
|
|
return cbm
|
|
}</span>
|
|
|
|
// UpdateState updates the circuit breaker state value atomically
|
|
func (cbm *CircuitBreakerMetrics) UpdateState(state float64) <span class="cov8" title="1">{
|
|
cbm.stateValue.Store(state)
|
|
}</span>
|
|
|
|
// GetState returns the current circuit breaker state value
|
|
func (cbm *CircuitBreakerMetrics) GetState() float64 <span class="cov8" title="1">{
|
|
if val := cbm.stateValue.Load(); val != nil </span><span class="cov8" title="1">{
|
|
return val.(float64)
|
|
}</span>
|
|
<span class="cov0" title="0">return 0</span>
|
|
}
|
|
|
|
// GetOrCreateFailCounter returns a counter for the given state key
|
|
func (cbm *CircuitBreakerMetrics) GetOrCreateFailCounter(monitoring *libpack_monitoring.MetricsSetup, stateKey string) *metrics.Counter <span class="cov8" title="1">{
|
|
if counter, exists := cbm.failCounters[stateKey]; exists </span><span class="cov8" title="1">{
|
|
return counter
|
|
}</span>
|
|
|
|
// Create new counter
|
|
<span class="cov8" title="1">counter := monitoring.RegisterMetricsCounter(stateKey, nil)
|
|
cbm.failCounters[stateKey] = counter
|
|
return counter</span>
|
|
}
|
|
|
|
// Global circuit breaker metrics instance
|
|
var cbMetrics *CircuitBreakerMetrics
|
|
|
|
// InitializeCircuitBreakerMetrics initializes the global circuit breaker metrics
|
|
func InitializeCircuitBreakerMetrics(monitoring *libpack_monitoring.MetricsSetup) <span class="cov8" title="1">{
|
|
if cbMetrics == nil </span><span class="cov8" title="1">{
|
|
cbMetrics = NewCircuitBreakerMetrics(monitoring)
|
|
}</span>
|
|
}</pre>
|
|
|
|
<pre class="file" id="file9" style="display: none">package main
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
"github.com/valyala/fasthttp"
|
|
)
|
|
|
|
// ConnectionPoolManager manages HTTP client connections
|
|
type ConnectionPoolManager struct {
|
|
client *fasthttp.Client
|
|
mu sync.RWMutex
|
|
cleanupTimer *time.Timer
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewConnectionPoolManager creates a new connection pool manager
|
|
func NewConnectionPoolManager(client *fasthttp.Client) *ConnectionPoolManager <span class="cov8" title="1">{
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cpm := &ConnectionPoolManager{
|
|
client: client,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
// Start periodic cleanup
|
|
cpm.startPeriodicCleanup()
|
|
|
|
return cpm
|
|
}</span>
|
|
|
|
// startPeriodicCleanup starts a timer to periodically clean idle connections
|
|
func (cpm *ConnectionPoolManager) startPeriodicCleanup() <span class="cov8" title="1">{
|
|
// Clean idle connections every 30 seconds
|
|
go func() </span><span class="cov8" title="1">{
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for </span><span class="cov8" title="1">{
|
|
select </span>{
|
|
case <-cpm.ctx.Done():<span class="cov8" title="1">
|
|
return</span>
|
|
case <-ticker.C:<span class="cov0" title="0">
|
|
cpm.cleanIdleConnections()</span>
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// cleanIdleConnections closes idle connections
|
|
func (cpm *ConnectionPoolManager) cleanIdleConnections() <span class="cov0" title="0">{
|
|
cpm.mu.Lock()
|
|
defer cpm.mu.Unlock()
|
|
|
|
if cpm.client != nil </span><span class="cov0" title="0">{
|
|
cpm.client.CloseIdleConnections()
|
|
cfg.Logger.Debug(&libpack_logging.LogMessage{
|
|
Message: "Cleaned idle HTTP connections",
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
// GetClient returns the HTTP client
|
|
func (cpm *ConnectionPoolManager) GetClient() *fasthttp.Client <span class="cov0" title="0">{
|
|
cpm.mu.RLock()
|
|
defer cpm.mu.RUnlock()
|
|
return cpm.client
|
|
}</span>
|
|
|
|
// Shutdown gracefully shuts down the connection pool
|
|
func (cpm *ConnectionPoolManager) Shutdown() error <span class="cov8" title="1">{
|
|
if cpm == nil </span><span class="cov0" title="0">{
|
|
return nil
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">cpm.cancel()
|
|
|
|
cpm.mu.Lock()
|
|
defer cpm.mu.Unlock()
|
|
|
|
if cpm.client != nil </span><span class="cov8" title="1">{
|
|
cpm.client.CloseIdleConnections()
|
|
if cfg != nil && cfg.Logger != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "HTTP connection pool shut down",
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">return nil</span>
|
|
}
|
|
|
|
// Global connection pool manager
|
|
var connectionPoolManager *ConnectionPoolManager
|
|
|
|
// InitializeConnectionPool initializes the global connection pool
|
|
func InitializeConnectionPool(client *fasthttp.Client) <span class="cov8" title="1">{
|
|
if connectionPoolManager != nil </span><span class="cov8" title="1">{
|
|
connectionPoolManager.Shutdown()
|
|
}</span>
|
|
<span class="cov8" title="1">connectionPoolManager = NewConnectionPoolManager(client)</span>
|
|
}
|
|
|
|
// ShutdownConnectionPool safely shuts down the global connection pool
|
|
func ShutdownConnectionPool() <span class="cov8" title="1">{
|
|
if connectionPoolManager != nil </span><span class="cov8" title="1">{
|
|
connectionPoolManager.Shutdown()
|
|
connectionPoolManager = nil
|
|
}</span>
|
|
}
|
|
|
|
// GetConnectionPoolManager returns the global connection pool manager
|
|
func GetConnectionPoolManager() *ConnectionPoolManager <span class="cov0" title="0">{
|
|
return connectionPoolManager
|
|
}</pre>
|
|
|
|
<pre class="file" id="file10" style="display: none">package main
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/goccy/go-json"
|
|
"github.com/lukaszraczylo/ask"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
)
|
|
|
|
const defaultValue = "-"
|
|
|
|
var emptyMetrics = map[string]string{}
|
|
|
|
func extractClaimsFromJWTHeader(authorization string) (usr, role string) <span class="cov8" title="1">{
|
|
usr, role = defaultValue, defaultValue
|
|
|
|
tokenParts := strings.SplitN(authorization, ".", 3)
|
|
if len(tokenParts) != 3 </span><span class="cov8" title="1">{
|
|
handleError("Can't split the token", map[string]interface{}{"token": maskToken(authorization)})
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">claim, err := base64.RawURLEncoding.DecodeString(tokenParts[1])
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
handleError("Can't decode the token", map[string]interface{}{"token": maskToken(authorization)})
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">var claimMap map[string]interface{}
|
|
if err = json.Unmarshal(claim, &claimMap); err != nil </span><span class="cov0" title="0">{
|
|
handleError("Can't unmarshal the claim", map[string]interface{}{"token": maskToken(authorization)})
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">usr = extractClaim(claimMap, cfg.Client.JWTUserClaimPath, "user id")
|
|
role = extractClaim(claimMap, cfg.Client.JWTRoleClaimPath, "role")
|
|
|
|
return</span>
|
|
}
|
|
|
|
func extractClaim(claimMap map[string]interface{}, claimPath, name string) string <span class="cov8" title="1">{
|
|
if claimPath == "" </span><span class="cov8" title="1">{
|
|
return defaultValue
|
|
}</span>
|
|
|
|
// Validate claim path to prevent injection attacks
|
|
<span class="cov8" title="1">if !isValidClaimPath(claimPath) </span><span class="cov0" title="0">{
|
|
handleError(fmt.Sprintf("Invalid claim path for %s", name), map[string]interface{}{"path": claimPath})
|
|
return defaultValue
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">value, ok := ask.For(claimMap, claimPath).String(defaultValue)
|
|
if !ok </span><span class="cov8" title="1">{
|
|
handleError(fmt.Sprintf("Can't find the %s", name), map[string]interface{}{"claim_map": sanitizeClaimMap(claimMap), "path": claimPath})
|
|
return defaultValue
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return value</span>
|
|
}
|
|
|
|
// maskToken masks JWT tokens in logs to prevent exposure
|
|
func maskToken(token string) string <span class="cov8" title="1">{
|
|
if len(token) <= 10 </span><span class="cov8" title="1">{
|
|
return "***"
|
|
}</span>
|
|
<span class="cov8" title="1">return token[:4] + "***" + token[len(token)-4:]</span>
|
|
}
|
|
|
|
// isValidClaimPath validates JWT claim paths to prevent injection
|
|
func isValidClaimPath(path string) bool <span class="cov8" title="1">{
|
|
if path == "" </span><span class="cov0" title="0">{
|
|
return false
|
|
}</span>
|
|
// Allow only alphanumeric characters, dots, underscores, and hyphens
|
|
<span class="cov8" title="1">for _, char := range path </span><span class="cov8" title="1">{
|
|
if (char < 'a' || char > 'z') &&
|
|
(char < 'A' || char > 'Z') &&
|
|
(char < '0' || char > '9') &&
|
|
char != '.' && char != '_' && char != '-' </span><span class="cov0" title="0">{
|
|
return false
|
|
}</span>
|
|
}
|
|
// Prevent path traversal attempts
|
|
<span class="cov8" title="1">if strings.Contains(path, "..") || strings.Contains(path, "//") </span><span class="cov0" title="0">{
|
|
return false
|
|
}</span>
|
|
<span class="cov8" title="1">return true</span>
|
|
}
|
|
|
|
// sanitizeClaimMap removes sensitive data from claim map for logging
|
|
func sanitizeClaimMap(claimMap map[string]interface{}) map[string]interface{} <span class="cov8" title="1">{
|
|
sanitized := make(map[string]interface{})
|
|
sensitiveKeys := map[string]bool{
|
|
"password": true, "secret": true, "token": true, "key": true,
|
|
"auth": true, "credential": true, "private": true,
|
|
}
|
|
|
|
for k, v := range claimMap </span><span class="cov8" title="1">{
|
|
lowerKey := strings.ToLower(k)
|
|
if sensitiveKeys[lowerKey] </span><span class="cov0" title="0">{
|
|
sanitized[k] = "***"
|
|
}</span> else<span class="cov8" title="1"> {
|
|
sanitized[k] = v
|
|
}</span>
|
|
}
|
|
<span class="cov8" title="1">return sanitized</span>
|
|
}
|
|
|
|
func handleError(msg string, details map[string]interface{}) <span class="cov8" title="1">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, emptyMetrics)
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: msg,
|
|
Pairs: details,
|
|
})
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file11" style="display: none">package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
const (
|
|
initialDelay = 60 * time.Second
|
|
cleanupInterval = 1 * time.Hour
|
|
)
|
|
|
|
var delQueries = [...]string{
|
|
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - interval '%d days';",
|
|
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - interval '%d days';",
|
|
"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - INTERVAL '%d days';",
|
|
"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - INTERVAL '%d days';",
|
|
"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - INTERVAL '%d days';",
|
|
}
|
|
|
|
func enableHasuraEventCleaner(ctx context.Context) error <span class="cov8" title="1">{
|
|
cfgMutex.RLock()
|
|
if !cfg.HasuraEventCleaner.Enable </span><span class="cov8" title="1">{
|
|
cfgMutex.RUnlock()
|
|
return nil
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">eventMetadataDb := cfg.HasuraEventCleaner.EventMetadataDb
|
|
if eventMetadataDb == "" </span><span class="cov8" title="1">{
|
|
logger := cfg.Logger
|
|
cfgMutex.RUnlock()
|
|
|
|
logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Event metadata db URL not specified, event cleaner not active",
|
|
})
|
|
return nil
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">clearOlderThan := cfg.HasuraEventCleaner.ClearOlderThan
|
|
logger := cfg.Logger
|
|
cfgMutex.RUnlock()
|
|
|
|
logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Event cleaner enabled",
|
|
Pairs: map[string]interface{}{"interval_in_days": clearOlderThan},
|
|
})
|
|
|
|
// Parse pool configuration
|
|
poolConfig, err := pgxpool.ParseConfig(eventMetadataDb)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return err
|
|
}</span>
|
|
|
|
// Set connection pool limits
|
|
<span class="cov0" title="0">poolConfig.MaxConns = 10
|
|
poolConfig.MinConns = 2
|
|
poolConfig.MaxConnLifetime = time.Hour
|
|
poolConfig.MaxConnIdleTime = 30 * time.Minute
|
|
|
|
pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to create connection pool",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">go func() </span><span class="cov0" title="0">{
|
|
defer pool.Close()
|
|
|
|
// Wait for initial delay or context cancellation
|
|
select </span>{
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
return</span>
|
|
case <-time.After(initialDelay):<span class="cov0" title="0"></span>
|
|
}
|
|
|
|
<span class="cov0" title="0">logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Initial cleanup of old events",
|
|
})
|
|
cleanEvents(ctx, pool, clearOlderThan, logger)
|
|
|
|
ticker := time.NewTicker(cleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for </span><span class="cov0" title="0">{
|
|
select </span>{
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Stopping event cleaner",
|
|
})
|
|
return</span>
|
|
case <-ticker.C:<span class="cov0" title="0">
|
|
logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Cleaning up old events",
|
|
})
|
|
cleanEvents(ctx, pool, clearOlderThan, logger)</span>
|
|
}
|
|
}
|
|
}()
|
|
|
|
<span class="cov0" title="0">return nil</span>
|
|
}
|
|
|
|
func cleanEvents(ctx context.Context, pool *pgxpool.Pool, clearOlderThan int, logger *libpack_logger.Logger) <span class="cov0" title="0">{
|
|
var errors []error
|
|
var failedQueries []string
|
|
|
|
for _, query := range delQueries </span><span class="cov0" title="0">{
|
|
_, err := pool.Exec(ctx, fmt.Sprintf(query, clearOlderThan))
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
errors = append(errors, err)
|
|
failedQueries = append(failedQueries, query)
|
|
}</span> else<span class="cov0" title="0"> {
|
|
logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Successfully executed query",
|
|
Pairs: map[string]interface{}{"query": query},
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">if len(errors) > 0 </span><span class="cov0" title="0">{
|
|
var errMsgs []string
|
|
for _, err := range errors </span><span class="cov0" title="0">{
|
|
errMsgs = append(errMsgs, err.Error())
|
|
}</span>
|
|
<span class="cov0" title="0">logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to execute some queries",
|
|
Pairs: map[string]interface{}{
|
|
"failed_queries": failedQueries,
|
|
"errors": errMsgs,
|
|
},
|
|
})</span>
|
|
}
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file12" style="display: none">package main
|
|
|
|
import (
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/goccy/go-json"
|
|
fiber "github.com/gofiber/fiber/v2"
|
|
"github.com/graphql-go/graphql/language/ast"
|
|
"github.com/graphql-go/graphql/language/parser"
|
|
"github.com/graphql-go/graphql/language/source"
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
)
|
|
|
|
var (
|
|
introspectionQueries = map[string]struct{}{
|
|
"__schema": {}, "__type": {}, "__typename": {}, "__directive": {},
|
|
"__directivelocation": {}, "__field": {}, "__inputvalue": {},
|
|
"__enumvalue": {}, "__typekind": {}, "__fieldtype": {},
|
|
"__inputobjecttype": {}, "__enumtype": {}, "__uniontype": {},
|
|
"__scalars": {}, "__objects": {}, "__interfaces": {},
|
|
"__unions": {}, "__enums": {}, "__inputobjects": {}, "__directives": {},
|
|
}
|
|
introspectionAllowedQueries = make(map[string]struct{})
|
|
allowedUrls = make(map[string]struct{})
|
|
|
|
// Cache for parsed GraphQL queries to avoid reparsing
|
|
parsedQueryCache *LRUCache
|
|
|
|
// Maximum size for parsed query cache
|
|
maxQueryCacheSize = 1000
|
|
currentCacheSize int64 // Use atomic operations for this
|
|
)
|
|
|
|
func prepareQueriesAndExemptions() <span class="cov8" title="1">{
|
|
introspectionAllowedQueries = make(map[string]struct{})
|
|
allowedUrls = make(map[string]struct{})
|
|
|
|
// Process allowed introspection queries
|
|
for _, q := range cfg.Security.IntrospectionAllowed </span><span class="cov8" title="1">{
|
|
cleanQuery := strings.Trim(strings.TrimSpace(q), `"`)
|
|
introspectionAllowedQueries[strings.ToLower(cleanQuery)] = struct{}{}
|
|
}</span>
|
|
|
|
// Process allowed URLs
|
|
<span class="cov8" title="1">for _, u := range cfg.Server.AllowURLs </span><span class="cov0" title="0">{
|
|
allowedUrls[u] = struct{}{}
|
|
}</span>
|
|
}
|
|
|
|
type parseGraphQLQueryResult struct {
|
|
operationType string
|
|
operationName string
|
|
activeEndpoint string
|
|
cacheTime int
|
|
cacheRequest bool
|
|
cacheRefresh bool
|
|
shouldBlock bool
|
|
shouldIgnore bool
|
|
}
|
|
|
|
// AST node pools to reduce GC pressure
|
|
var (
|
|
// Pool for request/response maps during unmarshaling
|
|
queryPool = sync.Pool{
|
|
New: func() interface{} <span class="cov8" title="1">{
|
|
return make(map[string]interface{}, 48)
|
|
}</span>,
|
|
}
|
|
|
|
// Pool for parse result objects
|
|
resultPool = sync.Pool{
|
|
New: func() interface{} <span class="cov8" title="1">{
|
|
return &parseGraphQLQueryResult{}
|
|
}</span>,
|
|
}
|
|
|
|
// Mutex for allocation tracking
|
|
allocsMutex = sync.Mutex{}
|
|
)
|
|
|
|
// The following variables are reserved for future GraphQL parsing optimization
|
|
// and are not currently in use:
|
|
// - fieldPool (Field object pool)
|
|
// - operationPool (OperationDefinition object pool)
|
|
// - namePool (Name object pool)
|
|
// - documentPool (Document object pool)
|
|
// - allocsCounter (for tracking allocation counts)
|
|
// - allocationsSamp (for memory usage histograms)
|
|
|
|
// Initialize the query parse cache with a fixed size
|
|
func initGraphQLParsing() <span class="cov8" title="1">{
|
|
// Set cache size based on available memory
|
|
maxQueryCacheSize = runtime.GOMAXPROCS(0) * 250
|
|
|
|
// Initialize LRU cache with entry limit and 50MB size limit
|
|
parsedQueryCache = NewLRUCache(maxQueryCacheSize, 50*1024*1024)
|
|
}</span>
|
|
|
|
// Store a parsed document in the cache with LRU eviction
|
|
func cacheQuery(queryText string, document *ast.Document) <span class="cov8" title="1">{
|
|
if parsedQueryCache == nil </span><span class="cov0" title="0">{
|
|
return
|
|
}</span>
|
|
|
|
// Store the document in the cache with timestamp for LRU
|
|
<span class="cov8" title="1">cacheEntry := &CachedQuery{
|
|
Document: document,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// The LRU cache handles eviction automatically
|
|
parsedQueryCache.Set(queryText, cacheEntry, int64(len(queryText)))
|
|
atomic.AddInt64(&currentCacheSize, 1)</span>
|
|
}
|
|
|
|
// CachedQuery represents a cached GraphQL query with timestamp for LRU
|
|
type CachedQuery struct {
|
|
Document *ast.Document
|
|
Timestamp time.Time
|
|
}
|
|
|
|
// evictOldestQueries is no longer needed with LRU cache
|
|
// The LRU cache handles eviction automatically
|
|
|
|
// Check if we have a cached parsed query
|
|
func getCachedQuery(queryText string) *ast.Document <span class="cov8" title="1">{
|
|
if parsedQueryCache == nil </span><span class="cov0" title="0">{
|
|
return nil
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if entry, found := parsedQueryCache.Get(queryText); found </span><span class="cov8" title="1">{
|
|
if cachedQuery, ok := entry.(*CachedQuery); ok </span><span class="cov8" title="1">{
|
|
if cfg != nil && cfg.Monitoring != nil </span><span class="cov8" title="1">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsGraphQLCacheHit, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">return cachedQuery.Document</span>
|
|
}
|
|
}
|
|
|
|
<span class="cov8" title="1">if cfg != nil && cfg.Monitoring != nil </span><span class="cov8" title="1">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsGraphQLCacheMiss, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">return nil</span>
|
|
}
|
|
|
|
|
|
// Track and report memory allocations for GraphQL parsing
|
|
func trackParsingAllocations() func() <span class="cov8" title="1">{
|
|
var m1 runtime.MemStats
|
|
runtime.ReadMemStats(&m1)
|
|
|
|
return func() </span><span class="cov8" title="1">{
|
|
var m2 runtime.MemStats
|
|
runtime.ReadMemStats(&m2)
|
|
|
|
// Calculate allocations
|
|
allocsMutex.Lock()
|
|
allocsDelta := int(m2.Mallocs - m1.Mallocs)
|
|
// Note: allocsCounter variable is currently unused but will be used in future
|
|
// allocsCounter += allocsDelta
|
|
allocsMutex.Unlock()
|
|
|
|
// Record allocation count metrics
|
|
if cfg != nil && cfg.Monitoring != nil </span><span class="cov8" title="1">{
|
|
cfg.Monitoring.IncrementFloat(libpack_monitoring.MetricsGraphQLParsingAllocs, nil, float64(allocsDelta))
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult <span class="cov8" title="1">{
|
|
startTime := time.Now()
|
|
|
|
// Set up allocation tracking
|
|
trackAllocs := trackParsingAllocations()
|
|
defer trackAllocs()
|
|
|
|
// Get a result object from the pool and initialize it
|
|
res := resultPool.Get().(*parseGraphQLQueryResult)
|
|
*res = parseGraphQLQueryResult{shouldIgnore: true}
|
|
|
|
// Ensure we return the result to the pool on function exit
|
|
defer func() </span><span class="cov8" title="1">{
|
|
resultPool.Put(res)
|
|
}</span>()
|
|
|
|
// Default to using the write endpoint
|
|
<span class="cov8" title="1">res.activeEndpoint = cfg.Server.HostGraphQL
|
|
|
|
// Get a map from the pool for JSON unmarshaling
|
|
m := queryPool.Get().(map[string]interface{})
|
|
defer func() </span><span class="cov8" title="1">{
|
|
// Clear and return the map to the pool
|
|
for k := range m </span><span class="cov8" title="1">{
|
|
delete(m, k)
|
|
}</span>
|
|
<span class="cov8" title="1">queryPool.Put(m)</span>
|
|
}()
|
|
|
|
// Add comprehensive input validation
|
|
<span class="cov8" title="1">bodySize := len(c.Body())
|
|
|
|
// Validate query size to prevent DoS attacks
|
|
if bodySize > 1024*1024 </span><span class="cov0" title="0">{ // 1MB limit
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov0" title="0">return res</span>
|
|
}
|
|
|
|
// Validate minimum size
|
|
<span class="cov8" title="1">if bodySize < 2 </span><span class="cov8" title="1">{ // At least "{}"
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">return res</span>
|
|
}
|
|
|
|
// Unmarshal the request body
|
|
<span class="cov8" title="1">if err := json.Unmarshal(c.Body(), &m); err != nil </span><span class="cov0" title="0">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov0" title="0">return res</span>
|
|
}
|
|
|
|
// Extract the query string
|
|
<span class="cov8" title="1">query, ok := m["query"].(string)
|
|
if !ok </span><span class="cov8" title="1">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">return res</span>
|
|
}
|
|
|
|
// Try to get the query from cache first
|
|
<span class="cov8" title="1">var p *ast.Document
|
|
cachedDoc := getCachedQuery(query)
|
|
|
|
if cachedDoc != nil </span><span class="cov8" title="1">{
|
|
// Use the cached document
|
|
p = cachedDoc
|
|
}</span> else<span class="cov8" title="1"> {
|
|
// Parse the GraphQL query with improved source handling
|
|
src := source.NewSource(&source.Source{
|
|
Body: []byte(query),
|
|
Name: "GraphQL request",
|
|
})
|
|
|
|
var err error
|
|
p, err = parser.Parse(parser.ParseParams{Source: src})
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsGraphQLParsingErrors, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">return res</span>
|
|
}
|
|
|
|
// Cache the successful parse result for future use
|
|
<span class="cov8" title="1">cacheQuery(query, p)</span>
|
|
}
|
|
|
|
// Mark as a valid GraphQL query
|
|
<span class="cov8" title="1">res.shouldIgnore = false
|
|
res.operationName = "undefined"
|
|
|
|
// First scan for mutations - they take priority
|
|
hasMutation := false
|
|
var mutationName string
|
|
|
|
for _, d := range p.Definitions </span><span class="cov8" title="1">{
|
|
if oper, ok := d.(*ast.OperationDefinition); ok </span><span class="cov8" title="1">{
|
|
operationType := strings.ToLower(oper.Operation)
|
|
if operationType == "mutation" </span><span class="cov8" title="1">{
|
|
hasMutation = true
|
|
res.operationType = "mutation"
|
|
if oper.Name != nil </span><span class="cov8" title="1">{
|
|
mutationName = oper.Name.Value
|
|
// Use mutation name immediately
|
|
res.operationName = mutationName
|
|
}</span>
|
|
<span class="cov8" title="1">break</span> // Found a mutation, no need to continue first pass
|
|
}
|
|
}
|
|
}
|
|
|
|
// Now process all definitions for other information
|
|
<span class="cov8" title="1">for _, d := range p.Definitions </span><span class="cov8" title="1">{
|
|
if oper, ok := d.(*ast.OperationDefinition); ok </span><span class="cov8" title="1">{
|
|
operationType := strings.ToLower(oper.Operation)
|
|
|
|
// If we already found a mutation, only update name if needed
|
|
if hasMutation </span><span class="cov8" title="1">{
|
|
// We already set operation type to mutation in first pass
|
|
// Only set name if we didn't find a mutation name earlier
|
|
if res.operationName == "undefined" && oper.Name != nil </span><span class="cov0" title="0">{
|
|
res.operationName = oper.Name.Value
|
|
}</span>
|
|
} else<span class="cov8" title="1"> {
|
|
// No mutation found, use the normal logic
|
|
if res.operationType == "" </span><span class="cov8" title="1">{
|
|
res.operationType = operationType
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if res.operationName == "undefined" && oper.Name != nil </span><span class="cov8" title="1">{
|
|
res.operationName = oper.Name.Value
|
|
}</span>
|
|
}
|
|
|
|
// Handle endpoint routing - always use write endpoint for mutations
|
|
<span class="cov8" title="1">if res.operationType == "mutation" </span><span class="cov8" title="1">{
|
|
res.activeEndpoint = cfg.Server.HostGraphQL
|
|
}</span> else<span class="cov8" title="1"> if cfg.Server.HostGraphQLReadOnly != "" </span><span class="cov8" title="1">{
|
|
// Use read-only endpoint for non-mutation operations
|
|
res.activeEndpoint = cfg.Server.HostGraphQLReadOnly
|
|
}</span>
|
|
|
|
// Block mutations in read-only mode
|
|
<span class="cov8" title="1">if res.operationType == "mutation" && cfg.Server.ReadOnlyMode </span><span class="cov8" title="1">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">_ = c.Status(403).SendString("The server is in read-only mode")
|
|
res.shouldBlock = true
|
|
return res</span>
|
|
}
|
|
|
|
// Process directives (like @cached)
|
|
<span class="cov8" title="1">processDirectives(oper, res)
|
|
|
|
// Check for introspection queries if they're blocked
|
|
if cfg.Security.BlockIntrospection && checkSelections(c, oper.GetSelectionSet().Selections) </span><span class="cov8" title="1">{
|
|
_ = c.Status(403).SendString("Introspection queries are not allowed")
|
|
res.shouldBlock = true
|
|
return res
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
// Track parsing time
|
|
<span class="cov8" title="1">if ifNotInTest() && cfg.Monitoring != nil </span><span class="cov0" title="0">{
|
|
parseTime := float64(time.Since(startTime).Milliseconds())
|
|
cfg.Monitoring.IncrementFloat(libpack_monitoring.MetricsGraphQLParsingTime, nil, parseTime)
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return res</span>
|
|
}
|
|
|
|
// processDirectives extracts caching directives from the operation
|
|
func processDirectives(oper *ast.OperationDefinition, res *parseGraphQLQueryResult) <span class="cov8" title="1">{
|
|
for _, dir := range oper.Directives </span><span class="cov8" title="1">{
|
|
if dir.Name.Value == "cached" </span><span class="cov8" title="1">{
|
|
res.cacheRequest = true
|
|
for _, arg := range dir.Arguments </span><span class="cov8" title="1">{
|
|
switch arg.Name.Value </span>{
|
|
case "ttl":<span class="cov8" title="1">
|
|
if v, ok := arg.Value.GetValue().(string); ok </span><span class="cov8" title="1">{
|
|
res.cacheTime, _ = strconv.Atoi(v)
|
|
}</span>
|
|
case "refresh":<span class="cov8" title="1">
|
|
if v, ok := arg.Value.GetValue().(bool); ok </span><span class="cov8" title="1">{
|
|
res.cacheRefresh = v
|
|
}</span>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkSelections recursively checks if any selection is an introspection query that should be blocked
|
|
func checkSelections(c *fiber.Ctx, selections []ast.Selection) bool <span class="cov8" title="1">{
|
|
if len(selections) == 0 </span><span class="cov0" title="0">{
|
|
return false
|
|
}</span>
|
|
|
|
// Fast path: if no introspection blocking is configured, return immediately
|
|
<span class="cov8" title="1">if !cfg.Security.BlockIntrospection </span><span class="cov0" title="0">{
|
|
return false
|
|
}</span>
|
|
|
|
// Fast path: if there are no allowed introspection queries, check only top level
|
|
<span class="cov8" title="1">hasAllowList := len(cfg.Security.IntrospectionAllowed) > 0
|
|
|
|
for _, s := range selections </span><span class="cov8" title="1">{
|
|
switch sel := s.(type) </span>{
|
|
case *ast.Field:<span class="cov8" title="1">
|
|
fieldName := strings.ToLower(sel.Name.Value)
|
|
|
|
// Check if this is an introspection query
|
|
if _, exists := introspectionQueries[fieldName]; exists </span><span class="cov8" title="1">{
|
|
if hasAllowList </span><span class="cov8" title="1">{
|
|
// Check if it's in the allowed list
|
|
if _, allowed := introspectionAllowedQueries[fieldName]; !allowed </span><span class="cov8" title="1">{
|
|
return true // Block if not allowed
|
|
}</span>
|
|
} else<span class="cov8" title="1"> {
|
|
return true // Block if no allowlist exists
|
|
}</span>
|
|
}
|
|
|
|
// Check nested selections if present
|
|
<span class="cov8" title="1">if sel.SelectionSet != nil && len(sel.GetSelectionSet().Selections) > 0 </span><span class="cov8" title="1">{
|
|
if checkSelections(c, sel.GetSelectionSet().Selections) </span><span class="cov8" title="1">{
|
|
return true
|
|
}</span>
|
|
}
|
|
|
|
case *ast.InlineFragment:<span class="cov0" title="0">
|
|
// Check nested selections in fragments
|
|
if sel.SelectionSet != nil && len(sel.GetSelectionSet().Selections) > 0 </span><span class="cov0" title="0">{
|
|
if checkSelections(c, sel.GetSelectionSet().Selections) </span><span class="cov0" title="0">{
|
|
return true
|
|
}</span>
|
|
}
|
|
}
|
|
}
|
|
|
|
<span class="cov8" title="1">return false</span>
|
|
}
|
|
|
|
func checkIfContainsIntrospection(c *fiber.Ctx, query string) bool <span class="cov8" title="1">{
|
|
startTime := time.Now()
|
|
blocked := false
|
|
|
|
// Enable introspection blocking for tests
|
|
if !cfg.Security.BlockIntrospection </span><span class="cov8" title="1">{
|
|
cfg.Security.BlockIntrospection = true
|
|
}</span>
|
|
|
|
// Try to get cached parse result first
|
|
<span class="cov8" title="1">var p *ast.Document
|
|
cachedDoc := getCachedQuery(query)
|
|
|
|
if cachedDoc != nil </span><span class="cov0" title="0">{
|
|
p = cachedDoc
|
|
}</span> else<span class="cov8" title="1"> {
|
|
// Try parsing as a complete query
|
|
src := source.NewSource(&source.Source{
|
|
Body: []byte(query),
|
|
Name: "GraphQL introspection check",
|
|
})
|
|
|
|
var err error
|
|
p, err = parser.Parse(parser.ParseParams{Source: src})
|
|
|
|
if err == nil && p != nil </span><span class="cov8" title="1">{
|
|
// Cache the successful parse
|
|
cacheQuery(query, p)
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">if p != nil </span><span class="cov8" title="1">{
|
|
// It's a complete query, check all selections
|
|
for _, def := range p.Definitions </span><span class="cov8" title="1">{
|
|
if op, ok := def.(*ast.OperationDefinition); ok </span><span class="cov8" title="1">{
|
|
if op.SelectionSet != nil </span><span class="cov8" title="1">{
|
|
blocked = checkSelections(c, op.GetSelectionSet().Selections)
|
|
break</span>
|
|
}
|
|
}
|
|
}
|
|
} else<span class="cov8" title="1"> {
|
|
// Not a complete query, check as a field name
|
|
whateverLower := strings.ToLower(query)
|
|
if _, exists := introspectionQueries[whateverLower]; exists </span><span class="cov8" title="1">{
|
|
if len(cfg.Security.IntrospectionAllowed) > 0 </span><span class="cov8" title="1">{
|
|
if _, allowed := introspectionAllowedQueries[whateverLower]; !allowed </span><span class="cov8" title="1">{
|
|
blocked = true
|
|
}</span>
|
|
} else<span class="cov0" title="0"> {
|
|
blocked = true
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
<span class="cov8" title="1">if blocked </span><span class="cov8" title="1">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">_ = c.Status(403).SendString("Introspection queries are not allowed")</span>
|
|
}
|
|
|
|
// Track parsing time
|
|
<span class="cov8" title="1">if ifNotInTest() && cfg.Monitoring != nil </span><span class="cov0" title="0">{
|
|
parseTime := float64(time.Since(startTime).Milliseconds())
|
|
cfg.Monitoring.IncrementFloat(libpack_monitoring.MetricsGraphQLParsingTime, nil, parseTime)
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return blocked</span>
|
|
}
|
|
|
|
// NOTE: The clearQueryCache function has been removed as it was unused.
|
|
// This functionality will be exposed through an API endpoint in a future release.
|
|
</pre>
|
|
|
|
<pre class="file" id="file13" style="display: none">package libpack_logger
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/goccy/go-json"
|
|
)
|
|
|
|
// Log levels in increasing order of severity. The numeric value of each level
// is also its index into levelNames.
const (
	LEVEL_DEBUG = iota
	LEVEL_INFO
	LEVEL_WARN
	LEVEL_ERROR
	LEVEL_FATAL
)

// levelNames maps a level constant to the name written in the "level" field.
var levelNames = []string{"debug", "info", "warn", "error", "fatal"}

// Defaults applied by New.
const (
	defaultTimeFormat = time.RFC3339
	defaultMinLevel   = LEVEL_INFO
	defaultShowCaller = false
)
|
|
|
|
// Logger writes JSON-encoded log entries to an io.Writer.
type Logger struct {
	// output receives one encoded entry per log call.
	output io.Writer
	// timeFormat is the layout passed to time.Time.Format for the timestamp field.
	timeFormat string
	// minLogLevel is the lowest level that is still emitted.
	minLogLevel int
	// showCaller adds a "caller" field to every entry when true.
	showCaller bool
	// mu guards output: it is held while replacing it and while writing to it.
	mu sync.Mutex
}
|
|
|
|
// LogMessage is a single log entry: a message plus optional key/value pairs
// that are encoded alongside it.
type LogMessage struct {
	// Pairs holds extra fields for the entry; it may be nil.
	Pairs map[string]any
	// Message is the text stored under the configured "message" field name.
	Message string
}
|
|
|
|
// bufferPool recycles the bytes.Buffer used to encode each log entry.
var bufferPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// fieldNames maps the logical field ("timestamp", "level", "message") to the
// key actually written in the output. It is shared by every Logger.
var fieldNames = map[string]string{
	"timestamp": "timestamp",
	"level":     "level",
	"message":   "message",
}

var (
	// osExit is a variable to allow mocking os.Exit in tests.
	osExit = os.Exit

	// exitMutex is read-locked by Critical around its osExit call.
	exitMutex sync.RWMutex
)
|
|
|
|
// New creates a new Logger with default settings.
|
|
func New() *Logger <span class="cov8" title="1">{
|
|
return &Logger{
|
|
timeFormat: defaultTimeFormat,
|
|
minLogLevel: defaultMinLevel,
|
|
output: os.Stdout,
|
|
showCaller: defaultShowCaller,
|
|
}
|
|
}</span>
|
|
|
|
// SetOutput sets the output destination for the logger.
|
|
func (l *Logger) SetOutput(output io.Writer) *Logger <span class="cov8" title="1">{
|
|
l.mu.Lock()
|
|
l.output = output
|
|
l.mu.Unlock()
|
|
return l
|
|
}</span>
|
|
|
|
// GetLogLevel returns the log level integer corresponding to the given level name.
|
|
func GetLogLevel(level string) int <span class="cov8" title="1">{
|
|
level = strings.ToLower(level)
|
|
for i, name := range levelNames </span><span class="cov8" title="1">{
|
|
if name == level </span><span class="cov8" title="1">{
|
|
return i
|
|
}</span>
|
|
}
|
|
<span class="cov8" title="1">return defaultMinLevel</span>
|
|
}
|
|
|
|
// SetTimeFormat sets the time format for the logger's timestamp field.
|
|
func (l *Logger) SetTimeFormat(format string) *Logger <span class="cov8" title="1">{
|
|
l.timeFormat = format
|
|
return l
|
|
}</span>
|
|
|
|
// SetMinLogLevel sets the minimum log level for the logger.
|
|
func (l *Logger) SetMinLogLevel(level int) *Logger <span class="cov8" title="1">{
|
|
l.minLogLevel = level
|
|
return l
|
|
}</span>
|
|
|
|
// SetFieldName allows customizing the field names in log output.
|
|
func (l *Logger) SetFieldName(field, name string) *Logger <span class="cov8" title="1">{
|
|
fieldNames[field] = name
|
|
return l
|
|
}</span>
|
|
|
|
// SetShowCaller enables or disables including the caller information in log output.
|
|
func (l *Logger) SetShowCaller(show bool) *Logger <span class="cov8" title="1">{
|
|
l.showCaller = show
|
|
return l
|
|
}</span>
|
|
|
|
// shouldLog determines if the message should be logged based on the logger's minimum log level.
|
|
func (l *Logger) shouldLog(level int) bool <span class="cov8" title="1">{
|
|
return level >= l.minLogLevel
|
|
}</span>
|
|
|
|
// log writes the log message with the given level.
|
|
func (l *Logger) log(level int, m *LogMessage) <span class="cov8" title="1">{
|
|
if m.Pairs == nil </span><span class="cov8" title="1">{
|
|
m.Pairs = make(map[string]interface{})
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">m.Pairs[fieldNames["timestamp"]] = time.Now().Format(l.timeFormat)
|
|
m.Pairs[fieldNames["level"]] = levelNames[level]
|
|
m.Pairs[fieldNames["message"]] = m.Message
|
|
|
|
if l.showCaller </span><span class="cov8" title="1">{
|
|
m.Pairs["caller"] = getCaller()
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">buffer := bufferPool.Get().(*bytes.Buffer)
|
|
buffer.Reset()
|
|
defer bufferPool.Put(buffer)
|
|
|
|
encoder := json.NewEncoder(buffer)
|
|
err := encoder.Encode(m.Pairs)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
fmt.Fprintln(os.Stderr, "Error marshalling log message:", err)
|
|
return
|
|
}</span>
|
|
// Lock the mutex before writing to the output to prevent race conditions
|
|
<span class="cov8" title="1">l.mu.Lock()
|
|
_, err = l.output.Write(buffer.Bytes())
|
|
l.mu.Unlock()
|
|
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
fmt.Fprintln(os.Stderr, "Error writing log message:", err)
|
|
}</span>
|
|
}
|
|
|
|
// Debug logs a debug-level message.
|
|
func (l *Logger) Debug(m *LogMessage) <span class="cov8" title="1">{
|
|
if l.shouldLog(LEVEL_DEBUG) </span><span class="cov8" title="1">{
|
|
l.log(LEVEL_DEBUG, m)
|
|
}</span>
|
|
}
|
|
|
|
// Info logs an info-level message.
|
|
func (l *Logger) Info(m *LogMessage) <span class="cov8" title="1">{
|
|
if l.shouldLog(LEVEL_INFO) </span><span class="cov8" title="1">{
|
|
l.log(LEVEL_INFO, m)
|
|
}</span>
|
|
}
|
|
|
|
// Warn logs a warning-level message.
|
|
func (l *Logger) Warn(m *LogMessage) <span class="cov8" title="1">{
|
|
if l.shouldLog(LEVEL_WARN) </span><span class="cov8" title="1">{
|
|
l.log(LEVEL_WARN, m)
|
|
}</span>
|
|
}
|
|
|
|
// Warning is an alias for Warn.
|
|
func (l *Logger) Warning(m *LogMessage) <span class="cov8" title="1">{
|
|
l.Warn(m)
|
|
}</span>
|
|
|
|
// Error logs an error-level message.
|
|
func (l *Logger) Error(m *LogMessage) <span class="cov8" title="1">{
|
|
if l.shouldLog(LEVEL_ERROR) </span><span class="cov8" title="1">{
|
|
l.log(LEVEL_ERROR, m)
|
|
}</span>
|
|
}
|
|
|
|
// Fatal logs a fatal-level message.
|
|
func (l *Logger) Fatal(m *LogMessage) <span class="cov8" title="1">{
|
|
if l.shouldLog(LEVEL_FATAL) </span><span class="cov8" title="1">{
|
|
l.log(LEVEL_FATAL, m)
|
|
}</span>
|
|
}
|
|
|
|
// Critical logs a critical-level message and exits the application.
|
|
func (l *Logger) Critical(m *LogMessage) <span class="cov8" title="1">{
|
|
l.Fatal(m)
|
|
exitMutex.RLock()
|
|
defer exitMutex.RUnlock()
|
|
osExit(1)
|
|
}</span>
|
|
|
|
// getCaller returns "file:line" for the code that invoked the logger, where
// file is the base name of the source file.
//
// Instead of assuming a fixed call depth, it walks the stack and returns the
// first frame that lies outside this source file. A fixed depth is only right
// for the direct Debug/Info/... -> log path; wrappers such as Warning (-> Warn
// -> log) and Critical (-> Fatal -> log) add a frame and would otherwise be
// reported as the caller themselves.
//
// It returns "unknown:0" when no such frame can be determined.
func getCaller() string {
	// Learn the path of this file so logger-internal frames can be skipped.
	_, self, _, ok := runtime.Caller(0)
	if !ok {
		return "unknown:0"
	}

	var pcs [16]uintptr
	// Skip runtime.Callers and getCaller itself.
	n := runtime.Callers(2, pcs[:])
	if n == 0 {
		return "unknown:0"
	}

	frames := runtime.CallersFrames(pcs[:n])
	for {
		frame, more := frames.Next()
		if frame.File != "" && frame.File != self {
			return fmt.Sprintf("%s:%d", filepath.Base(frame.File), frame.Line)
		}
		if !more {
			break
		}
	}
	return "unknown:0"
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file14" style="display: none">package main
|
|
|
|
import (
|
|
"container/list"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// LRUCacheEntry is one cached item together with the bookkeeping the cache
// needs to order and account for it.
type LRUCacheEntry struct {
	// key is the map key the entry is stored under.
	key string
	// value is the cached payload.
	value any
	// size is the caller-supplied size counted against the cache's maxSize.
	size int64
	// timestamp is refreshed whenever the entry is stored or read.
	timestamp time.Time
	// element is the entry's node in the cache's eviction list.
	element *list.Element
}
|
|
|
|
// LRUCache implements a thread-safe LRU cache with O(1) operations
|
|
type LRUCache struct {
|
|
mu sync.RWMutex
|
|
maxEntries int
|
|
maxSize int64
|
|
currentSize int64
|
|
entries map[string]*LRUCacheEntry
|
|
evictList *list.List
|
|
}
|
|
|
|
// NewLRUCache creates a new LRU cache
|
|
func NewLRUCache(maxEntries int, maxSize int64) *LRUCache <span class="cov8" title="1">{
|
|
return &LRUCache{
|
|
maxEntries: maxEntries,
|
|
maxSize: maxSize,
|
|
entries: make(map[string]*LRUCacheEntry),
|
|
evictList: list.New(),
|
|
}
|
|
}</span>
|
|
|
|
// Get retrieves a value from the cache
|
|
func (c *LRUCache) Get(key string) (interface{}, bool) <span class="cov8" title="1">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
entry, exists := c.entries[key]
|
|
if !exists </span><span class="cov8" title="1">{
|
|
return nil, false
|
|
}</span>
|
|
|
|
// Move to front (most recently used)
|
|
<span class="cov8" title="1">c.evictList.MoveToFront(entry.element)
|
|
entry.timestamp = time.Now()
|
|
|
|
return entry.value, true</span>
|
|
}
|
|
|
|
// Set adds or updates a value in the cache
|
|
func (c *LRUCache) Set(key string, value interface{}, size int64) <span class="cov8" title="1">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Check if key already exists
|
|
if entry, exists := c.entries[key]; exists </span><span class="cov0" title="0">{
|
|
// Update existing entry
|
|
c.currentSize -= entry.size
|
|
c.currentSize += size
|
|
entry.value = value
|
|
entry.size = size
|
|
entry.timestamp = time.Now()
|
|
c.evictList.MoveToFront(entry.element)
|
|
|
|
// Check if we need to evict due to size
|
|
c.evictIfNeeded()
|
|
return
|
|
}</span>
|
|
|
|
// Create new entry
|
|
<span class="cov8" title="1">entry := &LRUCacheEntry{
|
|
key: key,
|
|
value: value,
|
|
size: size,
|
|
timestamp: time.Now(),
|
|
}
|
|
|
|
// Add to front of list
|
|
element := c.evictList.PushFront(entry)
|
|
entry.element = element
|
|
c.entries[key] = entry
|
|
c.currentSize += size
|
|
|
|
// Evict if necessary
|
|
c.evictIfNeeded()</span>
|
|
}
|
|
|
|
// evictIfNeeded removes entries when cache limits are exceeded
|
|
func (c *LRUCache) evictIfNeeded() <span class="cov8" title="1">{
|
|
// Evict based on entry count
|
|
for c.evictList.Len() > c.maxEntries </span><span class="cov0" title="0">{
|
|
c.evictOldest()
|
|
}</span>
|
|
|
|
// Evict based on size
|
|
<span class="cov8" title="1">for c.currentSize > c.maxSize && c.evictList.Len() > 0 </span><span class="cov0" title="0">{
|
|
c.evictOldest()
|
|
}</span>
|
|
}
|
|
|
|
// evictOldest removes the least recently used entry
|
|
func (c *LRUCache) evictOldest() <span class="cov0" title="0">{
|
|
element := c.evictList.Back()
|
|
if element == nil </span><span class="cov0" title="0">{
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">entry := element.Value.(*LRUCacheEntry)
|
|
c.removeEntry(entry)</span>
|
|
}
|
|
|
|
// removeEntry removes an entry from the cache
|
|
func (c *LRUCache) removeEntry(entry *LRUCacheEntry) <span class="cov0" title="0">{
|
|
c.evictList.Remove(entry.element)
|
|
delete(c.entries, entry.key)
|
|
c.currentSize -= entry.size
|
|
}</span>
|
|
|
|
// Delete removes a key from the cache
|
|
func (c *LRUCache) Delete(key string) <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
entry, exists := c.entries[key]
|
|
if !exists </span><span class="cov0" title="0">{
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">c.removeEntry(entry)</span>
|
|
}
|
|
|
|
// Clear removes all entries from the cache
|
|
func (c *LRUCache) Clear() <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
c.entries = make(map[string]*LRUCacheEntry)
|
|
c.evictList = list.New()
|
|
c.currentSize = 0
|
|
}</span>
|
|
|
|
// Len returns the number of entries in the cache
|
|
func (c *LRUCache) Len() int <span class="cov0" title="0">{
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.evictList.Len()
|
|
}</span>
|
|
|
|
// Size returns the current size of the cache in bytes
|
|
func (c *LRUCache) Size() int64 <span class="cov0" title="0">{
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.currentSize
|
|
}</span>
|
|
|
|
// CleanupExpired removes entries older than the given duration
|
|
func (c *LRUCache) CleanupExpired(maxAge time.Duration) int <span class="cov0" title="0">{
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
removed := 0
|
|
|
|
// Iterate from back (oldest) to front (newest)
|
|
for element := c.evictList.Back(); element != nil; </span><span class="cov0" title="0">{
|
|
entry := element.Value.(*LRUCacheEntry)
|
|
|
|
// If entry is not expired, we can stop (entries are ordered by access time)
|
|
if now.Sub(entry.timestamp) <= maxAge </span><span class="cov0" title="0">{
|
|
break</span>
|
|
}
|
|
|
|
// Remove expired entry
|
|
<span class="cov0" title="0">next := element.Prev()
|
|
c.removeEntry(entry)
|
|
removed++
|
|
element = next</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">return removed</span>
|
|
}
|
|
|
|
// GetStats returns a snapshot of the cache's counters and limits, taken under
// the read lock. Keys: "entries" (number of stored entries), "size_bytes"
// (currentSize), "max_entries", "max_size" and "fill_percent" (currentSize as
// a percentage of maxSize).
|
|
func (c *LRUCache) GetStats() map[string]interface{} <span class="cov0" title="0">{
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"entries": c.evictList.Len(),
|
|
"size_bytes": c.currentSize,
|
|
"max_entries": c.maxEntries,
|
|
"max_size": c.maxSize,
|
|
// NOTE(review): this is a float64 division, so a maxSize of 0 yields NaN or
// +Inf instead of a panic — confirm consumers (e.g. JSON encoders) cope.
"fill_percent": float64(c.currentSize) / float64(c.maxSize) * 100,
|
|
}
|
|
}</pre>
|
|
|
|
<pre class="file" id="file15" style="display: none">package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2/middleware/proxy"
|
|
"github.com/gookit/goutil/envutil"
|
|
graphql "github.com/lukaszraczylo/go-simple-graphql"
|
|
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
|
|
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
|
|
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
libpack_tracing "github.com/lukaszraczylo/graphql-monitoring-proxy/tracing"
|
|
)
|
|
|
|
var (
|
|
cfg *config
|
|
cfgMutex sync.RWMutex
|
|
once sync.Once
|
|
tracer *libpack_tracing.TracingSetup
|
|
shutdownManager *ShutdownManager
|
|
)
|
|
|
|
// getDetailsFromEnv retrieves the value from the environment or returns the default.
|
|
// It first checks for a prefixed environment variable (GMP_KEY), then falls back to the unprefixed version.
|
|
func getDetailsFromEnv[T any](key string, defaultValue T) T <span class="cov8" title="1">{
|
|
prefixedKey := "GMP_" + key
|
|
|
|
switch v := any(defaultValue).(type) </span>{
|
|
case string:<span class="cov8" title="1">
|
|
if val, ok := os.LookupEnv(prefixedKey); ok </span><span class="cov8" title="1">{
|
|
return any(val).(T)
|
|
}</span>
|
|
<span class="cov8" title="1">return any(envutil.Getenv(key, v)).(T)</span>
|
|
case int:<span class="cov8" title="1">
|
|
if val, ok := os.LookupEnv(prefixedKey); ok </span><span class="cov8" title="1">{
|
|
if intVal, err := strconv.Atoi(val); err == nil </span><span class="cov8" title="1">{
|
|
return any(intVal).(T)
|
|
}</span>
|
|
}
|
|
<span class="cov8" title="1">return any(envutil.GetInt(key, v)).(T)</span>
|
|
case bool:<span class="cov8" title="1">
|
|
if val, ok := os.LookupEnv(prefixedKey); ok </span><span class="cov8" title="1">{
|
|
boolVal := strings.ToLower(val) == "true" || val == "1"
|
|
return any(boolVal).(T)
|
|
}</span>
|
|
<span class="cov8" title="1">return any(envutil.GetBool(key, v)).(T)</span>
|
|
default:<span class="cov0" title="0">
|
|
return defaultValue</span>
|
|
}
|
|
}
|
|
|
|
// parseConfig loads and parses the configuration.
|
|
func parseConfig() <span class="cov8" title="1">{
|
|
libpack_config.PKG_NAME = "graphql_proxy"
|
|
c := config{}
|
|
// Server configurations
|
|
c.Server.PortGraphQL = getDetailsFromEnv("PORT_GRAPHQL", 8080)
|
|
c.Server.PortMonitoring = getDetailsFromEnv("MONITORING_PORT", 9393)
|
|
c.Server.HostGraphQL = getDetailsFromEnv("HOST_GRAPHQL", "http://localhost/")
|
|
c.Server.HostGraphQLReadOnly = getDetailsFromEnv("HOST_GRAPHQL_READONLY", "")
|
|
// Client configurations
|
|
c.Client.JWTUserClaimPath = getDetailsFromEnv("JWT_USER_CLAIM_PATH", "")
|
|
c.Client.JWTRoleClaimPath = getDetailsFromEnv("JWT_ROLE_CLAIM_PATH", "")
|
|
c.Client.RoleFromHeader = getDetailsFromEnv("ROLE_FROM_HEADER", "")
|
|
c.Client.RoleRateLimit = getDetailsFromEnv("ROLE_RATE_LIMIT", false)
|
|
// In-memory cache
|
|
c.Cache.CacheEnable = getDetailsFromEnv("ENABLE_GLOBAL_CACHE", false)
|
|
c.Cache.CacheTTL = getDetailsFromEnv("CACHE_TTL", 60)
|
|
c.Cache.CacheMaxMemorySize = getDetailsFromEnv("CACHE_MAX_MEMORY_SIZE", 100) // Default 100MB
|
|
c.Cache.CacheMaxEntries = getDetailsFromEnv("CACHE_MAX_ENTRIES", 10000) // Default 10000 entries
|
|
// Redis cache
|
|
c.Cache.CacheRedisEnable = getDetailsFromEnv("ENABLE_REDIS_CACHE", false)
|
|
c.Cache.CacheRedisURL = getDetailsFromEnv("CACHE_REDIS_URL", "localhost:6379")
|
|
c.Cache.CacheRedisPassword = getDetailsFromEnv("CACHE_REDIS_PASSWORD", "")
|
|
c.Cache.CacheRedisDB = getDetailsFromEnv("CACHE_REDIS_DB", 0)
|
|
// Security configurations
|
|
c.Security.BlockIntrospection = getDetailsFromEnv("BLOCK_SCHEMA_INTROSPECTION", false)
|
|
c.Security.IntrospectionAllowed = func() []string </span><span class="cov8" title="1">{
|
|
urls := getDetailsFromEnv("ALLOWED_INTROSPECTION", "")
|
|
if urls == "" </span><span class="cov8" title="1">{
|
|
return nil
|
|
}</span>
|
|
<span class="cov8" title="1">return strings.Split(urls, ",")</span>
|
|
}()
|
|
<span class="cov8" title="1">c.LogLevel = strings.ToUpper(getDetailsFromEnv("LOG_LEVEL", "info"))
|
|
// Logger setup
|
|
c.Logger = libpack_logging.New().SetMinLogLevel(libpack_logging.GetLogLevel(c.LogLevel)).
|
|
SetFieldName("timestamp", "ts").SetFieldName("message", "msg").SetShowCaller(false)
|
|
// Health check
|
|
c.Server.HealthcheckGraphQL = getDetailsFromEnv("HEALTHCHECK_GRAPHQL_URL", "")
|
|
c.Client.GQLClient = graphql.NewConnection()
|
|
c.Client.GQLClient.SetEndpoint(c.Server.HealthcheckGraphQL)
|
|
// Server modes
|
|
c.Server.AccessLog = getDetailsFromEnv("ENABLE_ACCESS_LOG", false)
|
|
c.Server.ReadOnlyMode = getDetailsFromEnv("READ_ONLY_MODE", false)
|
|
c.Server.AllowURLs = func() []string </span><span class="cov8" title="1">{
|
|
urls := getDetailsFromEnv("ALLOWED_URLS", "")
|
|
if urls == "" </span><span class="cov8" title="1">{
|
|
return nil
|
|
}</span>
|
|
<span class="cov0" title="0">return strings.Split(urls, ",")</span>
|
|
}()
|
|
|
|
// Client timeout and connection configurations with bounds checking
|
|
<span class="cov8" title="1">clientTimeout := getDetailsFromEnv("PROXIED_CLIENT_TIMEOUT", 120)
|
|
if clientTimeout < 1 || clientTimeout > 3600 </span><span class="cov0" title="0">{ // 1 second to 1 hour max
|
|
c.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Invalid client timeout, using default",
|
|
Pairs: map[string]interface{}{"requested": clientTimeout, "default": 120},
|
|
})
|
|
clientTimeout = 120
|
|
}</span>
|
|
<span class="cov8" title="1">c.Client.ClientTimeout = clientTimeout
|
|
|
|
// Configure HTTP connection pool and timeouts with sensible defaults
|
|
// MaxConnsPerHost limits parallel connections to prevent overwhelming backends
|
|
maxConns := getDetailsFromEnv("MAX_CONNS_PER_HOST", 1024)
|
|
if maxConns < 1 || maxConns > 10000 </span><span class="cov0" title="0">{ // Reasonable bounds
|
|
c.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Invalid max connections per host, using default",
|
|
Pairs: map[string]interface{}{"requested": maxConns, "default": 1024},
|
|
})
|
|
maxConns = 1024
|
|
}</span>
|
|
<span class="cov8" title="1">c.Client.MaxConnsPerHost = maxConns
|
|
|
|
// Configure distinct timeout values for more granular control with bounds checking
|
|
readTimeout := getDetailsFromEnv("CLIENT_READ_TIMEOUT", c.Client.ClientTimeout)
|
|
if readTimeout < 1 || readTimeout > 3600 </span><span class="cov0" title="0">{
|
|
readTimeout = c.Client.ClientTimeout
|
|
}</span>
|
|
<span class="cov8" title="1">c.Client.ReadTimeout = readTimeout
|
|
|
|
writeTimeout := getDetailsFromEnv("CLIENT_WRITE_TIMEOUT", c.Client.ClientTimeout)
|
|
if writeTimeout < 1 || writeTimeout > 3600 </span><span class="cov0" title="0">{
|
|
writeTimeout = c.Client.ClientTimeout
|
|
}</span>
|
|
<span class="cov8" title="1">c.Client.WriteTimeout = writeTimeout
|
|
|
|
// MaxIdleConnDuration controls how long connections stay in the pool
|
|
idleDuration := getDetailsFromEnv("CLIENT_MAX_IDLE_CONN_DURATION", 300)
|
|
if idleDuration < 1 || idleDuration > 7200 </span><span class="cov0" title="0">{ // 1 second to 2 hours max
|
|
idleDuration = 300
|
|
}</span>
|
|
<span class="cov8" title="1">c.Client.MaxIdleConnDuration = idleDuration
|
|
|
|
// Secure by default: TLS verification is enabled unless explicitly disabled
|
|
c.Client.DisableTLSVerify = getDetailsFromEnv("CLIENT_DISABLE_TLS_VERIFY", false)
|
|
|
|
// Create HTTP client with the optimized parameters
|
|
c.Client.FastProxyClient = createFasthttpClient(&c)
|
|
proxy.WithClient(c.Client.FastProxyClient) // Setting the global proxy client
|
|
// API configurations
|
|
c.Server.EnableApi = getDetailsFromEnv("ENABLE_API", false)
|
|
c.Server.ApiPort = getDetailsFromEnv("API_PORT", 9090)
|
|
|
|
// Validate and sanitize banned users file path to prevent path traversal
|
|
bannedUsersFile := getDetailsFromEnv("BANNED_USERS_FILE", "/go/src/app/banned_users.json")
|
|
if validatedPath, err := validateFilePath(bannedUsersFile); err != nil </span><span class="cov0" title="0">{
|
|
c.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "Invalid banned users file path, using default",
|
|
Pairs: map[string]interface{}{"requested": bannedUsersFile, "error": err.Error()},
|
|
})
|
|
c.Api.BannedUsersFile = "/go/src/app/banned_users.json"
|
|
}</span> else<span class="cov8" title="1"> {
|
|
c.Api.BannedUsersFile = validatedPath
|
|
}</span>
|
|
<span class="cov8" title="1">c.Server.PurgeOnCrawl = getDetailsFromEnv("PURGE_METRICS_ON_CRAWL", false)
|
|
c.Server.PurgeEvery = getDetailsFromEnv("PURGE_METRICS_ON_TIMER", 0)
|
|
// Hasura event cleaner
|
|
c.HasuraEventCleaner.Enable = getDetailsFromEnv("HASURA_EVENT_CLEANER", false)
|
|
c.HasuraEventCleaner.ClearOlderThan = getDetailsFromEnv("HASURA_EVENT_CLEANER_OLDER_THAN", 1)
|
|
c.HasuraEventCleaner.EventMetadataDb = getDetailsFromEnv("HASURA_EVENT_METADATA_DB", "")
|
|
// Tracing configuration
|
|
c.Tracing.Enable = getDetailsFromEnv("ENABLE_TRACE", false)
|
|
c.Tracing.Endpoint = getDetailsFromEnv("TRACE_ENDPOINT", "localhost:4317")
|
|
|
|
// Circuit Breaker configuration
|
|
c.CircuitBreaker.Enable = getDetailsFromEnv("ENABLE_CIRCUIT_BREAKER", false)
|
|
c.CircuitBreaker.MaxFailures = getDetailsFromEnv("CIRCUIT_MAX_FAILURES", 5)
|
|
c.CircuitBreaker.Timeout = getDetailsFromEnv("CIRCUIT_TIMEOUT_SECONDS", 30)
|
|
c.CircuitBreaker.MaxRequestsInHalfOpen = getDetailsFromEnv("CIRCUIT_MAX_HALF_OPEN_REQUESTS", 2)
|
|
c.CircuitBreaker.ReturnCachedOnOpen = getDetailsFromEnv("CIRCUIT_RETURN_CACHED_ON_OPEN", true)
|
|
c.CircuitBreaker.TripOnTimeouts = getDetailsFromEnv("CIRCUIT_TRIP_ON_TIMEOUTS", true)
|
|
c.CircuitBreaker.TripOn5xx = getDetailsFromEnv("CIRCUIT_TRIP_ON_5XX", true)
|
|
|
|
cfgMutex.Lock()
|
|
cfg = &c
|
|
cfgMutex.Unlock()
|
|
|
|
// Initialize tracing if enabled
|
|
if cfg.Tracing.Enable </span><span class="cov0" title="0">{
|
|
if cfg.Tracing.Endpoint == "" </span><span class="cov0" title="0">{
|
|
cfg.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Tracing endpoint not configured, using default localhost:4317",
|
|
})
|
|
cfg.Tracing.Endpoint = "localhost:4317"
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">var err error
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
tracer, err = libpack_tracing.NewTracing(ctx, cfg.Tracing.Endpoint)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "Failed to initialize tracing",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span> else<span class="cov0" title="0"> {
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Tracing initialized",
|
|
Pairs: map[string]interface{}{"endpoint": cfg.Tracing.Endpoint},
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
// Initialize cache if enabled
|
|
<span class="cov8" title="1">if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable </span><span class="cov0" title="0">{
|
|
cacheConfig := &libpack_cache.CacheConfig{
|
|
Logger: cfg.Logger,
|
|
TTL: cfg.Cache.CacheTTL,
|
|
}
|
|
// Redis cache configurations
|
|
if cfg.Cache.CacheRedisEnable </span><span class="cov0" title="0">{
|
|
cacheConfig.Redis.Enable = true
|
|
cacheConfig.Redis.URL = cfg.Cache.CacheRedisURL
|
|
cacheConfig.Redis.Password = cfg.Cache.CacheRedisPassword
|
|
cacheConfig.Redis.DB = cfg.Cache.CacheRedisDB
|
|
}</span> else<span class="cov0" title="0"> {
|
|
// Memory cache configurations
|
|
cacheConfig.Memory.MaxMemorySize = int64(cfg.Cache.CacheMaxMemorySize) * 1024 * 1024 // Convert MB to bytes
|
|
cacheConfig.Memory.MaxEntries = int64(cfg.Cache.CacheMaxEntries)
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Configuring memory cache with limits",
|
|
Pairs: map[string]interface{}{
|
|
"max_memory_mb": cfg.Cache.CacheMaxMemorySize,
|
|
"max_entries": cfg.Cache.CacheMaxEntries,
|
|
},
|
|
})
|
|
}</span>
|
|
<span class="cov0" title="0">libpack_cache.EnableCache(cacheConfig)</span>
|
|
|
|
// Start memory monitoring for in-memory cache if it's not Redis
|
|
// Will be started with context in main()
|
|
}
|
|
|
|
// Initialize circuit breaker if enabled
|
|
<span class="cov8" title="1">if cfg.CircuitBreaker.Enable </span><span class="cov0" title="0">{
|
|
initCircuitBreaker(cfg)
|
|
}</span>
|
|
|
|
// Load rate limit configuration with improved error handling
|
|
<span class="cov8" title="1">if err := loadRatelimitConfig(); err != nil </span><span class="cov0" title="0">{
|
|
// Log the error with clear guidance
|
|
detailedError := err.Error()
|
|
cfg.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "Failed to start service due to rate limit configuration error",
|
|
Pairs: map[string]interface{}{
|
|
"error": detailedError,
|
|
},
|
|
})
|
|
|
|
// If we're not in a test environment, print to stderr and exit if config error
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
fmt.Fprintln(os.Stderr, "⚠️ CRITICAL ERROR: Rate limit configuration problem detected")
|
|
fmt.Fprintln(os.Stderr, detailedError)
|
|
os.Exit(1)
|
|
}</span>
|
|
}
|
|
// API and event cleaner will be started with context in main()
|
|
<span class="cov8" title="1">prepareQueriesAndExemptions()
|
|
|
|
// Initialize GraphQL parsing optimizations
|
|
initGraphQLParsing()</span>
|
|
}
|
|
|
|
func main() <span class="cov0" title="0">{
|
|
// Parse configuration
|
|
parseConfig()
|
|
|
|
// Setup graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Initialize shutdown manager
|
|
shutdownManager = NewShutdownManager(ctx)
|
|
|
|
// Create a wait group to manage goroutines
|
|
var wg sync.WaitGroup
|
|
|
|
// Setup signal handling for graceful shutdown
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
|
|
go func() </span><span class="cov0" title="0">{
|
|
<-sigCh
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Shutdown signal received, stopping services...",
|
|
})
|
|
cancel()
|
|
}</span>()
|
|
|
|
// Start background services with context
|
|
<span class="cov0" title="0">once.Do(func() </span><span class="cov0" title="0">{
|
|
// Start API server
|
|
shutdownManager.RunGoroutine("api-server", func(ctx context.Context) </span><span class="cov0" title="0">{
|
|
if err := enableApi(ctx); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "API server error",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span>
|
|
})
|
|
|
|
// Start event cleaner
|
|
<span class="cov0" title="0">shutdownManager.RunGoroutine("event-cleaner", func(ctx context.Context) </span><span class="cov0" title="0">{
|
|
if err := enableHasuraEventCleaner(ctx); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "Event cleaner error",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span>
|
|
})
|
|
|
|
// Start cache memory monitoring if not using Redis
|
|
<span class="cov0" title="0">if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable </span><span class="cov0" title="0">{
|
|
shutdownManager.RunGoroutine("cache-memory-monitoring", startCacheMemoryMonitoring)
|
|
}</span>
|
|
})
|
|
|
|
// Register connection pool for cleanup
|
|
<span class="cov0" title="0">shutdownManager.RegisterComponent("http-connection-pool", func(ctx context.Context) error </span><span class="cov0" title="0">{
|
|
if connectionPoolManager != nil </span><span class="cov0" title="0">{
|
|
return connectionPoolManager.Shutdown()
|
|
}</span>
|
|
<span class="cov0" title="0">return nil</span>
|
|
})
|
|
|
|
// Cache shutdown is handled internally by the cache implementation
|
|
|
|
// Start monitoring server
|
|
<span class="cov0" title="0">cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Starting monitoring server...",
|
|
Pairs: map[string]interface{}{"port": cfg.Server.PortMonitoring},
|
|
})
|
|
|
|
// Start monitoring server in a goroutine
|
|
wg.Add(1)
|
|
monitoringErrCh := make(chan error, 1)
|
|
go func() </span><span class="cov0" title="0">{
|
|
defer wg.Done()
|
|
if err := StartMonitoringServer(); err != nil </span><span class="cov0" title="0">{
|
|
monitoringErrCh <- err
|
|
}</span>
|
|
}()
|
|
|
|
// Give monitoring server time to initialize
|
|
<span class="cov0" title="0">select </span>{
|
|
case err := <-monitoringErrCh:<span class="cov0" title="0">
|
|
cfg.Logger.Critical(&libpack_logging.LogMessage{
|
|
Message: "Failed to start monitoring server",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"port": cfg.Server.PortMonitoring,
|
|
},
|
|
})
|
|
os.Exit(1)</span>
|
|
case <-time.After(2 * time.Second):<span class="cov0" title="0"></span>
|
|
// Continue if no error received within timeout
|
|
}
|
|
|
|
// Start HTTP proxy
|
|
<span class="cov0" title="0">cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Starting HTTP proxy server...",
|
|
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
|
|
})
|
|
|
|
// Start HTTP proxy in a goroutine
|
|
wg.Add(1)
|
|
proxyErrCh := make(chan error, 1)
|
|
go func() </span><span class="cov0" title="0">{
|
|
defer wg.Done()
|
|
if err := StartHTTPProxy(); err != nil </span><span class="cov0" title="0">{
|
|
proxyErrCh <- err
|
|
}</span>
|
|
}()
|
|
|
|
// Block for a moment to check for immediate startup errors
|
|
<span class="cov0" title="0">select </span>{
|
|
case err := <-proxyErrCh:<span class="cov0" title="0">
|
|
cfg.Logger.Critical(&libpack_logging.LogMessage{
|
|
Message: "Failed to start HTTP proxy server",
|
|
Pairs: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"port": cfg.Server.PortGraphQL,
|
|
},
|
|
})
|
|
os.Exit(1)</span>
|
|
case <-time.After(1 * time.Second):<span class="cov0" title="0"></span>
|
|
// Continue if no error received within timeout
|
|
}
|
|
|
|
// Wait for context cancellation
|
|
<span class="cov0" title="0"><-ctx.Done()
|
|
|
|
// Perform cleanup
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Shutting down services...",
|
|
})
|
|
|
|
// Register tracer shutdown
|
|
if tracer != nil </span><span class="cov0" title="0">{
|
|
shutdownManager.RegisterComponent("tracer", func(ctx context.Context) error </span><span class="cov0" title="0">{
|
|
return tracer.Shutdown(ctx)
|
|
}</span>)
|
|
}
|
|
|
|
// Perform graceful shutdown of all components
|
|
<span class="cov0" title="0">if err := shutdownManager.Shutdown(30 * time.Second); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "Error during shutdown",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span>
|
|
|
|
// Wait for all goroutines to finish (with timeout)
|
|
<span class="cov0" title="0">waitCh := make(chan struct{})
|
|
go func() </span><span class="cov0" title="0">{
|
|
wg.Wait()
|
|
close(waitCh)
|
|
}</span>()
|
|
|
|
<span class="cov0" title="0">select </span>{
|
|
case <-waitCh:<span class="cov0" title="0">
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "All services shut down gracefully",
|
|
})</span>
|
|
case <-time.After(10 * time.Second):<span class="cov0" title="0">
|
|
cfg.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Some services didn't shut down gracefully within timeout",
|
|
})</span>
|
|
}
|
|
}
|
|
|
|
// startCacheMemoryMonitoring polls memory cache usage and updates metrics
|
|
func startCacheMemoryMonitoring(ctx context.Context) <span class="cov0" title="0">{
|
|
// Check every few seconds (more frequent than cleanup routine)
|
|
ticker := time.NewTicker(15 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Starting memory cache monitoring",
|
|
})
|
|
|
|
// Use mutex to protect concurrent access to metrics registration
|
|
var metricsMutex sync.Mutex
|
|
|
|
// Create initial metrics with proper synchronization
|
|
metricsMutex.Lock()
|
|
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryLimit, nil,
|
|
float64(libpack_cache.GetCacheMaxMemorySize()))
|
|
metricsMutex.Unlock()
|
|
|
|
for </span><span class="cov0" title="0">{
|
|
select </span>{
|
|
case <-ctx.Done():<span class="cov0" title="0">
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Stopping cache memory monitoring",
|
|
})
|
|
return</span>
|
|
case <-ticker.C:<span class="cov0" title="0">
|
|
// Skip if monitoring not initialized or cache not initialized
|
|
if cfg.Monitoring == nil || !libpack_cache.IsCacheInitialized() </span><span class="cov0" title="0">{
|
|
continue</span>
|
|
}
|
|
|
|
// Get current memory usage atomically
|
|
<span class="cov0" title="0">memoryUsage := libpack_cache.GetCacheMemoryUsage()
|
|
memoryLimit := libpack_cache.GetCacheMaxMemorySize()
|
|
|
|
// Update metrics with proper synchronization
|
|
metricsMutex.Lock()
|
|
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryUsage, nil,
|
|
float64(memoryUsage))
|
|
|
|
cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryLimit, nil,
|
|
float64(memoryLimit))
|
|
|
|
// Calculate percentage (protect against division by zero)
|
|
var percentUsed float64
|
|
if memoryLimit > 0 </span><span class="cov0" title="0">{
|
|
percentUsed = float64(memoryUsage) / float64(memoryLimit) * 100.0
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">cfg.Monitoring.RegisterMetricsGauge(libpack_monitoring.MetricsCacheMemoryPercent, nil,
|
|
percentUsed)
|
|
metricsMutex.Unlock()
|
|
|
|
// Log if memory usage is high (over 80%)
|
|
if percentUsed > 80.0 </span><span class="cov0" title="0">{
|
|
cfg.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Memory cache usage is high",
|
|
Pairs: map[string]interface{}{
|
|
"memory_usage_bytes": memoryUsage,
|
|
"memory_limit_bytes": memoryLimit,
|
|
"percent_used": percentUsed,
|
|
},
|
|
})
|
|
}</span>
|
|
}
|
|
}
|
|
}
|
|
|
|
// validateFilePath validates and sanitizes file paths to prevent path traversal attacks
|
|
func validateFilePath(path string) (string, error) <span class="cov8" title="1">{
|
|
if path == "" </span><span class="cov0" title="0">{
|
|
return "", fmt.Errorf("empty file path")
|
|
}</span>
|
|
|
|
// Check for path traversal attempts
|
|
<span class="cov8" title="1">if strings.Contains(path, "..") </span><span class="cov0" title="0">{
|
|
return "", fmt.Errorf("path traversal detected")
|
|
}</span>
|
|
|
|
// Check for null bytes
|
|
<span class="cov8" title="1">if strings.Contains(path, "\x00") </span><span class="cov0" title="0">{
|
|
return "", fmt.Errorf("null byte in path")
|
|
}</span>
|
|
|
|
// Ensure path is absolute or within allowed directories
|
|
<span class="cov8" title="1">allowedPrefixes := []string{
|
|
"/go/src/app/",
|
|
"./",
|
|
"/tmp/",
|
|
"/var/tmp/",
|
|
}
|
|
|
|
isAllowed := false
|
|
for _, prefix := range allowedPrefixes </span><span class="cov8" title="1">{
|
|
if strings.HasPrefix(path, prefix) </span><span class="cov8" title="1">{
|
|
isAllowed = true
|
|
break</span>
|
|
}
|
|
}
|
|
|
|
<span class="cov8" title="1">if !isAllowed </span><span class="cov0" title="0">{
|
|
return "", fmt.Errorf("path not in allowed directories")
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return path, nil</span>
|
|
}
|
|
|
|
// ifNotInTest checks if the program is not running in a test environment.
|
|
func ifNotInTest() bool <span class="cov8" title="1">{
|
|
return flag.Lookup("test.v") == nil
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file16" style="display: none">package main
|
|
|
|
import (
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
)
|
|
|
|
// StartMonitoringServer initializes and starts the monitoring server.
|
|
func StartMonitoringServer() error <span class="cov8" title="1">{
|
|
cfg.Monitoring = libpack_monitoring.NewMonitoring(&libpack_monitoring.InitConfig{
|
|
PurgeOnCrawl: cfg.Server.PurgeOnCrawl,
|
|
PurgeEvery: cfg.Server.PurgeEvery,
|
|
})
|
|
cfg.Monitoring.AddMetricsPrefix("graphql_proxy")
|
|
cfg.Monitoring.RegisterDefaultMetrics()
|
|
|
|
// Currently, the monitoring server initialization doesn't throw errors,
|
|
// but we return nil to maintain the interface contract
|
|
return nil
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file17" style="display: none">package libpack_monitoring
|
|
|
|
func (ms *MetricsSetup) RegisterDefaultMetrics() <span class="cov8" title="1">{
|
|
ms.RegisterMetricsCounter(MetricsSucceeded, nil)
|
|
ms.RegisterMetricsCounter(MetricsFailed, nil)
|
|
ms.RegisterMetricsCounter(MetricsSkipped, nil)
|
|
ms.RegisterMetricsHistogram(MetricsDuration, nil)
|
|
ms.RegisterMetricsCounter(MetricsCacheHit, nil)
|
|
ms.RegisterMetricsCounter(MetricsCacheMiss, nil)
|
|
ms.RegisterMetricsCounter(MetricsQueriesCached, nil)
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) RegisterGoMetrics() {<span class="cov0" title="0">
|
|
// TODO: metrics.WriteProcessMetrics(ms.metrics_set)
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file18" style="display: none">package libpack_monitoring
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"unicode"
|
|
|
|
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
|
|
)
|
|
|
|
var sortedLabelKeysCache = struct {
|
|
m sync.Map
|
|
}{}
|
|
|
|
func (ms *MetricsSetup) get_metrics_name(name string, labels map[string]string) string <span class="cov8" title="1">{
|
|
var buf bytes.Buffer
|
|
|
|
podName := getPodName()
|
|
if labels == nil </span><span class="cov8" title="1">{
|
|
labels = defaultLabels(podName)
|
|
}</span> else<span class="cov8" title="1"> {
|
|
ensureDefaultLabels(&labels, podName)
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">if ms.metrics_prefix != "" </span><span class="cov8" title="1">{
|
|
buf.WriteString(ms.metrics_prefix)
|
|
buf.WriteByte('_')
|
|
}</span>
|
|
<span class="cov8" title="1">buf.WriteString(name)
|
|
|
|
if len(labels) > 0 </span><span class="cov8" title="1">{
|
|
buf.WriteByte('{')
|
|
appendSortedLabels(&buf, labels)
|
|
buf.WriteByte('}')
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return buf.String()</span>
|
|
}
|
|
|
|
func getPodName() string <span class="cov8" title="1">{
|
|
const unknownPodName = "unknown"
|
|
if hn, err := os.Hostname(); err == nil </span><span class="cov8" title="1">{
|
|
return hn
|
|
}</span>
|
|
<span class="cov0" title="0">return unknownPodName</span>
|
|
}
|
|
|
|
func defaultLabels(podName string) map[string]string <span class="cov8" title="1">{
|
|
return map[string]string{
|
|
"microservice": libpack_config.PKG_NAME,
|
|
"pod": podName,
|
|
}
|
|
}</span>
|
|
|
|
func ensureDefaultLabels(labels *map[string]string, podName string) <span class="cov8" title="1">{
|
|
if *labels == nil </span><span class="cov8" title="1">{
|
|
*labels = make(map[string]string)
|
|
}</span>
|
|
<span class="cov8" title="1">if _, exists := (*labels)["microservice"]; !exists </span><span class="cov8" title="1">{
|
|
(*labels)["microservice"] = libpack_config.PKG_NAME
|
|
}</span>
|
|
<span class="cov8" title="1">if _, exists := (*labels)["pod"]; !exists </span><span class="cov8" title="1">{
|
|
(*labels)["pod"] = podName
|
|
}</span>
|
|
}
|
|
|
|
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) <span class="cov8" title="1">{
|
|
keys := getSortedKeys(labels)
|
|
for i, k := range keys </span><span class="cov8" title="1">{
|
|
if i > 0 </span><span class="cov8" title="1">{
|
|
buf.WriteByte(',')
|
|
}</span>
|
|
<span class="cov8" title="1">buf.WriteString(k)
|
|
buf.WriteString(`="`)
|
|
buf.WriteString(labels[k])
|
|
buf.WriteByte('"')</span>
|
|
}
|
|
}
|
|
|
|
func getSortedKeys(labels map[string]string) []string <span class="cov8" title="1">{
|
|
labelsKey := labelsToString(labels)
|
|
|
|
// Check if the sorted keys are already cached
|
|
if keys, ok := sortedLabelKeysCache.m.Load(labelsKey); ok </span><span class="cov8" title="1">{
|
|
return keys.([]string)
|
|
}</span>
|
|
|
|
// Compute the sorted keys
|
|
<span class="cov8" title="1">keys := make([]string, 0, len(labels))
|
|
for k := range labels </span><span class="cov8" title="1">{
|
|
keys = append(keys, k)
|
|
}</span>
|
|
<span class="cov8" title="1">sort.Strings(keys)
|
|
|
|
// Store the sorted keys in the cache
|
|
sortedLabelKeysCache.m.Store(labelsKey, keys)
|
|
|
|
return keys</span>
|
|
}
|
|
|
|
func labelsToString(labels map[string]string) string <span class="cov8" title="1">{
|
|
keys := make([]string, 0, len(labels))
|
|
for k := range labels </span><span class="cov8" title="1">{
|
|
keys = append(keys, k)
|
|
}</span>
|
|
<span class="cov8" title="1">sort.Strings(keys)
|
|
|
|
var sb strings.Builder
|
|
for _, k := range keys </span><span class="cov8" title="1">{
|
|
sb.WriteString(k)
|
|
sb.WriteByte('=')
|
|
sb.WriteString(labels[k])
|
|
sb.WriteByte(';')
|
|
}</span>
|
|
<span class="cov8" title="1">return sb.String()</span>
|
|
}
|
|
|
|
func validate_metrics_name(name string) error <span class="cov8" title="1">{
|
|
cleanedName := clean_metric_name(name)
|
|
|
|
finalName := strings.Trim(cleanedName, "_")
|
|
|
|
if finalName != name </span><span class="cov8" title="1">{
|
|
return fmt.Errorf("invalid metric name: %s, expected %s", name, finalName)
|
|
}</span>
|
|
<span class="cov8" title="1">return nil</span>
|
|
}
|
|
|
|
func clean_metric_name(name string) string <span class="cov8" title="1">{
|
|
var buf bytes.Buffer
|
|
lastWasUnderscore := false
|
|
|
|
for _, r := range name </span><span class="cov8" title="1">{
|
|
if is_allowed_rune(r) </span><span class="cov8" title="1">{
|
|
if is_special_rune(r) </span><span class="cov8" title="1">{
|
|
if lastWasUnderscore </span><span class="cov8" title="1">{
|
|
continue</span>
|
|
}
|
|
<span class="cov8" title="1">r = '_'
|
|
lastWasUnderscore = true</span>
|
|
} else<span class="cov8" title="1"> {
|
|
lastWasUnderscore = false
|
|
}</span>
|
|
<span class="cov8" title="1">buf.WriteRune(r)</span>
|
|
} else<span class="cov8" title="1"> if !lastWasUnderscore </span><span class="cov8" title="1">{
|
|
buf.WriteByte('_')
|
|
lastWasUnderscore = true
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">return strings.Trim(buf.String(), "_")</span>
|
|
}
|
|
|
|
func is_allowed_rune(r rune) bool <span class="cov8" title="1">{
|
|
return unicode.IsLetter(r) || unicode.IsDigit(r) || r == ' ' || r == '_'
|
|
}</span>
|
|
|
|
func is_special_rune(r rune) bool <span class="cov8" title="1">{
|
|
return r == ' ' || r == '_'
|
|
}</span>
|
|
|
|
func compile_metrics_with_labels(name string, labels map[string]string) string <span class="cov8" title="1">{
|
|
var buf bytes.Buffer
|
|
|
|
buf.WriteString(name)
|
|
|
|
keys := getSortedKeys(labels)
|
|
|
|
for _, k := range keys </span><span class="cov8" title="1">{
|
|
buf.WriteByte('_')
|
|
buf.WriteString(k)
|
|
buf.WriteByte('_')
|
|
buf.WriteString(labels[k])
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return buf.String()</span>
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file19" style="display: none">package libpack_monitoring
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/gofiber/fiber/v2"
|
|
"github.com/gookit/goutil/envutil"
|
|
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
type MetricsSetup struct {
|
|
metrics_set *metrics.Set
|
|
metrics_set_custom *metrics.Set
|
|
ic *InitConfig
|
|
metrics_prefix string
|
|
}
|
|
|
|
var log = libpack_logger.New().SetMinLogLevel(libpack_logger.LEVEL_INFO)
|
|
|
|
type InitConfig struct {
|
|
PurgeOnCrawl bool
|
|
PurgeEvery int
|
|
}
|
|
|
|
func NewMonitoring(ic *InitConfig) *MetricsSetup <span class="cov8" title="1">{
|
|
ms := &MetricsSetup{
|
|
ic: ic,
|
|
metrics_set: metrics.NewSet(),
|
|
metrics_set_custom: metrics.NewSet(),
|
|
}
|
|
|
|
if flag.Lookup("test.v") == nil </span><span class="cov0" title="0">{
|
|
go ms.startPrometheusEndpoint()
|
|
|
|
if ic.PurgeEvery > 0 </span><span class="cov0" title="0">{
|
|
ticker := time.NewTicker(time.Duration(ic.PurgeEvery) * time.Second)
|
|
go func() </span><span class="cov0" title="0">{
|
|
for range ticker.C </span><span class="cov0" title="0">{
|
|
ms.PurgeMetrics()
|
|
}</span>
|
|
}()
|
|
}
|
|
}
|
|
|
|
<span class="cov8" title="1">return ms</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) startPrometheusEndpoint() <span class="cov0" title="0">{
|
|
app := fiber.New(fiber.Config{
|
|
DisableStartupMessage: true,
|
|
AppName: fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION),
|
|
})
|
|
app.Get("/metrics", ms.metricsEndpoint)
|
|
if err := app.Listen(fmt.Sprintf(":%d", envutil.GetInt("MONITORING_PORT", 9393))); err != nil </span><span class="cov0" title="0">{
|
|
log.Critical(&libpack_logger.LogMessage{
|
|
Message: "Can't start the MONITORING service",
|
|
Pairs: map[string]interface{}{"error": err},
|
|
})
|
|
}</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) metricsEndpoint(c *fiber.Ctx) error <span class="cov8" title="1">{
|
|
ms.metrics_set.WritePrometheus(c.Response().BodyWriter())
|
|
ms.metrics_set_custom.WritePrometheus(c.Response().BodyWriter())
|
|
|
|
if ms.ic.PurgeOnCrawl && ms.ic.PurgeEvery == 0 </span><span class="cov0" title="0">{
|
|
ms.PurgeMetrics()
|
|
}</span>
|
|
<span class="cov8" title="1">return nil</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) AddMetricsPrefix(prefix string) <span class="cov8" title="1">{
|
|
ms.metrics_prefix = prefix
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) ListActiveMetrics() []string <span class="cov8" title="1">{
|
|
return ms.metrics_set.ListMetricNames()
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) RegisterMetricsGauge(metric_name string, labels map[string]string, val float64) *metrics.Gauge <span class="cov8" title="1">{
|
|
if err := validate_metrics_name(metric_name); err != nil </span><span class="cov0" title="0">{
|
|
log.Error(&libpack_logger.LogMessage{
|
|
Message: "RegisterMetricsGauge() error - invalid metric name",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "metric_name": metric_name},
|
|
})
|
|
// Return a dummy gauge instead of nil to prevent panics
|
|
return &metrics.Gauge{}
|
|
}</span>
|
|
<span class="cov8" title="1">return ms.metrics_set_custom.GetOrCreateGauge(ms.get_metrics_name(metric_name, labels), func() float64 </span><span class="cov8" title="1">{
|
|
return val
|
|
}</span>)
|
|
}
|
|
|
|
func (ms *MetricsSetup) RegisterMetricsCounter(metric_name string, labels map[string]string) *metrics.Counter <span class="cov8" title="1">{
|
|
if err := validate_metrics_name(metric_name); err != nil </span><span class="cov0" title="0">{
|
|
log.Error(&libpack_logger.LogMessage{
|
|
Message: "RegisterMetricsCounter() error - invalid metric name",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "metric_name": metric_name},
|
|
})
|
|
// Return a dummy counter instead of nil to prevent panics
|
|
return &metrics.Counter{}
|
|
}</span>
|
|
<span class="cov8" title="1">if metric_name == MetricsSucceeded || metric_name == MetricsFailed || metric_name == MetricsSkipped </span><span class="cov8" title="1">{
|
|
return ms.metrics_set.GetOrCreateCounter(ms.get_metrics_name(metric_name, labels))
|
|
}</span>
|
|
<span class="cov8" title="1">return ms.metrics_set_custom.GetOrCreateCounter(ms.get_metrics_name(metric_name, labels))</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) RegisterFloatCounter(metric_name string, labels map[string]string) *metrics.FloatCounter <span class="cov8" title="1">{
|
|
if err := validate_metrics_name(metric_name); err != nil </span><span class="cov0" title="0">{
|
|
log.Error(&libpack_logger.LogMessage{
|
|
Message: "RegisterFloatCounter() error - invalid metric name",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "metric_name": metric_name},
|
|
})
|
|
// Return a dummy float counter instead of nil to prevent panics
|
|
return &metrics.FloatCounter{}
|
|
}</span>
|
|
<span class="cov8" title="1">return ms.metrics_set_custom.GetOrCreateFloatCounter(ms.get_metrics_name(metric_name, labels))</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) RegisterMetricsSummary(metric_name string, labels map[string]string) *metrics.Summary <span class="cov8" title="1">{
|
|
if err := validate_metrics_name(metric_name); err != nil </span><span class="cov0" title="0">{
|
|
log.Error(&libpack_logger.LogMessage{
|
|
Message: "RegisterMetricsSummary() error - invalid metric name",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "metric_name": metric_name},
|
|
})
|
|
// Return a dummy summary instead of nil to prevent panics
|
|
return &metrics.Summary{}
|
|
}</span>
|
|
<span class="cov8" title="1">return ms.metrics_set_custom.GetOrCreateSummary(ms.get_metrics_name(metric_name, labels))</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) RegisterMetricsHistogram(metric_name string, labels map[string]string) *metrics.Histogram <span class="cov8" title="1">{
|
|
if err := validate_metrics_name(metric_name); err != nil </span><span class="cov0" title="0">{
|
|
log.Error(&libpack_logger.LogMessage{
|
|
Message: "RegisterMetricsHistogram() error - invalid metric name",
|
|
Pairs: map[string]interface{}{"error": err.Error(), "metric_name": metric_name},
|
|
})
|
|
// Return a dummy histogram instead of nil to prevent panics
|
|
return &metrics.Histogram{}
|
|
}</span>
|
|
<span class="cov8" title="1">return ms.metrics_set_custom.GetOrCreateHistogram(ms.get_metrics_name(metric_name, labels))</span>
|
|
}
|
|
|
|
func (ms *MetricsSetup) Increment(metric_name string, labels map[string]string) <span class="cov8" title="1">{
|
|
ms.RegisterMetricsCounter(metric_name, labels).Inc()
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) IncrementFloat(metric_name string, labels map[string]string, value float64) <span class="cov8" title="1">{
|
|
ms.RegisterFloatCounter(metric_name, labels).Add(value)
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) Set(metric_name string, labels map[string]string, value uint64) <span class="cov8" title="1">{
|
|
ms.RegisterMetricsCounter(metric_name, labels).Set(value)
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) Update(metric_name string, labels map[string]string, value float64) <span class="cov8" title="1">{
|
|
ms.RegisterMetricsHistogram(metric_name, labels).Update(value)
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) UpdateDuration(metric_name string, labels map[string]string, value time.Time) <span class="cov8" title="1">{
|
|
ms.RegisterMetricsHistogram(metric_name, labels).UpdateDuration(value)
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) UpdateSummary(metric_name string, labels map[string]string, value float64) <span class="cov8" title="1">{
|
|
ms.RegisterMetricsSummary(metric_name, labels).Update(value)
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) RemoveMetrics(metric_name string, labels map[string]string) <span class="cov8" title="1">{
|
|
ms.metrics_set_custom.UnregisterMetric(ms.get_metrics_name(metric_name, labels))
|
|
}</span>
|
|
|
|
func (ms *MetricsSetup) PurgeMetrics() <span class="cov8" title="1">{
|
|
ms.metrics_set_custom.UnregisterAllMetrics()
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file20" style="display: none">package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"github.com/avast/retry-go/v4"
|
|
"github.com/gofiber/fiber/v2"
|
|
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
libpack_tracing "github.com/lukaszraczylo/graphql-monitoring-proxy/tracing"
|
|
"github.com/sony/gobreaker"
|
|
"github.com/valyala/fasthttp"
|
|
)
|
|
|
|
// Errors related to circuit breaker
|
|
var (
|
|
ErrCircuitOpen = errors.New("circuit breaker is open")
|
|
)
|
|
|
|
// Default values for circuit breaker
|
|
const (
|
|
defaultMaxRequestsInHalfOpen = 10 // Default maximum requests in half-open state
|
|
)
|
|
|
|
// Global circuit breaker
|
|
var (
|
|
cb *gobreaker.CircuitBreaker
|
|
cbMutex sync.RWMutex
|
|
)
|
|
|
|
// safeUint32 converts an int to uint32 safely, handling negative values and values exceeding uint32 max
|
|
func safeUint32(value int) uint32 <span class="cov8" title="1">{
|
|
// Handle negative values
|
|
if value < 0 </span><span class="cov8" title="1">{
|
|
return 0
|
|
}</span>
|
|
|
|
// Handle values exceeding uint32 max
|
|
<span class="cov8" title="1">if value > math.MaxUint32 </span><span class="cov8" title="1">{
|
|
return math.MaxUint32
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return uint32(value)</span>
|
|
}
|
|
|
|
// initCircuitBreaker initializes the circuit breaker with configured settings
|
|
func initCircuitBreaker(config *config) <span class="cov8" title="1">{
|
|
// Only initialize if enabled
|
|
if !config.CircuitBreaker.Enable </span><span class="cov8" title="1">{
|
|
config.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Circuit breaker is disabled",
|
|
})
|
|
return
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">cbMutex.Lock()
|
|
defer cbMutex.Unlock()
|
|
|
|
// Initialize circuit breaker metrics
|
|
InitializeCircuitBreakerMetrics(config.Monitoring)
|
|
|
|
// Create circuit breaker settings
|
|
cbSettings := gobreaker.Settings{
|
|
Name: "graphql-proxy-circuit",
|
|
MaxRequests: safeMaxRequests(config.CircuitBreaker.MaxRequestsInHalfOpen),
|
|
Interval: 0, // No specific interval for counting failures
|
|
Timeout: time.Duration(config.CircuitBreaker.Timeout) * time.Second,
|
|
ReadyToTrip: createTripFunc(config),
|
|
OnStateChange: createStateChangeFunc(config),
|
|
}
|
|
|
|
// Initialize the circuit breaker
|
|
cb = gobreaker.NewCircuitBreaker(cbSettings)
|
|
|
|
config.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Circuit breaker initialized",
|
|
Pairs: map[string]interface{}{
|
|
"max_failures": config.CircuitBreaker.MaxFailures,
|
|
"timeout_seconds": config.CircuitBreaker.Timeout,
|
|
"max_half_open_reqs": config.CircuitBreaker.MaxRequestsInHalfOpen,
|
|
},
|
|
})</span>
|
|
}
|
|
|
|
// createTripFunc returns a function that determines when to trip the circuit
|
|
func createTripFunc(config *config) func(counts gobreaker.Counts) bool <span class="cov8" title="1">{
|
|
return func(counts gobreaker.Counts) bool </span><span class="cov8" title="1">{
|
|
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
|
|
shouldTrip := counts.ConsecutiveFailures >= safeUint32(config.CircuitBreaker.MaxFailures)
|
|
|
|
if shouldTrip </span><span class="cov8" title="1">{
|
|
config.Logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Circuit breaker tripped",
|
|
Pairs: map[string]interface{}{
|
|
"consecutive_failures": counts.ConsecutiveFailures,
|
|
"failure_ratio": failureRatio,
|
|
"total_failures": counts.TotalFailures,
|
|
"total_requests": counts.Requests,
|
|
},
|
|
})
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return shouldTrip</span>
|
|
}
|
|
}
|
|
|
|
// createStateChangeFunc returns a function that handles circuit state changes
|
|
func createStateChangeFunc(config *config) func(name string, from gobreaker.State, to gobreaker.State) <span class="cov8" title="1">{
|
|
return func(name string, from gobreaker.State, to gobreaker.State) </span><span class="cov8" title="1">{
|
|
var stateValue float64
|
|
var stateName string
|
|
|
|
switch to </span>{
|
|
case gobreaker.StateOpen:<span class="cov8" title="1">
|
|
stateValue = float64(libpack_monitoring.CircuitOpen)
|
|
stateName = "open"</span>
|
|
case gobreaker.StateHalfOpen:<span class="cov8" title="1">
|
|
stateValue = float64(libpack_monitoring.CircuitHalfOpen)
|
|
stateName = "half-open"</span>
|
|
case gobreaker.StateClosed:<span class="cov8" title="1">
|
|
stateValue = float64(libpack_monitoring.CircuitClosed)
|
|
stateName = "closed"</span>
|
|
}
|
|
|
|
// Update metrics using atomic operations to prevent race conditions
|
|
// Use a separate atomic variable to track state instead of recreating gauges
|
|
<span class="cov8" title="1">updateCircuitBreakerState(config, stateValue)
|
|
|
|
// Log state change
|
|
config.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Circuit breaker state changed",
|
|
Pairs: map[string]interface{}{
|
|
"from": from.String(),
|
|
"to": to.String(),
|
|
"name": name,
|
|
},
|
|
})
|
|
|
|
// Use the new metrics system
|
|
if cbMetrics != nil </span><span class="cov8" title="1">{
|
|
// Replace hyphens with underscores to avoid validation errors
|
|
safeStateName := strings.ReplaceAll(stateName, "-", "_")
|
|
stateKey := fmt.Sprintf("circuit_state_%s", safeStateName)
|
|
counter := cbMetrics.GetOrCreateFailCounter(config.Monitoring, stateKey)
|
|
counter.Inc()
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
// createFasthttpClient creates and configures a fasthttp client with optimized settings.
|
|
// The client is configured based on the provided configuration settings, with careful
|
|
// attention to performance and security considerations.
|
|
func createFasthttpClient(clientConfig *config) *fasthttp.Client <span class="cov8" title="1">{
|
|
tlsConfig := &tls.Config{
|
|
InsecureSkipVerify: clientConfig.Client.DisableTLSVerify,
|
|
}
|
|
|
|
// Calculate timeout values, ensuring they're always positive
|
|
clientTimeout := time.Duration(clientConfig.Client.ClientTimeout) * time.Second
|
|
if clientTimeout <= 0 </span><span class="cov0" title="0">{
|
|
clientTimeout = 30 * time.Second // Default timeout of 30 seconds
|
|
}</span>
|
|
|
|
// For timeout behavior, use the client timeout for all timeout settings
|
|
// to ensure consistent behavior
|
|
<span class="cov8" title="1">readTimeout := clientTimeout
|
|
writeTimeout := clientTimeout
|
|
|
|
// Create a custom dialer with timeout
|
|
dialer := &fasthttp.TCPDialer{
|
|
Concurrency: 1000,
|
|
DNSCacheDuration: time.Hour,
|
|
}
|
|
|
|
client := &fasthttp.Client{
|
|
Name: "graphql_proxy",
|
|
NoDefaultUserAgentHeader: true,
|
|
TLSConfig: tlsConfig,
|
|
// Control connection pool size to prevent overwhelming backend services
|
|
MaxConnsPerHost: clientConfig.Client.MaxConnsPerHost,
|
|
// Configure timeouts to handle different network scenarios
|
|
// Setting all timeout-related parameters to ensure proper timeout behavior
|
|
Dial: func(addr string) (net.Conn, error) </span><span class="cov8" title="1">{
|
|
return dialer.DialTimeout(addr, clientTimeout)
|
|
}</span>,
|
|
ReadTimeout: readTimeout,
|
|
WriteTimeout: writeTimeout,
|
|
MaxIdleConnDuration: time.Duration(clientConfig.Client.MaxIdleConnDuration) * time.Second,
|
|
MaxConnDuration: clientTimeout,
|
|
DisableHeaderNamesNormalizing: false,
|
|
// Performance tuning
|
|
ReadBufferSize: 4096,
|
|
WriteBufferSize: 4096,
|
|
MaxResponseBodySize: 1024 * 1024 * 10, // 10MB max response size
|
|
DisablePathNormalizing: false,
|
|
}
|
|
|
|
// Initialize connection pool manager
|
|
<span class="cov8" title="1">InitializeConnectionPool(client)
|
|
|
|
return client</span>
|
|
}
|
|
|
|
// proxyTheRequest handles the request proxying logic.
|
|
func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error <span class="cov8" title="1">{
|
|
// Setup tracing if enabled
|
|
var span trace.Span
|
|
var ctx context.Context
|
|
|
|
if cfg.Tracing.Enable && tracer != nil </span><span class="cov0" title="0">{
|
|
ctx = setupTracing(c)
|
|
span, _ = tracer.StartSpan(ctx, "proxy_request")
|
|
defer span.End()
|
|
}</span>
|
|
|
|
// Check if URL is allowed
|
|
<span class="cov8" title="1">if !checkAllowedURLs(c) </span><span class="cov0" title="0">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
|
|
}</span>
|
|
<span class="cov0" title="0">return fmt.Errorf("request blocked - not allowed URL: %s", c.Path())</span>
|
|
}
|
|
|
|
// Construct and validate proxy URL
|
|
<span class="cov8" title="1">proxyURL := currentEndpoint + c.Path()
|
|
if _, err := url.Parse(proxyURL); err != nil </span><span class="cov8" title="1">{
|
|
return fmt.Errorf("invalid URL: %v", err)
|
|
}</span>
|
|
|
|
// Log request details in debug mode
|
|
<span class="cov8" title="1">if cfg.LogLevel == "DEBUG" </span><span class="cov0" title="0">{
|
|
logDebugRequest(c)
|
|
}</span>
|
|
|
|
// Perform the proxy request with retries
|
|
<span class="cov8" title="1">if err := performProxyRequest(c, proxyURL); err != nil </span><span class="cov8" title="1">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
|
|
}</span>
|
|
<span class="cov8" title="1">return err</span>
|
|
}
|
|
|
|
// Log response details in debug mode
|
|
<span class="cov8" title="1">if cfg.LogLevel == "DEBUG" </span><span class="cov0" title="0">{
|
|
logDebugResponse(c)
|
|
}</span>
|
|
|
|
// Handle gzipped responses
|
|
<span class="cov8" title="1">if err := handleGzippedResponse(c); err != nil </span><span class="cov8" title="1">{
|
|
return err
|
|
}</span>
|
|
|
|
// Final status check
|
|
<span class="cov8" title="1">if c.Response().StatusCode() != fiber.StatusOK </span><span class="cov0" title="0">{
|
|
if ifNotInTest() </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
|
|
}</span>
|
|
<span class="cov0" title="0">return fmt.Errorf("received non-200 response from the GraphQL server: %d", c.Response().StatusCode())</span>
|
|
}
|
|
|
|
// Remove server header for security
|
|
<span class="cov8" title="1">c.Response().Header.Del(fiber.HeaderServer)
|
|
return nil</span>
|
|
}
|
|
|
|
// setupTracing extracts and sets up tracing context from request headers
|
|
func setupTracing(c *fiber.Ctx) context.Context <span class="cov0" title="0">{
|
|
ctx := context.Background()
|
|
|
|
if !cfg.Tracing.Enable || tracer == nil </span><span class="cov0" title="0">{
|
|
return ctx
|
|
}</span>
|
|
|
|
// Extract trace information from header
|
|
<span class="cov0" title="0">if traceHeader := c.Get("X-Trace-Span"); traceHeader != "" </span><span class="cov0" title="0">{
|
|
spanInfo, err := libpack_tracing.ParseTraceHeader(traceHeader)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Failed to parse trace header",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
}</span> else<span class="cov0" title="0"> if spanCtx, err := tracer.ExtractSpanContext(spanInfo); err == nil </span><span class="cov0" title="0">{
|
|
ctx = trace.ContextWithSpanContext(ctx, spanCtx)
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">return ctx</span>
|
|
}
|
|
|
|
// performProxyRequest executes the proxy request with retries and circuit breaker
|
|
func performProxyRequest(c *fiber.Ctx, proxyURL string) error <span class="cov8" title="1">{
|
|
// If circuit breaker is not enabled, use the original method
|
|
if !cfg.CircuitBreaker.Enable || cb == nil </span><span class="cov8" title="1">{
|
|
return performProxyRequestWithRetries(c, proxyURL)
|
|
}</span>
|
|
|
|
// Calculate cache key for potential fallback
|
|
<span class="cov8" title="1">cacheKey := libpack_cache.CalculateHash(c)
|
|
|
|
// Execute request through circuit breaker
|
|
_, err := cb.Execute(func() (interface{}, error) </span><span class="cov8" title="1">{
|
|
// Execute the request with retries
|
|
err := performProxyRequestWithRetries(c, proxyURL)
|
|
// Check if the error or status code should trip the circuit breaker
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
// Log error that could potentially trip the circuit
|
|
cfg.Logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Error in circuit-protected request",
|
|
Pairs: map[string]interface{}{
|
|
"path": c.Path(),
|
|
"error": err.Error(),
|
|
},
|
|
})
|
|
return nil, err
|
|
}</span>
|
|
|
|
// Check if non-2xx responses should trip the circuit
|
|
<span class="cov8" title="1">statusCode := c.Response().StatusCode()
|
|
if cfg.CircuitBreaker.TripOn5xx && statusCode >= 500 && statusCode < 600 </span><span class="cov0" title="0">{
|
|
err := fmt.Errorf("received 5xx status code: %d", statusCode)
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsCircuitFailed, nil)
|
|
return nil, err
|
|
}</span>
|
|
|
|
// Request was successful
|
|
<span class="cov8" title="1">cfg.Monitoring.Increment(libpack_monitoring.MetricsCircuitSuccessful, nil)
|
|
return nil, nil</span>
|
|
})
|
|
|
|
// If the circuit is open, try to serve from cache if configured
|
|
<span class="cov8" title="1">if err == gobreaker.ErrOpenState && cfg.CircuitBreaker.ReturnCachedOnOpen </span><span class="cov8" title="1">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsCircuitRejected, nil)
|
|
|
|
// Try to fetch from cache
|
|
if cachedResponse := libpack_cache.CacheLookup(cacheKey); cachedResponse != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Circuit open - serving from cache",
|
|
Pairs: map[string]interface{}{
|
|
"path": c.Path(),
|
|
},
|
|
})
|
|
|
|
// Set response from cache
|
|
c.Response().SetBody(cachedResponse)
|
|
c.Response().SetStatusCode(fiber.StatusOK)
|
|
|
|
// Mark as cache hit since we're serving from cache
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheHit, nil)
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsCircuitFallbackSuccess, nil)
|
|
|
|
return nil
|
|
}</span>
|
|
|
|
// No cached response available
|
|
<span class="cov8" title="1">cfg.Logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Circuit open - no cached response available",
|
|
Pairs: map[string]interface{}{
|
|
"path": c.Path(),
|
|
},
|
|
})
|
|
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsCircuitFallbackFailed, nil)
|
|
return ErrCircuitOpen</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">return err</span>
|
|
}
|
|
|
|
// performProxyRequestWithRetries executes the proxy request with retries
|
|
// This is the original implementation extracted for reuse
|
|
func performProxyRequestWithRetries(c *fiber.Ctx, proxyURL string) error <span class="cov8" title="1">{
|
|
return retry.Do(
|
|
func() error </span><span class="cov8" title="1">{
|
|
if err := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient); err != nil </span><span class="cov8" title="1">{
|
|
// Check if this is a timeout error - don't retry timeouts
|
|
if strings.Contains(strings.ToLower(err.Error()), "timeout") ||
|
|
strings.Contains(strings.ToLower(err.Error()), "deadline exceeded") ||
|
|
strings.Contains(strings.ToLower(err.Error()), "context deadline exceeded") </span><span class="cov8" title="1">{
|
|
return retry.Unrecoverable(err)
|
|
}</span>
|
|
<span class="cov8" title="1">return err</span>
|
|
}
|
|
<span class="cov8" title="1">if c.Response().StatusCode() != fiber.StatusOK </span><span class="cov8" title="1">{
|
|
return fmt.Errorf("received non-200 response: %d", c.Response().StatusCode())
|
|
}</span>
|
|
<span class="cov8" title="1">return nil</span>
|
|
},
|
|
retry.Attempts(5),
|
|
retry.DelayType(retry.BackOffDelay),
|
|
retry.Delay(250*time.Millisecond),
|
|
retry.MaxDelay(5*time.Second),
|
|
retry.OnRetry(func(n uint, err error) <span class="cov8" title="1">{
|
|
cfg.Logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Retrying the request",
|
|
Pairs: map[string]interface{}{
|
|
"path": c.Path(),
|
|
"attempt": n + 1,
|
|
"error": err.Error(),
|
|
"error_type": fmt.Sprintf("%T", err),
|
|
"is_timeout": strings.Contains(strings.ToLower(err.Error()), "timeout"),
|
|
},
|
|
})
|
|
}</span>),
|
|
retry.LastErrorOnly(true),
|
|
)
|
|
}
|
|
|
|
// doProxyRequestWithTimeout performs a proxy request with proper timeout handling
|
|
func doProxyRequestWithTimeout(c *fiber.Ctx, proxyURL string, client *fasthttp.Client) error <span class="cov8" title="1">{
|
|
// Calculate timeout from client configuration
|
|
clientTimeout := time.Duration(cfg.Client.ClientTimeout) * time.Second
|
|
if clientTimeout <= 0 </span><span class="cov0" title="0">{
|
|
clientTimeout = 30 * time.Second
|
|
}</span>
|
|
|
|
// Acquire request and response objects
|
|
<span class="cov8" title="1">req := fasthttp.AcquireRequest()
|
|
resp := fasthttp.AcquireResponse()
|
|
defer fasthttp.ReleaseRequest(req)
|
|
defer fasthttp.ReleaseResponse(resp)
|
|
|
|
// Copy the original request
|
|
c.Request().CopyTo(req)
|
|
req.SetRequestURI(proxyURL)
|
|
|
|
// Perform the request with timeout
|
|
err := client.DoTimeout(req, resp, clientTimeout)
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
return err
|
|
}</span>
|
|
|
|
// Copy response back to fiber context
|
|
<span class="cov8" title="1">resp.CopyTo(c.Response())
|
|
|
|
return nil</span>
|
|
}
|
|
|
|
// handleGzippedResponse decompresses gzipped responses
|
|
func handleGzippedResponse(c *fiber.Ctx) error <span class="cov8" title="1">{
|
|
if !bytes.EqualFold(c.Response().Header.Peek("Content-Encoding"), []byte("gzip")) </span><span class="cov8" title="1">{
|
|
return nil
|
|
}</span>
|
|
|
|
// Use pooled gzip reader
|
|
<span class="cov8" title="1">reader, err := GetGzipReader(bytes.NewReader(c.Response().Body()))
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to create gzip reader",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
<span class="cov8" title="1">defer func() </span><span class="cov8" title="1">{
|
|
// Return reader to pool
|
|
PutGzipReader(reader)
|
|
}</span>()
|
|
|
|
// Use pooled buffer for reading
|
|
<span class="cov8" title="1">buf := GetHTTPBuffer()
|
|
defer PutHTTPBuffer(buf)
|
|
|
|
// Read decompressed data into pooled buffer
|
|
_, err = io.Copy(buf, reader)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to decompress response",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
return err
|
|
}</span>
|
|
|
|
// Get decompressed data
|
|
<span class="cov8" title="1">decompressed := buf.Bytes()
|
|
|
|
// Update response
|
|
c.Response().SetBody(decompressed)
|
|
c.Response().Header.Del("Content-Encoding")
|
|
return nil</span>
|
|
}
|
|
|
|
// logDebugRequest logs the request details when in debug mode.
|
|
func logDebugRequest(c *fiber.Ctx) <span class="cov0" title="0">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Proxying the request",
|
|
Pairs: map[string]interface{}{
|
|
"path": c.Path(),
|
|
"body": string(c.Body()),
|
|
"headers": c.GetReqHeaders(),
|
|
"request_uuid": c.Locals("request_uuid"),
|
|
},
|
|
})
|
|
}</span>
|
|
|
|
// logDebugResponse logs the response details when in debug mode.
|
|
func logDebugResponse(c *fiber.Ctx) <span class="cov0" title="0">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Received proxied response",
|
|
Pairs: map[string]interface{}{
|
|
"path": c.Path(),
|
|
"response_body": string(c.Response().Body()),
|
|
"response_code": c.Response().StatusCode(),
|
|
"headers": c.GetRespHeaders(),
|
|
"request_uuid": c.Locals("request_uuid"),
|
|
},
|
|
})
|
|
}</span>
|
|
|
|
// safeMaxRequests converts MaxRequestsInHalfOpen safely to uint32, providing a fallback value if out of bounds
|
|
func safeMaxRequests(maxRequestsInHalfOpen int) uint32 <span class="cov8" title="1">{
|
|
// Check if value is invalid (negative or too large)
|
|
if maxRequestsInHalfOpen < 0 || maxRequestsInHalfOpen > math.MaxUint32 </span><span class="cov8" title="1">{
|
|
// Log warning and return a default value
|
|
if cfg != nil && cfg.Logger != nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Warning(&libpack_logger.LogMessage{
|
|
Message: "Invalid MaxRequestsInHalfOpen value, using default",
|
|
Pairs: map[string]interface{}{
|
|
"requested_value": maxRequestsInHalfOpen,
|
|
"default_value": defaultMaxRequestsInHalfOpen,
|
|
},
|
|
})
|
|
}</span>
|
|
<span class="cov8" title="1">return uint32(defaultMaxRequestsInHalfOpen)</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">return uint32(maxRequestsInHalfOpen)</span>
|
|
}
|
|
|
|
// updateCircuitBreakerState safely updates the circuit breaker state using atomic operations
|
|
func updateCircuitBreakerState(config *config, stateValue float64) <span class="cov8" title="1">{
|
|
// Update the state atomically using the new metrics system
|
|
if cbMetrics != nil </span><span class="cov8" title="1">{
|
|
cbMetrics.UpdateState(stateValue)
|
|
}</span>
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file21" style="display: none">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/goccy/go-json"
|
|
goratecounter "github.com/lukaszraczylo/go-ratecounter"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
// RateLimitConfig holds the rate limit configuration for a role
|
|
type RateLimitConfig struct {
|
|
RateCounterTicker *goratecounter.RateCounter
|
|
Interval time.Duration `json:"interval"`
|
|
Req int `json:"req"`
|
|
}
|
|
|
|
// UnmarshalJSON implements custom JSON unmarshaling for RateLimitConfig
|
|
func (r *RateLimitConfig) UnmarshalJSON(data []byte) error <span class="cov8" title="1">{
|
|
// Use a temporary struct to unmarshal the JSON data
|
|
type RateLimitConfigTemp struct {
|
|
Interval interface{} `json:"interval"`
|
|
Req int `json:"req"`
|
|
}
|
|
|
|
var temp RateLimitConfigTemp
|
|
if err := json.Unmarshal(data, &temp); err != nil </span><span class="cov0" title="0">{
|
|
return err
|
|
}</span>
|
|
|
|
// Set the Req field directly
|
|
<span class="cov8" title="1">r.Req = temp.Req
|
|
|
|
// Handle the Interval field based on its type
|
|
switch v := temp.Interval.(type) </span>{
|
|
case string:<span class="cov8" title="1">
|
|
// Convert string to time.Duration
|
|
switch v </span>{
|
|
case "second":<span class="cov8" title="1">
|
|
r.Interval = time.Second</span>
|
|
case "minute":<span class="cov8" title="1">
|
|
r.Interval = time.Minute</span>
|
|
case "hour":<span class="cov8" title="1">
|
|
r.Interval = time.Hour</span>
|
|
case "day":<span class="cov8" title="1">
|
|
r.Interval = 24 * time.Hour</span>
|
|
default:<span class="cov8" title="1">
|
|
// Try to parse as a Go duration string (e.g. "1s", "5m")
|
|
var err error
|
|
r.Interval, err = time.ParseDuration(v)
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
return fmt.Errorf("invalid duration format: %s", v)
|
|
}</span>
|
|
}
|
|
case float64:<span class="cov8" title="1">
|
|
// Numeric value is assumed to be in seconds
|
|
r.Interval = time.Duration(v * float64(time.Second))</span>
|
|
default:<span class="cov0" title="0">
|
|
return fmt.Errorf("interval must be a string or number, got %T", v)</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">return nil</span>
|
|
}
|
|
|
|
var (
|
|
rateLimits = make(map[string]RateLimitConfig)
|
|
rateLimitMu sync.RWMutex
|
|
// Use atomic.Value for safe concurrent config swapping
|
|
rateLimitConfigAtomic atomic.Value
|
|
)
|
|
|
|
// Variable to hold the current load config function - allows for testing
|
|
var loadConfigFunc = loadConfigFromPath
|
|
|
|
// loadRatelimitConfig loads the rate limit configurations from file
|
|
func loadRatelimitConfig() error <span class="cov8" title="1">{
|
|
paths := []string{"/go/src/app/ratelimit.json", "./ratelimit.json", "./static/app/default-ratelimit.json"}
|
|
configError := NewRateLimitConfigError(paths)
|
|
|
|
// Try each path and collect detailed error information
|
|
for _, path := range paths </span><span class="cov8" title="1">{
|
|
if err := loadConfigFunc(path); err == nil </span><span class="cov8" title="1">{
|
|
return nil
|
|
}</span> else<span class="cov8" title="1"> {
|
|
// Store the specific error for this path
|
|
configError.PathErrors[path] = err.Error()
|
|
}</span>
|
|
}
|
|
|
|
// Log detailed error information
|
|
<span class="cov8" title="1">cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Failed to load rate limit configuration",
|
|
Pairs: map[string]interface{}{
|
|
"paths": paths,
|
|
"path_errors": configError.PathErrors,
|
|
},
|
|
})
|
|
|
|
return configError</span>
|
|
}
|
|
|
|
func loadConfigFromPath(path string) error <span class="cov8" title="1">{
|
|
file, err := os.ReadFile(path)
|
|
if err != nil </span><span class="cov8" title="1">{
|
|
// Provide more specific error message based on the error type
|
|
errMsg := ""
|
|
if os.IsNotExist(err) </span><span class="cov8" title="1">{
|
|
errMsg = "File not found"
|
|
}</span> else<span class="cov0" title="0"> if os.IsPermission(err) </span><span class="cov0" title="0">{
|
|
errMsg = "Permission denied"
|
|
}</span> else<span class="cov0" title="0"> {
|
|
errMsg = "I/O error: " + err.Error()
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Failed to load rate limit config",
|
|
Pairs: map[string]interface{}{
|
|
"path": path,
|
|
"error": errMsg,
|
|
"error_details": err.Error(),
|
|
},
|
|
})
|
|
return fmt.Errorf("%s", errMsg)</span>
|
|
}
|
|
|
|
<span class="cov8" title="1">var config struct {
|
|
RateLimit map[string]RateLimitConfig `json:"ratelimit"`
|
|
}
|
|
|
|
if err := json.Unmarshal(file, &config); err != nil </span><span class="cov8" title="1">{
|
|
errMsg := fmt.Sprintf("Invalid JSON format: %s", err.Error())
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Failed to parse rate limit config",
|
|
Pairs: map[string]interface{}{
|
|
"path": path,
|
|
"error": errMsg,
|
|
},
|
|
})
|
|
return fmt.Errorf("%s", errMsg)
|
|
}</span>
|
|
|
|
// Validate configuration
|
|
<span class="cov8" title="1">if len(config.RateLimit) == 0 </span><span class="cov0" title="0">{
|
|
errMsg := "Empty rate limit configuration"
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Invalid rate limit config",
|
|
Pairs: map[string]interface{}{
|
|
"path": path,
|
|
"error": errMsg,
|
|
},
|
|
})
|
|
return fmt.Errorf("%s", errMsg)
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">newRateLimits := make(map[string]RateLimitConfig, len(config.RateLimit))
|
|
for key, value := range config.RateLimit </span><span class="cov8" title="1">{
|
|
value.RateCounterTicker = goratecounter.NewRateCounter().WithConfig(goratecounter.RateCounterConfig{
|
|
Interval: value.Interval,
|
|
})
|
|
|
|
if cfg.LogLevel == "DEBUG" </span><span class="cov0" title="0">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Setting ratelimit config for role",
|
|
Pairs: map[string]interface{}{
|
|
"role": key,
|
|
"interval_used": value.Interval,
|
|
"ratelimit": value.Req,
|
|
},
|
|
})
|
|
}</span>
|
|
<span class="cov8" title="1">newRateLimits[key] = value</span>
|
|
}
|
|
|
|
// Use atomic swap for thread-safe configuration updates
|
|
<span class="cov8" title="1">rateLimitMu.Lock()
|
|
rateLimits = newRateLimits
|
|
// Store the new config atomically
|
|
rateLimitConfigAtomic.Store(newRateLimits)
|
|
rateLimitMu.Unlock()
|
|
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Rate limit config loaded",
|
|
Pairs: map[string]interface{}{"ratelimit": rateLimits},
|
|
})
|
|
return nil</span>
|
|
}
|
|
|
|
// rateLimitedRequest checks if a request should be rate-limited
|
|
func rateLimitedRequest(userID, userRole string) bool <span class="cov8" title="1">{
|
|
// Try to get config from atomic value first for better performance
|
|
if configInterface := rateLimitConfigAtomic.Load(); configInterface != nil </span><span class="cov8" title="1">{
|
|
if config, ok := configInterface.(map[string]RateLimitConfig); ok </span><span class="cov8" title="1">{
|
|
if roleConfig, exists := config[userRole]; exists && roleConfig.RateCounterTicker != nil </span><span class="cov8" title="1">{
|
|
return checkRateLimit(userID, userRole, roleConfig)
|
|
}</span>
|
|
}
|
|
}
|
|
|
|
// Fallback to mutex-protected access
|
|
<span class="cov8" title="1">rateLimitMu.RLock()
|
|
roleConfig, ok := rateLimits[userRole]
|
|
rateLimitMu.RUnlock()
|
|
|
|
if !ok || roleConfig.RateCounterTicker == nil </span><span class="cov8" title="1">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Rate limit role not found or ticker not initialized - defaulting to deny",
|
|
Pairs: map[string]interface{}{"user_role": userRole},
|
|
})
|
|
// Default to deny when config not found (security fix)
|
|
return false
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return checkRateLimit(userID, userRole, roleConfig)</span>
|
|
}
|
|
|
|
// checkRateLimit performs the actual rate limit check
|
|
func checkRateLimit(userID, userRole string, roleConfig RateLimitConfig) bool <span class="cov8" title="1">{
|
|
roleConfig.RateCounterTicker.Incr(1)
|
|
tickerRate := roleConfig.RateCounterTicker.GetRate()
|
|
|
|
logDetails := map[string]interface{}{
|
|
"user_role": userRole,
|
|
"user_id": userID,
|
|
"rate": tickerRate,
|
|
"config_rate": roleConfig.Req,
|
|
"interval": roleConfig.Interval,
|
|
}
|
|
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Rate limit ticker",
|
|
Pairs: map[string]interface{}{"log_details": logDetails},
|
|
})
|
|
|
|
if tickerRate > float64(roleConfig.Req) </span><span class="cov8" title="1">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Rate limit exceeded",
|
|
Pairs: map[string]interface{}{"log_details": logDetails},
|
|
})
|
|
return false
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">return true</span>
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file22" style="display: none">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
)
|
|
|
|
// RateLimitConfigError represents a detailed error when loading rate limit configuration
|
|
type RateLimitConfigError struct {
|
|
Paths []string
|
|
// Map of path -> error message
|
|
PathErrors map[string]string
|
|
}
|
|
|
|
// Error implements the error interface
|
|
func (e *RateLimitConfigError) Error() string <span class="cov0" title="0">{
|
|
sb := strings.Builder{}
|
|
sb.WriteString("Failed to load rate limit configuration. Please ensure a valid configuration file exists at one of these locations:\n")
|
|
|
|
for _, path := range e.Paths </span><span class="cov0" title="0">{
|
|
errMsg := e.PathErrors[path]
|
|
sb.WriteString(fmt.Sprintf(" - %s: %s\n", path, errMsg))
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">sb.WriteString("\nTo resolve this issue:\n")
|
|
sb.WriteString("1. Create a valid JSON file using the following template:\n")
|
|
sb.WriteString(` {
|
|
"ratelimit": {
|
|
"admin": {
|
|
"req": 100,
|
|
"interval": "second"
|
|
},
|
|
"guest": {
|
|
"req": 3,
|
|
"interval": "second"
|
|
},
|
|
"-": {
|
|
"req": 10,
|
|
"interval": "minute"
|
|
}
|
|
}
|
|
}`)
|
|
sb.WriteString("\n\nThe 'interval' field supports the following formats:\n")
|
|
sb.WriteString(" - String values: \"second\", \"minute\", \"hour\", \"day\"\n")
|
|
sb.WriteString(" - Go duration strings: \"5s\", \"10m\", \"1h\"\n")
|
|
sb.WriteString(" - Numeric values (in seconds): 60, 3600\n")
|
|
sb.WriteString("\n2. Save it as 'ratelimit.json' in the current directory or in '/go/src/app/' (in Docker)\n")
|
|
sb.WriteString("3. Ensure the file has correct permissions and is accessible by the service\n")
|
|
|
|
return sb.String()</span>
|
|
}
|
|
|
|
// NewRateLimitConfigError creates a new rate limit configuration error
|
|
func NewRateLimitConfigError(paths []string) *RateLimitConfigError <span class="cov8" title="1">{
|
|
return &RateLimitConfigError{
|
|
Paths: paths,
|
|
PathErrors: make(map[string]string),
|
|
}
|
|
}</span>
|
|
</pre>
|
|
|
|
<pre class="file" id="file23" style="display: none">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/goccy/go-json"
|
|
fiber "github.com/gofiber/fiber/v2"
|
|
"github.com/gofiber/fiber/v2/middleware/cors"
|
|
"github.com/google/uuid"
|
|
|
|
graphql "github.com/lukaszraczylo/go-simple-graphql"
|
|
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
|
|
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
|
|
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
|
|
)
|
|
|
|
const (
|
|
healthCheckQueryStr = `{ __typename }`
|
|
)
|
|
|
|
// HealthCheckResponse represents the response structure for health check endpoints
|
|
type HealthCheckResponse struct {
|
|
Status string `json:"status"` // overall status: "healthy" or "unhealthy"
|
|
Dependencies map[string]DependencyStatus `json:"dependencies"` // status of each dependency
|
|
Timestamp string `json:"timestamp"` // when the health check was performed
|
|
}
|
|
|
|
// DependencyStatus represents the status of a dependency
|
|
type DependencyStatus struct {
|
|
Status string `json:"status"` // "up" or "down"
|
|
ResponseTime int64 `json:"responseTime"` // in milliseconds
|
|
Error *string `json:"error,omitempty"` // error message if any
|
|
}
|
|
|
|
// StartHTTPProxy initializes and starts the HTTP proxy server.
|
|
func StartHTTPProxy() error <span class="cov0" title="0">{
|
|
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Starting the HTTP proxy",
|
|
})
|
|
|
|
serverConfig := fiber.Config{
|
|
DisableStartupMessage: true,
|
|
AppName: fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION),
|
|
IdleTimeout: time.Duration(cfg.Client.ClientTimeout) * time.Second,
|
|
ReadTimeout: time.Duration(cfg.Client.ClientTimeout) * time.Second,
|
|
WriteTimeout: time.Duration(cfg.Client.ClientTimeout) * time.Second,
|
|
JSONEncoder: json.Marshal,
|
|
JSONDecoder: json.Unmarshal,
|
|
}
|
|
|
|
server := fiber.New(serverConfig)
|
|
|
|
server.Use(cors.New(cors.Config{
|
|
AllowOrigins: "*",
|
|
}))
|
|
|
|
server.Use(AddRequestUUID)
|
|
|
|
server.Get("/healthz", healthCheck)
|
|
server.Get("/livez", healthCheck)
|
|
server.Get("/health", healthCheck)
|
|
|
|
server.Post("/*", processGraphQLRequest)
|
|
server.Get("/*", proxyTheRequestToDefault)
|
|
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "GraphQL proxy starting",
|
|
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
|
|
})
|
|
|
|
if err := server.Listen(fmt.Sprintf(":%d", cfg.Server.PortGraphQL)); err != nil </span><span class="cov0" title="0">{
|
|
return fmt.Errorf("failed to start HTTP proxy server on port %d: %w",
|
|
cfg.Server.PortGraphQL, err)
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">return nil</span>
|
|
}
|
|
|
|
// proxyTheRequestToDefault proxies the request to the default GraphQL endpoint.
|
|
func proxyTheRequestToDefault(c *fiber.Ctx) error <span class="cov0" title="0">{
|
|
return proxyTheRequest(c, cfg.Server.HostGraphQL)
|
|
}</span>
|
|
|
|
// AddRequestUUID adds a unique request UUID to the context.
|
|
func AddRequestUUID(c *fiber.Ctx) error <span class="cov0" title="0">{
|
|
c.Locals("request_uuid", uuid.NewString())
|
|
return c.Next()
|
|
}</span>
|
|
|
|
// checkAllowedURLs checks if the requested URL is allowed.
|
|
func checkAllowedURLs(c *fiber.Ctx) bool <span class="cov8" title="1">{
|
|
if len(allowedUrls) == 0 </span><span class="cov8" title="1">{
|
|
return true
|
|
}</span>
|
|
<span class="cov8" title="1">path := c.OriginalURL()
|
|
_, ok := allowedUrls[path]
|
|
return ok</span>
|
|
}
|
|
|
|
// healthCheck performs a comprehensive health check on the GraphQL server and its dependencies.
|
|
func healthCheck(c *fiber.Ctx) error <span class="cov0" title="0">{
|
|
// Prepare the response structure
|
|
response := HealthCheckResponse{
|
|
Status: "healthy",
|
|
Dependencies: make(map[string]DependencyStatus),
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
}
|
|
|
|
// Configure checks from query parameters
|
|
checkGraphQL := true
|
|
checkRedis := cfg.Cache.CacheRedisEnable
|
|
|
|
// Parse query parameters to enable/disable specific checks
|
|
if c.Query("check_graphql") == "false" </span><span class="cov0" title="0">{
|
|
checkGraphQL = false
|
|
}</span>
|
|
<span class="cov0" title="0">if c.Query("check_redis") == "false" </span><span class="cov0" title="0">{
|
|
checkRedis = false
|
|
}</span>
|
|
|
|
// Check GraphQL backend service
|
|
<span class="cov0" title="0">if checkGraphQL </span><span class="cov0" title="0">{
|
|
startTime := time.Now()
|
|
graphqlStatus := DependencyStatus{
|
|
Status: "up",
|
|
}
|
|
|
|
// Try to connect to main GraphQL endpoint
|
|
endpoint := cfg.Server.HostGraphQL
|
|
if len(cfg.Server.HealthcheckGraphQL) > 0 </span><span class="cov0" title="0">{
|
|
endpoint = cfg.Server.HealthcheckGraphQL
|
|
}</span>
|
|
|
|
// Create a new GraphQL client for the health check
|
|
<span class="cov0" title="0">tempClient := graphql.NewConnection()
|
|
tempClient.SetEndpoint(endpoint)
|
|
_, err := tempClient.Query(healthCheckQueryStr, nil, nil)
|
|
|
|
graphqlStatus.ResponseTime = time.Since(startTime).Milliseconds()
|
|
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
errorMsg := err.Error()
|
|
graphqlStatus.Status = "down"
|
|
graphqlStatus.Error = &errorMsg
|
|
response.Status = "unhealthy"
|
|
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Health check: Can't reach the GraphQL server",
|
|
Pairs: map[string]interface{}{
|
|
"endpoint": endpoint,
|
|
"error": errorMsg,
|
|
"response_time_ms": graphqlStatus.ResponseTime,
|
|
},
|
|
})
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">response.Dependencies["graphql"] = graphqlStatus</span>
|
|
}
|
|
|
|
// Check Redis connectivity if enabled
|
|
<span class="cov0" title="0">if checkRedis && cfg.Cache.CacheRedisEnable </span><span class="cov0" title="0">{
|
|
startTime := time.Now()
|
|
redisStatus := DependencyStatus{
|
|
Status: "up",
|
|
}
|
|
|
|
// Implement proper Redis connectivity test
|
|
redisAccessible := false
|
|
var redisError error
|
|
|
|
if libpack_cache.IsCacheInitialized() </span><span class="cov0" title="0">{
|
|
// Try a simple Redis operation to test connectivity
|
|
testKey := "health_check_test"
|
|
testValue := []byte("test")
|
|
|
|
// Try to set and get a test value
|
|
libpack_cache.CacheStore(testKey, testValue)
|
|
retrievedValue := libpack_cache.CacheLookup(testKey)
|
|
|
|
if retrievedValue != nil && string(retrievedValue) == "test" </span><span class="cov0" title="0">{
|
|
redisAccessible = true
|
|
// Clean up test key
|
|
libpack_cache.CacheDelete(testKey)
|
|
}</span> else<span class="cov0" title="0"> {
|
|
redisError = fmt.Errorf("redis test operation failed")
|
|
}</span>
|
|
} else<span class="cov0" title="0"> {
|
|
redisError = fmt.Errorf("cache not initialized")
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">redisStatus.ResponseTime = time.Since(startTime).Milliseconds()
|
|
|
|
if !redisAccessible </span><span class="cov0" title="0">{
|
|
errorMsg := "Failed to connect to Redis"
|
|
if redisError != nil </span><span class="cov0" title="0">{
|
|
errorMsg = redisError.Error()
|
|
}</span>
|
|
<span class="cov0" title="0">redisStatus.Status = "down"
|
|
redisStatus.Error = &errorMsg
|
|
response.Status = "unhealthy"
|
|
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Health check: Can't connect to Redis",
|
|
Pairs: map[string]interface{}{
|
|
"server": cfg.Cache.CacheRedisURL,
|
|
"error": errorMsg,
|
|
"response_time_ms": redisStatus.ResponseTime,
|
|
},
|
|
})</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">response.Dependencies["redis"] = redisStatus</span>
|
|
}
|
|
|
|
// Determine appropriate HTTP status code
|
|
<span class="cov0" title="0">httpStatus := fiber.StatusOK
|
|
if response.Status == "unhealthy" </span><span class="cov0" title="0">{
|
|
httpStatus = fiber.StatusServiceUnavailable
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">cfg.Logger.Debug(&libpack_logger.LogMessage{
|
|
Message: "Health check completed",
|
|
Pairs: map[string]interface{}{
|
|
"status": response.Status,
|
|
"dependencies": response.Dependencies,
|
|
},
|
|
})
|
|
|
|
// Return JSON response
|
|
return c.Status(httpStatus).JSON(response)</span>
|
|
}
|
|
|
|
// processGraphQLRequest handles the incoming GraphQL requests.
|
|
func processGraphQLRequest(c *fiber.Ctx) error <span class="cov0" title="0">{
|
|
startTime := time.Now()
|
|
|
|
// Extract user information and check permissions
|
|
extractedUserID, extractedRoleName := extractUserInfo(c)
|
|
|
|
// Check if user is banned
|
|
if checkIfUserIsBanned(c, extractedUserID) </span><span class="cov0" title="0">{
|
|
return c.Status(fiber.StatusForbidden).SendString("User is banned")
|
|
}</span>
|
|
|
|
// Apply rate limiting if enabled
|
|
<span class="cov0" title="0">if cfg.Client.RoleRateLimit && !rateLimitedRequest(extractedUserID, extractedRoleName) </span><span class="cov0" title="0">{
|
|
return c.Status(fiber.StatusTooManyRequests).SendString("Rate limit exceeded, try again later")
|
|
}</span>
|
|
|
|
// Parse the GraphQL query
|
|
<span class="cov0" title="0">parsedResult := parseGraphQLQuery(c)
|
|
if parsedResult.shouldBlock </span><span class="cov0" title="0">{
|
|
return c.Status(fiber.StatusForbidden).SendString("Request blocked")
|
|
}</span>
|
|
|
|
// Handle non-GraphQL requests
|
|
<span class="cov0" title="0">if parsedResult.shouldIgnore </span><span class="cov0" title="0">{
|
|
return proxyTheRequest(c, parsedResult.activeEndpoint)
|
|
}</span>
|
|
|
|
// Handle caching
|
|
<span class="cov0" title="0">wasCached, err := handleCaching(c, parsedResult, extractedUserID)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return err
|
|
}</span>
|
|
|
|
// Log and monitor the request
|
|
<span class="cov0" title="0">logAndMonitorRequest(c, extractedUserID, parsedResult.operationType, parsedResult.operationName, wasCached, time.Since(startTime), startTime)
|
|
|
|
return nil</span>
|
|
}
|
|
|
|
// extractUserInfo extracts user ID and role from request headers
|
|
func extractUserInfo(c *fiber.Ctx) (string, string) <span class="cov0" title="0">{
|
|
extractedUserID := "-"
|
|
extractedRoleName := "-"
|
|
|
|
// Extract from JWT if available
|
|
if authorization := c.Get("Authorization"); authorization != "" &&
|
|
(len(cfg.Client.JWTUserClaimPath) > 0 || len(cfg.Client.JWTRoleClaimPath) > 0) </span><span class="cov0" title="0">{
|
|
extractedUserID, extractedRoleName = extractClaimsFromJWTHeader(authorization)
|
|
}</span>
|
|
|
|
// Override role from header if configured
|
|
<span class="cov0" title="0">if cfg.Client.RoleFromHeader != "" </span><span class="cov0" title="0">{
|
|
if role := c.Get(cfg.Client.RoleFromHeader); role != "" </span><span class="cov0" title="0">{
|
|
extractedRoleName = role
|
|
}</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">return extractedUserID, extractedRoleName</span>
|
|
}
|
|
|
|
// handleCaching manages the caching logic for GraphQL requests
|
|
func handleCaching(c *fiber.Ctx, parsedResult *parseGraphQLQueryResult, userID string) (bool, error) <span class="cov0" title="0">{
|
|
// Calculate query hash for cache key
|
|
calculatedQueryHash := libpack_cache.CalculateHash(c)
|
|
|
|
// Set cache time from header or default
|
|
if parsedResult.cacheTime == 0 </span><span class="cov0" title="0">{
|
|
if cacheQuery := c.Get("X-Cache-Graphql-Query"); cacheQuery != "" </span><span class="cov0" title="0">{
|
|
parsedResult.cacheTime, _ = strconv.Atoi(cacheQuery)
|
|
}</span> else<span class="cov0" title="0"> {
|
|
parsedResult.cacheTime = cfg.Cache.CacheTTL
|
|
}</span>
|
|
}
|
|
|
|
// Handle cache refresh directive
|
|
<span class="cov0" title="0">if parsedResult.cacheRefresh </span><span class="cov0" title="0">{
|
|
libpack_cache.CacheDelete(calculatedQueryHash)
|
|
}</span>
|
|
|
|
// Check if caching is enabled
|
|
<span class="cov0" title="0">cacheEnabled := parsedResult.cacheRequest || cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable
|
|
if !cacheEnabled </span><span class="cov0" title="0">{
|
|
// No caching, just proxy the request
|
|
if err := proxyTheRequest(c, parsedResult.activeEndpoint); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
|
|
return false, c.Status(fiber.StatusInternalServerError).SendString("Can't proxy the request - try again later")
|
|
}</span>
|
|
<span class="cov0" title="0">return false, nil</span>
|
|
}
|
|
|
|
// Try to get from cache
|
|
<span class="cov0" title="0">if cachedResponse := libpack_cache.CacheLookup(calculatedQueryHash); cachedResponse != nil </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheHit, nil)
|
|
c.Set("X-Cache-Hit", "true")
|
|
c.Set("Content-Type", "application/json")
|
|
return true, c.Send(cachedResponse)
|
|
}</span>
|
|
|
|
// Cache miss, proxy and cache
|
|
<span class="cov0" title="0">cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheMiss, nil)
|
|
if err := proxyAndCacheTheRequest(c, calculatedQueryHash, parsedResult.cacheTime, parsedResult.activeEndpoint); err != nil </span><span class="cov0" title="0">{
|
|
return false, err
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">return false, nil</span>
|
|
}
|
|
|
|
// proxyAndCacheTheRequest proxies and caches the request if needed.
|
|
func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string, cacheTime int, currentEndpoint string) error <span class="cov0" title="0">{
|
|
if err := proxyTheRequest(c, currentEndpoint); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logger.LogMessage{
|
|
Message: "Can't proxy the request",
|
|
Pairs: map[string]interface{}{"error": err.Error()},
|
|
})
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
|
|
return c.Status(fiber.StatusInternalServerError).SendString("Can't proxy the request - try again later")
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">libpack_cache.CacheStoreWithTTL(queryCacheHash, c.Response().Body(), time.Duration(cacheTime)*time.Second)
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsQueriesCached, nil)
|
|
return c.Send(c.Response().Body())</span>
|
|
}
|
|
|
|
// logAndMonitorRequest logs and monitors the request processing.
|
|
func logAndMonitorRequest(c *fiber.Ctx, userID, opType, opName string, wasCached bool, duration time.Duration, startTime time.Time) <span class="cov0" title="0">{
|
|
labels := map[string]string{
|
|
"op_type": opType,
|
|
"op_name": opName,
|
|
"cached": strconv.FormatBool(wasCached),
|
|
"user_id": userID,
|
|
}
|
|
|
|
if cfg.Server.AccessLog </span><span class="cov0" title="0">{
|
|
cfg.Logger.Info(&libpack_logger.LogMessage{
|
|
Message: "Request processed",
|
|
Pairs: map[string]interface{}{
|
|
"ip": c.IP(),
|
|
"fwd-ip": c.Get("X-Forwarded-For"),
|
|
"user_id": userID,
|
|
"op_type": opType,
|
|
"op_name": opName,
|
|
"time": duration,
|
|
"cache": wasCached,
|
|
"request_uuid": c.Locals("request_uuid"),
|
|
},
|
|
})
|
|
}</span>
|
|
|
|
<span class="cov0" title="0">cfg.Monitoring.Increment(libpack_monitoring.MetricsSucceeded, nil)
|
|
cfg.Monitoring.Increment(libpack_monitoring.MetricsExecutedQuery, labels)
|
|
|
|
if !wasCached </span><span class="cov0" title="0">{
|
|
cfg.Monitoring.UpdateDuration(libpack_monitoring.MetricsTimedQuery, labels, startTime)
|
|
cfg.Monitoring.Update(libpack_monitoring.MetricsTimedQuery, labels, float64(duration.Milliseconds()))
|
|
}</span>
|
|
}
|
|
</pre>
|
|
|
|
<pre class="file" id="file24" style="display: none">package main
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
|
)
|
|
|
|
// ShutdownManager manages graceful shutdown for all components.
//
// It owns a cancellable context that is passed to goroutines started through
// RunGoroutine, and a list of components whose Shutdown callbacks are invoked
// by Shutdown. Because it contains a sync.Mutex and a sync.WaitGroup it must
// not be copied; use the pointer returned by NewShutdownManager.
type ShutdownManager struct {
	// ctx is handed to managed goroutines; it is cancelled when Shutdown runs.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// wg tracks goroutines started via RunGoroutine.
	wg sync.WaitGroup
	// components holds the callbacks registered via RegisterComponent.
	components []ShutdownComponent
	// mu guards components.
	mu sync.Mutex
}
|
|
|
|
// ShutdownComponent represents a component that needs graceful shutdown.
type ShutdownComponent struct {
	// Name identifies the component in shutdown log messages.
	Name string
	// Shutdown is called during ShutdownManager.Shutdown with a context that
	// carries the shutdown timeout; a returned error is logged.
	Shutdown func(context.Context) error
}
|
|
|
|
// NewShutdownManager creates a new shutdown manager
|
|
func NewShutdownManager(ctx context.Context) *ShutdownManager <span class="cov0" title="0">{
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &ShutdownManager{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}</span>
|
|
|
|
// RegisterComponent registers a component for graceful shutdown
|
|
func (sm *ShutdownManager) RegisterComponent(name string, shutdown func(context.Context) error) <span class="cov0" title="0">{
|
|
sm.mu.Lock()
|
|
defer sm.mu.Unlock()
|
|
sm.components = append(sm.components, ShutdownComponent{
|
|
Name: name,
|
|
Shutdown: shutdown,
|
|
})
|
|
}</span>
|
|
|
|
// RunGoroutine starts a goroutine that respects the shutdown context
|
|
func (sm *ShutdownManager) RunGoroutine(name string, fn func(context.Context)) <span class="cov0" title="0">{
|
|
sm.wg.Add(1)
|
|
go func() </span><span class="cov0" title="0">{
|
|
defer sm.wg.Done()
|
|
cfg.Logger.Debug(&libpack_logging.LogMessage{
|
|
Message: "Starting managed goroutine",
|
|
Pairs: map[string]interface{}{"name": name},
|
|
})
|
|
fn(sm.ctx)
|
|
cfg.Logger.Debug(&libpack_logging.LogMessage{
|
|
Message: "Managed goroutine finished",
|
|
Pairs: map[string]interface{}{"name": name},
|
|
})
|
|
}</span>()
|
|
}
|
|
|
|
// Shutdown initiates graceful shutdown of all components
|
|
func (sm *ShutdownManager) Shutdown(timeout time.Duration) error <span class="cov0" title="0">{
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Initiating graceful shutdown",
|
|
})
|
|
|
|
// Cancel the context to signal all goroutines to stop
|
|
sm.cancel()
|
|
|
|
// Create a timeout context for component shutdown
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), timeout)
|
|
defer shutdownCancel()
|
|
|
|
// Shutdown all registered components
|
|
sm.mu.Lock()
|
|
components := make([]ShutdownComponent, len(sm.components))
|
|
copy(components, sm.components)
|
|
sm.mu.Unlock()
|
|
|
|
var shutdownWg sync.WaitGroup
|
|
for _, comp := range components </span><span class="cov0" title="0">{
|
|
shutdownWg.Add(1)
|
|
go func(c ShutdownComponent) </span><span class="cov0" title="0">{
|
|
defer shutdownWg.Done()
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "Shutting down component",
|
|
Pairs: map[string]interface{}{"component": c.Name},
|
|
})
|
|
if err := c.Shutdown(shutdownCtx); err != nil </span><span class="cov0" title="0">{
|
|
cfg.Logger.Error(&libpack_logging.LogMessage{
|
|
Message: "Error shutting down component",
|
|
Pairs: map[string]interface{}{
|
|
"component": c.Name,
|
|
"error": err.Error(),
|
|
},
|
|
})
|
|
}</span>
|
|
}(comp)
|
|
}
|
|
|
|
// Wait for all components to shutdown
|
|
<span class="cov0" title="0">componentsDone := make(chan struct{})
|
|
go func() </span><span class="cov0" title="0">{
|
|
shutdownWg.Wait()
|
|
close(componentsDone)
|
|
}</span>()
|
|
|
|
// Wait for goroutines with timeout
|
|
<span class="cov0" title="0">goroutinesDone := make(chan struct{})
|
|
go func() </span><span class="cov0" title="0">{
|
|
sm.wg.Wait()
|
|
close(goroutinesDone)
|
|
}</span>()
|
|
|
|
<span class="cov0" title="0">select </span>{
|
|
case <-componentsDone:<span class="cov0" title="0">
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "All components shut down successfully",
|
|
})</span>
|
|
case <-shutdownCtx.Done():<span class="cov0" title="0">
|
|
cfg.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Component shutdown timed out",
|
|
})</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">select </span>{
|
|
case <-goroutinesDone:<span class="cov0" title="0">
|
|
cfg.Logger.Info(&libpack_logging.LogMessage{
|
|
Message: "All goroutines finished",
|
|
})</span>
|
|
case <-time.After(timeout):<span class="cov0" title="0">
|
|
cfg.Logger.Warning(&libpack_logging.LogMessage{
|
|
Message: "Some goroutines didn't finish within timeout",
|
|
})</span>
|
|
}
|
|
|
|
<span class="cov0" title="0">return nil</span>
|
|
}</pre>
|
|
|
|
<pre class="file" id="file25" style="display: none">package tracing
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"strconv"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
)
|
|
|
|
// TracingSetup bundles the OpenTelemetry tracer provider created by
// NewTracing with the tracer obtained from it.
type TracingSetup struct {
	// tracerProvider is shut down by Shutdown; it may be nil.
	tracerProvider *sdktrace.TracerProvider
	// tracer starts spans in StartSpan and StartSpanWithAttributes; when it
	// is nil those methods return the span already carried by the context.
	tracer trace.Tracer
}
|
|
|
|
// TraceSpanInfo is the JSON payload decoded by ParseTraceHeader.
type TraceSpanInfo struct {
	// TraceParent is placed in the carrier under the "traceparent" key when
	// ExtractSpanContext rebuilds the remote span context.
	TraceParent string `json:"traceparent"`
}
|
|
|
|
// NewTracing creates a new tracing setup with OTLP exporter
|
|
func NewTracing(ctx context.Context, endpoint string) (*TracingSetup, error) <span class="cov8" title="1">{
|
|
if ctx == nil </span><span class="cov0" title="0">{
|
|
return nil, fmt.Errorf("context cannot be nil")
|
|
}</span>
|
|
<span class="cov8" title="1">if endpoint == "" </span><span class="cov8" title="1">{
|
|
return nil, fmt.Errorf("endpoint cannot be empty")
|
|
}</span>
|
|
|
|
// Validate endpoint format
|
|
// A simple validation to check if the endpoint has a reasonable format
|
|
// We're looking for hostname:port where port is a valid port number (0-65535)
|
|
<span class="cov8" title="1">var host string
|
|
var port int
|
|
if n, err := fmt.Sscanf(endpoint, "%s:%d", &host, &port); err != nil || n != 2 </span><span class="cov8" title="1">{
|
|
return nil, fmt.Errorf("invalid endpoint format: must be 'hostname:port'")
|
|
}</span>
|
|
<span class="cov0" title="0">if port < 0 || port > 65535 </span><span class="cov0" title="0">{
|
|
return nil, fmt.Errorf("invalid port number: must be between 0 and 65535")
|
|
}</span>
|
|
|
|
// Create the exporter directly with the endpoint
|
|
<span class="cov0" title="0">exporter, err := otlptracegrpc.New(ctx,
|
|
otlptracegrpc.WithEndpoint(endpoint),
|
|
otlptracegrpc.WithInsecure(),
|
|
otlptracegrpc.WithTimeout(5*time.Second),
|
|
otlptracegrpc.WithDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(16*1024*1024))), // 16MB max message size
|
|
)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
|
|
}</span>
|
|
|
|
// Create a resource with more detailed attributes
|
|
<span class="cov0" title="0">res, err := resource.New(ctx,
|
|
resource.WithAttributes(
|
|
semconv.ServiceName("graphql-monitoring-proxy"),
|
|
semconv.ServiceVersion("1.0"),
|
|
semconv.DeploymentEnvironment("production"),
|
|
attribute.String("application.type", "proxy"),
|
|
),
|
|
resource.WithHost(), // Add host information
|
|
resource.WithOSType(), // Add OS information
|
|
resource.WithProcessPID(), // Add process information
|
|
)
|
|
if err != nil </span><span class="cov0" title="0">{
|
|
return nil, fmt.Errorf("failed to create resource: %w", err)
|
|
}</span>
|
|
|
|
// Create the tracer provider with improved configuration
|
|
<span class="cov0" title="0">tracerProvider := sdktrace.NewTracerProvider(
|
|
sdktrace.WithBatcher(exporter,
|
|
// Configure batch processing
|
|
sdktrace.WithMaxExportBatchSize(512),
|
|
sdktrace.WithBatchTimeout(3*time.Second),
|
|
sdktrace.WithMaxQueueSize(2048),
|
|
),
|
|
sdktrace.WithResource(res),
|
|
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.1)), // Sample 10% of traces
|
|
)
|
|
|
|
// Set the global tracer provider and propagator
|
|
otel.SetTracerProvider(tracerProvider)
|
|
otel.SetTextMapPropagator(propagation.TraceContext{})
|
|
|
|
// Create a tracer
|
|
tracer := tracerProvider.Tracer("graphql-monitoring-proxy")
|
|
|
|
return &TracingSetup{
|
|
tracerProvider: tracerProvider,
|
|
tracer: tracer,
|
|
}, nil</span>
|
|
}
|
|
|
|
// ExtractSpanContext extracts span context from TraceSpanInfo
|
|
func (ts *TracingSetup) ExtractSpanContext(spanInfo *TraceSpanInfo) (trace.SpanContext, error) <span class="cov8" title="1">{
|
|
carrier := propagation.MapCarrier{
|
|
"traceparent": spanInfo.TraceParent,
|
|
}
|
|
ctx := context.Background()
|
|
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
|
|
spanCtx := trace.SpanContextFromContext(ctx)
|
|
if !spanCtx.IsValid() </span><span class="cov8" title="1">{
|
|
return trace.SpanContext{}, fmt.Errorf("invalid span context")
|
|
}</span>
|
|
<span class="cov0" title="0">return spanCtx, nil</span>
|
|
}
|
|
|
|
// ParseTraceHeader parses X-Trace-Span header content
|
|
func ParseTraceHeader(headerContent string) (*TraceSpanInfo, error) <span class="cov8" title="1">{
|
|
var spanInfo TraceSpanInfo
|
|
if err := json.Unmarshal([]byte(headerContent), &spanInfo); err != nil </span><span class="cov8" title="1">{
|
|
return nil, fmt.Errorf("failed to parse trace header: %w", err)
|
|
}</span>
|
|
<span class="cov8" title="1">return &spanInfo, nil</span>
|
|
}
|
|
|
|
// Shutdown cleanly shuts down the tracer provider
|
|
func (ts *TracingSetup) Shutdown(ctx context.Context) error <span class="cov8" title="1">{
|
|
if ts.tracerProvider == nil </span><span class="cov8" title="1">{
|
|
return nil
|
|
}</span>
|
|
<span class="cov0" title="0">return ts.tracerProvider.Shutdown(ctx)</span>
|
|
}
|
|
|
|
// StartSpan starts a new span with the given name and parent context
|
|
func (ts *TracingSetup) StartSpan(ctx context.Context, name string) (trace.Span, context.Context) <span class="cov8" title="1">{
|
|
if ts == nil || ts.tracer == nil </span><span class="cov8" title="1">{
|
|
// Return a no-op span if tracing is not configured
|
|
return trace.SpanFromContext(ctx), ctx
|
|
}</span>
|
|
|
|
// Add common attributes to all spans
|
|
<span class="cov8" title="1">opts := []trace.SpanStartOption{
|
|
trace.WithAttributes(
|
|
semconv.ServiceName("graphql-monitoring-proxy"),
|
|
semconv.ServiceVersion("1.0"),
|
|
),
|
|
}
|
|
|
|
ctx, span := ts.tracer.Start(ctx, name, opts...)
|
|
return span, ctx</span>
|
|
}
|
|
|
|
// StartSpanWithAttributes starts a new span with custom attributes
|
|
func (ts *TracingSetup) StartSpanWithAttributes(ctx context.Context, name string, attrs map[string]string) (trace.Span, context.Context) <span class="cov8" title="1">{
|
|
if ts == nil || ts.tracer == nil </span><span class="cov8" title="1">{
|
|
return trace.SpanFromContext(ctx), ctx
|
|
}</span>
|
|
|
|
// Convert string attributes to KeyValue pairs
|
|
<span class="cov8" title="1">attributes := make([]attribute.KeyValue, 0, len(attrs)+2)
|
|
attributes = append(attributes,
|
|
semconv.ServiceName("graphql-monitoring-proxy"),
|
|
semconv.ServiceVersion("1.0"),
|
|
)
|
|
|
|
for k, v := range attrs </span><span class="cov8" title="1">{
|
|
attributes = append(attributes, attribute.String(k, v))
|
|
}</span>
|
|
|
|
<span class="cov8" title="1">ctx, span := ts.tracer.Start(ctx, name, trace.WithAttributes(attributes...))
|
|
return span, ctx</span>
|
|
}
|
|
</pre>
|
|
|
|
</div>
|
|
</body>
|
|
<script>
|
|
// File switcher for the coverage report: shows exactly one file element at a
// time, driven by the element with id "files" and by the URL hash.
(function() {
	var files = document.getElementById('files');
	// Currently displayed file element, if any.
	var visible;
	files.addEventListener('change', onChange, false);
	// Hide the currently shown element and show the one whose id is `part`,
	// keeping the "files" control and the URL hash in sync. If no element
	// has that id, nothing is shown and `visible` is left unset.
	function select(part) {
		if (visible)
			visible.style.display = 'none';
		visible = document.getElementById(part);
		if (!visible)
			return;
		files.value = part;
		visible.style.display = 'block';
		location.hash = part;
	}
	// Show the newly chosen file and scroll back to the top of the page.
	function onChange() {
		select(files.value);
		window.scrollTo(0, 0);
	}
	// Honour a file id passed in the URL hash (leading '#' stripped).
	if (location.hash != "") {
		select(location.hash.substr(1));
	}
	// Fall back to the first file when the hash was empty or matched nothing.
	if (!visible) {
		select("file0");
	}
})();
|
|
</script>
|
|
</html>
|