Files
traefikoidc/issue67_regression_test.go
lukaszraczylo 72e2b682bb fix: eliminate per-request global mutexes in Yaegi hot paths
The v1.0.14 fix replaced one contended sync.RWMutex (RefreshCoordinator.
refreshMutex) with sync.Map. Production showed the same death-spiral
signature recurring ~2 hours later — same shape, different mutex:
65 goroutines stuck on a sync.(*RWMutex).Lock at one address, pod
pinned at 1000m CPU, identical Yaegi runCfg/reflect.Value.Call stack
pattern. The mutex was RefreshCoordinator.attemptsMutex.

Generalising: under Yaegi (interpreted Go for traefik plugins), any
per-request global mutex acquisition is a latent serialization point.
reflect.Value.Call dispatch on a held lock turns a microsecond
critical section into a multi-millisecond one, and on a GOMAXPROCS=1
pod the queue is unbounded.

This commit removes every per-request global mutex on the hot path:

1. RefreshCoordinator.attemptsMutex (sync.RWMutex)
   sessionRefreshAttempts: map -> sync.Map.
   refreshAttemptTracker: all fields atomic (int32, int64 UnixNano,
   cooldownEndNano == 0 as the not-in-cooldown sentinel, replacing
   the inCooldown bool).
   isInCooldown / recordRefreshAttempt / recordRefreshSuccess /
   recordRefreshFailure all become lock-free. Cooldown entry uses
   CompareAndSwapInt64 so only one goroutine logs the transition.

2. RefreshCircuitBreaker.mutex (sync.RWMutex)
   lastFailureTime / lastSuccessTime -> atomic.Int64 UnixNano.
   state and failures already atomic.
   AllowRequest / RecordSuccess / RecordFailure now pure atomic ops.

3. TraefikOidc.firstRequestMutex (sync.Mutex)
   firstRequestReceived bool -> firstRequestStarted int32.
   metadataRefreshStarted bool -> metadataRefreshStartedAtomic int32.
   ServeHTTP bootstrap path uses CompareAndSwapInt32 — fires once,
   zero steady-state cost. Previously the mutex was acquired on
   every non-health request forever.

4. TraefikOidc.metadataRetryMutex (sync.Mutex)
   lastMetadataRetryTime time.Time -> lastMetadataRetryNano int64.
   The 30-second retry throttle is now a CAS on lastMetadataRetryNano.

cleanupStaleEntries iterates via sync.Map.Range; eviction is a
CompareAndDelete by pointer identity so a tracker freshly re-used by
a concurrent caller is not lost.

Empirical evidence (analysis of the v1.0.14 spike by three specialist
agents; profiles in /tmp/traefik-spike-1779511683/):
  * mutex profile: 97% delay in sync.(*Mutex).Unlock via
    HTTPHandlerSwitcher -> accesslog -> metrics -> backoff.RetryNotify
  * 65 stuck goroutines at one RWMutex address (0x40022eb648),
    identical Yaegi CFG pointer, all on rc.attemptsMutex via
    recordRefreshAttempt + isInCooldown
  * traffic driver: long-lived in-cluster Go-http-client doing
    ~5.4 req/s POST embeddings via OIDC cookie session → same
    sessionID → contention all funnels to one tracker entry

Yaegi support for sync/atomic confirmed at
github.com/traefik/yaegi@v0.16.1/stdlib/go1_22_sync_atomic.go:
AddInt32/Int64, LoadInt32/Int64, StoreInt32/Int64,
CompareAndSwapInt32/Int64 all exposed via reflect.ValueOf. Yaegi
dispatches each call through reflect.Value.Call to the COMPILED
atomic.* function, which executes a single hardware CAS/LOCK-XADD
instruction. Each atomic op still pays Yaegi dispatch cost but
cannot block — no queueing, no death spiral.

Trade-off acknowledged: v1.0.15 issues ~6-8 atomic/sync.Map ops per
leader-path request vs the 4 mutex ops of v1.0.14. Under low
contention this is a modest CPU bump. Under high contention it's
an unbounded → bounded transformation. Net win.

All tests pass with -race; golangci-lint clean.
2026-05-23 10:47:21 +01:00

776 lines
22 KiB
Go

package traefikoidc
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestIssue67_InfiniteRefreshLoop reproduces and verifies the fix for issue #67,
// where concurrent requests with expired tokens caused an infinite refresh loop
// leading to OOM conditions.
//
// It launches 50 concurrent CoordinateRefresh calls for one session against a
// mock token endpoint whose first four responses are 503s, then checks that:
//   - every call returns before the 5-second deadline,
//   - heap growth stays at or below 100 MB,
//   - the token endpoint was hit at most half as many times as there were callers,
//   - the endpoint never served more than MaxConcurrentRefreshes requests at once,
//   - the coordinator reports a non-zero "deduplicated_requests" metric (when
//     that metric is exposed as an int64).
func TestIssue67_InfiniteRefreshLoop(t *testing.T) {
	// Snapshot the heap before any work so growth can be measured at the end.
	runtime.GC()
	var startMem runtime.MemStats
	runtime.ReadMemStats(&startMem)

	// Counters maintained by the mock token endpoint.
	var refreshAttempts int32     // total hits on /token
	var concurrentRefreshes int32 // /token handlers currently running
	var maxConcurrent int32       // highest value concurrentRefreshes reached

	// serverURL is assigned once the server is listening; the handler and the
	// refresh callbacks only read it afterwards.
	var serverURL string
	authServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/token":
			current := atomic.AddInt32(&concurrentRefreshes, 1)
			defer atomic.AddInt32(&concurrentRefreshes, -1)

			// Raise the recorded peak if this handler pushed concurrency higher.
			for {
				peak := atomic.LoadInt32(&maxConcurrent)
				if current <= peak || atomic.CompareAndSwapInt32(&maxConcurrent, peak, current) {
					break
				}
			}

			attempts := atomic.AddInt32(&refreshAttempts, 1)
			// Simulate slow/failing token endpoint (like in the issue)
			if attempts < 5 {
				// First few attempts fail to trigger retries
				time.Sleep(100 * time.Millisecond)
				w.WriteHeader(http.StatusServiceUnavailable)
				w.Write([]byte(`{"error": "temporarily_unavailable"}`))
			} else {
				// Eventually succeed
				time.Sleep(50 * time.Millisecond)
				w.Header().Set("Content-Type", "application/json")
				w.Write([]byte(`{
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"id_token": "new_id_token",
"expires_in": 3600,
"token_type": "Bearer"
}`))
			}
		case "/.well-known/openid-configuration":
			w.Header().Set("Content-Type", "application/json")
			w.Write([]byte(fmt.Sprintf(`{
"issuer": "%s",
"authorization_endpoint": "%s/authorize",
"token_endpoint": "%s/token",
"jwks_uri": "%s/keys",
"response_types_supported": ["code"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"scopes_supported": ["openid", "profile", "email"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "client_secret_post"],
"claims_supported": ["sub", "name", "email"]
}`, serverURL, serverURL, serverURL, serverURL)))
		case "/keys":
			w.Header().Set("Content-Type", "application/json")
			w.Write([]byte(`{
"keys": [{
"kty": "RSA",
"use": "sig",
"kid": "test-key",
"n": "test",
"e": "AQAB"
}]
}`))
		}
	}))
	defer authServer.Close()
	// Set the server URL after creation
	serverURL = authServer.URL

	// Setup the refresh coordinator with tight limits so the test finishes quickly.
	logger := GetSingletonNoOpLogger()
	config := DefaultRefreshCoordinatorConfig()
	config.MaxRefreshAttempts = 3
	config.RefreshAttemptWindow = 1 * time.Second
	config.MaxConcurrentRefreshes = 2
	coordinator := NewRefreshCoordinator(config, logger)
	defer coordinator.Shutdown()

	// Simulate expired session
	expiredSession := &MockExpiredSession{
		refreshToken: "test_refresh_token",
		sessionID:    "test_session",
		isExpired:    true,
	}

	// Simulate multiple concurrent requests (as reported in issue)
	numConcurrentRequests := 50
	var wg sync.WaitGroup
	wg.Add(numConcurrentRequests)

	// Track results
	var successCount int32
	var errorCount int32
	refreshErrs := make([]error, 0, numConcurrentRequests)
	var errorMutex sync.Mutex

	// Launch concurrent requests with expired tokens
	startTime := time.Now()
	timeout := 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for i := 0; i < numConcurrentRequests; i++ {
		go func(reqID int) {
			defer wg.Done()

			// Each request tries to refresh the expired token
			refreshFunc := func() (*TokenResponse, error) {
				// Bind the request to the test deadline so a stalled endpoint
				// cannot keep this call running after the test has given up.
				req, err := http.NewRequestWithContext(ctx, http.MethodPost, serverURL+"/token", nil)
				if err != nil {
					return nil, err
				}
				req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					return nil, err
				}
				defer resp.Body.Close()
				if resp.StatusCode != http.StatusOK {
					return nil, fmt.Errorf("token refresh failed: %d", resp.StatusCode)
				}
				return &TokenResponse{
					AccessToken:  fmt.Sprintf("new_access_%d", reqID),
					RefreshToken: "new_refresh",
					IDToken:      "new_id",
					ExpiresIn:    3600,
				}, nil
			}

			// Use coordinator to prevent infinite loop
			result, err := coordinator.CoordinateRefresh(
				ctx,
				expiredSession.sessionID,
				expiredSession.refreshToken,
				refreshFunc,
			)
			if err != nil {
				atomic.AddInt32(&errorCount, 1)
				errorMutex.Lock()
				refreshErrs = append(refreshErrs, err)
				errorMutex.Unlock()
			} else if result != nil {
				atomic.AddInt32(&successCount, 1)
			}
		}(i)
	}

	// Wait for completion or timeout
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		// Completed normally
	case <-ctx.Done():
		t.Fatal("Test timed out - possible infinite loop detected!")
	}

	elapsed := time.Since(startTime)
	// Verify no infinite loop occurred
	if elapsed > timeout {
		t.Fatalf("Requests took too long: %v (possible infinite loop)", elapsed)
	}

	// The collected errors are informational only; the assertions below decide
	// pass/fail. Log a small sample to aid debugging. All workers have finished
	// (wg.Wait returned), so the slice can be read without the mutex.
	const maxLoggedErrors = 3
	for i, e := range refreshErrs {
		if i == maxLoggedErrors {
			t.Logf("... and %d more refresh errors", len(refreshErrs)-maxLoggedErrors)
			break
		}
		t.Logf("Refresh error: %v", e)
	}

	// Check memory usage
	runtime.GC()
	var endMem runtime.MemStats
	runtime.ReadMemStats(&endMem)

	// HeapAlloc is unsigned: subtract only when the heap actually grew,
	// otherwise the difference would wrap around to a huge value.
	var memGrowthMB float64
	if endMem.HeapAlloc >= startMem.HeapAlloc {
		memGrowthMB = float64(endMem.HeapAlloc-startMem.HeapAlloc) / (1024 * 1024)
	}
	t.Logf("Memory stats: start=%d bytes, end=%d bytes, growth=%.2f MB",
		startMem.HeapAlloc, endMem.HeapAlloc, memGrowthMB)
	// Memory should not grow excessively (issue reported OOM at 2GB)
	if memGrowthMB > 100 {
		t.Errorf("Excessive memory growth: %.2f MB (possible memory leak)", memGrowthMB)
	}

	// Read every counter atomically, matching how they are written.
	actualRefreshAttempts := atomic.LoadInt32(&refreshAttempts)
	peakConcurrent := atomic.LoadInt32(&maxConcurrent)
	t.Logf("Total refresh attempts to server: %d", actualRefreshAttempts)
	t.Logf("Max concurrent refreshes: %d", peakConcurrent)
	t.Logf("Successful refreshes: %d", atomic.LoadInt32(&successCount))
	t.Logf("Failed refreshes: %d", atomic.LoadInt32(&errorCount))

	// With deduplication, refresh attempts should be much less than concurrent requests
	if actualRefreshAttempts > int32(numConcurrentRequests/2) {
		t.Errorf("Too many refresh attempts (%d), deduplication not working properly",
			actualRefreshAttempts)
	}
	// Max concurrent should respect our limit
	if peakConcurrent > int32(config.MaxConcurrentRefreshes) {
		t.Errorf("Max concurrent refreshes (%d) exceeded configured limit (%d)",
			peakConcurrent, config.MaxConcurrentRefreshes)
	}

	// Check coordinator metrics
	metrics := coordinator.GetMetrics()
	t.Logf("Coordinator metrics: %+v", metrics)
	if deduped, ok := metrics["deduplicated_requests"].(int64); ok {
		if deduped == 0 {
			t.Error("No requests were deduplicated - deduplication not working")
		}
		t.Logf("Deduplicated requests: %d", deduped)
	}
}
// TestIssue67_WithoutCoordinator demonstrates the issue without the fix:
// 100 goroutines each perform three uncoordinated "refresh" attempts while the
// test records the total attempt count and the peak concurrency.
//
// WARNING: This test may consume significant memory - skip in CI.
// It is skipped in -short mode and unless the test binary runs with -v. It only
// logs its observations and makes no pass/fail assertions.
func TestIssue67_WithoutCoordinator(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping memory-intensive test in short mode")
	}
	// Only run this test with explicit flag to demonstrate the issue
	if !testing.Verbose() {
		t.Skip("Skipping demonstration of issue without fix (run with -v to see)")
	}

	// Track memory at start
	runtime.GC()
	var startMem runtime.MemStats
	runtime.ReadMemStats(&startMem)

	var refreshAttempts int32   // total simulated refresh attempts
	var maxConcurrent int32     // highest value currentConcurrent reached
	var currentConcurrent int32 // attempts currently in flight

	// Simulate the issue: multiple goroutines attempting refresh without coordination
	numRequests := 100
	var wg sync.WaitGroup
	wg.Add(numRequests)

	// Use a context with short timeout to prevent actual OOM
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	for i := 0; i < numRequests; i++ {
		go func(id int) {
			defer wg.Done()
			// Simulate retry logic without deduplication (the bug). Every
			// attempt is treated as a failure, so the loop runs to its bound
			// unless the context is cancelled first.
			for attempt := 0; attempt < 3; attempt++ {
				select {
				case <-ctx.Done():
					return
				default:
				}

				current := atomic.AddInt32(&currentConcurrent, 1)
				// Raise the recorded peak if this attempt pushed concurrency higher.
				for {
					peak := atomic.LoadInt32(&maxConcurrent)
					if current <= peak || atomic.CompareAndSwapInt32(&maxConcurrent, peak, current) {
						break
					}
				}
				atomic.AddInt32(&refreshAttempts, 1)

				// Simulate token refresh with a delay that grows per attempt
				// (0ms, 100ms, 200ms).
				time.Sleep(time.Duration(attempt*100) * time.Millisecond)
				// Allocate memory to simulate token processing
				_ = make([]byte, 1024*10) // 10KB per attempt
				atomic.AddInt32(&currentConcurrent, -1)
			}
		}(i)
	}

	// Wait with timeout
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		// Completed
	case <-ctx.Done():
		// Timed out (expected in problematic scenario)
	}

	// Check memory usage
	runtime.GC()
	var endMem runtime.MemStats
	runtime.ReadMemStats(&endMem)

	// HeapAlloc is unsigned: subtract only when the heap actually grew,
	// otherwise the difference would wrap around to a huge value.
	var memGrowthMB float64
	if endMem.HeapAlloc >= startMem.HeapAlloc {
		memGrowthMB = float64(endMem.HeapAlloc-startMem.HeapAlloc) / (1024 * 1024)
	}

	// Workers may still be running if the timeout branch was taken, so the
	// counters must be read atomically.
	attempts := atomic.LoadInt32(&refreshAttempts)
	peakConcurrent := atomic.LoadInt32(&maxConcurrent)

	t.Logf("WITHOUT COORDINATOR:")
	t.Logf(" Refresh attempts: %d", attempts)
	t.Logf(" Max concurrent: %d", peakConcurrent)
	t.Logf(" Memory growth: %.2f MB", memGrowthMB)

	// This demonstrates the issue - high concurrency and many attempts
	if attempts < int32(numRequests*2) {
		t.Logf("Note: Without coordinator, saw %d refresh attempts for %d requests",
			attempts, numRequests)
	}
}
// MockExpiredSession is a minimal test double for an expired session. It
// simply stores the values its accessor methods hand back.
type MockExpiredSession struct {
	refreshToken string // returned by GetRefreshToken
	sessionID    string // returned by GetSessionID
	isExpired    bool   // returned by IsExpired
}

// GetRefreshToken returns the stored refresh token.
func (s *MockExpiredSession) GetRefreshToken() string {
	return s.refreshToken
}

// GetSessionID returns the stored session identifier.
func (s *MockExpiredSession) GetSessionID() string {
	return s.sessionID
}

// IsExpired reports the stored expiry flag.
func (s *MockExpiredSession) IsExpired() bool {
	return s.isExpired
}
// BenchmarkRefreshWithCoordinator measures CoordinateRefresh under parallel
// load. Each parallel worker cycles through the same ten session IDs
// ("session_0" .. "session_9") and uses a refresh callback that sleeps 10ms
// before succeeding. Coordinator metrics are logged once the timed region ends.
func BenchmarkRefreshWithCoordinator(b *testing.B) {
	logger := GetSingletonNoOpLogger()
	config := DefaultRefreshCoordinatorConfig()
	coordinator := NewRefreshCoordinator(config, logger)
	defer coordinator.Shutdown()

	refreshFunc := func() (*TokenResponse, error) {
		// Simulate token refresh
		time.Sleep(10 * time.Millisecond)
		return &TokenResponse{
			AccessToken:  "new_token",
			RefreshToken: "new_refresh",
		}, nil
	}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		i := 0 // per-worker counter used to rotate session IDs
		for pb.Next() {
			ctx := context.Background()
			sessionID := fmt.Sprintf("session_%d", i%10)
			refreshToken := "refresh_token"
			// Errors are irrelevant here; only the call cost is being measured.
			_, _ = coordinator.CoordinateRefresh(ctx, sessionID, refreshToken, refreshFunc)
			i++
		}
	})
	b.StopTimer()

	metrics := coordinator.GetMetrics()
	b.Logf("Total requests: %v", metrics["total_requests"])
	b.Logf("Deduplicated: %v", metrics["deduplicated_requests"])

	// Use checked type assertions so a missing or differently-typed metric
	// cannot panic the benchmark, and avoid dividing by a zero total.
	total, totalOK := metrics["total_requests"].(int64)
	succeeded, succeededOK := metrics["successful_refreshes"].(int64)
	if !totalOK || !succeededOK || total == 0 {
		b.Log("Success rate: n/a (metrics missing, not int64, or no requests recorded)")
		return
	}
	b.Logf("Success rate: %.2f%%", float64(succeeded)/float64(total)*100)
}
// TestRefreshCoordinatorIntegration drives one shared RefreshCoordinator
// through four ordered subtests: a plain successful refresh, circuit-breaker
// tripping, per-session rate limiting (cooldown), and stale-entry cleanup.
// The subtests share coordinator state, so their order matters.
func TestRefreshCoordinatorIntegration(t *testing.T) {
	noopLogger := GetSingletonNoOpLogger()
	config := DefaultRefreshCoordinatorConfig()
	config.MaxRefreshAttempts = 5
	config.RefreshAttemptWindow = 1 * time.Second
	config.RefreshCooldownPeriod = 2 * time.Second
	config.MaxConcurrentRefreshes = 3
	config.CleanupInterval = 500 * time.Millisecond

	coordinator := NewRefreshCoordinator(config, noopLogger)
	defer coordinator.Shutdown()

	// Subtest 1: a refresh that succeeds immediately must return its token.
	t.Run("NormalOperation", func(t *testing.T) {
		succeed := func() (*TokenResponse, error) {
			return &TokenResponse{AccessToken: "token1"}, nil
		}
		got, err := coordinator.CoordinateRefresh(context.Background(), "session1", "refresh1", succeed)
		if err != nil {
			t.Errorf("Normal refresh failed: %v", err)
		}
		if got == nil || got.AccessToken != "token1" {
			t.Error("Invalid result from normal refresh")
		}
	})

	// Subtest 2: repeated failures are expected to open the circuit breaker.
	t.Run("CircuitBreaker", func(t *testing.T) {
		alwaysFail := func() (*TokenResponse, error) {
			return nil, fmt.Errorf("service unavailable")
		}
		// Four failing refreshes, each on its own session ID.
		for n := 0; n < 4; n++ {
			sessionID := fmt.Sprintf("cb_session_%d", n)
			_, _ = coordinator.CoordinateRefresh(context.Background(), sessionID, "refresh_cb", alwaysFail)
		}
		// The following call should be rejected with a circuit-breaker error.
		_, err := coordinator.CoordinateRefresh(context.Background(), "cb_session_blocked", "refresh_cb", alwaysFail)
		if err == nil || !strings.Contains(err.Error(), "circuit breaker") {
			t.Errorf("Circuit breaker should have blocked request: %v", err)
		}
	})

	// Subtest 3: too many attempts on one session should put it in cooldown.
	t.Run("RateLimiting", func(t *testing.T) {
		// Put the breaker back into its closed state (0) with no recorded
		// failures; both fields are written with atomic stores.
		atomic.StoreInt32(&coordinator.circuitBreaker.state, 0) // closed
		atomic.StoreInt32(&coordinator.circuitBreaker.failures, 0)

		// Raise the breaker threshold for the duration of this subtest so it
		// does not interfere, and restore it afterwards.
		savedMaxFailures := coordinator.circuitBreaker.config.MaxFailures
		coordinator.circuitBreaker.config.MaxFailures = 20
		defer func() {
			coordinator.circuitBreaker.config.MaxFailures = savedMaxFailures
		}()

		alwaysFail := func() (*TokenResponse, error) {
			return nil, fmt.Errorf("failed")
		}
		const sessionID = "rate_limit_session"

		// Make MaxRefreshAttempts+1 failing calls to exhaust the allowance.
		for n := 0; n <= config.MaxRefreshAttempts; n++ {
			_, _ = coordinator.CoordinateRefresh(context.Background(), sessionID, "refresh_rl", alwaysFail)
			// Add delay to ensure operations complete and aren't deduplicated
			time.Sleep(150 * time.Millisecond)
		}

		// One more call should now be refused with a cooldown error.
		_, err := coordinator.CoordinateRefresh(context.Background(), sessionID, "refresh_rl", alwaysFail)
		if err == nil || !strings.Contains(err.Error(), "cooldown") {
			t.Errorf("Rate limiting should have triggered cooldown: %v", err)
		}
	})

	// Subtest 4: after a few cleanup intervals the tracker map must stay small.
	t.Run("Cleanup", func(t *testing.T) {
		// Record one attempt for each of five fresh sessions.
		for n := 0; n < 5; n++ {
			coordinator.recordRefreshAttempt(fmt.Sprintf("cleanup_session_%d", n))
		}
		// Give the cleanup routine three intervals to run.
		time.Sleep(config.CleanupInterval * 3)

		// Count the entries still tracked.
		remaining := 0
		coordinator.sessionRefreshAttempts.Range(func(_, _ any) bool {
			remaining++
			return true
		})
		if remaining > 10 {
			t.Errorf("Cleanup not working, %d sessions remain", remaining)
		}
	})

	// Log the coordinator's metrics after all subtests have run.
	t.Logf("Final metrics: %+v", coordinator.GetMetrics())
}
// TestIssue67_TokenResilienceRecursionBug directly tests the recursion bug identified by jetexe
// in the comment: https://github.com/lukaszraczylo/traefikoidc/issues/67#issuecomment-2391821890
//
// The bug is in token_resilience.go:180-190 where ExecuteTokenRefresh calls
// getNewTokenWithRefreshToken which calls ExecuteTokenRefresh again, causing infinite recursion.
//
// The test calls getNewTokenWithRefreshToken once against a mock token endpoint
// and fails if the call panics, does not return within 3 seconds, or if more
// than maxDepth token requests were ever in flight at the same time.
func TestIssue67_TokenResilienceRecursionBug(t *testing.T) {
	// callDepth counts token requests currently inside the handler; peakDepth
	// remembers the highest value it reached. The live counter is decremented
	// again as each handler returns, so only the peak is meaningful once the
	// call under test has finished.
	var callDepth int32
	var peakDepth int32
	const maxDepth int32 = 5 // If we exceed this, we have recursion

	// Create mock server
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/token" {
			return
		}
		depth := atomic.AddInt32(&callDepth, 1)
		defer atomic.AddInt32(&callDepth, -1)

		// Raise the recorded peak if this request pushed the depth higher.
		for {
			peak := atomic.LoadInt32(&peakDepth)
			if depth <= peak || atomic.CompareAndSwapInt32(&peakDepth, peak, depth) {
				break
			}
		}

		// Check if we've exceeded max depth (indicates recursion)
		if depth > maxDepth {
			t.Errorf("Call depth exceeded %d - infinite recursion detected!", maxDepth)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// Simulate successful token refresh
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"id_token": "new_id_token",
"expires_in": 3600,
"token_type": "Bearer"
}`))
	}))
	defer server.Close()

	// Create TraefikOidc with tokenResilienceManager (this triggers the bug)
	logger := GetSingletonNoOpLogger()
	resilienceConfig := DefaultTokenResilienceConfig()
	resilienceManager := NewTokenResilienceManager(resilienceConfig, logger)
	oidc := &TraefikOidc{
		tokenURL:               server.URL + "/token",
		clientID:               "test_client",
		audience:               "test_client",
		clientSecret:           "test_secret",
		tokenResilienceManager: resilienceManager,
		tokenHTTPClient: &http.Client{
			Timeout: 5 * time.Second,
		},
		logger: logger,
	}

	// Create context with timeout to prevent hanging
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Run the call in its own goroutine so the test can time out instead of
	// hanging. testErr is written before done is closed and only read after
	// done is received, so no extra synchronisation is needed.
	done := make(chan struct{})
	var testErr error
	go func() {
		defer func() {
			// recover intercepts ordinary panics; the message below is what the
			// "stack overflow" check after the select looks for.
			if r := recover(); r != nil {
				testErr = fmt.Errorf("panic recovered: %v (likely stack overflow from recursion)", r)
			}
			close(done)
		}()
		// This call should NOT recurse infinitely after the fix
		_, err := oidc.getNewTokenWithRefreshToken("test_refresh_token")
		if err != nil {
			testErr = err
		}
	}()

	// Wait for completion or timeout
	select {
	case <-done:
		// Check for recursion via the peak depth observed by the server.
		if atomic.LoadInt32(&peakDepth) > maxDepth {
			t.Fatal("Infinite recursion detected via call depth counter")
		}
		// Check for panic/stack overflow
		if testErr != nil && strings.Contains(testErr.Error(), "stack overflow") {
			t.Fatalf("Stack overflow detected: %v", testErr)
		}
		// After fix, this should succeed; any other error is only logged.
		if testErr != nil {
			t.Logf("Token refresh completed with error: %v", testErr)
		}
	case <-ctx.Done():
		t.Fatal("Test timed out - likely infinite recursion in getNewTokenWithRefreshToken -> ExecuteTokenRefresh loop")
	}
}
// TestIssue67_TokenResilienceManager_NoRecursion calls ExecuteTokenRefresh on
// a TokenResilienceManager and checks that the mock token endpoint is reached
// at least once. Hits on the mock server are used as a stand-in for
// exchangeTokens calls.
//
// Note: this test does not observe whether getNewTokenWithRefreshToken is
// invoked along the way; it only asserts that the endpoint was reached. An
// error from ExecuteTokenRefresh is logged rather than treated as a failure.
func TestIssue67_TokenResilienceManager_NoRecursion(t *testing.T) {
	var exchangeTokensCalls int32 // requests received by the mock server

	// Create mock server: every request, whatever its path, is counted and
	// answered with a successful token response.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		atomic.AddInt32(&exchangeTokensCalls, 1)
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{
"access_token": "test_token",
"refresh_token": "test_refresh",
"id_token": "test_id",
"expires_in": 3600,
"token_type": "Bearer"
}`))
	}))
	defer server.Close()

	// Create a TraefikOidc wired to the mock token endpoint.
	logger := GetSingletonNoOpLogger()
	resilienceConfig := DefaultTokenResilienceConfig()
	resilienceManager := NewTokenResilienceManager(resilienceConfig, logger)
	oidc := &TraefikOidc{
		tokenURL:               server.URL + "/token",
		clientID:               "test_client",
		audience:               "test_client",
		clientSecret:           "test_secret",
		tokenResilienceManager: resilienceManager,
		tokenHTTPClient: &http.Client{
			Timeout: 5 * time.Second,
		},
		logger: logger,
	}

	// Execute token refresh through resilience manager
	ctx := context.Background()
	_, err := resilienceManager.ExecuteTokenRefresh(ctx, oidc, "test_refresh_token")
	if err != nil {
		t.Logf("Token refresh returned error (may be expected): %v", err)
	}

	// Verify exchangeTokens was called
	exchangeCalls := atomic.LoadInt32(&exchangeTokensCalls)
	if exchangeCalls == 0 {
		t.Error("exchangeTokens was never called")
	}
	t.Logf("exchangeTokens called %d times", exchangeCalls)
}
// TestIssue67_DirectRecursionDetection uses a simpler approach to detect the recursion.
// With retries disabled, it calls getNewTokenWithRefreshToken once and fails if
// more than maxAllowedDepth requests were ever in flight at the mock server at
// the same time, or if the call does not return within 2 seconds.
func TestIssue67_DirectRecursionDetection(t *testing.T) {
	// This test will fail BEFORE the fix and pass AFTER the fix

	// recursionDepth counts requests currently inside the handler; peakDepth
	// remembers the highest value it reached. The live counter is decremented
	// again as each handler returns, so the final check must use the peak.
	var recursionDepth int32
	var peakDepth int32
	const maxAllowedDepth = 3

	// Create a simple mock server
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		depth := atomic.AddInt32(&recursionDepth, 1)
		defer atomic.AddInt32(&recursionDepth, -1)

		// Raise the recorded peak if this request pushed the depth higher.
		for {
			peak := atomic.LoadInt32(&peakDepth)
			if depth <= peak || atomic.CompareAndSwapInt32(&peakDepth, peak, depth) {
				break
			}
		}

		if depth > maxAllowedDepth {
			// Recursion detected - fail fast
			t.Errorf("RECURSION BUG DETECTED: depth=%d exceeds max=%d", depth, maxAllowedDepth)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"access_token":"test","refresh_token":"test","id_token":"test","expires_in":3600,"token_type":"Bearer"}`))
	}))
	defer server.Close()

	logger := GetSingletonNoOpLogger()
	config := DefaultTokenResilienceConfig()
	config.RetryEnabled = false // Disable retries to make the test clearer
	oidc := &TraefikOidc{
		tokenURL:               server.URL + "/token",
		clientID:               "test",
		audience:               "test",
		clientSecret:           "test",
		tokenResilienceManager: NewTokenResilienceManager(config, logger),
		tokenHTTPClient:        &http.Client{Timeout: 2 * time.Second},
		logger:                 logger,
	}

	// Set a timeout to prevent infinite hangs
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// The channel is buffered so the worker can deliver its result and exit
	// even if the test has already timed out and stopped listening.
	done := make(chan error, 1)
	go func() {
		_, err := oidc.getNewTokenWithRefreshToken("test_token")
		done <- err
	}()

	select {
	case err := <-done:
		finalDepth := atomic.LoadInt32(&peakDepth)
		if finalDepth > maxAllowedDepth {
			t.Fatalf("Recursion bug confirmed: max depth reached %d", finalDepth)
		}
		if err != nil {
			t.Logf("Completed with error: %v", err)
		} else {
			t.Log("Token refresh completed successfully without recursion")
		}
	case <-ctx.Done():
		t.Fatal("RECURSION BUG: Test timed out, indicating infinite loop in getNewTokenWithRefreshToken -> ExecuteTokenRefresh")
	}
}