mirror of
https://github.com/lukaszraczylo/traefikoidc.git
synced 2026-06-05 22:44:17 +00:00
72e2b682bb
The v1.0.14 fix replaced one contended sync.RWMutex (RefreshCoordinator.
refreshMutex) with sync.Map. Production showed the same death-spiral
signature recurring ~2 hours later — same shape, different mutex:
65 goroutines stuck on a sync.(*RWMutex).Lock at one address, pod
pinned at 1000m CPU, identical Yaegi runCfg/reflect.Value.Call stack
pattern. The mutex was RefreshCoordinator.attemptsMutex.
Generalising: under Yaegi (interpreted Go for traefik plugins), any
per-request global mutex acquisition is a latent serialization point.
reflect.Value.Call dispatch on a held lock turns a microsecond
critical section into a multi-millisecond one, and on a GOMAXPROCS=1
pod the queue is unbounded.
This commit removes every per-request global mutex on the hot path:
1. RefreshCoordinator.attemptsMutex (sync.RWMutex)
sessionRefreshAttempts: map -> sync.Map.
refreshAttemptTracker: all fields atomic (int32, int64 UnixNano,
cooldownEndNano == 0 as the not-in-cooldown sentinel, replacing
the inCooldown bool).
isInCooldown / recordRefreshAttempt / recordRefreshSuccess /
recordRefreshFailure all become lock-free. Cooldown entry uses
CompareAndSwapInt64 so only one goroutine logs the transition.
2. RefreshCircuitBreaker.mutex (sync.RWMutex)
lastFailureTime / lastSuccessTime -> atomic.Int64 UnixNano.
state and failures already atomic.
AllowRequest / RecordSuccess / RecordFailure now pure atomic ops.
3. TraefikOidc.firstRequestMutex (sync.Mutex)
firstRequestReceived bool -> firstRequestStarted int32.
metadataRefreshStarted bool -> metadataRefreshStartedAtomic int32.
ServeHTTP bootstrap path uses CompareAndSwapInt32 — fires once,
zero steady-state cost. Previously the mutex was acquired on
every non-health request forever.
4. TraefikOidc.metadataRetryMutex (sync.Mutex)
lastMetadataRetryTime time.Time -> lastMetadataRetryNano int64.
The 30-second retry throttle is now a CAS on lastMetadataRetryNano.
cleanupStaleEntries iterates via sync.Map.Range; eviction is a
CompareAndDelete by pointer identity so a tracker freshly re-used by
a concurrent caller is not lost.
Empirical evidence (analysis of the v1.0.14 spike by three specialist agents,
profiles in /tmp/traefik-spike-1779511683/):
* mutex profile: 97% delay in sync.(*Mutex).Unlock via
HTTPHandlerSwitcher -> accesslog -> metrics -> backoff.RetryNotify
* 65 stuck goroutines at one RWMutex address (0x40022eb648),
identical Yaegi CFG pointer, all on rc.attemptsMutex via
recordRefreshAttempt + isInCooldown
* traffic driver: long-lived in-cluster Go-http-client doing
~5.4 req/s POST embeddings via OIDC cookie session → same
sessionID → contention all funnels to one tracker entry
Yaegi support for sync/atomic confirmed at
github.com/traefik/yaegi@v0.16.1/stdlib/go1_22_sync_atomic.go:
AddInt32/Int64, LoadInt32/Int64, StoreInt32/Int64,
CompareAndSwapInt32/Int64 all exposed via reflect.ValueOf. Yaegi
dispatches each call through reflect.Value.Call to the COMPILED
atomic.* function, which executes a single hardware CAS/LOCK-XADD
instruction. Each atomic op still pays Yaegi dispatch cost but
cannot block — no queueing, no death spiral.
Trade-off acknowledged: v1.0.15 issues ~6-8 atomic/sync.Map ops per
leader-path request vs the 4 mutex ops of v1.0.14. Under low
contention this is a modest CPU bump. Under high contention it's
an unbounded → bounded transformation. Net win.
All tests pass with -race; golangci-lint clean.
692 lines
24 KiB
Go
692 lines
24 KiB
Go
package traefikoidc
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// RefreshCoordinator prevents duplicate refresh token operations and manages
|
|
// refresh attempt tracking to prevent infinite loops and OOM conditions.
|
|
// It implements request coalescing, rate limiting, and circuit breaking
|
|
// specifically for token refresh operations.
|
|
type RefreshCoordinator struct {
|
|
// inFlightRefreshes maps tokenHash -> *refreshOperation. sync.Map is used
|
|
// instead of a plain map + RWMutex so concurrent refreshes do not
|
|
// serialize on a single global lock. Under Yaegi the previous
|
|
// refreshMutex.Lock() was held for tens of milliseconds per request due
|
|
// to interpreter overhead on the work inside the critical section,
|
|
// causing dozens of goroutines to stack up on it and pin one CPU core.
|
|
inFlightRefreshes sync.Map
|
|
// sessionRefreshAttempts maps sessionID -> *refreshAttemptTracker.
|
|
// sync.Map + atomic tracker fields means isInCooldown/recordRefreshAttempt/
|
|
// recordRefreshSuccess/recordRefreshFailure are lock-free. Previously
|
|
// these used attemptsMutex sync.RWMutex; under Yaegi every Lock() acquisition
|
|
// adds 10-50ms of dispatch overhead, and they were called twice per leader
|
|
// request (once for recordRefreshAttempt, once for isInCooldown). That
|
|
// serializing pattern caused the v1.0.15 death spiral after v1.0.14
|
|
// removed the refreshMutex (same architectural shape, different mutex).
|
|
sessionRefreshAttempts sync.Map
|
|
cleanupTimers map[string]*time.Timer
|
|
circuitBreaker *RefreshCircuitBreaker
|
|
metrics *RefreshMetrics
|
|
logger *Logger
|
|
stopChan chan struct{}
|
|
config RefreshCoordinatorConfig
|
|
wg sync.WaitGroup
|
|
cleanupTimerMu sync.Mutex
|
|
}
|
|
|
|
// RefreshCoordinatorConfig holds the tunables that control how a
// RefreshCoordinator rate-limits, bounds and cleans up refresh operations.
type RefreshCoordinatorConfig struct {
	// MaxRefreshAttempts is the per-session attempt limit; reaching it puts
	// the session into cooldown.
	MaxRefreshAttempts int

	// RefreshAttemptWindow is the length of the window over which attempts
	// are counted.
	RefreshAttemptWindow time.Duration

	// RefreshCooldownPeriod is how long a session stays in cooldown once the
	// attempt limit has been reached.
	RefreshCooldownPeriod time.Duration

	// MaxConcurrentRefreshes caps the number of refresh operations that may
	// be in flight at the same time.
	MaxConcurrentRefreshes int

	// RefreshTimeout bounds a single refresh operation.
	RefreshTimeout time.Duration

	// EnableMemoryPressureDetection turns the memory-pressure gate on.
	EnableMemoryPressureDetection bool

	// MemoryPressureThresholdMB is the memory pressure threshold, in MB.
	MemoryPressureThresholdMB uint64

	// CleanupInterval is how often stale tracking entries are swept.
	CleanupInterval time.Duration

	// DeduplicationCleanupDelay is how long a completed refresh operation
	// stays in the deduplication map before it is removed. Use 0 for
	// immediate cleanup (handy in tests).
	DeduplicationCleanupDelay time.Duration
}
|
|
|
|
// DefaultRefreshCoordinatorConfig returns production-ready configuration
|
|
func DefaultRefreshCoordinatorConfig() RefreshCoordinatorConfig {
|
|
return RefreshCoordinatorConfig{
|
|
MaxRefreshAttempts: 5,
|
|
RefreshAttemptWindow: 5 * time.Minute,
|
|
RefreshCooldownPeriod: 10 * time.Minute,
|
|
MaxConcurrentRefreshes: 10,
|
|
RefreshTimeout: 30 * time.Second,
|
|
EnableMemoryPressureDetection: true,
|
|
MemoryPressureThresholdMB: 500, // 500MB threshold
|
|
CleanupInterval: 1 * time.Minute,
|
|
DeduplicationCleanupDelay: 100 * time.Millisecond, // Default 100ms for production
|
|
}
|
|
}
|
|
|
|
// refreshOperation represents an in-flight refresh operation
|
|
type refreshOperation struct {
|
|
startTime time.Time
|
|
result *refreshResult
|
|
done chan struct{}
|
|
refreshToken string
|
|
mutex sync.RWMutex
|
|
waiterCount int32
|
|
}
|
|
|
|
// refreshResult contains the result of a refresh operation
|
|
type refreshResult struct {
|
|
tokenResponse *TokenResponse
|
|
err error
|
|
fromCache bool
|
|
}
|
|
|
|
// refreshAttemptTracker holds the refresh-attempt bookkeeping for a single
// session. Every field is read and written through sync/atomic, which lets
// isInCooldown and the recordRefresh* helpers run without taking any
// coordinator-wide lock. Timestamps are stored as UnixNano so that each fits
// in an int64 and can be fetched with one atomic.LoadInt64.
//
// A cooldownEndNano of 0 is the "not in cooldown" sentinel. It stands in for
// the inCooldown bool that the earlier implementation protected with
// attemptsMutex. Under Yaegi any global mutex taken per request becomes a
// serialization point; the v1.0.14 refreshMutex -> sync.Map change removed
// one such bottleneck and attemptsMutex was the next in line.
type refreshAttemptTracker struct {
	lastAttemptNano     int64 // atomic; UnixNano of the most recent attempt
	windowStartNano     int64 // atomic; UnixNano at which the attempt window began
	cooldownEndNano     int64 // atomic; UnixNano when cooldown ends, 0 = not in cooldown
	attempts            int32 // atomic; attempts counted in the current window
	consecutiveFailures int32 // atomic; failures since the last success
}
|
|
|
|
// RefreshMetrics collects the coordinator's counters. Fields are updated and
// read with sync/atomic by the coordinator.
type RefreshMetrics struct {
	totalRefreshRequests     int64 // every CoordinateRefresh call
	deduplicatedRequests     int64 // calls that joined an existing operation
	successfulRefreshes      int64 // waiters that observed a successful result
	failedRefreshes          int64 // waiters that observed a failed result
	circuitBreakerTrips      int64 // calls rejected by the circuit breaker
	memoryPressureEvents     int64 // leaders rejected by the memory-pressure gate
	cooldownsTriggered       int64 // leaders rejected because the session is in cooldown
	currentInFlightRefreshes int32 // concurrency slots currently reserved
}
|
|
|
|
// RefreshCircuitBreaker implements a circuit breaker specifically for refresh
|
|
// operations. All mutable fields are atomic so AllowRequest/RecordSuccess/
|
|
// RecordFailure run without any mutex. The previous sync.RWMutex.RLock() was
|
|
// taken on every CoordinateRefresh — under Yaegi this added 10-50ms of
|
|
// interpreter dispatch per call, which compounded with attemptsMutex to keep
|
|
// the pod's single CPU core saturated.
|
|
type RefreshCircuitBreaker struct {
|
|
lastFailureNano int64 // atomic, UnixNano of most recent failure
|
|
lastSuccessNano int64 // atomic, UnixNano of most recent success
|
|
config RefreshCircuitBreakerConfig
|
|
state int32 // atomic: 0=closed, 1=open, 2=half-open
|
|
failures int32 // atomic
|
|
}
|
|
|
|
// RefreshCircuitBreakerConfig holds the settings of a RefreshCircuitBreaker.
type RefreshCircuitBreakerConfig struct {
	// MaxFailures is the failure count at which a closed breaker opens.
	MaxFailures int

	// OpenDuration is how long after the last failure an open breaker waits
	// before letting a probe request through.
	OpenDuration time.Duration

	// HalfOpenRequests is the configured number of half-open probe requests.
	// NOTE(review): none of the breaker methods in this file read it.
	HalfOpenRequests int
}
|
|
|
|
// NewRefreshCoordinator creates a new refresh coordinator
|
|
func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *RefreshCoordinator {
|
|
if logger == nil {
|
|
logger = GetSingletonNoOpLogger()
|
|
}
|
|
|
|
rc := &RefreshCoordinator{
|
|
// inFlightRefreshes and sessionRefreshAttempts are both sync.Map;
|
|
// their zero values are ready to use.
|
|
config: config,
|
|
metrics: &RefreshMetrics{},
|
|
logger: logger,
|
|
stopChan: make(chan struct{}),
|
|
cleanupTimers: make(map[string]*time.Timer),
|
|
circuitBreaker: &RefreshCircuitBreaker{
|
|
config: RefreshCircuitBreakerConfig{
|
|
MaxFailures: 3,
|
|
OpenDuration: 30 * time.Second,
|
|
HalfOpenRequests: 1,
|
|
},
|
|
},
|
|
}
|
|
|
|
// Start cleanup goroutine
|
|
rc.wg.Add(1)
|
|
go rc.cleanupRoutine()
|
|
|
|
return rc
|
|
}
|
|
|
|
// CoordinateRefresh ensures only one refresh operation happens per refresh token
|
|
// and implements request coalescing for concurrent refresh attempts
|
|
func (rc *RefreshCoordinator) CoordinateRefresh(
|
|
ctx context.Context,
|
|
sessionID string,
|
|
refreshToken string,
|
|
refreshFunc func() (*TokenResponse, error),
|
|
) (*TokenResponse, error) {
|
|
// Increment total request count
|
|
atomic.AddInt64(&rc.metrics.totalRefreshRequests, 1)
|
|
|
|
// Check circuit breaker first
|
|
if !rc.circuitBreaker.AllowRequest() {
|
|
atomic.AddInt64(&rc.metrics.circuitBreakerTrips, 1)
|
|
return nil, fmt.Errorf("refresh circuit breaker is open due to repeated failures")
|
|
}
|
|
|
|
// Create hash of refresh token for deduplication
|
|
tokenHash := rc.hashRefreshToken(refreshToken)
|
|
|
|
// CRITICAL FIX: Atomically check for existing operation OR create new one
|
|
// This prevents the race where multiple goroutines check, find nothing, then all create
|
|
operation, isNew, err := rc.getOrCreateOperation(ctx, sessionID, tokenHash, refreshToken)
|
|
|
|
if err != nil {
|
|
// Operation creation was rejected (rate limit, memory pressure, concurrent limit)
|
|
return nil, err
|
|
}
|
|
|
|
if isNew {
|
|
// We created a new operation, so we need to execute it
|
|
go rc.executeRefreshAsync(operation, sessionID, tokenHash, refreshFunc)
|
|
} else {
|
|
// Joined existing operation - this is a deduplicated request
|
|
atomic.AddInt64(&rc.metrics.deduplicatedRequests, 1)
|
|
}
|
|
|
|
// Wait for the operation to complete
|
|
select {
|
|
case <-operation.done:
|
|
// Get the result
|
|
operation.mutex.RLock()
|
|
result := operation.result
|
|
operation.mutex.RUnlock()
|
|
|
|
if result != nil {
|
|
// Record metrics based on result
|
|
if result.err != nil {
|
|
rc.circuitBreaker.RecordFailure()
|
|
rc.recordRefreshFailure(sessionID)
|
|
atomic.AddInt64(&rc.metrics.failedRefreshes, 1)
|
|
} else {
|
|
rc.circuitBreaker.RecordSuccess()
|
|
rc.recordRefreshSuccess(sessionID)
|
|
atomic.AddInt64(&rc.metrics.successfulRefreshes, 1)
|
|
}
|
|
return result.tokenResponse, result.err
|
|
}
|
|
return nil, fmt.Errorf("refresh operation completed without result")
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// getOrCreateOperation atomically checks for an existing operation or creates a new one
|
|
// Returns (operation, true, nil) if a new operation was created
|
|
// Returns (operation, false, nil) if joined an existing operation
|
|
// Returns (nil, false, error) if the operation was rejected
|
|
func (rc *RefreshCoordinator) getOrCreateOperation(
|
|
_ context.Context,
|
|
sessionID string,
|
|
tokenHash string,
|
|
refreshToken string,
|
|
) (*refreshOperation, bool, error) {
|
|
// Speculatively construct the operation we WOULD register if we win the
|
|
// race. Allocating here keeps the LoadOrStore call below atomic and
|
|
// avoids any global lock — under Yaegi the previous map+RWMutex design
|
|
// held the write lock long enough (tens of ms per call) that concurrent
|
|
// refreshes on the same coordinator serialized into a queue that grew
|
|
// without bound. See struct comment on inFlightRefreshes.
|
|
candidate := &refreshOperation{
|
|
refreshToken: refreshToken,
|
|
done: make(chan struct{}),
|
|
startTime: time.Now(),
|
|
waiterCount: 1,
|
|
}
|
|
|
|
if existing, loaded := rc.inFlightRefreshes.LoadOrStore(tokenHash, candidate); loaded {
|
|
existingOp, ok := existing.(*refreshOperation)
|
|
if !ok {
|
|
// Defensive: anything stored here is always *refreshOperation, but
|
|
// keep the typed assert so a programming error elsewhere doesn't
|
|
// surface as a confusing panic in an interpreter frame.
|
|
return nil, false, fmt.Errorf("inFlightRefreshes corrupt: unexpected type %T", existing)
|
|
}
|
|
if existingOp.refreshToken == refreshToken {
|
|
atomic.AddInt32(&existingOp.waiterCount, 1)
|
|
return existingOp, false, nil
|
|
}
|
|
// Different refresh token for same hash - should not happen
|
|
return nil, false, fmt.Errorf("refresh token mismatch")
|
|
}
|
|
|
|
// We won the race and registered `candidate`. Apply gates now. If any
|
|
// gate fails we must remove our entry from the map and signal failure
|
|
// to any joiners that snuck in between LoadOrStore and now.
|
|
if err := rc.applyLeaderGates(sessionID); err != nil {
|
|
rc.failCandidate(tokenHash, candidate, err)
|
|
return nil, false, err
|
|
}
|
|
|
|
// Reserve concurrent slot via CAS — without the old global lock we can
|
|
// no longer rely on mutex-mediated check-then-increment. If we lose the
|
|
// CAS race we retry; if the limit has since been reached we back out.
|
|
for {
|
|
current := atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes)
|
|
if int(current) >= rc.config.MaxConcurrentRefreshes {
|
|
err := fmt.Errorf("maximum concurrent refresh operations reached")
|
|
rc.failCandidate(tokenHash, candidate, err)
|
|
return nil, false, err
|
|
}
|
|
if atomic.CompareAndSwapInt32(&rc.metrics.currentInFlightRefreshes, current, current+1) {
|
|
break
|
|
}
|
|
}
|
|
|
|
return candidate, true, nil
|
|
}
|
|
|
|
// applyLeaderGates runs the rate-limit, cooldown, and memory-pressure checks
|
|
// that previously ran under the global refreshMutex. Only the leader (the
|
|
// goroutine that just registered the operation) runs them; joiners share the
|
|
// leader's outcome via operation.done.
|
|
func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
|
rc.recordRefreshAttempt(sessionID)
|
|
if rc.isInCooldown(sessionID) {
|
|
atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1)
|
|
return fmt.Errorf("refresh attempts exceeded for session, in cooldown period")
|
|
}
|
|
if rc.config.EnableMemoryPressureDetection && rc.isUnderMemoryPressure() {
|
|
atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1)
|
|
return fmt.Errorf("system under memory pressure, refresh denied")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// failCandidate removes the leader's just-registered operation from the
|
|
// in-flight map and signals the error to any joiners by recording the result
|
|
// and closing the done channel. This keeps the (nil, false, err) return path
|
|
// equivalent to the pre-sync.Map version: callers see the error directly,
|
|
// joiners see it via operation.done.
|
|
func (rc *RefreshCoordinator) failCandidate(tokenHash string, op *refreshOperation, err error) {
|
|
rc.inFlightRefreshes.Delete(tokenHash)
|
|
op.mutex.Lock()
|
|
op.result = &refreshResult{err: err}
|
|
op.mutex.Unlock()
|
|
close(op.done)
|
|
}
|
|
|
|
// executeRefreshAsync performs the actual refresh operation asynchronously
|
|
func (rc *RefreshCoordinator) executeRefreshAsync(
|
|
operation *refreshOperation,
|
|
_ string, // sessionID - reserved for future metrics/logging
|
|
tokenHash string,
|
|
refreshFunc func() (*TokenResponse, error),
|
|
) {
|
|
defer func() {
|
|
// Signal completion to all waiters
|
|
close(operation.done)
|
|
|
|
// Schedule delayed cleanup using timer instead of spawning a goroutine
|
|
// This prevents goroutine explosion under high load
|
|
rc.scheduleDelayedCleanup(tokenHash)
|
|
}()
|
|
|
|
// Create timeout context
|
|
refreshCtx, cancel := context.WithTimeout(context.Background(), rc.config.RefreshTimeout)
|
|
defer cancel()
|
|
|
|
// Execute refresh in goroutine to respect timeout
|
|
resultChan := make(chan struct {
|
|
resp *TokenResponse
|
|
err error
|
|
}, 1)
|
|
|
|
go func() {
|
|
resp, err := refreshFunc()
|
|
select {
|
|
case resultChan <- struct {
|
|
resp *TokenResponse
|
|
err error
|
|
}{resp, err}:
|
|
case <-refreshCtx.Done():
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case result := <-resultChan:
|
|
// Store result for all waiters
|
|
operation.mutex.Lock()
|
|
operation.result = &refreshResult{
|
|
tokenResponse: result.resp,
|
|
err: result.err,
|
|
fromCache: false,
|
|
}
|
|
operation.mutex.Unlock()
|
|
case <-refreshCtx.Done():
|
|
// Timeout occurred
|
|
timeoutErr := fmt.Errorf("refresh operation timed out after %v", rc.config.RefreshTimeout)
|
|
operation.mutex.Lock()
|
|
operation.result = &refreshResult{
|
|
tokenResponse: nil,
|
|
err: timeoutErr,
|
|
fromCache: false,
|
|
}
|
|
operation.mutex.Unlock()
|
|
}
|
|
}
|
|
|
|
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning a goroutine
|
|
// This prevents goroutine explosion under high load (500+ req/sec)
|
|
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) {
|
|
delay := rc.config.DeduplicationCleanupDelay
|
|
if delay <= 0 {
|
|
// Immediate cleanup
|
|
rc.performCleanup(tokenHash)
|
|
return
|
|
}
|
|
|
|
// Use time.AfterFunc which is more efficient than spawning a goroutine with Sleep
|
|
// time.AfterFunc uses the runtime's timer heap which is much more efficient
|
|
rc.cleanupTimerMu.Lock()
|
|
// Cancel any existing timer for this hash (shouldn't happen, but just in case)
|
|
if existingTimer, exists := rc.cleanupTimers[tokenHash]; exists {
|
|
existingTimer.Stop()
|
|
}
|
|
rc.cleanupTimers[tokenHash] = time.AfterFunc(delay, func() {
|
|
rc.performCleanup(tokenHash)
|
|
// Remove timer from map
|
|
rc.cleanupTimerMu.Lock()
|
|
delete(rc.cleanupTimers, tokenHash)
|
|
rc.cleanupTimerMu.Unlock()
|
|
})
|
|
rc.cleanupTimerMu.Unlock()
|
|
}
|
|
|
|
// performCleanup removes the operation from the in-flight map.
|
|
// Idempotent: only decrements the in-flight counter if an entry was actually
|
|
// removed. LoadAndDelete is atomic so any concurrent failCandidate or repeat
|
|
// cleanup call will see exactly one removal — the budget cannot be corrupted
|
|
// by double-decrement.
|
|
func (rc *RefreshCoordinator) performCleanup(tokenHash string) {
|
|
if _, existed := rc.inFlightRefreshes.LoadAndDelete(tokenHash); existed {
|
|
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
|
}
|
|
}
|
|
|
|
// getOrCreateTracker fetches the tracker for sessionID or atomically creates a
|
|
// fresh one. The sync.Map.LoadOrStore semantics make this lock-free even under
|
|
// concurrent first-touch races: at most one tracker per sessionID survives.
|
|
//
|
|
// trackerFromMapValue centralizes the type assertion so the lint-mandated
|
|
// two-value form lives in one place; the stored type is always
|
|
// *refreshAttemptTracker by construction.
|
|
func trackerFromMapValue(v interface{}) *refreshAttemptTracker {
|
|
t, _ := v.(*refreshAttemptTracker)
|
|
return t
|
|
}
|
|
|
|
func (rc *RefreshCoordinator) getOrCreateTracker(sessionID string) *refreshAttemptTracker {
|
|
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
|
return trackerFromMapValue(v)
|
|
}
|
|
fresh := &refreshAttemptTracker{
|
|
windowStartNano: time.Now().UnixNano(),
|
|
}
|
|
actual, _ := rc.sessionRefreshAttempts.LoadOrStore(sessionID, fresh)
|
|
return trackerFromMapValue(actual)
|
|
}
|
|
|
|
// isInCooldown checks if a session is in cooldown. Lock-free read with a
|
|
// best-effort cooldown-reset CAS on the cooldownEndNano sentinel. If the
|
|
// reset races with another goroutine we accept the loser's view (the winner's
|
|
// reset still happens). The attempt-window expiry and limit-exceeded paths
|
|
// are write-mostly but use atomic.StoreInt64/AddInt32 — never a held lock.
|
|
func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return false // No tracker means first attempt, not in cooldown
|
|
}
|
|
tracker := trackerFromMapValue(v)
|
|
now := time.Now()
|
|
nowNano := now.UnixNano()
|
|
|
|
// Already in cooldown?
|
|
if cooldownEnd := atomic.LoadInt64(&tracker.cooldownEndNano); cooldownEnd != 0 {
|
|
if nowNano <= cooldownEnd {
|
|
return true // still in cooldown
|
|
}
|
|
// Cooldown expired. Best-effort reset (a concurrent caller may also
|
|
// reset; the result is equivalent — fresh window + one recorded
|
|
// attempt — so the CAS race is benign).
|
|
if atomic.CompareAndSwapInt64(&tracker.cooldownEndNano, cooldownEnd, 0) {
|
|
atomic.StoreInt32(&tracker.attempts, 1)
|
|
atomic.StoreInt32(&tracker.consecutiveFailures, 0)
|
|
atomic.StoreInt64(&tracker.windowStartNano, nowNano)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Window expired?
|
|
if windowStart := atomic.LoadInt64(&tracker.windowStartNano); time.Duration(nowNano-windowStart) > rc.config.RefreshAttemptWindow {
|
|
atomic.StoreInt32(&tracker.attempts, 1)
|
|
atomic.StoreInt64(&tracker.windowStartNano, nowNano)
|
|
return false
|
|
}
|
|
|
|
// Just exceeded attempt limit?
|
|
if int(atomic.LoadInt32(&tracker.attempts)) >= rc.config.MaxRefreshAttempts {
|
|
end := now.Add(rc.config.RefreshCooldownPeriod).UnixNano()
|
|
// Only one CAS winner publishes the cooldown end + logs.
|
|
if atomic.CompareAndSwapInt64(&tracker.cooldownEndNano, 0, end) {
|
|
rc.logger.Infof("Session %s entering refresh cooldown after %d attempts",
|
|
sessionID, atomic.LoadInt32(&tracker.attempts))
|
|
}
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free:
|
|
// LoadOrStore for the tracker, atomic counters/timestamps for fields.
|
|
func (rc *RefreshCoordinator) recordRefreshAttempt(sessionID string) {
|
|
tracker := rc.getOrCreateTracker(sessionID)
|
|
atomic.AddInt32(&tracker.attempts, 1)
|
|
atomic.StoreInt64(&tracker.lastAttemptNano, time.Now().UnixNano())
|
|
}
|
|
|
|
// recordRefreshSuccess records a successful refresh. Lock-free.
|
|
func (rc *RefreshCoordinator) recordRefreshSuccess(sessionID string) {
|
|
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
|
atomic.StoreInt32(&trackerFromMapValue(v).consecutiveFailures, 0)
|
|
}
|
|
}
|
|
|
|
// recordRefreshFailure records a failed refresh. Lock-free.
|
|
func (rc *RefreshCoordinator) recordRefreshFailure(sessionID string) {
|
|
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
|
atomic.AddInt32(&trackerFromMapValue(v).consecutiveFailures, 1)
|
|
}
|
|
}
|
|
|
|
// hashRefreshToken creates a hash of the refresh token for deduplication
|
|
func (rc *RefreshCoordinator) hashRefreshToken(token string) string {
|
|
return refreshCoordinatorSessionID(token)
|
|
}
|
|
|
|
// refreshCoordinatorSessionID maps a refresh token to a stable identifier:
// the lowercase hex encoding of the token's SHA-256 digest. The identifier
// serves both as deduplication key and as the key for per-session attempt
// tracking. Because it is derived from the raw token, every token rotation
// yields a new identifier with its own attempt budget, which is the intent.
func refreshCoordinatorSessionID(token string) string {
	digest := sha256.Sum256([]byte(token))
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}
|
|
|
|
// refreshCoordinatorWaitTimeout is the upper bound on how long a request may
// wait for the result of a coordinated refresh. At 35 seconds it is longer
// than the default RefreshTimeout of 30 seconds, so that a follower receives
// the leader's result rather than hitting its own timeout first.
const refreshCoordinatorWaitTimeout time.Duration = 35 * time.Second
|
|
|
|
// isUnderMemoryPressure checks if the system is under memory pressure by
|
|
// consulting the global memory monitor. Returns true when pressure reaches
|
|
// High or Critical, at which point we refuse new refresh operations to
|
|
// avoid aggravating an already-stressed heap.
|
|
func (rc *RefreshCoordinator) isUnderMemoryPressure() bool {
|
|
monitor := GetGlobalMemoryMonitor()
|
|
if monitor == nil {
|
|
return false
|
|
}
|
|
return monitor.GetMemoryPressure() >= MemoryPressureHigh
|
|
}
|
|
|
|
// cleanupRoutine periodically cleans up stale tracking entries
|
|
func (rc *RefreshCoordinator) cleanupRoutine() {
|
|
defer rc.wg.Done()
|
|
|
|
ticker := time.NewTicker(rc.config.CleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
rc.cleanupStaleEntries()
|
|
case <-rc.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupStaleEntries removes outdated tracking entries. Lock-free iteration
|
|
// via sync.Map.Range; safe to race with concurrent reads/writes.
|
|
func (rc *RefreshCoordinator) cleanupStaleEntries() {
|
|
cutoff := time.Now().Add(-2 * rc.config.RefreshAttemptWindow).UnixNano()
|
|
rc.sessionRefreshAttempts.Range(func(key, value interface{}) bool {
|
|
tracker := trackerFromMapValue(value)
|
|
if tracker == nil {
|
|
return true
|
|
}
|
|
if atomic.LoadInt64(&tracker.lastAttemptNano) < cutoff {
|
|
// Compare-and-delete to avoid evicting a tracker that was just
|
|
// re-used by a concurrent caller. We compare by pointer identity.
|
|
rc.sessionRefreshAttempts.CompareAndDelete(key, value)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// GetMetrics returns current coordinator metrics
|
|
func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"total_requests": atomic.LoadInt64(&rc.metrics.totalRefreshRequests),
|
|
"deduplicated_requests": atomic.LoadInt64(&rc.metrics.deduplicatedRequests),
|
|
"successful_refreshes": atomic.LoadInt64(&rc.metrics.successfulRefreshes),
|
|
"failed_refreshes": atomic.LoadInt64(&rc.metrics.failedRefreshes),
|
|
"circuit_breaker_trips": atomic.LoadInt64(&rc.metrics.circuitBreakerTrips),
|
|
"memory_pressure_events": atomic.LoadInt64(&rc.metrics.memoryPressureEvents),
|
|
"cooldowns_triggered": atomic.LoadInt64(&rc.metrics.cooldownsTriggered),
|
|
"current_inflight": atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes),
|
|
"circuit_breaker_state": rc.circuitBreaker.GetState(),
|
|
}
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the coordinator
|
|
func (rc *RefreshCoordinator) Shutdown() {
|
|
close(rc.stopChan)
|
|
|
|
// Cancel all pending cleanup timers
|
|
rc.cleanupTimerMu.Lock()
|
|
for _, timer := range rc.cleanupTimers {
|
|
timer.Stop()
|
|
}
|
|
rc.cleanupTimers = make(map[string]*time.Timer)
|
|
rc.cleanupTimerMu.Unlock()
|
|
|
|
rc.wg.Wait()
|
|
}
|
|
|
|
// AllowRequest reports whether the circuit breaker allows a request. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) AllowRequest() bool {
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 0: // closed
|
|
return true
|
|
case 1: // open
|
|
lastFail := atomic.LoadInt64(&cb.lastFailureNano)
|
|
if time.Duration(time.Now().UnixNano()-lastFail) > cb.config.OpenDuration {
|
|
// Transition to half-open; first CAS winner gets the probe.
|
|
if atomic.CompareAndSwapInt32(&cb.state, 1, 2) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
case 2: // half-open
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// RecordSuccess records a successful operation. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) RecordSuccess() {
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 2: // half-open -> close
|
|
atomic.StoreInt32(&cb.state, 0)
|
|
atomic.StoreInt32(&cb.failures, 0)
|
|
case 0: // closed
|
|
atomic.StoreInt32(&cb.failures, 0)
|
|
}
|
|
atomic.StoreInt64(&cb.lastSuccessNano, time.Now().UnixNano())
|
|
}
|
|
|
|
// RecordFailure records a failed operation. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) RecordFailure() {
|
|
failures := atomic.AddInt32(&cb.failures, 1)
|
|
atomic.StoreInt64(&cb.lastFailureNano, time.Now().UnixNano())
|
|
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 0:
|
|
if int(failures) >= cb.config.MaxFailures {
|
|
atomic.StoreInt32(&cb.state, 1)
|
|
}
|
|
case 2:
|
|
// Half-open probe failed -> back to open.
|
|
atomic.StoreInt32(&cb.state, 1)
|
|
}
|
|
}
|
|
|
|
// GetState returns the current state of the circuit breaker
|
|
func (cb *RefreshCircuitBreaker) GetState() string {
|
|
state := atomic.LoadInt32(&cb.state)
|
|
switch state {
|
|
case 0:
|
|
return "closed"
|
|
case 1:
|
|
return "open"
|
|
case 2:
|
|
return "half-open"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|