diff --git a/internal/worker/sdk/processor.go b/internal/worker/sdk/processor.go index 1415628..4316e20 100644 --- a/internal/worker/sdk/processor.go +++ b/internal/worker/sdk/processor.go @@ -111,12 +111,18 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec return nil } - log.Info().Str("tool", toolName).Msg("Processing tool execution with Claude CLI") - - // Convert tool data to strings + // Convert tool data to strings for pre-filtering inputStr := toJSONString(toolInput) outputStr := toJSONString(toolResponse) + // Pre-filter trivial operations without calling Haiku + if shouldSkipTrivialOperation(toolName, inputStr, outputStr) { + log.Debug().Str("tool", toolName).Msg("Skipping trivial operation (pre-filter)") + return nil + } + + log.Info().Str("tool", toolName).Msg("Processing tool execution with Claude CLI") + // Check if we already have observations for this file (skip if covered) if filePath := extractFilePath(toolName, inputStr); filePath != "" { exists, err := p.observationStore.ExistsSimilarObservation(ctx, project, []string{filePath}, nil) @@ -372,19 +378,103 @@ func (p *Processor) callClaudeCLI(ctx context.Context, prompt string) (string, e // shouldSkipTool returns true for tools that aren't worth processing. func shouldSkipTool(toolName string) bool { - // Only skip truly uninteresting tools + // Skip tools that rarely produce meaningful observations skipTools := map[string]bool{ - "TodoWrite": true, // Skip TodoWrite - internal tracking - "Task": true, // Skip Task - sub-agent spawning - "TaskOutput": true, // Skip TaskOutput - sub-agent results - "Glob": true, // Skip Glob - just file listing + // Internal tracking tools + "TodoWrite": true, + "Task": true, + "TaskOutput": true, + + // File discovery tools (just listings, no insights) + "Glob": true, + "ListDir": true, + "LS": true, + "KillShell": true, + + // Question/interaction tools (no code insights) + "AskUserQuestion": true, + + // Plan mode tools (planning, not execution) + "EnterPlanMode": true, + "ExitPlanMode": true, + + // Skill/command execution (meta-operations) + "Skill": true, + "SlashCommand": true, } skip, found := skipTools[toolName] if found { return skip } - return false // Process all other tools + return false // Process remaining tools: Read, Edit, Write, Grep, Bash, WebFetch, WebSearch, NotebookEdit +} + +// shouldSkipTrivialOperation performs local pre-filtering to skip trivial operations +// without making a Haiku API call. Returns true if the operation is too trivial to process. +func shouldSkipTrivialOperation(toolName, inputStr, outputStr string) bool { + // Skip if output is too small to be meaningful (less than 100 chars) + if len(outputStr) < 100 { + return true + } + + // Skip if output indicates an error or empty result + lowerOutput := strings.ToLower(outputStr) + trivialOutputs := []string{ + "no matches found", + "file not found", + "directory not found", + "permission denied", + "command not found", + "no such file", + "is a directory", + "[]", // Empty array result + "{}", // Empty object result + } + for _, trivial := range trivialOutputs { + if strings.Contains(lowerOutput, trivial) || outputStr == trivial { + return true + } + } + + // Tool-specific pre-filtering + switch toolName { + case "Read": + // Skip reading config files that rarely contain project-specific insights + boringFiles := []string{ + "package-lock.json", "yarn.lock", "pnpm-lock.yaml", + "go.sum", "Cargo.lock", "Gemfile.lock", "poetry.lock", + ".gitignore", ".dockerignore", ".eslintignore", + "tsconfig.json", "jsconfig.json", "vite.config", + "tailwind.config", "postcss.config", + } + for _, boring := range boringFiles { + if strings.Contains(inputStr, boring) { + return true + } + } + + case "Grep": + // Skip grep results with too many matches (likely generic search) + if strings.Count(outputStr, "\n") > 50 { + return true + } + + case "Bash": + // Skip simple status commands + boringCommands := []string{ + "git status", "git diff", "git log", "git branch", + "ls ", "pwd", "echo ", "cat ", "which ", "type ", + "npm list", "npm outdated", "npm audit", + } + for _, boring := range boringCommands { + if strings.Contains(strings.ToLower(inputStr), boring) { + return true + } + } + } + + return false } // extractFilePath extracts the file path from tool input for deduplication. diff --git a/ui/package-lock.json b/ui/package-lock.json index aa80f31..c8d6b51 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-7-gf0c35ee-dirty", + "version": "v0.6.1-8-ge52f328-dirty", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-7-gf0c35ee-dirty", + "version": "v0.6.1-8-ge52f328-dirty", "dependencies": { "vue": "^3.5.13" }, diff --git a/ui/package.json b/ui/package.json index e563a3c..72bc609 100644 --- a/ui/package.json +++ b/ui/package.json @@ -1,6 +1,6 @@ { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-7-gf0c35ee-dirty", + "version": "v0.6.1-8-ge52f328-dirty", "private": true, "type": "module", "scripts": { diff --git a/ui/src/composables/useSSE.ts b/ui/src/composables/useSSE.ts index 97a8164..5d73b6d 100644 --- a/ui/src/composables/useSSE.ts +++ b/ui/src/composables/useSSE.ts @@ -10,6 +10,19 @@ const lastEvent = ref(null) let eventSource: EventSource | null = null let reconnectTimeout: number | null = null let connectionCount = 0 +let reconnectAttempt = 0 + +// Exponential backoff configuration +const MIN_BACKOFF = 1000 // 1 second +const MAX_BACKOFF = 30000 // 30 seconds +const BACKOFF_MULTIPLIER = 2 +const JITTER_FACTOR = 0.2 // 20% jitter + +function getBackoffDelay(): number { + const baseDelay = Math.min(MIN_BACKOFF * Math.pow(BACKOFF_MULTIPLIER, reconnectAttempt), MAX_BACKOFF) + const jitter = baseDelay * JITTER_FACTOR * Math.random() + return Math.floor(baseDelay + jitter) +} export function useSSE() { @@ -23,6 +36,7 @@ export function useSSE() { eventSource.onopen = () => { isConnected.value = true + reconnectAttempt = 0 // Reset backoff on successful connection console.log('[SSE] Connected') } @@ -51,11 +65,14 @@ export function useSSE() { eventSource?.close() eventSource = null - // Reconnect after 5 seconds + // Exponential backoff with jitter + const delay = getBackoffDelay() + reconnectAttempt++ + console.log(`[SSE] Reconnecting in ${Math.round(delay/1000)}s (attempt ${reconnectAttempt})`) + reconnectTimeout = window.setTimeout(() => { - console.log('[SSE] Reconnecting...') connect() - }, 5000) + }, delay) } } diff --git a/ui/src/composables/useStats.ts b/ui/src/composables/useStats.ts index ed1f6c8..6554116 100644 --- a/ui/src/composables/useStats.ts +++ b/ui/src/composables/useStats.ts @@ -3,15 +3,18 @@ import type { Stats } from '@/types' import { fetchStats } from '@/utils/api' import { useSSE } from './useSSE' -export function useStats(pollInterval: number = 5000) { +// Fallback poll interval when SSE is disconnected +const FALLBACK_POLL_INTERVAL = 10000 // 10 seconds + +export function useStats() { const stats = ref(null) const loading = ref(false) const error = ref(null) // SSE for real-time session updates - const { lastEvent } = useSSE() + const { lastEvent, isConnected } = useSSE() - let intervalId: number | null = null + let fallbackIntervalId: number | null = null const refresh = async () => { loading.value = true @@ -27,33 +30,50 @@ export function useStats(pollInterval: number = 5000) { } } - const startPolling = () => { - if (intervalId) return - intervalId = window.setInterval(refresh, pollInterval) + const startFallbackPolling = () => { + if (fallbackIntervalId) return + console.log('[Stats] SSE disconnected, starting fallback polling') + fallbackIntervalId = window.setInterval(refresh, FALLBACK_POLL_INTERVAL) } - const stopPolling = () => { - if (intervalId) { - clearInterval(intervalId) - intervalId = null + const stopFallbackPolling = () => { + if (fallbackIntervalId) { + console.log('[Stats] SSE connected, stopping fallback polling') + clearInterval(fallbackIntervalId) + fallbackIntervalId = null } } - // Watch for SSE session events and refresh immediately + // Watch for SSE events that affect stats watch(lastEvent, (event) => { - if (event && event.type === 'session') { - console.log('[Stats] SSE session event triggered refresh:', event.action) + if (event && (event.type === 'session' || event.type === 'processing_status')) { + if (event.type === 'session') { + console.log('[Stats] SSE session event triggered refresh:', event.action) + } refresh() } }) + // Switch between SSE-driven and fallback polling based on connection status + watch(isConnected, (connected) => { + if (connected) { + stopFallbackPolling() + refresh() // Refresh immediately on reconnect + } else { + startFallbackPolling() + } + }) + onMounted(() => { refresh() - startPolling() + // Start fallback polling only if SSE is not connected + if (!isConnected.value) { + startFallbackPolling() + } }) onUnmounted(() => { - stopPolling() + stopFallbackPolling() }) return { diff --git a/ui/src/composables/useTimeline.ts b/ui/src/composables/useTimeline.ts index 4f8466b..56a1af3 100644 --- a/ui/src/composables/useTimeline.ts +++ b/ui/src/composables/useTimeline.ts @@ -1,8 +1,21 @@ -import { ref, computed, onMounted, watch } from 'vue' +import { ref, computed, onMounted, onUnmounted, watch } from 'vue' import type { FeedItem, FilterType, ObservationType, ConceptType } from '@/types' import { fetchObservations, fetchPrompts, fetchSummaries, combineTimeline } from '@/utils/api' import { useSSE } from './useSSE' +// Debounce utility +function debounce void>(fn: T, ms: number): T & { cancel: () => void } { + let timeoutId: number | null = null + const debounced = ((...args: unknown[]) => { + if (timeoutId) clearTimeout(timeoutId) + timeoutId = window.setTimeout(() => fn(...args), ms) + }) as T & { cancel: () => void } + debounced.cancel = () => { + if (timeoutId) clearTimeout(timeoutId) + } + return debounced +} + export function useTimeline() { const observations = ref([]) const prompts = ref([]) @@ -18,6 +31,9 @@ export function useTimeline() { const currentTypeFilter = ref(null) const currentConceptFilter = ref(null) + // Request cancellation + let abortController: AbortController | null = null + // SSE for real-time updates const { lastEvent } = useSSE() @@ -60,6 +76,13 @@ export function useTimeline() { }) const refresh = async () => { + // Cancel any in-flight request + if (abortController) { + abortController.abort() + } + abortController = new AbortController() + const signal = abortController.signal + loading.value = true error.value = null @@ -71,9 +94,9 @@ export function useTimeline() { const limit = project ? 100 : 50 const [obs, prm, sum] = await Promise.all([ - fetchObservations(limit, project), - fetchPrompts(limit, project), - fetchSummaries(limit, project) + fetchObservations(limit, project, signal), + fetchPrompts(limit, project, signal), + fetchSummaries(limit, project, signal) ]) // Combine into timeline @@ -84,6 +107,10 @@ export function useTimeline() { prompts.value = allItems.value.filter(i => i.itemType === 'prompt') summaries.value = allItems.value.filter(i => i.itemType === 'summary') } catch (err) { + // Ignore aborted requests + if (err instanceof Error && err.name === 'AbortError') { + return + } error.value = err instanceof Error ? err.message : 'Failed to fetch timeline' console.error('[Timeline] Error:', err) } finally { @@ -91,6 +118,12 @@ export function useTimeline() { } } + // Debounced refresh for SSE events (300ms delay) + const debouncedRefresh = debounce(() => { + console.log('[Timeline] Debounced refresh triggered') + refresh() + }, 300) + const setFilter = (filter: FilterType) => { currentFilter.value = filter } @@ -109,11 +142,11 @@ export function useTimeline() { currentConceptFilter.value = concept } - // Watch for SSE events and refresh + // Watch for SSE events and debounced refresh watch(lastEvent, (event) => { if (event && (event.type === 'observation' || event.type === 'prompt' || event.type === 'summary')) { - console.log('[Timeline] SSE event triggered refresh:', event.type) - refresh() + console.log('[Timeline] SSE event queued refresh:', event.type) + debouncedRefresh() } }) @@ -121,6 +154,14 @@ export function useTimeline() { refresh() }) + onUnmounted(() => { + // Cancel pending requests and debounced calls + debouncedRefresh.cancel() + if (abortController) { + abortController.abort() + } + }) + return { allItems, filteredItems, diff --git a/ui/src/utils/api.ts b/ui/src/utils/api.ts index 0f6199c..58a022e 100644 --- a/ui/src/utils/api.ts +++ b/ui/src/utils/api.ts @@ -1,39 +1,118 @@ import type { Observation, UserPrompt, SessionSummary, Stats, FeedItem, ObservationFeedItem, PromptFeedItem, SummaryFeedItem } from '@/types' const API_BASE = '/api' +const DEFAULT_TIMEOUT = 10000 // 10 seconds +const MAX_RETRIES = 3 +const RETRY_DELAY = 1000 // 1 second base delay -async function fetchJson(url: string): Promise { - const response = await fetch(url) - if (!response.ok) { - throw new Error(`HTTP ${response.status}: ${response.statusText}`) +interface FetchOptions { + timeout?: number + signal?: AbortSignal + retries?: number +} + +async function fetchJson(url: string, options: FetchOptions = {}): Promise { + const { timeout = DEFAULT_TIMEOUT, signal } = options + + // Create timeout abort controller + const timeoutController = new AbortController() + const timeoutId = setTimeout(() => timeoutController.abort(), timeout) + + // Combine signals if both provided + const combinedSignal = signal + ? combineAbortSignals(signal, timeoutController.signal) + : timeoutController.signal + + try { + const response = await fetch(url, { signal: combinedSignal }) + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`) + } + return response.json() + } catch (err) { + // Re-throw abort errors (user cancellation) + if (err instanceof Error && err.name === 'AbortError') { + // Check if it was a timeout vs user abort + if (signal?.aborted) { + throw err // User cancelled + } + throw new Error('Request timed out') + } + throw err + } finally { + clearTimeout(timeoutId) } - return response.json() } -export async function fetchObservations(limit: number = 100, project?: string): Promise { - const params = new URLSearchParams({ limit: String(limit) }) - if (project) params.append('project', project) - return fetchJson(`${API_BASE}/observations?${params}`) +// Helper to combine multiple abort signals +function combineAbortSignals(...signals: AbortSignal[]): AbortSignal { + const controller = new AbortController() + for (const signal of signals) { + if (signal.aborted) { + controller.abort() + break + } + signal.addEventListener('abort', () => controller.abort(), { once: true }) + } + return controller.signal } -export async function fetchPrompts(limit: number = 100, project?: string): Promise { - const params = new URLSearchParams({ limit: String(limit) }) - if (project) params.append('project', project) - return fetchJson(`${API_BASE}/prompts?${params}`) +// Fetch with retry logic +async function fetchWithRetry(url: string, options: FetchOptions = {}): Promise { + const { retries = MAX_RETRIES, ...fetchOptions } = options + let lastError: Error | null = null + + for (let attempt = 0; attempt < retries; attempt++) { + try { + return await fetchJson(url, fetchOptions) + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)) + + // Don't retry on user abort + if (lastError.name === 'AbortError') { + throw lastError + } + + // Don't retry on client errors (4xx) + if (lastError.message.includes('HTTP 4')) { + throw lastError + } + + // Wait before retry (exponential backoff) + if (attempt < retries - 1) { + const delay = Math.min(RETRY_DELAY * Math.pow(2, attempt), 5000) + await new Promise(r => setTimeout(r, delay)) + } + } + } + + throw lastError! } -export async function fetchSummaries(limit: number = 50, project?: string): Promise { +export async function fetchObservations(limit: number = 100, project?: string, signal?: AbortSignal): Promise { const params = new URLSearchParams({ limit: String(limit) }) if (project) params.append('project', project) - return fetchJson(`${API_BASE}/summaries?${params}`) + return fetchWithRetry(`${API_BASE}/observations?${params}`, { signal }) +} + +export async function fetchPrompts(limit: number = 100, project?: string, signal?: AbortSignal): Promise { + const params = new URLSearchParams({ limit: String(limit) }) + if (project) params.append('project', project) + return fetchWithRetry(`${API_BASE}/prompts?${params}`, { signal }) +} + +export async function fetchSummaries(limit: number = 50, project?: string, signal?: AbortSignal): Promise { + const params = new URLSearchParams({ limit: String(limit) }) + if (project) params.append('project', project) + return fetchWithRetry(`${API_BASE}/summaries?${params}`, { signal }) } export async function fetchStats(): Promise { - return fetchJson(`${API_BASE}/stats`) + return fetchWithRetry(`${API_BASE}/stats`) } export async function fetchProjects(): Promise { - return fetchJson(`${API_BASE}/projects`) + return fetchWithRetry(`${API_BASE}/projects`) } /**