mirror of
https://github.com/lukaszraczylo/traefikoidc.git
synced 2026-06-07 22:53:58 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 827926bc3a |
+45
-49
@@ -31,14 +31,12 @@ type RefreshCoordinator struct {
|
||||
// serializing pattern caused the v1.0.15 death spiral after v1.0.14
|
||||
// removed the refreshMutex (same architectural shape, different mutex).
|
||||
sessionRefreshAttempts sync.Map
|
||||
cleanupTimers map[string]*time.Timer
|
||||
circuitBreaker *RefreshCircuitBreaker
|
||||
metrics *RefreshMetrics
|
||||
logger *Logger
|
||||
stopChan chan struct{}
|
||||
config RefreshCoordinatorConfig
|
||||
wg sync.WaitGroup
|
||||
cleanupTimerMu sync.Mutex
|
||||
}
|
||||
|
||||
// RefreshCoordinatorConfig configures the refresh coordinator behavior
|
||||
@@ -157,10 +155,9 @@ func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *Ref
|
||||
// inFlightRefreshes and sessionRefreshAttempts are both sync.Map;
|
||||
// their zero values are ready to use.
|
||||
config: config,
|
||||
metrics: &RefreshMetrics{},
|
||||
logger: logger,
|
||||
stopChan: make(chan struct{}),
|
||||
cleanupTimers: make(map[string]*time.Timer),
|
||||
metrics: &RefreshMetrics{},
|
||||
logger: logger,
|
||||
stopChan: make(chan struct{}),
|
||||
circuitBreaker: &RefreshCircuitBreaker{
|
||||
config: RefreshCircuitBreakerConfig{
|
||||
MaxFailures: 3,
|
||||
@@ -288,19 +285,22 @@ func (rc *RefreshCoordinator) getOrCreateOperation(
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Reserve concurrent slot via CAS — without the old global lock we can
|
||||
// no longer rely on mutex-mediated check-then-increment. If we lose the
|
||||
// CAS race we retry; if the limit has since been reached we back out.
|
||||
for {
|
||||
current := atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes)
|
||||
if int(current) >= rc.config.MaxConcurrentRefreshes {
|
||||
err := fmt.Errorf("maximum concurrent refresh operations reached")
|
||||
rc.failCandidate(tokenHash, candidate, err)
|
||||
return nil, false, err
|
||||
}
|
||||
if atomic.CompareAndSwapInt32(&rc.metrics.currentInFlightRefreshes, current, current+1) {
|
||||
break
|
||||
}
|
||||
// Reserve concurrent slot via ticket-and-return: increment optimistically,
|
||||
// decrement if we overshot the limit. The previous CAS-loop allowed a
|
||||
// transient overshoot of up to N-1 leaders when several goroutines all
|
||||
// observed `current < max` in the same scheduling slice before any one
|
||||
// of them succeeded their CAS — visible to readers as
|
||||
// currentInFlightRefreshes > MaxConcurrentRefreshes for a brief window.
|
||||
// The ticket pattern is strictly bounded: the counter momentarily reads
|
||||
// max+k for k concurrent attempts past the limit, but only the k that
|
||||
// produced max+1..max+k decrement back, and only k=1 ever observes max+1
|
||||
// as committed.
|
||||
newCount := atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, 1)
|
||||
if int(newCount) > rc.config.MaxConcurrentRefreshes {
|
||||
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
||||
err := fmt.Errorf("maximum concurrent refresh operations reached")
|
||||
rc.failCandidate(tokenHash, candidate, err)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return candidate, true, nil
|
||||
@@ -311,7 +311,13 @@ func (rc *RefreshCoordinator) getOrCreateOperation(
|
||||
// goroutine that just registered the operation) runs them; joiners share the
|
||||
// leader's outcome via operation.done.
|
||||
func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
||||
rc.recordRefreshAttempt(sessionID)
|
||||
// Cooldown check FIRST, BEFORE incrementing the attempt counter.
|
||||
// Previously this function recorded the attempt and then read the
|
||||
// cooldown state. Under burst load (many concurrent leaders with
|
||||
// different token hashes but same session) every goroutine could
|
||||
// increment past MaxRefreshAttempts before any one of them observed
|
||||
// the threshold, so the cooldown gate fired too late — the same
|
||||
// thundering-herd shape that drove v1.0.14 into the ground.
|
||||
if rc.isInCooldown(sessionID) {
|
||||
atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1)
|
||||
return fmt.Errorf("refresh attempts exceeded for session, in cooldown period")
|
||||
@@ -320,6 +326,8 @@ func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
||||
atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1)
|
||||
return fmt.Errorf("system under memory pressure, refresh denied")
|
||||
}
|
||||
// Only count attempts that actually progress past the gates.
|
||||
rc.recordRefreshAttempt(sessionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -396,31 +404,25 @@ func (rc *RefreshCoordinator) executeRefreshAsync(
|
||||
}
|
||||
}
|
||||
|
||||
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning a goroutine
|
||||
// This prevents goroutine explosion under high load (500+ req/sec)
|
||||
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning
|
||||
// a goroutine — time.AfterFunc uses the runtime's timer heap and never spawns
|
||||
// a per-timer goroutine until the callback actually fires.
|
||||
//
|
||||
// The previous implementation tracked every pending timer in a map guarded by
|
||||
// cleanupTimerMu so a duplicate scheduling could cancel the prior timer. That
|
||||
// "shouldn't happen" path was the only consumer of the map, but the mutex
|
||||
// fired on every successful refresh completion — yet another per-request
|
||||
// Yaegi-dispatched lock acquisition. performCleanup is already idempotent
|
||||
// (LoadAndDelete on the sync.Map), so a duplicate scheduling at worst fires
|
||||
// performCleanup twice; the second call is a no-op. Dropping the map removes
|
||||
// the whole class of contention on this code path.
|
||||
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) {
|
||||
delay := rc.config.DeduplicationCleanupDelay
|
||||
if delay <= 0 {
|
||||
// Immediate cleanup
|
||||
rc.performCleanup(tokenHash)
|
||||
return
|
||||
}
|
||||
|
||||
// Use time.AfterFunc which is more efficient than spawning a goroutine with Sleep
|
||||
// time.AfterFunc uses the runtime's timer heap which is much more efficient
|
||||
rc.cleanupTimerMu.Lock()
|
||||
// Cancel any existing timer for this hash (shouldn't happen, but just in case)
|
||||
if existingTimer, exists := rc.cleanupTimers[tokenHash]; exists {
|
||||
existingTimer.Stop()
|
||||
}
|
||||
rc.cleanupTimers[tokenHash] = time.AfterFunc(delay, func() {
|
||||
rc.performCleanup(tokenHash)
|
||||
// Remove timer from map
|
||||
rc.cleanupTimerMu.Lock()
|
||||
delete(rc.cleanupTimers, tokenHash)
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
})
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
time.AfterFunc(delay, func() { rc.performCleanup(tokenHash) })
|
||||
}
|
||||
|
||||
// performCleanup removes the operation from the in-flight map.
|
||||
@@ -611,18 +613,12 @@ func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the coordinator
|
||||
// Shutdown gracefully shuts down the coordinator. Pending delayed-cleanup
|
||||
// timers are NOT canceled explicitly: time.AfterFunc callbacks are tiny
|
||||
// (one map LoadAndDelete) and harmless after Shutdown — sync.Map operations
|
||||
// remain safe on an unused coordinator until GC.
|
||||
func (rc *RefreshCoordinator) Shutdown() {
|
||||
close(rc.stopChan)
|
||||
|
||||
// Cancel all pending cleanup timers
|
||||
rc.cleanupTimerMu.Lock()
|
||||
for _, timer := range rc.cleanupTimers {
|
||||
timer.Stop()
|
||||
}
|
||||
rc.cleanupTimers = make(map[string]*time.Timer)
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
|
||||
rc.wg.Wait()
|
||||
}
|
||||
|
||||
|
||||
+12
-19
@@ -165,9 +165,14 @@ func TestRefreshRateLimiting(t *testing.T) {
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Verify that cooldown was triggered after max attempts
|
||||
// With the new logic, the Nth attempt triggers cooldown, so we get N-1 successful attempts
|
||||
expectedSuccessfulAttempts := config.MaxRefreshAttempts - 1
|
||||
// Verify that cooldown was triggered after max attempts.
|
||||
// With applyLeaderGates checking cooldown BEFORE recording the attempt
|
||||
// (the v1.0.16 reorder fixing the thundering-herd off-by-one), N attempts
|
||||
// run to completion and the (N+1)th is denied. Previously the Nth was
|
||||
// denied as it tried to record, which under burst load let multiple
|
||||
// concurrent leaders increment past the limit before any one of them
|
||||
// observed the gate.
|
||||
expectedSuccessfulAttempts := config.MaxRefreshAttempts
|
||||
if attempts != expectedSuccessfulAttempts {
|
||||
t.Errorf("Expected %d successful attempts before cooldown, got %d", expectedSuccessfulAttempts, attempts)
|
||||
}
|
||||
@@ -721,11 +726,9 @@ func TestNoGoroutineExplosionWithTimers(t *testing.T) {
|
||||
currentGoroutines := runtime.NumGoroutine()
|
||||
t.Logf("Goroutines after %d refresh operations: %d", numRefreshes, currentGoroutines)
|
||||
|
||||
// Check timer count
|
||||
coordinator.cleanupTimerMu.Lock()
|
||||
timerCount := len(coordinator.cleanupTimers)
|
||||
coordinator.cleanupTimerMu.Unlock()
|
||||
t.Logf("Active cleanup timers: %d", timerCount)
|
||||
// (Coordinator no longer tracks pending timers; time.AfterFunc closures
|
||||
// fire performCleanup directly. This test now only checks the goroutine
|
||||
// budget, which was always the real invariant.)
|
||||
|
||||
// With timer-based cleanup, goroutine increase should be minimal
|
||||
// Timers don't create goroutines - they use the runtime timer heap
|
||||
@@ -741,19 +744,9 @@ func TestNoGoroutineExplosionWithTimers(t *testing.T) {
|
||||
initialGoroutines, currentGoroutines, goroutineIncrease)
|
||||
}
|
||||
|
||||
// Wait for timers to fire and cleanup
|
||||
// Wait for timers to fire and cleanup.
|
||||
time.Sleep(config.DeduplicationCleanupDelay + 50*time.Millisecond)
|
||||
|
||||
// Verify timers were cleaned up
|
||||
coordinator.cleanupTimerMu.Lock()
|
||||
remainingTimers := len(coordinator.cleanupTimers)
|
||||
coordinator.cleanupTimerMu.Unlock()
|
||||
|
||||
// Most timers should have fired and been removed
|
||||
if remainingTimers > 10 {
|
||||
t.Errorf("Too many cleanup timers remaining: %d", remainingTimers)
|
||||
}
|
||||
|
||||
// Verify goroutines returned to near initial
|
||||
runtime.GC()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
Reference in New Issue
Block a user