Dashboard and sdk processor improvements.

This commit is contained in:
2025-12-16 01:00:23 +00:00
parent cded6bb532
commit c40fa7317b
7 changed files with 301 additions and 54 deletions
+20 -3
View File
@@ -10,6 +10,19 @@ const lastEvent = ref<SSEEvent | null>(null)
let eventSource: EventSource | null = null
let reconnectTimeout: number | null = null
let connectionCount = 0
let reconnectAttempt = 0
// Exponential backoff configuration
const MIN_BACKOFF = 1000 // 1 second
const MAX_BACKOFF = 30000 // 30 seconds
const BACKOFF_MULTIPLIER = 2
const JITTER_FACTOR = 0.2 // 20% jitter
function getBackoffDelay(): number {
const baseDelay = Math.min(MIN_BACKOFF * Math.pow(BACKOFF_MULTIPLIER, reconnectAttempt), MAX_BACKOFF)
const jitter = baseDelay * JITTER_FACTOR * Math.random()
return Math.floor(baseDelay + jitter)
}
export function useSSE() {
@@ -23,6 +36,7 @@ export function useSSE() {
eventSource.onopen = () => {
isConnected.value = true
reconnectAttempt = 0 // Reset backoff on successful connection
console.log('[SSE] Connected')
}
@@ -51,11 +65,14 @@ export function useSSE() {
eventSource?.close()
eventSource = null
// Reconnect after 5 seconds
// Exponential backoff with jitter
const delay = getBackoffDelay()
reconnectAttempt++
console.log(`[SSE] Reconnecting in ${Math.round(delay/1000)}s (attempt ${reconnectAttempt})`)
reconnectTimeout = window.setTimeout(() => {
console.log('[SSE] Reconnecting...')
connect()
}, 5000)
}, delay)
}
}
+35 -15
View File
@@ -3,15 +3,18 @@ import type { Stats } from '@/types'
import { fetchStats } from '@/utils/api'
import { useSSE } from './useSSE'
export function useStats(pollInterval: number = 5000) {
// Fallback poll interval when SSE is disconnected
const FALLBACK_POLL_INTERVAL = 10000 // 10 seconds
export function useStats() {
const stats = ref<Stats | null>(null)
const loading = ref(false)
const error = ref<string | null>(null)
// SSE for real-time session updates
const { lastEvent } = useSSE()
const { lastEvent, isConnected } = useSSE()
let intervalId: number | null = null
let fallbackIntervalId: number | null = null
const refresh = async () => {
loading.value = true
@@ -27,33 +30,50 @@ export function useStats(pollInterval: number = 5000) {
}
}
const startPolling = () => {
if (intervalId) return
intervalId = window.setInterval(refresh, pollInterval)
const startFallbackPolling = () => {
if (fallbackIntervalId) return
console.log('[Stats] SSE disconnected, starting fallback polling')
fallbackIntervalId = window.setInterval(refresh, FALLBACK_POLL_INTERVAL)
}
const stopPolling = () => {
if (intervalId) {
clearInterval(intervalId)
intervalId = null
const stopFallbackPolling = () => {
if (fallbackIntervalId) {
console.log('[Stats] SSE connected, stopping fallback polling')
clearInterval(fallbackIntervalId)
fallbackIntervalId = null
}
}
// Watch for SSE session events and refresh immediately
// Watch for SSE events that affect stats
watch(lastEvent, (event) => {
if (event && event.type === 'session') {
console.log('[Stats] SSE session event triggered refresh:', event.action)
if (event && (event.type === 'session' || event.type === 'processing_status')) {
if (event.type === 'session') {
console.log('[Stats] SSE session event triggered refresh:', event.action)
}
refresh()
}
})
// Switch between SSE-driven and fallback polling based on connection status
watch(isConnected, (connected) => {
if (connected) {
stopFallbackPolling()
refresh() // Refresh immediately on reconnect
} else {
startFallbackPolling()
}
})
onMounted(() => {
refresh()
startPolling()
// Start fallback polling only if SSE is not connected
if (!isConnected.value) {
startFallbackPolling()
}
})
onUnmounted(() => {
stopPolling()
stopFallbackPolling()
})
return {
+48 -7
View File
@@ -1,8 +1,21 @@
import { ref, computed, onMounted, watch } from 'vue'
import { ref, computed, onMounted, onUnmounted, watch } from 'vue'
import type { FeedItem, FilterType, ObservationType, ConceptType } from '@/types'
import { fetchObservations, fetchPrompts, fetchSummaries, combineTimeline } from '@/utils/api'
import { useSSE } from './useSSE'
// Debounce utility
function debounce<T extends (...args: unknown[]) => void>(fn: T, ms: number): T & { cancel: () => void } {
let timeoutId: number | null = null
const debounced = ((...args: unknown[]) => {
if (timeoutId) clearTimeout(timeoutId)
timeoutId = window.setTimeout(() => fn(...args), ms)
}) as T & { cancel: () => void }
debounced.cancel = () => {
if (timeoutId) clearTimeout(timeoutId)
}
return debounced
}
export function useTimeline() {
const observations = ref<FeedItem[]>([])
const prompts = ref<FeedItem[]>([])
@@ -18,6 +31,9 @@ export function useTimeline() {
const currentTypeFilter = ref<ObservationType | null>(null)
const currentConceptFilter = ref<ConceptType | null>(null)
// Request cancellation
let abortController: AbortController | null = null
// SSE for real-time updates
const { lastEvent } = useSSE()
@@ -60,6 +76,13 @@ export function useTimeline() {
})
const refresh = async () => {
// Cancel any in-flight request
if (abortController) {
abortController.abort()
}
abortController = new AbortController()
const signal = abortController.signal
loading.value = true
error.value = null
@@ -71,9 +94,9 @@ export function useTimeline() {
const limit = project ? 100 : 50
const [obs, prm, sum] = await Promise.all([
fetchObservations(limit, project),
fetchPrompts(limit, project),
fetchSummaries(limit, project)
fetchObservations(limit, project, signal),
fetchPrompts(limit, project, signal),
fetchSummaries(limit, project, signal)
])
// Combine into timeline
@@ -84,6 +107,10 @@ export function useTimeline() {
prompts.value = allItems.value.filter(i => i.itemType === 'prompt')
summaries.value = allItems.value.filter(i => i.itemType === 'summary')
} catch (err) {
// Ignore aborted requests
if (err instanceof Error && err.name === 'AbortError') {
return
}
error.value = err instanceof Error ? err.message : 'Failed to fetch timeline'
console.error('[Timeline] Error:', err)
} finally {
@@ -91,6 +118,12 @@ export function useTimeline() {
}
}
// Debounced refresh for SSE events (300ms delay)
const debouncedRefresh = debounce(() => {
console.log('[Timeline] Debounced refresh triggered')
refresh()
}, 300)
const setFilter = (filter: FilterType) => {
currentFilter.value = filter
}
@@ -109,11 +142,11 @@ export function useTimeline() {
currentConceptFilter.value = concept
}
// Watch for SSE events and refresh
// Watch for SSE events and debounced refresh
watch(lastEvent, (event) => {
if (event && (event.type === 'observation' || event.type === 'prompt' || event.type === 'summary')) {
console.log('[Timeline] SSE event triggered refresh:', event.type)
refresh()
console.log('[Timeline] SSE event queued refresh:', event.type)
debouncedRefresh()
}
})
@@ -121,6 +154,14 @@ export function useTimeline() {
refresh()
})
onUnmounted(() => {
// Cancel pending requests and debounced calls
debouncedRefresh.cancel()
if (abortController) {
abortController.abort()
}
})
return {
allItems,
filteredItems,