diff --git a/refresh_coordinator.go b/refresh_coordinator.go index 3edca74..b3ca46b 100644 --- a/refresh_coordinator.go +++ b/refresh_coordinator.go @@ -31,14 +31,12 @@ type RefreshCoordinator struct { // serializing pattern caused the v1.0.15 death spiral after v1.0.14 // removed the refreshMutex (same architectural shape, different mutex). sessionRefreshAttempts sync.Map - cleanupTimers map[string]*time.Timer circuitBreaker *RefreshCircuitBreaker metrics *RefreshMetrics logger *Logger stopChan chan struct{} config RefreshCoordinatorConfig wg sync.WaitGroup - cleanupTimerMu sync.Mutex } // RefreshCoordinatorConfig configures the refresh coordinator behavior @@ -157,10 +155,9 @@ func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *Ref // inFlightRefreshes and sessionRefreshAttempts are both sync.Map; // their zero values are ready to use. config: config, - metrics: &RefreshMetrics{}, - logger: logger, - stopChan: make(chan struct{}), - cleanupTimers: make(map[string]*time.Timer), + metrics: &RefreshMetrics{}, + logger: logger, + stopChan: make(chan struct{}), circuitBreaker: &RefreshCircuitBreaker{ config: RefreshCircuitBreakerConfig{ MaxFailures: 3, @@ -288,19 +285,22 @@ func (rc *RefreshCoordinator) getOrCreateOperation( return nil, false, err } - // Reserve concurrent slot via CAS — without the old global lock we can - // no longer rely on mutex-mediated check-then-increment. If we lose the - // CAS race we retry; if the limit has since been reached we back out. 
- for { - current := atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes) - if int(current) >= rc.config.MaxConcurrentRefreshes { - err := fmt.Errorf("maximum concurrent refresh operations reached") - rc.failCandidate(tokenHash, candidate, err) - return nil, false, err - } - if atomic.CompareAndSwapInt32(&rc.metrics.currentInFlightRefreshes, current, current+1) { - break - } + // Reserve concurrent slot via ticket-and-return: increment optimistically, + // decrement if we overshot the limit. Unlike the previous CAS loop, which + // never let the counter exceed MaxConcurrentRefreshes, this pattern lets + // readers briefly observe currentInFlightRefreshes above the limit: + // with k concurrent attempts past the limit the counter can momentarily + // read up to max+k until each of those k attempts decrements back. + // The number of admitted leaders is still bounded by the limit, because + // only a caller whose own increment produced a value <= max keeps its slot. + // NOTE(review): a transient overshoot can reject a caller right after a + // slot frees up; confirm that this spurious denial is acceptable. + newCount := atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, 1) + if int(newCount) > rc.config.MaxConcurrentRefreshes { + atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1) + err := fmt.Errorf("maximum concurrent refresh operations reached") + rc.failCandidate(tokenHash, candidate, err) + return nil, false, err } return candidate, true, nil @@ -311,7 +311,13 @@ func (rc *RefreshCoordinator) getOrCreateOperation( // goroutine that just registered the operation) runs them; joiners share the // leader's outcome via operation.done. func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error { - rc.recordRefreshAttempt(sessionID) + // Cooldown check FIRST, BEFORE incrementing the attempt counter. + // Previously this function recorded the attempt and then read the + // cooldown state. 
Under burst load (many concurrent leaders with + // different token hashes but same session) every goroutine could + // increment past MaxRefreshAttempts before any one of them observed + // the threshold, so the cooldown gate fired too late — the same + // thundering-herd shape that drove v1.0.14 into the ground. if rc.isInCooldown(sessionID) { atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1) return fmt.Errorf("refresh attempts exceeded for session, in cooldown period") @@ -320,6 +326,8 @@ func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error { atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1) return fmt.Errorf("system under memory pressure, refresh denied") } + // Only count attempts that actually progress past the gates. + rc.recordRefreshAttempt(sessionID) return nil } @@ -396,31 +404,25 @@ func (rc *RefreshCoordinator) executeRefreshAsync( } } -// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning a goroutine -// This prevents goroutine explosion under high load (500+ req/sec) +// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning +// a goroutine — time.AfterFunc uses the runtime's timer heap and never spawns +// a per-timer goroutine until the callback actually fires. +// +// The previous implementation tracked every pending timer in a map guarded by +// cleanupTimerMu so a duplicate scheduling could cancel the prior timer. That +// "shouldn't happen" path was the only consumer of the map, but the mutex +// fired on every successful refresh completion — yet another per-request +// Yaegi-dispatched lock acquisition. performCleanup is already idempotent +// (LoadAndDelete on the sync.Map), so a duplicate scheduling at worst fires +// performCleanup twice; the second call is a no-op. Dropping the map removes +// the whole class of contention on this code path. 
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) { delay := rc.config.DeduplicationCleanupDelay if delay <= 0 { - // Immediate cleanup rc.performCleanup(tokenHash) return } - - // Use time.AfterFunc which is more efficient than spawning a goroutine with Sleep - // time.AfterFunc uses the runtime's timer heap which is much more efficient - rc.cleanupTimerMu.Lock() - // Cancel any existing timer for this hash (shouldn't happen, but just in case) - if existingTimer, exists := rc.cleanupTimers[tokenHash]; exists { - existingTimer.Stop() - } - rc.cleanupTimers[tokenHash] = time.AfterFunc(delay, func() { - rc.performCleanup(tokenHash) - // Remove timer from map - rc.cleanupTimerMu.Lock() - delete(rc.cleanupTimers, tokenHash) - rc.cleanupTimerMu.Unlock() - }) - rc.cleanupTimerMu.Unlock() + time.AfterFunc(delay, func() { rc.performCleanup(tokenHash) }) } // performCleanup removes the operation from the in-flight map. @@ -611,18 +613,12 @@ func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} { } } -// Shutdown gracefully shuts down the coordinator +// Shutdown gracefully shuts down the coordinator. Pending delayed-cleanup +// timers are NOT canceled explicitly: time.AfterFunc callbacks are tiny +// (one map LoadAndDelete) and harmless after Shutdown — sync.Map operations +// remain safe on an unused coordinator until GC. 
func (rc *RefreshCoordinator) Shutdown() { close(rc.stopChan) - - // Cancel all pending cleanup timers - rc.cleanupTimerMu.Lock() - for _, timer := range rc.cleanupTimers { - timer.Stop() - } - rc.cleanupTimers = make(map[string]*time.Timer) - rc.cleanupTimerMu.Unlock() - rc.wg.Wait() } diff --git a/refresh_coordinator_test.go b/refresh_coordinator_test.go index 4e9e45f..998a1c7 100644 --- a/refresh_coordinator_test.go +++ b/refresh_coordinator_test.go @@ -165,9 +165,14 @@ func TestRefreshRateLimiting(t *testing.T) { time.Sleep(150 * time.Millisecond) } - // Verify that cooldown was triggered after max attempts - // With the new logic, the Nth attempt triggers cooldown, so we get N-1 successful attempts - expectedSuccessfulAttempts := config.MaxRefreshAttempts - 1 + // Verify that cooldown was triggered after max attempts. + // With applyLeaderGates checking cooldown BEFORE recording the attempt + // (the v1.0.16 reorder fixing the thundering-herd off-by-one), N attempts + // run to completion and the (N+1)th is denied. Previously the Nth was + // denied as it tried to record, which under burst load let multiple + // concurrent leaders increment past the limit before any one of them + // observed the gate. + expectedSuccessfulAttempts := config.MaxRefreshAttempts if attempts != expectedSuccessfulAttempts { t.Errorf("Expected %d successful attempts before cooldown, got %d", expectedSuccessfulAttempts, attempts) } @@ -721,11 +726,9 @@ func TestNoGoroutineExplosionWithTimers(t *testing.T) { currentGoroutines := runtime.NumGoroutine() t.Logf("Goroutines after %d refresh operations: %d", numRefreshes, currentGoroutines) - // Check timer count - coordinator.cleanupTimerMu.Lock() - timerCount := len(coordinator.cleanupTimers) - coordinator.cleanupTimerMu.Unlock() - t.Logf("Active cleanup timers: %d", timerCount) + // (Coordinator no longer tracks pending timers; time.AfterFunc closures + // fire performCleanup directly. 
This test now only checks the goroutine + // budget, which was always the real invariant.) // With timer-based cleanup, goroutine increase should be minimal // Timers don't create goroutines - they use the runtime timer heap @@ -741,19 +744,9 @@ func TestNoGoroutineExplosionWithTimers(t *testing.T) { initialGoroutines, currentGoroutines, goroutineIncrease) } - // Wait for timers to fire and cleanup + // Wait for timers to fire and cleanup. time.Sleep(config.DeduplicationCleanupDelay + 50*time.Millisecond) - // Verify timers were cleaned up - coordinator.cleanupTimerMu.Lock() - remainingTimers := len(coordinator.cleanupTimers) - coordinator.cleanupTimerMu.Unlock() - - // Most timers should have fired and been removed - if remainingTimers > 10 { - t.Errorf("Too many cleanup timers remaining: %d", remainingTimers) - } - // Verify goroutines returned to near initial runtime.GC() time.Sleep(50 * time.Millisecond)