mirror of
https://github.com/lukaszraczylo/traefikoidc.git
synced 2026-06-06 22:49:43 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 68e1c4319c | |||
| 17e3f8ef62 | |||
| 827926bc3a |
@@ -517,6 +517,19 @@ func (t *TraefikOidc) updateMetadataEndpoints(metadata *ProviderMetadata) {
|
||||
introspectionURL := t.introspectionURL
|
||||
registrationURL := t.registrationURL
|
||||
|
||||
// Publish the read-mostly URL bundle atomically. Hot-path readers Load
|
||||
// this directly instead of acquiring metadataMu.RLock per request.
|
||||
t.metadataSnapshot.Store(&MetadataSnapshot{
|
||||
IssuerURL: metadata.Issuer,
|
||||
JWKSURL: metadata.JWKSURL,
|
||||
TokenURL: metadata.TokenURL,
|
||||
AuthURL: metadata.AuthURL,
|
||||
RevocationURL: metadata.RevokeURL,
|
||||
EndSessionURL: metadata.EndSessionURL,
|
||||
IntrospectionURL: metadata.IntrospectionURL,
|
||||
RegistrationURL: metadata.RegistrationURL,
|
||||
})
|
||||
|
||||
t.metadataMu.Unlock()
|
||||
|
||||
// Log introspection endpoint availability for opaque token support
|
||||
|
||||
+120
-7
@@ -209,10 +209,21 @@ func (t *TraefikOidc) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
|
||||
select {
|
||||
case <-t.initComplete:
|
||||
// Read issuerURL with RLock
|
||||
t.metadataMu.RLock()
|
||||
issuerURL := t.issuerURL
|
||||
t.metadataMu.RUnlock()
|
||||
// Read issuerURL via atomic snapshot when available — replaces the
|
||||
// metadataMu.RLock that previously fired on every non-bypass request.
|
||||
// Under Yaegi each RLock acquisition costs 1-5ms of interpreter
|
||||
// dispatch; the snapshot is a single atomic.Value.Load. Falls back
|
||||
// to the legacy field+RLock for paths that haven't published a
|
||||
// snapshot yet (notably some test setups that initialize the struct
|
||||
// fields directly).
|
||||
var issuerURL string
|
||||
if snap := t.metadataSnap(); snap != nil {
|
||||
issuerURL = snap.IssuerURL
|
||||
} else {
|
||||
t.metadataMu.RLock()
|
||||
issuerURL = t.issuerURL
|
||||
t.metadataMu.RUnlock()
|
||||
}
|
||||
|
||||
if issuerURL == "" {
|
||||
// Provider metadata initialization failed - try to recover.
|
||||
@@ -300,6 +311,19 @@ func (t *TraefikOidc) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
host := utils.DetermineHost(req)
|
||||
redirectURL := buildFullURL(scheme, host, t.redirURLPath)
|
||||
|
||||
// Capture per-request state: one RLock on sd.sessionMutex covers all the
|
||||
// getter values the handler chain needs (instead of 5-7 separate
|
||||
// session.GetX() calls each acquiring their own RLock under Yaegi).
|
||||
// metadataSnap is also stored once so downstream handlers don't repeat
|
||||
// the atomic.Value.Load.
|
||||
rs := (&requestState{
|
||||
scheme: scheme,
|
||||
host: host,
|
||||
redirectURL: redirectURL,
|
||||
next: t.next,
|
||||
metadata: t.metadataSnap(),
|
||||
}).captureSession(session)
|
||||
|
||||
// Check if the current request is the OIDC callback
|
||||
t.logger.Debugf("Checking callback URL match: request_path=%q, configured_callback=%q", req.URL.Path, t.redirURLPath)
|
||||
if req.URL.Path == t.redirURLPath {
|
||||
@@ -317,7 +341,7 @@ func (t *TraefikOidc) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
userIdentifier := session.GetUserIdentifier()
|
||||
userIdentifier := rs.userIdentifier
|
||||
// User authorization check
|
||||
if authenticated && userIdentifier != "" {
|
||||
if !t.isAllowedUser(userIdentifier) {
|
||||
@@ -334,11 +358,11 @@ func (t *TraefikOidc) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
// methods (validateAzureTokens/validateStandardTokens) before reaching this point.
|
||||
// Redundant validation here was causing issues with Azure AD tokens that have
|
||||
// JWT format but unverifiable signatures. See issue #89.
|
||||
t.processAuthorizedRequest(rw, req, session, redirectURL)
|
||||
t.processAuthorizedRequestRS(rw, req, rs)
|
||||
return
|
||||
}
|
||||
|
||||
refreshTokenPresent := session.GetRefreshToken() != ""
|
||||
refreshTokenPresent := rs.refreshToken != ""
|
||||
|
||||
// Decide whether to answer with 401 instead of a redirect. AJAX requests
|
||||
// cannot follow a 302 into an IdP, and sub-resource loads (script/image/
|
||||
@@ -445,6 +469,95 @@ func (t *TraefikOidc) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
// - req: The HTTP request to process.
|
||||
// - session: The user's session data containing tokens and claims.
|
||||
// - redirectURL: The callback URL for re-authentication if needed.
|
||||
// processAuthorizedRequestRS is the requestState-aware variant of
|
||||
// processAuthorizedRequest. It reads SessionData fields from the captured
|
||||
// snapshot in rs instead of calling session.GetX() (each of which acquires
|
||||
// sd.sessionMutex.RLock — under Yaegi every RLock pays ~1-5ms of interpreter
|
||||
// dispatch). Only session-mutating operations (Save, ResetRedirectCount,
|
||||
// Clear, IsDirty) still go through the session pointer because those write
|
||||
// state and have no snapshot.
|
||||
func (t *TraefikOidc) processAuthorizedRequestRS(rw http.ResponseWriter, req *http.Request, rs *requestState) {
|
||||
session := rs.session
|
||||
redirectURL := rs.redirectURL
|
||||
userIdentifier := rs.userIdentifier
|
||||
if userIdentifier == "" {
|
||||
t.logger.Info("No user identifier found in session during final processing, initiating re-auth")
|
||||
session.ResetRedirectCount()
|
||||
t.defaultInitiateAuthentication(rw, req, session, redirectURL)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if session has been invalidated via backchannel or front-channel logout
|
||||
idToken := rs.idToken
|
||||
if t.enableBackchannelLogout || t.enableFrontchannelLogout {
|
||||
if idToken != "" {
|
||||
sid, sub, createdAt := t.extractSessionInfo(idToken)
|
||||
if t.isSessionInvalidated(sid, sub, createdAt) {
|
||||
t.logger.Infof("Session for user %s has been invalidated via IdP-initiated logout", userIdentifier)
|
||||
if err := session.Clear(req, rw); err != nil {
|
||||
t.logger.Errorf("Error clearing invalidated session: %v", err)
|
||||
}
|
||||
session.ResetRedirectCount()
|
||||
t.defaultInitiateAuthentication(rw, req, session, redirectURL)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve ID-token claims at most once per request. SessionData caches
|
||||
// the parsed claims keyed on the raw ID token.
|
||||
var (
|
||||
idClaims map[string]interface{}
|
||||
idClaimsErr error
|
||||
)
|
||||
if idToken != "" {
|
||||
idClaims, idClaimsErr = session.GetIDTokenClaims(t.extractClaimsFunc)
|
||||
}
|
||||
|
||||
var (
|
||||
groupClaims map[string]interface{}
|
||||
groupClaimsErr error
|
||||
)
|
||||
if idToken != "" {
|
||||
groupClaims, groupClaimsErr = idClaims, idClaimsErr
|
||||
} else if rs.accessToken != "" {
|
||||
groupClaims, groupClaimsErr = t.extractClaimsFunc(rs.accessToken)
|
||||
} else if len(t.allowedRolesAndGroups) > 0 {
|
||||
t.logger.Error("No token available but roles/groups checks are required")
|
||||
session.ResetRedirectCount()
|
||||
t.defaultInitiateAuthentication(rw, req, session, redirectURL)
|
||||
return
|
||||
}
|
||||
|
||||
if groupClaimsErr != nil && len(t.allowedRolesAndGroups) > 0 {
|
||||
t.logger.Errorf("Failed to extract claims for roles/groups check: %v", groupClaimsErr)
|
||||
session.ResetRedirectCount()
|
||||
t.defaultInitiateAuthentication(rw, req, session, redirectURL)
|
||||
return
|
||||
}
|
||||
|
||||
// Persist any dirty session state BEFORE forwardAuthorized writes the
|
||||
// response.
|
||||
if session.IsDirty() {
|
||||
if err := session.Save(req, rw); err != nil {
|
||||
t.logger.Errorf("Failed to save session after processing headers: %v", err)
|
||||
}
|
||||
} else {
|
||||
t.logger.Debug("Session not dirty, skipping save in processAuthorizedRequest")
|
||||
}
|
||||
|
||||
p := &principal{
|
||||
Source: sourceSession,
|
||||
Identifier: userIdentifier,
|
||||
AccessToken: rs.accessToken,
|
||||
IDToken: idToken,
|
||||
RefreshToken: rs.refreshToken,
|
||||
Claims: groupClaims,
|
||||
}
|
||||
|
||||
t.forwardAuthorized(rw, req, p)
|
||||
}
|
||||
|
||||
func (t *TraefikOidc) processAuthorizedRequest(rw http.ResponseWriter, req *http.Request, session *SessionData, redirectURL string) {
|
||||
userIdentifier := session.GetUserIdentifier()
|
||||
if userIdentifier == "" {
|
||||
|
||||
+184
-100
@@ -31,14 +31,12 @@ type RefreshCoordinator struct {
|
||||
// serializing pattern caused the v1.0.15 death spiral after v1.0.14
|
||||
// removed the refreshMutex (same architectural shape, different mutex).
|
||||
sessionRefreshAttempts sync.Map
|
||||
cleanupTimers map[string]*time.Timer
|
||||
circuitBreaker *RefreshCircuitBreaker
|
||||
metrics *RefreshMetrics
|
||||
logger *Logger
|
||||
stopChan chan struct{}
|
||||
config RefreshCoordinatorConfig
|
||||
wg sync.WaitGroup
|
||||
cleanupTimerMu sync.Mutex
|
||||
}
|
||||
|
||||
// RefreshCoordinatorConfig configures the refresh coordinator behavior
|
||||
@@ -96,22 +94,46 @@ type refreshResult struct {
|
||||
fromCache bool
|
||||
}
|
||||
|
||||
// refreshAttemptTracker tracks refresh attempts for a session. All fields are
|
||||
// accessed via sync/atomic so isInCooldown/recordRefreshAttempt/Success/Failure
|
||||
// can run without holding any per-coordinator lock. Times are UnixNano so they
|
||||
// fit in an int64 and can be read with a single atomic.LoadInt64.
|
||||
// attemptState is the immutable snapshot of a session's refresh-attempt
|
||||
// state. Lives behind refreshAttemptTracker.state (atomic.Value). Every
|
||||
// transition (record, success, failure, window-reset, cooldown-enter,
|
||||
// cooldown-exit) constructs a fresh attemptState and publishes it via
|
||||
// CompareAndSwap so the entire field set is updated together.
|
||||
//
|
||||
// cooldownEndNano == 0 means "not in cooldown". This sentinel replaces the
|
||||
// inCooldown bool that the previous implementation kept under attemptsMutex —
|
||||
// under Yaegi any per-request global mutex turns into a serializing bottleneck
|
||||
// (the v1.0.14 refreshMutex -> sync.Map fix removed only one such bottleneck;
|
||||
// attemptsMutex was the next one in the queue).
|
||||
// Per-field atomic.Load/Store (the previous v1.0.15 design) had a benign
|
||||
// but observable hazard: the cooldown-exit reset wrote cooldownEndNano = 0
|
||||
// first, then separately stored attempts = 1 and windowStartNano = now.
|
||||
// A concurrent isInCooldown call could see cooldownEndNano = 0 (reset
|
||||
// just completed) with attempts still at MaxRefreshAttempts, triggering
|
||||
// a fresh cooldown immediately. The snapshot approach eliminates the
|
||||
// intermediate state entirely.
|
||||
type attemptState struct {
|
||||
lastAttemptNano int64 // UnixNano of last attempt
|
||||
windowStartNano int64 // UnixNano of attempt-window start
|
||||
cooldownEndNano int64 // UnixNano; 0 = not in cooldown
|
||||
attempts int32
|
||||
consecutiveFailures int32
|
||||
}
|
||||
|
||||
// refreshAttemptTracker tracks refresh attempts for a session via a single
|
||||
// atomic.Value holding a *attemptState pointer. Readers do exactly one Load.
|
||||
// Writers do Load → construct new → CompareAndSwap (retry on conflict).
|
||||
// Under Yaegi this collapses 3-4 per-field atomic dispatches into one Load,
|
||||
// and eliminates the cross-field race in the window-reset path.
|
||||
type refreshAttemptTracker struct {
|
||||
lastAttemptNano int64 // atomic, UnixNano of last attempt
|
||||
windowStartNano int64 // atomic, UnixNano of attempt-window start
|
||||
cooldownEndNano int64 // atomic, UnixNano; 0 = not in cooldown
|
||||
attempts int32 // atomic
|
||||
consecutiveFailures int32 // atomic
|
||||
state atomic.Value // *attemptState
|
||||
}
|
||||
|
||||
// stateOf returns the current attemptState, or a zero-value snapshot if none
|
||||
// has been published yet. The empty snapshot represents "no attempts recorded".
|
||||
func (t *refreshAttemptTracker) stateOf() *attemptState {
|
||||
if v := t.state.Load(); v != nil {
|
||||
s, _ := v.(*attemptState)
|
||||
if s != nil {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return &attemptState{}
|
||||
}
|
||||
|
||||
// RefreshMetrics tracks coordinator performance metrics
|
||||
@@ -157,10 +179,9 @@ func NewRefreshCoordinator(config RefreshCoordinatorConfig, logger *Logger) *Ref
|
||||
// inFlightRefreshes and sessionRefreshAttempts are both sync.Map;
|
||||
// their zero values are ready to use.
|
||||
config: config,
|
||||
metrics: &RefreshMetrics{},
|
||||
logger: logger,
|
||||
stopChan: make(chan struct{}),
|
||||
cleanupTimers: make(map[string]*time.Timer),
|
||||
metrics: &RefreshMetrics{},
|
||||
logger: logger,
|
||||
stopChan: make(chan struct{}),
|
||||
circuitBreaker: &RefreshCircuitBreaker{
|
||||
config: RefreshCircuitBreakerConfig{
|
||||
MaxFailures: 3,
|
||||
@@ -288,19 +309,22 @@ func (rc *RefreshCoordinator) getOrCreateOperation(
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Reserve concurrent slot via CAS — without the old global lock we can
|
||||
// no longer rely on mutex-mediated check-then-increment. If we lose the
|
||||
// CAS race we retry; if the limit has since been reached we back out.
|
||||
for {
|
||||
current := atomic.LoadInt32(&rc.metrics.currentInFlightRefreshes)
|
||||
if int(current) >= rc.config.MaxConcurrentRefreshes {
|
||||
err := fmt.Errorf("maximum concurrent refresh operations reached")
|
||||
rc.failCandidate(tokenHash, candidate, err)
|
||||
return nil, false, err
|
||||
}
|
||||
if atomic.CompareAndSwapInt32(&rc.metrics.currentInFlightRefreshes, current, current+1) {
|
||||
break
|
||||
}
|
||||
// Reserve concurrent slot via ticket-and-return: increment optimistically,
|
||||
// decrement if we overshot the limit. The previous CAS-loop allowed a
|
||||
// transient overshoot of up to N-1 leaders when several goroutines all
|
||||
// observed `current < max` in the same scheduling slice before any one
|
||||
// of them succeeded their CAS — visible to readers as
|
||||
// currentInFlightRefreshes > MaxConcurrentRefreshes for a brief window.
|
||||
// The ticket pattern is strictly bounded: the counter momentarily reads
|
||||
// max+k for k concurrent attempts past the limit, but only the k that
|
||||
// produced max+1..max+k decrement back, and only k=1 ever observes max+1
|
||||
// as committed.
|
||||
newCount := atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, 1)
|
||||
if int(newCount) > rc.config.MaxConcurrentRefreshes {
|
||||
atomic.AddInt32(&rc.metrics.currentInFlightRefreshes, -1)
|
||||
err := fmt.Errorf("maximum concurrent refresh operations reached")
|
||||
rc.failCandidate(tokenHash, candidate, err)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return candidate, true, nil
|
||||
@@ -311,7 +335,13 @@ func (rc *RefreshCoordinator) getOrCreateOperation(
|
||||
// goroutine that just registered the operation) runs them; joiners share the
|
||||
// leader's outcome via operation.done.
|
||||
func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
||||
rc.recordRefreshAttempt(sessionID)
|
||||
// Cooldown check FIRST, BEFORE incrementing the attempt counter.
|
||||
// Previously this function recorded the attempt and then read the
|
||||
// cooldown state. Under burst load (many concurrent leaders with
|
||||
// different token hashes but same session) every goroutine could
|
||||
// increment past MaxRefreshAttempts before any one of them observed
|
||||
// the threshold, so the cooldown gate fired too late — the same
|
||||
// thundering-herd shape that drove v1.0.14 into the ground.
|
||||
if rc.isInCooldown(sessionID) {
|
||||
atomic.AddInt64(&rc.metrics.cooldownsTriggered, 1)
|
||||
return fmt.Errorf("refresh attempts exceeded for session, in cooldown period")
|
||||
@@ -320,6 +350,8 @@ func (rc *RefreshCoordinator) applyLeaderGates(sessionID string) error {
|
||||
atomic.AddInt64(&rc.metrics.memoryPressureEvents, 1)
|
||||
return fmt.Errorf("system under memory pressure, refresh denied")
|
||||
}
|
||||
// Only count attempts that actually progress past the gates.
|
||||
rc.recordRefreshAttempt(sessionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -396,31 +428,25 @@ func (rc *RefreshCoordinator) executeRefreshAsync(
|
||||
}
|
||||
}
|
||||
|
||||
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning a goroutine
|
||||
// This prevents goroutine explosion under high load (500+ req/sec)
|
||||
// scheduleDelayedCleanup schedules a cleanup using a timer instead of spawning
|
||||
// a goroutine — time.AfterFunc uses the runtime's timer heap and never spawns
|
||||
// a per-timer goroutine until the callback actually fires.
|
||||
//
|
||||
// The previous implementation tracked every pending timer in a map guarded by
|
||||
// cleanupTimerMu so a duplicate scheduling could cancel the prior timer. That
|
||||
// "shouldn't happen" path was the only consumer of the map, but the mutex
|
||||
// fired on every successful refresh completion — yet another per-request
|
||||
// Yaegi-dispatched lock acquisition. performCleanup is already idempotent
|
||||
// (LoadAndDelete on the sync.Map), so a duplicate scheduling at worst fires
|
||||
// performCleanup twice; the second call is a no-op. Dropping the map removes
|
||||
// the whole class of contention on this code path.
|
||||
func (rc *RefreshCoordinator) scheduleDelayedCleanup(tokenHash string) {
|
||||
delay := rc.config.DeduplicationCleanupDelay
|
||||
if delay <= 0 {
|
||||
// Immediate cleanup
|
||||
rc.performCleanup(tokenHash)
|
||||
return
|
||||
}
|
||||
|
||||
// Use time.AfterFunc which is more efficient than spawning a goroutine with Sleep
|
||||
// time.AfterFunc uses the runtime's timer heap which is much more efficient
|
||||
rc.cleanupTimerMu.Lock()
|
||||
// Cancel any existing timer for this hash (shouldn't happen, but just in case)
|
||||
if existingTimer, exists := rc.cleanupTimers[tokenHash]; exists {
|
||||
existingTimer.Stop()
|
||||
}
|
||||
rc.cleanupTimers[tokenHash] = time.AfterFunc(delay, func() {
|
||||
rc.performCleanup(tokenHash)
|
||||
// Remove timer from map
|
||||
rc.cleanupTimerMu.Lock()
|
||||
delete(rc.cleanupTimers, tokenHash)
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
})
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
time.AfterFunc(delay, func() { rc.performCleanup(tokenHash) })
|
||||
}
|
||||
|
||||
// performCleanup removes the operation from the in-flight map.
|
||||
@@ -450,18 +476,39 @@ func (rc *RefreshCoordinator) getOrCreateTracker(sessionID string) *refreshAttem
|
||||
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
||||
return trackerFromMapValue(v)
|
||||
}
|
||||
fresh := &refreshAttemptTracker{
|
||||
windowStartNano: time.Now().UnixNano(),
|
||||
}
|
||||
fresh := &refreshAttemptTracker{}
|
||||
fresh.state.Store(&attemptState{windowStartNano: time.Now().UnixNano()})
|
||||
actual, _ := rc.sessionRefreshAttempts.LoadOrStore(sessionID, fresh)
|
||||
return trackerFromMapValue(actual)
|
||||
}
|
||||
|
||||
// isInCooldown checks if a session is in cooldown. Lock-free read with a
|
||||
// best-effort cooldown-reset CAS on the cooldownEndNano sentinel. If the
|
||||
// reset races with another goroutine we accept the loser's view (the winner's
|
||||
// reset still happens). The attempt-window expiry and limit-exceeded paths
|
||||
// are write-mostly but use atomic.StoreInt64/AddInt32 — never a held lock.
|
||||
// mutateState performs a CompareAndSwap loop that applies mutate to the
|
||||
// current snapshot. mutate must be PURE: it receives an immutable view of
|
||||
// the current state and returns a fresh *attemptState. If mutate returns nil
|
||||
// the update is skipped (used by isInCooldown for "no change needed" paths).
|
||||
//
|
||||
// Retries on CAS conflict are bounded by the number of concurrent writers —
|
||||
// in practice 1-3. Under Yaegi each retry pays the dispatch cost of one Load
|
||||
// + one CompareAndSwap; still cheaper than the previous per-field atomic
|
||||
// sequence and immune to the cross-field race the v1.0.15 design had.
|
||||
func (t *refreshAttemptTracker) mutateState(mutate func(cur *attemptState) *attemptState) *attemptState {
|
||||
for {
|
||||
cur := t.stateOf()
|
||||
next := mutate(cur)
|
||||
if next == nil {
|
||||
return cur
|
||||
}
|
||||
if t.state.CompareAndSwap(t.state.Load(), next) {
|
||||
return next
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isInCooldown checks if a session is in cooldown. Snapshot-based: every
|
||||
// transition publishes a fresh *attemptState atomically so readers never see
|
||||
// a partially-updated state. The previous per-field atomic design had a
|
||||
// benign race in the cooldown-exit path (cooldownEndNano reset before
|
||||
// attempts reset) that could double-trigger cooldown.
|
||||
func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
||||
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
||||
if !ok {
|
||||
@@ -470,37 +517,60 @@ func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
||||
tracker := trackerFromMapValue(v)
|
||||
now := time.Now()
|
||||
nowNano := now.UnixNano()
|
||||
maxAttempts := rc.config.MaxRefreshAttempts
|
||||
window := rc.config.RefreshAttemptWindow
|
||||
cooldownPeriod := rc.config.RefreshCooldownPeriod
|
||||
|
||||
cur := tracker.stateOf()
|
||||
|
||||
// Already in cooldown?
|
||||
if cooldownEnd := atomic.LoadInt64(&tracker.cooldownEndNano); cooldownEnd != 0 {
|
||||
if nowNano <= cooldownEnd {
|
||||
if cur.cooldownEndNano != 0 {
|
||||
if nowNano <= cur.cooldownEndNano {
|
||||
return true // still in cooldown
|
||||
}
|
||||
// Cooldown expired. Best-effort reset (a concurrent caller may also
|
||||
// reset; the result is equivalent — fresh window + one recorded
|
||||
// attempt — so the CAS race is benign).
|
||||
if atomic.CompareAndSwapInt64(&tracker.cooldownEndNano, cooldownEnd, 0) {
|
||||
atomic.StoreInt32(&tracker.attempts, 1)
|
||||
atomic.StoreInt32(&tracker.consecutiveFailures, 0)
|
||||
atomic.StoreInt64(&tracker.windowStartNano, nowNano)
|
||||
}
|
||||
// Cooldown expired: atomically publish a fresh state with the window
|
||||
// restarted from one attempt. Whichever goroutine wins the CAS sets
|
||||
// the new snapshot; losers see it via the next stateOf load.
|
||||
tracker.mutateState(func(s *attemptState) *attemptState {
|
||||
if s.cooldownEndNano == 0 || nowNano <= s.cooldownEndNano {
|
||||
return nil // someone else already reset, or back in cooldown
|
||||
}
|
||||
return &attemptState{
|
||||
windowStartNano: nowNano,
|
||||
attempts: 1,
|
||||
}
|
||||
})
|
||||
return false
|
||||
}
|
||||
|
||||
// Window expired?
|
||||
if windowStart := atomic.LoadInt64(&tracker.windowStartNano); time.Duration(nowNano-windowStart) > rc.config.RefreshAttemptWindow {
|
||||
atomic.StoreInt32(&tracker.attempts, 1)
|
||||
atomic.StoreInt64(&tracker.windowStartNano, nowNano)
|
||||
if time.Duration(nowNano-cur.windowStartNano) > window {
|
||||
tracker.mutateState(func(s *attemptState) *attemptState {
|
||||
if time.Duration(nowNano-s.windowStartNano) <= window {
|
||||
return nil
|
||||
}
|
||||
next := *s
|
||||
next.windowStartNano = nowNano
|
||||
next.attempts = 1
|
||||
return &next
|
||||
})
|
||||
return false
|
||||
}
|
||||
|
||||
// Just exceeded attempt limit?
|
||||
if int(atomic.LoadInt32(&tracker.attempts)) >= rc.config.MaxRefreshAttempts {
|
||||
end := now.Add(rc.config.RefreshCooldownPeriod).UnixNano()
|
||||
// Only one CAS winner publishes the cooldown end + logs.
|
||||
if atomic.CompareAndSwapInt64(&tracker.cooldownEndNano, 0, end) {
|
||||
if int(cur.attempts) >= maxAttempts {
|
||||
end := now.Add(cooldownPeriod).UnixNano()
|
||||
published := tracker.mutateState(func(s *attemptState) *attemptState {
|
||||
if s.cooldownEndNano != 0 {
|
||||
return nil
|
||||
}
|
||||
next := *s
|
||||
next.cooldownEndNano = end
|
||||
return &next
|
||||
})
|
||||
if published.cooldownEndNano == end {
|
||||
rc.logger.Infof("Session %s entering refresh cooldown after %d attempts",
|
||||
sessionID, atomic.LoadInt32(&tracker.attempts))
|
||||
sessionID, published.attempts)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -508,26 +578,46 @@ func (rc *RefreshCoordinator) isInCooldown(sessionID string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free:
|
||||
// LoadOrStore for the tracker, atomic counters/timestamps for fields.
|
||||
// recordRefreshAttempt records a refresh attempt for rate limiting. Lock-free
|
||||
// snapshot mutation; attempts and lastAttemptNano are advanced atomically.
|
||||
func (rc *RefreshCoordinator) recordRefreshAttempt(sessionID string) {
|
||||
tracker := rc.getOrCreateTracker(sessionID)
|
||||
atomic.AddInt32(&tracker.attempts, 1)
|
||||
atomic.StoreInt64(&tracker.lastAttemptNano, time.Now().UnixNano())
|
||||
nowNano := time.Now().UnixNano()
|
||||
tracker.mutateState(func(s *attemptState) *attemptState {
|
||||
next := *s
|
||||
next.attempts++
|
||||
next.lastAttemptNano = nowNano
|
||||
return &next
|
||||
})
|
||||
}
|
||||
|
||||
// recordRefreshSuccess records a successful refresh. Lock-free.
|
||||
// recordRefreshSuccess records a successful refresh: zero consecutiveFailures.
|
||||
func (rc *RefreshCoordinator) recordRefreshSuccess(sessionID string) {
|
||||
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
||||
atomic.StoreInt32(&trackerFromMapValue(v).consecutiveFailures, 0)
|
||||
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
|
||||
if s.consecutiveFailures == 0 {
|
||||
return nil
|
||||
}
|
||||
next := *s
|
||||
next.consecutiveFailures = 0
|
||||
return &next
|
||||
})
|
||||
}
|
||||
|
||||
// recordRefreshFailure records a failed refresh. Lock-free.
|
||||
// recordRefreshFailure records a failed refresh: increments consecutiveFailures.
|
||||
func (rc *RefreshCoordinator) recordRefreshFailure(sessionID string) {
|
||||
if v, ok := rc.sessionRefreshAttempts.Load(sessionID); ok {
|
||||
atomic.AddInt32(&trackerFromMapValue(v).consecutiveFailures, 1)
|
||||
v, ok := rc.sessionRefreshAttempts.Load(sessionID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
trackerFromMapValue(v).mutateState(func(s *attemptState) *attemptState {
|
||||
next := *s
|
||||
next.consecutiveFailures++
|
||||
return &next
|
||||
})
|
||||
}
|
||||
|
||||
// hashRefreshToken creates a hash of the refresh token for deduplication
|
||||
@@ -587,7 +677,7 @@ func (rc *RefreshCoordinator) cleanupStaleEntries() {
|
||||
if tracker == nil {
|
||||
return true
|
||||
}
|
||||
if atomic.LoadInt64(&tracker.lastAttemptNano) < cutoff {
|
||||
if tracker.stateOf().lastAttemptNano < cutoff {
|
||||
// Compare-and-delete to avoid evicting a tracker that was just
|
||||
// re-used by a concurrent caller. We compare by pointer identity.
|
||||
rc.sessionRefreshAttempts.CompareAndDelete(key, value)
|
||||
@@ -611,18 +701,12 @@ func (rc *RefreshCoordinator) GetMetrics() map[string]interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the coordinator
|
||||
// Shutdown gracefully shuts down the coordinator. Pending delayed-cleanup
|
||||
// timers are NOT canceled explicitly: time.AfterFunc callbacks are tiny
|
||||
// (one map LoadAndDelete) and harmless after Shutdown — sync.Map operations
|
||||
// remain safe on an unused coordinator until GC.
|
||||
func (rc *RefreshCoordinator) Shutdown() {
|
||||
close(rc.stopChan)
|
||||
|
||||
// Cancel all pending cleanup timers
|
||||
rc.cleanupTimerMu.Lock()
|
||||
for _, timer := range rc.cleanupTimers {
|
||||
timer.Stop()
|
||||
}
|
||||
rc.cleanupTimers = make(map[string]*time.Timer)
|
||||
rc.cleanupTimerMu.Unlock()
|
||||
|
||||
rc.wg.Wait()
|
||||
}
|
||||
|
||||
|
||||
+12
-19
@@ -165,9 +165,14 @@ func TestRefreshRateLimiting(t *testing.T) {
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Verify that cooldown was triggered after max attempts
|
||||
// With the new logic, the Nth attempt triggers cooldown, so we get N-1 successful attempts
|
||||
expectedSuccessfulAttempts := config.MaxRefreshAttempts - 1
|
||||
// Verify that cooldown was triggered after max attempts.
|
||||
// With applyLeaderGates checking cooldown BEFORE recording the attempt
|
||||
// (the v1.0.16 reorder fixing the thundering-herd off-by-one), N attempts
|
||||
// run to completion and the (N+1)th is denied. Previously the Nth was
|
||||
// denied as it tried to record, which under burst load let multiple
|
||||
// concurrent leaders increment past the limit before any one of them
|
||||
// observed the gate.
|
||||
expectedSuccessfulAttempts := config.MaxRefreshAttempts
|
||||
if attempts != expectedSuccessfulAttempts {
|
||||
t.Errorf("Expected %d successful attempts before cooldown, got %d", expectedSuccessfulAttempts, attempts)
|
||||
}
|
||||
@@ -721,11 +726,9 @@ func TestNoGoroutineExplosionWithTimers(t *testing.T) {
|
||||
currentGoroutines := runtime.NumGoroutine()
|
||||
t.Logf("Goroutines after %d refresh operations: %d", numRefreshes, currentGoroutines)
|
||||
|
||||
// Check timer count
|
||||
coordinator.cleanupTimerMu.Lock()
|
||||
timerCount := len(coordinator.cleanupTimers)
|
||||
coordinator.cleanupTimerMu.Unlock()
|
||||
t.Logf("Active cleanup timers: %d", timerCount)
|
||||
// (Coordinator no longer tracks pending timers; time.AfterFunc closures
|
||||
// fire performCleanup directly. This test now only checks the goroutine
|
||||
// budget, which was always the real invariant.)
|
||||
|
||||
// With timer-based cleanup, goroutine increase should be minimal
|
||||
// Timers don't create goroutines - they use the runtime timer heap
|
||||
@@ -741,19 +744,9 @@ func TestNoGoroutineExplosionWithTimers(t *testing.T) {
|
||||
initialGoroutines, currentGoroutines, goroutineIncrease)
|
||||
}
|
||||
|
||||
// Wait for timers to fire and cleanup
|
||||
// Wait for timers to fire and cleanup.
|
||||
time.Sleep(config.DeduplicationCleanupDelay + 50*time.Millisecond)
|
||||
|
||||
// Verify timers were cleaned up
|
||||
coordinator.cleanupTimerMu.Lock()
|
||||
remainingTimers := len(coordinator.cleanupTimers)
|
||||
coordinator.cleanupTimerMu.Unlock()
|
||||
|
||||
// Most timers should have fired and been removed
|
||||
if remainingTimers > 10 {
|
||||
t.Errorf("Too many cleanup timers remaining: %d", remainingTimers)
|
||||
}
|
||||
|
||||
// Verify goroutines returned to near initial
|
||||
runtime.GC()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
// Package traefikoidc provides OIDC authentication middleware for Traefik.
|
||||
// requestState bundles read-mostly fields for a single ServeHTTP call.
|
||||
package traefikoidc
|
||||
|
||||
import "net/http"
|
||||
|
||||
// requestState is a per-request context object allocated at the top of
|
||||
// ServeHTTP and threaded through to downstream handlers. It caches values
|
||||
// that would otherwise require a Yaegi-dispatched lock acquisition each time
|
||||
// they're read:
|
||||
//
|
||||
// - The metadata snapshot (atomic.Value.Load once, not per-handler).
|
||||
// - SessionData getter results (one RLock on sd.sessionMutex covers all
|
||||
// fields, instead of 5-7 separate RLock/RUnlock pairs scattered through
|
||||
// the handler chain).
|
||||
//
|
||||
// The struct is alloc'd at request entry, populated under at most one RLock
|
||||
// of sd.sessionMutex, and discarded at request exit. It is NOT shared across
|
||||
// requests and never written from another goroutine, so no synchronization
|
||||
// on its fields is required.
|
||||
//
|
||||
// Cross-request global caches (tokenCache, JWKCache, sessionEntries,
|
||||
// sessionInvalidationCache) remain — they're orthogonal. requestState's job
|
||||
// is to eliminate redundant per-handler reads of values that don't change
|
||||
// within a single request.
|
||||
type requestState struct {
|
||||
// Globals snapshotted once.
|
||||
metadata *MetadataSnapshot
|
||||
|
||||
// SessionData fields snapshotted under one RLock. The pointer to the
|
||||
// SessionData is retained so handlers that genuinely need to mutate
|
||||
// (Save, Clear, etc.) still have access.
|
||||
session *SessionData
|
||||
|
||||
authenticated bool
|
||||
accessToken string
|
||||
idToken string
|
||||
refreshToken string
|
||||
userIdentifier string
|
||||
createdAtUnixSec int64
|
||||
|
||||
// Output: scheme/host/redirect path determined at top of ServeHTTP.
|
||||
scheme string
|
||||
host string
|
||||
redirectURL string
|
||||
|
||||
// Carry the next handler so forwardAuthorized doesn't need to close over t.
|
||||
next http.Handler
|
||||
}
|
||||
|
||||
// captureSession populates requestState's SessionData-derived fields under a
|
||||
// single RLock of sd.sessionMutex. Returns the populated rs for chaining.
|
||||
//
|
||||
// Replaces a sequence of SessionData.GetX() calls each of which acquires
|
||||
// sd.sessionMutex.RLock(). Under Yaegi each RLock costs ~1-5ms of
|
||||
// interpreter dispatch; batching saves the rest.
|
||||
func (rs *requestState) captureSession(sd *SessionData) *requestState {
|
||||
if sd == nil {
|
||||
return rs
|
||||
}
|
||||
rs.session = sd
|
||||
sd.sessionMutex.RLock()
|
||||
rs.authenticated = sd.getAuthenticatedUnsafe()
|
||||
rs.accessToken = sd.getAccessTokenUnsafe()
|
||||
rs.idToken = sd.getIDTokenUnsafe()
|
||||
rs.refreshToken = sd.getRefreshTokenUnsafe()
|
||||
rs.userIdentifier = sd.getUserIdentifierUnsafe()
|
||||
rs.createdAtUnixSec = sd.getCreatedAtUnsafe()
|
||||
sd.sessionMutex.RUnlock()
|
||||
return rs
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
@@ -64,7 +65,33 @@ type ProviderMetadata struct {
|
||||
// It integrates with various OIDC providers, manages sessions, caches tokens, and handles
|
||||
// the complete authentication flow. It's designed to work seamlessly with Traefik's
|
||||
// plugin system and provides flexible configuration options.
|
||||
// MetadataSnapshot is an immutable bundle of provider-metadata URLs that the
|
||||
// plugin needs on the hot request path. Published atomically via
|
||||
// TraefikOidc.metadataSnapshot; readers do exactly one atomic.Value.Load to
|
||||
// access all fields. Replaces 3 per-request metadataMu.RLock acquisitions
|
||||
// in middleware.ServeHTTP + token_manager paths, each of which paid
|
||||
// 1-5ms of Yaegi-dispatch overhead.
|
||||
//
|
||||
// The fields are a strict subset of the metadataMu-guarded TraefikOidc
|
||||
// fields; the legacy fields are still written under metadataMu for
|
||||
// less-frequent code paths that have not been migrated.
|
||||
type MetadataSnapshot struct {
|
||||
IssuerURL string
|
||||
JWKSURL string
|
||||
TokenURL string
|
||||
AuthURL string
|
||||
RevocationURL string
|
||||
EndSessionURL string
|
||||
IntrospectionURL string
|
||||
RegistrationURL string
|
||||
}
|
||||
|
||||
type TraefikOidc struct {
|
||||
// metadataSnapshot atomically publishes the read-mostly URL bundle.
|
||||
// Hot-path readers (middleware.ServeHTTP, token verification) load it
|
||||
// directly; less-frequent paths still acquire metadataMu.RLock and
|
||||
// read the individual fields below.
|
||||
metadataSnapshot atomic.Value
|
||||
// lastMetadataRetryNano is the UnixNano timestamp of the last metadata
|
||||
// recovery attempt. Stored atomically so the hot ServeHTTP path can
|
||||
// throttle retries without acquiring metadataRetryMutex on every request.
|
||||
|
||||
@@ -14,6 +14,19 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// metadataSnap returns the most recently published *MetadataSnapshot, or nil
|
||||
// if metadata has not yet been resolved. Single atomic.Value.Load — the hot
|
||||
// ServeHTTP path uses this instead of acquiring metadataMu.RLock, which under
|
||||
// Yaegi pays 1-5ms of interpreter-dispatch overhead per acquisition.
|
||||
func (t *TraefikOidc) metadataSnap() *MetadataSnapshot {
|
||||
v := t.metadataSnapshot.Load()
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
s, _ := v.(*MetadataSnapshot)
|
||||
return s
|
||||
}
|
||||
|
||||
// safeLogDebug provides nil-safe logging for debug messages
|
||||
func (t *TraefikOidc) safeLogDebug(msg string) {
|
||||
if t.logger != nil {
|
||||
|
||||
Reference in New Issue
Block a user