diff --git a/integration_test.go b/integration_test.go index 08c25fa..dbc3000 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -783,3 +784,251 @@ func (suite *Tests) TestRequestCoalescingIntegration() { } }) } + +// TestRetryBudgetIntegration tests that retry budget correctly limits retry attempts +func (suite *Tests) TestRetryBudgetIntegration() { + // Initialize a retry budget with limited tokens for testing + budgetCtx := context.Background() + testBudget := NewRetryBudgetWithContext(budgetCtx, RetryBudgetConfig{ + MaxTokens: 3, // Only allow 3 retries total + TokensPerSecond: 0, // Don't refill during test + Enabled: true, + }, cfg.Logger) + + // Replace global retry budget + originalBudget := retryBudget + retryBudget = testBudget + defer func() { + testBudget.Shutdown() + retryBudget = originalBudget + }() + + suite.Run("retry_budget_limits_retries", func() { + testBudget.Reset() + + // Verify retry budget is set and works correctly + rb := GetRetryBudget() + suite.NotNil(rb, "Retry budget should be set") + suite.True(rb.enabled, "Retry budget should be enabled") + suite.T().Logf("Retry budget: enabled=%v, tokens=%d", rb.enabled, rb.currentTokens.Load()) + + // Test that AllowRetry consumes tokens correctly + initialTokens := rb.currentTokens.Load() + suite.Equal(int64(3), initialTokens, "Should start with 3 tokens") + + // First 3 retries should be allowed + suite.True(rb.AllowRetry(), "First retry should be allowed") + suite.True(rb.AllowRetry(), "Second retry should be allowed") + suite.True(rb.AllowRetry(), "Third retry should be allowed") + + // Fourth retry should be denied (tokens exhausted) + suite.False(rb.AllowRetry(), "Fourth retry should be denied - budget exhausted") + + // Verify stats + stats := rb.GetStats() + suite.Equal(int64(4), stats["total_attempts"].(int64), "Should have 4 total attempts") + suite.Equal(int64(3), stats["allowed_retries"].(int64), "Should have 3 allowed retries") + suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry") + + suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d", + stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"]) + }) + + suite.Run("retry_budget_exhaustion", func() { + // Create a new budget with only 1 token + testBudget.Shutdown() + budgetCtx2 := context.Background() + testBudget2 := NewRetryBudgetWithContext(budgetCtx2, RetryBudgetConfig{ + MaxTokens: 1, // Only allow 1 retry + TokensPerSecond: 0, // Don't refill + Enabled: true, + }, cfg.Logger) + retryBudget = testBudget2 + defer func() { + testBudget2.Shutdown() + }() + + // Test budget exhaustion with 1 token + rb := GetRetryBudget() + suite.NotNil(rb, "Retry budget should be set") + suite.Equal(int64(1), rb.currentTokens.Load(), "Should start with 1 token") + + // First retry should be allowed + suite.True(rb.AllowRetry(), "First retry should be allowed") + + // Second retry should be denied (only 1 token available) + suite.False(rb.AllowRetry(), "Second retry should be denied - budget exhausted") + + // Verify stats + stats := rb.GetStats() + suite.Equal(int64(2), stats["total_attempts"].(int64), "Should have 2 total attempts") + suite.Equal(int64(1), stats["allowed_retries"].(int64), "Should have 1 allowed retry") + suite.Equal(int64(1), stats["denied_retries"].(int64), "Should have 1 denied retry") + + suite.T().Logf("Retry budget stats: total=%d, allowed=%d, denied=%d", + stats["total_attempts"], stats["allowed_retries"], stats["denied_retries"]) + }) +} + +// TestConnectionPoolStatsIntegration tests that connection pool stats are tracked +func (suite *Tests) TestConnectionPoolStatsIntegration() { + // Save original config + originalClient := cfg.Client.FastProxyClient + originalHostGraphQL := cfg.Server.HostGraphQL + originalCoalescing := cfg.RequestCoalescing.Enable + + // Restore after test + defer func() { + cfg.Client.FastProxyClient = originalClient + cfg.Server.HostGraphQL = originalHostGraphQL + cfg.RequestCoalescing.Enable = originalCoalescing + }() + + // Disable request coalescing for accurate tracking + cfg.RequestCoalescing.Enable = false + + suite.Run("connection_success_tracked", func() { + // Create test server that succeeds + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"data":{"test":"success"}}`)) + })) + defer server.Close() + + cfg.Server.HostGraphQL = server.URL + cfg.Client.ClientTimeout = 5 + cfg.Client.FastProxyClient = createFasthttpClient(cfg) + + // Initialize connection pool + InitializeConnectionPool(cfg.Client.FastProxyClient) + defer ShutdownConnectionPool() + + poolMgr := GetConnectionPoolManager() + suite.NotNil(poolMgr, "Connection pool manager should be initialized") + + // Get stats before + statsBefore := poolMgr.GetConnectionStats() + successBefore := statsBefore["total_connections"].(int64) + + // Make a successful request + reqCtx := &fasthttp.RequestCtx{} + reqCtx.Request.SetRequestURI("/graphql") + reqCtx.Request.Header.SetMethod("POST") + reqCtx.Request.Header.Set("Content-Type", "application/json") + reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`)) + + ctx := suite.app.AcquireCtx(reqCtx) + err := proxyTheRequest(ctx, cfg.Server.HostGraphQL) + suite.app.ReleaseCtx(ctx) + + suite.Nil(err, "Request should succeed") + + // Get stats after + statsAfter := poolMgr.GetConnectionStats() + successAfter := statsAfter["total_connections"].(int64) + + suite.Greater(successAfter, successBefore, + "Total connections should increase after successful request") + }) + + suite.Run("connection_failure_tracked_on_5xx", func() { + // Create test server that returns 503 + // Note: 503 triggers retry which records failures + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"errors":[{"message":"Service unavailable"}]}`)) + })) + defer server.Close() + + cfg.Server.HostGraphQL = server.URL + cfg.Client.ClientTimeout = 2 + cfg.Client.FastProxyClient = createFasthttpClient(cfg) + + // Initialize connection pool + InitializeConnectionPool(cfg.Client.FastProxyClient) + defer ShutdownConnectionPool() + + poolMgr := GetConnectionPoolManager() + suite.NotNil(poolMgr, "Connection pool manager should be initialized") + + // Get stats before + statsBefore := poolMgr.GetConnectionStats() + failuresBefore := statsBefore["connection_failures"].(int64) + + // Make a failing request (503 is retryable, so it will retry and track failures) + reqCtx := &fasthttp.RequestCtx{} + reqCtx.Request.SetRequestURI("/graphql") + reqCtx.Request.Header.SetMethod("POST") + reqCtx.Request.Header.Set("Content-Type", "application/json") + reqCtx.Request.SetBody([]byte(`{"query": "query { fail }"}`)) + + ctx := suite.app.AcquireCtx(reqCtx) + _ = proxyTheRequest(ctx, cfg.Server.HostGraphQL) + suite.app.ReleaseCtx(ctx) + + // Get stats after - should have failures from retry attempts + statsAfter := poolMgr.GetConnectionStats() + failuresAfter := statsAfter["connection_failures"].(int64) + + suite.Greater(failuresAfter, failuresBefore, + "Connection failures should increase after 5xx responses that trigger retries") + + suite.T().Logf("Connection failures: before=%d, after=%d", + failuresBefore, failuresAfter) + }) + + suite.Run("stats_reflect_request_outcomes", func() { + // This test verifies that connection stats properly reflect the + // combination of successes and failures over multiple requests + + // Start with a fresh server + var requestCount atomic.Int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count := requestCount.Add(1) + // First 2 requests succeed, rest fail + if count <= 2 { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"data":{"test":"success"}}`)) + } else { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"errors":[{"message":"Error"}]}`)) + } + })) + defer server.Close() + + cfg.Server.HostGraphQL = server.URL + cfg.Client.ClientTimeout = 2 + cfg.Client.FastProxyClient = createFasthttpClient(cfg) + + // Initialize connection pool + InitializeConnectionPool(cfg.Client.FastProxyClient) + defer ShutdownConnectionPool() + + poolMgr := GetConnectionPoolManager() + suite.NotNil(poolMgr, "Connection pool manager should be initialized") + + // Make 2 successful requests + for i := 0; i < 2; i++ { + reqCtx := &fasthttp.RequestCtx{} + reqCtx.Request.SetRequestURI("/graphql") + reqCtx.Request.Header.SetMethod("POST") + reqCtx.Request.Header.Set("Content-Type", "application/json") + reqCtx.Request.SetBody([]byte(`{"query": "query { test }"}`)) + + ctx := suite.app.AcquireCtx(reqCtx) + _ = proxyTheRequest(ctx, cfg.Server.HostGraphQL) + suite.app.ReleaseCtx(ctx) + } + + // Get stats after successes + statsAfterSuccess := poolMgr.GetConnectionStats() + totalConnections := statsAfterSuccess["total_connections"].(int64) + + suite.GreaterOrEqual(totalConnections, int64(2), + "Should have at least 2 successful connections tracked") + + suite.T().Logf("Total connections after 2 successful requests: %d", totalConnections) + }) +} diff --git a/proxy.go b/proxy.go index 6dcbc64..e5070bc 100644 --- a/proxy.go +++ b/proxy.go @@ -454,19 +454,36 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error { return retry.Unrecoverable(fmt.Errorf("fiber context became nil during retry")) } + // Get connection pool manager for stats tracking + poolMgr := GetConnectionPoolManager() + // Execute the proxy request - if err := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient); err != nil { + proxyErr := doProxyRequestWithTimeout(c, proxyURL, cfg.Client.FastProxyClient) + if proxyErr != nil { // Check if this is a connection error - if isConnectionError(err) { + if isConnectionError(proxyErr) { notifyHealthManager(false) - return err // Connection errors are retryable + // Track connection failure + if poolMgr != nil { + poolMgr.RecordConnectionFailure() + } + return proxyErr // Connection errors are retryable } // Check if this is a timeout error - don't retry timeouts - if isTimeoutError(err) { - return retry.Unrecoverable(err) + if isTimeoutError(proxyErr) { + return retry.Unrecoverable(proxyErr) } - return err + + // Check if this is a retryable HTTP error (e.g., 503) + // These indicate the server responded but with an error status + if strings.Contains(proxyErr.Error(), "non-200 response") { + // Track as a failure for retryable HTTP errors + if poolMgr != nil { + poolMgr.RecordConnectionFailure() + } + } + return proxyErr } // Safety check before accessing response @@ -481,10 +498,18 @@ func executeProxyAttempt(c *fiber.Ctx, proxyURL string) error { if err == nil { // Success case notifyHealthManager(true) + // Track successful connection + if poolMgr != nil { + poolMgr.RecordConnectionSuccess() + } return nil } if shouldRetry { + // Track connection failure for retryable errors (5xx, etc) + if poolMgr != nil { + poolMgr.RecordConnectionFailure() + } return err // Retryable error } @@ -541,31 +566,51 @@ func performProxyRequestWithEnhancedRetries(c *fiber.Ctx, proxyURL string, backe retry.LastErrorOnly(true), retry.RetryIf(func(err error) bool { // Don't retry if context is cancelled or context is nil - defer func() { - // Recover from any panic when accessing context - if r := recover(); r != nil { - // If we panic, don't retry - return - } - }() - if c == nil { return false } - // Try to safely access the context - ctx := c.Context() - if ctx == nil { + // Safely check if context is done/cancelled + // Note: fasthttp.RequestCtx.Done() can panic if not properly initialized + // If we panic, don't retry (maintains backward compatibility with test behavior) + shouldRetry := true + func() { + defer func() { + if r := recover(); r != nil { + // If we panic accessing context, don't retry + // This typically happens in test scenarios with mock contexts + shouldRetry = false + } + }() + ctx := c.Context() + if ctx == nil { + return + } + select { + case <-ctx.Done(): + shouldRetry = false + default: + } + }() + + if !shouldRetry { return false } - // Check if context is done/cancelled - select { - case <-ctx.Done(): - return false - default: - return true + // Check retry budget before allowing retry + if rb := GetRetryBudget(); rb != nil { + if !rb.AllowRetry() { + cfg.Logger.Warning(&libpack_logger.LogMessage{ + Message: "Retry denied by budget", + Pairs: map[string]interface{}{ + "path": c.Path(), + "error": err.Error(), + }, + }) + return false + } } + return true }), ) }