mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-24 04:31:09 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 63e2e46578 | |||
| e3e9f7d181 | |||
| 0fc776228f |
+895
-34
@@ -4,6 +4,7 @@
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>GraphQL Proxy Admin Dashboard</title>
|
||||
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
|
||||
<style>
|
||||
* {
|
||||
margin: 0;
|
||||
@@ -211,28 +212,318 @@
|
||||
.loading {
|
||||
animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite;
|
||||
}
|
||||
|
||||
.chart-container {
|
||||
position: relative;
|
||||
height: 300px;
|
||||
margin-top: 20px;
|
||||
}
|
||||
|
||||
.progress-bar {
|
||||
width: 100%;
|
||||
height: 8px;
|
||||
background: #f0f0f0;
|
||||
border-radius: 4px;
|
||||
overflow: hidden;
|
||||
margin-top: 8px;
|
||||
}
|
||||
|
||||
.progress-fill {
|
||||
height: 100%;
|
||||
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
|
||||
transition: width 0.3s ease;
|
||||
}
|
||||
|
||||
.ws-status {
|
||||
display: inline-block;
|
||||
padding: 4px 8px;
|
||||
border-radius: 4px;
|
||||
font-size: 0.75em;
|
||||
font-weight: 600;
|
||||
margin-left: 10px;
|
||||
}
|
||||
|
||||
.ws-connected {
|
||||
background: #d1fae5;
|
||||
color: #065f46;
|
||||
}
|
||||
|
||||
.ws-disconnected {
|
||||
background: #fee2e2;
|
||||
color: #991b1b;
|
||||
}
|
||||
|
||||
.cluster-toggle-container {
|
||||
background: rgba(255, 255, 255, 0.15);
|
||||
padding: 12px 20px;
|
||||
border-radius: 8px;
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 12px;
|
||||
backdrop-filter: blur(10px);
|
||||
border: 1px solid rgba(255, 255, 255, 0.2);
|
||||
cursor: pointer;
|
||||
transition: all 0.3s ease;
|
||||
}
|
||||
|
||||
.cluster-toggle-container:hover {
|
||||
background: rgba(255, 255, 255, 0.25);
|
||||
transform: translateY(-1px);
|
||||
}
|
||||
|
||||
.toggle-switch {
|
||||
position: relative;
|
||||
width: 48px;
|
||||
height: 24px;
|
||||
background: rgba(255, 255, 255, 0.3);
|
||||
border-radius: 12px;
|
||||
transition: background 0.3s ease;
|
||||
cursor: pointer;
|
||||
}
|
||||
|
||||
.toggle-switch::after {
|
||||
content: '';
|
||||
position: absolute;
|
||||
top: 2px;
|
||||
left: 2px;
|
||||
width: 20px;
|
||||
height: 20px;
|
||||
background: white;
|
||||
border-radius: 50%;
|
||||
transition: transform 0.3s ease;
|
||||
box-shadow: 0 2px 4px rgba(0,0,0,0.2);
|
||||
}
|
||||
|
||||
#cluster-mode-toggle {
|
||||
display: none;
|
||||
}
|
||||
|
||||
#cluster-mode-toggle:checked + .cluster-toggle-container .toggle-switch {
|
||||
background: #10b981;
|
||||
}
|
||||
|
||||
#cluster-mode-toggle:checked + .cluster-toggle-container .toggle-switch::after {
|
||||
transform: translateX(24px);
|
||||
}
|
||||
|
||||
#cluster-mode-toggle:disabled + .cluster-toggle-container {
|
||||
opacity: 0.5;
|
||||
cursor: not-allowed;
|
||||
}
|
||||
|
||||
#cluster-mode-toggle:disabled + .cluster-toggle-container:hover {
|
||||
background: rgba(255, 255, 255, 0.15);
|
||||
transform: none;
|
||||
}
|
||||
|
||||
.cluster-toggle-label {
|
||||
font-size: 0.95em;
|
||||
font-weight: 600;
|
||||
color: white;
|
||||
letter-spacing: 0.3px;
|
||||
user-select: none;
|
||||
}
|
||||
|
||||
.cluster-toggle-info {
|
||||
font-size: 0.8em;
|
||||
opacity: 0.9;
|
||||
color: white;
|
||||
font-weight: 400;
|
||||
}
|
||||
|
||||
.instance-card {
|
||||
background: white;
|
||||
border-radius: 12px;
|
||||
padding: 20px;
|
||||
margin-bottom: 15px;
|
||||
box-shadow: 0 2px 8px rgba(0,0,0,0.08);
|
||||
}
|
||||
|
||||
.instance-header {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
margin-bottom: 15px;
|
||||
padding-bottom: 15px;
|
||||
border-bottom: 1px solid #f0f0f0;
|
||||
}
|
||||
|
||||
.instance-title {
|
||||
font-size: 1.1em;
|
||||
font-weight: 600;
|
||||
color: #333;
|
||||
}
|
||||
|
||||
.instance-grid {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
|
||||
gap: 15px;
|
||||
}
|
||||
|
||||
.instance-metric {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
}
|
||||
|
||||
.instance-metric-label {
|
||||
font-size: 0.8em;
|
||||
color: #666;
|
||||
margin-bottom: 4px;
|
||||
}
|
||||
|
||||
.instance-metric-value {
|
||||
font-size: 1.1em;
|
||||
font-weight: 600;
|
||||
color: #333;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
<div class="container">
|
||||
<h1>GraphQL Proxy Admin Dashboard</h1>
|
||||
<div class="subtitle">Real-time monitoring and management</div>
|
||||
<div style="display: flex; justify-content: space-between; align-items: center; flex-wrap: wrap; gap: 15px;">
|
||||
<div>
|
||||
<h1>GraphQL Proxy Admin Dashboard</h1>
|
||||
<div class="subtitle">
|
||||
Real-time monitoring and management
|
||||
<span class="ws-status ws-disconnected" id="ws-status">Connecting...</span>
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<input type="checkbox" id="cluster-mode-toggle">
|
||||
<label for="cluster-mode-toggle" class="cluster-toggle-container">
|
||||
<div class="toggle-switch"></div>
|
||||
<div>
|
||||
<div class="cluster-toggle-label">Cluster View</div>
|
||||
<div class="cluster-toggle-info" id="cluster-info">Checking availability...</div>
|
||||
</div>
|
||||
</label>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</header>
|
||||
|
||||
<div class="container">
|
||||
<!-- Cluster Status (shown when cluster mode detected) -->
|
||||
<div id="cluster-status-section" style="display: none;">
|
||||
<h2 class="section-title">Cluster Status</h2>
|
||||
<div class="stats-grid">
|
||||
<div class="card">
|
||||
<div class="card-title">Total Instances</div>
|
||||
<div class="card-value" id="cluster-total-instances">--</div>
|
||||
<div class="card-label">Proxy nodes</div>
|
||||
</div>
|
||||
<div class="card">
|
||||
<div class="card-title">Healthy Instances</div>
|
||||
<div class="card-value" id="cluster-healthy-instances">--</div>
|
||||
<div class="card-label">Active nodes</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- System Overview -->
|
||||
<h2 class="section-title">
|
||||
<span id="overview-title">System Overview</span>
|
||||
</h2>
|
||||
<div class="stats-grid">
|
||||
<div class="card">
|
||||
<div class="card-title">Uptime</div>
|
||||
<div class="card-value" id="uptime">--</div>
|
||||
<div class="card-label" id="uptime-seconds">-- seconds</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Total Requests</div>
|
||||
<div class="card-value" id="total-requests">--</div>
|
||||
<div class="card-label">
|
||||
<span style="color: #10b981;">✓ <span id="succeeded-requests">--</span></span>
|
||||
<span style="color: #ef4444;">✗ <span id="failed-requests">--</span></span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Current RPS</div>
|
||||
<div class="card-value" id="current-rps">--</div>
|
||||
<div class="card-label">Avg: <span id="avg-rps">--</span> req/s</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Success Rate</div>
|
||||
<div class="card-value" id="success-rate">--%</div>
|
||||
<div class="progress-bar">
|
||||
<div class="progress-fill" id="success-progress" style="width: 0%"></div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Cache Statistics -->
|
||||
<h2 class="section-title">Cache Performance</h2>
|
||||
<div class="stats-grid">
|
||||
<div class="card">
|
||||
<div class="card-title">Cache Hit Rate</div>
|
||||
<div class="card-value" id="cache-hit-rate">--%</div>
|
||||
<div class="progress-bar">
|
||||
<div class="progress-fill" id="cache-hit-progress" style="width: 0%"></div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Cache Hits / Misses</div>
|
||||
<div class="card-value" id="cache-hits">--</div>
|
||||
<div class="card-label">
|
||||
Hits: <span id="cache-hits-detail">--</span> |
|
||||
Misses: <span id="cache-misses">--</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Cached Queries</div>
|
||||
<div class="card-value" id="cached-queries">--</div>
|
||||
<div class="card-label">Total entries</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Memory Usage</div>
|
||||
<div class="card-value" id="cache-memory">-- MB</div>
|
||||
<div class="card-label" id="cache-memory-pct">--%</div>
|
||||
<div class="progress-bar">
|
||||
<div class="progress-fill" id="memory-progress" style="width: 0%"></div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Real-time Charts -->
|
||||
<h2 class="section-title">Real-time Metrics</h2>
|
||||
<div class="stats-grid">
|
||||
<div class="card">
|
||||
<div class="card-title">Requests Per Second</div>
|
||||
<div class="chart-container">
|
||||
<canvas id="rps-chart"></canvas>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<div class="card-title">Cache Hit Rate Over Time</div>
|
||||
<div class="chart-container">
|
||||
<canvas id="cache-chart"></canvas>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Health Status -->
|
||||
<h2 class="section-title">Health Status</h2>
|
||||
<div class="card" id="health-card">
|
||||
<div class="card-title">System Health</div>
|
||||
<div>
|
||||
<span class="status-indicator status-unknown loading" id="health-indicator"></span>
|
||||
<span id="health-status">Loading...</span>
|
||||
<div class="metric-row">
|
||||
<span class="metric-label">Backend Status</span>
|
||||
<span>
|
||||
<span class="status-indicator status-unknown loading" id="health-indicator"></span>
|
||||
<span id="health-status">Loading...</span>
|
||||
</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Key Metrics -->
|
||||
<h2 class="section-title">Key Metrics</h2>
|
||||
<h2 class="section-title">Advanced Features</h2>
|
||||
<div class="stats-grid">
|
||||
<div class="card">
|
||||
<div class="card-title">Request Coalescing</div>
|
||||
@@ -334,44 +625,558 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="refresh-info">
|
||||
Dashboard refreshes every 5 seconds
|
||||
<!-- Instance Details (shown in cluster mode) -->
|
||||
<div id="instance-details-section" style="display: none;">
|
||||
<h2 class="section-title">Instance Details</h2>
|
||||
<div id="instance-list"></div>
|
||||
</div>
|
||||
|
||||
<div class="refresh-info" id="refresh-info">
|
||||
<span id="connection-mode">Connecting to real-time updates...</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script>
|
||||
// Fetch and update dashboard data
|
||||
// Chart instances
|
||||
let rpsChart, cacheChart;
|
||||
let rpsData = { labels: [], data: [] };
|
||||
let cacheData = { labels: [], data: [] };
|
||||
const MAX_DATA_POINTS = 60; // Keep last 60 data points
|
||||
|
||||
// WebSocket connection
|
||||
let ws = null;
|
||||
let wsReconnectInterval = null;
|
||||
let useWebSocket = true;
|
||||
|
||||
// Cluster mode state
|
||||
let clusterModeEnabled = false;
|
||||
let clusterModeAvailable = false;
|
||||
|
||||
// Smoothing buffers for metrics (10-second moving average at 2s intervals = 5 data points)
|
||||
const smoothingWindow = 5;
|
||||
const rpsBuffer = [];
|
||||
const successRateBuffer = [];
|
||||
const cacheHitRateBuffer = [];
|
||||
|
||||
// Initialize charts
|
||||
function initCharts() {
|
||||
const rpsCtx = document.getElementById('rps-chart').getContext('2d');
|
||||
rpsChart = new Chart(rpsCtx, {
|
||||
type: 'line',
|
||||
data: {
|
||||
labels: [],
|
||||
datasets: [{
|
||||
label: 'Requests/sec',
|
||||
data: [],
|
||||
borderColor: '#667eea',
|
||||
backgroundColor: 'rgba(102, 126, 234, 0.1)',
|
||||
tension: 0.4,
|
||||
fill: true
|
||||
}]
|
||||
},
|
||||
options: {
|
||||
responsive: true,
|
||||
maintainAspectRatio: false,
|
||||
plugins: {
|
||||
legend: { display: false }
|
||||
},
|
||||
scales: {
|
||||
y: {
|
||||
beginAtZero: true,
|
||||
ticks: { precision: 0 }
|
||||
},
|
||||
x: {
|
||||
display: false
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const cacheCtx = document.getElementById('cache-chart').getContext('2d');
|
||||
cacheChart = new Chart(cacheCtx, {
|
||||
type: 'line',
|
||||
data: {
|
||||
labels: [],
|
||||
datasets: [{
|
||||
label: 'Hit Rate %',
|
||||
data: [],
|
||||
borderColor: '#10b981',
|
||||
backgroundColor: 'rgba(16, 185, 129, 0.1)',
|
||||
tension: 0.4,
|
||||
fill: true
|
||||
}]
|
||||
},
|
||||
options: {
|
||||
responsive: true,
|
||||
maintainAspectRatio: false,
|
||||
plugins: {
|
||||
legend: { display: false }
|
||||
},
|
||||
scales: {
|
||||
y: {
|
||||
beginAtZero: true,
|
||||
max: 100,
|
||||
ticks: {
|
||||
callback: function(value) {
|
||||
return value + '%';
|
||||
}
|
||||
}
|
||||
},
|
||||
x: {
|
||||
display: false
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Update chart data
|
||||
function updateChart(chart, dataStore, value) {
|
||||
const timestamp = new Date().toLocaleTimeString();
|
||||
|
||||
dataStore.labels.push(timestamp);
|
||||
dataStore.data.push(value);
|
||||
|
||||
// Keep only last MAX_DATA_POINTS
|
||||
if (dataStore.labels.length > MAX_DATA_POINTS) {
|
||||
dataStore.labels.shift();
|
||||
dataStore.data.shift();
|
||||
}
|
||||
|
||||
chart.data.labels = dataStore.labels;
|
||||
chart.data.datasets[0].data = dataStore.data;
|
||||
chart.update('none'); // Update without animation for smoother real-time updates
|
||||
}
|
||||
|
||||
// Connect to WebSocket
|
||||
function connectWebSocket() {
|
||||
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
const wsUrl = `${protocol}//${window.location.host}/admin/ws/stats`;
|
||||
|
||||
try {
|
||||
ws = new WebSocket(wsUrl);
|
||||
|
||||
ws.onopen = () => {
|
||||
updateWSStatus(true);
|
||||
if (wsReconnectInterval) {
|
||||
clearInterval(wsReconnectInterval);
|
||||
wsReconnectInterval = null;
|
||||
}
|
||||
};
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
try {
|
||||
const data = JSON.parse(event.data);
|
||||
updateAllStats(data);
|
||||
} catch (error) {
|
||||
console.error('Failed to parse WebSocket message:', error);
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = (error) => {
|
||||
console.error('WebSocket error:', error);
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
updateWSStatus(false);
|
||||
// Try to reconnect after 5 seconds
|
||||
if (!wsReconnectInterval) {
|
||||
wsReconnectInterval = setInterval(() => {
|
||||
connectWebSocket();
|
||||
}, 5000);
|
||||
}
|
||||
};
|
||||
} catch (error) {
|
||||
console.error('Failed to create WebSocket:', error);
|
||||
updateWSStatus(false);
|
||||
// Fall back to polling if WebSocket fails
|
||||
useWebSocket = false;
|
||||
startPolling();
|
||||
}
|
||||
}
|
||||
|
||||
// Update WebSocket status indicator
|
||||
function updateWSStatus(connected) {
|
||||
const statusEl = document.getElementById('ws-status');
|
||||
const infoEl = document.getElementById('connection-mode');
|
||||
|
||||
if (connected) {
|
||||
statusEl.className = 'ws-status ws-connected';
|
||||
statusEl.textContent = 'Live';
|
||||
infoEl.textContent = 'Real-time updates via WebSocket';
|
||||
} else {
|
||||
statusEl.className = 'ws-status ws-disconnected';
|
||||
statusEl.textContent = 'Reconnecting...';
|
||||
infoEl.textContent = 'Attempting to reconnect...';
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: Fetch and update dashboard data via polling
|
||||
async function updateDashboard() {
|
||||
try {
|
||||
// Update health
|
||||
const health = await fetch('/admin/api/health').then(r => r.json());
|
||||
updateHealth(health);
|
||||
if (clusterModeEnabled) {
|
||||
// Fetch cluster stats
|
||||
const [clusterStats, instances] = await Promise.all([
|
||||
fetch('/admin/api/cluster/stats').then(r => r.json()),
|
||||
fetch('/admin/api/cluster/instances').then(r => r.json())
|
||||
]);
|
||||
|
||||
// Update circuit breaker
|
||||
const cb = await fetch('/admin/api/circuit-breaker').then(r => r.json());
|
||||
updateCircuitBreaker(cb);
|
||||
if (clusterStats.cluster_mode) {
|
||||
updateClusterStats(clusterStats, instances);
|
||||
}
|
||||
} else {
|
||||
// Fetch all stats for single instance
|
||||
const [stats, health, cb, cache, coalescing, retryBudget, wsStats, connections] = await Promise.all([
|
||||
fetch('/admin/api/stats').then(r => r.json()),
|
||||
fetch('/admin/api/health').then(r => r.json()),
|
||||
fetch('/admin/api/circuit-breaker').then(r => r.json()),
|
||||
fetch('/admin/api/cache').then(r => r.json()),
|
||||
fetch('/admin/api/coalescing').then(r => r.json()),
|
||||
fetch('/admin/api/retry-budget').then(r => r.json()),
|
||||
fetch('/admin/api/websocket').then(r => r.json()),
|
||||
fetch('/admin/api/connections').then(r => r.json())
|
||||
]);
|
||||
|
||||
// Update coalescing
|
||||
const coalescing = await fetch('/admin/api/coalescing').then(r => r.json());
|
||||
updateCoalescing(coalescing);
|
||||
const allData = {
|
||||
stats,
|
||||
health,
|
||||
circuit_breaker: cb,
|
||||
cache,
|
||||
coalescing,
|
||||
retry_budget: retryBudget,
|
||||
websocket: wsStats,
|
||||
connections
|
||||
};
|
||||
|
||||
// Update retry budget
|
||||
const retryBudget = await fetch('/admin/api/retry-budget').then(r => r.json());
|
||||
updateRetryBudget(retryBudget);
|
||||
|
||||
// Update WebSocket
|
||||
const ws = await fetch('/admin/api/websocket').then(r => r.json());
|
||||
updateWebSocket(ws);
|
||||
|
||||
// Update connections
|
||||
const connections = await fetch('/admin/api/connections').then(r => r.json());
|
||||
updateConnections(connections);
|
||||
updateAllStats(allData);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('Failed to update dashboard:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if cluster mode is available
|
||||
async function checkClusterMode() {
|
||||
try {
|
||||
const response = await fetch('/admin/api/cluster/stats');
|
||||
const data = await response.json();
|
||||
|
||||
if (response.ok && data.cluster_mode) {
|
||||
clusterModeAvailable = true;
|
||||
document.getElementById('cluster-info').textContent =
|
||||
`(${data.total_instances} instance${data.total_instances !== 1 ? 's' : ''} available)`;
|
||||
return true;
|
||||
}
|
||||
} catch (error) {
|
||||
// Cluster mode not available
|
||||
}
|
||||
|
||||
clusterModeAvailable = false;
|
||||
document.getElementById('cluster-info').textContent = '(not available)';
|
||||
document.getElementById('cluster-mode-toggle').disabled = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Update cluster statistics
|
||||
function updateClusterStats(clusterData, instancesData) {
|
||||
// Show cluster sections
|
||||
document.getElementById('cluster-status-section').style.display = 'block';
|
||||
document.getElementById('instance-details-section').style.display = 'block';
|
||||
document.getElementById('overview-title').textContent = 'Cluster Overview';
|
||||
|
||||
// Update cluster status
|
||||
document.getElementById('cluster-total-instances').textContent = clusterData.total_instances || 0;
|
||||
document.getElementById('cluster-healthy-instances').textContent = clusterData.healthy_instances || 0;
|
||||
|
||||
// Update cluster info in toggle label
|
||||
const totalInstances = clusterData.total_instances || 0;
|
||||
document.getElementById('cluster-info').textContent =
|
||||
`(${totalInstances} instance${totalInstances !== 1 ? 's' : ''} available)`;
|
||||
|
||||
// Update combined stats
|
||||
if (clusterData.stats) {
|
||||
updateStats({ ...clusterData.stats, cluster_mode: true });
|
||||
|
||||
// Update memory usage from cluster stats
|
||||
if (clusterData.stats.memory) {
|
||||
updateCacheStats({
|
||||
memory_usage_mb: clusterData.stats.memory.total_usage_mb || 0
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Update instance list
|
||||
if (instancesData && instancesData.instances) {
|
||||
updateInstanceList(instancesData.instances, instancesData.current_instance);
|
||||
}
|
||||
}
|
||||
|
||||
// Update individual instance list
|
||||
function updateInstanceList(instances, currentInstanceID) {
|
||||
const container = document.getElementById('instance-list');
|
||||
container.innerHTML = '';
|
||||
|
||||
instances.forEach(instance => {
|
||||
// Use JSON field names (snake_case), not Go struct names (PascalCase)
|
||||
const isCurrent = instance.instance_id === currentInstanceID;
|
||||
const isHealthy = instance.health && instance.health.status === 'healthy';
|
||||
|
||||
const card = document.createElement('div');
|
||||
card.className = 'instance-card';
|
||||
|
||||
const header = document.createElement('div');
|
||||
header.className = 'instance-header';
|
||||
|
||||
const title = document.createElement('div');
|
||||
title.className = 'instance-title';
|
||||
title.innerHTML = `
|
||||
<span class="status-indicator ${isHealthy ? 'status-healthy' : 'status-unhealthy'}"></span>
|
||||
${instance.hostname || 'unknown'}
|
||||
${isCurrent ? '<span class="badge badge-info" style="margin-left: 8px;">Current</span>' : ''}
|
||||
`;
|
||||
|
||||
const uptime = document.createElement('span');
|
||||
uptime.style.fontSize = '0.85em';
|
||||
uptime.style.color = '#666';
|
||||
uptime.textContent = `Uptime: ${formatUptime(instance.uptime_seconds || 0)}`;
|
||||
|
||||
header.appendChild(title);
|
||||
header.appendChild(uptime);
|
||||
|
||||
const grid = document.createElement('div');
|
||||
grid.className = 'instance-grid';
|
||||
|
||||
// Extract stats - use JSON field names
|
||||
const stats = instance.stats || {};
|
||||
const requests = stats.requests || {};
|
||||
const cache = instance.cache_summary || stats.cache_summary || {};
|
||||
|
||||
const failedCount = requests.failed || 0;
|
||||
const totalCount = requests.total || 0;
|
||||
const failureInfo = failedCount > 0 ? ` (${failedCount} failed)` : '';
|
||||
|
||||
const metrics = [
|
||||
{
|
||||
label: 'Total Requests',
|
||||
value: formatNumber(totalCount),
|
||||
title: totalCount.toLocaleString() + ' total requests' + failureInfo
|
||||
},
|
||||
{
|
||||
label: 'Success Rate',
|
||||
value: (requests.success_rate_pct || 0).toFixed(1) + '%',
|
||||
title: null
|
||||
},
|
||||
{
|
||||
label: 'Current RPS',
|
||||
value: (requests.current_requests_per_second || 0).toFixed(1),
|
||||
title: null
|
||||
},
|
||||
{
|
||||
label: 'Cache Hit Rate',
|
||||
value: (cache.hit_rate_pct || 0).toFixed(1) + '%',
|
||||
title: null
|
||||
}
|
||||
];
|
||||
|
||||
metrics.forEach(metric => {
|
||||
const metricDiv = document.createElement('div');
|
||||
metricDiv.className = 'instance-metric';
|
||||
if (metric.title) {
|
||||
metricDiv.title = metric.title;
|
||||
}
|
||||
metricDiv.innerHTML = `
|
||||
<div class="instance-metric-label">${metric.label}</div>
|
||||
<div class="instance-metric-value">${metric.value}</div>
|
||||
`;
|
||||
grid.appendChild(metricDiv);
|
||||
});
|
||||
|
||||
card.appendChild(header);
|
||||
card.appendChild(grid);
|
||||
container.appendChild(card);
|
||||
});
|
||||
}
|
||||
|
||||
// Format uptime in human readable format
|
||||
function formatUptime(seconds) {
|
||||
const days = Math.floor(seconds / 86400);
|
||||
const hours = Math.floor((seconds % 86400) / 3600);
|
||||
const minutes = Math.floor((seconds % 3600) / 60);
|
||||
|
||||
if (days > 0) return `${days}d ${hours}h`;
|
||||
if (hours > 0) return `${hours}h ${minutes}m`;
|
||||
return `${minutes}m`;
|
||||
}
|
||||
|
||||
// Format large numbers compactly (1.2M, 3.4K, etc)
|
||||
function formatNumber(num) {
|
||||
if (num === undefined || num === null) return '0';
|
||||
|
||||
const absNum = Math.abs(num);
|
||||
if (absNum >= 1000000000) {
|
||||
return (num / 1000000000).toFixed(1) + 'B';
|
||||
}
|
||||
if (absNum >= 1000000) {
|
||||
return (num / 1000000).toFixed(1) + 'M';
|
||||
}
|
||||
if (absNum >= 1000) {
|
||||
return (num / 1000).toFixed(1) + 'K';
|
||||
}
|
||||
return num.toLocaleString();
|
||||
}
|
||||
|
||||
// Smooth metric values using moving average
|
||||
function smoothMetric(buffer, newValue) {
|
||||
buffer.push(newValue);
|
||||
if (buffer.length > smoothingWindow) {
|
||||
buffer.shift(); // Remove oldest value
|
||||
}
|
||||
// Calculate average
|
||||
const sum = buffer.reduce((a, b) => a + b, 0);
|
||||
return sum / buffer.length;
|
||||
}
|
||||
|
||||
// Get trend indicator (↑ ↗ → ↘ ↓)
|
||||
function getTrendIndicator(buffer) {
|
||||
if (buffer.length < 2) return '→';
|
||||
const recent = buffer.slice(-3); // Last 3 values
|
||||
const avg = recent.reduce((a, b) => a + b, 0) / recent.length;
|
||||
const diff = recent[recent.length - 1] - recent[0];
|
||||
const percentChange = Math.abs(diff / (avg || 1)) * 100;
|
||||
|
||||
if (percentChange < 5) return '→'; // Stable
|
||||
if (diff > 0) {
|
||||
return percentChange > 15 ? '↑' : '↗'; // Strong/moderate increase
|
||||
} else {
|
||||
return percentChange > 15 ? '↓' : '↘'; // Strong/moderate decrease
|
||||
}
|
||||
}
|
||||
|
||||
// Update all statistics
|
||||
function updateAllStats(data) {
|
||||
if (data.stats) updateStats(data.stats);
|
||||
if (data.health) updateHealth(data.health);
|
||||
if (data.circuit_breaker) updateCircuitBreaker(data.circuit_breaker);
|
||||
if (data.cache) updateCacheStats(data.cache);
|
||||
if (data.coalescing) updateCoalescing(data.coalescing);
|
||||
if (data.retry_budget) updateRetryBudget(data.retry_budget);
|
||||
if (data.websocket) updateWebSocket(data.websocket);
|
||||
if (data.connections) updateConnections(data.connections);
|
||||
}
|
||||
|
||||
// Update main stats
|
||||
function updateStats(data) {
|
||||
// Uptime
|
||||
document.getElementById('uptime').textContent = data.uptime_human || '--';
|
||||
document.getElementById('uptime-seconds').textContent =
|
||||
(data.uptime_seconds || 0).toFixed(0) + ' seconds';
|
||||
|
||||
if (data.requests) {
|
||||
const req = data.requests;
|
||||
|
||||
// Total requests with compact formatting
|
||||
document.getElementById('total-requests').textContent = formatNumber(req.total || 0);
|
||||
document.getElementById('total-requests').title = (req.total || 0).toLocaleString() + ' total requests';
|
||||
|
||||
document.getElementById('succeeded-requests').textContent = formatNumber(req.succeeded || 0);
|
||||
document.getElementById('succeeded-requests').title = (req.succeeded || 0).toLocaleString() + ' succeeded';
|
||||
|
||||
document.getElementById('failed-requests').textContent = formatNumber(req.failed || 0);
|
||||
document.getElementById('failed-requests').title = (req.failed || 0).toLocaleString() + ' failed';
|
||||
|
||||
// Show failure details if there are failures
|
||||
if (req.failed > 0) {
|
||||
const failureRate = (req.failed / (req.total || 1) * 100).toFixed(2);
|
||||
document.getElementById('failed-requests').title += ` (${failureRate}% failure rate)`;
|
||||
}
|
||||
|
||||
// Success rate with smoothing
|
||||
const rawSuccessRate = req.success_rate_pct || 0;
|
||||
const smoothedSuccessRate = smoothMetric(successRateBuffer, rawSuccessRate);
|
||||
const successTrend = getTrendIndicator(successRateBuffer);
|
||||
|
||||
document.getElementById('success-rate').textContent =
|
||||
smoothedSuccessRate.toFixed(1) + '% ' + successTrend;
|
||||
document.getElementById('success-rate').title =
|
||||
`10s avg: ${smoothedSuccessRate.toFixed(2)}% | Current: ${rawSuccessRate.toFixed(2)}%`;
|
||||
document.getElementById('success-progress').style.width =
|
||||
smoothedSuccessRate + '%';
|
||||
|
||||
// RPS with smoothing
|
||||
const rawRPS = req.current_requests_per_second || 0;
|
||||
const smoothedRPS = smoothMetric(rpsBuffer, rawRPS);
|
||||
const rpsTrend = getTrendIndicator(rpsBuffer);
|
||||
|
||||
document.getElementById('current-rps').textContent =
|
||||
smoothedRPS.toFixed(1) + ' ' + rpsTrend;
|
||||
document.getElementById('current-rps').title =
|
||||
`10s avg: ${smoothedRPS.toFixed(2)} | Current: ${rawRPS.toFixed(2)}`;
|
||||
|
||||
document.getElementById('avg-rps').textContent =
|
||||
(req.avg_requests_per_second || 0).toFixed(1);
|
||||
|
||||
// Update RPS chart with smoothed value
|
||||
updateChart(rpsChart, rpsData, smoothedRPS);
|
||||
}
|
||||
|
||||
// Cache summary with smoothing
|
||||
if (data.cache_summary) {
|
||||
const cache = data.cache_summary;
|
||||
|
||||
const rawHitRate = cache.hit_rate_pct || 0;
|
||||
const smoothedHitRate = smoothMetric(cacheHitRateBuffer, rawHitRate);
|
||||
const hitRateTrend = getTrendIndicator(cacheHitRateBuffer);
|
||||
|
||||
document.getElementById('cache-hit-rate').textContent =
|
||||
smoothedHitRate.toFixed(1) + '% ' + hitRateTrend;
|
||||
document.getElementById('cache-hit-rate').title =
|
||||
`10s avg: ${smoothedHitRate.toFixed(2)}% | Current: ${rawHitRate.toFixed(2)}%`;
|
||||
document.getElementById('cache-hit-progress').style.width =
|
||||
smoothedHitRate + '%';
|
||||
|
||||
document.getElementById('cache-hits').textContent = formatNumber(cache.hits || 0);
|
||||
document.getElementById('cache-hits').title = (cache.hits || 0).toLocaleString() + ' cache hits';
|
||||
|
||||
document.getElementById('cache-hits-detail').textContent = formatNumber(cache.hits || 0);
|
||||
document.getElementById('cache-hits-detail').title = (cache.hits || 0).toLocaleString();
|
||||
|
||||
document.getElementById('cache-misses').textContent = formatNumber(cache.misses || 0);
|
||||
document.getElementById('cache-misses').title = (cache.misses || 0).toLocaleString() + ' cache misses';
|
||||
|
||||
document.getElementById('cached-queries').textContent = formatNumber(cache.total_cached || 0);
|
||||
document.getElementById('cached-queries').title = (cache.total_cached || 0).toLocaleString() + ' unique queries cached';
|
||||
|
||||
// Update cache hit rate chart with smoothed value
|
||||
updateChart(cacheChart, cacheData, smoothedHitRate);
|
||||
}
|
||||
}
|
||||
|
||||
// Update cache detailed stats
|
||||
function updateCacheStats(data) {
|
||||
if (data.memory_usage_mb !== undefined) {
|
||||
if (data.memory_usage_mb === -1) {
|
||||
// Redis cache - memory tracking not available
|
||||
document.getElementById('cache-memory').textContent = 'N/A';
|
||||
document.getElementById('cache-memory').title = 'Memory tracking not available for Redis cache';
|
||||
document.getElementById('cache-memory-pct').textContent = 'Redis cache';
|
||||
document.getElementById('memory-progress').style.width = '0%';
|
||||
} else {
|
||||
document.getElementById('cache-memory').textContent =
|
||||
data.memory_usage_mb.toFixed(2) + ' MB';
|
||||
document.getElementById('cache-memory').title = 'In-memory cache usage';
|
||||
|
||||
if (data.memory_usage_pct !== undefined) {
|
||||
const memPct = data.memory_usage_pct;
|
||||
document.getElementById('cache-memory-pct').textContent =
|
||||
memPct.toFixed(1) + '% used';
|
||||
document.getElementById('memory-progress').style.width =
|
||||
memPct + '%';
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function updateHealth(data) {
|
||||
const indicator = document.getElementById('health-indicator');
|
||||
const status = document.getElementById('health-status');
|
||||
@@ -465,11 +1270,67 @@
|
||||
}
|
||||
}
|
||||
|
||||
// Initial load
|
||||
updateDashboard();
|
||||
// Start polling (fallback when WebSocket is not available)
|
||||
function startPolling() {
|
||||
const statusEl = document.getElementById('ws-status');
|
||||
const infoEl = document.getElementById('connection-mode');
|
||||
|
||||
// Refresh every 5 seconds
|
||||
setInterval(updateDashboard, 5000);
|
||||
statusEl.className = 'ws-status ws-disconnected';
|
||||
statusEl.textContent = 'Polling';
|
||||
infoEl.textContent = 'Updates every 5 seconds (WebSocket unavailable)';
|
||||
|
||||
// Initial load
|
||||
updateDashboard();
|
||||
|
||||
// Refresh every 5 seconds
|
||||
setInterval(updateDashboard, 5000);
|
||||
}
|
||||
|
||||
// Initialize dashboard
|
||||
async function initDashboard() {
|
||||
// Initialize charts first
|
||||
initCharts();
|
||||
|
||||
// Check if cluster mode is available
|
||||
await checkClusterMode();
|
||||
|
||||
// Setup cluster mode toggle
|
||||
const toggle = document.getElementById('cluster-mode-toggle');
|
||||
toggle.addEventListener('change', (e) => {
|
||||
clusterModeEnabled = e.target.checked && clusterModeAvailable;
|
||||
|
||||
// Toggle cluster sections visibility
|
||||
if (clusterModeEnabled) {
|
||||
document.getElementById('cluster-status-section').style.display = 'block';
|
||||
document.getElementById('overview-title').textContent = 'Cluster Overview';
|
||||
} else {
|
||||
document.getElementById('cluster-status-section').style.display = 'none';
|
||||
document.getElementById('instance-details-section').style.display = 'none';
|
||||
document.getElementById('overview-title').textContent = 'System Overview';
|
||||
}
|
||||
|
||||
// Refresh data
|
||||
updateDashboard();
|
||||
});
|
||||
|
||||
// Try WebSocket connection first
|
||||
connectWebSocket();
|
||||
|
||||
// Set a timeout to fall back to polling if WebSocket doesn't connect
|
||||
setTimeout(() => {
|
||||
if (!ws || ws.readyState !== WebSocket.OPEN) {
|
||||
useWebSocket = false;
|
||||
startPolling();
|
||||
}
|
||||
}, 3000);
|
||||
}
|
||||
|
||||
// Start when page loads
|
||||
if (document.readyState === 'loading') {
|
||||
document.addEventListener('DOMContentLoaded', initDashboard);
|
||||
} else {
|
||||
initDashboard();
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
+570
-7
@@ -2,9 +2,13 @@ package main
|
||||
|
||||
import (
|
||||
"embed"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/websocket/v2"
|
||||
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
|
||||
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
||||
)
|
||||
|
||||
@@ -39,6 +43,15 @@ func (ad *AdminDashboard) RegisterRoutes(app *fiber.App) {
|
||||
app.Get("/admin/api/coalescing", ad.getCoalescingStats)
|
||||
app.Get("/admin/api/websocket", ad.getWebSocketStats)
|
||||
|
||||
// WebSocket endpoint for streaming statistics
|
||||
app.Get("/admin/ws/stats", websocket.New(ad.handleStatsWebSocket))
|
||||
|
||||
// Cluster mode endpoints (when using Redis)
|
||||
app.Get("/admin/api/cluster/stats", ad.getClusterStats)
|
||||
app.Get("/admin/api/cluster/instances", ad.getClusterInstances)
|
||||
app.Get("/admin/api/cluster/debug", ad.getClusterDebug)
|
||||
app.Post("/admin/api/cluster/force-publish", ad.forcePublish)
|
||||
|
||||
// Control endpoints
|
||||
app.Post("/admin/api/cache/clear", ad.clearCache)
|
||||
app.Post("/admin/api/retry-budget/reset", ad.resetRetryBudget)
|
||||
@@ -67,23 +80,92 @@ func (ad *AdminDashboard) serveDashboard(c *fiber.Ctx) error {
|
||||
|
||||
// getStats returns overall proxy statistics
|
||||
func (ad *AdminDashboard) getStats(c *fiber.Ctx) error {
|
||||
uptimeSeconds := time.Since(startTime).Seconds()
|
||||
stats := map[string]interface{}{
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
"uptime": time.Since(startTime).Seconds(),
|
||||
"version": "0.27.0", // TODO: Get from build info
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
"uptime_seconds": uptimeSeconds,
|
||||
"uptime_human": formatDuration(time.Since(startTime)),
|
||||
"version": "0.27.0", // TODO: Get from build info
|
||||
}
|
||||
|
||||
if cfg != nil && cfg.Monitoring != nil {
|
||||
stats["metrics"] = map[string]interface{}{
|
||||
"succeeded": getAdminMetricValue("graphql_proxy_succeeded_total"),
|
||||
"failed": getAdminMetricValue("graphql_proxy_failed_total"),
|
||||
"skipped": getAdminMetricValue("graphql_proxy_skipped_total"),
|
||||
succeeded := getAdminMetricValue("requests_succesful")
|
||||
failed := getAdminMetricValue("requests_failed")
|
||||
skipped := getAdminMetricValue("requests_skipped")
|
||||
total := succeeded + failed + skipped
|
||||
|
||||
// Request statistics
|
||||
requestStats := map[string]interface{}{
|
||||
"total": total,
|
||||
"succeeded": succeeded,
|
||||
"failed": failed,
|
||||
"skipped": skipped,
|
||||
}
|
||||
|
||||
// Calculate rates and percentages
|
||||
if total > 0 {
|
||||
requestStats["success_rate_pct"] = float64(succeeded) / float64(total) * 100
|
||||
requestStats["failure_rate_pct"] = float64(failed) / float64(total) * 100
|
||||
requestStats["skip_rate_pct"] = float64(skipped) / float64(total) * 100
|
||||
} else {
|
||||
requestStats["success_rate_pct"] = 0.0
|
||||
requestStats["failure_rate_pct"] = 0.0
|
||||
requestStats["skip_rate_pct"] = 0.0
|
||||
}
|
||||
|
||||
// Calculate average requests per second (lifetime)
|
||||
if uptimeSeconds > 0 {
|
||||
requestStats["avg_requests_per_second"] = float64(total) / uptimeSeconds
|
||||
} else {
|
||||
requestStats["avg_requests_per_second"] = 0.0
|
||||
}
|
||||
|
||||
// Get current requests per second (last 1 second)
|
||||
if rpsTracker := GetRPSTracker(); rpsTracker != nil {
|
||||
requestStats["current_requests_per_second"] = rpsTracker.GetCurrentRPS()
|
||||
} else {
|
||||
requestStats["current_requests_per_second"] = 0.0
|
||||
}
|
||||
|
||||
stats["requests"] = requestStats
|
||||
|
||||
// Get cache statistics summary
|
||||
cacheStats := libpack_cache.GetCacheStats()
|
||||
if cacheStats != nil {
|
||||
totalCacheRequests := cacheStats.CacheHits + cacheStats.CacheMisses
|
||||
hitRate := 0.0
|
||||
if totalCacheRequests > 0 {
|
||||
hitRate = float64(cacheStats.CacheHits) / float64(totalCacheRequests) * 100
|
||||
}
|
||||
stats["cache_summary"] = map[string]interface{}{
|
||||
"hits": cacheStats.CacheHits,
|
||||
"misses": cacheStats.CacheMisses,
|
||||
"hit_rate_pct": hitRate,
|
||||
"total_cached": cacheStats.CachedQueries,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return c.JSON(stats)
|
||||
}
|
||||
|
||||
// formatDuration formats a duration into human-readable format
|
||||
func formatDuration(d time.Duration) string {
|
||||
days := int(d.Hours() / 24)
|
||||
hours := int(d.Hours()) % 24
|
||||
minutes := int(d.Minutes()) % 60
|
||||
seconds := int(d.Seconds()) % 60
|
||||
|
||||
if days > 0 {
|
||||
return fmt.Sprintf("%dd %dh %dm %ds", days, hours, minutes, seconds)
|
||||
} else if hours > 0 {
|
||||
return fmt.Sprintf("%dh %dm %ds", hours, minutes, seconds)
|
||||
} else if minutes > 0 {
|
||||
return fmt.Sprintf("%dm %ds", minutes, seconds)
|
||||
}
|
||||
return fmt.Sprintf("%ds", seconds)
|
||||
}
|
||||
|
||||
// getHealth returns health status
|
||||
func (ad *AdminDashboard) getHealth(c *fiber.Ctx) error {
|
||||
healthMgr := GetBackendHealthManager()
|
||||
@@ -154,6 +236,41 @@ func (ad *AdminDashboard) getCacheStats(c *fiber.Ctx) error {
|
||||
stats["ttl_seconds"] = cfg.Cache.CacheTTL
|
||||
stats["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize
|
||||
stats["max_entries"] = cfg.Cache.CacheMaxEntries
|
||||
|
||||
// Get runtime cache statistics
|
||||
cacheStats := libpack_cache.GetCacheStats()
|
||||
if cacheStats != nil {
|
||||
stats["cached_queries"] = cacheStats.CachedQueries
|
||||
stats["cache_hits"] = cacheStats.CacheHits
|
||||
stats["cache_misses"] = cacheStats.CacheMisses
|
||||
|
||||
// Calculate hit rate
|
||||
totalRequests := cacheStats.CacheHits + cacheStats.CacheMisses
|
||||
hitRate := 0.0
|
||||
if totalRequests > 0 {
|
||||
hitRate = float64(cacheStats.CacheHits) / float64(totalRequests) * 100
|
||||
}
|
||||
stats["hit_rate_pct"] = hitRate
|
||||
|
||||
// Get memory usage only for in-memory cache
|
||||
if cfg.Cache.CacheEnable && !cfg.Cache.CacheRedisEnable {
|
||||
memoryUsage := libpack_cache.GetCacheMemoryUsage()
|
||||
maxMemory := libpack_cache.GetCacheMaxMemorySize()
|
||||
stats["memory_usage_bytes"] = memoryUsage
|
||||
stats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
|
||||
|
||||
// Calculate memory usage percentage
|
||||
memoryUsagePct := 0.0
|
||||
if maxMemory > 0 {
|
||||
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
|
||||
}
|
||||
stats["memory_usage_pct"] = memoryUsagePct
|
||||
} else {
|
||||
// For Redis cache, memory tracking not available per instance
|
||||
stats["memory_usage_mb"] = -1 // Sentinel value for "not applicable"
|
||||
stats["memory_usage_pct"] = -1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return c.JSON(stats)
|
||||
@@ -249,6 +366,161 @@ func (ad *AdminDashboard) resetCoalescing(c *fiber.Ctx) error {
|
||||
})
|
||||
}
|
||||
|
||||
// getClusterStats returns aggregated statistics from all proxy instances
|
||||
func (ad *AdminDashboard) getClusterStats(c *fiber.Ctx) error {
|
||||
aggregator := GetMetricsAggregator()
|
||||
if aggregator == nil {
|
||||
return c.Status(503).JSON(map[string]interface{}{
|
||||
"error": "Cluster mode not available",
|
||||
"message": "Redis-based metrics aggregation is not enabled",
|
||||
"cluster_mode": false,
|
||||
})
|
||||
}
|
||||
|
||||
metrics, err := aggregator.GetAggregatedMetrics()
|
||||
if err != nil {
|
||||
if ad.logger != nil {
|
||||
ad.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to get aggregated metrics",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
return c.Status(500).JSON(map[string]interface{}{
|
||||
"error": "Failed to retrieve cluster metrics",
|
||||
"message": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
// Format response similar to regular stats endpoint
|
||||
response := map[string]interface{}{
|
||||
"cluster_mode": true,
|
||||
"total_instances": metrics.TotalInstances,
|
||||
"healthy_instances": metrics.HealthyInstances,
|
||||
"last_update": metrics.LastUpdate.Format(time.RFC3339),
|
||||
"stats": metrics.CombinedStats,
|
||||
}
|
||||
|
||||
return c.JSON(response)
|
||||
}
|
||||
|
||||
// getClusterInstances returns detailed metrics for each proxy instance
|
||||
func (ad *AdminDashboard) getClusterInstances(c *fiber.Ctx) error {
|
||||
aggregator := GetMetricsAggregator()
|
||||
if aggregator == nil {
|
||||
return c.Status(503).JSON(map[string]interface{}{
|
||||
"error": "Cluster mode not available",
|
||||
"message": "Redis-based metrics aggregation is not enabled",
|
||||
"cluster_mode": false,
|
||||
})
|
||||
}
|
||||
|
||||
metrics, err := aggregator.GetAggregatedMetrics()
|
||||
if err != nil {
|
||||
if ad.logger != nil {
|
||||
ad.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to get instance metrics",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
return c.Status(500).JSON(map[string]interface{}{
|
||||
"error": "Failed to retrieve instance metrics",
|
||||
"message": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
return c.JSON(map[string]interface{}{
|
||||
"cluster_mode": true,
|
||||
"total_instances": metrics.TotalInstances,
|
||||
"healthy_instances": metrics.HealthyInstances,
|
||||
"current_instance": aggregator.GetInstanceID(),
|
||||
"instances": metrics.Instances,
|
||||
})
|
||||
}
|
||||
|
||||
// getClusterDebug returns debug information about cluster mode
|
||||
func (ad *AdminDashboard) getClusterDebug(c *fiber.Ctx) error {
|
||||
aggregator := GetMetricsAggregator()
|
||||
|
||||
debug := map[string]interface{}{
|
||||
"aggregator_initialized": aggregator != nil,
|
||||
"redis_cache_enabled": false,
|
||||
}
|
||||
|
||||
if cfg != nil {
|
||||
debug["redis_cache_enabled"] = cfg.Cache.CacheRedisEnable
|
||||
debug["cache_enabled"] = cfg.Cache.CacheEnable
|
||||
}
|
||||
|
||||
if aggregator != nil {
|
||||
debug["instance_id"] = aggregator.GetInstanceID()
|
||||
debug["is_cluster_mode"] = aggregator.IsClusterMode()
|
||||
|
||||
// Try to get metrics
|
||||
metrics, err := aggregator.GetAggregatedMetrics()
|
||||
if err != nil {
|
||||
debug["error"] = err.Error()
|
||||
} else {
|
||||
debug["total_instances"] = metrics.TotalInstances
|
||||
debug["healthy_instances"] = metrics.HealthyInstances
|
||||
|
||||
// Show first instance structure as example
|
||||
if len(metrics.Instances) > 0 {
|
||||
first := metrics.Instances[0]
|
||||
debug["sample_instance"] = map[string]interface{}{
|
||||
"instance_id": first.InstanceID,
|
||||
"hostname": first.Hostname,
|
||||
"uptime_seconds": first.UptimeSeconds,
|
||||
"stats_keys": getMapKeys(first.Stats),
|
||||
"has_requests": first.Stats["requests"] != nil,
|
||||
"has_cache": len(first.CacheSummary) > 0,
|
||||
"health_status": first.Health["status"],
|
||||
}
|
||||
|
||||
// Show requests structure if it exists
|
||||
if requests, ok := first.Stats["requests"].(map[string]interface{}); ok {
|
||||
debug["sample_requests"] = requests
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return c.JSON(debug)
|
||||
}
|
||||
|
||||
// Helper to get map keys
|
||||
func getMapKeys(m map[string]interface{}) []string {
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// forcePublish forces an immediate metrics publish for testing
|
||||
func (ad *AdminDashboard) forcePublish(c *fiber.Ctx) error {
|
||||
aggregator := GetMetricsAggregator()
|
||||
if aggregator == nil {
|
||||
return c.Status(503).JSON(map[string]interface{}{
|
||||
"error": "Aggregator not initialized",
|
||||
"success": false,
|
||||
})
|
||||
}
|
||||
|
||||
// Trigger publish in goroutine to avoid blocking
|
||||
go aggregator.publishMetrics()
|
||||
|
||||
return c.JSON(map[string]interface{}{
|
||||
"success": true,
|
||||
"triggered": true,
|
||||
"message": "Publish triggered in background",
|
||||
"next_steps": []string{
|
||||
"Wait 2 seconds",
|
||||
"Check GET /admin/api/cluster/debug",
|
||||
"Check server logs for ✓ Successfully published or ❌ CRITICAL errors",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Helper to get metric value for admin dashboard
|
||||
func getAdminMetricValue(name string) int64 {
|
||||
if cfg == nil || cfg.Monitoring == nil {
|
||||
@@ -261,4 +533,295 @@ func getAdminMetricValue(name string) int64 {
|
||||
return int64(counter.Get())
|
||||
}
|
||||
|
||||
// handleStatsWebSocket handles WebSocket connections for streaming statistics
|
||||
func (ad *AdminDashboard) handleStatsWebSocket(c *websocket.Conn) {
|
||||
if ad.logger != nil {
|
||||
ad.logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "WebSocket client connected to stats stream",
|
||||
Pairs: map[string]interface{}{
|
||||
"remote_addr": c.RemoteAddr().String(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Cleanup on disconnect
|
||||
defer func() {
|
||||
if ad.logger != nil {
|
||||
ad.logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "WebSocket client disconnected from stats stream",
|
||||
Pairs: map[string]interface{}{
|
||||
"remote_addr": c.RemoteAddr().String(),
|
||||
},
|
||||
})
|
||||
}
|
||||
c.Close()
|
||||
}()
|
||||
|
||||
// Set up ping/pong handlers
|
||||
c.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.SetPongHandler(func(string) error {
|
||||
c.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
|
||||
// Channel to signal when to stop
|
||||
done := make(chan struct{})
|
||||
|
||||
// Goroutine to handle incoming messages (for connection keep-alive)
|
||||
go func() {
|
||||
defer close(done)
|
||||
for {
|
||||
if _, _, err := c.ReadMessage(); err != nil {
|
||||
// Connection closed or error
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Stream statistics every 2 seconds
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Send initial stats immediately
|
||||
if stats := ad.gatherAllStats(); stats != nil {
|
||||
if data, err := json.Marshal(stats); err == nil {
|
||||
c.WriteMessage(websocket.TextMessage, data)
|
||||
}
|
||||
}
|
||||
|
||||
// Stream loop
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// Gather all stats
|
||||
stats := ad.gatherAllStats()
|
||||
|
||||
// Marshal to JSON
|
||||
data, err := json.Marshal(stats)
|
||||
if err != nil {
|
||||
if ad.logger != nil {
|
||||
ad.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to marshal stats for WebSocket",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Send to client
|
||||
if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
if ad.logger != nil {
|
||||
ad.logger.Debug(&libpack_logger.LogMessage{
|
||||
Message: "Failed to write to WebSocket (client likely disconnected)",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
case <-done:
|
||||
// Client disconnected
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// gatherAllStats collects all statistics into a single structure
|
||||
func (ad *AdminDashboard) gatherAllStats() map[string]interface{} {
|
||||
result := make(map[string]interface{})
|
||||
|
||||
// Main stats
|
||||
uptimeSeconds := time.Since(startTime).Seconds()
|
||||
stats := map[string]interface{}{
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
"uptime_seconds": uptimeSeconds,
|
||||
"uptime_human": formatDuration(time.Since(startTime)),
|
||||
"version": "0.27.0",
|
||||
}
|
||||
|
||||
if cfg != nil && cfg.Monitoring != nil {
|
||||
succeeded := getAdminMetricValue("requests_succesful")
|
||||
failed := getAdminMetricValue("requests_failed")
|
||||
skipped := getAdminMetricValue("requests_skipped")
|
||||
total := succeeded + failed + skipped
|
||||
|
||||
requestStats := map[string]interface{}{
|
||||
"total": total,
|
||||
"succeeded": succeeded,
|
||||
"failed": failed,
|
||||
"skipped": skipped,
|
||||
}
|
||||
|
||||
if total > 0 {
|
||||
requestStats["success_rate_pct"] = float64(succeeded) / float64(total) * 100
|
||||
requestStats["failure_rate_pct"] = float64(failed) / float64(total) * 100
|
||||
requestStats["skip_rate_pct"] = float64(skipped) / float64(total) * 100
|
||||
} else {
|
||||
requestStats["success_rate_pct"] = 0.0
|
||||
requestStats["failure_rate_pct"] = 0.0
|
||||
requestStats["skip_rate_pct"] = 0.0
|
||||
}
|
||||
|
||||
if uptimeSeconds > 0 {
|
||||
requestStats["avg_requests_per_second"] = float64(total) / uptimeSeconds
|
||||
} else {
|
||||
requestStats["avg_requests_per_second"] = 0.0
|
||||
}
|
||||
|
||||
if rpsTracker := GetRPSTracker(); rpsTracker != nil {
|
||||
requestStats["current_requests_per_second"] = rpsTracker.GetCurrentRPS()
|
||||
} else {
|
||||
requestStats["current_requests_per_second"] = 0.0
|
||||
}
|
||||
|
||||
stats["requests"] = requestStats
|
||||
|
||||
// Cache summary
|
||||
cacheStats := libpack_cache.GetCacheStats()
|
||||
if cacheStats != nil {
|
||||
totalCacheRequests := cacheStats.CacheHits + cacheStats.CacheMisses
|
||||
hitRate := 0.0
|
||||
if totalCacheRequests > 0 {
|
||||
hitRate = float64(cacheStats.CacheHits) / float64(totalCacheRequests) * 100
|
||||
}
|
||||
stats["cache_summary"] = map[string]interface{}{
|
||||
"hits": cacheStats.CacheHits,
|
||||
"misses": cacheStats.CacheMisses,
|
||||
"hit_rate_pct": hitRate,
|
||||
"total_cached": cacheStats.CachedQueries,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result["stats"] = stats
|
||||
|
||||
// Health
|
||||
healthMgr := GetBackendHealthManager()
|
||||
health := map[string]interface{}{
|
||||
"status": "unknown",
|
||||
"backend": map[string]interface{}{
|
||||
"healthy": false,
|
||||
},
|
||||
}
|
||||
|
||||
if healthMgr != nil {
|
||||
isHealthy := healthMgr.IsHealthy()
|
||||
health["backend"] = map[string]interface{}{
|
||||
"healthy": isHealthy,
|
||||
"consecutive_failures": healthMgr.GetConsecutiveFailures(),
|
||||
"last_check": healthMgr.GetLastHealthCheck().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if isHealthy {
|
||||
health["status"] = "healthy"
|
||||
} else {
|
||||
health["status"] = "unhealthy"
|
||||
}
|
||||
}
|
||||
result["health"] = health
|
||||
|
||||
// Circuit breaker
|
||||
cbStatus := map[string]interface{}{
|
||||
"enabled": false,
|
||||
"state": "unknown",
|
||||
}
|
||||
|
||||
if cfg != nil {
|
||||
cbStatus["enabled"] = cfg.CircuitBreaker.Enable
|
||||
|
||||
if cb != nil {
|
||||
cbMutex.RLock()
|
||||
state := cb.State()
|
||||
cbMutex.RUnlock()
|
||||
|
||||
cbStatus["state"] = state.String()
|
||||
cbStatus["config"] = map[string]interface{}{
|
||||
"max_failures": cfg.CircuitBreaker.MaxFailures,
|
||||
"failure_ratio": cfg.CircuitBreaker.FailureRatio,
|
||||
"timeout": cfg.CircuitBreaker.Timeout,
|
||||
"max_requests_half_open": cfg.CircuitBreaker.MaxRequestsInHalfOpen,
|
||||
"return_cached_on_open": cfg.CircuitBreaker.ReturnCachedOnOpen,
|
||||
}
|
||||
}
|
||||
}
|
||||
result["circuit_breaker"] = cbStatus
|
||||
|
||||
// Cache stats
|
||||
cacheStats := map[string]interface{}{
|
||||
"enabled": false,
|
||||
}
|
||||
|
||||
if cfg != nil {
|
||||
cacheStats["enabled"] = cfg.Cache.CacheEnable
|
||||
cacheStats["redis_enabled"] = cfg.Cache.CacheRedisEnable
|
||||
cacheStats["ttl_seconds"] = cfg.Cache.CacheTTL
|
||||
cacheStats["max_memory_mb"] = cfg.Cache.CacheMaxMemorySize
|
||||
cacheStats["max_entries"] = cfg.Cache.CacheMaxEntries
|
||||
|
||||
runtimeCacheStats := libpack_cache.GetCacheStats()
|
||||
if runtimeCacheStats != nil {
|
||||
cacheStats["cached_queries"] = runtimeCacheStats.CachedQueries
|
||||
cacheStats["cache_hits"] = runtimeCacheStats.CacheHits
|
||||
cacheStats["cache_misses"] = runtimeCacheStats.CacheMisses
|
||||
|
||||
totalRequests := runtimeCacheStats.CacheHits + runtimeCacheStats.CacheMisses
|
||||
hitRate := 0.0
|
||||
if totalRequests > 0 {
|
||||
hitRate = float64(runtimeCacheStats.CacheHits) / float64(totalRequests) * 100
|
||||
}
|
||||
cacheStats["hit_rate_pct"] = hitRate
|
||||
|
||||
memoryUsage := libpack_cache.GetCacheMemoryUsage()
|
||||
maxMemory := libpack_cache.GetCacheMaxMemorySize()
|
||||
cacheStats["memory_usage_bytes"] = memoryUsage
|
||||
cacheStats["memory_usage_mb"] = float64(memoryUsage) / (1024 * 1024)
|
||||
|
||||
memoryUsagePct := 0.0
|
||||
if maxMemory > 0 {
|
||||
memoryUsagePct = float64(memoryUsage) / float64(maxMemory) * 100
|
||||
}
|
||||
cacheStats["memory_usage_pct"] = memoryUsagePct
|
||||
}
|
||||
}
|
||||
result["cache"] = cacheStats
|
||||
|
||||
// Connection stats
|
||||
poolMgr := GetConnectionPoolManager()
|
||||
connStats := map[string]interface{}{
|
||||
"available": false,
|
||||
}
|
||||
|
||||
if poolMgr != nil {
|
||||
connStats = poolMgr.GetConnectionStats()
|
||||
connStats["available"] = true
|
||||
}
|
||||
result["connections"] = connStats
|
||||
|
||||
// Retry budget
|
||||
rb := GetRetryBudget()
|
||||
if rb == nil {
|
||||
result["retry_budget"] = map[string]interface{}{"enabled": false}
|
||||
} else {
|
||||
result["retry_budget"] = rb.GetStats()
|
||||
}
|
||||
|
||||
// Coalescing
|
||||
rc := GetRequestCoalescer()
|
||||
if rc == nil {
|
||||
result["coalescing"] = map[string]interface{}{"enabled": false}
|
||||
} else {
|
||||
result["coalescing"] = rc.GetStats()
|
||||
}
|
||||
|
||||
// WebSocket
|
||||
wsp := GetWebSocketProxy()
|
||||
if wsp == nil {
|
||||
result["websocket"] = map[string]interface{}{"enabled": false}
|
||||
} else {
|
||||
result["websocket"] = wsp.GetStats()
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
var startTime = time.Now()
|
||||
|
||||
+14
-3
@@ -110,8 +110,19 @@ func TestAdminDashboard_GetStats(t *testing.T) {
|
||||
|
||||
// Verify stats structure
|
||||
assert.NotEmpty(t, stats["timestamp"])
|
||||
assert.NotNil(t, stats["uptime"])
|
||||
assert.NotNil(t, stats["uptime_seconds"])
|
||||
assert.NotNil(t, stats["uptime_human"])
|
||||
assert.NotEmpty(t, stats["version"])
|
||||
assert.NotNil(t, stats["requests"])
|
||||
|
||||
// Verify request stats structure
|
||||
requests := stats["requests"].(map[string]interface{})
|
||||
assert.NotNil(t, requests["total"])
|
||||
assert.NotNil(t, requests["succeeded"])
|
||||
assert.NotNil(t, requests["failed"])
|
||||
assert.NotNil(t, requests["success_rate_pct"])
|
||||
assert.NotNil(t, requests["avg_requests_per_second"])
|
||||
assert.NotNil(t, requests["current_requests_per_second"])
|
||||
}
|
||||
|
||||
func TestAdminDashboard_GetHealth(t *testing.T) {
|
||||
@@ -414,13 +425,13 @@ func TestGetAdminMetricValue(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test with valid metric
|
||||
value := getAdminMetricValue("graphql_proxy_succeeded_total")
|
||||
value := getAdminMetricValue("requests_succesful")
|
||||
assert.GreaterOrEqual(t, value, int64(0))
|
||||
|
||||
// Test with nil config
|
||||
oldCfg := cfg
|
||||
cfg = nil
|
||||
value = getAdminMetricValue("graphql_proxy_succeeded_total")
|
||||
value = getAdminMetricValue("requests_succesful")
|
||||
assert.Equal(t, int64(0), value)
|
||||
cfg = oldCfg
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ go 1.24.0
|
||||
toolchain go1.24.6
|
||||
|
||||
require (
|
||||
github.com/VictoriaMetrics/metrics v1.40.1
|
||||
github.com/VictoriaMetrics/metrics v1.40.2
|
||||
github.com/alicebob/miniredis/v2 v2.33.0
|
||||
github.com/avast/retry-go/v4 v4.6.1
|
||||
github.com/goccy/go-json v0.10.5
|
||||
@@ -36,9 +36,10 @@ require (
|
||||
github.com/andybalholm/brotli v1.2.0 // indirect
|
||||
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/clipperhouse/uax29/v2 v2.2.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/fasthttp/websocket v1.5.3 // indirect
|
||||
github.com/fasthttp/websocket v1.5.12 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
|
||||
@@ -48,10 +49,9 @@ require (
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.16 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.19 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fastrand v1.1.0 // indirect
|
||||
github.com/valyala/histogram v1.2.0 // indirect
|
||||
@@ -66,8 +66,8 @@ require (
|
||||
golang.org/x/sys v0.36.0 // indirect
|
||||
golang.org/x/term v0.35.0 // indirect
|
||||
golang.org/x/text v0.29.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 // indirect
|
||||
google.golang.org/protobuf v1.36.9 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
github.com/VictoriaMetrics/metrics v1.40.1 h1:FrF5uJRpIVj9fayWcn8xgiI+FYsKGMslzPuOXjdeyR4=
|
||||
github.com/VictoriaMetrics/metrics v1.40.1/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA=
|
||||
github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac=
|
||||
github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA=
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
|
||||
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
|
||||
@@ -16,13 +16,15 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x
|
||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/clipperhouse/uax29/v2 v2.2.0 h1:ChwIKnQN3kcZteTXMgb1wztSgaU+ZemkgWdohwgs8tY=
|
||||
github.com/clipperhouse/uax29/v2 v2.2.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek=
|
||||
github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs=
|
||||
github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE=
|
||||
github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
@@ -76,19 +78,16 @@ github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHP
|
||||
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
|
||||
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
||||
github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw=
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
|
||||
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 h1:McifyVxygw1d67y6vxUqls2D46J8W9nrki9c8c0eVvE=
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761/go.mod h1:Vi9gvHvTw4yCUHIznFl5TPULS7aXwgaTByGeBY75Wko=
|
||||
github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ=
|
||||
github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
@@ -143,10 +142,10 @@ golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
|
||||
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 h1:8XJ4pajGwOlasW+L13MnEGA8W4115jJySQtVfS2/IBU=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4/go.mod h1:NnuHhy+bxcg30o7FnVAZbXsPHUDQ9qKWAQKCD7VxFtk=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 h1:i8QOKZfYg6AbGVZzUAY3LrNWCKF8O6zFisU9Wl9RER4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ=
|
||||
google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI=
|
||||
google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
|
||||
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
|
||||
|
||||
@@ -319,6 +319,39 @@ func parseConfig() {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize metrics aggregator FIRST if Redis is enabled (even if cache is disabled)
|
||||
// This allows cluster mode monitoring even when cache is off
|
||||
if cfg.Cache.CacheRedisEnable {
|
||||
cfg.Logger.Info(&libpack_logging.LogMessage{
|
||||
Message: "Initializing metrics aggregator for cluster mode",
|
||||
Pairs: map[string]interface{}{
|
||||
"redis_url": cfg.Cache.CacheRedisURL,
|
||||
"redis_db": cfg.Cache.CacheRedisDB,
|
||||
},
|
||||
})
|
||||
|
||||
if err := InitializeMetricsAggregator(
|
||||
cfg.Cache.CacheRedisURL,
|
||||
cfg.Cache.CacheRedisPassword,
|
||||
cfg.Cache.CacheRedisDB,
|
||||
cfg.Logger,
|
||||
); err != nil {
|
||||
cfg.Logger.Error(&libpack_logging.LogMessage{
|
||||
Message: "FAILED to initialize metrics aggregator - cluster mode will not work",
|
||||
Pairs: map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
} else {
|
||||
cfg.Logger.Info(&libpack_logging.LogMessage{
|
||||
Message: "✓ Metrics aggregator successfully initialized",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": GetMetricsAggregator().GetInstanceID(),
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize cache if enabled
|
||||
if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
|
||||
cacheConfig := &libpack_cache.CacheConfig{
|
||||
@@ -387,6 +420,12 @@ func parseConfig() {
|
||||
healthMgr.StartHealthChecking()
|
||||
}
|
||||
|
||||
// Initialize RPS tracker for real-time requests per second monitoring
|
||||
InitializeRPSTracker()
|
||||
cfg.Logger.Info(&libpack_logging.LogMessage{
|
||||
Message: "RPS tracker initialized",
|
||||
})
|
||||
|
||||
// Load rate limit configuration with improved error handling
|
||||
if err := loadRatelimitConfig(); err != nil {
|
||||
// Log the error with clear guidance
|
||||
@@ -481,6 +520,14 @@ func main() {
|
||||
return nil
|
||||
})
|
||||
|
||||
// Register metrics aggregator for cleanup
|
||||
shutdownManager.RegisterComponent("metrics-aggregator", func(ctx context.Context) error {
|
||||
if aggregator := GetMetricsAggregator(); aggregator != nil {
|
||||
aggregator.Shutdown()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Cache shutdown is handled internally by the cache implementation
|
||||
|
||||
// Start monitoring server
|
||||
|
||||
@@ -0,0 +1,805 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// MetricsAggregator handles distributed metrics collection via Redis
|
||||
type MetricsAggregator struct {
|
||||
redisClient *redis.Client
|
||||
logger *libpack_logger.Logger
|
||||
instanceID string
|
||||
publishKey string
|
||||
ttl time.Duration
|
||||
publishTimer *time.Ticker
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// InstanceMetrics represents metrics for a single proxy instance
|
||||
type InstanceMetrics struct {
|
||||
InstanceID string `json:"instance_id"`
|
||||
Hostname string `json:"hostname"`
|
||||
LastUpdate time.Time `json:"last_update"`
|
||||
UptimeSeconds float64 `json:"uptime_seconds"`
|
||||
Stats map[string]interface{} `json:"stats"`
|
||||
Cache map[string]interface{} `json:"cache,omitempty"` // Full cache details including memory
|
||||
CacheSummary map[string]interface{} `json:"cache_summary,omitempty"` // Deprecated: kept for compatibility
|
||||
Health map[string]interface{} `json:"health"`
|
||||
CircuitBreaker map[string]interface{} `json:"circuit_breaker,omitempty"`
|
||||
RetryBudget map[string]interface{} `json:"retry_budget,omitempty"`
|
||||
Coalescing map[string]interface{} `json:"coalescing,omitempty"`
|
||||
WebSocketStats map[string]interface{} `json:"websocket,omitempty"`
|
||||
Connections map[string]interface{} `json:"connections,omitempty"`
|
||||
}
|
||||
|
||||
// AggregatedMetrics represents combined metrics from all instances
|
||||
type AggregatedMetrics struct {
|
||||
TotalInstances int `json:"total_instances"`
|
||||
HealthyInstances int `json:"healthy_instances"`
|
||||
LastUpdate time.Time `json:"last_update"`
|
||||
CombinedStats map[string]interface{} `json:"combined_stats"`
|
||||
Instances []InstanceMetrics `json:"instances"`
|
||||
PerInstanceStats map[string]InstanceMetrics `json:"per_instance_stats"`
|
||||
}
|
||||
|
||||
var (
|
||||
metricsAggregator *MetricsAggregator
|
||||
aggregatorMutex sync.RWMutex
|
||||
)
|
||||
|
||||
// InitializeMetricsAggregator creates and starts the metrics aggregator
|
||||
func InitializeMetricsAggregator(redisURL, redisPassword string, redisDB int, logger *libpack_logger.Logger) error {
|
||||
aggregatorMutex.Lock()
|
||||
defer aggregatorMutex.Unlock()
|
||||
|
||||
if metricsAggregator != nil {
|
||||
return nil // Already initialized
|
||||
}
|
||||
|
||||
// Parse Redis URL
|
||||
opt, err := redis.ParseURL(fmt.Sprintf("redis://%s/%d", redisURL, redisDB))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse Redis URL: %w", err)
|
||||
}
|
||||
|
||||
if redisPassword != "" {
|
||||
opt.Password = redisPassword
|
||||
}
|
||||
|
||||
// Create Redis client with connection timeouts
|
||||
opt.DialTimeout = 2 * time.Second
|
||||
opt.ReadTimeout = 2 * time.Second
|
||||
opt.WriteTimeout = 2 * time.Second
|
||||
opt.PoolTimeout = 3 * time.Second
|
||||
opt.MaxRetries = 2
|
||||
|
||||
client := redis.NewClient(opt)
|
||||
|
||||
// Test connection with detailed error reporting
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := client.Ping(ctx).Err(); err != nil {
|
||||
// Log detailed connection error
|
||||
if logger != nil {
|
||||
logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "❌ CRITICAL: Redis connection test FAILED during initialization",
|
||||
Pairs: map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
"redis_url": redisURL,
|
||||
"redis_db": redisDB,
|
||||
"has_password": redisPassword != "",
|
||||
},
|
||||
})
|
||||
}
|
||||
return fmt.Errorf("failed to connect to Redis: %w", err)
|
||||
}
|
||||
|
||||
// Log successful connection
|
||||
if logger != nil {
|
||||
logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "✓ Redis connection test PASSED",
|
||||
Pairs: map[string]interface{}{
|
||||
"redis_url": redisURL,
|
||||
"redis_db": redisDB,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Generate unique instance ID (hostname + UUID for uniqueness)
|
||||
hostname, _ := os.Hostname()
|
||||
if hostname == "" {
|
||||
hostname = "unknown"
|
||||
}
|
||||
instanceID := fmt.Sprintf("%s-%s", hostname, uuid.New().String()[:8])
|
||||
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
|
||||
aggregator := &MetricsAggregator{
|
||||
redisClient: client,
|
||||
logger: logger,
|
||||
instanceID: instanceID,
|
||||
publishKey: "graphql-proxy:metrics:instances",
|
||||
ttl: 30 * time.Second, // Metrics expire after 30s if not updated
|
||||
publishTimer: time.NewTicker(5 * time.Second),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
metricsAggregator = aggregator
|
||||
|
||||
// Start publishing metrics
|
||||
go aggregator.startPublishing()
|
||||
|
||||
if logger != nil {
|
||||
logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "Metrics aggregator initialized",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": instanceID,
|
||||
"redis_url": redisURL,
|
||||
"publish_key": aggregator.publishKey,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMetricsAggregator returns the singleton instance
|
||||
func GetMetricsAggregator() *MetricsAggregator {
|
||||
aggregatorMutex.RLock()
|
||||
defer aggregatorMutex.RUnlock()
|
||||
return metricsAggregator
|
||||
}
|
||||
|
||||
// startPublishing periodically publishes metrics to Redis
|
||||
func (ma *MetricsAggregator) startPublishing() {
|
||||
defer ma.publishTimer.Stop()
|
||||
|
||||
// Publish immediately on start
|
||||
ma.publishMetrics()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ma.ctx.Done():
|
||||
// Clean up our metrics on shutdown
|
||||
ma.removeInstanceMetrics()
|
||||
return
|
||||
case <-ma.publishTimer.C:
|
||||
ma.publishMetrics()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// publishMetrics collects current metrics and stores them in Redis
|
||||
// Note: This is exported for testing/debugging via admin API
|
||||
func (ma *MetricsAggregator) publishMetrics() {
|
||||
// Defensive: check if aggregator is still valid
|
||||
if ma == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ma.mu.RLock()
|
||||
defer ma.mu.RUnlock()
|
||||
|
||||
// Safety check: ensure global config is initialized
|
||||
if cfg == nil {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Cannot publish metrics - global config not initialized yet",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": ma.instanceID,
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Gather all stats using the admin dashboard's method
|
||||
dashboard := NewAdminDashboard(ma.logger)
|
||||
allStats := dashboard.gatherAllStats()
|
||||
|
||||
if len(allStats) == 0 {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "gatherAllStats returned empty/nil result",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": ma.instanceID,
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Create instance metrics
|
||||
hostname, _ := os.Hostname()
|
||||
if hostname == "" {
|
||||
hostname = "unknown"
|
||||
}
|
||||
|
||||
metrics := InstanceMetrics{
|
||||
InstanceID: ma.instanceID,
|
||||
Hostname: hostname,
|
||||
LastUpdate: time.Now(),
|
||||
UptimeSeconds: time.Since(startTime).Seconds(),
|
||||
}
|
||||
|
||||
// Extract specific sections - CRITICAL: we must set the correct structure
|
||||
// Stats should contain the inner stats object with requests, cache_summary, etc.
|
||||
if stats, ok := allStats["stats"].(map[string]interface{}); ok {
|
||||
metrics.Stats = stats
|
||||
|
||||
// Also extract cache summary separately for easier access (deprecated but kept for compatibility)
|
||||
if cacheSummary, ok := stats["cache_summary"].(map[string]interface{}); ok {
|
||||
metrics.CacheSummary = cacheSummary
|
||||
}
|
||||
|
||||
} else {
|
||||
// Fallback: if stats extraction fails, use empty map
|
||||
if ma.logger != nil {
|
||||
ma.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to extract stats from allStats - using empty stats",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": ma.instanceID,
|
||||
"allStats_keys": func() []string {
|
||||
keys := make([]string, 0, len(allStats))
|
||||
for k := range allStats {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}(),
|
||||
},
|
||||
})
|
||||
}
|
||||
metrics.Stats = make(map[string]interface{})
|
||||
}
|
||||
|
||||
// Extract full cache details (includes memory usage)
|
||||
if cache, ok := allStats["cache"].(map[string]interface{}); ok {
|
||||
metrics.Cache = cache
|
||||
}
|
||||
|
||||
if health, ok := allStats["health"].(map[string]interface{}); ok {
|
||||
metrics.Health = health
|
||||
} else {
|
||||
metrics.Health = make(map[string]interface{})
|
||||
}
|
||||
if cb, ok := allStats["circuit_breaker"].(map[string]interface{}); ok {
|
||||
metrics.CircuitBreaker = cb
|
||||
}
|
||||
if rb, ok := allStats["retry_budget"].(map[string]interface{}); ok {
|
||||
metrics.RetryBudget = rb
|
||||
}
|
||||
if coal, ok := allStats["coalescing"].(map[string]interface{}); ok {
|
||||
metrics.Coalescing = coal
|
||||
}
|
||||
if ws, ok := allStats["websocket"].(map[string]interface{}); ok {
|
||||
metrics.WebSocketStats = ws
|
||||
}
|
||||
if conn, ok := allStats["connections"].(map[string]interface{}); ok {
|
||||
metrics.Connections = conn
|
||||
}
|
||||
|
||||
// Marshal to JSON
|
||||
data, err := json.Marshal(metrics)
|
||||
if err != nil {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to marshal metrics for Redis",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Store in Redis hash with TTL
|
||||
key := fmt.Sprintf("%s:%s", ma.publishKey, ma.instanceID)
|
||||
|
||||
// Create a fresh context with timeout to avoid inheriting cancelled parent context
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
pipe := ma.redisClient.Pipeline()
|
||||
pipe.Set(ctx, key, data, ma.ttl)
|
||||
pipe.SAdd(ctx, ma.publishKey, ma.instanceID)
|
||||
pipe.Expire(ctx, ma.publishKey, ma.ttl*2) // Keep set alive
|
||||
|
||||
_, err = pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "❌ CRITICAL: Failed to publish metrics to Redis - cluster mode will not work!",
|
||||
Pairs: map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
"instance_id": ma.instanceID,
|
||||
"key": key,
|
||||
"redis_key": ma.publishKey,
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// removeInstanceMetrics cleans up metrics from Redis on shutdown
|
||||
func (ma *MetricsAggregator) removeInstanceMetrics() {
|
||||
// Create a fresh context with timeout for cleanup
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
key := fmt.Sprintf("%s:%s", ma.publishKey, ma.instanceID)
|
||||
pipe := ma.redisClient.Pipeline()
|
||||
pipe.Del(ctx, key)
|
||||
pipe.SRem(ctx, ma.publishKey, ma.instanceID)
|
||||
_, err := pipe.Exec(ctx)
|
||||
|
||||
if err != nil && ma.logger != nil {
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Failed to remove instance metrics from Redis during shutdown",
|
||||
Pairs: map[string]interface{}{"instance_id": ma.instanceID, "error": err.Error()},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if ma.logger != nil {
|
||||
ma.logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "Removed instance metrics from Redis",
|
||||
Pairs: map[string]interface{}{"instance_id": ma.instanceID},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// GetAggregatedMetrics retrieves and aggregates metrics from all instances
|
||||
func (ma *MetricsAggregator) GetAggregatedMetrics() (*AggregatedMetrics, error) {
|
||||
// Create a fresh context with timeout to avoid inheriting cancelled parent context
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Get all instance IDs
|
||||
instanceIDs, err := ma.redisClient.SMembers(ctx, ma.publishKey).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get instance list: %w", err)
|
||||
}
|
||||
|
||||
if len(instanceIDs) == 0 {
|
||||
return &AggregatedMetrics{
|
||||
TotalInstances: 0,
|
||||
HealthyInstances: 0,
|
||||
LastUpdate: time.Now(),
|
||||
CombinedStats: make(map[string]interface{}),
|
||||
Instances: []InstanceMetrics{},
|
||||
PerInstanceStats: make(map[string]InstanceMetrics),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Fetch metrics for all instances
|
||||
pipe := ma.redisClient.Pipeline()
|
||||
cmds := make([]*redis.StringCmd, len(instanceIDs))
|
||||
for i, instanceID := range instanceIDs {
|
||||
key := fmt.Sprintf("%s:%s", ma.publishKey, instanceID)
|
||||
cmds[i] = pipe.Get(ctx, key)
|
||||
}
|
||||
pipe.Exec(ctx)
|
||||
|
||||
// Parse metrics
|
||||
instances := make([]InstanceMetrics, 0, len(instanceIDs))
|
||||
perInstance := make(map[string]InstanceMetrics)
|
||||
healthyCount := 0
|
||||
staleCount := 0
|
||||
errorCount := 0
|
||||
|
||||
for i, cmd := range cmds {
|
||||
data, err := cmd.Result()
|
||||
if err != nil {
|
||||
errorCount++
|
||||
// Clean up stale instance ID from the set
|
||||
if err == redis.Nil {
|
||||
staleCount++
|
||||
// Remove stale instance from set in background
|
||||
go func(instID string) {
|
||||
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cleanupCancel()
|
||||
ma.redisClient.SRem(cleanupCtx, ma.publishKey, instID)
|
||||
}(instanceIDs[i])
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var metrics InstanceMetrics
|
||||
if err := json.Unmarshal([]byte(data), &metrics); err != nil {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Failed to unmarshal instance metrics",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if instance is stale (not updated in 1 minute)
|
||||
instanceAge := time.Since(metrics.LastUpdate)
|
||||
if instanceAge > 1*time.Minute {
|
||||
staleCount++
|
||||
// Clean up stale instance from set in background
|
||||
go func(instID string, age time.Duration) {
|
||||
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cleanupCancel()
|
||||
ma.redisClient.SRem(cleanupCtx, ma.publishKey, instID)
|
||||
if ma.logger != nil {
|
||||
ma.logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "Removed inactive instance",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": instID,
|
||||
"inactive_seconds": age.Seconds(),
|
||||
},
|
||||
})
|
||||
}
|
||||
}(instanceIDs[i], instanceAge)
|
||||
continue // Skip stale instances
|
||||
}
|
||||
|
||||
instances = append(instances, metrics)
|
||||
perInstance[metrics.InstanceID] = metrics
|
||||
|
||||
// Count healthy instances
|
||||
if health, ok := metrics.Health["status"].(string); ok && health == "healthy" {
|
||||
healthyCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Log cleanup stats if we found stale instances
|
||||
if ma.logger != nil && (staleCount > 0 || errorCount > 0) {
|
||||
ma.logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "Cleaned up stale instance IDs from Redis",
|
||||
Pairs: map[string]interface{}{
|
||||
"total_in_set": len(instanceIDs),
|
||||
"valid_instances": len(instances),
|
||||
"stale_cleaned": staleCount,
|
||||
"errors": errorCount,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Aggregate statistics
|
||||
aggregated := &AggregatedMetrics{
|
||||
TotalInstances: len(instances),
|
||||
HealthyInstances: healthyCount,
|
||||
LastUpdate: time.Now(),
|
||||
CombinedStats: ma.aggregateStats(instances),
|
||||
Instances: instances,
|
||||
PerInstanceStats: perInstance,
|
||||
}
|
||||
|
||||
return aggregated, nil
|
||||
}
|
||||
|
||||
// aggregateStats combines statistics from multiple instances
|
||||
func (ma *MetricsAggregator) aggregateStats(instances []InstanceMetrics) map[string]interface{} {
|
||||
if len(instances) == 0 {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "No instances to aggregate",
|
||||
})
|
||||
}
|
||||
return make(map[string]interface{})
|
||||
}
|
||||
|
||||
// Initialize aggregated values
|
||||
var (
|
||||
totalRequests int64
|
||||
totalSucceeded int64
|
||||
totalFailed int64
|
||||
totalSkipped int64
|
||||
totalCacheHits int64
|
||||
totalCacheMisses int64
|
||||
totalCachedQueries int64
|
||||
totalMemoryUsageMB float64
|
||||
totalCurrentRPS float64
|
||||
totalAvgRPS float64
|
||||
totalActiveConnections int64
|
||||
totalWSConnections int64
|
||||
totalCoalescedRequests int64
|
||||
totalPrimaryRequests int64
|
||||
oldestUptime float64
|
||||
|
||||
// Retry budget stats
|
||||
totalRetryAllowed int64
|
||||
totalRetryDenied int64
|
||||
totalRetryAttempts int64
|
||||
retryBudgetEnabled = false
|
||||
|
||||
// Circuit breaker stats
|
||||
cbOpenCount int
|
||||
cbHalfOpenCount int
|
||||
cbClosedCount int
|
||||
circuitBreakerEnabled = false
|
||||
)
|
||||
|
||||
for idx, instance := range instances {
|
||||
// Track oldest uptime for cluster uptime
|
||||
if oldestUptime == 0 || instance.UptimeSeconds < oldestUptime {
|
||||
oldestUptime = instance.UptimeSeconds
|
||||
}
|
||||
|
||||
// Aggregate request stats
|
||||
if instance.Stats == nil {
|
||||
if ma.logger != nil {
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Instance has nil Stats",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": instance.InstanceID,
|
||||
"index": idx,
|
||||
},
|
||||
})
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if stats, ok := instance.Stats["requests"].(map[string]interface{}); ok {
|
||||
if total, ok := stats["total"].(float64); ok {
|
||||
totalRequests += int64(total)
|
||||
}
|
||||
if succeeded, ok := stats["succeeded"].(float64); ok {
|
||||
totalSucceeded += int64(succeeded)
|
||||
}
|
||||
if failed, ok := stats["failed"].(float64); ok {
|
||||
totalFailed += int64(failed)
|
||||
}
|
||||
if skipped, ok := stats["skipped"].(float64); ok {
|
||||
totalSkipped += int64(skipped)
|
||||
}
|
||||
if currentRPS, ok := stats["current_requests_per_second"].(float64); ok {
|
||||
totalCurrentRPS += currentRPS
|
||||
}
|
||||
if avgRPS, ok := stats["avg_requests_per_second"].(float64); ok {
|
||||
totalAvgRPS += avgRPS
|
||||
}
|
||||
} else {
|
||||
if ma.logger != nil {
|
||||
// Log what keys are actually in Stats for debugging
|
||||
keys := make([]string, 0, len(instance.Stats))
|
||||
for k := range instance.Stats {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
ma.logger.Warning(&libpack_logger.LogMessage{
|
||||
Message: "Instance Stats missing 'requests' key",
|
||||
Pairs: map[string]interface{}{
|
||||
"instance_id": instance.InstanceID,
|
||||
"stats_keys": keys,
|
||||
"index": idx,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate cache stats from CacheSummary (backward compatibility)
|
||||
if len(instance.CacheSummary) > 0 {
|
||||
if hits, ok := instance.CacheSummary["hits"].(float64); ok {
|
||||
totalCacheHits += int64(hits)
|
||||
}
|
||||
if misses, ok := instance.CacheSummary["misses"].(float64); ok {
|
||||
totalCacheMisses += int64(misses)
|
||||
}
|
||||
if cached, ok := instance.CacheSummary["total_cached"].(float64); ok {
|
||||
totalCachedQueries += int64(cached)
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate memory usage from full cache details
|
||||
if len(instance.Cache) > 0 {
|
||||
if memMB, ok := instance.Cache["memory_usage_mb"].(float64); ok {
|
||||
totalMemoryUsageMB += memMB
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate connection stats
|
||||
if len(instance.Connections) > 0 {
|
||||
if active, ok := instance.Connections["active_connections"].(float64); ok {
|
||||
totalActiveConnections += int64(active)
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate WebSocket connections
|
||||
if len(instance.WebSocketStats) > 0 {
|
||||
if active, ok := instance.WebSocketStats["active_connections"].(float64); ok {
|
||||
totalWSConnections += int64(active)
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate coalescing stats
|
||||
if len(instance.Coalescing) > 0 {
|
||||
if coalesced, ok := instance.Coalescing["coalesced_requests"].(float64); ok {
|
||||
totalCoalescedRequests += int64(coalesced)
|
||||
}
|
||||
if primary, ok := instance.Coalescing["primary_requests"].(float64); ok {
|
||||
totalPrimaryRequests += int64(primary)
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate retry budget stats
|
||||
if len(instance.RetryBudget) > 0 {
|
||||
if enabled, ok := instance.RetryBudget["enabled"].(bool); ok && enabled {
|
||||
retryBudgetEnabled = true
|
||||
}
|
||||
if allowed, ok := instance.RetryBudget["allowed_retries"].(float64); ok {
|
||||
totalRetryAllowed += int64(allowed)
|
||||
}
|
||||
if denied, ok := instance.RetryBudget["denied_retries"].(float64); ok {
|
||||
totalRetryDenied += int64(denied)
|
||||
}
|
||||
if attempts, ok := instance.RetryBudget["total_attempts"].(float64); ok {
|
||||
totalRetryAttempts += int64(attempts)
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate circuit breaker stats
|
||||
if len(instance.CircuitBreaker) > 0 {
|
||||
if enabled, ok := instance.CircuitBreaker["enabled"].(bool); ok && enabled {
|
||||
circuitBreakerEnabled = true
|
||||
}
|
||||
if state, ok := instance.CircuitBreaker["state"].(string); ok {
|
||||
switch state {
|
||||
case "open":
|
||||
cbOpenCount++
|
||||
case "half-open":
|
||||
cbHalfOpenCount++
|
||||
case "closed":
|
||||
cbClosedCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate derived metrics
|
||||
successRate := 0.0
|
||||
if totalRequests > 0 {
|
||||
successRate = float64(totalSucceeded) / float64(totalRequests) * 100
|
||||
}
|
||||
|
||||
cacheHitRate := 0.0
|
||||
totalCacheRequests := totalCacheHits + totalCacheMisses
|
||||
if totalCacheRequests > 0 {
|
||||
cacheHitRate = float64(totalCacheHits) / float64(totalCacheRequests) * 100
|
||||
}
|
||||
|
||||
backendSavings := 0.0
|
||||
totalCoalRequests := totalCoalescedRequests + totalPrimaryRequests
|
||||
if totalCoalRequests > 0 {
|
||||
backendSavings = float64(totalCoalescedRequests) / float64(totalCoalRequests) * 100
|
||||
}
|
||||
|
||||
// Calculate retry budget denial rate
|
||||
retryDenialRate := 0.0
|
||||
if totalRetryAttempts > 0 {
|
||||
retryDenialRate = float64(totalRetryDenied) / float64(totalRetryAttempts) * 100
|
||||
}
|
||||
|
||||
// Determine overall circuit breaker state
|
||||
cbState := "unknown"
|
||||
if circuitBreakerEnabled {
|
||||
if cbOpenCount > 0 {
|
||||
cbState = "open" // If any instance is open, cluster is in degraded state
|
||||
} else if cbHalfOpenCount > 0 {
|
||||
cbState = "half-open"
|
||||
} else if cbClosedCount > 0 {
|
||||
cbState = "closed"
|
||||
}
|
||||
}
|
||||
|
||||
result := map[string]interface{}{
|
||||
"cluster_mode": true,
|
||||
"total_instances": len(instances),
|
||||
"cluster_uptime": oldestUptime,
|
||||
"requests": map[string]interface{}{
|
||||
"total": totalRequests,
|
||||
"succeeded": totalSucceeded,
|
||||
"failed": totalFailed,
|
||||
"skipped": totalSkipped,
|
||||
"success_rate_pct": successRate,
|
||||
"current_requests_per_second": totalCurrentRPS,
|
||||
"avg_requests_per_second": totalAvgRPS,
|
||||
},
|
||||
"cache_summary": map[string]interface{}{
|
||||
"hits": totalCacheHits,
|
||||
"misses": totalCacheMisses,
|
||||
"hit_rate_pct": cacheHitRate,
|
||||
"total_cached": totalCachedQueries,
|
||||
},
|
||||
"memory": map[string]interface{}{
|
||||
"total_usage_mb": totalMemoryUsageMB,
|
||||
},
|
||||
"connections": map[string]interface{}{
|
||||
"total_active": totalActiveConnections,
|
||||
},
|
||||
"websocket": map[string]interface{}{
|
||||
"total_connections": totalWSConnections,
|
||||
},
|
||||
"coalescing": map[string]interface{}{
|
||||
"enabled": len(instances) > 0, // enabled if we have instances with data
|
||||
"total_coalesced_requests": totalCoalescedRequests,
|
||||
"total_primary_requests": totalPrimaryRequests,
|
||||
"backend_savings_pct": backendSavings,
|
||||
"coalescing_rate_pct": backendSavings,
|
||||
},
|
||||
"retry_budget": map[string]interface{}{
|
||||
"enabled": retryBudgetEnabled,
|
||||
"allowed_retries": totalRetryAllowed,
|
||||
"denied_retries": totalRetryDenied,
|
||||
"total_attempts": totalRetryAttempts,
|
||||
"denial_rate_pct": retryDenialRate,
|
||||
},
|
||||
"circuit_breaker": map[string]interface{}{
|
||||
"enabled": circuitBreakerEnabled,
|
||||
"state": cbState,
|
||||
"instances_open": cbOpenCount,
|
||||
"instances_closed": cbClosedCount,
|
||||
"instances_halfopen": cbHalfOpenCount,
|
||||
},
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// Shutdown stops the metrics aggregator
|
||||
func (ma *MetricsAggregator) Shutdown() {
|
||||
ma.mu.Lock()
|
||||
defer ma.mu.Unlock()
|
||||
|
||||
if ma.cancel != nil {
|
||||
ma.cancel()
|
||||
}
|
||||
|
||||
if ma.redisClient != nil {
|
||||
ma.redisClient.Close()
|
||||
}
|
||||
|
||||
if ma.logger != nil {
|
||||
ma.logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "Metrics aggregator shut down",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// GetInstanceID returns the current instance ID
|
||||
func (ma *MetricsAggregator) GetInstanceID() string {
|
||||
return ma.instanceID
|
||||
}
|
||||
|
||||
// IsClusterMode returns true if multiple instances are detected
|
||||
func (ma *MetricsAggregator) IsClusterMode() bool {
|
||||
// Create a fresh context with timeout to avoid inheriting cancelled parent context
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
count, err := ma.redisClient.SCard(ctx, ma.publishKey).Result()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return count > 1
|
||||
}
|
||||
|
||||
// GetInstanceHostname returns a human-readable instance identifier
|
||||
func GetInstanceHostname() string {
|
||||
hostname, _ := os.Hostname()
|
||||
if hostname == "" {
|
||||
hostname = "unknown"
|
||||
}
|
||||
// Remove domain suffix for cleaner display
|
||||
if idx := strings.Index(hostname, "."); idx > 0 {
|
||||
hostname = hostname[:idx]
|
||||
}
|
||||
return hostname
|
||||
}
|
||||
@@ -236,6 +236,11 @@ func createFasthttpClient(clientConfig *config) *fasthttp.Client {
|
||||
|
||||
// proxyTheRequest handles the request proxying logic.
|
||||
func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
|
||||
// Record request for RPS tracking
|
||||
if rpsTracker := GetRPSTracker(); rpsTracker != nil {
|
||||
rpsTracker.RecordRequest()
|
||||
}
|
||||
|
||||
// Setup tracing if enabled
|
||||
var span trace.Span
|
||||
var ctx context.Context
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RPSTracker tracks requests per second using periodic sampling
|
||||
type RPSTracker struct {
|
||||
lastCount atomic.Int64
|
||||
lastSampleTime atomic.Int64 // Unix nano
|
||||
currentRPS uint64 // stored as uint64, accessed with atomic operations
|
||||
mu sync.RWMutex // for currentRPS updates
|
||||
}
|
||||
|
||||
// NewRPSTracker creates a new RPS tracker
|
||||
func NewRPSTracker() *RPSTracker {
|
||||
tracker := &RPSTracker{}
|
||||
tracker.lastSampleTime.Store(time.Now().UnixNano())
|
||||
go tracker.updateLoop()
|
||||
return tracker
|
||||
}
|
||||
|
||||
// RecordRequest increments the request counter
|
||||
func (r *RPSTracker) RecordRequest() {
|
||||
// Just increment the counter, sampling happens in background
|
||||
r.lastCount.Add(1)
|
||||
}
|
||||
|
||||
// updateLoop periodically calculates current RPS
|
||||
func (r *RPSTracker) updateLoop() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
r.sample()
|
||||
}
|
||||
}
|
||||
|
||||
// sample calculates RPS since last sample
|
||||
func (r *RPSTracker) sample() {
|
||||
now := time.Now()
|
||||
nowNano := now.UnixNano()
|
||||
|
||||
currentCount := r.lastCount.Load()
|
||||
lastSampleNano := r.lastSampleTime.Load()
|
||||
|
||||
if lastSampleNano == 0 {
|
||||
r.lastSampleTime.Store(nowNano)
|
||||
return
|
||||
}
|
||||
|
||||
elapsed := float64(nowNano-lastSampleNano) / float64(time.Second)
|
||||
if elapsed > 0 {
|
||||
rps := float64(currentCount) / elapsed
|
||||
// Store RPS as centirps for precision (multiply by 100)
|
||||
r.mu.Lock()
|
||||
atomic.StoreUint64(&r.currentRPS, uint64(rps*100))
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Reset for next sample
|
||||
r.lastCount.Store(0)
|
||||
r.lastSampleTime.Store(nowNano)
|
||||
}
|
||||
|
||||
// GetCurrentRPS returns the current requests per second
|
||||
func (r *RPSTracker) GetCurrentRPS() float64 {
|
||||
r.mu.RLock()
|
||||
centirps := atomic.LoadUint64(&r.currentRPS)
|
||||
r.mu.RUnlock()
|
||||
return float64(centirps) / 100.0
|
||||
}
|
||||
|
||||
var globalRPSTracker *RPSTracker
|
||||
|
||||
// InitializeRPSTracker initializes the global RPS tracker
|
||||
func InitializeRPSTracker() *RPSTracker {
|
||||
if globalRPSTracker == nil {
|
||||
globalRPSTracker = NewRPSTracker()
|
||||
}
|
||||
return globalRPSTracker
|
||||
}
|
||||
|
||||
// GetRPSTracker returns the global RPS tracker
|
||||
func GetRPSTracker() *RPSTracker {
|
||||
return globalRPSTracker
|
||||
}
|
||||
Reference in New Issue
Block a user