Files
traefikoidc/singleton_resources_test.go
T
lukaszraczylo 5fcbd54955 Add sharded cache and prevention of CPU spikes / locks (#96)
* Add sharded cache and prevention of CPU spikes / locks

* Add dynamic client registration with oidc provider

* Fix race condition introduced during the sharded cache implementation.

* Add page for traefikoidc.
2025-11-30 01:41:12 +00:00

801 lines
20 KiB
Go

package traefikoidc
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestSingletonResourceManager exercises the shared ResourceManager returned
// by GetResourceManager: instance identity, concurrent first-time lookup,
// keyed HTTP client and cache reuse, repeated task registration, reference
// counting, and shutdown of a running background task.
func TestSingletonResourceManager(t *testing.T) {
	t.Run("SingletonInstance", func(t *testing.T) {
		// Two consecutive lookups must hand back the very same pointer.
		if first, second := GetResourceManager(), GetResourceManager(); first != second {
			t.Error("GetResourceManager did not return singleton instance")
		}
	})

	t.Run("ThreadSafeInitialization", func(t *testing.T) {
		// Drop any existing instance so the goroutines below race on creation.
		resetResourceManagerForTesting()

		const workers = 100
		var (
			results [workers]*ResourceManager
			done    sync.WaitGroup
		)

		done.Add(workers)
		for slot := range results {
			go func(slot int) {
				defer done.Done()
				// Each goroutine writes only to its own slot.
				results[slot] = GetResourceManager()
			}(slot)
		}
		done.Wait()

		// Every goroutine must have observed the instance stored in slot 0.
		for offset, got := range results[1:] {
			if got != results[0] {
				t.Errorf("Instance %d differs from first instance", offset+1)
			}
		}
	})

	t.Run("SharedHTTPClient", func(t *testing.T) {
		manager := GetResourceManager()

		primary := manager.GetHTTPClient("test-client-1")
		if again := manager.GetHTTPClient("test-client-1"); again != primary {
			t.Error("GetHTTPClient did not return same client for same key")
		}
		if other := manager.GetHTTPClient("test-client-2"); other == primary {
			t.Error("GetHTTPClient returned same client for different keys")
		}
	})

	t.Run("SharedCache", func(t *testing.T) {
		manager := GetResourceManager()

		primary := manager.GetCache("test-cache-1")
		if again := manager.GetCache("test-cache-1"); again != primary {
			t.Error("GetCache did not return same cache for same key")
		}
	})

	t.Run("SingletonTaskRegistry", func(t *testing.T) {
		const taskName = "test-task"
		manager := GetResourceManager()

		if err := manager.RegisterBackgroundTask(taskName, time.Second, func() {
			// First registration: body intentionally empty.
		}); err != nil {
			t.Errorf("Failed to register task: %v", err)
		}

		// Registering the same name a second time must not be reported as an error.
		if err := manager.RegisterBackgroundTask(taskName, time.Second, func() {
			// Duplicate registration: body intentionally empty.
		}); err != nil {
			t.Errorf("Failed to handle duplicate task registration: %v", err)
		}
	})

	t.Run("ReferenceCountingCleanup", func(t *testing.T) {
		const instance = "test-instance-1"
		manager := GetResourceManager()

		// 0 -> 1
		manager.AddReference(instance)
		if manager.GetReferenceCount(instance) != 1 {
			t.Error("Reference count should be 1")
		}

		// 1 -> 2
		manager.AddReference(instance)
		if manager.GetReferenceCount(instance) != 2 {
			t.Error("Reference count should be 2")
		}

		// 2 -> 1
		manager.RemoveReference(instance)
		if manager.GetReferenceCount(instance) != 1 {
			t.Error("Reference count should be 1 after removal")
		}

		// 1 -> 0
		manager.RemoveReference(instance)
		if manager.GetReferenceCount(instance) != 0 {
			t.Error("Reference count should be 0 after removing all references")
		}
	})

	t.Run("GracefulShutdown", func(t *testing.T) {
		const taskName = "shutdown-test-task"
		manager := GetResourceManager()

		// The flag is written by the task and read by the test, so it is only
		// ever touched through sync/atomic.
		var executed int32
		markExecuted := func() { atomic.StoreInt32(&executed, 1) }
		if err := manager.RegisterBackgroundTask(taskName, 100*time.Millisecond, markExecuted); err != nil {
			t.Errorf("Failed to register task: %v", err)
		}

		manager.StartBackgroundTask(taskName)

		// Sleep for longer than one task interval before checking the flag.
		time.Sleep(150 * time.Millisecond)
		if atomic.LoadInt32(&executed) == 0 {
			t.Error("Task was not executed")
		}

		shutdownCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
		defer release()
		if err := manager.Shutdown(shutdownCtx); err != nil {
			t.Errorf("Shutdown failed: %v", err)
		}

		// After Shutdown the task must no longer be reported as running.
		if manager.IsTaskRunning(taskName) {
			t.Error("Task should be stopped after shutdown")
		}
	})
}
// TestContextAwareGoroutineManagement checks goroutine accounting around
// plugin instances created with NewWithContext: the goroutine count must
// return close to its starting value after the instances are cancelled and
// closed, and a background task registered on the shared ResourceManager must
// run a bounded number of times while several plugin instances exist.
func TestContextAwareGoroutineManagement(t *testing.T) {
	t.Run("GoroutineCleanupOnContextCancel", func(t *testing.T) {
		// Reset singletons to ensure clean state
		resetResourceManagerForTesting()
		ResetUniversalCacheManagerForTesting()
		defer ResetUniversalCacheManagerForTesting()

		initialGoroutines := runtime.NumGoroutine()

		ctx, cancel := context.WithCancel(context.Background())
		// Release the context even if the subtest aborts through t.Fatalf;
		// calling a CancelFunc more than once is a no-op.
		defer cancel()

		// Create a TraefikOidc instance with context
		config := &Config{
			ProviderURL:  "https://example.com",
			ClientID:     "test-client",
			ClientSecret: "test-secret",
		}
		plugin, err := NewWithContext(ctx, config, nil, "test")
		if err != nil {
			t.Fatalf("Failed to create plugin: %v", err)
		}

		// Wait for goroutines to start
		time.Sleep(100 * time.Millisecond)
		midGoroutines := runtime.NumGoroutine()
		if midGoroutines <= initialGoroutines {
			t.Error("No goroutines were created")
		}

		// Cancel context
		cancel()
		// Close the plugin to trigger cleanup
		plugin.Close()

		// Wait for cleanup
		time.Sleep(500 * time.Millisecond)
		finalGoroutines := runtime.NumGoroutine()

		// Allow for some singleton background goroutines (caches, pools, etc.)
		// These are shared across all instances and persist for the test duration
		tolerance := 10
		if finalGoroutines > initialGoroutines+tolerance {
			t.Errorf("Goroutine leak detected: initial=%d, final=%d", initialGoroutines, finalGoroutines)
		}
	})

	t.Run("NoGoroutineLeakOnMultipleInstances", func(t *testing.T) {
		// Reset singletons to ensure clean state
		resetResourceManagerForTesting()
		ResetUniversalCacheManagerForTesting()
		defer ResetUniversalCacheManagerForTesting()

		initialGoroutines := runtime.NumGoroutine()

		configs := []Config{
			{ProviderURL: "https://example1.com", ClientID: "client1", ClientSecret: "secret1"},
			{ProviderURL: "https://example2.com", ClientID: "client2", ClientSecret: "secret2"},
			{ProviderURL: "https://example3.com", ClientID: "client3", ClientSecret: "secret3"},
		}

		plugins := make([]*TraefikOidc, 0, len(configs))
		cancels := make([]context.CancelFunc, 0, len(configs))
		// Make sure every context is released even when t.Fatalf aborts the
		// loop below part-way through.
		defer func() {
			for _, cancel := range cancels {
				cancel()
			}
		}()

		// Create multiple instances
		for i := range configs {
			ctx, cancel := context.WithCancel(context.Background())
			cancels = append(cancels, cancel)

			// Pass the address of the slice element, not of a range variable:
			// before Go 1.22 the range variable is a single reused location, so
			// every plugin would have received the same *Config.
			plugin, err := NewWithContext(ctx, &configs[i], nil, fmt.Sprintf("test-%d", i))
			if err != nil {
				t.Fatalf("Failed to create plugin %d: %v", i, err)
			}
			plugins = append(plugins, plugin)
		}

		// Wait for all goroutines to start
		time.Sleep(200 * time.Millisecond)
		midGoroutines := runtime.NumGoroutine()

		// Cancel all contexts
		for _, cancel := range cancels {
			cancel()
		}
		// Close all plugins
		for _, plugin := range plugins {
			plugin.Close()
		}

		// Wait for cleanup
		time.Sleep(500 * time.Millisecond)
		finalGoroutines := runtime.NumGoroutine()

		// Check for leaks
		tolerance := 5
		if finalGoroutines > initialGoroutines+tolerance {
			t.Errorf("Goroutine leak with multiple instances: initial=%d, mid=%d, final=%d",
				initialGoroutines, midGoroutines, finalGoroutines)
		}
	})

	t.Run("SingletonTasksAcrossInstances", func(t *testing.T) {
		// Reset singletons to ensure clean state
		ResetGlobalTaskRegistry() // Reset circuit breaker and task registry
		resetResourceManagerForTesting()
		ResetUniversalCacheManagerForTesting()
		defer ResetUniversalCacheManagerForTesting()

		rm := GetResourceManager()

		// Register singleton cleanup task; the counter is incremented from the
		// task's goroutine, hence the atomic access.
		var cleanupCount int32
		err := rm.RegisterBackgroundTask("singleton-cleanup", 100*time.Millisecond, func() {
			atomic.AddInt32(&cleanupCount, 1)
		})
		if err != nil {
			t.Fatalf("Failed to register singleton task: %v", err)
		}

		// Start the task
		rm.StartBackgroundTask("singleton-cleanup")

		// Create multiple plugin instances
		var plugins []*TraefikOidc
		for i := 0; i < 3; i++ {
			ctx := context.Background()
			config := &Config{
				ProviderURL:  fmt.Sprintf("https://example%d.com", i),
				ClientID:     fmt.Sprintf("client%d", i),
				ClientSecret: fmt.Sprintf("secret%d", i),
			}
			plugin, err := NewWithContext(ctx, config, nil, fmt.Sprintf("test-%d", i))
			if err != nil {
				t.Fatalf("Failed to create plugin %d: %v", i, err)
			}
			plugins = append(plugins, plugin)
		}

		// Wait for cleanup to run at least 2 times with adaptive timeout
		// This handles race detector overhead which can slow goroutine scheduling significantly
		// When running as part of full test suite, CPU contention is even higher, so use generous timeout
		const minExpectedCount = 2
		const maxExpectedCount = 5
		timeout := time.After(5 * time.Second)
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()

		var count int32
	waitLoop:
		for {
			select {
			case <-ticker.C:
				count = atomic.LoadInt32(&cleanupCount)
				if count >= minExpectedCount {
					// Success: reached minimum threshold
					break waitLoop
				}
			case <-timeout:
				count = atomic.LoadInt32(&cleanupCount)
				t.Errorf("Timeout waiting for cleanup count to reach %d, got %d (race detector may be slowing execution)", minExpectedCount, count)
				break waitLoop
			}
		}

		// Verify count is within expected range (should be singleton, not running excessively)
		if count > maxExpectedCount {
			t.Errorf("Cleanup count too high: %d (expected max %d for singleton)", count, maxExpectedCount)
		}

		// Cleanup
		for _, plugin := range plugins {
			plugin.Close()
		}
		rm.StopBackgroundTask("singleton-cleanup")
	})
}
// TestResourcePooling covers the goroutine pools handed out by the shared
// ResourceManager: the concurrency cap, rejection of submissions after
// shutdown, and reuse of pools and HTTP clients requested under the same key.
func TestResourcePooling(t *testing.T) {
	t.Run("GoroutinePoolLimiting", func(t *testing.T) {
		rm := GetResourceManager()

		// Configure pool with max workers
		pool := rm.GetGoroutinePool("test-pool", 5) // Max 5 workers
		if pool == nil {
			t.Fatal("Failed to get goroutine pool")
		}

		// All three counters are updated from pool workers, so every access,
		// including the reads used in failure messages, goes through sync/atomic.
		var (
			taskCount    int32
			runningCount int32
			maxRunning   int32
		)

		// Submit more tasks than pool size
		for i := 0; i < 20; i++ {
			err := pool.Submit(func() {
				atomic.AddInt32(&taskCount, 1)
				current := atomic.AddInt32(&runningCount, 1)

				// Track max concurrent tasks: retry the compare-and-swap until
				// either this task's value is recorded or a larger one is seen.
				for {
					oldMax := atomic.LoadInt32(&maxRunning)
					if current <= oldMax || atomic.CompareAndSwapInt32(&maxRunning, oldMax, current) {
						break
					}
				}

				time.Sleep(50 * time.Millisecond)
				atomic.AddInt32(&runningCount, -1)
			})
			if err != nil {
				t.Errorf("Failed to submit task %d: %v", i, err)
			}
		}

		// Wait for all tasks to complete
		pool.Wait()

		// Verify all tasks executed
		if got := atomic.LoadInt32(&taskCount); got != 20 {
			t.Errorf("Expected 20 tasks to execute, got %d", got)
		}

		// Verify concurrency was limited
		if peak := atomic.LoadInt32(&maxRunning); peak > 5 {
			t.Errorf("Max concurrent tasks exceeded pool size: %d > 5", peak)
		}
	})

	t.Run("PoolShutdown", func(t *testing.T) {
		rm := GetResourceManager()
		pool := rm.GetGoroutinePool("shutdown-pool", 3)

		// Submit tasks; a rejected submission before shutdown is a failure in
		// its own right and must not pass silently.
		var completed int32
		for i := 0; i < 10; i++ {
			err := pool.Submit(func() {
				time.Sleep(10 * time.Millisecond)
				atomic.AddInt32(&completed, 1)
			})
			if err != nil {
				t.Errorf("Failed to submit task %d: %v", i, err)
			}
		}

		// Shutdown pool
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		err := pool.Shutdown(ctx)
		if err != nil {
			t.Errorf("Pool shutdown failed: %v", err)
		}
		// Informational only: this test does not assert how many queued tasks
		// finish before Shutdown returns.
		t.Logf("tasks completed when Shutdown returned: %d of 10", atomic.LoadInt32(&completed))

		// Try to submit after shutdown - should fail
		err = pool.Submit(func() {
			t.Error("Task should not execute after shutdown")
		})
		if err == nil {
			t.Error("Expected error when submitting to shutdown pool")
		}
	})

	t.Run("ResourceReuse", func(t *testing.T) {
		rm := GetResourceManager()

		// Get same pool multiple times
		pool1 := rm.GetGoroutinePool("reuse-pool", 3)
		pool2 := rm.GetGoroutinePool("reuse-pool", 3)
		if pool1 != pool2 {
			t.Error("Expected same pool instance for same key")
		}

		// Get HTTP client multiple times
		client1 := rm.GetHTTPClient("reuse-client")
		client2 := rm.GetHTTPClient("reuse-client")
		if client1 != client2 {
			t.Error("Expected same HTTP client instance for same key")
		}
	})
}
// TestBackwardCompatibility verifies that the original New constructor still
// builds a usable handler from a minimal configuration.
func TestBackwardCompatibility(t *testing.T) {
	t.Run("LegacyNewFunction", func(t *testing.T) {
		// Test that the original New function still works
		config := &Config{
			ProviderURL:  "https://example.com",
			ClientID:     "test-client",
			ClientSecret: "test-secret",
		}
		handler, err := New(context.Background(), nil, config, "test")
		if err != nil {
			t.Fatalf("Legacy New function failed: %v", err)
		}
		if handler == nil {
			t.Fatal("Handler should not be nil")
		}

		// Cleanup - cast to TraefikOidc if needed
		if plugin, ok := handler.(*TraefikOidc); ok {
			plugin.Close()
		}
	})

	t.Run("ExistingAPICompatibility", func(t *testing.T) {
		config := &Config{
			ProviderURL:  "https://example.com",
			ClientID:     "test-client",
			ClientSecret: "test-secret",
		}
		// Surface construction errors instead of discarding them, so a failure
		// reports its cause rather than only a nil handler.
		handler, err := New(context.Background(), nil, config, "test")
		if err != nil {
			t.Fatalf("New returned an error: %v", err)
		}

		// Test that the handler works
		if handler == nil {
			t.Error("Handler should not be nil")
		}

		// Cleanup - cast to TraefikOidc if needed; the assertion is simply
		// false for a nil handler, so this is safe after the check above.
		if plugin, ok := handler.(*TraefikOidc); ok {
			plugin.Close()
		}
	})
}
// TestGoroutinePoolConditionVariable exercises Wait, WaitWithTimeout and
// PendingTasks on pools built with NewGoroutinePool: blocking until work is
// done, returning promptly on an idle pool, concurrent submitters and
// waiters, rejected submissions, and tasks that panic.
func TestGoroutinePoolConditionVariable(t *testing.T) {
	t.Run("WaitDoesNotBusyPoll", func(t *testing.T) {
		// This test verifies that Wait() blocks until a running task finishes.
		// It measures wall-clock time only; whether Wait burns CPU while
		// blocked has to be confirmed with a profiler.
		pool := NewGoroutinePool(2, nil)
		defer pool.Shutdown(context.Background())

		// Submit a slow task that announces when it has actually started, so
		// the measurement below does not depend on a guessed start-up delay.
		started := make(chan struct{})
		var taskFinished int32
		pool.Submit(func() {
			close(started)
			time.Sleep(100 * time.Millisecond)
			atomic.StoreInt32(&taskFinished, 1)
		})
		<-started

		start := time.Now()
		pool.Wait()
		elapsed := time.Since(start)

		// Verify task completed
		if atomic.LoadInt32(&taskFinished) != 1 {
			t.Error("Task should have finished")
		}

		// The task sleeps for 100ms after signalling, so Wait must have
		// blocked for most of that time. This is a sanity check only.
		if elapsed < 50*time.Millisecond {
			t.Errorf("Wait returned too quickly: %v", elapsed)
		}
	})

	t.Run("WaitReturnsImmediatelyWhenEmpty", func(t *testing.T) {
		pool := NewGoroutinePool(2, nil)
		defer pool.Shutdown(context.Background())

		// Wait on empty pool should return immediately
		start := time.Now()
		pool.Wait()
		elapsed := time.Since(start)

		// Should return almost immediately
		if elapsed > 10*time.Millisecond {
			t.Errorf("Wait on empty pool took too long: %v", elapsed)
		}
	})

	t.Run("ConcurrentSubmitAndWait", func(t *testing.T) {
		pool := NewGoroutinePool(4, nil)
		defer pool.Shutdown(context.Background())

		var completed int32
		const numTasks = 100

		// Submit tasks concurrently
		var wg sync.WaitGroup
		for i := 0; i < numTasks; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				pool.Submit(func() {
					time.Sleep(1 * time.Millisecond)
					atomic.AddInt32(&completed, 1)
				})
			}()
		}
		wg.Wait() // Wait for all submissions

		// Wait for all tasks to complete
		pool.Wait()

		// Read the counter once, atomically, and report that same value.
		if got := atomic.LoadInt32(&completed); got != numTasks {
			t.Errorf("Expected %d tasks completed, got %d", numTasks, got)
		}
	})

	t.Run("WaitWithTimeoutSuccess", func(t *testing.T) {
		pool := NewGoroutinePool(2, nil)
		defer pool.Shutdown(context.Background())

		pool.Submit(func() {
			time.Sleep(50 * time.Millisecond)
		})

		// Should complete within timeout
		success := pool.WaitWithTimeout(1 * time.Second)
		if !success {
			t.Error("WaitWithTimeout should have succeeded")
		}
	})

	t.Run("WaitWithTimeoutExpired", func(t *testing.T) {
		pool := NewGoroutinePool(1, nil)
		defer pool.Shutdown(context.Background())

		pool.Submit(func() {
			time.Sleep(500 * time.Millisecond)
		})

		// Should timeout
		success := pool.WaitWithTimeout(50 * time.Millisecond)
		if success {
			t.Error("WaitWithTimeout should have timed out")
		}

		// Wait for actual completion to avoid goroutine leak in test
		pool.Wait()
	})

	t.Run("PendingTasksCounter", func(t *testing.T) {
		// Use pool with larger buffer (maxWorkers=2, buffer=4)
		pool := NewGoroutinePool(2, nil)
		defer pool.Shutdown(context.Background())

		// Initially no pending tasks
		if pool.PendingTasks() != 0 {
			t.Errorf("Expected 0 pending tasks, got %d", pool.PendingTasks())
		}

		// Block both workers with signals that tasks have started
		blocker1 := make(chan struct{})
		blocker2 := make(chan struct{})
		started1 := make(chan struct{})
		started2 := make(chan struct{})
		pool.Submit(func() {
			close(started1)
			<-blocker1
		})
		pool.Submit(func() {
			close(started2)
			<-blocker2
		})

		// Wait for both blocking tasks to actually start
		<-started1
		<-started2

		// Submit 2 more tasks that will queue up (buffer can hold 4)
		for i := 0; i < 2; i++ {
			pool.Submit(func() {
				time.Sleep(1 * time.Millisecond)
			})
		}

		// Should have pending tasks (2 running + 2 queued = 4)
		pending := pool.PendingTasks()
		if pending != 4 {
			t.Errorf("Expected 4 pending tasks, got %d", pending)
		}

		// Release blockers
		close(blocker1)
		close(blocker2)

		// Wait for completion
		pool.Wait()

		// Should have no pending tasks
		if pool.PendingTasks() != 0 {
			t.Errorf("Expected 0 pending tasks after Wait, got %d", pool.PendingTasks())
		}
	})

	t.Run("MultipleWaiters", func(t *testing.T) {
		pool := NewGoroutinePool(2, nil)
		defer pool.Shutdown(context.Background())

		// Submit a slow task
		pool.Submit(func() {
			time.Sleep(100 * time.Millisecond)
		})

		// Multiple goroutines waiting
		var waiters sync.WaitGroup
		var waitCount int32
		for i := 0; i < 5; i++ {
			waiters.Add(1)
			go func() {
				defer waiters.Done()
				pool.Wait()
				atomic.AddInt32(&waitCount, 1)
			}()
		}

		// All waiters should complete
		waiters.Wait()
		if got := atomic.LoadInt32(&waitCount); got != 5 {
			t.Errorf("Expected all 5 waiters to complete, got %d", got)
		}
	})

	t.Run("SubmitFailureDoesNotIncrementPending", func(t *testing.T) {
		pool := NewGoroutinePool(1, nil)

		// Shutdown the pool
		pool.Shutdown(context.Background())

		// Submit should fail
		err := pool.Submit(func() {})
		if err == nil {
			t.Error("Submit should fail on shutdown pool")
		}

		// Pending tasks should still be 0
		if pool.PendingTasks() != 0 {
			t.Errorf("Pending tasks should be 0 after failed submit, got %d", pool.PendingTasks())
		}
	})

	t.Run("PanicRecoveryDecrementsPending", func(t *testing.T) {
		pool := NewGoroutinePool(2, nil)
		defer pool.Shutdown(context.Background())

		// Submit a task that panics
		pool.Submit(func() {
			panic("test panic")
		})

		// Submit a normal task
		var normalCompleted int32
		pool.Submit(func() {
			atomic.StoreInt32(&normalCompleted, 1)
		})

		// Wait should still work (panic is recovered)
		pool.Wait()

		// Normal task should have completed
		if atomic.LoadInt32(&normalCompleted) != 1 {
			t.Error("Normal task should have completed despite panic in other task")
		}

		// Pending should be 0
		if pool.PendingTasks() != 0 {
			t.Errorf("Pending tasks should be 0 after Wait, got %d", pool.PendingTasks())
		}
	})
}
// BenchmarkGoroutinePoolWait measures one Submit of an empty task followed by
// one Wait on a four-worker pool, repeated b.N times.
func BenchmarkGoroutinePoolWait(b *testing.B) {
	pool := NewGoroutinePool(4, nil)
	defer pool.Shutdown(context.Background())

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// A rejected submission would make the timing meaningless, so stop
		// the benchmark instead of continuing silently.
		if err := pool.Submit(func() {}); err != nil {
			b.Fatalf("Submit failed on iteration %d: %v", i, err)
		}
		pool.Wait()
	}
}
// BenchmarkGoroutinePoolHighThroughput measures submitting a batch of 100
// trivial tasks to an eight-worker pool and waiting for the batch to drain,
// repeated b.N times.
func BenchmarkGoroutinePoolHighThroughput(b *testing.B) {
	const batchSize = 100

	pool := NewGoroutinePool(8, nil)
	defer pool.Shutdown(context.Background())

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < batchSize; j++ {
			err := pool.Submit(func() {
				// Minimal work
				_ = 1 + 1
			})
			// Dropped tasks would silently shrink the measured workload.
			if err != nil {
				b.Fatalf("Submit failed (iteration %d, task %d): %v", i, j, err)
			}
		}
		pool.Wait()
	}
}
// resetResourceManagerForTesting discards the package-level ResourceManager so
// that the next GetResourceManager call starts from a fresh sync.Once. If an
// instance already exists it is shut down first, with a one-second deadline.
// The whole operation runs while holding resourceManagerMutex.
func resetResourceManagerForTesting() {
	resourceManagerMutex.Lock()
	defer resourceManagerMutex.Unlock()

	if current := globalResourceManager; current != nil {
		shutdownCtx, release := context.WithTimeout(context.Background(), time.Second)
		defer release()
		// Best effort: the instance is dropped regardless of the outcome.
		_ = current.Shutdown(shutdownCtx)
	}

	globalResourceManager = nil
	resourceManagerOnce = sync.Once{}
}