improvements mid apr 2025 (#27)

* General improvements and bug fixes.

* Improve tests coverage.

* fixup! Improve tests coverage.

* Update README.md with latest changes.

* Fix the uint32

* Resolve issue with race condition for logging.

* fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* Fix the test of the rate limiter

* Add default ratelimit.json file

* Update dependencies.

* Significant refactor.

* fixup! Significant refactor.

* fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Merge remote-tracking branch 'origin/main' into improvements-mid-apr-2025

* Enhance admin dashboard with real-time WebSocket streaming and charts

Dashboard improvements:
- Added Chart.js for time-series visualization
- Created real-time graphs for RPS and cache hit rate
- Added new statistics displays:
  * System uptime with human-readable format
  * Total requests with success/failure breakdown
  * Current and average RPS
  * Success rate with progress bars
  * Cache hit rate and memory usage with visual indicators
  * Detailed cache statistics
- Implemented WebSocket streaming endpoint (/admin/ws/stats)
- Real-time updates every 2 seconds via WebSocket
- Automatic fallback to polling if WebSocket unavailable
- Connection status indicator
- Progress bars for success rate, cache hit rate, and memory usage

Backend enhancements:
- New WebSocket handler for streaming all statistics
- gatherAllStats() method to collect comprehensive metrics
- Streams data every 2 seconds to connected clients
- Automatic reconnection handling
- Maintains up to 60 data points per chart

The dashboard now provides comprehensive real-time monitoring with:
- Live metrics streaming
- Historical trend visualization
- Responsive design with visual indicators
- Graceful degradation to polling mode

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix WebSocket message handling for real-time stats streaming

Issues fixed:
- Removed blocking default case that prevented ticker from firing
- Separated read and write operations into proper goroutines
- Added proper ping/pong handlers with read deadlines
- Implemented done channel for clean disconnection signaling
- Send initial stats immediately on connection

The WebSocket now properly:
- Streams stats every 2 seconds via ticker
- Handles client disconnections gracefully
- Maintains connection with ping/pong
- Detects connection drops via read goroutine
- Non-blocking message handling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Add Redis-based cluster mode for distributed metrics aggregation

When using Redis for caching, proxies now automatically form a cluster
and aggregate metrics across all instances for unified monitoring.

Features:
- Metrics Aggregator: Publishes instance metrics to Redis every 5s
- Cluster Mode API: /admin/api/cluster/stats and /admin/api/cluster/instances
- Dashboard Cluster View: Toggle between single instance and cluster view
- Auto-discovery: Detects cluster mode automatically via Redis
- Instance Management: Each instance gets unique ID (hostname + UUID)
- Graceful Cleanup: Removes metrics from Redis on shutdown
- TTL-based expiration: Stale instances auto-expire after 30s

Cluster metrics include:
- Aggregated requests (total, succeeded, failed, success rate)
- Combined RPS across all instances
- Total cache hits/misses with cluster-wide hit rate
- Per-instance health status and uptime
- Active connections and WebSocket stats
- Request coalescing backend savings

Dashboard improvements:
- Cluster Status section showing total/healthy instances
- Instance Details section with per-node metrics
- Cluster View toggle in header
- Automatic detection of cluster availability
- Individual instance cards with health indicators
- "Current" badge for the instance you're viewing

Architecture:
- Uses Redis SET to track active instances
- Each instance publishes to redis key: graphql-proxy:metrics:instances:{id}
- 30s TTL ensures stale instances are removed
- Aggregator started automatically when Redis cache enabled
- Registered with shutdown manager for graceful cleanup

Environment: Automatically enabled when ENABLE_REDIS_CACHE=true

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix cluster instance details using correct JSON field names

JavaScript was using Go struct field names (PascalCase) instead of
JSON field names (snake_case), causing all instance metrics to show
as 0 or undefined.

Fixed references:
- instance.InstanceID → instance.instance_id
- instance.Hostname → instance.hostname
- instance.UptimeSeconds → instance.uptime_seconds
- instance.Stats → instance.stats
- instance.Health → instance.health

Also added fallback to check both instance.cache_summary and
stats.cache_summary for better compatibility.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Enhance Cluster View toggle visibility and styling

Improvements:
- Replaced basic checkbox with custom toggle switch
- Added prominent card-style container with backdrop blur
- Positioned toggle in header next to dashboard title
- Toggle switch with smooth animation (slides left/right)
- Green color when enabled (#10b981)
- Hover effects with slight lift
- Better typography with font weights and spacing
- Info text positioned below toggle label
- Disabled state with reduced opacity
- Responsive layout with flexbox

The toggle is now much more visible and professional-looking,
making it clear when cluster mode is available and active.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Add comprehensive debugging for cluster mode metrics issues

Backend improvements:
- Fixed metrics structure: ensure Stats always has correct inner structure
- Added defensive nil checks for instance.Stats in aggregation
- Added debug logging in publishMetrics to verify data being sent
- Added warning logging in aggregateStats when data is missing
- Log actual keys present in Stats when 'requests' is missing
- Initialize empty maps instead of leaving fields nil

Frontend improvements:
- Added console.log statements to trace cluster data flow
- Log cluster data structure on receive
- Log stats keys and structure
- Log instances array and count
- Warn when expected data is missing
- Added fallback values (|| 0) for display fields

This will help diagnose why cluster view shows zeros by logging:
1. What data is being published to Redis
2. What data is being retrieved from Redis
3. What structure the data has at each step
4. What keys are present vs expected

Check browser console and server logs to see where the data flow breaks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Add cluster debug endpoint for troubleshooting

New endpoint: GET /admin/api/cluster/debug

Returns comprehensive debug information:
- Whether metrics aggregator is initialized
- Redis cache enabled status
- Current instance ID
- Cluster mode status
- Total/healthy instance counts
- Sample instance structure with keys
- Sample requests data structure
- Error messages if any

This helps diagnose cluster mode issues by showing:
1. If Redis is actually enabled
2. If aggregator is initialized
3. What data structure is being stored
4. What keys are present in Stats
5. Sample of actual data being aggregated

Visit http://localhost:8181/admin/api/cluster/debug to see
what's happening under the hood.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix cluster mode initialization and improve Redis error visibility

Critical fixes:
1. Move metrics aggregator initialization BEFORE cache initialization
   - Runs independently even if CacheEnable=false
   - Only requires CacheRedisEnable=true
   - This was causing aggregator to not initialize when cache was disabled

2. Promote Redis errors from Warning to Error level
   - Changed "Failed to publish" from Warning to Error with  CRITICAL prefix
   - Added detailed error context (instance_id, keys, error message)
   - Added success logging with ✓ confirmation
   - Log command count and data size on success

3. Enhanced startup logging
   - Log "Initializing metrics aggregator" with Redis URL/DB
   - Log "✓ Successfully initialized" with instance ID
   - Log "FAILED to initialize" as ERROR (was Warning)

Why this matters:
- If Redis cache is disabled but Redis is available, cluster mode should still work
- Previous code only initialized aggregator inside cache initialization block
- Redis publish errors were being silently logged as warnings
- No visibility into whether metrics were actually being stored

After this fix:
- Cluster mode works even with ENABLE_GLOBAL_CACHE=false + ENABLE_REDIS_CACHE=true
- Redis errors are immediately visible in logs
- Clear success/failure indicators
- Better troubleshooting information

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Add force-publish endpoint for cluster mode debugging

New endpoint: POST /admin/api/cluster/force-publish

Forces an immediate metrics publish to Redis and reports results.
This helps diagnose why instances aren't appearing:

Response includes:
- success: true/false
- publish_done: confirmation publish was attempted
- instances_found: count after publish
- error: if retrieval failed
- check_logs: reminder to look for log messages

Use this to test:
curl -X POST http://localhost:8181/admin/api/cluster/force-publish

Then check server logs for:
✓ Successfully published metrics to Redis
OR
 CRITICAL: Failed to publish metrics to Redis

This bypasses the 5-second timer and publishes immediately,
making it easier to test without waiting.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix cluster metrics aggregation and dashboard display issues

- Fix metric names: use correct Prometheus metric names (requests_succesful, requests_failed, requests_skipped)
- Add automatic stale instance cleanup (>1 minute inactive)
- Implement 10-second moving average smoothing for RPS, success rate, and cache hit rate
- Add trend indicators (↑ ↗ → ↘ ↓) to show metric direction
- Add compact number formatting (1.2M, 3.4K) with full-value tooltips
- Add retry budget aggregation (allowed/denied retries, denial rate)
- Add circuit breaker aggregation (state counts, per-instance breakdown)
- Add coalescing stats aggregation (backend savings percentage)
- Fix memory display to show "N/A" for Redis cache (memory tracking not available)
- Fix JavaScript error: change hitRate to smoothedHitRate in chart update call
- Change Redis operations to use context.Background() instead of parent context
- Fix staticcheck warning: omit nil check for map len()

This resolves cluster view showing zeros and prevents metrics from disappearing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix cluster instance count not updating in upper right corner

The cluster-info element (showing instance count next to "Cluster View" toggle)
was only updated during initial page load. Now it updates in real-time whenever
cluster stats are received via WebSocket.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Clean up verbose debug logging

Removed debug console.log statements from dashboard JavaScript:
- WebSocket connection/disconnection logs
- Cluster mode availability checks
- Cluster stats update debug logs

Removed verbose Info logs from Go code that ran frequently:
- "Publishing metrics to Redis" (every 5s)
- "Metrics gathered successfully" (every 5s)
- "Successfully published metrics to Redis" (every 5s)
- "Aggregating stats from instances" (frequently)
- "Successfully aggregated cluster metrics" (frequently)
- "Aggregation complete" (frequently)

Kept important logs:
- Error and warning logs
- Initialization and shutdown logs
- Conditional logs (stale instance cleanup, failures)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-01 00:25:32 +01:00
committed by GitHub
parent 0fc776228f
commit e3e9f7d181
5 changed files with 2219 additions and 49 deletions
+895 -34
View File
@@ -4,6 +4,7 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>GraphQL Proxy Admin Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
<style>
* {
margin: 0;
@@ -211,28 +212,318 @@
.loading {
animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite;
}
.chart-container {
position: relative;
height: 300px;
margin-top: 20px;
}
.progress-bar {
width: 100%;
height: 8px;
background: #f0f0f0;
border-radius: 4px;
overflow: hidden;
margin-top: 8px;
}
.progress-fill {
height: 100%;
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
transition: width 0.3s ease;
}
.ws-status {
display: inline-block;
padding: 4px 8px;
border-radius: 4px;
font-size: 0.75em;
font-weight: 600;
margin-left: 10px;
}
.ws-connected {
background: #d1fae5;
color: #065f46;
}
.ws-disconnected {
background: #fee2e2;
color: #991b1b;
}
.cluster-toggle-container {
background: rgba(255, 255, 255, 0.15);
padding: 12px 20px;
border-radius: 8px;
display: inline-flex;
align-items: center;
gap: 12px;
backdrop-filter: blur(10px);
border: 1px solid rgba(255, 255, 255, 0.2);
cursor: pointer;
transition: all 0.3s ease;
}
.cluster-toggle-container:hover {
background: rgba(255, 255, 255, 0.25);
transform: translateY(-1px);
}
.toggle-switch {
position: relative;
width: 48px;
height: 24px;
background: rgba(255, 255, 255, 0.3);
border-radius: 12px;
transition: background 0.3s ease;
cursor: pointer;
}
.toggle-switch::after {
content: '';
position: absolute;
top: 2px;
left: 2px;
width: 20px;
height: 20px;
background: white;
border-radius: 50%;
transition: transform 0.3s ease;
box-shadow: 0 2px 4px rgba(0,0,0,0.2);
}
#cluster-mode-toggle {
display: none;
}
#cluster-mode-toggle:checked + .cluster-toggle-container .toggle-switch {
background: #10b981;
}
#cluster-mode-toggle:checked + .cluster-toggle-container .toggle-switch::after {
transform: translateX(24px);
}
#cluster-mode-toggle:disabled + .cluster-toggle-container {
opacity: 0.5;
cursor: not-allowed;
}
#cluster-mode-toggle:disabled + .cluster-toggle-container:hover {
background: rgba(255, 255, 255, 0.15);
transform: none;
}
.cluster-toggle-label {
font-size: 0.95em;
font-weight: 600;
color: white;
letter-spacing: 0.3px;
user-select: none;
}
.cluster-toggle-info {
font-size: 0.8em;
opacity: 0.9;
color: white;
font-weight: 400;
}
.instance-card {
background: white;
border-radius: 12px;
padding: 20px;
margin-bottom: 15px;
box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.instance-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 15px;
padding-bottom: 15px;
border-bottom: 1px solid #f0f0f0;
}
.instance-title {
font-size: 1.1em;
font-weight: 600;
color: #333;
}
.instance-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
}
.instance-metric {
display: flex;
flex-direction: column;
}
.instance-metric-label {
font-size: 0.8em;
color: #666;
margin-bottom: 4px;
}
.instance-metric-value {
font-size: 1.1em;
font-weight: 600;
color: #333;
}
</style>
</head>
<body>
<header>
<div class="container">
<h1>GraphQL Proxy Admin Dashboard</h1>
<div class="subtitle">Real-time monitoring and management</div>
<div style="display: flex; justify-content: space-between; align-items: center; flex-wrap: wrap; gap: 15px;">
<div>
<h1>GraphQL Proxy Admin Dashboard</h1>
<div class="subtitle">
Real-time monitoring and management
<span class="ws-status ws-disconnected" id="ws-status">Connecting...</span>
</div>
</div>
<div>
<input type="checkbox" id="cluster-mode-toggle">
<label for="cluster-mode-toggle" class="cluster-toggle-container">
<div class="toggle-switch"></div>
<div>
<div class="cluster-toggle-label">Cluster View</div>
<div class="cluster-toggle-info" id="cluster-info">Checking availability...</div>
</div>
</label>
</div>
</div>
</div>
</header>
<div class="container">
<!-- Cluster Status (shown when cluster mode detected) -->
<div id="cluster-status-section" style="display: none;">
<h2 class="section-title">Cluster Status</h2>
<div class="stats-grid">
<div class="card">
<div class="card-title">Total Instances</div>
<div class="card-value" id="cluster-total-instances">--</div>
<div class="card-label">Proxy nodes</div>
</div>
<div class="card">
<div class="card-title">Healthy Instances</div>
<div class="card-value" id="cluster-healthy-instances">--</div>
<div class="card-label">Active nodes</div>
</div>
</div>
</div>
<!-- System Overview -->
<h2 class="section-title">
<span id="overview-title">System Overview</span>
</h2>
<div class="stats-grid">
<div class="card">
<div class="card-title">Uptime</div>
<div class="card-value" id="uptime">--</div>
<div class="card-label" id="uptime-seconds">-- seconds</div>
</div>
<div class="card">
<div class="card-title">Total Requests</div>
<div class="card-value" id="total-requests">--</div>
<div class="card-label">
<span style="color: #10b981;"><span id="succeeded-requests">--</span></span>
<span style="color: #ef4444;"><span id="failed-requests">--</span></span>
</div>
</div>
<div class="card">
<div class="card-title">Current RPS</div>
<div class="card-value" id="current-rps">--</div>
<div class="card-label">Avg: <span id="avg-rps">--</span> req/s</div>
</div>
<div class="card">
<div class="card-title">Success Rate</div>
<div class="card-value" id="success-rate">--%</div>
<div class="progress-bar">
<div class="progress-fill" id="success-progress" style="width: 0%"></div>
</div>
</div>
</div>
<!-- Cache Statistics -->
<h2 class="section-title">Cache Performance</h2>
<div class="stats-grid">
<div class="card">
<div class="card-title">Cache Hit Rate</div>
<div class="card-value" id="cache-hit-rate">--%</div>
<div class="progress-bar">
<div class="progress-fill" id="cache-hit-progress" style="width: 0%"></div>
</div>
</div>
<div class="card">
<div class="card-title">Cache Hits / Misses</div>
<div class="card-value" id="cache-hits">--</div>
<div class="card-label">
Hits: <span id="cache-hits-detail">--</span> |
Misses: <span id="cache-misses">--</span>
</div>
</div>
<div class="card">
<div class="card-title">Cached Queries</div>
<div class="card-value" id="cached-queries">--</div>
<div class="card-label">Total entries</div>
</div>
<div class="card">
<div class="card-title">Memory Usage</div>
<div class="card-value" id="cache-memory">-- MB</div>
<div class="card-label" id="cache-memory-pct">--%</div>
<div class="progress-bar">
<div class="progress-fill" id="memory-progress" style="width: 0%"></div>
</div>
</div>
</div>
<!-- Real-time Charts -->
<h2 class="section-title">Real-time Metrics</h2>
<div class="stats-grid">
<div class="card">
<div class="card-title">Requests Per Second</div>
<div class="chart-container">
<canvas id="rps-chart"></canvas>
</div>
</div>
<div class="card">
<div class="card-title">Cache Hit Rate Over Time</div>
<div class="chart-container">
<canvas id="cache-chart"></canvas>
</div>
</div>
</div>
<!-- Health Status -->
<h2 class="section-title">Health Status</h2>
<div class="card" id="health-card">
<div class="card-title">System Health</div>
<div>
<span class="status-indicator status-unknown loading" id="health-indicator"></span>
<span id="health-status">Loading...</span>
<div class="metric-row">
<span class="metric-label">Backend Status</span>
<span>
<span class="status-indicator status-unknown loading" id="health-indicator"></span>
<span id="health-status">Loading...</span>
</span>
</div>
</div>
<!-- Key Metrics -->
<h2 class="section-title">Key Metrics</h2>
<h2 class="section-title">Advanced Features</h2>
<div class="stats-grid">
<div class="card">
<div class="card-title">Request Coalescing</div>
@@ -334,44 +625,558 @@
</div>
</div>
<div class="refresh-info">
Dashboard refreshes every 5 seconds
<!-- Instance Details (shown in cluster mode) -->
<div id="instance-details-section" style="display: none;">
<h2 class="section-title">Instance Details</h2>
<div id="instance-list"></div>
</div>
<div class="refresh-info" id="refresh-info">
<span id="connection-mode">Connecting to real-time updates...</span>
</div>
</div>
<script>
// Fetch and update dashboard data
// Chart instances
let rpsChart, cacheChart;
let rpsData = { labels: [], data: [] };
let cacheData = { labels: [], data: [] };
const MAX_DATA_POINTS = 60; // Keep last 60 data points
// WebSocket connection
let ws = null;
let wsReconnectInterval = null;
let useWebSocket = true;
// Cluster mode state
let clusterModeEnabled = false;
let clusterModeAvailable = false;
// Smoothing buffers for metrics (10-second moving average at 2s intervals = 5 data points)
const smoothingWindow = 5;
const rpsBuffer = [];
const successRateBuffer = [];
const cacheHitRateBuffer = [];
// Initialize charts
function initCharts() {
const rpsCtx = document.getElementById('rps-chart').getContext('2d');
rpsChart = new Chart(rpsCtx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: 'Requests/sec',
data: [],
borderColor: '#667eea',
backgroundColor: 'rgba(102, 126, 234, 0.1)',
tension: 0.4,
fill: true
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { display: false }
},
scales: {
y: {
beginAtZero: true,
ticks: { precision: 0 }
},
x: {
display: false
}
}
}
});
const cacheCtx = document.getElementById('cache-chart').getContext('2d');
cacheChart = new Chart(cacheCtx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: 'Hit Rate %',
data: [],
borderColor: '#10b981',
backgroundColor: 'rgba(16, 185, 129, 0.1)',
tension: 0.4,
fill: true
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { display: false }
},
scales: {
y: {
beginAtZero: true,
max: 100,
ticks: {
callback: function(value) {
return value + '%';
}
}
},
x: {
display: false
}
}
}
});
}
// Update chart data
function updateChart(chart, dataStore, value) {
const timestamp = new Date().toLocaleTimeString();
dataStore.labels.push(timestamp);
dataStore.data.push(value);
// Keep only last MAX_DATA_POINTS
if (dataStore.labels.length > MAX_DATA_POINTS) {
dataStore.labels.shift();
dataStore.data.shift();
}
chart.data.labels = dataStore.labels;
chart.data.datasets[0].data = dataStore.data;
chart.update('none'); // Update without animation for smoother real-time updates
}
// Connect to WebSocket
function connectWebSocket() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/admin/ws/stats`;
try {
ws = new WebSocket(wsUrl);
ws.onopen = () => {
updateWSStatus(true);
if (wsReconnectInterval) {
clearInterval(wsReconnectInterval);
wsReconnectInterval = null;
}
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
updateAllStats(data);
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
updateWSStatus(false);
// Try to reconnect after 5 seconds
if (!wsReconnectInterval) {
wsReconnectInterval = setInterval(() => {
connectWebSocket();
}, 5000);
}
};
} catch (error) {
console.error('Failed to create WebSocket:', error);
updateWSStatus(false);
// Fall back to polling if WebSocket fails
useWebSocket = false;
startPolling();
}
}
// Update WebSocket status indicator
function updateWSStatus(connected) {
const statusEl = document.getElementById('ws-status');
const infoEl = document.getElementById('connection-mode');
if (connected) {
statusEl.className = 'ws-status ws-connected';
statusEl.textContent = 'Live';
infoEl.textContent = 'Real-time updates via WebSocket';
} else {
statusEl.className = 'ws-status ws-disconnected';
statusEl.textContent = 'Reconnecting...';
infoEl.textContent = 'Attempting to reconnect...';
}
}
// Fallback: Fetch and update dashboard data via polling
async function updateDashboard() {
try {
// Update health
const health = await fetch('/admin/api/health').then(r => r.json());
updateHealth(health);
if (clusterModeEnabled) {
// Fetch cluster stats
const [clusterStats, instances] = await Promise.all([
fetch('/admin/api/cluster/stats').then(r => r.json()),
fetch('/admin/api/cluster/instances').then(r => r.json())
]);
// Update circuit breaker
const cb = await fetch('/admin/api/circuit-breaker').then(r => r.json());
updateCircuitBreaker(cb);
if (clusterStats.cluster_mode) {
updateClusterStats(clusterStats, instances);
}
} else {
// Fetch all stats for single instance
const [stats, health, cb, cache, coalescing, retryBudget, wsStats, connections] = await Promise.all([
fetch('/admin/api/stats').then(r => r.json()),
fetch('/admin/api/health').then(r => r.json()),
fetch('/admin/api/circuit-breaker').then(r => r.json()),
fetch('/admin/api/cache').then(r => r.json()),
fetch('/admin/api/coalescing').then(r => r.json()),
fetch('/admin/api/retry-budget').then(r => r.json()),
fetch('/admin/api/websocket').then(r => r.json()),
fetch('/admin/api/connections').then(r => r.json())
]);
// Update coalescing
const coalescing = await fetch('/admin/api/coalescing').then(r => r.json());
updateCoalescing(coalescing);
const allData = {
stats,
health,
circuit_breaker: cb,
cache,
coalescing,
retry_budget: retryBudget,
websocket: wsStats,
connections
};
// Update retry budget
const retryBudget = await fetch('/admin/api/retry-budget').then(r => r.json());
updateRetryBudget(retryBudget);
// Update WebSocket
const ws = await fetch('/admin/api/websocket').then(r => r.json());
updateWebSocket(ws);
// Update connections
const connections = await fetch('/admin/api/connections').then(r => r.json());
updateConnections(connections);
updateAllStats(allData);
}
} catch (error) {
console.error('Failed to update dashboard:', error);
}
}
// Check if cluster mode is available
async function checkClusterMode() {
try {
const response = await fetch('/admin/api/cluster/stats');
const data = await response.json();
if (response.ok && data.cluster_mode) {
clusterModeAvailable = true;
document.getElementById('cluster-info').textContent =
`(${data.total_instances} instance${data.total_instances !== 1 ? 's' : ''} available)`;
return true;
}
} catch (error) {
// Cluster mode not available
}
clusterModeAvailable = false;
document.getElementById('cluster-info').textContent = '(not available)';
document.getElementById('cluster-mode-toggle').disabled = true;
return false;
}
// Update cluster statistics
function updateClusterStats(clusterData, instancesData) {
// Show cluster sections
document.getElementById('cluster-status-section').style.display = 'block';
document.getElementById('instance-details-section').style.display = 'block';
document.getElementById('overview-title').textContent = 'Cluster Overview';
// Update cluster status
document.getElementById('cluster-total-instances').textContent = clusterData.total_instances || 0;
document.getElementById('cluster-healthy-instances').textContent = clusterData.healthy_instances || 0;
// Update cluster info in toggle label
const totalInstances = clusterData.total_instances || 0;
document.getElementById('cluster-info').textContent =
`(${totalInstances} instance${totalInstances !== 1 ? 's' : ''} available)`;
// Update combined stats
if (clusterData.stats) {
updateStats({ ...clusterData.stats, cluster_mode: true });
// Update memory usage from cluster stats
if (clusterData.stats.memory) {
updateCacheStats({
memory_usage_mb: clusterData.stats.memory.total_usage_mb || 0
});
}
}
// Update instance list
if (instancesData && instancesData.instances) {
updateInstanceList(instancesData.instances, instancesData.current_instance);
}
}
// Update individual instance list
function updateInstanceList(instances, currentInstanceID) {
const container = document.getElementById('instance-list');
container.innerHTML = '';
instances.forEach(instance => {
// Use JSON field names (snake_case), not Go struct names (PascalCase)
const isCurrent = instance.instance_id === currentInstanceID;
const isHealthy = instance.health && instance.health.status === 'healthy';
const card = document.createElement('div');
card.className = 'instance-card';
const header = document.createElement('div');
header.className = 'instance-header';
const title = document.createElement('div');
title.className = 'instance-title';
title.innerHTML = `
<span class="status-indicator ${isHealthy ? 'status-healthy' : 'status-unhealthy'}"></span>
${instance.hostname || 'unknown'}
${isCurrent ? '<span class="badge badge-info" style="margin-left: 8px;">Current</span>' : ''}
`;
const uptime = document.createElement('span');
uptime.style.fontSize = '0.85em';
uptime.style.color = '#666';
uptime.textContent = `Uptime: ${formatUptime(instance.uptime_seconds || 0)}`;
header.appendChild(title);
header.appendChild(uptime);
const grid = document.createElement('div');
grid.className = 'instance-grid';
// Extract stats - use JSON field names
const stats = instance.stats || {};
const requests = stats.requests || {};
const cache = instance.cache_summary || stats.cache_summary || {};
const failedCount = requests.failed || 0;
const totalCount = requests.total || 0;
const failureInfo = failedCount > 0 ? ` (${failedCount} failed)` : '';
const metrics = [
{
label: 'Total Requests',
value: formatNumber(totalCount),
title: totalCount.toLocaleString() + ' total requests' + failureInfo
},
{
label: 'Success Rate',
value: (requests.success_rate_pct || 0).toFixed(1) + '%',
title: null
},
{
label: 'Current RPS',
value: (requests.current_requests_per_second || 0).toFixed(1),
title: null
},
{
label: 'Cache Hit Rate',
value: (cache.hit_rate_pct || 0).toFixed(1) + '%',
title: null
}
];
metrics.forEach(metric => {
const metricDiv = document.createElement('div');
metricDiv.className = 'instance-metric';
if (metric.title) {
metricDiv.title = metric.title;
}
metricDiv.innerHTML = `
<div class="instance-metric-label">${metric.label}</div>
<div class="instance-metric-value">${metric.value}</div>
`;
grid.appendChild(metricDiv);
});
card.appendChild(header);
card.appendChild(grid);
container.appendChild(card);
});
}
// Format uptime in human readable format
function formatUptime(seconds) {
const days = Math.floor(seconds / 86400);
const hours = Math.floor((seconds % 86400) / 3600);
const minutes = Math.floor((seconds % 3600) / 60);
if (days > 0) return `${days}d ${hours}h`;
if (hours > 0) return `${hours}h ${minutes}m`;
return `${minutes}m`;
}
// Format large numbers compactly (1.2M, 3.4K, etc)
function formatNumber(num) {
if (num === undefined || num === null) return '0';
const absNum = Math.abs(num);
if (absNum >= 1000000000) {
return (num / 1000000000).toFixed(1) + 'B';
}
if (absNum >= 1000000) {
return (num / 1000000).toFixed(1) + 'M';
}
if (absNum >= 1000) {
return (num / 1000).toFixed(1) + 'K';
}
return num.toLocaleString();
}
// Smooth metric values using moving average
function smoothMetric(buffer, newValue) {
buffer.push(newValue);
if (buffer.length > smoothingWindow) {
buffer.shift(); // Remove oldest value
}
// Calculate average
const sum = buffer.reduce((a, b) => a + b, 0);
return sum / buffer.length;
}
// Get trend indicator (↑ ↗ → ↘ ↓)
function getTrendIndicator(buffer) {
if (buffer.length < 2) return '→';
const recent = buffer.slice(-3); // Last 3 values
const avg = recent.reduce((a, b) => a + b, 0) / recent.length;
const diff = recent[recent.length - 1] - recent[0];
const percentChange = Math.abs(diff / (avg || 1)) * 100;
if (percentChange < 5) return '→'; // Stable
if (diff > 0) {
return percentChange > 15 ? '↑' : '↗'; // Strong/moderate increase
} else {
return percentChange > 15 ? '↓' : '↘'; // Strong/moderate decrease
}
}
// Update all statistics
function updateAllStats(data) {
if (data.stats) updateStats(data.stats);
if (data.health) updateHealth(data.health);
if (data.circuit_breaker) updateCircuitBreaker(data.circuit_breaker);
if (data.cache) updateCacheStats(data.cache);
if (data.coalescing) updateCoalescing(data.coalescing);
if (data.retry_budget) updateRetryBudget(data.retry_budget);
if (data.websocket) updateWebSocket(data.websocket);
if (data.connections) updateConnections(data.connections);
}
// Update main stats
function updateStats(data) {
// Uptime
document.getElementById('uptime').textContent = data.uptime_human || '--';
document.getElementById('uptime-seconds').textContent =
(data.uptime_seconds || 0).toFixed(0) + ' seconds';
if (data.requests) {
const req = data.requests;
// Total requests with compact formatting
document.getElementById('total-requests').textContent = formatNumber(req.total || 0);
document.getElementById('total-requests').title = (req.total || 0).toLocaleString() + ' total requests';
document.getElementById('succeeded-requests').textContent = formatNumber(req.succeeded || 0);
document.getElementById('succeeded-requests').title = (req.succeeded || 0).toLocaleString() + ' succeeded';
document.getElementById('failed-requests').textContent = formatNumber(req.failed || 0);
document.getElementById('failed-requests').title = (req.failed || 0).toLocaleString() + ' failed';
// Show failure details if there are failures
if (req.failed > 0) {
const failureRate = (req.failed / (req.total || 1) * 100).toFixed(2);
document.getElementById('failed-requests').title += ` (${failureRate}% failure rate)`;
}
// Success rate with smoothing
const rawSuccessRate = req.success_rate_pct || 0;
const smoothedSuccessRate = smoothMetric(successRateBuffer, rawSuccessRate);
const successTrend = getTrendIndicator(successRateBuffer);
document.getElementById('success-rate').textContent =
smoothedSuccessRate.toFixed(1) + '% ' + successTrend;
document.getElementById('success-rate').title =
`10s avg: ${smoothedSuccessRate.toFixed(2)}% | Current: ${rawSuccessRate.toFixed(2)}%`;
document.getElementById('success-progress').style.width =
smoothedSuccessRate + '%';
// RPS with smoothing
const rawRPS = req.current_requests_per_second || 0;
const smoothedRPS = smoothMetric(rpsBuffer, rawRPS);
const rpsTrend = getTrendIndicator(rpsBuffer);
document.getElementById('current-rps').textContent =
smoothedRPS.toFixed(1) + ' ' + rpsTrend;
document.getElementById('current-rps').title =
`10s avg: ${smoothedRPS.toFixed(2)} | Current: ${rawRPS.toFixed(2)}`;
document.getElementById('avg-rps').textContent =
(req.avg_requests_per_second || 0).toFixed(1);
// Update RPS chart with smoothed value
updateChart(rpsChart, rpsData, smoothedRPS);
}
// Cache summary with smoothing
if (data.cache_summary) {
const cache = data.cache_summary;
const rawHitRate = cache.hit_rate_pct || 0;
const smoothedHitRate = smoothMetric(cacheHitRateBuffer, rawHitRate);
const hitRateTrend = getTrendIndicator(cacheHitRateBuffer);
document.getElementById('cache-hit-rate').textContent =
smoothedHitRate.toFixed(1) + '% ' + hitRateTrend;
document.getElementById('cache-hit-rate').title =
`10s avg: ${smoothedHitRate.toFixed(2)}% | Current: ${rawHitRate.toFixed(2)}%`;
document.getElementById('cache-hit-progress').style.width =
smoothedHitRate + '%';
document.getElementById('cache-hits').textContent = formatNumber(cache.hits || 0);
document.getElementById('cache-hits').title = (cache.hits || 0).toLocaleString() + ' cache hits';
document.getElementById('cache-hits-detail').textContent = formatNumber(cache.hits || 0);
document.getElementById('cache-hits-detail').title = (cache.hits || 0).toLocaleString();
document.getElementById('cache-misses').textContent = formatNumber(cache.misses || 0);
document.getElementById('cache-misses').title = (cache.misses || 0).toLocaleString() + ' cache misses';
document.getElementById('cached-queries').textContent = formatNumber(cache.total_cached || 0);
document.getElementById('cached-queries').title = (cache.total_cached || 0).toLocaleString() + ' unique queries cached';
// Update cache hit rate chart with smoothed value
updateChart(cacheChart, cacheData, smoothedHitRate);
}
}
// Update cache detailed stats
function updateCacheStats(data) {
if (data.memory_usage_mb !== undefined) {
if (data.memory_usage_mb === -1) {
// Redis cache - memory tracking not available
document.getElementById('cache-memory').textContent = 'N/A';
document.getElementById('cache-memory').title = 'Memory tracking not available for Redis cache';
document.getElementById('cache-memory-pct').textContent = 'Redis cache';
document.getElementById('memory-progress').style.width = '0%';
} else {
document.getElementById('cache-memory').textContent =
data.memory_usage_mb.toFixed(2) + ' MB';
document.getElementById('cache-memory').title = 'In-memory cache usage';
if (data.memory_usage_pct !== undefined) {
const memPct = data.memory_usage_pct;
document.getElementById('cache-memory-pct').textContent =
memPct.toFixed(1) + '% used';
document.getElementById('memory-progress').style.width =
memPct + '%';
}
}
}
}
function updateHealth(data) {
const indicator = document.getElementById('health-indicator');
const status = document.getElementById('health-status');
@@ -465,11 +1270,67 @@
}
}
// Initial load
updateDashboard();
// Start polling (fallback when WebSocket is not available)
function startPolling() {
const statusEl = document.getElementById('ws-status');
const infoEl = document.getElementById('connection-mode');
// Refresh every 5 seconds
setInterval(updateDashboard, 5000);
statusEl.className = 'ws-status ws-disconnected';
statusEl.textContent = 'Polling';
infoEl.textContent = 'Updates every 5 seconds (WebSocket unavailable)';
// Initial load
updateDashboard();
// Refresh every 5 seconds
setInterval(updateDashboard, 5000);
}
// Initialize dashboard
async function initDashboard() {
// Initialize charts first
initCharts();
// Check if cluster mode is available
await checkClusterMode();
// Setup cluster mode toggle
const toggle = document.getElementById('cluster-mode-toggle');
toggle.addEventListener('change', (e) => {
clusterModeEnabled = e.target.checked && clusterModeAvailable;
// Toggle cluster sections visibility
if (clusterModeEnabled) {
document.getElementById('cluster-status-section').style.display = 'block';
document.getElementById('overview-title').textContent = 'Cluster Overview';
} else {
document.getElementById('cluster-status-section').style.display = 'none';
document.getElementById('instance-details-section').style.display = 'none';
document.getElementById('overview-title').textContent = 'System Overview';
}
// Refresh data
updateDashboard();
});
// Try WebSocket connection first
connectWebSocket();
// Set a timeout to fall back to polling if WebSocket doesn't connect
setTimeout(() => {
if (!ws || ws.readyState !== WebSocket.OPEN) {
useWebSocket = false;
startPolling();
}
}, 3000);
}
// Start when page loads
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', initDashboard);
} else {
initDashboard();
}
</script>
</body>
</html>
+476 -13
View File
@@ -2,10 +2,12 @@ package main
import (
"embed"
"encoding/json"
"fmt"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
@@ -41,6 +43,15 @@ func (ad *AdminDashboard) RegisterRoutes(app *fiber.App) {
app.Get("/admin/api/coalescing", ad.getCoalescingStats)
app.Get("/admin/api/websocket", ad.getWebSocketStats)
// WebSocket endpoint for streaming statistics
app.Get("/admin/ws/stats", websocket.New(ad.handleStatsWebSocket))
// Cluster mode endpoints (when using Redis)
app.Get("/admin/api/cluster/stats", ad.getClusterStats)
app.Get("/admin/api/cluster/instances", ad.getClusterInstances)
app.Get("/admin/api/cluster/debug", ad.getClusterDebug)
app.Post("/admin/api/cluster/force-publish", ad.forcePublish)
// Control endpoints
app.Post("/admin/api/cache/clear", ad.clearCache)
app.Post("/admin/api/retry-budget/reset", ad.resetRetryBudget)
@@ -78,9 +89,9 @@ func (ad *AdminDashboard) getStats(c *fiber.Ctx) error {
}
if cfg != nil && cfg.Monitoring != nil {
succeeded := getAdminMetricValue("graphql_proxy_succeeded_total")
failed := getAdminMetricValue("graphql_proxy_failed_total")
skipped := getAdminMetricValue("graphql_proxy_skipped_total")
succeeded := getAdminMetricValue("requests_succesful")
failed := getAdminMetricValue("requests_failed")
skipped := getAdminMetricValue("requests_skipped")
total := succeeded + failed + skipped
// Request statistics
@@ -241,18 +252,24 @@ func (ad *AdminDashboard) getCacheStats(c *fiber.Ctx) error {
}
stats["hit_rate_pct"] = hitRate
// Get memory usage
memoryUsage := libpack_cache.GetCacheMemoryUsage()
maxMemory := libpack_cache.GetCacheMaxMemorySize()
stats["memory_usage_bytes"] = memoryUsage
stats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
// Get memory usage only for in-memory cache
if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable {
memoryUsage := libpack_cache.GetCacheMemoryUsage()
maxMemory := libpack_cache.GetCacheMaxMemorySize()
stats["memory_usage_bytes"] = memoryUsage
stats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
// Calculate memory usage percentage
memoryUsagePct := 0.0
if maxMemory > 0 {
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
// Calculate memory usage percentage
memoryUsagePct := 0.0
if maxMemory > 0 {
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
}
stats["memory_usage_pct"] = memoryUsagePct
} else {
// For Redis cache, memory tracking not available per instance
stats["memory_usage_mb"] = -1 // Sentinel value for "not applicable"
stats["memory_usage_pct"] = -1
}
stats["memory_usage_pct"] = memoryUsagePct
}
}
@@ -349,6 +366,161 @@ func (ad *AdminDashboard) resetCoalescing(c *fiber.Ctx) error {
})
}
// getClusterStats returns aggregated statistics from all proxy instances
func (ad *AdminDashboard) getClusterStats(c *fiber.Ctx) error {
aggregator := GetMetricsAggregator()
if aggregator == nil {
return c.Status(503).JSON(map[string]interface{}{
"error": "Cluster mode not available",
"message": "Redis-based metrics aggregation is not enabled",
"cluster_mode": false,
})
}
metrics, err := aggregator.GetAggregatedMetrics()
if err != nil {
if ad.logger != nil {
ad.logger.Error(&libpack_logger.LogMessage{
Message: "Failed to get aggregated metrics",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
return c.Status(500).JSON(map[string]interface{}{
"error": "Failed to retrieve cluster metrics",
"message": err.Error(),
})
}
// Format response similar to regular stats endpoint
response := map[string]interface{}{
"cluster_mode": true,
"total_instances": metrics.TotalInstances,
"healthy_instances": metrics.HealthyInstances,
"last_update": metrics.LastUpdate.Format(time.RFC3339),
"stats": metrics.CombinedStats,
}
return c.JSON(response)
}
// getClusterInstances returns detailed metrics for each proxy instance
func (ad *AdminDashboard) getClusterInstances(c *fiber.Ctx) error {
aggregator := GetMetricsAggregator()
if aggregator == nil {
return c.Status(503).JSON(map[string]interface{}{
"error": "Cluster mode not available",
"message": "Redis-based metrics aggregation is not enabled",
"cluster_mode": false,
})
}
metrics, err := aggregator.GetAggregatedMetrics()
if err != nil {
if ad.logger != nil {
ad.logger.Error(&libpack_logger.LogMessage{
Message: "Failed to get instance metrics",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
return c.Status(500).JSON(map[string]interface{}{
"error": "Failed to retrieve instance metrics",
"message": err.Error(),
})
}
return c.JSON(map[string]interface{}{
"cluster_mode": true,
"total_instances": metrics.TotalInstances,
"healthy_instances": metrics.HealthyInstances,
"current_instance": aggregator.GetInstanceID(),
"instances": metrics.Instances,
})
}
// getClusterDebug returns debug information about cluster mode
func (ad *AdminDashboard) getClusterDebug(c *fiber.Ctx) error {
aggregator := GetMetricsAggregator()
debug := map[string]interface{}{
"aggregator_initialized": aggregator != nil,
"redis_cache_enabled": false,
}
if cfg != nil {
debug["redis_cache_enabled"] = cfg.Cache.CacheRedisEnable
debug["cache_enabled"] = cfg.Cache.CacheEnable
}
if aggregator != nil {
debug["instance_id"] = aggregator.GetInstanceID()
debug["is_cluster_mode"] = aggregator.IsClusterMode()
// Try to get metrics
metrics, err := aggregator.GetAggregatedMetrics()
if err != nil {
debug["error"] = err.Error()
} else {
debug["total_instances"] = metrics.TotalInstances
debug["healthy_instances"] = metrics.HealthyInstances
// Show first instance structure as example
if len(metrics.Instances) > 0 {
first := metrics.Instances[0]
debug["sample_instance"] = map[string]interface{}{
"instance_id": first.InstanceID,
"hostname": first.Hostname,
"uptime_seconds": first.UptimeSeconds,
"stats_keys": getMapKeys(first.Stats),
"has_requests": first.Stats["requests"] != nil,
"has_cache": len(first.CacheSummary) > 0,
"health_status": first.Health["status"],
}
// Show requests structure if it exists
if requests, ok := first.Stats["requests"].(map[string]interface{}); ok {
debug["sample_requests"] = requests
}
}
}
}
return c.JSON(debug)
}
// Helper to get map keys
func getMapKeys(m map[string]interface{}) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
return keys
}
// forcePublish forces an immediate metrics publish for testing
func (ad *AdminDashboard) forcePublish(c *fiber.Ctx) error {
aggregator := GetMetricsAggregator()
if aggregator == nil {
return c.Status(503).JSON(map[string]interface{}{
"error": "Aggregator not initialized",
"success": false,
})
}
// Trigger publish in goroutine to avoid blocking
go aggregator.publishMetrics()
return c.JSON(map[string]interface{}{
"success": true,
"triggered": true,
"message": "Publish triggered in background",
"next_steps": []string{
"Wait 2 seconds",
"Check GET /admin/api/cluster/debug",
"Check server logs for ✓ Successfully published or ❌ CRITICAL errors",
},
})
}
// Helper to get metric value for admin dashboard
func getAdminMetricValue(name string) int64 {
if cfg == nil || cfg.Monitoring == nil {
@@ -361,4 +533,295 @@ func getAdminMetricValue(name string) int64 {
return int64(counter.Get())
}
// handleStatsWebSocket handles WebSocket connections for streaming statistics
func (ad *AdminDashboard) handleStatsWebSocket(c *websocket.Conn) {
if ad.logger != nil {
ad.logger.Info(&libpack_logger.LogMessage{
Message: "WebSocket client connected to stats stream",
Pairs: map[string]interface{}{
"remote_addr": c.RemoteAddr().String(),
},
})
}
// Cleanup on disconnect
defer func() {
if ad.logger != nil {
ad.logger.Info(&libpack_logger.LogMessage{
Message: "WebSocket client disconnected from stats stream",
Pairs: map[string]interface{}{
"remote_addr": c.RemoteAddr().String(),
},
})
}
c.Close()
}()
// Set up ping/pong handlers
c.SetReadDeadline(time.Now().Add(60 * time.Second))
c.SetPongHandler(func(string) error {
c.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
// Channel to signal when to stop
done := make(chan struct{})
// Goroutine to handle incoming messages (for connection keep-alive)
go func() {
defer close(done)
for {
if _, _, err := c.ReadMessage(); err != nil {
// Connection closed or error
return
}
}
}()
// Stream statistics every 2 seconds
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
// Send initial stats immediately
if stats := ad.gatherAllStats(); stats != nil {
if data, err := json.Marshal(stats); err == nil {
c.WriteMessage(websocket.TextMessage, data)
}
}
// Stream loop
for {
select {
case <-ticker.C:
// Gather all stats
stats := ad.gatherAllStats()
// Marshal to JSON
data, err := json.Marshal(stats)
if err != nil {
if ad.logger != nil {
ad.logger.Error(&libpack_logger.LogMessage{
Message: "Failed to marshal stats for WebSocket",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
return
}
// Send to client
if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
if ad.logger != nil {
ad.logger.Debug(&libpack_logger.LogMessage{
Message: "Failed to write to WebSocket (client likely disconnected)",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
return
}
case <-done:
// Client disconnected
return
}
}
}
// gatherAllStats collects all statistics into a single structure
func (ad *AdminDashboard) gatherAllStats() map[string]interface{} {
result := make(map[string]interface{})
// Main stats
uptimeSeconds := time.Since(startTime).Seconds()
stats := map[string]interface{}{
"timestamp": time.Now().Format(time.RFC3339),
"uptime_seconds": uptimeSeconds,
"uptime_human": formatDuration(time.Since(startTime)),
"version": "0.27.0",
}
if cfg != nil && cfg.Monitoring != nil {
succeeded := getAdminMetricValue("requests_succesful")
failed := getAdminMetricValue("requests_failed")
skipped := getAdminMetricValue("requests_skipped")
total := succeeded + failed + skipped
requestStats := map[string]interface{}{
"total": total,
"succeeded": succeeded,
"failed": failed,
"skipped": skipped,
}
if total > 0 {
requestStats["success_rate_pct"] = float64(succeeded) / float64(total) * 100
requestStats["failure_rate_pct"] = float64(failed) / float64(total) * 100
requestStats["skip_rate_pct"] = float64(skipped) / float64(total) * 100
} else {
requestStats["success_rate_pct"] = 0.0
requestStats["failure_rate_pct"] = 0.0
requestStats["skip_rate_pct"] = 0.0
}
if uptimeSeconds > 0 {
requestStats["avg_requests_per_second"] = float64(total) / uptimeSeconds
} else {
requestStats["avg_requests_per_second"] = 0.0
}
if rpsTracker := GetRPSTracker(); rpsTracker != nil {
requestStats["current_requests_per_second"] = rpsTracker.GetCurrentRPS()
} else {
requestStats["current_requests_per_second"] = 0.0
}
stats["requests"] = requestStats
// Cache summary
cacheStats := libpack_cache.GetCacheStats()
if cacheStats != nil {
totalCacheRequests := cacheStats.CacheHits + cacheStats.CacheMisses
hitRate := 0.0
if totalCacheRequests > 0 {
hitRate = float64(cacheStats.CacheHits) / float64(totalCacheRequests) * 100
}
stats["cache_summary"] = map[string]interface{}{
"hits": cacheStats.CacheHits,
"misses": cacheStats.CacheMisses,
"hit_rate_pct": hitRate,
"total_cached": cacheStats.CachedQueries,
}
}
}
result["stats"] = stats
// Health
healthMgr := GetBackendHealthManager()
health := map[string]interface{}{
"status": "unknown",
"backend": map[string]interface{}{
"healthy": false,
},
}
if healthMgr != nil {
isHealthy := healthMgr.IsHealthy()
health["backend"] = map[string]interface{}{
"healthy": isHealthy,
"consecutive_failures": healthMgr.GetConsecutiveFailures(),
"last_check": healthMgr.GetLastHealthCheck().Format(time.RFC3339),
}
if isHealthy {
health["status"] = "healthy"
} else {
health["status"] = "unhealthy"
}
}
result["health"] = health
// Circuit breaker
cbStatus := map[string]interface{}{
"enabled": false,
"state": "unknown",
}
if cfg != nil {
cbStatus["enabled"] = cfg.CircuitBreaker.Enable
if cb != nil {
cbMutex.RLock()
state := cb.State()
cbMutex.RUnlock()
cbStatus["state"] = state.String()
cbStatus["config"] = map[string]interface{}{
"max_failures": cfg.CircuitBreaker.MaxFailures,
"failure_ratio": cfg.CircuitBreaker.FailureRatio,
"timeout": cfg.CircuitBreaker.Timeout,
"max_requests_half_open": cfg.CircuitBreaker.MaxRequestsInHalfOpen,
"return_cached_on_open": cfg.CircuitBreaker.ReturnCachedOnOpen,
}
}
}
result["circuit_breaker"] = cbStatus
// Cache stats
cacheStats := map[string]interface{}{
"enabled": false,
}
if cfg != nil {
cacheStats["enabled"] = cfg.Cache.CacheEnable
cacheStats["redis_enabled"] = cfg.Cache.CacheRedisEnable
cacheStats["ttl_seconds"] = cfg.Cache.CacheTTL
cacheStats["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize
cacheStats["max_entries"] = cfg.Cache.CacheMaxEntries
runtimeCacheStats := libpack_cache.GetCacheStats()
if runtimeCacheStats != nil {
cacheStats["cached_queries"] = runtimeCacheStats.CachedQueries
cacheStats["cache_hits"] = runtimeCacheStats.CacheHits
cacheStats["cache_misses"] = runtimeCacheStats.CacheMisses
totalRequests := runtimeCacheStats.CacheHits + runtimeCacheStats.CacheMisses
hitRate := 0.0
if totalRequests > 0 {
hitRate = float64(runtimeCacheStats.CacheHits) / float64(totalRequests) * 100
}
cacheStats["hit_rate_pct"] = hitRate
memoryUsage := libpack_cache.GetCacheMemoryUsage()
maxMemory := libpack_cache.GetCacheMaxMemorySize()
cacheStats["memory_usage_bytes"] = memoryUsage
cacheStats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
memoryUsagePct := 0.0
if maxMemory > 0 {
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
}
cacheStats["memory_usage_pct"] = memoryUsagePct
}
}
result["cache"] = cacheStats
// Connection stats
poolMgr := GetConnectionPoolManager()
connStats := map[string]interface{}{
"available": false,
}
if poolMgr != nil {
connStats = poolMgr.GetConnectionStats()
connStats["available"] = true
}
result["connections"] = connStats
// Retry budget
rb := GetRetryBudget()
if rb == nil {
result["retry_budget"] = map[string]interface{}{"enabled": false}
} else {
result["retry_budget"] = rb.GetStats()
}
// Coalescing
rc := GetRequestCoalescer()
if rc == nil {
result["coalescing"] = map[string]interface{}{"enabled": false}
} else {
result["coalescing"] = rc.GetStats()
}
// WebSocket
wsp := GetWebSocketProxy()
if wsp == nil {
result["websocket"] = map[string]interface{}{"enabled": false}
} else {
result["websocket"] = wsp.GetStats()
}
return result
}
var startTime = time.Now()
+2 -2
View File
@@ -425,13 +425,13 @@ func TestGetAdminMetricValue(t *testing.T) {
}
// Test with valid metric
value := getAdminMetricValue("graphql_proxy_succeeded_total")
value := getAdminMetricValue("requests_succesful")
assert.GreaterOrEqual(t, value, int64(0))
// Test with nil config
oldCfg := cfg
cfg = nil
value = getAdminMetricValue("graphql_proxy_succeeded_total")
value = getAdminMetricValue("requests_succesful")
assert.Equal(t, int64(0), value)
cfg = oldCfg
}
+41
View File
@@ -319,6 +319,39 @@ func parseConfig() {
}
}
// Initialize metrics aggregator FIRST if Redis is enabled (even if cache is disabled)
// This allows cluster mode monitoring even when cache is off
if cfg.Cache.CacheRedisEnable {
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "Initializing metrics aggregator for cluster mode",
Pairs: map[string]interface{}{
"redis_url": cfg.Cache.CacheRedisURL,
"redis_db": cfg.Cache.CacheRedisDB,
},
})
if err := InitializeMetricsAggregator(
cfg.Cache.CacheRedisURL,
cfg.Cache.CacheRedisPassword,
cfg.Cache.CacheRedisDB,
cfg.Logger,
); err != nil {
cfg.Logger.Error(&libpack_logging.LogMessage{
Message: "FAILED to initialize metrics aggregator - cluster mode will not work",
Pairs: map[string]interface{}{
"error": err.Error(),
},
})
} else {
cfg.Logger.Info(&libpack_logging.LogMessage{
Message: "✓ Metrics aggregator successfully initialized",
Pairs: map[string]interface{}{
"instance_id": GetMetricsAggregator().GetInstanceID(),
},
})
}
}
// Initialize cache if enabled
if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
cacheConfig := &libpack_cache.CacheConfig{
@@ -487,6 +520,14 @@ func main() {
return nil
})
// Register metrics aggregator for cleanup
shutdownManager.RegisterComponent("metrics-aggregator", func(ctx context.Context) error {
if aggregator := GetMetricsAggregator(); aggregator != nil {
aggregator.Shutdown()
}
return nil
})
// Cache shutdown is handled internally by the cache implementation
// Start monitoring server
+805
View File
@@ -0,0 +1,805 @@
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/google/uuid"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
"github.com/redis/go-redis/v9"
)
// MetricsAggregator handles distributed metrics collection via Redis
type MetricsAggregator struct {
redisClient *redis.Client
logger *libpack_logger.Logger
instanceID string
publishKey string
ttl time.Duration
publishTimer *time.Ticker
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
}
// InstanceMetrics represents metrics for a single proxy instance
type InstanceMetrics struct {
InstanceID string `json:"instance_id"`
Hostname string `json:"hostname"`
LastUpdate time.Time `json:"last_update"`
UptimeSeconds float64 `json:"uptime_seconds"`
Stats map[string]interface{} `json:"stats"`
Cache map[string]interface{} `json:"cache,omitempty"` // Full cache details including memory
CacheSummary map[string]interface{} `json:"cache_summary,omitempty"` // Deprecated: kept for compatibility
Health map[string]interface{} `json:"health"`
CircuitBreaker map[string]interface{} `json:"circuit_breaker,omitempty"`
RetryBudget map[string]interface{} `json:"retry_budget,omitempty"`
Coalescing map[string]interface{} `json:"coalescing,omitempty"`
WebSocketStats map[string]interface{} `json:"websocket,omitempty"`
Connections map[string]interface{} `json:"connections,omitempty"`
}
// AggregatedMetrics represents combined metrics from all instances
type AggregatedMetrics struct {
TotalInstances int `json:"total_instances"`
HealthyInstances int `json:"healthy_instances"`
LastUpdate time.Time `json:"last_update"`
CombinedStats map[string]interface{} `json:"combined_stats"`
Instances []InstanceMetrics `json:"instances"`
PerInstanceStats map[string]InstanceMetrics `json:"per_instance_stats"`
}
var (
metricsAggregator *MetricsAggregator
aggregatorMutex sync.RWMutex
)
// InitializeMetricsAggregator creates and starts the metrics aggregator
func InitializeMetricsAggregator(redisURL, redisPassword string, redisDB int, logger *libpack_logger.Logger) error {
aggregatorMutex.Lock()
defer aggregatorMutex.Unlock()
if metricsAggregator != nil {
return nil // Already initialized
}
// Parse Redis URL
opt, err := redis.ParseURL(fmt.Sprintf("redis://%s/%d", redisURL, redisDB))
if err != nil {
return fmt.Errorf("failed to parse Redis URL: %w", err)
}
if redisPassword != "" {
opt.Password = redisPassword
}
// Create Redis client with connection timeouts
opt.DialTimeout = 2 * time.Second
opt.ReadTimeout = 2 * time.Second
opt.WriteTimeout = 2 * time.Second
opt.PoolTimeout = 3 * time.Second
opt.MaxRetries = 2
client := redis.NewClient(opt)
// Test connection with detailed error reporting
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
// Log detailed connection error
if logger != nil {
logger.Error(&libpack_logger.LogMessage{
Message: "❌ CRITICAL: Redis connection test FAILED during initialization",
Pairs: map[string]interface{}{
"error": err.Error(),
"redis_url": redisURL,
"redis_db": redisDB,
"has_password": redisPassword != "",
},
})
}
return fmt.Errorf("failed to connect to Redis: %w", err)
}
// Log successful connection
if logger != nil {
logger.Info(&libpack_logger.LogMessage{
Message: "✓ Redis connection test PASSED",
Pairs: map[string]interface{}{
"redis_url": redisURL,
"redis_db": redisDB,
},
})
}
// Generate unique instance ID (hostname + UUID for uniqueness)
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
instanceID := fmt.Sprintf("%s-%s", hostname, uuid.New().String()[:8])
ctx, cancel = context.WithCancel(context.Background())
aggregator := &MetricsAggregator{
redisClient: client,
logger: logger,
instanceID: instanceID,
publishKey: "graphql-proxy:metrics:instances",
ttl: 30 * time.Second, // Metrics expire after 30s if not updated
publishTimer: time.NewTicker(5 * time.Second),
ctx: ctx,
cancel: cancel,
}
metricsAggregator = aggregator
// Start publishing metrics
go aggregator.startPublishing()
if logger != nil {
logger.Info(&libpack_logger.LogMessage{
Message: "Metrics aggregator initialized",
Pairs: map[string]interface{}{
"instance_id": instanceID,
"redis_url": redisURL,
"publish_key": aggregator.publishKey,
},
})
}
return nil
}
// GetMetricsAggregator returns the singleton instance
func GetMetricsAggregator() *MetricsAggregator {
aggregatorMutex.RLock()
defer aggregatorMutex.RUnlock()
return metricsAggregator
}
// startPublishing periodically publishes metrics to Redis
func (ma *MetricsAggregator) startPublishing() {
defer ma.publishTimer.Stop()
// Publish immediately on start
ma.publishMetrics()
for {
select {
case <-ma.ctx.Done():
// Clean up our metrics on shutdown
ma.removeInstanceMetrics()
return
case <-ma.publishTimer.C:
ma.publishMetrics()
}
}
}
// publishMetrics collects current metrics and stores them in Redis
// Note: This is exported for testing/debugging via admin API
func (ma *MetricsAggregator) publishMetrics() {
// Defensive: check if aggregator is still valid
if ma == nil {
return
}
ma.mu.RLock()
defer ma.mu.RUnlock()
// Safety check: ensure global config is initialized
if cfg == nil {
if ma.logger != nil {
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "Cannot publish metrics - global config not initialized yet",
Pairs: map[string]interface{}{
"instance_id": ma.instanceID,
},
})
}
return
}
// Gather all stats using the admin dashboard's method
dashboard := NewAdminDashboard(ma.logger)
allStats := dashboard.gatherAllStats()
if len(allStats) == 0 {
if ma.logger != nil {
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "gatherAllStats returned empty/nil result",
Pairs: map[string]interface{}{
"instance_id": ma.instanceID,
},
})
}
return
}
// Create instance metrics
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
metrics := InstanceMetrics{
InstanceID: ma.instanceID,
Hostname: hostname,
LastUpdate: time.Now(),
UptimeSeconds: time.Since(startTime).Seconds(),
}
// Extract specific sections - CRITICAL: we must set the correct structure
// Stats should contain the inner stats object with requests, cache_summary, etc.
if stats, ok := allStats["stats"].(map[string]interface{}); ok {
metrics.Stats = stats
// Also extract cache summary separately for easier access (deprecated but kept for compatibility)
if cacheSummary, ok := stats["cache_summary"].(map[string]interface{}); ok {
metrics.CacheSummary = cacheSummary
}
} else {
// Fallback: if stats extraction fails, use empty map
if ma.logger != nil {
ma.logger.Error(&libpack_logger.LogMessage{
Message: "Failed to extract stats from allStats - using empty stats",
Pairs: map[string]interface{}{
"instance_id": ma.instanceID,
"allStats_keys": func() []string {
keys := make([]string, 0, len(allStats))
for k := range allStats {
keys = append(keys, k)
}
return keys
}(),
},
})
}
metrics.Stats = make(map[string]interface{})
}
// Extract full cache details (includes memory usage)
if cache, ok := allStats["cache"].(map[string]interface{}); ok {
metrics.Cache = cache
}
if health, ok := allStats["health"].(map[string]interface{}); ok {
metrics.Health = health
} else {
metrics.Health = make(map[string]interface{})
}
if cb, ok := allStats["circuit_breaker"].(map[string]interface{}); ok {
metrics.CircuitBreaker = cb
}
if rb, ok := allStats["retry_budget"].(map[string]interface{}); ok {
metrics.RetryBudget = rb
}
if coal, ok := allStats["coalescing"].(map[string]interface{}); ok {
metrics.Coalescing = coal
}
if ws, ok := allStats["websocket"].(map[string]interface{}); ok {
metrics.WebSocketStats = ws
}
if conn, ok := allStats["connections"].(map[string]interface{}); ok {
metrics.Connections = conn
}
// Marshal to JSON
data, err := json.Marshal(metrics)
if err != nil {
if ma.logger != nil {
ma.logger.Error(&libpack_logger.LogMessage{
Message: "Failed to marshal metrics for Redis",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
return
}
// Store in Redis hash with TTL
key := fmt.Sprintf("%s:%s", ma.publishKey, ma.instanceID)
// Create a fresh context with timeout to avoid inheriting cancelled parent context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
pipe := ma.redisClient.Pipeline()
pipe.Set(ctx, key, data, ma.ttl)
pipe.SAdd(ctx, ma.publishKey, ma.instanceID)
pipe.Expire(ctx, ma.publishKey, ma.ttl*2) // Keep set alive
_, err = pipe.Exec(ctx)
if err != nil {
if ma.logger != nil {
ma.logger.Error(&libpack_logger.LogMessage{
Message: "❌ CRITICAL: Failed to publish metrics to Redis - cluster mode will not work!",
Pairs: map[string]interface{}{
"error": err.Error(),
"instance_id": ma.instanceID,
"key": key,
"redis_key": ma.publishKey,
},
})
}
return
}
}
// removeInstanceMetrics cleans up metrics from Redis on shutdown
func (ma *MetricsAggregator) removeInstanceMetrics() {
// Create a fresh context with timeout for cleanup
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
key := fmt.Sprintf("%s:%s", ma.publishKey, ma.instanceID)
pipe := ma.redisClient.Pipeline()
pipe.Del(ctx, key)
pipe.SRem(ctx, ma.publishKey, ma.instanceID)
_, err := pipe.Exec(ctx)
if err != nil && ma.logger != nil {
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "Failed to remove instance metrics from Redis during shutdown",
Pairs: map[string]interface{}{"instance_id": ma.instanceID, "error": err.Error()},
})
return
}
if ma.logger != nil {
ma.logger.Info(&libpack_logger.LogMessage{
Message: "Removed instance metrics from Redis",
Pairs: map[string]interface{}{"instance_id": ma.instanceID},
})
}
}
// GetAggregatedMetrics retrieves and aggregates metrics from all instances
func (ma *MetricsAggregator) GetAggregatedMetrics() (*AggregatedMetrics, error) {
// Create a fresh context with timeout to avoid inheriting cancelled parent context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Get all instance IDs
instanceIDs, err := ma.redisClient.SMembers(ctx, ma.publishKey).Result()
if err != nil {
return nil, fmt.Errorf("failed to get instance list: %w", err)
}
if len(instanceIDs) == 0 {
return &AggregatedMetrics{
TotalInstances: 0,
HealthyInstances: 0,
LastUpdate: time.Now(),
CombinedStats: make(map[string]interface{}),
Instances: []InstanceMetrics{},
PerInstanceStats: make(map[string]InstanceMetrics),
}, nil
}
// Fetch metrics for all instances
pipe := ma.redisClient.Pipeline()
cmds := make([]*redis.StringCmd, len(instanceIDs))
for i, instanceID := range instanceIDs {
key := fmt.Sprintf("%s:%s", ma.publishKey, instanceID)
cmds[i] = pipe.Get(ctx, key)
}
pipe.Exec(ctx)
// Parse metrics
instances := make([]InstanceMetrics, 0, len(instanceIDs))
perInstance := make(map[string]InstanceMetrics)
healthyCount := 0
staleCount := 0
errorCount := 0
for i, cmd := range cmds {
data, err := cmd.Result()
if err != nil {
errorCount++
// Clean up stale instance ID from the set
if err == redis.Nil {
staleCount++
// Remove stale instance from set in background
go func(instID string) {
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cleanupCancel()
ma.redisClient.SRem(cleanupCtx, ma.publishKey, instID)
}(instanceIDs[i])
}
continue
}
var metrics InstanceMetrics
if err := json.Unmarshal([]byte(data), &metrics); err != nil {
if ma.logger != nil {
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "Failed to unmarshal instance metrics",
Pairs: map[string]interface{}{"error": err.Error()},
})
}
continue
}
// Check if instance is stale (not updated in 1 minute)
instanceAge := time.Since(metrics.LastUpdate)
if instanceAge > 1*time.Minute {
staleCount++
// Clean up stale instance from set in background
go func(instID string, age time.Duration) {
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cleanupCancel()
ma.redisClient.SRem(cleanupCtx, ma.publishKey, instID)
if ma.logger != nil {
ma.logger.Info(&libpack_logger.LogMessage{
Message: "Removed inactive instance",
Pairs: map[string]interface{}{
"instance_id": instID,
"inactive_seconds": age.Seconds(),
},
})
}
}(instanceIDs[i], instanceAge)
continue // Skip stale instances
}
instances = append(instances, metrics)
perInstance[metrics.InstanceID] = metrics
// Count healthy instances
if health, ok := metrics.Health["status"].(string); ok && health == "healthy" {
healthyCount++
}
}
// Log cleanup stats if we found stale instances
if ma.logger != nil && (staleCount > 0 || errorCount > 0) {
ma.logger.Info(&libpack_logger.LogMessage{
Message: "Cleaned up stale instance IDs from Redis",
Pairs: map[string]interface{}{
"total_in_set": len(instanceIDs),
"valid_instances": len(instances),
"stale_cleaned": staleCount,
"errors": errorCount,
},
})
}
// Aggregate statistics
aggregated := &AggregatedMetrics{
TotalInstances: len(instances),
HealthyInstances: healthyCount,
LastUpdate: time.Now(),
CombinedStats: ma.aggregateStats(instances),
Instances: instances,
PerInstanceStats: perInstance,
}
return aggregated, nil
}
// aggregateStats combines statistics from multiple instances
func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[string]interface{} {
if len(instances) == 0 {
if ma.logger != nil {
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "No instances to aggregate",
})
}
return make(map[string]interface{})
}
// Initialize aggregated values
var (
totalRequests int64
totalSucceeded int64
totalFailed int64
totalSkipped int64
totalCacheHits int64
totalCacheMisses int64
totalCachedQueries int64
totalMemoryUsageMB float64
totalCurrentRPS float64
totalAvgRPS float64
totalActiveConnections int64
totalWSConnections int64
totalCoalescedRequests int64
totalPrimaryRequests int64
oldestUptime float64
// Retry budget stats
totalRetryAllowed int64
totalRetryDenied int64
totalRetryAttempts int64
retryBudgetEnabled = false
// Circuit breaker stats
cbOpenCount int
cbHalfOpenCount int
cbClosedCount int
circuitBreakerEnabled = false
)
for idx, instance := range instances {
// Track oldest uptime for cluster uptime
if oldestUptime == 0 || instance.UptimeSeconds < oldestUptime {
oldestUptime = instance.UptimeSeconds
}
// Aggregate request stats
if instance.Stats == nil {
if ma.logger != nil {
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "Instance has nil Stats",
Pairs: map[string]interface{}{
"instance_id": instance.InstanceID,
"index": idx,
},
})
}
continue
}
if stats, ok := instance.Stats["requests"].(map[string]interface{}); ok {
if total, ok := stats["total"].(float64); ok {
totalRequests += int64(total)
}
if succeeded, ok := stats["succeeded"].(float64); ok {
totalSucceeded += int64(succeeded)
}
if failed, ok := stats["failed"].(float64); ok {
totalFailed += int64(failed)
}
if skipped, ok := stats["skipped"].(float64); ok {
totalSkipped += int64(skipped)
}
if currentRPS, ok := stats["current_requests_per_second"].(float64); ok {
totalCurrentRPS += currentRPS
}
if avgRPS, ok := stats["avg_requests_per_second"].(float64); ok {
totalAvgRPS += avgRPS
}
} else {
if ma.logger != nil {
// Log what keys are actually in Stats for debugging
keys := make([]string, 0, len(instance.Stats))
for k := range instance.Stats {
keys = append(keys, k)
}
ma.logger.Warning(&libpack_logger.LogMessage{
Message: "Instance Stats missing 'requests' key",
Pairs: map[string]interface{}{
"instance_id": instance.InstanceID,
"stats_keys": keys,
"index": idx,
},
})
}
}
// Aggregate cache stats from CacheSummary (backward compatibility)
if len(instance.CacheSummary) > 0 {
if hits, ok := instance.CacheSummary["hits"].(float64); ok {
totalCacheHits += int64(hits)
}
if misses, ok := instance.CacheSummary["misses"].(float64); ok {
totalCacheMisses += int64(misses)
}
if cached, ok := instance.CacheSummary["total_cached"].(float64); ok {
totalCachedQueries += int64(cached)
}
}
// Aggregate memory usage from full cache details
if len(instance.Cache) > 0 {
if memMB, ok := instance.Cache["memory_usage_mb"].(float64); ok {
totalMemoryUsageMB += memMB
}
}
// Aggregate connection stats
if len(instance.Connections) > 0 {
if active, ok := instance.Connections["active_connections"].(float64); ok {
totalActiveConnections += int64(active)
}
}
// Aggregate WebSocket connections
if len(instance.WebSocketStats) > 0 {
if active, ok := instance.WebSocketStats["active_connections"].(float64); ok {
totalWSConnections += int64(active)
}
}
// Aggregate coalescing stats
if len(instance.Coalescing) > 0 {
if coalesced, ok := instance.Coalescing["coalesced_requests"].(float64); ok {
totalCoalescedRequests += int64(coalesced)
}
if primary, ok := instance.Coalescing["primary_requests"].(float64); ok {
totalPrimaryRequests += int64(primary)
}
}
// Aggregate retry budget stats
if len(instance.RetryBudget) > 0 {
if enabled, ok := instance.RetryBudget["enabled"].(bool); ok && enabled {
retryBudgetEnabled = true
}
if allowed, ok := instance.RetryBudget["allowed_retries"].(float64); ok {
totalRetryAllowed += int64(allowed)
}
if denied, ok := instance.RetryBudget["denied_retries"].(float64); ok {
totalRetryDenied += int64(denied)
}
if attempts, ok := instance.RetryBudget["total_attempts"].(float64); ok {
totalRetryAttempts += int64(attempts)
}
}
// Aggregate circuit breaker stats
if len(instance.CircuitBreaker) > 0 {
if enabled, ok := instance.CircuitBreaker["enabled"].(bool); ok && enabled {
circuitBreakerEnabled = true
}
if state, ok := instance.CircuitBreaker["state"].(string); ok {
switch state {
case "open":
cbOpenCount++
case "half-open":
cbHalfOpenCount++
case "closed":
cbClosedCount++
}
}
}
}
// Calculate derived metrics
successRate := 0.0
if totalRequests > 0 {
successRate = float64(totalSucceeded) / float64(totalRequests) * 100
}
cacheHitRate := 0.0
totalCacheRequests := totalCacheHits + totalCacheMisses
if totalCacheRequests > 0 {
cacheHitRate = float64(totalCacheHits) / float64(totalCacheRequests) * 100
}
backendSavings := 0.0
totalCoalRequests := totalCoalescedRequests + totalPrimaryRequests
if totalCoalRequests > 0 {
backendSavings = float64(totalCoalescedRequests) / float64(totalCoalRequests) * 100
}
// Calculate retry budget denial rate
retryDenialRate := 0.0
if totalRetryAttempts > 0 {
retryDenialRate = float64(totalRetryDenied) / float64(totalRetryAttempts) * 100
}
// Determine overall circuit breaker state
cbState := "unknown"
if circuitBreakerEnabled {
if cbOpenCount > 0 {
cbState = "open" // If any instance is open, cluster is in degraded state
} else if cbHalfOpenCount > 0 {
cbState = "half-open"
} else if cbClosedCount > 0 {
cbState = "closed"
}
}
result := map[string]interface{}{
"cluster_mode": true,
"total_instances": len(instances),
"cluster_uptime": oldestUptime,
"requests": map[string]interface{}{
"total": totalRequests,
"succeeded": totalSucceeded,
"failed": totalFailed,
"skipped": totalSkipped,
"success_rate_pct": successRate,
"current_requests_per_second": totalCurrentRPS,
"avg_requests_per_second": totalAvgRPS,
},
"cache_summary": map[string]interface{}{
"hits": totalCacheHits,
"misses": totalCacheMisses,
"hit_rate_pct": cacheHitRate,
"total_cached": totalCachedQueries,
},
"memory": map[string]interface{}{
"total_usage_mb": totalMemoryUsageMB,
},
"connections": map[string]interface{}{
"total_active": totalActiveConnections,
},
"websocket": map[string]interface{}{
"total_connections": totalWSConnections,
},
"coalescing": map[string]interface{}{
"enabled": len(instances) > 0, // enabled if we have instances with data
"total_coalesced_requests": totalCoalescedRequests,
"total_primary_requests": totalPrimaryRequests,
"backend_savings_pct": backendSavings,
"coalescing_rate_pct": backendSavings,
},
"retry_budget": map[string]interface{}{
"enabled": retryBudgetEnabled,
"allowed_retries": totalRetryAllowed,
"denied_retries": totalRetryDenied,
"total_attempts": totalRetryAttempts,
"denial_rate_pct": retryDenialRate,
},
"circuit_breaker": map[string]interface{}{
"enabled": circuitBreakerEnabled,
"state": cbState,
"instances_open": cbOpenCount,
"instances_closed": cbClosedCount,
"instances_halfopen": cbHalfOpenCount,
},
}
return result
}
// Shutdown stops the metrics aggregator
func (ma *MetricsAggregator) Shutdown() {
ma.mu.Lock()
defer ma.mu.Unlock()
if ma.cancel != nil {
ma.cancel()
}
if ma.redisClient != nil {
ma.redisClient.Close()
}
if ma.logger != nil {
ma.logger.Info(&libpack_logger.LogMessage{
Message: "Metrics aggregator shut down",
})
}
}
// GetInstanceID returns the current instance ID
func (ma *MetricsAggregator) GetInstanceID() string {
return ma.instanceID
}
// IsClusterMode returns true if multiple instances are detected
func (ma *MetricsAggregator) IsClusterMode() bool {
// Create a fresh context with timeout to avoid inheriting cancelled parent context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
count, err := ma.redisClient.SCard(ctx, ma.publishKey).Result()
if err != nil {
return false
}
return count > 1
}
// GetInstanceHostname returns a human-readable instance identifier
func GetInstanceHostname() string {
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
// Remove domain suffix for cleaner display
if idx := strings.Index(hostname, "."); idx > 0 {
hostname = hostname[:idx]
}
return hostname
}