mirror of
https://github.com/lukaszraczylo/traefikoidc.git
synced 2026-06-05 22:44:17 +00:00
Add sharded cache and prevention of CPU spikes / locks (#96)
* Add sharded cache and prevention of CPU spikes / locks * Add dynamic client registration with oidc provider * Fix race condition introduced during the sharded cache implementation. * Add page for traefikoidc.
This commit is contained in:
+90
-10
@@ -40,6 +40,13 @@ type RefreshCoordinator struct {
|
||||
// Cleanup goroutine control
|
||||
stopChan chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
// delayedCleanupQueue stores items to be cleaned up after delay
|
||||
// Uses a timer-based approach instead of spawning goroutines per cleanup
|
||||
delayedCleanupQueue chan delayedCleanupItem
|
||||
// cleanupTimerPool reuses timers to avoid goroutine-per-cleanup
|
||||
cleanupTimerMu sync.Mutex
|
||||
cleanupTimers map[string]*time.Timer
|
||||
}
|
||||
|
||||
// RefreshCoordinatorConfig configures the refresh coordinator behavior
|
||||
@@ -131,6 +138,12 @@ type RefreshMetrics struct {
|
||||
currentInFlightRefreshes int32
|
||||
}
|
||||
|
||||
// delayedCleanupItem represents an item scheduled for delayed cleanup
|
||||
type delayedCleanupItem struct {
|
||||
tokenHash string
|
||||
cleanupAt time.Time
|
||||
}
|
||||
|
||||
// RefreshCircuitBreaker implements a circuit breaker specifically for refresh operations
|
||||
type RefreshCircuitBreaker struct {
|
||||
state int32 // 0=closed, 1=open, 2=half-open
|
||||
@@ -161,6 +174,8 @@ func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *Ref
|
||||
metrics: &RefreshMetrics{},
|
||||
logger: logger,
|
||||
stopChan: make(chan struct{}),
|
||||
delayedCleanupQueue: make(chan delayedCleanupItem, 1000), // Buffered channel for cleanup items
|
||||
cleanupTimers: make(map[string]*time.Timer),
|
||||
circuitBreaker: &RefreshCircuitBreaker{
|
||||
config: RefreshCircuitBreakerConfig{
|
||||
MaxFailures: 3,
|
||||
@@ -174,6 +189,10 @@ func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *Ref
|
||||
rc.wg.Add(1)
|
||||
go rc.cleanupRoutine()
|
||||
|
||||
// Start delayed cleanup processor (single goroutine processes all cleanup timers)
|
||||
rc.wg.Add(1)
|
||||
go rc.processDelayedCleanups()
|
||||
|
||||
return rc
|
||||
}
|
||||
|
||||
@@ -313,16 +332,9 @@ func (rc *RefreshCoordinator) executeRefreshAsync(
|
||||
// Signal completion to all waiters
|
||||
close(operation.done)
|
||||
|
||||
// Clean up operation after a configurable delay to allow waiters to read result
|
||||
go func() {
|
||||
if rc.config.DeduplicationCleanupDelay > 0 {
|
||||
time.Sleep(rc.config.DeduplicationCleanupDelay)
|
||||
}
|
||||
rc.refreshMutex.Lock()
|
||||
delete(rc.inFlightRefreshes, tokenHash)
|
||||
rc.refreshMutex.Unlock()
|
||||
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
||||
}()
|
||||
// Schedule delayed cleanup using timer instead of spawning a goroutine
|
||||
// This prevents goroutine explosion under high load
|
||||
rc.scheduleDelayedCleanup(tokenHash)
|
||||
}()
|
||||
|
||||
// Create timeout context
|
||||
@@ -369,6 +381,65 @@ func (rc *RefreshCoordinator) executeRefreshAsync(
|
||||
}
|
||||
}
|
||||
|
||||
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning a goroutine
|
||||
// This prevents goroutine explosion under high load (500+ req/sec)
|
||||
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) {
|
||||
delay := rc.config.DeduplicationCleanupDelay
|
||||
if delay <= 0 {
|
||||
// Immediate cleanup
|
||||
rc.performCleanup(tokenHash)
|
||||
return
|
||||
}
|
||||
|
||||
// Use time.AfterFunc which is more efficient than spawning a goroutine with Sleep
|
||||
// time.AfterFunc uses the runtime's timer heap which is much more efficient
|
||||
rc.cleanupTimerMu.Lock()
|
||||
// Cancel any existing timer for this hash (shouldn't happen, but just in case)
|
||||
if existingTimer, exists := rc.cleanupTimers[tokenHash]; exists {
|
||||
existingTimer.Stop()
|
||||
}
|
||||
rc.cleanupTimers[tokenHash] = time.AfterFunc(delay, func() {
|
||||
rc.performCleanup(tokenHash)
|
||||
// Remove timer from map
|
||||
rc.cleanupTimerMu.Lock()
|
||||
delete(rc.cleanupTimers, tokenHash)
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
})
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
}
|
||||
|
||||
// performCleanup removes the operation from the in-flight map
|
||||
func (rc *RefreshCoordinator) performCleanup(tokenHash string) {
|
||||
rc.refreshMutex.Lock()
|
||||
delete(rc.inFlightRefreshes, tokenHash)
|
||||
rc.refreshMutex.Unlock()
|
||||
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
||||
}
|
||||
|
||||
// processDelayedCleanups processes delayed cleanup requests from the queue
|
||||
// This is a single goroutine that handles all delayed cleanups
|
||||
func (rc *RefreshCoordinator) processDelayedCleanups() {
|
||||
defer rc.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case item := <-rc.delayedCleanupQueue:
|
||||
// Wait until cleanup time
|
||||
waitDuration := time.Until(item.cleanupAt)
|
||||
if waitDuration > 0 {
|
||||
select {
|
||||
case <-time.After(waitDuration):
|
||||
case <-rc.stopChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
rc.performCleanup(item.tokenHash)
|
||||
case <-rc.stopChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isInCooldown checks if a session is in cooldown after recording an attempt
|
||||
func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
||||
rc.attemptsMutex.Lock()
|
||||
@@ -516,6 +587,15 @@ func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} {
|
||||
// Shutdown gracefully shuts down the coordinator
|
||||
func (rc *RefreshCoordinator) Shutdown() {
|
||||
close(rc.stopChan)
|
||||
|
||||
// Cancel all pending cleanup timers
|
||||
rc.cleanupTimerMu.Lock()
|
||||
for _, timer := range rc.cleanupTimers {
|
||||
timer.Stop()
|
||||
}
|
||||
rc.cleanupTimers = make(map[string]*time.Timer)
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
|
||||
rc.wg.Wait()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user