diff --git a/refresh_coordinator.go b/refresh_coordinator.go index ee81c5b..587ca88 100644 --- a/refresh_coordinator.go +++ b/refresh_coordinator.go @@ -15,7 +15,13 @@ import ( // It implements request coalescing, rate limiting, and circuit breaking // specifically for token refresh operations. type RefreshCoordinator struct { - inFlightRefreshes map[string]*refreshOperation + // inFlightRefreshes maps tokenHash -> *refreshOperation. sync.Map is used + // instead of a plain map + RWMutex so concurrent refreshes do not + // serialize on a single global lock. Under Yaegi the previous + // refreshMutex.Lock() was held for tens of milliseconds per request due + // to interpreter overhead on the work inside the critical section, + // causing dozens of goroutines to stack up on it and pin one CPU core. + inFlightRefreshes sync.Map cleanupTimers map[string]*time.Timer sessionRefreshAttempts map[string]*refreshAttemptTracker circuitBreaker *RefreshCircuitBreaker @@ -25,7 +31,6 @@ type RefreshCoordinator struct { config RefreshCoordinatorConfig wg sync.WaitGroup attemptsMutex sync.RWMutex - refreshMutex sync.RWMutex cleanupTimerMu sync.Mutex } @@ -130,7 +135,7 @@ func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *Ref } rc := &RefreshCoordinator{ - inFlightRefreshes: make(map[string]*refreshOperation), + // inFlightRefreshes is a sync.Map; zero value is ready to use. sessionRefreshAttempts: make(map[string]*refreshAttemptTracker), config: config, metrics: &RefreshMetrics{}, @@ -227,13 +232,28 @@ func (rc *RefreshCoordinator) getOrCreateOperation( tokenHash string, refreshToken string, ) (*refreshOperation, bool, error) { - rc.refreshMutex.Lock() - defer rc.refreshMutex.Unlock() + // Speculatively construct the operation we WOULD register if we win the + // race. Allocating here keeps the LoadOrStore call below atomic and + // avoids any global lock — under Yaegi the previous map+RWMutex design + // held the write lock long enough (tens of ms per call) that concurrent + // refreshes on the same coordinator serialized into a queue that grew + // without bound. See struct comment on inFlightRefreshes. + candidate := &refreshOperation{ + refreshToken: refreshToken, + done: make(chan struct{}), + startTime: time.Now(), + waiterCount: 1, + } - // Check for existing operation while holding the lock - if existingOp, exists := rc.inFlightRefreshes[tokenHash]; exists { + if existing, loaded := rc.inFlightRefreshes.LoadOrStore(tokenHash, candidate); loaded { + existingOp, ok := existing.(*refreshOperation) + if !ok { + // Defensive: anything stored here is always *refreshOperation, but + // keep the typed assert so a programming error elsewhere doesn't + // surface as a confusing panic in an interpreter frame. + return nil, false, fmt.Errorf("inFlightRefreshes corrupt: unexpected type %T", existing) + } if existingOp.refreshToken == refreshToken { - // Join existing operation atomic.AddInt32(&existingOp.waiterCount, 1) return existingOp, false, nil } @@ -241,41 +261,60 @@ func (rc *RefreshCoordinator) getOrCreateOperation( return nil, false, fmt.Errorf("refresh token mismatch") } - // No existing operation - check if we can create a new one - // All checks happen while holding the lock to prevent races + // We won the race and registered `candidate`. Apply gates now. If any + // gate fails we must remove our entry from the map and signal failure + // to any joiners that snuck in between LoadOrStore and now. + if err := rc.applyLeaderGates(sessionID); err != nil { + rc.failCandidate(tokenHash, candidate, err) + return nil, false, err + } - // Check and record refresh attempt for rate limiting + // Reserve concurrent slot via CAS — without the old global lock we can + // no longer rely on mutex-mediated check-then-increment. If we lose the + // CAS race we retry; if the limit has since been reached we back out. + for { + current := atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes) + if int(current) >= rc.config.MaxConcurrentRefreshes { + err := fmt.Errorf("maximum concurrent refresh operations reached") + rc.failCandidate(tokenHash, candidate, err) + return nil, false, err + } + if atomic.CompareAndSwapInt32(&rc.metrics.currentInFlightRefreshes, current, current+1) { + break + } + } + + return candidate, true, nil +} + +// applyLeaderGates runs the rate-limit, cooldown, and memory-pressure checks +// that previously ran under the global refreshMutex. Only the leader (the +// goroutine that just registered the operation) runs them; joiners share the +// leader's outcome via operation.done. +func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error { rc.recordRefreshAttempt(sessionID) if rc.isInCooldown(sessionID) { atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1) - return nil, false, fmt.Errorf("refresh attempts exceeded for session, in cooldown period") + return fmt.Errorf("refresh attempts exceeded for session, in cooldown period") } - - // Check memory pressure if rc.config.EnableMemoryPressureDetection && rc.isUnderMemoryPressure() { atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1) - return nil, false, fmt.Errorf("system under memory pressure, refresh denied") + return fmt.Errorf("system under memory pressure, refresh denied") } + return nil +} - // Check and reserve concurrent refresh slot atomically - current := atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes) - if int(current) >= rc.config.MaxConcurrentRefreshes { - return nil, false, fmt.Errorf("maximum concurrent refresh operations reached") - } - - // Reserve the slot - we're still holding the lock so this is safe - atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, 1) - - // Create and register new operation - operation := &refreshOperation{ - refreshToken: refreshToken, - done: make(chan struct{}), - startTime: time.Now(), - waiterCount: 1, - } - rc.inFlightRefreshes[tokenHash] = operation - - return operation, true, nil +// failCandidate removes the leader's just-registered operation from the +// in-flight map and signals the error to any joiners by recording the result +// and closing the done channel. This keeps the (nil, false, err) return path +// equivalent to the pre-sync.Map version: callers see the error directly, +// joiners see it via operation.done. +func (rc *RefreshCoordinator) failCandidate(tokenHash string, op *refreshOperation, err error) { + rc.inFlightRefreshes.Delete(tokenHash) + op.mutex.Lock() + op.result = &refreshResult{err: err} + op.mutex.Unlock() + close(op.done) } // executeRefreshAsync performs the actual refresh operation asynchronously @@ -367,16 +406,11 @@ func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) { // performCleanup removes the operation from the in-flight map. // Idempotent: only decrements the in-flight counter if an entry was actually -// removed. This guards against any future path accidentally calling cleanup -// twice for the same tokenHash (which would corrupt the refresh budget). +// removed. LoadAndDelete is atomic so any concurrent failCandidate or repeat +// cleanup call will see exactly one removal — the budget cannot be corrupted +// by double-decrement. func (rc *RefreshCoordinator) performCleanup(tokenHash string) { - rc.refreshMutex.Lock() - _, existed := rc.inFlightRefreshes[tokenHash] - if existed { - delete(rc.inFlightRefreshes, tokenHash) - } - rc.refreshMutex.Unlock() - if existed { + if _, existed := rc.inFlightRefreshes.LoadAndDelete(tokenHash); existed { atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1) } }