fix: snapshot patterns for refresh-tracker and metadata URLs

Two related lock-free snapshot refactors addressing the remaining
post-v1.0.16 code-review findings.

1. refreshAttemptTracker: per-field atomic.Load/Store -> atomic.Value
   snapshot of *attemptState (refresh_coordinator.go).

   Previously each tracker held five independently-atomic fields. The
   cooldown-exit reset wrote cooldownEndNano = 0 first, then separately
   stored attempts = 1 and windowStartNano = now. A concurrent
   isInCooldown call could observe cooldownEndNano = 0 (reset just
   completed) with attempts still at MaxRefreshAttempts, immediately
   triggering a fresh cooldown — a benign double-trigger race that
   nonetheless meant the state machine had observable intermediate
   states.

   New design: state is a *attemptState (immutable) published via
   atomic.Value. All transitions (record/success/failure/window-reset/
   cooldown-enter/cooldown-exit) go through mutateState, which runs a
   CAS loop: load current snapshot -> construct fresh snapshot ->
   CompareAndSwap. Either the entire new state publishes or none of
   it does — no intermediate visibility, no cross-field race.

   Under Yaegi this collapses 3-5 per-field atomic dispatches into one
   atomic.Value.Load on the read path. Write paths pay an extra
   allocation for the new snapshot but avoid the cross-field hazard.

2. MetadataSnapshot: hot-path readers use atomic.Value instead of
   metadataMu.RLock (middleware.go, types.go, main.go, utilities.go).

   middleware.ServeHTTP previously took metadataMu.RLock on every
   non-bypass request to read the single field issuerURL. Under Yaegi
   each RLock acquisition costs 1-5ms of interpreter dispatch.
   updateMetadataEndpoints now also publishes an immutable
   *MetadataSnapshot via atomic.Value; the hot-path reader loads it
   in one op via t.metadataSnap(). Falls back to the legacy
   metadataMu.RLock pattern when the snapshot is unpublished (some
   test setups initialize the struct fields directly without going
   through updateMetadataEndpoints).

   Less-frequent callers (helpers, logout, token_introspection) still
   take metadataMu.RLock and are unchanged. The snapshot strictly
   subsets the metadataMu-protected fields, so those readers see
   identical data.

Note on atomic.Pointer[T]: this would have been the cleaner type but
yaegi v0.16.1's stdlib (used by traefik:v3.7.1) exposes only the
legacy unsafe.Pointer-based atomic primitives — no generic Pointer[T].
atomic.Value provides the same semantics via interface{} + type assert.

All tests pass with -race; golangci-lint clean.
This commit is contained in:
2026-05-23 11:31:51 +01:00
parent 827926bc3a
commit 17e3f8ef62
5 changed files with 207 additions and 55 deletions
+13
View File
@@ -517,6 +517,19 @@ func (t *TraefikOidc) updateMetadataEndpoints(metadata *ProviderMetadata) {
introspectionURL := t.introspectionURL
registrationURL := t.registrationURL
// Publish the read-mostly URL bundle atomically. Hot-path readers Load
// this directly instead of acquiring metadataMu.RLock per request.
t.metadataSnapshot.Store(&MetadataSnapshot{
IssuerURL: metadata.Issuer,
JWKSURL: metadata.JWKSURL,
TokenURL: metadata.TokenURL,
AuthURL: metadata.AuthURL,
RevocationURL: metadata.RevokeURL,
EndSessionURL: metadata.EndSessionURL,
IntrospectionURL: metadata.IntrospectionURL,
RegistrationURL: metadata.RegistrationURL,
})
t.metadataMu.Unlock()
// Log introspection endpoint availability for opaque token support
+15 -4
View File
@@ -209,10 +209,21 @@ func (t *TraefikOidc) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
select {
case <-t.initComplete:
// Read issuerURL with RLock
t.metadataMu.RLock()
issuerURL := t.issuerURL
t.metadataMu.RUnlock()
// Read issuerURL via atomic snapshot when available — replaces the
// metadataMu.RLock that previously fired on every non-bypass request.
// Under Yaegi each RLock acquisition costs 1-5ms of interpreter
// dispatch; the snapshot is a single atomic.Value.Load. Falls back
// to the legacy field+RLock for paths that haven't published a
// snapshot yet (notably some test setups that initialize the struct
// fields directly).
var issuerURL string
if snap := t.metadataSnap(); snap != nil {
issuerURL = snap.IssuerURL
} else {
t.metadataMu.RLock()
issuerURL = t.issuerURL
t.metadataMu.RUnlock()
}
if issuerURL == "" {
// Provider metadata initialization failed - try to recover.
+139 -51
View File
@@ -94,22 +94,46 @@ type refreshResult struct {
fromCache bool
}
// refreshAttemptTracker tracks refresh attempts for a session. All fields are
// accessed via sync/atomic so isInCooldown/recordRefreshAttempt/Success/Failure
// can run without holding any per-coordinator lock. Times are UnixNano so they
// fit in an int64 and can be read with a single atomic.LoadInt64.
// attemptState is the immutable snapshot of a session's refresh-attempt
// state. Lives behind refreshAttemptTracker.state (atomic.Value). Every
// transition (record, success, failure, window-reset, cooldown-enter,
// cooldown-exit) constructs a fresh attemptState and publishes it via
// CompareAndSwap so the entire field set is updated together.
//
// cooldownEndNano == 0 means "not in cooldown". This sentinel replaces the
// inCooldown bool that the previous implementation kept under attemptsMutex —
// under Yaegi any per-request global mutex turns into a serializing bottleneck
// (the v1.0.14 refreshMutex -> sync.Map fix removed only one such bottleneck;
// attemptsMutex was the next one in the queue).
// Per-field atomic.Load/Store (the previous v1.0.15 design) had a benign
// but observable hazard: the cooldown-exit reset wrote cooldownEndNano = 0
// first, then separately stored attempts = 1 and windowStartNano = now.
// A concurrent isInCooldown call could see cooldownEndNano = 0 (reset
// just completed) with attempts still at MaxRefreshAttempts, triggering
// a fresh cooldown immediately. The snapshot approach eliminates the
// intermediate state entirely.
type attemptState struct {
lastAttemptNano int64 // UnixNano of last attempt
windowStartNano int64 // UnixNano of attempt-window start
cooldownEndNano int64 // UnixNano; 0 = not in cooldown
attempts int32
consecutiveFailures int32
}
// refreshAttemptTracker tracks refresh attempts for a session via a single
// atomic.Value holding a *attemptState pointer. Readers do exactly one Load.
// Writers do Load → construct new → CompareAndSwap (retry on conflict).
// Under Yaegi this collapses 3-4 per-field atomic dispatches into one Load,
// and eliminates the cross-field race in the window-reset path.
type refreshAttemptTracker struct {
lastAttemptNano int64 // atomic, UnixNano of last attempt
windowStartNano int64 // atomic, UnixNano of attempt-window start
cooldownEndNano int64 // atomic, UnixNano; 0 = not in cooldown
attempts int32 // atomic
consecutiveFailures int32 // atomic
state atomic.Value // *attemptState
}
// stateOf returns the current attemptState, or a zero-value snapshot if none
// has been published yet. The empty snapshot represents "no attempts recorded".
func (t *refreshAttemptTracker) stateOf() *attemptState {
if v := t.state.Load(); v != nil {
s, _ := v.(*attemptState)
if s != nil {
return s
}
}
return &attemptState{}
}
// RefreshMetrics tracks coordinator performance metrics
@@ -452,18 +476,39 @@ func (rc *RefreshCoordinator) getOrCreateTracker(sessionID string) *refreshAttem
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
return trackerFromMapValue(v)
}
fresh := &refreshAttemptTracker{
windowStartNano: time.Now().UnixNano(),
}
fresh := &refreshAttemptTracker{}
fresh.state.Store(&attemptState{windowStartNano: time.Now().UnixNano()})
actual, _ := rc.sessionRefreshAttempts.LoadOrStore(sessionID, fresh)
return trackerFromMapValue(actual)
}
// isInCooldown checks if a session is in cooldown. Lock-free read with a
// best-effort cooldown-reset CAS on the cooldownEndNano sentinel. If the
// reset races with another goroutine we accept the loser's view (the winner's
// reset still happens). The attempt-window expiry and limit-exceeded paths
// are write-mostly but use atomic.StoreInt64/AddInt32 — never a held lock.
// mutateState performs a CompareAndSwap loop that applies mutate to the
// current snapshot. mutate must be PURE: it receives an immutable view of
// the current state and returns a fresh *attemptState. If mutate returns nil
// the update is skipped (used by isInCooldown for "no change needed" paths).
//
// Retries on CAS conflict are bounded by the number of concurrent writers —
// in practice 1-3. Under Yaegi each retry pays the dispatch cost of one Load
// + one CompareAndSwap; still cheaper than the previous per-field atomic
// sequence and immune to the cross-field race the v1.0.15 design had.
func (t *refreshAttemptTracker) mutateState(mutate func(cur *attemptState) *attemptState) *attemptState {
for {
cur := t.stateOf()
next := mutate(cur)
if next == nil {
return cur
}
if t.state.CompareAndSwap(t.state.Load(), next) {
return next
}
}
}
// isInCooldown checks if a session is in cooldown. Snapshot-based: every
// transition publishes a fresh *attemptState atomically so readers never see
// a partially-updated state. The previous per-field atomic design had a
// benign race in the cooldown-exit path (cooldownEndNano reset before
// attempts reset) that could double-trigger cooldown.
func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
if !ok {
@@ -472,37 +517,60 @@ func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
tracker := trackerFromMapValue(v)
now := time.Now()
nowNano := now.UnixNano()
maxAttempts := rc.config.MaxRefreshAttempts
window := rc.config.RefreshAttemptWindow
cooldownPeriod := rc.config.RefreshCooldownPeriod
cur := tracker.stateOf()
// Already in cooldown?
if cooldownEnd := atomic.LoadInt64(&tracker.cooldownEndNano); cooldownEnd != 0 {
if nowNano <= cooldownEnd {
if cur.cooldownEndNano != 0 {
if nowNano <= cur.cooldownEndNano {
return true // still in cooldown
}
// Cooldown expired. Best-effort reset (a concurrent caller may also
// reset; the result is equivalent — fresh window + one recorded
// attempt — so the CAS race is benign).
if atomic.CompareAndSwapInt64(&tracker.cooldownEndNano, cooldownEnd, 0) {
atomic.StoreInt32(&tracker.attempts, 1)
atomic.StoreInt32(&tracker.consecutiveFailures, 0)
atomic.StoreInt64(&tracker.windowStartNano, nowNano)
}
// Cooldown expired: atomically publish a fresh state with the window
// restarted from one attempt. Whichever goroutine wins the CAS sets
// the new snapshot; losers see it via the next stateOf load.
tracker.mutateState(func(s *attemptState) *attemptState {
if s.cooldownEndNano == 0 || nowNano <= s.cooldownEndNano {
return nil // someone else already reset, or back in cooldown
}
return &attemptState{
windowStartNano: nowNano,
attempts: 1,
}
})
return false
}
// Window expired?
if windowStart := atomic.LoadInt64(&tracker.windowStartNano); time.Duration(nowNano-windowStart) > rc.config.RefreshAttemptWindow {
atomic.StoreInt32(&tracker.attempts, 1)
atomic.StoreInt64(&tracker.windowStartNano, nowNano)
if time.Duration(nowNano-cur.windowStartNano) > window {
tracker.mutateState(func(s *attemptState) *attemptState {
if time.Duration(nowNano-s.windowStartNano) <= window {
return nil
}
next := *s
next.windowStartNano = nowNano
next.attempts = 1
return &next
})
return false
}
// Just exceeded attempt limit?
if int(atomic.LoadInt32(&tracker.attempts)) >= rc.config.MaxRefreshAttempts {
end := now.Add(rc.config.RefreshCooldownPeriod).UnixNano()
// Only one CAS winner publishes the cooldown end + logs.
if atomic.CompareAndSwapInt64(&tracker.cooldownEndNano, 0, end) {
if int(cur.attempts) >= maxAttempts {
end := now.Add(cooldownPeriod).UnixNano()
published := tracker.mutateState(func(s *attemptState) *attemptState {
if s.cooldownEndNano != 0 {
return nil
}
next := *s
next.cooldownEndNano = end
return &next
})
if published.cooldownEndNano == end {
rc.logger.Infof("Session %s entering refresh cooldown after %d attempts",
sessionID, atomic.LoadInt32(&tracker.attempts))
sessionID, published.attempts)
}
return true
}
@@ -510,26 +578,46 @@ func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
return false
}
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free:
// LoadOrStore for the tracker, atomic counters/timestamps for fields.
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free
// snapshot mutation; attempts and lastAttemptNano are advanced atomically.
func (rc *RefreshCoordinator) recordRefreshAttempt(sessionID string) {
tracker := rc.getOrCreateTracker(sessionID)
atomic.AddInt32(&tracker.attempts, 1)
atomic.StoreInt64(&tracker.lastAttemptNano, time.Now().UnixNano())
nowNano := time.Now().UnixNano()
tracker.mutateState(func(s *attemptState) *attemptState {
next := *s
next.attempts++
next.lastAttemptNano = nowNano
return &next
})
}
// recordRefreshSuccess records a successful refresh. Lock-free.
// recordRefreshSuccess records a successful refresh: zero consecutiveFailures.
func (rc *RefreshCoordinator) recordRefreshSuccess(sessionID string) {
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
atomic.StoreInt32(&trackerFromMapValue(v).consecutiveFailures, 0)
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
if !ok {
return
}
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
if s.consecutiveFailures == 0 {
return nil
}
next := *s
next.consecutiveFailures = 0
return &next
})
}
// recordRefreshFailure records a failed refresh. Lock-free.
// recordRefreshFailure records a failed refresh: increments consecutiveFailures.
func (rc *RefreshCoordinator) recordRefreshFailure(sessionID string) {
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
atomic.AddInt32(&trackerFromMapValue(v).consecutiveFailures, 1)
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
if !ok {
return
}
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
next := *s
next.consecutiveFailures++
return &next
})
}
// hashRefreshToken creates a hash of the refresh token for deduplication
@@ -589,7 +677,7 @@ func (rc *RefreshCoordinator) cleanupStaleEntries() {
if tracker == nil {
return true
}
if atomic.LoadInt64(&tracker.lastAttemptNano) < cutoff {
if tracker.stateOf().lastAttemptNano < cutoff {
// Compare-and-delete to avoid evicting a tracker that was just
// re-used by a concurrent caller. We compare by pointer identity.
rc.sessionRefreshAttempts.CompareAndDelete(key, value)
+27
View File
@@ -5,6 +5,7 @@ import (
"context"
"net/http"
"sync"
"sync/atomic"
"text/template"
"time"
@@ -64,7 +65,33 @@ type ProviderMetadata struct {
// It integrates with various OIDC providers, manages sessions, caches tokens, and handles
// the complete authentication flow. It's designed to work seamlessly with Traefik's
// plugin system and provides flexible configuration options.
// MetadataSnapshot is an immutable bundle of provider-metadata URLs that the
// plugin needs on the hot request path. Published atomically via
// TraefikOidc.metadataSnapshot; readers do exactly one atomic.Value.Load to
// access all fields. Replaces 3 per-request metadataMu.RLock acquisitions
// in middleware.ServeHTTP + token_manager paths, each of which paid
// 1-5ms of Yaegi-dispatch overhead.
//
// The fields are a strict subset of the metadataMu-guarded TraefikOidc
// fields; the legacy fields are still written under metadataMu for
// less-frequent code paths that have not been migrated.
type MetadataSnapshot struct {
IssuerURL string
JWKSURL string
TokenURL string
AuthURL string
RevocationURL string
EndSessionURL string
IntrospectionURL string
RegistrationURL string
}
type TraefikOidc struct {
// metadataSnapshot atomically publishes the read-mostly URL bundle.
// Hot-path readers (middleware.ServeHTTP, token verification) load it
// directly; less-frequent paths still acquire metadataMu.RLock and
// read the individual fields below.
metadataSnapshot atomic.Value
// lastMetadataRetryNano is the UnixNano timestamp of the last metadata
// recovery attempt. Stored atomically so the hot ServeHTTP path can
// throttle retries without acquiring metadataRetryMutex on every request.
+13
View File
@@ -14,6 +14,19 @@ import (
"time"
)
// metadataSnap returns the most recently published *MetadataSnapshot, or nil
// if metadata has not yet been resolved. Single atomic.Value.Load — the hot
// ServeHTTP path uses this instead of acquiring metadataMu.RLock, which under
// Yaegi pays 1-5ms of interpreter-dispatch overhead per acquisition.
func (t *TraefikOidc) metadataSnap() *MetadataSnapshot {
v := t.metadataSnapshot.Load()
if v == nil {
return nil
}
s, _ := v.(*MetadataSnapshot)
return s
}
// safeLogDebug provides nil-safe logging for debug messages
func (t *TraefikOidc) safeLogDebug(msg string) {
if t.logger != nil {