mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-14 02:32:10 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| da8ec5f21d | |||
| 3d80f457d3 | |||
| 09c3e4cd95 | |||
| d07ee4090c | |||
| b1045b8bc2 |
@@ -155,6 +155,7 @@ You can still use the non-prefixed environment variables in the spirit of the ba
|
||||
| `CACHE_TTL` | The cache TTL | `60` |
|
||||
| `CACHE_MAX_MEMORY_SIZE` | Maximum memory size for cache in MB | `100` |
|
||||
| `CACHE_MAX_ENTRIES` | Maximum number of entries in cache | `10000` |
|
||||
| `CACHE_USE_LRU` | Use LRU eviction algorithm (see [Cache Eviction](#cache-eviction-algorithms)) | `false` |
|
||||
| `CACHE_PER_USER_DISABLED` | **⚠️ SECURITY**: Disable per-user cache isolation | `false` (**DO NOT** set to `true` in multi-user apps) |
|
||||
| `ENABLE_REDIS_CACHE` | Enable distributed Redis cache | `false` |
|
||||
| `CACHE_REDIS_URL` | URL to redis server / cluster endpoint | `localhost:6379` |
|
||||
@@ -439,6 +440,32 @@ These features ensure the cache runs efficiently even under high load and with l
|
||||
Since version `0.5.30` the cache is gzipped in the memory, which should optimise the memory usage quite significantly.
|
||||
Since version `0.15.48` the you can also use the distributed Redis cache.
|
||||
|
||||
#### Cache Eviction Algorithms
|
||||
|
||||
The proxy supports two cache eviction strategies:
|
||||
|
||||
**Standard (default):** Uses Go's `sync.Map` with approximate eviction. When memory limits are reached, entries are evicted based on iteration order (pseudo-random). This is memory-efficient and has excellent concurrent read performance.
|
||||
|
||||
**LRU (Least Recently Used):** Uses a proper LRU algorithm with a linked list to track access order. When limits are reached, the least recently accessed entries are evicted first. Enable with `CACHE_USE_LRU=true`.
|
||||
|
||||
| Feature | Standard | LRU |
|
||||
|---------|----------|-----|
|
||||
| Eviction order | Pseudo-random | Least recently used |
|
||||
| Read performance | Excellent | Good |
|
||||
| Memory tracking | Approximate | Precise |
|
||||
| Best for | High read throughput | Cache hit optimization |
|
||||
|
||||
*LRU cache configuration:*
|
||||
```bash
|
||||
GMP_ENABLE_GLOBAL_CACHE=true
|
||||
GMP_CACHE_TTL=300
|
||||
GMP_CACHE_USE_LRU=true
|
||||
GMP_CACHE_MAX_MEMORY_SIZE=200
|
||||
GMP_CACHE_MAX_ENTRIES=5000
|
||||
```
|
||||
|
||||
Use LRU when cache hit rate is critical and you want to ensure frequently accessed data stays cached. Use Standard (default) for maximum read throughput with less memory overhead.
|
||||
|
||||
#### Read-only endpoint
|
||||
|
||||
You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_READONLY` environment variable. The default value is empty, preventing the proxy from using the read-only endpoint for the queries and directing all the requests to the main endpoint specified as `HOST_GRAPHQL`. If the `HOST_GRAPHQL_READONLY` is set, the proxy will use the read-only endpoint for the queries with the `query` type and the main endpoint for the `mutation` type queries. Format of the read-only endpoint is the same as `HOST_GRAPHQL` endpoint, for example `http://localhost:8080/`.
|
||||
|
||||
+59
-20
@@ -1277,15 +1277,34 @@
|
||||
stateEl.classList.remove('loading');
|
||||
|
||||
let badgeClass = 'badge-info';
|
||||
if (data.state === 'closed') badgeClass = 'badge-success';
|
||||
else if (data.state === 'open') badgeClass = 'badge-danger';
|
||||
else if (data.state === 'half-open') badgeClass = 'badge-warning';
|
||||
let stateText = data.state || 'unknown';
|
||||
|
||||
stateEl.innerHTML = `<span class="badge ${badgeClass}">${data.state || 'Unknown'}</span>`;
|
||||
// For cluster mode, determine state from instance counts
|
||||
if (data.instances_open !== undefined) {
|
||||
// Cluster mode data
|
||||
if (data.instances_open > 0) {
|
||||
stateText = `${data.instances_open} open`;
|
||||
badgeClass = 'badge-danger';
|
||||
} else if (data.instances_halfopen > 0) {
|
||||
stateText = `${data.instances_halfopen} half-open`;
|
||||
badgeClass = 'badge-warning';
|
||||
} else if (data.instances_closed > 0) {
|
||||
stateText = `${data.instances_closed} closed`;
|
||||
badgeClass = 'badge-success';
|
||||
}
|
||||
} else {
|
||||
// Single instance mode
|
||||
if (data.state === 'closed') badgeClass = 'badge-success';
|
||||
else if (data.state === 'open') badgeClass = 'badge-danger';
|
||||
else if (data.state === 'half-open') badgeClass = 'badge-warning';
|
||||
}
|
||||
|
||||
stateEl.innerHTML = `<span class="badge ${badgeClass}">${stateText}</span>`;
|
||||
|
||||
document.getElementById('cb-enabled').textContent = data.enabled ? 'Yes' : 'No';
|
||||
|
||||
if (data.counts) {
|
||||
// Single instance mode with detailed counts
|
||||
document.getElementById('cb-total-requests').textContent =
|
||||
(data.counts.requests || 0).toLocaleString();
|
||||
document.getElementById('cb-total-successes').textContent =
|
||||
@@ -1296,6 +1315,14 @@
|
||||
(data.counts.consecutive_successes || 0).toLocaleString();
|
||||
document.getElementById('cb-consecutive-failures').textContent =
|
||||
(data.counts.consecutive_failures || 0).toLocaleString();
|
||||
} else if (data.instances_open !== undefined) {
|
||||
// Cluster mode - show instance distribution instead
|
||||
const total = (data.instances_open || 0) + (data.instances_closed || 0) + (data.instances_halfopen || 0);
|
||||
document.getElementById('cb-total-requests').textContent = total + ' instances';
|
||||
document.getElementById('cb-total-successes').textContent = (data.instances_closed || 0).toLocaleString();
|
||||
document.getElementById('cb-total-failures').textContent = (data.instances_open || 0).toLocaleString();
|
||||
document.getElementById('cb-consecutive-successes').textContent = '--';
|
||||
document.getElementById('cb-consecutive-failures').textContent = '--';
|
||||
}
|
||||
|
||||
if (data.config) {
|
||||
@@ -1307,23 +1334,31 @@
|
||||
function updateCoalescing(data) {
|
||||
document.getElementById('coalescing-rate').textContent =
|
||||
(data.backend_savings_pct || 0).toFixed(1) + '%';
|
||||
document.getElementById('coalescing-total').textContent =
|
||||
(data.total_requests || 0).toLocaleString();
|
||||
document.getElementById('coalescing-primary').textContent =
|
||||
(data.primary_requests || 0).toLocaleString();
|
||||
document.getElementById('coalescing-coalesced').textContent =
|
||||
(data.coalesced_requests || 0).toLocaleString();
|
||||
|
||||
// Handle both single instance (total_requests) and cluster mode (total_coalesced + total_primary)
|
||||
const totalRequests = data.total_requests ||
|
||||
((data.total_coalesced_requests || 0) + (data.total_primary_requests || 0));
|
||||
document.getElementById('coalescing-total').textContent = totalRequests.toLocaleString();
|
||||
|
||||
// Handle both single instance and cluster mode field names
|
||||
const primaryRequests = data.primary_requests || data.total_primary_requests || 0;
|
||||
document.getElementById('coalescing-primary').textContent = primaryRequests.toLocaleString();
|
||||
|
||||
const coalescedRequests = data.coalesced_requests || data.total_coalesced_requests || 0;
|
||||
document.getElementById('coalescing-coalesced').textContent = coalescedRequests.toLocaleString();
|
||||
|
||||
document.getElementById('coalescing-savings').textContent =
|
||||
(data.backend_savings_pct || 0).toFixed(1) + '%';
|
||||
}
|
||||
|
||||
function updateRetryBudget(data) {
|
||||
document.getElementById('retry-tokens').textContent =
|
||||
data.current_tokens || '--';
|
||||
document.getElementById('retry-current-tokens').textContent =
|
||||
data.current_tokens || '--';
|
||||
document.getElementById('retry-max-tokens').textContent =
|
||||
data.max_tokens || '--';
|
||||
// Use explicit undefined check to handle 0 values correctly
|
||||
const currentTokens = data.current_tokens !== undefined ? data.current_tokens : '--';
|
||||
const maxTokens = data.max_tokens !== undefined ? data.max_tokens : '--';
|
||||
|
||||
document.getElementById('retry-tokens').textContent = currentTokens;
|
||||
document.getElementById('retry-current-tokens').textContent = currentTokens;
|
||||
document.getElementById('retry-max-tokens').textContent = maxTokens;
|
||||
document.getElementById('retry-total').textContent =
|
||||
(data.total_attempts || 0).toLocaleString();
|
||||
document.getElementById('retry-denied').textContent =
|
||||
@@ -1333,13 +1368,17 @@
|
||||
}
|
||||
|
||||
function updateWebSocket(data) {
|
||||
document.getElementById('ws-connections').textContent =
|
||||
data.active_connections || 0;
|
||||
// Handle both single instance (active_connections) and cluster mode (total_connections)
|
||||
const connections = data.active_connections !== undefined ? data.active_connections :
|
||||
(data.total_connections !== undefined ? data.total_connections : 0);
|
||||
document.getElementById('ws-connections').textContent = connections;
|
||||
}
|
||||
|
||||
function updateConnections(data) {
|
||||
document.getElementById('pool-connections').textContent =
|
||||
data.active_connections || 0;
|
||||
// Handle both single instance (active_connections) and cluster mode (total_active)
|
||||
const connections = data.active_connections !== undefined ? data.active_connections :
|
||||
(data.total_active !== undefined ? data.total_active : 0);
|
||||
document.getElementById('pool-connections').textContent = connections;
|
||||
}
|
||||
|
||||
async function resetCoalescing() {
|
||||
|
||||
+1
-1
@@ -429,7 +429,7 @@ func (ad *AdminDashboard) getWebSocketStats(c *fiber.Ctx) error {
|
||||
|
||||
// clearCache clears the cache
|
||||
func (ad *AdminDashboard) clearCache(c *fiber.Ctx) error {
|
||||
// TODO: Implement cache clearing
|
||||
libpack_cache.CacheClear()
|
||||
return c.JSON(map[string]interface{}{
|
||||
"success": true,
|
||||
"message": "Cache cleared successfully",
|
||||
|
||||
@@ -214,6 +214,7 @@ func TestAdminDashboard_GetCacheStats(t *testing.T) {
|
||||
CacheRedisEnable bool
|
||||
CacheMaxMemorySize int
|
||||
CacheMaxEntries int
|
||||
CacheUseLRU bool
|
||||
GraphQLQueryCacheSize int
|
||||
PerUserCacheDisabled bool
|
||||
}{
|
||||
@@ -221,6 +222,7 @@ func TestAdminDashboard_GetCacheStats(t *testing.T) {
|
||||
CacheTTL: 60,
|
||||
CacheMaxMemorySize: 100,
|
||||
CacheMaxEntries: 10000,
|
||||
CacheUseLRU: false,
|
||||
PerUserCacheDisabled: false,
|
||||
},
|
||||
}
|
||||
|
||||
Vendored
+28
-27
@@ -27,6 +27,7 @@ type CacheConfig struct {
|
||||
Memory struct {
|
||||
MaxMemorySize int64 `json:"max_memory_size"` // Maximum memory size in bytes
|
||||
MaxEntries int64 `json:"max_entries"` // Maximum number of entries
|
||||
UseLRU bool `json:"use_lru"` // Use LRU eviction algorithm instead of random eviction
|
||||
}
|
||||
TTL int `json:"ttl"`
|
||||
IncludeUserContext bool `json:"include_user_context"` // Include user ID and role in cache key
|
||||
@@ -96,16 +97,6 @@ func CalculateHash(c *fiber.Ctx, userID string, userRole string) string {
|
||||
return strutil.Md5(cacheKeyData)
|
||||
}
|
||||
|
||||
// CalculateHashLegacy generates a cache hash using only the request body (DEPRECATED).
|
||||
// This function exists for backward compatibility only and should NOT be used
|
||||
// in production multi-user applications as it creates a security vulnerability
|
||||
// where users can see each other's cached data.
|
||||
//
|
||||
// Deprecated: Use CalculateHash with user context instead.
|
||||
func CalculateHashLegacy(c *fiber.Ctx) string {
|
||||
return strutil.Md5(c.Body())
|
||||
}
|
||||
|
||||
func EnableCache(cfg *CacheConfig) {
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = libpack_logger.New()
|
||||
@@ -134,34 +125,41 @@ func EnableCache(cfg *CacheConfig) {
|
||||
cfg.Client = libpack_cache_redis.NewCacheWrapper(redisClient, cfg.Logger)
|
||||
}
|
||||
} else {
|
||||
// Calculate memory and entry limits
|
||||
maxMemory := cfg.Memory.MaxMemorySize
|
||||
if maxMemory <= 0 {
|
||||
maxMemory = libpack_cache_memory.DefaultMaxMemorySize
|
||||
}
|
||||
|
||||
maxEntries := cfg.Memory.MaxEntries
|
||||
if maxEntries <= 0 {
|
||||
maxEntries = libpack_cache_memory.DefaultMaxCacheSize
|
||||
}
|
||||
|
||||
cacheType := "standard"
|
||||
if cfg.Memory.UseLRU {
|
||||
cacheType = "LRU"
|
||||
}
|
||||
|
||||
cfg.Logger.Debug(&libpack_logger.LogMessage{
|
||||
Message: "Using in-memory cache",
|
||||
Pairs: map[string]interface{}{
|
||||
"max_memory_size_bytes": cfg.Memory.MaxMemorySize,
|
||||
"max_entries": cfg.Memory.MaxEntries,
|
||||
"type": cacheType,
|
||||
"max_memory_size_bytes": maxMemory,
|
||||
"max_entries": maxEntries,
|
||||
},
|
||||
})
|
||||
|
||||
// Use memory size and entry limits if configured, otherwise use defaults
|
||||
if cfg.Memory.MaxMemorySize > 0 || cfg.Memory.MaxEntries > 0 {
|
||||
maxMemory := cfg.Memory.MaxMemorySize
|
||||
if maxMemory <= 0 {
|
||||
maxMemory = libpack_cache_memory.DefaultMaxMemorySize
|
||||
}
|
||||
|
||||
maxEntries := cfg.Memory.MaxEntries
|
||||
if maxEntries <= 0 {
|
||||
maxEntries = libpack_cache_memory.DefaultMaxCacheSize
|
||||
}
|
||||
|
||||
if cfg.Memory.UseLRU {
|
||||
// Use LRU cache with proper eviction algorithm
|
||||
cfg.Client = libpack_cache_memory.NewLRUMemoryCache(maxMemory, maxEntries)
|
||||
} else {
|
||||
// Use standard sync.Map-based cache
|
||||
cfg.Client = libpack_cache_memory.NewWithSize(
|
||||
time.Duration(cfg.TTL)*time.Second,
|
||||
maxMemory,
|
||||
maxEntries,
|
||||
)
|
||||
} else {
|
||||
// Backward compatibility
|
||||
cfg.Client = libpack_cache_memory.New(time.Duration(cfg.TTL) * time.Second)
|
||||
}
|
||||
}
|
||||
config = cfg
|
||||
@@ -271,6 +269,9 @@ func CacheGetQueries() int64 {
|
||||
}
|
||||
|
||||
func CacheClear() {
|
||||
if !IsCacheInitialized() {
|
||||
return
|
||||
}
|
||||
config.Client.Clear()
|
||||
cacheStats = &CacheStats{}
|
||||
}
|
||||
|
||||
Vendored
+5
@@ -279,3 +279,8 @@ func (c *LRUMemoryCache) GetMemoryUsage() int64 {
|
||||
func (c *LRUMemoryCache) GetMaxMemorySize() int64 {
|
||||
return c.maxMemorySize
|
||||
}
|
||||
|
||||
// CountQueries returns the number of entries in the cache
|
||||
func (c *LRUMemoryCache) CountQueries() int64 {
|
||||
return atomic.LoadInt64(&c.currentCount)
|
||||
}
|
||||
|
||||
+343
@@ -0,0 +1,343 @@
|
||||
package libpack_cache_memory
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type LRUMemoryCacheTestSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestLRUMemoryCacheTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(LRUMemoryCacheTestSuite))
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestNewLRUMemoryCache() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100) // 1MB, 100 entries
|
||||
suite.NotNil(cache)
|
||||
suite.Equal(int64(0), cache.CountQueries())
|
||||
suite.Equal(int64(0), cache.GetMemoryUsage())
|
||||
suite.Equal(int64(1024*1024), cache.GetMaxMemorySize())
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestSetAndGet() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
// Set a value
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
|
||||
// Get the value
|
||||
val, found := cache.Get("key1")
|
||||
suite.True(found)
|
||||
suite.Equal([]byte("value1"), val)
|
||||
|
||||
// Get non-existent key
|
||||
val, found = cache.Get("nonexistent")
|
||||
suite.False(found)
|
||||
suite.Nil(val)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestUpdateExisting() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
cache.Set("key1", []byte("value2"), 5*time.Second)
|
||||
|
||||
val, found := cache.Get("key1")
|
||||
suite.True(found)
|
||||
suite.Equal([]byte("value2"), val)
|
||||
suite.Equal(int64(1), cache.CountQueries())
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestDelete() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
suite.Equal(int64(1), cache.CountQueries())
|
||||
|
||||
cache.Delete("key1")
|
||||
suite.Equal(int64(0), cache.CountQueries())
|
||||
|
||||
val, found := cache.Get("key1")
|
||||
suite.False(found)
|
||||
suite.Nil(val)
|
||||
|
||||
// Delete non-existent key should not panic
|
||||
cache.Delete("nonexistent")
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestClear() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
cache.Set("key2", []byte("value2"), 5*time.Second)
|
||||
cache.Set("key3", []byte("value3"), 5*time.Second)
|
||||
suite.Equal(int64(3), cache.CountQueries())
|
||||
|
||||
cache.Clear()
|
||||
suite.Equal(int64(0), cache.CountQueries())
|
||||
suite.Equal(int64(0), cache.GetMemoryUsage())
|
||||
|
||||
_, found := cache.Get("key1")
|
||||
suite.False(found)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestExpiration() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
cache.Set("key1", []byte("value1"), 100*time.Millisecond)
|
||||
|
||||
// Should exist immediately
|
||||
val, found := cache.Get("key1")
|
||||
suite.True(found)
|
||||
suite.Equal([]byte("value1"), val)
|
||||
|
||||
// Wait for expiration
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
// Should be expired
|
||||
val, found = cache.Get("key1")
|
||||
suite.False(found)
|
||||
suite.Nil(val)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestEvictionByCount() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 3) // Max 3 entries
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
cache.Set("key2", []byte("value2"), 5*time.Second)
|
||||
cache.Set("key3", []byte("value3"), 5*time.Second)
|
||||
|
||||
// All 3 should exist
|
||||
_, found := cache.Get("key1")
|
||||
suite.True(found)
|
||||
_, found = cache.Get("key2")
|
||||
suite.True(found)
|
||||
_, found = cache.Get("key3")
|
||||
suite.True(found)
|
||||
|
||||
// Add 4th entry - should evict oldest (key1)
|
||||
cache.Set("key4", []byte("value4"), 5*time.Second)
|
||||
|
||||
suite.Equal(int64(3), cache.CountQueries())
|
||||
|
||||
// key1 should be evicted (it was least recently used)
|
||||
_, found = cache.Get("key1")
|
||||
suite.False(found)
|
||||
|
||||
// Others should still exist
|
||||
_, found = cache.Get("key2")
|
||||
suite.True(found)
|
||||
_, found = cache.Get("key3")
|
||||
suite.True(found)
|
||||
_, found = cache.Get("key4")
|
||||
suite.True(found)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestLRUOrder() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 3) // Max 3 entries
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
cache.Set("key2", []byte("value2"), 5*time.Second)
|
||||
cache.Set("key3", []byte("value3"), 5*time.Second)
|
||||
|
||||
// Access key1 to make it recently used
|
||||
cache.Get("key1")
|
||||
|
||||
// Add key4 - should evict key2 (now least recently used)
|
||||
cache.Set("key4", []byte("value4"), 5*time.Second)
|
||||
|
||||
// key2 should be evicted
|
||||
_, found := cache.Get("key2")
|
||||
suite.False(found)
|
||||
|
||||
// key1 should still exist (was accessed recently)
|
||||
_, found = cache.Get("key1")
|
||||
suite.True(found)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestEvictionByMemory() {
|
||||
// Small memory limit - 500 bytes
|
||||
cache := NewLRUMemoryCache(500, 100)
|
||||
|
||||
// Each entry has ~64 bytes overhead + key + value
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
cache.Set("key2", []byte("value2"), 5*time.Second)
|
||||
cache.Set("key3", []byte("value3"), 5*time.Second)
|
||||
|
||||
// Add large entry that should trigger eviction
|
||||
largeValue := make([]byte, 200)
|
||||
cache.Set("large", largeValue, 5*time.Second)
|
||||
|
||||
// Memory should be under limit
|
||||
suite.LessOrEqual(cache.GetMemoryUsage(), int64(500))
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestCompression() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
// Create a compressible value (> 1KB to trigger compression)
|
||||
largeValue := make([]byte, 2048)
|
||||
for i := range largeValue {
|
||||
largeValue[i] = 'A' // Highly compressible
|
||||
}
|
||||
|
||||
cache.Set("compressed", largeValue, 5*time.Second)
|
||||
|
||||
// Should be able to retrieve it correctly
|
||||
val, found := cache.Get("compressed")
|
||||
suite.True(found)
|
||||
suite.Equal(largeValue, val)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestGetStats() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
cache.Set("key2", []byte("value2"), 5*time.Second)
|
||||
|
||||
stats := cache.GetStats()
|
||||
suite.Equal(int64(2), stats["entries"])
|
||||
suite.Equal(int64(1024*1024), stats["max_memory"])
|
||||
suite.Equal(int64(100), stats["max_entries"])
|
||||
suite.NotNil(stats["memory_bytes"])
|
||||
suite.NotNil(stats["fill_percent"])
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestConcurrentAccess() {
|
||||
cache := NewLRUMemoryCache(10*1024*1024, 1000)
|
||||
const numGoroutines = 50
|
||||
const numOperations = 500
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numGoroutines * 3) // readers, writers, deleters
|
||||
|
||||
// Writers
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < numOperations; j++ {
|
||||
key := fmt.Sprintf("key-%d-%d", id, j)
|
||||
value := []byte(fmt.Sprintf("value-%d-%d", id, j))
|
||||
cache.Set(key, value, 5*time.Second)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Readers
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < numOperations; j++ {
|
||||
key := fmt.Sprintf("key-%d-%d", id, j)
|
||||
cache.Get(key)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Deleters
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < numOperations; j++ {
|
||||
key := fmt.Sprintf("key-%d-%d", id, j%100)
|
||||
cache.Delete(key)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestCleanExpiredEntries() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
cache.Set("expire1", []byte("value1"), 50*time.Millisecond)
|
||||
cache.Set("expire2", []byte("value2"), 50*time.Millisecond)
|
||||
cache.Set("keep", []byte("value3"), 5*time.Second)
|
||||
|
||||
suite.Equal(int64(3), cache.CountQueries())
|
||||
|
||||
// Wait for some to expire
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Clean expired entries
|
||||
cache.CleanExpiredEntries()
|
||||
|
||||
// Only "keep" should remain
|
||||
suite.Equal(int64(1), cache.CountQueries())
|
||||
|
||||
_, found := cache.Get("keep")
|
||||
suite.True(found)
|
||||
}
|
||||
|
||||
func (suite *LRUMemoryCacheTestSuite) TestCountQueries() {
|
||||
cache := NewLRUMemoryCache(1024*1024, 100)
|
||||
|
||||
suite.Equal(int64(0), cache.CountQueries())
|
||||
|
||||
cache.Set("key1", []byte("value1"), 5*time.Second)
|
||||
suite.Equal(int64(1), cache.CountQueries())
|
||||
|
||||
cache.Set("key2", []byte("value2"), 5*time.Second)
|
||||
suite.Equal(int64(2), cache.CountQueries())
|
||||
|
||||
cache.Delete("key1")
|
||||
suite.Equal(int64(1), cache.CountQueries())
|
||||
|
||||
cache.Clear()
|
||||
suite.Equal(int64(0), cache.CountQueries())
|
||||
}
|
||||
|
||||
// Benchmarks
|
||||
|
||||
func BenchmarkLRUMemoryCacheSet(b *testing.B) {
|
||||
cache := NewLRUMemoryCache(100*1024*1024, 100000)
|
||||
value := []byte("benchmark-value")
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
key := fmt.Sprintf("key-%d", i)
|
||||
cache.Set(key, value, 5*time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLRUMemoryCacheGet(b *testing.B) {
|
||||
cache := NewLRUMemoryCache(100*1024*1024, 100000)
|
||||
value := []byte("benchmark-value")
|
||||
|
||||
// Pre-populate
|
||||
for i := 0; i < 10000; i++ {
|
||||
key := fmt.Sprintf("key-%d", i)
|
||||
cache.Set(key, value, 5*time.Minute)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
key := fmt.Sprintf("key-%d", i%10000)
|
||||
cache.Get(key)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLRUMemoryCacheConcurrent(b *testing.B) {
|
||||
cache := NewLRUMemoryCache(100*1024*1024, 100000)
|
||||
value := []byte("benchmark-value")
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
for pb.Next() {
|
||||
key := fmt.Sprintf("key-%d", i)
|
||||
if i%2 == 0 {
|
||||
cache.Set(key, value, 5*time.Second)
|
||||
} else {
|
||||
cache.Get(key)
|
||||
}
|
||||
i++
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -106,117 +106,8 @@ func (e *ProxyError) WithMetadata(key string, value interface{}) *ProxyError {
|
||||
return e
|
||||
}
|
||||
|
||||
// Common error constructors
|
||||
|
||||
// NewConnectionError creates a connection-related error
|
||||
func NewConnectionError(err error) *ProxyError {
|
||||
code := ErrCodeConnectionRefused
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
if contains(errStr, "reset") {
|
||||
code = ErrCodeConnectionReset
|
||||
}
|
||||
}
|
||||
|
||||
return NewProxyError(code, "Failed to connect to backend", 502, true).
|
||||
WithCause(err)
|
||||
}
|
||||
|
||||
// NewTimeoutError creates a timeout error
|
||||
func NewTimeoutError(err error) *ProxyError {
|
||||
return NewProxyError(ErrCodeTimeout, "Request timed out", 504, false).
|
||||
WithCause(err)
|
||||
}
|
||||
|
||||
// NewCircuitOpenError creates a circuit breaker open error
|
||||
func NewCircuitOpenError() *ProxyError {
|
||||
return NewProxyError(ErrCodeCircuitOpen, "Service temporarily unavailable due to circuit breaker", 503, false).
|
||||
WithDetails("The backend service is currently experiencing issues. Please try again later.")
|
||||
}
|
||||
|
||||
// NewRateLimitError creates a rate limit error
|
||||
func NewRateLimitError(userID, role string) *ProxyError {
|
||||
return NewProxyError(ErrCodeRateLimited, "Rate limit exceeded", 429, false).
|
||||
WithDetails("You have exceeded the rate limit for your role").
|
||||
WithMetadata("user_id", userID).
|
||||
WithMetadata("role", role)
|
||||
}
|
||||
|
||||
// NewBackendError creates a backend error from status code
|
||||
func NewBackendError(statusCode int, body string) *ProxyError {
|
||||
code := ErrCodeBackendError
|
||||
message := "Backend returned an error"
|
||||
retryable := false
|
||||
|
||||
switch {
|
||||
case statusCode == 429:
|
||||
code = ErrCodeRateLimited
|
||||
message = "Backend rate limit exceeded"
|
||||
retryable = true
|
||||
case statusCode == 503:
|
||||
code = ErrCodeServiceUnavailable
|
||||
message = "Backend service unavailable"
|
||||
retryable = true
|
||||
case statusCode == 502 || statusCode == 504:
|
||||
code = ErrCodeBadGateway
|
||||
message = "Bad gateway"
|
||||
retryable = true
|
||||
case statusCode >= 500:
|
||||
code = ErrCodeBackendError
|
||||
message = "Backend server error"
|
||||
retryable = true
|
||||
case statusCode == 404:
|
||||
code = ErrCodeNotFound
|
||||
message = "Resource not found"
|
||||
case statusCode == 403:
|
||||
code = ErrCodeForbidden
|
||||
message = "Access forbidden"
|
||||
case statusCode == 401:
|
||||
code = ErrCodeUnauthorized
|
||||
message = "Unauthorized"
|
||||
case statusCode >= 400:
|
||||
code = ErrCodeInvalidRequest
|
||||
message = "Invalid request"
|
||||
}
|
||||
|
||||
return NewProxyError(code, message, statusCode, retryable).
|
||||
WithMetadata("backend_status", statusCode).
|
||||
WithMetadata("backend_body", truncateString(body, 500))
|
||||
}
|
||||
|
||||
// NewInvalidResponseError creates an invalid response error
|
||||
func NewInvalidResponseError(details string) *ProxyError {
|
||||
return NewProxyError(ErrCodeInvalidResponse, "Backend returned invalid response", 502, false).
|
||||
WithDetails(details)
|
||||
}
|
||||
|
||||
// NewInternalError creates an internal error
|
||||
func NewInternalError(err error) *ProxyError {
|
||||
return NewProxyError(ErrCodeInternalError, "Internal proxy error", 500, false).
|
||||
WithCause(err)
|
||||
}
|
||||
|
||||
// NewContextCanceledError creates a context canceled error
|
||||
func NewContextCanceledError() *ProxyError {
|
||||
return NewProxyError(ErrCodeContextCanceled, "Request canceled", 499, false).
|
||||
WithDetails("The request was canceled by the client")
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
|
||||
func contains(s, substr string) bool {
|
||||
return len(s) > 0 && len(substr) > 0 && len(s) >= len(substr) && (s == substr || len(s) > len(substr) && (s[:len(substr)] == substr || s[len(s)-len(substr):] == substr || containsMiddle(s, substr)))
|
||||
}
|
||||
|
||||
func containsMiddle(s, substr string) bool {
|
||||
for i := 0; i <= len(s)-len(substr); i++ {
|
||||
if s[i:i+len(substr)] == substr {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func truncateString(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
|
||||
@@ -20,7 +20,7 @@ require (
|
||||
github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.89
|
||||
github.com/redis/go-redis/v9 v9.17.1
|
||||
github.com/redis/go-redis/v9 v9.17.2
|
||||
github.com/sony/gobreaker v1.0.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/valyala/fasthttp v1.68.0
|
||||
@@ -47,7 +47,7 @@ require (
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/klauspost/compress v1.18.1 // indirect
|
||||
github.com/klauspost/compress v1.18.2 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.19 // indirect
|
||||
@@ -67,8 +67,8 @@ require (
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/term v0.37.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
|
||||
google.golang.org/protobuf v1.36.10 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -64,8 +64,8 @@ github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
|
||||
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
|
||||
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
|
||||
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
|
||||
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
@@ -84,8 +84,8 @@ github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byF
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.17.1 h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
|
||||
github.com/redis/go-redis/v9 v9.17.1/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
|
||||
github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 h1:McifyVxygw1d67y6vxUqls2D46J8W9nrki9c8c0eVvE=
|
||||
@@ -144,10 +144,10 @@ golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
|
||||
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846 h1:ZdyUkS9po3H7G0tuh955QVyyotWvOD4W0aEapeGeUYk=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846/go.mod h1:Fk4kyraUvqD7i5H6S43sj2W98fbZa75lpZz/eUyhfO0=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846 h1:Wgl1rcDNThT+Zn47YyCXOXyX/COgMTIdhJ717F0l4xk=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
|
||||
google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -783,3 +784,251 @@ func (suite *Tests) TestRequestCoalescingIntegration() {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestRetryBudgetIntegration tests that retry budget correctly limits retry attempts
|
||||
func (suite *Tests) TestRetryBudgetIntegration() {
|
||||
// Initialize a retry budget with limited tokens for testing
|
||||
budgetCtx := context.Background()
|
||||
testBudget := NewRetryBudgetWithContext(budgetCtx, RetryBudgetConfig{
|
||||
MaxTokens: 3, // Only allow 3 retries total
|
||||
TokensPerSecond: 0, // Don't refill during test
|
||||
Enabled: true,
|
||||
}, cfg.Logger)
|
||||
|
||||
// Replace global retry budget
|
||||
originalBudget := retryBudget
|
||||
retryBudget = testBudget
|
||||
defer func() {
|
||||
testBudget.Shutdown()
|
||||
retryBudget = originalBudget
|
||||
}()
|
||||
|
||||
suite.Run("retry_budget_limits_retries", func() {
|
||||
testBudget.Reset()
|
||||
|
||||
// Verify retry budget is set and works correctly
|
||||
rb := GetRetryBudget()
|
||||
suite.NotNil(rb, "Retry budget should be set")
|
||||
suite.True(rb.enabled, "Retry budget should be enabled")
|
||||
suite.T().Logf("Retry budget: enabled=%v, tokens=%d", rb.enabled, rb.currentTokens.Load())
|
||||
|
||||
// Test that AllowRetry consumes tokens correctly
|
||||
initialTokens := rb.currentTokens.Load()
|
||||
suite.Equal(int64(3), initialTokens, "Should start with 3 tokens")
|
||||
|
||||
// First 3 retries should be allowed
|
||||
suite.True(rb.AllowRetry(), "First retry should be allowed")
|
||||
suite.True(rb.AllowRetry(), "Second retry should be allowed")
|
||||
suite.True(rb.AllowRetry(), "Third retry should be allowed")
|
||||
|
||||
// Fourth retry should be denied (tokens exhausted)
|
||||
suite.False(rb.AllowRetry(), "Fourth retry should be denied - budget exhausted")
|
||||
|
||||
// Verify stats
|
||||
stats := rb.GetStats()
|
||||
suite.Equal(int64(4), stats["total_attempts"].(int64), "Should have 4 total attempts")
|
||||
suite.Equal(int64(3), stats["allowed_retries"].(int64), "Should have 3 allowed retries")
|
||||
suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry")
|
||||
|
||||
suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d",
|
||||
stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"])
|
||||
})
|
||||
|
||||
suite.Run("retry_budget_exhaustion", func() {
|
||||
// Create a new budget with only 1 token
|
||||
testBudget.Shutdown()
|
||||
budgetCtx2 := context.Background()
|
||||
testBudget2 := NewRetryBudgetWithContext(budgetCtx2, RetryBudgetConfig{
|
||||
MaxTokens: 1, // Only allow 1 retry
|
||||
TokensPerSecond: 0, // Don't refill
|
||||
Enabled: true,
|
||||
}, cfg.Logger)
|
||||
retryBudget = testBudget2
|
||||
defer func() {
|
||||
testBudget2.Shutdown()
|
||||
}()
|
||||
|
||||
// Test budget exhaustion with 1 token
|
||||
rb := GetRetryBudget()
|
||||
suite.NotNil(rb, "Retry budget should be set")
|
||||
suite.Equal(int64(1), rb.currentTokens.Load(), "Should start with 1 token")
|
||||
|
||||
// First retry should be allowed
|
||||
suite.True(rb.AllowRetry(), "First retry should be allowed")
|
||||
|
||||
// Second retry should be denied (only 1 token available)
|
||||
suite.False(rb.AllowRetry(), "Second retry should be denied - budget exhausted")
|
||||
|
||||
// Verify stats
|
||||
stats := rb.GetStats()
|
||||
suite.Equal(int64(2), stats["total_attempts"].(int64), "Should have 2 total attempts")
|
||||
suite.Equal(int64(1), stats["allowed_retries"].(int64), "Should have 1 allowed retry")
|
||||
suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry")
|
||||
|
||||
suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d",
|
||||
stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"])
|
||||
})
|
||||
}
|
||||
|
||||
// TestConnectionPoolStatsIntegration tests that connection pool stats are tracked
|
||||
func (suite *Tests) TestConnectionPoolStatsIntegration() {
|
||||
// Save original config
|
||||
originalClient := cfg.Client.FastProxyClient
|
||||
originalHostGraphQL := cfg.Server.HostGraphQL
|
||||
originalCoalescing := cfg.RequestCoalescing.Enable
|
||||
|
||||
// Restore after test
|
||||
defer func() {
|
||||
cfg.Client.FastProxyClient = originalClient
|
||||
cfg.Server.HostGraphQL = originalHostGraphQL
|
||||
cfg.RequestCoalescing.Enable = originalCoalescing
|
||||
}()
|
||||
|
||||
// Disable request coalescing for accurate tracking
|
||||
cfg.RequestCoalescing.Enable = false
|
||||
|
||||
suite.Run("connection_success_tracked", func() {
|
||||
// Create test server that succeeds
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(`{"data":{"test":"success"}}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg.Server.HostGraphQL = server.URL
|
||||
cfg.Client.ClientTimeout = 5
|
||||
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
|
||||
|
||||
// Initialize connection pool
|
||||
InitializeConnectionPool(cfg.Client.FastProxyClient)
|
||||
defer ShutdownConnectionPool()
|
||||
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
|
||||
|
||||
// Get stats before
|
||||
statsBefore := poolMgr.GetConnectionStats()
|
||||
successBefore := statsBefore["total_connections"].(int64)
|
||||
|
||||
// Make a successful request
|
||||
reqCtx := &fasthttp.RequestCtx{}
|
||||
reqCtx.Request.SetRequestURI("/graphql")
|
||||
reqCtx.Request.Header.SetMethod("POST")
|
||||
reqCtx.Request.Header.Set("Content-Type", "application/json")
|
||||
reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`))
|
||||
|
||||
ctx := suite.app.AcquireCtx(reqCtx)
|
||||
err := proxyTheRequest(ctx, cfg.Server.HostGraphQL)
|
||||
suite.app.ReleaseCtx(ctx)
|
||||
|
||||
suite.Nil(err, "Request should succeed")
|
||||
|
||||
// Get stats after
|
||||
statsAfter := poolMgr.GetConnectionStats()
|
||||
successAfter := statsAfter["total_connections"].(int64)
|
||||
|
||||
suite.Greater(successAfter, successBefore,
|
||||
"Total connections should increase after successful request")
|
||||
})
|
||||
|
||||
suite.Run("connection_failure_tracked_on_5xx", func() {
|
||||
// Create test server that returns 503
|
||||
// Note: 503 triggers retry which records failures
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
_, _ = w.Write([]byte(`{"errors":[{"message":"Service unavailable"}]}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg.Server.HostGraphQL = server.URL
|
||||
cfg.Client.ClientTimeout = 2
|
||||
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
|
||||
|
||||
// Initialize connection pool
|
||||
InitializeConnectionPool(cfg.Client.FastProxyClient)
|
||||
defer ShutdownConnectionPool()
|
||||
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
|
||||
|
||||
// Get stats before
|
||||
statsBefore := poolMgr.GetConnectionStats()
|
||||
failuresBefore := statsBefore["connection_failures"].(int64)
|
||||
|
||||
// Make a failing request (503 is retryable, so it will retry and track failures)
|
||||
reqCtx := &fasthttp.RequestCtx{}
|
||||
reqCtx.Request.SetRequestURI("/graphql")
|
||||
reqCtx.Request.Header.SetMethod("POST")
|
||||
reqCtx.Request.Header.Set("Content-Type", "application/json")
|
||||
reqCtx.Request.SetBody([]byte(`{"query": "query { fail }"}`))
|
||||
|
||||
ctx := suite.app.AcquireCtx(reqCtx)
|
||||
_ = proxyTheRequest(ctx, cfg.Server.HostGraphQL)
|
||||
suite.app.ReleaseCtx(ctx)
|
||||
|
||||
// Get stats after - should have failures from retry attempts
|
||||
statsAfter := poolMgr.GetConnectionStats()
|
||||
failuresAfter := statsAfter["connection_failures"].(int64)
|
||||
|
||||
suite.Greater(failuresAfter, failuresBefore,
|
||||
"Connection failures should increase after 5xx responses that trigger retries")
|
||||
|
||||
suite.T().Logf("Connection failures: before=%d, after=%d",
|
||||
failuresBefore, failuresAfter)
|
||||
})
|
||||
|
||||
suite.Run("stats_reflect_request_outcomes", func() {
|
||||
// This test verifies that connection stats properly reflect the
|
||||
// combination of successes and failures over multiple requests
|
||||
|
||||
// Start with a fresh server
|
||||
var requestCount atomic.Int32
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
count := requestCount.Add(1)
|
||||
// First 2 requests succeed, rest fail
|
||||
if count <= 2 {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(`{"data":{"test":"success"}}`))
|
||||
} else {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
_, _ = w.Write([]byte(`{"errors":[{"message":"Error"}]}`))
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg.Server.HostGraphQL = server.URL
|
||||
cfg.Client.ClientTimeout = 2
|
||||
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
|
||||
|
||||
// Initialize connection pool
|
||||
InitializeConnectionPool(cfg.Client.FastProxyClient)
|
||||
defer ShutdownConnectionPool()
|
||||
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
|
||||
|
||||
// Make 2 successful requests
|
||||
for i := 0; i < 2; i++ {
|
||||
reqCtx := &fasthttp.RequestCtx{}
|
||||
reqCtx.Request.SetRequestURI("/graphql")
|
||||
reqCtx.Request.Header.SetMethod("POST")
|
||||
reqCtx.Request.Header.Set("Content-Type", "application/json")
|
||||
reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`))
|
||||
|
||||
ctx := suite.app.AcquireCtx(reqCtx)
|
||||
_ = proxyTheRequest(ctx, cfg.Server.HostGraphQL)
|
||||
suite.app.ReleaseCtx(ctx)
|
||||
}
|
||||
|
||||
// Get stats after successes
|
||||
statsAfterSuccess := poolMgr.GetConnectionStats()
|
||||
totalConnections := statsAfterSuccess["total_connections"].(int64)
|
||||
|
||||
suite.GreaterOrEqual(totalConnections, int64(2),
|
||||
"Should have at least 2 successful connections tracked")
|
||||
|
||||
suite.T().Logf("Total connections after 2 successful requests: %d", totalConnections)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -131,6 +131,7 @@ func parseConfig() {
|
||||
c.Cache.CacheTTL = getDetailsFromEnv("CACHE_TTL", 60)
|
||||
c.Cache.CacheMaxMemorySize = getDetailsFromEnv("CACHE_MAX_MEMORY_SIZE", 100) // Default 100MB
|
||||
c.Cache.CacheMaxEntries = getDetailsFromEnv("CACHE_MAX_ENTRIES", 10000) // Default 10000 entries
|
||||
c.Cache.CacheUseLRU = getDetailsFromEnv("CACHE_USE_LRU", false) // Use LRU eviction algorithm
|
||||
// GraphQL query parsing cache - auto-calculate based on CPU cores if not set
|
||||
c.Cache.GraphQLQueryCacheSize = getDetailsFromEnv("GRAPHQL_QUERY_CACHE_SIZE", runtime.GOMAXPROCS(0)*250)
|
||||
|
||||
@@ -390,9 +391,16 @@ func parseConfig() {
|
||||
// Memory cache configurations
|
||||
cacheConfig.Memory.MaxMemorySize = int64(cfg.Cache.CacheMaxMemorySize) * 1024 * 1024 // Convert MB to bytes
|
||||
cacheConfig.Memory.MaxEntries = int64(cfg.Cache.CacheMaxEntries)
|
||||
cacheConfig.Memory.UseLRU = cfg.Cache.CacheUseLRU
|
||||
|
||||
cacheType := "standard"
|
||||
if cfg.Cache.CacheUseLRU {
|
||||
cacheType = "LRU"
|
||||
}
|
||||
cfg.Logger.Info(&libpack_logging.LogMessage{
|
||||
Message: "Configuring memory cache with limits",
|
||||
Pairs: map[string]interface{}{
|
||||
"type": cacheType,
|
||||
"max_memory_mb": cfg.Cache.CacheMaxMemorySize,
|
||||
"max_entries": cfg.Cache.CacheMaxEntries,
|
||||
},
|
||||
|
||||
@@ -519,7 +519,10 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
|
||||
totalRetryAllowed int64
|
||||
totalRetryDenied int64
|
||||
totalRetryAttempts int64
|
||||
totalCurrentTokens int64
|
||||
totalMaxTokens int64
|
||||
retryBudgetEnabled = false
|
||||
retryTokensPerSec float64 // Use max tokens_per_sec from any instance
|
||||
|
||||
// Circuit breaker stats
|
||||
cbOpenCount int
|
||||
@@ -645,6 +648,17 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
|
||||
if attempts, ok := instance.RetryBudget["total_attempts"].(float64); ok {
|
||||
totalRetryAttempts += int64(attempts)
|
||||
}
|
||||
if currentTokens, ok := instance.RetryBudget["current_tokens"].(float64); ok {
|
||||
totalCurrentTokens += int64(currentTokens)
|
||||
}
|
||||
if maxTokens, ok := instance.RetryBudget["max_tokens"].(float64); ok {
|
||||
totalMaxTokens += int64(maxTokens)
|
||||
}
|
||||
if tokensPerSec, ok := instance.RetryBudget["tokens_per_sec"].(float64); ok {
|
||||
if tokensPerSec > retryTokensPerSec {
|
||||
retryTokensPerSec = tokensPerSec
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate circuit breaker stats
|
||||
@@ -748,6 +762,9 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
|
||||
"denied_retries": totalRetryDenied,
|
||||
"total_attempts": totalRetryAttempts,
|
||||
"denial_rate_pct": retryDenialRate,
|
||||
"current_tokens": totalCurrentTokens,
|
||||
"max_tokens": totalMaxTokens,
|
||||
"tokens_per_sec": retryTokensPerSec,
|
||||
},
|
||||
"circuit_breaker": map[string]interface{}{
|
||||
"enabled": circuitBreakerEnabled,
|
||||
|
||||
@@ -9,7 +9,3 @@ func (ms *MetricsSetup) RegisterDefaultMetrics() {
|
||||
ms.RegisterMetricsCounter(MetricsCacheMiss, nil)
|
||||
ms.RegisterMetricsCounter(MetricsQueriesCached, nil)
|
||||
}
|
||||
|
||||
func (ms *MetricsSetup) RegisterGoMetrics() {
|
||||
// TODO: metrics.WriteProcessMetrics(ms.metrics_set)
|
||||
}
|
||||
|
||||
@@ -454,19 +454,36 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error {
|
||||
return retry.Unrecoverable(fmt.Errorf("fiber context became nil during retry"))
|
||||
}
|
||||
|
||||
// Get connection pool manager for stats tracking
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
|
||||
// Execute the proxy request
|
||||
if err := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient); err != nil {
|
||||
proxyErr := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient)
|
||||
if proxyErr != nil {
|
||||
// Check if this is a connection error
|
||||
if isConnectionError(err) {
|
||||
if isConnectionError(proxyErr) {
|
||||
notifyHealthManager(false)
|
||||
return err // Connection errors are retryable
|
||||
// Track connection failure
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionFailure()
|
||||
}
|
||||
return proxyErr // Connection errors are retryable
|
||||
}
|
||||
|
||||
// Check if this is a timeout error - don't retry timeouts
|
||||
if isTimeoutError(err) {
|
||||
return retry.Unrecoverable(err)
|
||||
if isTimeoutError(proxyErr) {
|
||||
return retry.Unrecoverable(proxyErr)
|
||||
}
|
||||
return err
|
||||
|
||||
// Check if this is a retryable HTTP error (e.g., 503)
|
||||
// These indicate the server responded but with an error status
|
||||
if strings.Contains(proxyErr.Error(), "non-200 response") {
|
||||
// Track as a failure for retryable HTTP errors
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionFailure()
|
||||
}
|
||||
}
|
||||
return proxyErr
|
||||
}
|
||||
|
||||
// Safety check before accessing response
|
||||
@@ -481,10 +498,18 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error {
|
||||
if err == nil {
|
||||
// Success case
|
||||
notifyHealthManager(true)
|
||||
// Track successful connection
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionSuccess()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if shouldRetry {
|
||||
// Track connection failure for retryable errors (5xx, etc)
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionFailure()
|
||||
}
|
||||
return err // Retryable error
|
||||
}
|
||||
|
||||
@@ -541,31 +566,51 @@ func performProxyRequestWithEnhancedRetries(c *fiber.Ctx, proxyURL string, backe
|
||||
retry.LastErrorOnly(true),
|
||||
retry.RetryIf(func(err error) bool {
|
||||
// Don't retry if context is cancelled or context is nil
|
||||
defer func() {
|
||||
// Recover from any panic when accessing context
|
||||
if r := recover(); r != nil {
|
||||
// If we panic, don't retry
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if c == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Try to safely access the context
|
||||
ctx := c.Context()
|
||||
if ctx == nil {
|
||||
// Safely check if context is done/cancelled
|
||||
// Note: fasthttp.RequestCtx.Done() can panic if not properly initialized
|
||||
// If we panic, don't retry (maintains backward compatibility with test behavior)
|
||||
shouldRetry := true
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// If we panic accessing context, don't retry
|
||||
// This typically happens in test scenarios with mock contexts
|
||||
shouldRetry = false
|
||||
}
|
||||
}()
|
||||
ctx := c.Context()
|
||||
if ctx == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
shouldRetry = false
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
if !shouldRetry {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if context is done/cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
// Check retry budget before allowing retry
|
||||
if rb := GetRetryBudget(); rb != nil {
|
||||
if !rb.AllowRetry() {
|
||||
cfg.Logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Retry denied by budget",
|
||||
Pairs: map[string]interface{}{
|
||||
"path": c.Path(),
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ type config struct {
|
||||
CacheRedisEnable bool
|
||||
CacheMaxMemorySize int
|
||||
CacheMaxEntries int
|
||||
CacheUseLRU bool // Use LRU eviction algorithm instead of random eviction
|
||||
GraphQLQueryCacheSize int // Max number of parsed GraphQL queries to cache
|
||||
PerUserCacheDisabled bool // Disable per-user cache isolation (SECURITY RISK - not recommended)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user