mirror of
https://github.com/lukaszraczylo/traefikoidc.git
synced 2026-06-05 22:44:17 +00:00
17e3f8ef62
Two related lock-free snapshot refactors addressing the remaining
post-v1.0.16 code-review findings.
1. refreshAttemptTracker: per-field atomic.Load/Store -> atomic.Value
snapshot of *attemptState (refresh_coordinator.go).
Previously each tracker held five independently-atomic fields. The
cooldown-exit reset wrote cooldownEndNano = 0 first, then separately
stored attempts = 1 and windowStartNano = now. A concurrent
isInCooldown call could observe cooldownEndNano = 0 (reset just
completed) with attempts still at MaxRefreshAttempts, immediately
triggering a fresh cooldown — a benign double-trigger race that
nonetheless meant the state machine had observable intermediate
states.
New design: state is a *attemptState (immutable) published via
atomic.Value. All transitions (record/success/failure/window-reset/
cooldown-enter/cooldown-exit) go through mutateState, which runs a
CAS loop: load current snapshot -> construct fresh snapshot ->
CompareAndSwap. Either the entire new state publishes or none of
it does — no intermediate visibility, no cross-field race.
Under Yaegi this collapses 3-5 per-field atomic dispatches into one
atomic.Value.Load on the read path. Write paths pay an extra
allocation for the new snapshot but avoid the cross-field hazard.
2. MetadataSnapshot: hot-path readers use atomic.Value instead of
metadataMu.RLock (middleware.go, types.go, main.go, utilities.go).
middleware.ServeHTTP previously took metadataMu.RLock on every
non-bypass request to read the single field issuerURL. Under Yaegi
each RLock acquisition costs 1-5ms of interpreter dispatch.
updateMetadataEndpoints now also publishes an immutable
*MetadataSnapshot via atomic.Value; the hot-path reader loads it
in one op via t.metadataSnap(). Falls back to the legacy
metadataMu.RLock pattern when the snapshot is unpublished (some
test setups initialize the struct fields directly without going
through updateMetadataEndpoints).
Less-frequent callers (helpers, logout, token_introspection) still
take metadataMu.RLock and are unchanged. The snapshot strictly
subsets the metadataMu-protected fields, so those readers see
identical data.
Note on atomic.Pointer[T]: this would have been the cleaner type but
yaegi v0.16.1's stdlib (used by traefik:v3.7.1) exposes only the
legacy unsafe.Pointer-based atomic primitives — no generic Pointer[T].
atomic.Value provides the same semantics via interface{} + type assert.
All tests pass with -race; golangci-lint clean.
776 lines
28 KiB
Go
776 lines
28 KiB
Go
package traefikoidc
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// RefreshCoordinator prevents duplicate refresh token operations and manages
|
|
// refresh attempt tracking to prevent infinite loops and OOM conditions.
|
|
// It implements request coalescing, rate limiting, and circuit breaking
|
|
// specifically for token refresh operations.
|
|
type RefreshCoordinator struct {
|
|
// inFlightRefreshes maps tokenHash -> *refreshOperation. sync.Map is used
|
|
// instead of a plain map + RWMutex so concurrent refreshes do not
|
|
// serialize on a single global lock. Under Yaegi the previous
|
|
// refreshMutex.Lock() was held for tens of milliseconds per request due
|
|
// to interpreter overhead on the work inside the critical section,
|
|
// causing dozens of goroutines to stack up on it and pin one CPU core.
|
|
inFlightRefreshes sync.Map
|
|
// sessionRefreshAttempts maps sessionID -> *refreshAttemptTracker.
|
|
// sync.Map + atomic tracker fields means isInCooldown/recordRefreshAttempt/
|
|
// recordRefreshSuccess/recordRefreshFailure are lock-free. Previously
|
|
// these used attemptsMutex sync.RWMutex; under Yaegi every Lock() acquisition
|
|
// adds 10-50ms of dispatch overhead, and they were called twice per leader
|
|
// request (once for recordRefreshAttempt, once for isInCooldown). That
|
|
// serializing pattern caused the v1.0.15 death spiral after v1.0.14
|
|
// removed the refreshMutex (same architectural shape, different mutex).
|
|
sessionRefreshAttempts sync.Map
|
|
circuitBreaker *RefreshCircuitBreaker
|
|
metrics *RefreshMetrics
|
|
logger *Logger
|
|
stopChan chan struct{}
|
|
config RefreshCoordinatorConfig
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// RefreshCoordinatorConfig holds the tunables of a RefreshCoordinator.
type RefreshCoordinatorConfig struct {
	// MaxRefreshAttempts is the number of refresh attempts a session may make
	// inside one attempt window before it is placed in cooldown.
	MaxRefreshAttempts int

	// RefreshAttemptWindow is the length of the window over which attempts
	// are counted.
	RefreshAttemptWindow time.Duration

	// RefreshCooldownPeriod is how long a session stays in cooldown once the
	// attempt limit has been reached.
	RefreshCooldownPeriod time.Duration

	// MaxConcurrentRefreshes caps the number of refresh operations that may
	// be in flight at the same time.
	MaxConcurrentRefreshes int

	// RefreshTimeout bounds the duration of a single refresh operation.
	RefreshTimeout time.Duration

	// EnableMemoryPressureDetection turns on the memory-pressure gate.
	EnableMemoryPressureDetection bool

	// MemoryPressureThresholdMB is the memory-pressure threshold in MB.
	MemoryPressureThresholdMB uint64

	// CleanupInterval is the period of the stale-entry cleanup loop.
	CleanupInterval time.Duration

	// DeduplicationCleanupDelay is how long a completed operation stays in
	// the deduplication map before it is removed. Zero removes it
	// immediately, which is convenient in tests.
	DeduplicationCleanupDelay time.Duration
}
|
|
|
|
// DefaultRefreshCoordinatorConfig returns production-ready configuration
|
|
func DefaultRefreshCoordinatorConfig() RefreshCoordinatorConfig {
|
|
return RefreshCoordinatorConfig{
|
|
MaxRefreshAttempts: 5,
|
|
RefreshAttemptWindow: 5 * time.Minute,
|
|
RefreshCooldownPeriod: 10 * time.Minute,
|
|
MaxConcurrentRefreshes: 10,
|
|
RefreshTimeout: 30 * time.Second,
|
|
EnableMemoryPressureDetection: true,
|
|
MemoryPressureThresholdMB: 500, // 500MB threshold
|
|
CleanupInterval: 1 * time.Minute,
|
|
DeduplicationCleanupDelay: 100 * time.Millisecond, // Default 100ms for production
|
|
}
|
|
}
|
|
|
|
// refreshOperation represents an in-flight refresh operation
|
|
type refreshOperation struct {
|
|
startTime time.Time
|
|
result *refreshResult
|
|
done chan struct{}
|
|
refreshToken string
|
|
mutex sync.RWMutex
|
|
waiterCount int32
|
|
}
|
|
|
|
// refreshResult contains the result of a refresh operation
|
|
type refreshResult struct {
|
|
tokenResponse *TokenResponse
|
|
err error
|
|
fromCache bool
|
|
}
|
|
|
|
// attemptState is an immutable snapshot of one session's refresh-attempt
// bookkeeping. It is stored behind refreshAttemptTracker.state (an
// atomic.Value); each transition (record, success, failure, window reset,
// cooldown enter, cooldown exit) builds a new attemptState and publishes it
// as a whole, so all fields change together.
//
// The v1.0.15 design kept each field as an independent atomic. Its
// cooldown-exit path stored cooldownEndNano = 0 first and only afterwards
// attempts = 1 and windowStartNano = now; a concurrent isInCooldown could
// therefore see the cooldown cleared while attempts was still at
// MaxRefreshAttempts and start a new cooldown straight away. Publishing a
// complete snapshot removes that intermediate state.
type attemptState struct {
	// lastAttemptNano is the UnixNano time of the most recent attempt.
	lastAttemptNano int64

	// windowStartNano is the UnixNano time the current attempt window began.
	windowStartNano int64

	// cooldownEndNano is the UnixNano time the cooldown ends; 0 means the
	// session is not in cooldown.
	cooldownEndNano int64

	// attempts is the number of attempts counted in the current window.
	attempts int32

	// consecutiveFailures is the number of failures since the last success.
	consecutiveFailures int32
}
|
|
|
|
// refreshAttemptTracker holds the refresh-attempt state of one session as a
// single atomic.Value containing a *attemptState. A reader performs one Load;
// a writer loads the current snapshot, builds a replacement and publishes it
// with CompareAndSwap, retrying on conflict. Under Yaegi that turns 3-4
// per-field atomic dispatches into one Load and removes the cross-field race
// of the window-reset path.
type refreshAttemptTracker struct {
	// state always holds a *attemptState once published.
	state atomic.Value
}
|
|
|
|
// stateOf returns the current attemptState, or a zero-value snapshot if none
|
|
// has been published yet. The empty snapshot represents "no attempts recorded".
|
|
func (t *refreshAttemptTracker) stateOf() *attemptState {
|
|
if v := t.state.Load(); v != nil {
|
|
s, _ := v.(*attemptState)
|
|
if s != nil {
|
|
return s
|
|
}
|
|
}
|
|
return &attemptState{}
|
|
}
|
|
|
|
// RefreshMetrics holds the coordinator's counters. The code in this file
// reads and writes every field through sync/atomic.
type RefreshMetrics struct {
	// Request-level counters.
	totalRefreshRequests int64
	deduplicatedRequests int64
	successfulRefreshes  int64
	failedRefreshes      int64

	// Rejection counters, one per gate.
	circuitBreakerTrips  int64
	memoryPressureEvents int64
	cooldownsTriggered   int64

	// currentInFlightRefreshes is the number of refresh slots currently
	// reserved; it is compared against MaxConcurrentRefreshes.
	currentInFlightRefreshes int32
}
|
|
|
|
// RefreshCircuitBreaker implements a circuit breaker specifically for refresh
|
|
// operations. All mutable fields are atomic so AllowRequest/RecordSuccess/
|
|
// RecordFailure run without any mutex. The previous sync.RWMutex.RLock() was
|
|
// taken on every CoordinateRefresh — under Yaegi this added 10-50ms of
|
|
// interpreter dispatch per call, which compounded with attemptsMutex to keep
|
|
// the pod's single CPU core saturated.
|
|
type RefreshCircuitBreaker struct {
|
|
lastFailureNano int64 // atomic, UnixNano of most recent failure
|
|
lastSuccessNano int64 // atomic, UnixNano of most recent success
|
|
config RefreshCircuitBreakerConfig
|
|
state int32 // atomic: 0=closed, 1=open, 2=half-open
|
|
failures int32 // atomic
|
|
}
|
|
|
|
// RefreshCircuitBreakerConfig holds the thresholds of the refresh circuit
// breaker.
type RefreshCircuitBreakerConfig struct {
	// MaxFailures is the failure count at which a closed breaker opens.
	MaxFailures int

	// OpenDuration is how long after the last failure an open breaker waits
	// before it lets a probe through.
	OpenDuration time.Duration

	// HalfOpenRequests is the configured number of half-open probe requests.
	HalfOpenRequests int
}
|
|
|
|
// NewRefreshCoordinator creates a new refresh coordinator
|
|
func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *RefreshCoordinator {
|
|
if logger == nil {
|
|
logger = GetSingletonNoOpLogger()
|
|
}
|
|
|
|
rc := &RefreshCoordinator{
|
|
// inFlightRefreshes and sessionRefreshAttempts are both sync.Map;
|
|
// their zero values are ready to use.
|
|
config: config,
|
|
metrics: &RefreshMetrics{},
|
|
logger: logger,
|
|
stopChan: make(chan struct{}),
|
|
circuitBreaker: &RefreshCircuitBreaker{
|
|
config: RefreshCircuitBreakerConfig{
|
|
MaxFailures: 3,
|
|
OpenDuration: 30 * time.Second,
|
|
HalfOpenRequests: 1,
|
|
},
|
|
},
|
|
}
|
|
|
|
// Start cleanup goroutine
|
|
rc.wg.Add(1)
|
|
go rc.cleanupRoutine()
|
|
|
|
return rc
|
|
}
|
|
|
|
// CoordinateRefresh ensures only one refresh operation happens per refresh token
|
|
// and implements request coalescing for concurrent refresh attempts
|
|
func (rc *RefreshCoordinator) CoordinateRefresh(
|
|
ctx context.Context,
|
|
sessionID string,
|
|
refreshToken string,
|
|
refreshFunc func() (*TokenResponse, error),
|
|
) (*TokenResponse, error) {
|
|
// Increment total request count
|
|
atomic.AddInt64(&rc.metrics.totalRefreshRequests, 1)
|
|
|
|
// Check circuit breaker first
|
|
if !rc.circuitBreaker.AllowRequest() {
|
|
atomic.AddInt64(&rc.metrics.circuitBreakerTrips, 1)
|
|
return nil, fmt.Errorf("refresh circuit breaker is open due to repeated failures")
|
|
}
|
|
|
|
// Create hash of refresh token for deduplication
|
|
tokenHash := rc.hashRefreshToken(refreshToken)
|
|
|
|
// CRITICAL FIX: Atomically check for existing operation OR create new one
|
|
// This prevents the race where multiple goroutines check, find nothing, then all create
|
|
operation, isNew, err := rc.getOrCreateOperation(ctx, sessionID, tokenHash, refreshToken)
|
|
|
|
if err != nil {
|
|
// Operation creation was rejected (rate limit, memory pressure, concurrent limit)
|
|
return nil, err
|
|
}
|
|
|
|
if isNew {
|
|
// We created a new operation, so we need to execute it
|
|
go rc.executeRefreshAsync(operation, sessionID, tokenHash, refreshFunc)
|
|
} else {
|
|
// Joined existing operation - this is a deduplicated request
|
|
atomic.AddInt64(&rc.metrics.deduplicatedRequests, 1)
|
|
}
|
|
|
|
// Wait for the operation to complete
|
|
select {
|
|
case <-operation.done:
|
|
// Get the result
|
|
operation.mutex.RLock()
|
|
result := operation.result
|
|
operation.mutex.RUnlock()
|
|
|
|
if result != nil {
|
|
// Record metrics based on result
|
|
if result.err != nil {
|
|
rc.circuitBreaker.RecordFailure()
|
|
rc.recordRefreshFailure(sessionID)
|
|
atomic.AddInt64(&rc.metrics.failedRefreshes, 1)
|
|
} else {
|
|
rc.circuitBreaker.RecordSuccess()
|
|
rc.recordRefreshSuccess(sessionID)
|
|
atomic.AddInt64(&rc.metrics.successfulRefreshes, 1)
|
|
}
|
|
return result.tokenResponse, result.err
|
|
}
|
|
return nil, fmt.Errorf("refresh operation completed without result")
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// getOrCreateOperation atomically checks for an existing operation or creates a new one
|
|
// Returns (operation, true, nil) if a new operation was created
|
|
// Returns (operation, false, nil) if joined an existing operation
|
|
// Returns (nil, false, error) if the operation was rejected
|
|
func (rc *RefreshCoordinator) getOrCreateOperation(
|
|
_ context.Context,
|
|
sessionID string,
|
|
tokenHash string,
|
|
refreshToken string,
|
|
) (*refreshOperation, bool, error) {
|
|
// Speculatively construct the operation we WOULD register if we win the
|
|
// race. Allocating here keeps the LoadOrStore call below atomic and
|
|
// avoids any global lock — under Yaegi the previous map+RWMutex design
|
|
// held the write lock long enough (tens of ms per call) that concurrent
|
|
// refreshes on the same coordinator serialized into a queue that grew
|
|
// without bound. See struct comment on inFlightRefreshes.
|
|
candidate := &refreshOperation{
|
|
refreshToken: refreshToken,
|
|
done: make(chan struct{}),
|
|
startTime: time.Now(),
|
|
waiterCount: 1,
|
|
}
|
|
|
|
if existing, loaded := rc.inFlightRefreshes.LoadOrStore(tokenHash, candidate); loaded {
|
|
existingOp, ok := existing.(*refreshOperation)
|
|
if !ok {
|
|
// Defensive: anything stored here is always *refreshOperation, but
|
|
// keep the typed assert so a programming error elsewhere doesn't
|
|
// surface as a confusing panic in an interpreter frame.
|
|
return nil, false, fmt.Errorf("inFlightRefreshes corrupt: unexpected type %T", existing)
|
|
}
|
|
if existingOp.refreshToken == refreshToken {
|
|
atomic.AddInt32(&existingOp.waiterCount, 1)
|
|
return existingOp, false, nil
|
|
}
|
|
// Different refresh token for same hash - should not happen
|
|
return nil, false, fmt.Errorf("refresh token mismatch")
|
|
}
|
|
|
|
// We won the race and registered `candidate`. Apply gates now. If any
|
|
// gate fails we must remove our entry from the map and signal failure
|
|
// to any joiners that snuck in between LoadOrStore and now.
|
|
if err := rc.applyLeaderGates(sessionID); err != nil {
|
|
rc.failCandidate(tokenHash, candidate, err)
|
|
return nil, false, err
|
|
}
|
|
|
|
// Reserve concurrent slot via ticket-and-return: increment optimistically,
|
|
// decrement if we overshot the limit. The previous CAS-loop allowed a
|
|
// transient overshoot of up to N-1 leaders when several goroutines all
|
|
// observed `current < max` in the same scheduling slice before any one
|
|
// of them succeeded their CAS — visible to readers as
|
|
// currentInFlightRefreshes > MaxConcurrentRefreshes for a brief window.
|
|
// The ticket pattern is strictly bounded: the counter momentarily reads
|
|
// max+k for k concurrent attempts past the limit, but only the k that
|
|
// produced max+1..max+k decrement back, and only k=1 ever observes max+1
|
|
// as committed.
|
|
newCount := atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, 1)
|
|
if int(newCount) > rc.config.MaxConcurrentRefreshes {
|
|
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
|
err := fmt.Errorf("maximum concurrent refresh operations reached")
|
|
rc.failCandidate(tokenHash, candidate, err)
|
|
return nil, false, err
|
|
}
|
|
|
|
return candidate, true, nil
|
|
}
|
|
|
|
// applyLeaderGates runs the rate-limit, cooldown, and memory-pressure checks
|
|
// that previously ran under the global refreshMutex. Only the leader (the
|
|
// goroutine that just registered the operation) runs them; joiners share the
|
|
// leader's outcome via operation.done.
|
|
func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
|
// Cooldown check FIRST, BEFORE incrementing the attempt counter.
|
|
// Previously this function recorded the attempt and then read the
|
|
// cooldown state. Under burst load (many concurrent leaders with
|
|
// different token hashes but same session) every goroutine could
|
|
// increment past MaxRefreshAttempts before any one of them observed
|
|
// the threshold, so the cooldown gate fired too late — the same
|
|
// thundering-herd shape that drove v1.0.14 into the ground.
|
|
if rc.isInCooldown(sessionID) {
|
|
atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1)
|
|
return fmt.Errorf("refresh attempts exceeded for session, in cooldown period")
|
|
}
|
|
if rc.config.EnableMemoryPressureDetection && rc.isUnderMemoryPressure() {
|
|
atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1)
|
|
return fmt.Errorf("system under memory pressure, refresh denied")
|
|
}
|
|
// Only count attempts that actually progress past the gates.
|
|
rc.recordRefreshAttempt(sessionID)
|
|
return nil
|
|
}
|
|
|
|
// failCandidate removes the leader's just-registered operation from the
|
|
// in-flight map and signals the error to any joiners by recording the result
|
|
// and closing the done channel. This keeps the (nil, false, err) return path
|
|
// equivalent to the pre-sync.Map version: callers see the error directly,
|
|
// joiners see it via operation.done.
|
|
func (rc *RefreshCoordinator) failCandidate(tokenHash string, op *refreshOperation, err error) {
|
|
rc.inFlightRefreshes.Delete(tokenHash)
|
|
op.mutex.Lock()
|
|
op.result = &refreshResult{err: err}
|
|
op.mutex.Unlock()
|
|
close(op.done)
|
|
}
|
|
|
|
// executeRefreshAsync performs the actual refresh operation asynchronously
|
|
func (rc *RefreshCoordinator) executeRefreshAsync(
|
|
operation *refreshOperation,
|
|
_ string, // sessionID - reserved for future metrics/logging
|
|
tokenHash string,
|
|
refreshFunc func() (*TokenResponse, error),
|
|
) {
|
|
defer func() {
|
|
// Signal completion to all waiters
|
|
close(operation.done)
|
|
|
|
// Schedule delayed cleanup using timer instead of spawning a goroutine
|
|
// This prevents goroutine explosion under high load
|
|
rc.scheduleDelayedCleanup(tokenHash)
|
|
}()
|
|
|
|
// Create timeout context
|
|
refreshCtx, cancel := context.WithTimeout(context.Background(), rc.config.RefreshTimeout)
|
|
defer cancel()
|
|
|
|
// Execute refresh in goroutine to respect timeout
|
|
resultChan := make(chan struct {
|
|
resp *TokenResponse
|
|
err error
|
|
}, 1)
|
|
|
|
go func() {
|
|
resp, err := refreshFunc()
|
|
select {
|
|
case resultChan <- struct {
|
|
resp *TokenResponse
|
|
err error
|
|
}{resp, err}:
|
|
case <-refreshCtx.Done():
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case result := <-resultChan:
|
|
// Store result for all waiters
|
|
operation.mutex.Lock()
|
|
operation.result = &refreshResult{
|
|
tokenResponse: result.resp,
|
|
err: result.err,
|
|
fromCache: false,
|
|
}
|
|
operation.mutex.Unlock()
|
|
case <-refreshCtx.Done():
|
|
// Timeout occurred
|
|
timeoutErr := fmt.Errorf("refresh operation timed out after %v", rc.config.RefreshTimeout)
|
|
operation.mutex.Lock()
|
|
operation.result = &refreshResult{
|
|
tokenResponse: nil,
|
|
err: timeoutErr,
|
|
fromCache: false,
|
|
}
|
|
operation.mutex.Unlock()
|
|
}
|
|
}
|
|
|
|
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning
|
|
// a goroutine — time.AfterFunc uses the runtime's timer heap and never spawns
|
|
// a per-timer goroutine until the callback actually fires.
|
|
//
|
|
// The previous implementation tracked every pending timer in a map guarded by
|
|
// cleanupTimerMu so a duplicate scheduling could cancel the prior timer. That
|
|
// "shouldn't happen" path was the only consumer of the map, but the mutex
|
|
// fired on every successful refresh completion — yet another per-request
|
|
// Yaegi-dispatched lock acquisition. performCleanup is already idempotent
|
|
// (LoadAndDelete on the sync.Map), so a duplicate scheduling at worst fires
|
|
// performCleanup twice; the second call is a no-op. Dropping the map removes
|
|
// the whole class of contention on this code path.
|
|
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) {
|
|
delay := rc.config.DeduplicationCleanupDelay
|
|
if delay <= 0 {
|
|
rc.performCleanup(tokenHash)
|
|
return
|
|
}
|
|
time.AfterFunc(delay, func() { rc.performCleanup(tokenHash) })
|
|
}
|
|
|
|
// performCleanup removes the operation from the in-flight map.
|
|
// Idempotent: only decrements the in-flight counter if an entry was actually
|
|
// removed. LoadAndDelete is atomic so any concurrent failCandidate or repeat
|
|
// cleanup call will see exactly one removal — the budget cannot be corrupted
|
|
// by double-decrement.
|
|
func (rc *RefreshCoordinator) performCleanup(tokenHash string) {
|
|
if _, existed := rc.inFlightRefreshes.LoadAndDelete(tokenHash); existed {
|
|
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
|
}
|
|
}
|
|
|
|
// getOrCreateTracker fetches the tracker for sessionID or atomically creates a
|
|
// fresh one. The sync.Map.LoadOrStore semantics make this lock-free even under
|
|
// concurrent first-touch races: at most one tracker per sessionID survives.
|
|
//
|
|
// trackerFromMapValue centralizes the type assertion so the lint-mandated
|
|
// two-value form lives in one place; the stored type is always
|
|
// *refreshAttemptTracker by construction.
|
|
func trackerFromMapValue(v interface{}) *refreshAttemptTracker {
|
|
t, _ := v.(*refreshAttemptTracker)
|
|
return t
|
|
}
|
|
|
|
func (rc *RefreshCoordinator) getOrCreateTracker(sessionID string) *refreshAttemptTracker {
|
|
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
|
return trackerFromMapValue(v)
|
|
}
|
|
fresh := &refreshAttemptTracker{}
|
|
fresh.state.Store(&attemptState{windowStartNano: time.Now().UnixNano()})
|
|
actual, _ := rc.sessionRefreshAttempts.LoadOrStore(sessionID, fresh)
|
|
return trackerFromMapValue(actual)
|
|
}
|
|
|
|
// mutateState performs a CompareAndSwap loop that applies mutate to the
|
|
// current snapshot. mutate must be PURE: it receives an immutable view of
|
|
// the current state and returns a fresh *attemptState. If mutate returns nil
|
|
// the update is skipped (used by isInCooldown for "no change needed" paths).
|
|
//
|
|
// Retries on CAS conflict are bounded by the number of concurrent writers —
|
|
// in practice 1-3. Under Yaegi each retry pays the dispatch cost of one Load
|
|
// + one CompareAndSwap; still cheaper than the previous per-field atomic
|
|
// sequence and immune to the cross-field race the v1.0.15 design had.
|
|
func (t *refreshAttemptTracker) mutateState(mutate func(cur *attemptState) *attemptState) *attemptState {
|
|
for {
|
|
cur := t.stateOf()
|
|
next := mutate(cur)
|
|
if next == nil {
|
|
return cur
|
|
}
|
|
if t.state.CompareAndSwap(t.state.Load(), next) {
|
|
return next
|
|
}
|
|
}
|
|
}
|
|
|
|
// isInCooldown checks if a session is in cooldown. Snapshot-based: every
|
|
// transition publishes a fresh *attemptState atomically so readers never see
|
|
// a partially-updated state. The previous per-field atomic design had a
|
|
// benign race in the cooldown-exit path (cooldownEndNano reset before
|
|
// attempts reset) that could double-trigger cooldown.
|
|
func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return false // No tracker means first attempt, not in cooldown
|
|
}
|
|
tracker := trackerFromMapValue(v)
|
|
now := time.Now()
|
|
nowNano := now.UnixNano()
|
|
maxAttempts := rc.config.MaxRefreshAttempts
|
|
window := rc.config.RefreshAttemptWindow
|
|
cooldownPeriod := rc.config.RefreshCooldownPeriod
|
|
|
|
cur := tracker.stateOf()
|
|
|
|
// Already in cooldown?
|
|
if cur.cooldownEndNano != 0 {
|
|
if nowNano <= cur.cooldownEndNano {
|
|
return true // still in cooldown
|
|
}
|
|
// Cooldown expired: atomically publish a fresh state with the window
|
|
// restarted from one attempt. Whichever goroutine wins the CAS sets
|
|
// the new snapshot; losers see it via the next stateOf load.
|
|
tracker.mutateState(func(s *attemptState) *attemptState {
|
|
if s.cooldownEndNano == 0 || nowNano <= s.cooldownEndNano {
|
|
return nil // someone else already reset, or back in cooldown
|
|
}
|
|
return &attemptState{
|
|
windowStartNano: nowNano,
|
|
attempts: 1,
|
|
}
|
|
})
|
|
return false
|
|
}
|
|
|
|
// Window expired?
|
|
if time.Duration(nowNano-cur.windowStartNano) > window {
|
|
tracker.mutateState(func(s *attemptState) *attemptState {
|
|
if time.Duration(nowNano-s.windowStartNano) <= window {
|
|
return nil
|
|
}
|
|
next := *s
|
|
next.windowStartNano = nowNano
|
|
next.attempts = 1
|
|
return &next
|
|
})
|
|
return false
|
|
}
|
|
|
|
// Just exceeded attempt limit?
|
|
if int(cur.attempts) >= maxAttempts {
|
|
end := now.Add(cooldownPeriod).UnixNano()
|
|
published := tracker.mutateState(func(s *attemptState) *attemptState {
|
|
if s.cooldownEndNano != 0 {
|
|
return nil
|
|
}
|
|
next := *s
|
|
next.cooldownEndNano = end
|
|
return &next
|
|
})
|
|
if published.cooldownEndNano == end {
|
|
rc.logger.Infof("Session %s entering refresh cooldown after %d attempts",
|
|
sessionID, published.attempts)
|
|
}
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free
|
|
// snapshot mutation; attempts and lastAttemptNano are advanced atomically.
|
|
func (rc *RefreshCoordinator) recordRefreshAttempt(sessionID string) {
|
|
tracker := rc.getOrCreateTracker(sessionID)
|
|
nowNano := time.Now().UnixNano()
|
|
tracker.mutateState(func(s *attemptState) *attemptState {
|
|
next := *s
|
|
next.attempts++
|
|
next.lastAttemptNano = nowNano
|
|
return &next
|
|
})
|
|
}
|
|
|
|
// recordRefreshSuccess records a successful refresh: zero consecutiveFailures.
|
|
func (rc *RefreshCoordinator) recordRefreshSuccess(sessionID string) {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
|
|
if s.consecutiveFailures == 0 {
|
|
return nil
|
|
}
|
|
next := *s
|
|
next.consecutiveFailures = 0
|
|
return &next
|
|
})
|
|
}
|
|
|
|
// recordRefreshFailure records a failed refresh: increments consecutiveFailures.
|
|
func (rc *RefreshCoordinator) recordRefreshFailure(sessionID string) {
|
|
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
|
|
next := *s
|
|
next.consecutiveFailures++
|
|
return &next
|
|
})
|
|
}
|
|
|
|
// hashRefreshToken creates a hash of the refresh token for deduplication
|
|
func (rc *RefreshCoordinator) hashRefreshToken(token string) string {
|
|
return refreshCoordinatorSessionID(token)
|
|
}
|
|
|
|
// refreshCoordinatorSessionID returns the lowercase hex encoding of the
// SHA-256 digest of token. The value serves as a stable identifier for
// deduplication and for per-session attempt tracking; since it is derived
// from the raw token, a different token yields a different identifier and
// therefore its own attempt budget.
func refreshCoordinatorSessionID(token string) string {
	digest := sha256.Sum256([]byte(token))
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}
|
|
|
|
// refreshCoordinatorWaitTimeout is the upper bound on how long a request
// waits for the result of a coordinated refresh. At 35 seconds it exceeds
// the default RefreshTimeout of 30 seconds, so a follower gets to see the
// leader's result rather than timing out on its own first.
const refreshCoordinatorWaitTimeout time.Duration = 35 * time.Second
|
|
|
|
// isUnderMemoryPressure checks if the system is under memory pressure by
|
|
// consulting the global memory monitor. Returns true when pressure reaches
|
|
// High or Critical, at which point we refuse new refresh operations to
|
|
// avoid aggravating an already-stressed heap.
|
|
func (rc *RefreshCoordinator) isUnderMemoryPressure() bool {
|
|
monitor := GetGlobalMemoryMonitor()
|
|
if monitor == nil {
|
|
return false
|
|
}
|
|
return monitor.GetMemoryPressure() >= MemoryPressureHigh
|
|
}
|
|
|
|
// cleanupRoutine periodically cleans up stale tracking entries
|
|
func (rc *RefreshCoordinator) cleanupRoutine() {
|
|
defer rc.wg.Done()
|
|
|
|
ticker := time.NewTicker(rc.config.CleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
rc.cleanupStaleEntries()
|
|
case <-rc.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupStaleEntries removes outdated tracking entries. Lock-free iteration
|
|
// via sync.Map.Range; safe to race with concurrent reads/writes.
|
|
func (rc *RefreshCoordinator) cleanupStaleEntries() {
|
|
cutoff := time.Now().Add(-2 * rc.config.RefreshAttemptWindow).UnixNano()
|
|
rc.sessionRefreshAttempts.Range(func(key, value interface{}) bool {
|
|
tracker := trackerFromMapValue(value)
|
|
if tracker == nil {
|
|
return true
|
|
}
|
|
if tracker.stateOf().lastAttemptNano < cutoff {
|
|
// Compare-and-delete to avoid evicting a tracker that was just
|
|
// re-used by a concurrent caller. We compare by pointer identity.
|
|
rc.sessionRefreshAttempts.CompareAndDelete(key, value)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// GetMetrics returns current coordinator metrics
|
|
func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"total_requests": atomic.LoadInt64(&rc.metrics.totalRefreshRequests),
|
|
"deduplicated_requests": atomic.LoadInt64(&rc.metrics.deduplicatedRequests),
|
|
"successful_refreshes": atomic.LoadInt64(&rc.metrics.successfulRefreshes),
|
|
"failed_refreshes": atomic.LoadInt64(&rc.metrics.failedRefreshes),
|
|
"circuit_breaker_trips": atomic.LoadInt64(&rc.metrics.circuitBreakerTrips),
|
|
"memory_pressure_events": atomic.LoadInt64(&rc.metrics.memoryPressureEvents),
|
|
"cooldowns_triggered": atomic.LoadInt64(&rc.metrics.cooldownsTriggered),
|
|
"current_inflight": atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes),
|
|
"circuit_breaker_state": rc.circuitBreaker.GetState(),
|
|
}
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the coordinator. Pending delayed-cleanup
|
|
// timers are NOT canceled explicitly: time.AfterFunc callbacks are tiny
|
|
// (one map LoadAndDelete) and harmless after Shutdown — sync.Map operations
|
|
// remain safe on an unused coordinator until GC.
|
|
func (rc *RefreshCoordinator) Shutdown() {
|
|
close(rc.stopChan)
|
|
rc.wg.Wait()
|
|
}
|
|
|
|
// AllowRequest reports whether the circuit breaker allows a request. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) AllowRequest() bool {
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 0: // closed
|
|
return true
|
|
case 1: // open
|
|
lastFail := atomic.LoadInt64(&cb.lastFailureNano)
|
|
if time.Duration(time.Now().UnixNano()-lastFail) > cb.config.OpenDuration {
|
|
// Transition to half-open; first CAS winner gets the probe.
|
|
if atomic.CompareAndSwapInt32(&cb.state, 1, 2) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
case 2: // half-open
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// RecordSuccess records a successful operation. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) RecordSuccess() {
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 2: // half-open -> close
|
|
atomic.StoreInt32(&cb.state, 0)
|
|
atomic.StoreInt32(&cb.failures, 0)
|
|
case 0: // closed
|
|
atomic.StoreInt32(&cb.failures, 0)
|
|
}
|
|
atomic.StoreInt64(&cb.lastSuccessNano, time.Now().UnixNano())
|
|
}
|
|
|
|
// RecordFailure records a failed operation. Lock-free.
|
|
func (cb *RefreshCircuitBreaker) RecordFailure() {
|
|
failures := atomic.AddInt32(&cb.failures, 1)
|
|
atomic.StoreInt64(&cb.lastFailureNano, time.Now().UnixNano())
|
|
|
|
switch atomic.LoadInt32(&cb.state) {
|
|
case 0:
|
|
if int(failures) >= cb.config.MaxFailures {
|
|
atomic.StoreInt32(&cb.state, 1)
|
|
}
|
|
case 2:
|
|
// Half-open probe failed -> back to open.
|
|
atomic.StoreInt32(&cb.state, 1)
|
|
}
|
|
}
|
|
|
|
// GetState returns the current state of the circuit breaker
|
|
func (cb *RefreshCircuitBreaker) GetState() string {
|
|
state := atomic.LoadInt32(&cb.state)
|
|
switch state {
|
|
case 0:
|
|
return "closed"
|
|
case 1:
|
|
return "open"
|
|
case 2:
|
|
return "half-open"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|