mirror of
https://github.com/lukaszraczylo/traefikoidc.git
synced 2026-06-05 22:44:17 +00:00
f821b8829b
UniversalCache.getLocal(): when a cached token expires, the RLock fast path (line 385-398) previously fell through to c.mu.Lock() (write lock). Under Yaegi, the write-lock holder takes 10-100ms for LRU manipulation, and Go's RWMutex writer-priority blocks ALL new RLock callers. A single expired-token event turned every concurrent request from read-parallel into write-serialized — the convoy that produced the 737-goroutine pileup at 0x400275a608 (pprof captured at /tmp/traefik-spike-1779663149). Fix: return (nil, false) immediately on expiry for Token/JWK/Session cache types. The periodic cleanup goroutine handles eviction. Write lock is never taken on the read path for these cache types. refreshAttemptTracker.mutateState(): the CAS loop used t.state.CompareAndSwap(t.state.Load(), next) — a second Load that can see a different value from a concurrent writer, silently overwriting their update. Fixed to CompareAndSwap(cur, next) using the snapshot we computed the mutation from.
776 lines
28 KiB
Go
776 lines
28 KiB
Go
package traefikoidc
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// RefreshCoordinator prevents duplicate refresh token operations and manages
// refresh attempt tracking to prevent infinite loops and OOM conditions.
// It implements request coalescing, rate limiting, and circuit breaking
// specifically for token refresh operations.
//
// Construct it with NewRefreshCoordinator, which also starts the background
// cleanup goroutine that Shutdown stops. The struct holds sync.Map and
// sync.WaitGroup values and must not be copied after first use.
type RefreshCoordinator struct {
	// inFlightRefreshes maps tokenHash -> *refreshOperation. sync.Map is used
	// instead of a plain map + RWMutex so concurrent refreshes do not
	// serialize on a single global lock. Under Yaegi the previous
	// refreshMutex.Lock() was held for tens of milliseconds per request due
	// to interpreter overhead on the work inside the critical section,
	// causing dozens of goroutines to stack up on it and pin one CPU core.
	inFlightRefreshes sync.Map

	// sessionRefreshAttempts maps sessionID -> *refreshAttemptTracker.
	// sync.Map + atomic tracker state means isInCooldown/recordRefreshAttempt/
	// recordRefreshSuccess/recordRefreshFailure are lock-free. Previously
	// these used attemptsMutex sync.RWMutex; under Yaegi every Lock() acquisition
	// adds 10-50ms of dispatch overhead, and they were called twice per leader
	// request (once for recordRefreshAttempt, once for isInCooldown). That
	// serializing pattern caused the v1.0.15 death spiral after v1.0.14
	// removed the refreshMutex (same architectural shape, different mutex).
	sessionRefreshAttempts sync.Map

	// circuitBreaker is a single breaker shared by every session handled by
	// this coordinator.
	circuitBreaker *RefreshCircuitBreaker

	// metrics holds counters updated via sync/atomic; see GetMetrics.
	metrics *RefreshMetrics

	logger *Logger

	// stopChan is closed by Shutdown to stop cleanupRoutine.
	stopChan chan struct{}

	config RefreshCoordinatorConfig

	// wg tracks the cleanupRoutine goroutine so Shutdown can wait for it.
	wg sync.WaitGroup
}
|
|
|
|
// RefreshCoordinatorConfig configures the refresh coordinator behavior.
// DefaultRefreshCoordinatorConfig returns the production defaults.
type RefreshCoordinatorConfig struct {
	// MaxRefreshAttempts is the number of attempts a session may make within
	// one RefreshAttemptWindow before it is put into cooldown.
	MaxRefreshAttempts int

	// RefreshAttemptWindow is the length of the attempt-counting window.
	// Per-session trackers whose last attempt is older than twice this
	// window are evicted by the periodic cleanup.
	RefreshAttemptWindow time.Duration

	// RefreshCooldownPeriod is how long a session is refused refreshes after
	// reaching MaxRefreshAttempts.
	RefreshCooldownPeriod time.Duration

	// MaxConcurrentRefreshes caps the number of admitted leader operations
	// across all sessions. A slot is held from admission until the
	// operation's (possibly delayed) cleanup removes it from the in-flight map.
	MaxConcurrentRefreshes int

	// RefreshTimeout bounds how long a leader waits for refreshFunc before
	// reporting a timeout error to every waiter.
	RefreshTimeout time.Duration

	// EnableMemoryPressureDetection makes leaders refuse new refreshes while
	// the global memory monitor reports high (or higher) memory pressure.
	EnableMemoryPressureDetection bool

	// MemoryPressureThresholdMB is a memory pressure threshold in MB.
	// NOTE(review): not read by any code in this file; isUnderMemoryPressure
	// uses the global memory monitor's pressure level instead — confirm
	// whether anything else consumes it.
	MemoryPressureThresholdMB uint64

	// CleanupInterval is the period of the stale-tracker cleanup loop. It is
	// used as the time.NewTicker period and should be positive.
	CleanupInterval time.Duration

	// DeduplicationCleanupDelay is how long a completed refresh operation stays
	// in the deduplication map, so late arrivals reuse its result.
	// Any value <= 0 means immediate cleanup (useful for tests).
	DeduplicationCleanupDelay time.Duration
}
|
|
|
|
// DefaultRefreshCoordinatorConfig returns production-ready configuration
|
|
func DefaultRefreshCoordinatorConfig() RefreshCoordinatorConfig {
|
|
return RefreshCoordinatorConfig{
|
|
MaxRefreshAttempts: 5,
|
|
RefreshAttemptWindow: 5 * time.Minute,
|
|
RefreshCooldownPeriod: 10 * time.Minute,
|
|
MaxConcurrentRefreshes: 10,
|
|
RefreshTimeout: 30 * time.Second,
|
|
EnableMemoryPressureDetection: true,
|
|
MemoryPressureThresholdMB: 500, // 500MB threshold
|
|
CleanupInterval: 1 * time.Minute,
|
|
DeduplicationCleanupDelay: 100 * time.Millisecond, // Default 100ms for production
|
|
}
|
|
}
|
|
|
|
// refreshOperation represents an in-flight refresh operation shared by the
// leader that registered it and any joiners coalesced onto the same token hash.
type refreshOperation struct {
	// startTime records when the operation was registered.
	// NOTE(review): not read anywhere in this file.
	startTime time.Time

	// result is the published outcome. It is written under mutex.Lock before
	// done is closed and read under mutex.RLock after done is closed.
	result *refreshResult

	// done is closed exactly once (by failCandidate or by
	// executeRefreshAsync) to release every waiter.
	done chan struct{}

	// refreshToken is the raw token; joiners compare it against their own to
	// guard against a token-hash mismatch.
	refreshToken string

	mutex sync.RWMutex

	// waiterCount starts at 1 (the leader) and is atomically incremented for
	// each joiner. NOTE(review): never decremented or read in this file.
	waiterCount int32
}
|
|
|
|
// refreshResult contains the result of a refresh operation, as published to
// every waiter of a refreshOperation.
type refreshResult struct {
	// tokenResponse is whatever refreshFunc returned (nil for timeouts and
	// gate rejections).
	tokenResponse *TokenResponse

	// err is the refresh error, timeout error, or gate-rejection error.
	err error

	// fromCache is always set to false by the code in this file.
	fromCache bool
}
|
|
|
|
// attemptState is the immutable snapshot of a session's refresh-attempt
// state. It lives behind refreshAttemptTracker.state (atomic.Value). Every
// transition (record, success, failure, window-reset, cooldown-enter,
// cooldown-exit) constructs a fresh attemptState and publishes it via
// CompareAndSwap, so the entire field set is updated together. A published
// snapshot must never be modified in place.
//
// Per-field atomic.Load/Store (the previous v1.0.15 design) had a benign
// but observable hazard: the cooldown-exit reset wrote cooldownEndNano = 0
// first, then separately stored attempts = 1 and windowStartNano = now.
// A concurrent isInCooldown call could see cooldownEndNano = 0 (reset
// just completed) with attempts still at MaxRefreshAttempts, triggering
// a fresh cooldown immediately. The snapshot approach eliminates the
// intermediate state entirely.
type attemptState struct {
	lastAttemptNano int64 // UnixNano of last attempt
	windowStartNano int64 // UnixNano of attempt-window start
	cooldownEndNano int64 // UnixNano; 0 = not in cooldown

	// attempts counts attempts recorded in the current window.
	attempts int32

	// consecutiveFailures is incremented per recorded failure and cleared
	// by a recorded success.
	consecutiveFailures int32
}
|
|
|
|
// refreshAttemptTracker tracks refresh attempts for a session via a single
// atomic.Value holding a *attemptState pointer. Readers do exactly one Load.
// Writers do Load → construct new → CompareAndSwap (retry on conflict).
// Under Yaegi this collapses 3-4 per-field atomic dispatches into one Load,
// and eliminates the cross-field race in the window-reset path.
//
// A tracker whose state was never stored reads as the zero snapshot via
// stateOf. The struct must not be copied after first use (atomic.Value).
type refreshAttemptTracker struct {
	state atomic.Value // *attemptState
}
|
|
|
|
// stateOf returns the current attemptState, or a zero-value snapshot if none
|
|
// has been published yet. The empty snapshot represents "no attempts recorded".
|
|
func (t *refreshAttemptTracker) stateOf() *attemptState {
|
|
if v := t.state.Load(); v != nil {
|
|
s, _ := v.(*attemptState)
|
|
if s != nil {
|
|
return s
|
|
}
|
|
}
|
|
return &attemptState{}
|
|
}
|
|
|
|
// RefreshMetrics tracks coordinator performance metrics. Every field is read
// and written through sync/atomic; GetMetrics exposes a snapshot.
//
// The int64 fields come first, which keeps them 64-bit aligned for the
// 64-bit atomic operations on 32-bit platforms (the first word of an
// allocated struct is guaranteed aligned). Preserve that ordering when
// adding fields.
type RefreshMetrics struct {
	totalRefreshRequests     int64 // CoordinateRefresh calls
	deduplicatedRequests     int64 // callers that joined an existing operation
	successfulRefreshes      int64 // callers that received a successful result
	failedRefreshes          int64 // callers that received an error result
	circuitBreakerTrips      int64 // calls rejected because the breaker disallowed them
	memoryPressureEvents     int64 // leaders rejected for memory pressure
	cooldownsTriggered       int64 // leaders rejected because the session was in cooldown
	currentInFlightRefreshes int32 // admitted leaders not yet removed from the in-flight map
}
|
|
|
|
// RefreshCircuitBreaker implements a circuit breaker specifically for refresh
// operations. All mutable fields are atomic, so AllowRequest/RecordSuccess/
// RecordFailure run without any mutex. The previous sync.RWMutex.RLock() was
// taken on every CoordinateRefresh — under Yaegi this added 10-50ms of
// interpreter dispatch per call, which compounded with attemptsMutex to keep
// the pod's single CPU core saturated.
//
// The int64 fields come first, which keeps them 64-bit aligned for atomic
// access on 32-bit platforms; preserve that ordering.
//
// NOTE(review): config.HalfOpenRequests is not read in this file — while
// half-open, AllowRequest admits every request.
type RefreshCircuitBreaker struct {
	lastFailureNano int64 // atomic, UnixNano of most recent failure
	lastSuccessNano int64 // atomic, UnixNano of most recent success

	config RefreshCircuitBreakerConfig

	state    int32 // atomic: 0=closed, 1=open, 2=half-open
	failures int32 // atomic; cleared by RecordSuccess in closed/half-open
}
|
|
|
|
// RefreshCircuitBreakerConfig configures the refresh circuit breaker.
type RefreshCircuitBreakerConfig struct {
	// MaxFailures is the failure count at which a closed breaker opens.
	MaxFailures int

	// OpenDuration is how long after the most recent failure an open breaker
	// keeps rejecting before moving to half-open.
	OpenDuration time.Duration

	// HalfOpenRequests is intended to limit probes while half-open.
	// NOTE(review): not read by any code in this file.
	HalfOpenRequests int
}
|
|
|
|
// NewRefreshCoordinator creates a new refresh coordinator
|
|
func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *RefreshCoordinator {
|
|
if logger == nil {
|
|
logger = GetSingletonNoOpLogger()
|
|
}
|
|
|
|
rc := &RefreshCoordinator{
|
|
// inFlightRefreshes and sessionRefreshAttempts are both sync.Map;
|
|
// their zero values are ready to use.
|
|
config: config,
|
|
metrics: &RefreshMetrics{},
|
|
logger: logger,
|
|
stopChan: make(chan struct{}),
|
|
circuitBreaker: &RefreshCircuitBreaker{
|
|
config: RefreshCircuitBreakerConfig{
|
|
MaxFailures: 3,
|
|
OpenDuration: 30 * time.Second,
|
|
HalfOpenRequests: 1,
|
|
},
|
|
},
|
|
}
|
|
|
|
// Start cleanup goroutine
|
|
rc.wg.Add(1)
|
|
go rc.cleanupRoutine()
|
|
|
|
return rc
|
|
}
|
|
|
|
// CoordinateRefresh ensures only one refresh operation happens per refresh token
|
|
// and implements request coalescing for concurrent refresh attempts
|
|
func (rc *RefreshCoordinator) CoordinateRefresh(
|
|
ctx context.Context,
|
|
sessionID string,
|
|
refreshToken string,
|
|
refreshFunc func() (*TokenResponse, error),
|
|
) (*TokenResponse, error) {
|
|
// Increment total request count
|
|
atomic.AddInt64(&rc.metrics.totalRefreshRequests, 1)
|
|
|
|
// Check circuit breaker first
|
|
if !rc.circuitBreaker.AllowRequest() {
|
|
atomic.AddInt64(&rc.metrics.circuitBreakerTrips, 1)
|
|
return nil, fmt.Errorf("refresh circuit breaker is open due to repeated failures")
|
|
}
|
|
|
|
// Create hash of refresh token for deduplication
|
|
tokenHash := rc.hashRefreshToken(refreshToken)
|
|
|
|
// CRITICAL FIX: Atomically check for existing operation OR create new one
|
|
// This prevents the race where multiple goroutines check, find nothing, then all create
|
|
operation, isNew, err := rc.getOrCreateOperation(ctx, sessionID, tokenHash, refreshToken)
|
|
|
|
if err != nil {
|
|
// Operation creation was rejected (rate limit, memory pressure, concurrent limit)
|
|
return nil, err
|
|
}
|
|
|
|
if isNew {
|
|
// We created a new operation, so we need to execute it
|
|
go rc.executeRefreshAsync(operation, sessionID, tokenHash, refreshFunc)
|
|
} else {
|
|
// Joined existing operation - this is a deduplicated request
|
|
atomic.AddInt64(&rc.metrics.deduplicatedRequests, 1)
|
|
}
|
|
|
|
// Wait for the operation to complete
|
|
select {
|
|
case <-operation.done:
|
|
// Get the result
|
|
operation.mutex.RLock()
|
|
result := operation.result
|
|
operation.mutex.RUnlock()
|
|
|
|
if result != nil {
|
|
// Record metrics based on result
|
|
if result.err != nil {
|
|
rc.circuitBreaker.RecordFailure()
|
|
rc.recordRefreshFailure(sessionID)
|
|
atomic.AddInt64(&rc.metrics.failedRefreshes, 1)
|
|
} else {
|
|
rc.circuitBreaker.RecordSuccess()
|
|
rc.recordRefreshSuccess(sessionID)
|
|
atomic.AddInt64(&rc.metrics.successfulRefreshes, 1)
|
|
}
|
|
return result.tokenResponse, result.err
|
|
}
|
|
return nil, fmt.Errorf("refresh operation completed without result")
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// getOrCreateOperation atomically checks for an existing operation or creates a new one.
//
// Returns (operation, true, nil) if a new operation was created. The caller is
// then the leader: it has passed applyLeaderGates, holds one
// MaxConcurrentRefreshes slot, and must start the refresh.
//
// Returns (operation, false, nil) if it joined an existing operation for the
// same token hash. That operation may already have completed, since completed
// operations stay in inFlightRefreshes until their delayed cleanup runs.
//
// Returns (nil, false, error) if the operation was rejected: cooldown, memory
// pressure, concurrency limit, an unexpected map entry, or a token mismatch.
// When the rejection happens after our candidate was registered,
// failCandidate removes it and delivers the same error to any joiners through
// operation.done.
//
// The context parameter is currently unused.
func (rc *RefreshCoordinator) getOrCreateOperation(
	_ context.Context,
	sessionID string,
	tokenHash string,
	refreshToken string,
) (*refreshOperation, bool, error) {
	// Speculatively construct the operation we WOULD register if we win the
	// race. Allocating here keeps the LoadOrStore call below atomic and
	// avoids any global lock — under Yaegi the previous map+RWMutex design
	// held the write lock long enough (tens of ms per call) that concurrent
	// refreshes on the same coordinator serialized into a queue that grew
	// without bound. See struct comment on inFlightRefreshes.
	candidate := &refreshOperation{
		refreshToken: refreshToken,
		done:         make(chan struct{}),
		startTime:    time.Now(),
		waiterCount:  1,
	}

	if existing, loaded := rc.inFlightRefreshes.LoadOrStore(tokenHash, candidate); loaded {
		existingOp, ok := existing.(*refreshOperation)
		if !ok {
			// Defensive: anything stored here is always *refreshOperation, but
			// keep the typed assert so a programming error elsewhere doesn't
			// surface as a confusing panic in an interpreter frame.
			return nil, false, fmt.Errorf("inFlightRefreshes corrupt: unexpected type %T", existing)
		}
		if existingOp.refreshToken == refreshToken {
			atomic.AddInt32(&existingOp.waiterCount, 1)
			return existingOp, false, nil
		}
		// Different refresh token for same hash - should not happen
		return nil, false, fmt.Errorf("refresh token mismatch")
	}

	// We won the race and registered `candidate`. Apply gates now. If any
	// gate fails we must remove our entry from the map and signal failure
	// to any joiners that snuck in between LoadOrStore and now.
	if err := rc.applyLeaderGates(sessionID); err != nil {
		rc.failCandidate(tokenHash, candidate, err)
		return nil, false, err
	}

	// Reserve a concurrency slot with a ticket: increment first, then back
	// out if our own ticket exceeds the limit. This is always bounded: a
	// ticket above the limit is never kept, so no more than
	// MaxConcurrentRefreshes leaders are admitted at once. The previous
	// check-then-CAS loop could briefly admit more. While rejected callers
	// are backing out, the counter can transiently read above the limit. A
	// concurrent caller may then be rejected even though a slot is about to
	// free up — accepted as the price of staying lock-free.
	newCount := atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, 1)
	if int(newCount) > rc.config.MaxConcurrentRefreshes {
		atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
		err := fmt.Errorf("maximum concurrent refresh operations reached")
		rc.failCandidate(tokenHash, candidate, err)
		return nil, false, err
	}

	return candidate, true, nil
}
|
|
|
|
// applyLeaderGates runs the rate-limit, cooldown, and memory-pressure checks
|
|
// that previously ran under the global refreshMutex. Only the leader (the
|
|
// goroutine that just registered the operation) runs them; joiners share the
|
|
// leader's outcome via operation.done.
|
|
func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
|
// Cooldown check FIRST, BEFORE incrementing the attempt counter.
|
|
// Previously this function recorded the attempt and then read the
|
|
// cooldown state. Under burst load (many concurrent leaders with
|
|
// different token hashes but same session) every goroutine could
|
|
// increment past MaxRefreshAttempts before any one of them observed
|
|
// the threshold, so the cooldown gate fired too late — the same
|
|
// thundering-herd shape that drove v1.0.14 into the ground.
|
|
if rc.isInCooldown(sessionID) {
|
|
atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1)
|
|
return fmt.Errorf("refresh attempts exceeded for session, in cooldown period")
|
|
}
|
|
if rc.config.EnableMemoryPressureDetection && rc.isUnderMemoryPressure() {
|
|
atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1)
|
|
return fmt.Errorf("system under memory pressure, refresh denied")
|
|
}
|
|
// Only count attempts that actually progress past the gates.
|
|
rc.recordRefreshAttempt(sessionID)
|
|
return nil
|
|
}
|
|
|
|
// failCandidate removes the leader's just-registered operation from the
|
|
// in-flight map and signals the error to any joiners by recording the result
|
|
// and closing the done channel. This keeps the (nil, false, err) return path
|
|
// equivalent to the pre-sync.Map version: callers see the error directly,
|
|
// joiners see it via operation.done.
|
|
func (rc *RefreshCoordinator) failCandidate(tokenHash string, op *refreshOperation, err error) {
|
|
rc.inFlightRefreshes.Delete(tokenHash)
|
|
op.mutex.Lock()
|
|
op.result = &refreshResult{err: err}
|
|
op.mutex.Unlock()
|
|
close(op.done)
|
|
}
|
|
|
|
// executeRefreshAsync performs the actual refresh operation asynchronously
|
|
func (rc *RefreshCoordinator) executeRefreshAsync(
|
|
operation *refreshOperation,
|
|
_ string, // sessionID - reserved for future metrics/logging
|
|
tokenHash string,
|
|
refreshFunc func() (*TokenResponse, error),
|
|
) {
|
|
defer func() {
|
|
// Signal completion to all waiters
|
|
close(operation.done)
|
|
|
|
// Schedule delayed cleanup using timer instead of spawning a goroutine
|
|
// This prevents goroutine explosion under high load
|
|
rc.scheduleDelayedCleanup(tokenHash)
|
|
}()
|
|
|
|
// Create timeout context
|
|
refreshCtx, cancel := context.WithTimeout(context.Background(), rc.config.RefreshTimeout)
|
|
defer cancel()
|
|
|
|
// Execute refresh in goroutine to respect timeout
|
|
resultChan := make(chan struct {
|
|
resp *TokenResponse
|
|
err error
|
|
}, 1)
|
|
|
|
go func() {
|
|
resp, err := refreshFunc()
|
|
select {
|
|
case resultChan <- struct {
|
|
resp *TokenResponse
|
|
err error
|
|
}{resp, err}:
|
|
case <-refreshCtx.Done():
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case result := <-resultChan:
|
|
// Store result for all waiters
|
|
operation.mutex.Lock()
|
|
operation.result = &refreshResult{
|
|
tokenResponse: result.resp,
|
|
err: result.err,
|
|
fromCache: false,
|
|
}
|
|
operation.mutex.Unlock()
|
|
case <-refreshCtx.Done():
|
|
// Timeout occurred
|
|
timeoutErr := fmt.Errorf("refresh operation timed out after %v", rc.config.RefreshTimeout)
|
|
operation.mutex.Lock()
|
|
operation.result = &refreshResult{
|
|
tokenResponse: nil,
|
|
err: timeoutErr,
|
|
fromCache: false,
|
|
}
|
|
operation.mutex.Unlock()
|
|
}
|
|
}
|
|
|
|
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning
|
|
// a goroutine — time.AfterFunc uses the runtime's timer heap and never spawns
|
|
// a per-timer goroutine until the callback actually fires.
|
|
//
|
|
// The previous implementation tracked every pending timer in a map guarded by
|
|
// cleanupTimerMu so a duplicate scheduling could cancel the prior timer. That
|
|
// "shouldn't happen" path was the only consumer of the map, but the mutex
|
|
// fired on every successful refresh completion — yet another per-request
|
|
// Yaegi-dispatched lock acquisition. performCleanup is already idempotent
|
|
// (LoadAndDelete on the sync.Map), so a duplicate scheduling at worst fires
|
|
// performCleanup twice; the second call is a no-op. Dropping the map removes
|
|
// the whole class of contention on this code path.
|
|
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) {
|
|
delay := rc.config.DeduplicationCleanupDelay
|
|
if delay <= 0 {
|
|
rc.performCleanup(tokenHash)
|
|
return
|
|
}
|
|
time.AfterFunc(delay, func() { rc.performCleanup(tokenHash) })
|
|
}
|
|
|
|
// performCleanup removes the operation from the in-flight map.
|
|
// Idempotent: only decrements the in-flight counter if an entry was actually
|
|
// removed. LoadAndDelete is atomic so any concurrent failCandidate or repeat
|
|
// cleanup call will see exactly one removal — the budget cannot be corrupted
|
|
// by double-decrement.
|
|
func (rc *RefreshCoordinator) performCleanup(tokenHash string) {
|
|
if _, existed := rc.inFlightRefreshes.LoadAndDelete(tokenHash); existed {
|
|
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
|
}
|
|
}
|
|
|
|
// getOrCreateTracker fetches the tracker for sessionID or atomically creates a
|
|
// fresh one. The sync.Map.LoadOrStore semantics make this lock-free even under
|
|
// concurrent first-touch races: at most one tracker per sessionID survives.
|
|
//
|
|
// trackerFromMapValue centralizes the type assertion so the lint-mandated
|
|
// two-value form lives in one place; the stored type is always
|
|
// *refreshAttemptTracker by construction.
|
|
func trackerFromMapValue(v interface{}) *refreshAttemptTracker {
|
|
t, _ := v.(*refreshAttemptTracker)
|
|
return t
|
|
}
|
|
|
|
func (rc *RefreshCoordinator) getOrCreateTracker(sessionID string) *refreshAttemptTracker {
|
|
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
|
return trackerFromMapValue(v)
|
|
}
|
|
fresh := &refreshAttemptTracker{}
|
|
fresh.state.Store(&attemptState{windowStartNano: time.Now().UnixNano()})
|
|
actual, _ := rc.sessionRefreshAttempts.LoadOrStore(sessionID, fresh)
|
|
return trackerFromMapValue(actual)
|
|
}
|
|
|
|
// mutateState performs a CompareAndSwap loop that applies mutate to the
|
|
// current snapshot. mutate must be PURE: it receives an immutable view of
|
|
// the current state and returns a fresh *attemptState. If mutate returns nil
|
|
// the update is skipped (used by isInCooldown for "no change needed" paths).
|
|
//
|
|
// Retries on CAS conflict are bounded by the number of concurrent writers —
|
|
// in practice 1-3. Under Yaegi each retry pays the dispatch cost of one Load
|
|
// + one CompareAndSwap; still cheaper than the previous per-field atomic
|
|
// sequence and immune to the cross-field race the v1.0.15 design had.
|
|
func (t *refreshAttemptTracker) mutateState(mutate func(cur *attemptState) *attemptState) *attemptState {
|
|
for {
|
|
cur := t.stateOf()
|
|
next := mutate(cur)
|
|
if next == nil {
|
|
return cur
|
|
}
|
|
if t.state.CompareAndSwap(cur, next) {
|
|
return next
|
|
}
|
|
}
|
|
}
|
|
|
|
// isInCooldown checks if a session is in cooldown. Snapshot-based: every
|
|
// transition publishes a fresh *attemptState atomically so readers never see
|
|
// a partially-updated state. The previous per-field atomic design had a
|
|
// benign race in the cooldown-exit path (cooldownEndNano reset before
|
|
// attempts reset) that could double-trigger cooldown.
|
|
func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return false // No tracker means first attempt, not in cooldown
|
|
}
|
|
tracker := trackerFromMapValue(v)
|
|
now := time.Now()
|
|
nowNano := now.UnixNano()
|
|
maxAttempts := rc.config.MaxRefreshAttempts
|
|
window := rc.config.RefreshAttemptWindow
|
|
cooldownPeriod := rc.config.RefreshCooldownPeriod
|
|
|
|
cur := tracker.stateOf()
|
|
|
|
// Already in cooldown?
|
|
if cur.cooldownEndNano != 0 {
|
|
if nowNano <= cur.cooldownEndNano {
|
|
return true // still in cooldown
|
|
}
|
|
// Cooldown expired: atomically publish a fresh state with the window
|
|
// restarted from one attempt. Whichever goroutine wins the CAS sets
|
|
// the new snapshot; losers see it via the next stateOf load.
|
|
tracker.mutateState(func(s *attemptState) *attemptState {
|
|
if s.cooldownEndNano == 0 || nowNano <= s.cooldownEndNano {
|
|
return nil // someone else already reset, or back in cooldown
|
|
}
|
|
return &attemptState{
|
|
windowStartNano: nowNano,
|
|
attempts: 1,
|
|
}
|
|
})
|
|
return false
|
|
}
|
|
|
|
// Window expired?
|
|
if time.Duration(nowNano-cur.windowStartNano) > window {
|
|
tracker.mutateState(func(s *attemptState) *attemptState {
|
|
if time.Duration(nowNano-s.windowStartNano) <= window {
|
|
return nil
|
|
}
|
|
next := *s
|
|
next.windowStartNano = nowNano
|
|
next.attempts = 1
|
|
return &next
|
|
})
|
|
return false
|
|
}
|
|
|
|
// Just exceeded attempt limit?
|
|
if int(cur.attempts) >= maxAttempts {
|
|
end := now.Add(cooldownPeriod).UnixNano()
|
|
published := tracker.mutateState(func(s *attemptState) *attemptState {
|
|
if s.cooldownEndNano != 0 {
|
|
return nil
|
|
}
|
|
next := *s
|
|
next.cooldownEndNano = end
|
|
return &next
|
|
})
|
|
if published.cooldownEndNano == end {
|
|
rc.logger.Infof("Session %s entering refresh cooldown after %d attempts",
|
|
sessionID, published.attempts)
|
|
}
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free
|
|
// snapshot mutation; attempts and lastAttemptNano are advanced atomically.
|
|
func (rc *RefreshCoordinator) recordRefreshAttempt(sessionID string) {
|
|
tracker := rc.getOrCreateTracker(sessionID)
|
|
nowNano := time.Now().UnixNano()
|
|
tracker.mutateState(func(s *attemptState) *attemptState {
|
|
next := *s
|
|
next.attempts++
|
|
next.lastAttemptNano = nowNano
|
|
return &next
|
|
})
|
|
}
|
|
|
|
// recordRefreshSuccess records a successful refresh: zero consecutiveFailures.
|
|
func (rc *RefreshCoordinator) recordRefreshSuccess(sessionID string) {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
|
|
if s.consecutiveFailures == 0 {
|
|
return nil
|
|
}
|
|
next := *s
|
|
next.consecutiveFailures = 0
|
|
return &next
|
|
})
|
|
}
|
|
|
|
// recordRefreshFailure records a failed refresh: increments consecutiveFailures.
|
|
func (rc *RefreshCoordinator) recordRefreshFailure(sessionID string) {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
|
|
next := *s
|
|
next.consecutiveFailures++
|
|
return &next
|
|
})
|
|
}
|
|
|
|
// hashRefreshToken creates a hash of the refresh token for deduplication
|
|
func (rc *RefreshCoordinator) hashRefreshToken(token string) string {
|
|
return refreshCoordinatorSessionID(token)
|
|
}
|
|
|
|
// refreshCoordinatorSessionID maps a refresh token to a stable identifier: the
// lowercase hex SHA-256 digest of the raw token. The identifier serves both
// as the deduplication key and as the per-session attempt-tracking key.
// Because it is derived from the token itself, every token rotation starts
// with a fresh identifier and its own attempt budget, which is intended.
func refreshCoordinatorSessionID(token string) string {
	h := sha256.New()
	// hash.Hash.Write never returns an error, so the result is not checked.
	h.Write([]byte(token))
	return hex.EncodeToString(h.Sum(nil))
}
|
|
|
|
// refreshCoordinatorWaitTimeout caps how long a request may wait for a
// coordinated refresh result. It is 5s wider than the default RefreshTimeout
// (30s in DefaultRefreshCoordinatorConfig). With default settings, a follower
// therefore sees the leader's result instead of timing out independently.
// NOTE(review): this constant is not referenced in this file and is not
// derived from the configured RefreshTimeout. A config with
// RefreshTimeout >= 35s loses the guarantee above.
const refreshCoordinatorWaitTimeout = 35 * time.Second
|
|
|
|
// isUnderMemoryPressure checks if the system is under memory pressure by
|
|
// consulting the global memory monitor. Returns true when pressure reaches
|
|
// High or Critical, at which point we refuse new refresh operations to
|
|
// avoid aggravating an already-stressed heap.
|
|
func (rc *RefreshCoordinator) isUnderMemoryPressure() bool {
|
|
monitor := GetGlobalMemoryMonitor()
|
|
if monitor == nil {
|
|
return false
|
|
}
|
|
return monitor.GetMemoryPressure() >= MemoryPressureHigh
|
|
}
|
|
|
|
// cleanupRoutine periodically cleans up stale tracking entries
|
|
func (rc *RefreshCoordinator) cleanupRoutine() {
|
|
defer rc.wg.Done()
|
|
|
|
ticker := time.NewTicker(rc.config.CleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
rc.cleanupStaleEntries()
|
|
case <-rc.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupStaleEntries removes outdated tracking entries. Lock-free iteration
|
|
// via sync.Map.Range; safe to race with concurrent reads/writes.
|
|
func (rc *RefreshCoordinator) cleanupStaleEntries() {
|
|
cutoff := time.Now().Add(-2 * rc.config.RefreshAttemptWindow).UnixNano()
|
|
rc.sessionRefreshAttempts.Range(func(key, value interface{}) bool {
|
|
tracker := trackerFromMapValue(value)
|
|
if tracker == nil {
|
|
return true
|
|
}
|
|
if tracker.stateOf().lastAttemptNano < cutoff {
|
|
// Compare-and-delete to avoid evicting a tracker that was just
|
|
// re-used by a concurrent caller. We compare by pointer identity.
|
|
rc.sessionRefreshAttempts.CompareAndDelete(key, value)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// GetMetrics returns current coordinator metrics
|
|
func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"total_requests": atomic.LoadInt64(&rc.metrics.totalRefreshRequests),
|
|
"deduplicated_requests": atomic.LoadInt64(&rc.metrics.deduplicatedRequests),
|
|
"successful_refreshes": atomic.LoadInt64(&rc.metrics.successfulRefreshes),
|
|
"failed_refreshes": atomic.LoadInt64(&rc.metrics.failedRefreshes),
|
|
"circuit_breaker_trips": atomic.LoadInt64(&rc.metrics.circuitBreakerTrips),
|
|
"memory_pressure_events": atomic.LoadInt64(&rc.metrics.memoryPressureEvents),
|
|
"cooldowns_triggered": atomic.LoadInt64(&rc.metrics.cooldownsTriggered),
|
|
"current_inflight": atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes),
|
|
"circuit_breaker_state": rc.circuitBreaker.GetState(),
|
|
}
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the coordinator. Pending delayed-cleanup
|
|
// timers are NOT canceled explicitly: time.AfterFunc callbacks are tiny
|
|
// (one map LoadAndDelete) and harmless after Shutdown — sync.Map operations
|
|
// remain safe on an unused coordinator until GC.
|
|
func (rc *RefreshCoordinator) Shutdown() {
|
|
close(rc.stopChan)
|
|
rc.wg.Wait()
|
|
}
|
|
|
|
// AllowRequest reports whether the circuit breaker allows a request. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) AllowRequest() bool {
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 0: // closed
|
|
return true
|
|
case 1: // open
|
|
lastFail := atomic.LoadInt64(&cb.lastFailureNano)
|
|
if time.Duration(time.Now().UnixNano()-lastFail) > cb.config.OpenDuration {
|
|
// Transition to half-open; first CAS winner gets the probe.
|
|
if atomic.CompareAndSwapInt32(&cb.state, 1, 2) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
case 2: // half-open
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// RecordSuccess records a successful operation. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) RecordSuccess() {
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 2: // half-open -> close
|
|
atomic.StoreInt32(&cb.state, 0)
|
|
atomic.StoreInt32(&cb.failures, 0)
|
|
case 0: // closed
|
|
atomic.StoreInt32(&cb.failures, 0)
|
|
}
|
|
atomic.StoreInt64(&cb.lastSuccessNano, time.Now().UnixNano())
|
|
}
|
|
|
|
// RecordFailure records a failed operation. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) RecordFailure() {
|
|
failures := atomic.AddInt32(&cb.failures, 1)
|
|
atomic.StoreInt64(&cb.lastFailureNano, time.Now().UnixNano())
|
|
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 0:
|
|
if int(failures) >= cb.config.MaxFailures {
|
|
atomic.StoreInt32(&cb.state, 1)
|
|
}
|
|
case 2:
|
|
// Half-open probe failed -> back to open.
|
|
atomic.StoreInt32(&cb.state, 1)
|
|
}
|
|
}
|
|
|
|
// GetState returns the current state of the circuit breaker
|
|
func (cb *RefreshCircuitBreaker) GetState() string {
|
|
state := atomic.LoadInt32(&cb.state)
|
|
switch state {
|
|
case 0:
|
|
return "closed"
|
|
case 1:
|
|
return "open"
|
|
case 2:
|
|
return "half-open"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|