Compare commits

...

2 Commits

Author SHA1 Message Date
lukaszraczylo b1045b8bc2 Retry budget. 2025-11-29 15:36:17 +00:00
lukaszraczylo cc35031db9 Fixes issue with the dashboard metrics. 2025-11-29 14:51:29 +00:00
4 changed files with 385 additions and 25 deletions
+57
View File
@@ -1075,6 +1075,63 @@
// Update all statistics
function updateAllStats(data) {
// Check if this is cluster mode data
if (data.cluster_mode && data.stats) {
// Cluster mode: data is structured differently
// Stats contains aggregated data with nested objects
const stats = data.stats;
// Update cluster status section
document.getElementById('cluster-status-section').style.display = 'block';
document.getElementById('cluster-total-instances').textContent = data.total_instances || 0;
document.getElementById('cluster-healthy-instances').textContent = data.healthy_instances || 0;
document.getElementById('overview-title').textContent = 'Cluster Overview';
// Update cluster info in toggle
const totalInstances = data.total_instances || 0;
document.getElementById('cluster-info').textContent =
`(${totalInstances} instance${totalInstances !== 1 ? 's' : ''} available)`;
// Build stats object with uptime from cluster_uptime
const statsWithUptime = {
...stats,
uptime_seconds: stats.cluster_uptime || 0,
uptime_human: formatUptime(stats.cluster_uptime || 0)
};
updateStats(statsWithUptime);
// Extract nested objects from stats for cluster mode
if (stats.circuit_breaker) updateCircuitBreaker(stats.circuit_breaker);
if (stats.coalescing) updateCoalescing(stats.coalescing);
if (stats.retry_budget) updateRetryBudget(stats.retry_budget);
if (stats.websocket) updateWebSocket(stats.websocket);
if (stats.connections) updateConnections(stats.connections);
// Handle memory for cluster mode (Redis doesn't track memory per instance)
if (stats.memory) {
const totalMemMB = stats.memory.total_usage_mb;
if (totalMemMB < 0) {
// All instances are using Redis cache
document.getElementById('cache-memory').textContent = 'N/A';
document.getElementById('cache-memory').title = 'Memory tracking not available for Redis cache';
document.getElementById('cache-memory-pct').textContent = 'Redis cache';
document.getElementById('memory-progress').style.width = '0%';
} else {
document.getElementById('cache-memory').textContent = totalMemMB.toFixed(2) + ' MB';
document.getElementById('cache-memory-pct').textContent = 'Cluster total';
}
}
// Update instance list if available
if (data.instances && data.instances.length > 0) {
document.getElementById('instance-details-section').style.display = 'block';
updateInstanceList(data.instances, null);
}
return;
}
// Non-cluster mode: original behavior
if (data.stats) updateStats(data.stats);
if (data.health) updateHealth(data.health);
if (data.circuit_breaker) updateCircuitBreaker(data.circuit_breaker);
+249
View File
@@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@@ -783,3 +784,251 @@ func (suite *Tests) TestRequestCoalescingIntegration() {
}
})
}
// TestRetryBudgetIntegration tests that retry budget correctly limits retry attempts
func (suite *Tests) TestRetryBudgetIntegration() {
// Initialize a retry budget with limited tokens for testing
budgetCtx := context.Background()
testBudget := NewRetryBudgetWithContext(budgetCtx, RetryBudgetConfig{
MaxTokens: 3, // Only allow 3 retries total
TokensPerSecond: 0, // Don't refill during test
Enabled: true,
}, cfg.Logger)
// Replace global retry budget
originalBudget := retryBudget
retryBudget = testBudget
defer func() {
testBudget.Shutdown()
retryBudget = originalBudget
}()
suite.Run("retry_budget_limits_retries", func() {
testBudget.Reset()
// Verify retry budget is set and works correctly
rb := GetRetryBudget()
suite.NotNil(rb, "Retry budget should be set")
suite.True(rb.enabled, "Retry budget should be enabled")
suite.T().Logf("Retry budget: enabled=%v, tokens=%d", rb.enabled, rb.currentTokens.Load())
// Test that AllowRetry consumes tokens correctly
initialTokens := rb.currentTokens.Load()
suite.Equal(int64(3), initialTokens, "Should start with 3 tokens")
// First 3 retries should be allowed
suite.True(rb.AllowRetry(), "First retry should be allowed")
suite.True(rb.AllowRetry(), "Second retry should be allowed")
suite.True(rb.AllowRetry(), "Third retry should be allowed")
// Fourth retry should be denied (tokens exhausted)
suite.False(rb.AllowRetry(), "Fourth retry should be denied - budget exhausted")
// Verify stats
stats := rb.GetStats()
suite.Equal(int64(4), stats["total_attempts"].(int64), "Should have 4 total attempts")
suite.Equal(int64(3), stats["allowed_retries"].(int64), "Should have 3 allowed retries")
suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry")
suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d",
stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"])
})
suite.Run("retry_budget_exhaustion", func() {
// Create a new budget with only 1 token
testBudget.Shutdown()
budgetCtx2 := context.Background()
testBudget2 := NewRetryBudgetWithContext(budgetCtx2, RetryBudgetConfig{
MaxTokens: 1, // Only allow 1 retry
TokensPerSecond: 0, // Don't refill
Enabled: true,
}, cfg.Logger)
retryBudget = testBudget2
defer func() {
testBudget2.Shutdown()
}()
// Test budget exhaustion with 1 token
rb := GetRetryBudget()
suite.NotNil(rb, "Retry budget should be set")
suite.Equal(int64(1), rb.currentTokens.Load(), "Should start with 1 token")
// First retry should be allowed
suite.True(rb.AllowRetry(), "First retry should be allowed")
// Second retry should be denied (only 1 token available)
suite.False(rb.AllowRetry(), "Second retry should be denied - budget exhausted")
// Verify stats
stats := rb.GetStats()
suite.Equal(int64(2), stats["total_attempts"].(int64), "Should have 2 total attempts")
suite.Equal(int64(1), stats["allowed_retries"].(int64), "Should have 1 allowed retry")
suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry")
suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d",
stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"])
})
}
// TestConnectionPoolStatsIntegration tests that connection pool stats are tracked
func (suite *Tests) TestConnectionPoolStatsIntegration() {
// Save original config
originalClient := cfg.Client.FastProxyClient
originalHostGraphQL := cfg.Server.HostGraphQL
originalCoalescing := cfg.RequestCoalescing.Enable
// Restore after test
defer func() {
cfg.Client.FastProxyClient = originalClient
cfg.Server.HostGraphQL = originalHostGraphQL
cfg.RequestCoalescing.Enable = originalCoalescing
}()
// Disable request coalescing for accurate tracking
cfg.RequestCoalescing.Enable = false
suite.Run("connection_success_tracked", func() {
// Create test server that succeeds
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"data":{"test":"success"}}`))
}))
defer server.Close()
cfg.Server.HostGraphQL = server.URL
cfg.Client.ClientTimeout = 5
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
// Initialize connection pool
InitializeConnectionPool(cfg.Client.FastProxyClient)
defer ShutdownConnectionPool()
poolMgr := GetConnectionPoolManager()
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
// Get stats before
statsBefore := poolMgr.GetConnectionStats()
successBefore := statsBefore["total_connections"].(int64)
// Make a successful request
reqCtx := &fasthttp.RequestCtx{}
reqCtx.Request.SetRequestURI("/graphql")
reqCtx.Request.Header.SetMethod("POST")
reqCtx.Request.Header.Set("Content-Type", "application/json")
reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`))
ctx := suite.app.AcquireCtx(reqCtx)
err := proxyTheRequest(ctx, cfg.Server.HostGraphQL)
suite.app.ReleaseCtx(ctx)
suite.Nil(err, "Request should succeed")
// Get stats after
statsAfter := poolMgr.GetConnectionStats()
successAfter := statsAfter["total_connections"].(int64)
suite.Greater(successAfter, successBefore,
"Total connections should increase after successful request")
})
suite.Run("connection_failure_tracked_on_5xx", func() {
// Create test server that returns 503
// Note: 503 triggers retry which records failures
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"errors":[{"message":"Service unavailable"}]}`))
}))
defer server.Close()
cfg.Server.HostGraphQL = server.URL
cfg.Client.ClientTimeout = 2
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
// Initialize connection pool
InitializeConnectionPool(cfg.Client.FastProxyClient)
defer ShutdownConnectionPool()
poolMgr := GetConnectionPoolManager()
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
// Get stats before
statsBefore := poolMgr.GetConnectionStats()
failuresBefore := statsBefore["connection_failures"].(int64)
// Make a failing request (503 is retryable, so it will retry and track failures)
reqCtx := &fasthttp.RequestCtx{}
reqCtx.Request.SetRequestURI("/graphql")
reqCtx.Request.Header.SetMethod("POST")
reqCtx.Request.Header.Set("Content-Type", "application/json")
reqCtx.Request.SetBody([]byte(`{"query": "query { fail }"}`))
ctx := suite.app.AcquireCtx(reqCtx)
_ = proxyTheRequest(ctx, cfg.Server.HostGraphQL)
suite.app.ReleaseCtx(ctx)
// Get stats after - should have failures from retry attempts
statsAfter := poolMgr.GetConnectionStats()
failuresAfter := statsAfter["connection_failures"].(int64)
suite.Greater(failuresAfter, failuresBefore,
"Connection failures should increase after 5xx responses that trigger retries")
suite.T().Logf("Connection failures: before=%d, after=%d",
failuresBefore, failuresAfter)
})
suite.Run("stats_reflect_request_outcomes", func() {
// This test verifies that connection stats properly reflect the
// combination of successes and failures over multiple requests
// Start with a fresh server
var requestCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count := requestCount.Add(1)
// First 2 requests succeed, rest fail
if count <= 2 {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"data":{"test":"success"}}`))
} else {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"errors":[{"message":"Error"}]}`))
}
}))
defer server.Close()
cfg.Server.HostGraphQL = server.URL
cfg.Client.ClientTimeout = 2
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
// Initialize connection pool
InitializeConnectionPool(cfg.Client.FastProxyClient)
defer ShutdownConnectionPool()
poolMgr := GetConnectionPoolManager()
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
// Make 2 successful requests
for i := 0; i < 2; i++ {
reqCtx := &fasthttp.RequestCtx{}
reqCtx.Request.SetRequestURI("/graphql")
reqCtx.Request.Header.SetMethod("POST")
reqCtx.Request.Header.Set("Content-Type", "application/json")
reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`))
ctx := suite.app.AcquireCtx(reqCtx)
_ = proxyTheRequest(ctx, cfg.Server.HostGraphQL)
suite.app.ReleaseCtx(ctx)
}
// Get stats after successes
statsAfterSuccess := poolMgr.GetConnectionStats()
totalConnections := statsAfterSuccess["total_connections"].(int64)
suite.GreaterOrEqual(totalConnections, int64(2),
"Should have at least 2 successful connections tracked")
suite.T().Logf("Total connections after 2 successful requests: %d", totalConnections)
})
}
+11 -2
View File
@@ -506,6 +506,7 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
totalCacheMisses int64
totalCachedQueries int64
totalMemoryUsageMB float64
hasValidMemoryStats bool // Track if any instance has valid memory stats
totalCurrentRPS float64
totalAvgRPS float64
totalActiveConnections int64
@@ -598,9 +599,11 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
}
// Aggregate memory usage from full cache details
// Skip -1 values which indicate Redis cache (memory tracking not available)
if len(instance.Cache) > 0 {
if memMB, ok := instance.Cache["memory_usage_mb"].(float64); ok {
if memMB, ok := instance.Cache["memory_usage_mb"].(float64); ok && memMB >= 0 {
totalMemoryUsageMB += memMB
hasValidMemoryStats = true
}
}
@@ -718,7 +721,13 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
"total_cached": totalCachedQueries,
},
"memory": map[string]interface{}{
"total_usage_mb": totalMemoryUsageMB,
"total_usage_mb": func() float64 {
if hasValidMemoryStats {
return totalMemoryUsageMB
}
return -1
}(),
"available": hasValidMemoryStats,
},
"connections": map[string]interface{}{
"total_active": totalActiveConnections,
+68 -23
View File
@@ -454,19 +454,36 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error {
return retry.Unrecoverable(fmt.Errorf("fiber context became nil during retry"))
}
// Get connection pool manager for stats tracking
poolMgr := GetConnectionPoolManager()
// Execute the proxy request
if err := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient); err != nil {
proxyErr := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient)
if proxyErr != nil {
// Check if this is a connection error
if isConnectionError(err) {
if isConnectionError(proxyErr) {
notifyHealthManager(false)
return err // Connection errors are retryable
// Track connection failure
if poolMgr != nil {
poolMgr.RecordConnectionFailure()
}
return proxyErr // Connection errors are retryable
}
// Check if this is a timeout error - don't retry timeouts
if isTimeoutError(err) {
return retry.Unrecoverable(err)
if isTimeoutError(proxyErr) {
return retry.Unrecoverable(proxyErr)
}
return err
// Check if this is a retryable HTTP error (e.g., 503)
// These indicate the server responded but with an error status
if strings.Contains(proxyErr.Error(), "non-200 response") {
// Track as a failure for retryable HTTP errors
if poolMgr != nil {
poolMgr.RecordConnectionFailure()
}
}
return proxyErr
}
// Safety check before accessing response
@@ -481,10 +498,18 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error {
if err == nil {
// Success case
notifyHealthManager(true)
// Track successful connection
if poolMgr != nil {
poolMgr.RecordConnectionSuccess()
}
return nil
}
if shouldRetry {
// Track connection failure for retryable errors (5xx, etc)
if poolMgr != nil {
poolMgr.RecordConnectionFailure()
}
return err // Retryable error
}
@@ -541,31 +566,51 @@ func performProxyRequestWithEnhancedRetries(c *fiber.Ctx, proxyURL string, backe
retry.LastErrorOnly(true),
retry.RetryIf(func(err error) bool {
// Don't retry if context is cancelled or context is nil
defer func() {
// Recover from any panic when accessing context
if r := recover(); r != nil {
// If we panic, don't retry
return
}
}()
if c == nil {
return false
}
// Try to safely access the context
ctx := c.Context()
if ctx == nil {
// Safely check if context is done/cancelled
// Note: fasthttp.RequestCtx.Done() can panic if not properly initialized
// If we panic, don't retry (maintains backward compatibility with test behavior)
shouldRetry := true
func() {
defer func() {
if r := recover(); r != nil {
// If we panic accessing context, don't retry
// This typically happens in test scenarios with mock contexts
shouldRetry = false
}
}()
ctx := c.Context()
if ctx == nil {
return
}
select {
case <-ctx.Done():
shouldRetry = false
default:
}
}()
if !shouldRetry {
return false
}
// Check if context is done/cancelled
select {
case <-ctx.Done():
return false
default:
return true
// Check retry budget before allowing retry
if rb := GetRetryBudget(); rb != nil {
if !rb.AllowRetry() {
cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Retry denied by budget",
Pairs: map[string]interface{}{
"path": c.Path(),
"error": err.Error(),
},
})
return false
}
}
return true
}),
)
}