mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-14 02:32:10 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 09c3e4cd95 | |||
| d07ee4090c | |||
| b1045b8bc2 |
+59
-20
@@ -1277,15 +1277,34 @@
|
||||
stateEl.classList.remove('loading');
|
||||
|
||||
let badgeClass = 'badge-info';
|
||||
if (data.state === 'closed') badgeClass = 'badge-success';
|
||||
else if (data.state === 'open') badgeClass = 'badge-danger';
|
||||
else if (data.state === 'half-open') badgeClass = 'badge-warning';
|
||||
let stateText = data.state || 'unknown';
|
||||
|
||||
stateEl.innerHTML = `<span class="badge ${badgeClass}">${data.state || 'Unknown'}</span>`;
|
||||
// For cluster mode, determine state from instance counts
|
||||
if (data.instances_open !== undefined) {
|
||||
// Cluster mode data
|
||||
if (data.instances_open > 0) {
|
||||
stateText = `${data.instances_open} open`;
|
||||
badgeClass = 'badge-danger';
|
||||
} else if (data.instances_halfopen > 0) {
|
||||
stateText = `${data.instances_halfopen} half-open`;
|
||||
badgeClass = 'badge-warning';
|
||||
} else if (data.instances_closed > 0) {
|
||||
stateText = `${data.instances_closed} closed`;
|
||||
badgeClass = 'badge-success';
|
||||
}
|
||||
} else {
|
||||
// Single instance mode
|
||||
if (data.state === 'closed') badgeClass = 'badge-success';
|
||||
else if (data.state === 'open') badgeClass = 'badge-danger';
|
||||
else if (data.state === 'half-open') badgeClass = 'badge-warning';
|
||||
}
|
||||
|
||||
stateEl.innerHTML = `<span class="badge ${badgeClass}">${stateText}</span>`;
|
||||
|
||||
document.getElementById('cb-enabled').textContent = data.enabled ? 'Yes' : 'No';
|
||||
|
||||
if (data.counts) {
|
||||
// Single instance mode with detailed counts
|
||||
document.getElementById('cb-total-requests').textContent =
|
||||
(data.counts.requests || 0).toLocaleString();
|
||||
document.getElementById('cb-total-successes').textContent =
|
||||
@@ -1296,6 +1315,14 @@
|
||||
(data.counts.consecutive_successes || 0).toLocaleString();
|
||||
document.getElementById('cb-consecutive-failures').textContent =
|
||||
(data.counts.consecutive_failures || 0).toLocaleString();
|
||||
} else if (data.instances_open !== undefined) {
|
||||
// Cluster mode - show instance distribution instead
|
||||
const total = (data.instances_open || 0) + (data.instances_closed || 0) + (data.instances_halfopen || 0);
|
||||
document.getElementById('cb-total-requests').textContent = total + ' instances';
|
||||
document.getElementById('cb-total-successes').textContent = (data.instances_closed || 0).toLocaleString();
|
||||
document.getElementById('cb-total-failures').textContent = (data.instances_open || 0).toLocaleString();
|
||||
document.getElementById('cb-consecutive-successes').textContent = '--';
|
||||
document.getElementById('cb-consecutive-failures').textContent = '--';
|
||||
}
|
||||
|
||||
if (data.config) {
|
||||
@@ -1307,23 +1334,31 @@
|
||||
function updateCoalescing(data) {
|
||||
document.getElementById('coalescing-rate').textContent =
|
||||
(data.backend_savings_pct || 0).toFixed(1) + '%';
|
||||
document.getElementById('coalescing-total').textContent =
|
||||
(data.total_requests || 0).toLocaleString();
|
||||
document.getElementById('coalescing-primary').textContent =
|
||||
(data.primary_requests || 0).toLocaleString();
|
||||
document.getElementById('coalescing-coalesced').textContent =
|
||||
(data.coalesced_requests || 0).toLocaleString();
|
||||
|
||||
// Handle both single instance (total_requests) and cluster mode (total_coalesced + total_primary)
|
||||
const totalRequests = data.total_requests ||
|
||||
((data.total_coalesced_requests || 0) + (data.total_primary_requests || 0));
|
||||
document.getElementById('coalescing-total').textContent = totalRequests.toLocaleString();
|
||||
|
||||
// Handle both single instance and cluster mode field names
|
||||
const primaryRequests = data.primary_requests || data.total_primary_requests || 0;
|
||||
document.getElementById('coalescing-primary').textContent = primaryRequests.toLocaleString();
|
||||
|
||||
const coalescedRequests = data.coalesced_requests || data.total_coalesced_requests || 0;
|
||||
document.getElementById('coalescing-coalesced').textContent = coalescedRequests.toLocaleString();
|
||||
|
||||
document.getElementById('coalescing-savings').textContent =
|
||||
(data.backend_savings_pct || 0).toFixed(1) + '%';
|
||||
}
|
||||
|
||||
function updateRetryBudget(data) {
|
||||
document.getElementById('retry-tokens').textContent =
|
||||
data.current_tokens || '--';
|
||||
document.getElementById('retry-current-tokens').textContent =
|
||||
data.current_tokens || '--';
|
||||
document.getElementById('retry-max-tokens').textContent =
|
||||
data.max_tokens || '--';
|
||||
// Use explicit undefined check to handle 0 values correctly
|
||||
const currentTokens = data.current_tokens !== undefined ? data.current_tokens : '--';
|
||||
const maxTokens = data.max_tokens !== undefined ? data.max_tokens : '--';
|
||||
|
||||
document.getElementById('retry-tokens').textContent = currentTokens;
|
||||
document.getElementById('retry-current-tokens').textContent = currentTokens;
|
||||
document.getElementById('retry-max-tokens').textContent = maxTokens;
|
||||
document.getElementById('retry-total').textContent =
|
||||
(data.total_attempts || 0).toLocaleString();
|
||||
document.getElementById('retry-denied').textContent =
|
||||
@@ -1333,13 +1368,17 @@
|
||||
}
|
||||
|
||||
function updateWebSocket(data) {
|
||||
document.getElementById('ws-connections').textContent =
|
||||
data.active_connections || 0;
|
||||
// Handle both single instance (active_connections) and cluster mode (total_connections)
|
||||
const connections = data.active_connections !== undefined ? data.active_connections :
|
||||
(data.total_connections !== undefined ? data.total_connections : 0);
|
||||
document.getElementById('ws-connections').textContent = connections;
|
||||
}
|
||||
|
||||
function updateConnections(data) {
|
||||
document.getElementById('pool-connections').textContent =
|
||||
data.active_connections || 0;
|
||||
// Handle both single instance (active_connections) and cluster mode (total_active)
|
||||
const connections = data.active_connections !== undefined ? data.active_connections :
|
||||
(data.total_active !== undefined ? data.total_active : 0);
|
||||
document.getElementById('pool-connections').textContent = connections;
|
||||
}
|
||||
|
||||
async function resetCoalescing() {
|
||||
|
||||
@@ -20,7 +20,7 @@ require (
|
||||
github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.89
|
||||
github.com/redis/go-redis/v9 v9.17.1
|
||||
github.com/redis/go-redis/v9 v9.17.2
|
||||
github.com/sony/gobreaker v1.0.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/valyala/fasthttp v1.68.0
|
||||
@@ -47,7 +47,7 @@ require (
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/klauspost/compress v1.18.1 // indirect
|
||||
github.com/klauspost/compress v1.18.2 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.19 // indirect
|
||||
|
||||
@@ -64,8 +64,8 @@ github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
|
||||
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
|
||||
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
|
||||
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
|
||||
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
@@ -84,8 +84,8 @@ github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byF
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.17.1 h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
|
||||
github.com/redis/go-redis/v9 v9.17.1/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
|
||||
github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 h1:McifyVxygw1d67y6vxUqls2D46J8W9nrki9c8c0eVvE=
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -783,3 +784,251 @@ func (suite *Tests) TestRequestCoalescingIntegration() {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestRetryBudgetIntegration tests that retry budget correctly limits retry attempts
|
||||
func (suite *Tests) TestRetryBudgetIntegration() {
|
||||
// Initialize a retry budget with limited tokens for testing
|
||||
budgetCtx := context.Background()
|
||||
testBudget := NewRetryBudgetWithContext(budgetCtx, RetryBudgetConfig{
|
||||
MaxTokens: 3, // Only allow 3 retries total
|
||||
TokensPerSecond: 0, // Don't refill during test
|
||||
Enabled: true,
|
||||
}, cfg.Logger)
|
||||
|
||||
// Replace global retry budget
|
||||
originalBudget := retryBudget
|
||||
retryBudget = testBudget
|
||||
defer func() {
|
||||
testBudget.Shutdown()
|
||||
retryBudget = originalBudget
|
||||
}()
|
||||
|
||||
suite.Run("retry_budget_limits_retries", func() {
|
||||
testBudget.Reset()
|
||||
|
||||
// Verify retry budget is set and works correctly
|
||||
rb := GetRetryBudget()
|
||||
suite.NotNil(rb, "Retry budget should be set")
|
||||
suite.True(rb.enabled, "Retry budget should be enabled")
|
||||
suite.T().Logf("Retry budget: enabled=%v, tokens=%d", rb.enabled, rb.currentTokens.Load())
|
||||
|
||||
// Test that AllowRetry consumes tokens correctly
|
||||
initialTokens := rb.currentTokens.Load()
|
||||
suite.Equal(int64(3), initialTokens, "Should start with 3 tokens")
|
||||
|
||||
// First 3 retries should be allowed
|
||||
suite.True(rb.AllowRetry(), "First retry should be allowed")
|
||||
suite.True(rb.AllowRetry(), "Second retry should be allowed")
|
||||
suite.True(rb.AllowRetry(), "Third retry should be allowed")
|
||||
|
||||
// Fourth retry should be denied (tokens exhausted)
|
||||
suite.False(rb.AllowRetry(), "Fourth retry should be denied - budget exhausted")
|
||||
|
||||
// Verify stats
|
||||
stats := rb.GetStats()
|
||||
suite.Equal(int64(4), stats["total_attempts"].(int64), "Should have 4 total attempts")
|
||||
suite.Equal(int64(3), stats["allowed_retries"].(int64), "Should have 3 allowed retries")
|
||||
suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry")
|
||||
|
||||
suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d",
|
||||
stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"])
|
||||
})
|
||||
|
||||
suite.Run("retry_budget_exhaustion", func() {
|
||||
// Create a new budget with only 1 token
|
||||
testBudget.Shutdown()
|
||||
budgetCtx2 := context.Background()
|
||||
testBudget2 := NewRetryBudgetWithContext(budgetCtx2, RetryBudgetConfig{
|
||||
MaxTokens: 1, // Only allow 1 retry
|
||||
TokensPerSecond: 0, // Don't refill
|
||||
Enabled: true,
|
||||
}, cfg.Logger)
|
||||
retryBudget = testBudget2
|
||||
defer func() {
|
||||
testBudget2.Shutdown()
|
||||
}()
|
||||
|
||||
// Test budget exhaustion with 1 token
|
||||
rb := GetRetryBudget()
|
||||
suite.NotNil(rb, "Retry budget should be set")
|
||||
suite.Equal(int64(1), rb.currentTokens.Load(), "Should start with 1 token")
|
||||
|
||||
// First retry should be allowed
|
||||
suite.True(rb.AllowRetry(), "First retry should be allowed")
|
||||
|
||||
// Second retry should be denied (only 1 token available)
|
||||
suite.False(rb.AllowRetry(), "Second retry should be denied - budget exhausted")
|
||||
|
||||
// Verify stats
|
||||
stats := rb.GetStats()
|
||||
suite.Equal(int64(2), stats["total_attempts"].(int64), "Should have 2 total attempts")
|
||||
suite.Equal(int64(1), stats["allowed_retries"].(int64), "Should have 1 allowed retry")
|
||||
suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry")
|
||||
|
||||
suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d",
|
||||
stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"])
|
||||
})
|
||||
}
|
||||
|
||||
// TestConnectionPoolStatsIntegration tests that connection pool stats are tracked
|
||||
func (suite *Tests) TestConnectionPoolStatsIntegration() {
|
||||
// Save original config
|
||||
originalClient := cfg.Client.FastProxyClient
|
||||
originalHostGraphQL := cfg.Server.HostGraphQL
|
||||
originalCoalescing := cfg.RequestCoalescing.Enable
|
||||
|
||||
// Restore after test
|
||||
defer func() {
|
||||
cfg.Client.FastProxyClient = originalClient
|
||||
cfg.Server.HostGraphQL = originalHostGraphQL
|
||||
cfg.RequestCoalescing.Enable = originalCoalescing
|
||||
}()
|
||||
|
||||
// Disable request coalescing for accurate tracking
|
||||
cfg.RequestCoalescing.Enable = false
|
||||
|
||||
suite.Run("connection_success_tracked", func() {
|
||||
// Create test server that succeeds
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(`{"data":{"test":"success"}}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg.Server.HostGraphQL = server.URL
|
||||
cfg.Client.ClientTimeout = 5
|
||||
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
|
||||
|
||||
// Initialize connection pool
|
||||
InitializeConnectionPool(cfg.Client.FastProxyClient)
|
||||
defer ShutdownConnectionPool()
|
||||
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
|
||||
|
||||
// Get stats before
|
||||
statsBefore := poolMgr.GetConnectionStats()
|
||||
successBefore := statsBefore["total_connections"].(int64)
|
||||
|
||||
// Make a successful request
|
||||
reqCtx := &fasthttp.RequestCtx{}
|
||||
reqCtx.Request.SetRequestURI("/graphql")
|
||||
reqCtx.Request.Header.SetMethod("POST")
|
||||
reqCtx.Request.Header.Set("Content-Type", "application/json")
|
||||
reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`))
|
||||
|
||||
ctx := suite.app.AcquireCtx(reqCtx)
|
||||
err := proxyTheRequest(ctx, cfg.Server.HostGraphQL)
|
||||
suite.app.ReleaseCtx(ctx)
|
||||
|
||||
suite.Nil(err, "Request should succeed")
|
||||
|
||||
// Get stats after
|
||||
statsAfter := poolMgr.GetConnectionStats()
|
||||
successAfter := statsAfter["total_connections"].(int64)
|
||||
|
||||
suite.Greater(successAfter, successBefore,
|
||||
"Total connections should increase after successful request")
|
||||
})
|
||||
|
||||
suite.Run("connection_failure_tracked_on_5xx", func() {
|
||||
// Create test server that returns 503
|
||||
// Note: 503 triggers retry which records failures
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
_, _ = w.Write([]byte(`{"errors":[{"message":"Service unavailable"}]}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg.Server.HostGraphQL = server.URL
|
||||
cfg.Client.ClientTimeout = 2
|
||||
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
|
||||
|
||||
// Initialize connection pool
|
||||
InitializeConnectionPool(cfg.Client.FastProxyClient)
|
||||
defer ShutdownConnectionPool()
|
||||
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
|
||||
|
||||
// Get stats before
|
||||
statsBefore := poolMgr.GetConnectionStats()
|
||||
failuresBefore := statsBefore["connection_failures"].(int64)
|
||||
|
||||
// Make a failing request (503 is retryable, so it will retry and track failures)
|
||||
reqCtx := &fasthttp.RequestCtx{}
|
||||
reqCtx.Request.SetRequestURI("/graphql")
|
||||
reqCtx.Request.Header.SetMethod("POST")
|
||||
reqCtx.Request.Header.Set("Content-Type", "application/json")
|
||||
reqCtx.Request.SetBody([]byte(`{"query": "query { fail }"}`))
|
||||
|
||||
ctx := suite.app.AcquireCtx(reqCtx)
|
||||
_ = proxyTheRequest(ctx, cfg.Server.HostGraphQL)
|
||||
suite.app.ReleaseCtx(ctx)
|
||||
|
||||
// Get stats after - should have failures from retry attempts
|
||||
statsAfter := poolMgr.GetConnectionStats()
|
||||
failuresAfter := statsAfter["connection_failures"].(int64)
|
||||
|
||||
suite.Greater(failuresAfter, failuresBefore,
|
||||
"Connection failures should increase after 5xx responses that trigger retries")
|
||||
|
||||
suite.T().Logf("Connection failures: before=%d, after=%d",
|
||||
failuresBefore, failuresAfter)
|
||||
})
|
||||
|
||||
suite.Run("stats_reflect_request_outcomes", func() {
|
||||
// This test verifies that connection stats properly reflect the
|
||||
// combination of successes and failures over multiple requests
|
||||
|
||||
// Start with a fresh server
|
||||
var requestCount atomic.Int32
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
count := requestCount.Add(1)
|
||||
// First 2 requests succeed, rest fail
|
||||
if count <= 2 {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte(`{"data":{"test":"success"}}`))
|
||||
} else {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
_, _ = w.Write([]byte(`{"errors":[{"message":"Error"}]}`))
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg.Server.HostGraphQL = server.URL
|
||||
cfg.Client.ClientTimeout = 2
|
||||
cfg.Client.FastProxyClient = createFasthttpClient(cfg)
|
||||
|
||||
// Initialize connection pool
|
||||
InitializeConnectionPool(cfg.Client.FastProxyClient)
|
||||
defer ShutdownConnectionPool()
|
||||
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
suite.NotNil(poolMgr, "Connection pool manager should be initialized")
|
||||
|
||||
// Make 2 successful requests
|
||||
for i := 0; i < 2; i++ {
|
||||
reqCtx := &fasthttp.RequestCtx{}
|
||||
reqCtx.Request.SetRequestURI("/graphql")
|
||||
reqCtx.Request.Header.SetMethod("POST")
|
||||
reqCtx.Request.Header.Set("Content-Type", "application/json")
|
||||
reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`))
|
||||
|
||||
ctx := suite.app.AcquireCtx(reqCtx)
|
||||
_ = proxyTheRequest(ctx, cfg.Server.HostGraphQL)
|
||||
suite.app.ReleaseCtx(ctx)
|
||||
}
|
||||
|
||||
// Get stats after successes
|
||||
statsAfterSuccess := poolMgr.GetConnectionStats()
|
||||
totalConnections := statsAfterSuccess["total_connections"].(int64)
|
||||
|
||||
suite.GreaterOrEqual(totalConnections, int64(2),
|
||||
"Should have at least 2 successful connections tracked")
|
||||
|
||||
suite.T().Logf("Total connections after 2 successful requests: %d", totalConnections)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -519,7 +519,10 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
|
||||
totalRetryAllowed int64
|
||||
totalRetryDenied int64
|
||||
totalRetryAttempts int64
|
||||
totalCurrentTokens int64
|
||||
totalMaxTokens int64
|
||||
retryBudgetEnabled = false
|
||||
retryTokensPerSec float64 // Use max tokens_per_sec from any instance
|
||||
|
||||
// Circuit breaker stats
|
||||
cbOpenCount int
|
||||
@@ -645,6 +648,17 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
|
||||
if attempts, ok := instance.RetryBudget["total_attempts"].(float64); ok {
|
||||
totalRetryAttempts += int64(attempts)
|
||||
}
|
||||
if currentTokens, ok := instance.RetryBudget["current_tokens"].(float64); ok {
|
||||
totalCurrentTokens += int64(currentTokens)
|
||||
}
|
||||
if maxTokens, ok := instance.RetryBudget["max_tokens"].(float64); ok {
|
||||
totalMaxTokens += int64(maxTokens)
|
||||
}
|
||||
if tokensPerSec, ok := instance.RetryBudget["tokens_per_sec"].(float64); ok {
|
||||
if tokensPerSec > retryTokensPerSec {
|
||||
retryTokensPerSec = tokensPerSec
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate circuit breaker stats
|
||||
@@ -748,6 +762,9 @@ func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[str
|
||||
"denied_retries": totalRetryDenied,
|
||||
"total_attempts": totalRetryAttempts,
|
||||
"denial_rate_pct": retryDenialRate,
|
||||
"current_tokens": totalCurrentTokens,
|
||||
"max_tokens": totalMaxTokens,
|
||||
"tokens_per_sec": retryTokensPerSec,
|
||||
},
|
||||
"circuit_breaker": map[string]interface{}{
|
||||
"enabled": circuitBreakerEnabled,
|
||||
|
||||
@@ -454,19 +454,36 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error {
|
||||
return retry.Unrecoverable(fmt.Errorf("fiber context became nil during retry"))
|
||||
}
|
||||
|
||||
// Get connection pool manager for stats tracking
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
|
||||
// Execute the proxy request
|
||||
if err := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient); err != nil {
|
||||
proxyErr := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient)
|
||||
if proxyErr != nil {
|
||||
// Check if this is a connection error
|
||||
if isConnectionError(err) {
|
||||
if isConnectionError(proxyErr) {
|
||||
notifyHealthManager(false)
|
||||
return err // Connection errors are retryable
|
||||
// Track connection failure
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionFailure()
|
||||
}
|
||||
return proxyErr // Connection errors are retryable
|
||||
}
|
||||
|
||||
// Check if this is a timeout error - don't retry timeouts
|
||||
if isTimeoutError(err) {
|
||||
return retry.Unrecoverable(err)
|
||||
if isTimeoutError(proxyErr) {
|
||||
return retry.Unrecoverable(proxyErr)
|
||||
}
|
||||
return err
|
||||
|
||||
// Check if this is a retryable HTTP error (e.g., 503)
|
||||
// These indicate the server responded but with an error status
|
||||
if strings.Contains(proxyErr.Error(), "non-200 response") {
|
||||
// Track as a failure for retryable HTTP errors
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionFailure()
|
||||
}
|
||||
}
|
||||
return proxyErr
|
||||
}
|
||||
|
||||
// Safety check before accessing response
|
||||
@@ -481,10 +498,18 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error {
|
||||
if err == nil {
|
||||
// Success case
|
||||
notifyHealthManager(true)
|
||||
// Track successful connection
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionSuccess()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if shouldRetry {
|
||||
// Track connection failure for retryable errors (5xx, etc)
|
||||
if poolMgr != nil {
|
||||
poolMgr.RecordConnectionFailure()
|
||||
}
|
||||
return err // Retryable error
|
||||
}
|
||||
|
||||
@@ -541,31 +566,51 @@ func performProxyRequestWithEnhancedRetries(c *fiber.Ctx, proxyURL string, backe
|
||||
retry.LastErrorOnly(true),
|
||||
retry.RetryIf(func(err error) bool {
|
||||
// Don't retry if context is cancelled or context is nil
|
||||
defer func() {
|
||||
// Recover from any panic when accessing context
|
||||
if r := recover(); r != nil {
|
||||
// If we panic, don't retry
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if c == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Try to safely access the context
|
||||
ctx := c.Context()
|
||||
if ctx == nil {
|
||||
// Safely check if context is done/cancelled
|
||||
// Note: fasthttp.RequestCtx.Done() can panic if not properly initialized
|
||||
// If we panic, don't retry (maintains backward compatibility with test behavior)
|
||||
shouldRetry := true
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// If we panic accessing context, don't retry
|
||||
// This typically happens in test scenarios with mock contexts
|
||||
shouldRetry = false
|
||||
}
|
||||
}()
|
||||
ctx := c.Context()
|
||||
if ctx == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
shouldRetry = false
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
if !shouldRetry {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if context is done/cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
// Check retry budget before allowing retry
|
||||
if rb := GetRetryBudget(); rb != nil {
|
||||
if !rb.AllowRetry() {
|
||||
cfg.Logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Retry denied by budget",
|
||||
Pairs: map[string]interface{}{
|
||||
"path": c.Path(),
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user