Improvements to the queue processing.

This commit is contained in:
2025-12-16 00:49:35 +00:00
parent 4dc0a81582
commit cded6bb532
9 changed files with 202 additions and 39 deletions
+10 -1
View File
@@ -326,7 +326,16 @@ func (p *Processor) callClaudeCLI(ctx context.Context, prompt string) (string, e
// Use claude CLI with --print flag for non-interactive output
// and -p for prompt input
cmd := exec.CommandContext(ctx, p.claudePath, "--print", "-p", fullPrompt) // #nosec G204 -- claudePath is from config, fullPrompt is internal
// Add --tools "" to disable tools (we only need text analysis)
// Add --strict-mcp-config to skip loading MCP servers
// Add --disable-slash-commands to skip command loading
// These flags significantly speed up processing by avoiding plugin/MCP initialization
cmd := exec.CommandContext(ctx, p.claudePath,
"--print",
"--tools", "",
"--strict-mcp-config",
"--disable-slash-commands",
"-p", fullPrompt) // #nosec G204 -- claudePath is from config, fullPrompt is internal
// Set model if specified (use haiku for cost efficiency)
if p.model != "" {
+33 -2
View File
@@ -258,9 +258,22 @@ func (s *Service) initializeAsync() {
})
}
// Set callback for session deletion
// Set callbacks for session lifecycle events
sessionManager.SetOnSessionCreated(func(id int64) {
s.broadcastProcessingStatus()
s.sseBroadcaster.Broadcast(map[string]interface{}{
"type": "session",
"action": "created",
"id": id,
})
})
sessionManager.SetOnSessionDeleted(func(id int64) {
s.broadcastProcessingStatus()
s.sseBroadcaster.Broadcast(map[string]interface{}{
"type": "session",
"action": "deleted",
"id": id,
})
})
// Mark as ready
@@ -448,9 +461,22 @@ func (s *Service) reinitializeDatabase() {
})
}
// Set callback for session deletion
// Set callbacks for session lifecycle events
sessionManager.SetOnSessionCreated(func(id int64) {
s.broadcastProcessingStatus()
s.sseBroadcaster.Broadcast(map[string]interface{}{
"type": "session",
"action": "created",
"id": id,
})
})
sessionManager.SetOnSessionDeleted(func(id int64) {
s.broadcastProcessingStatus()
s.sseBroadcaster.Broadcast(map[string]interface{}{
"type": "session",
"action": "deleted",
"id": id,
})
})
// Mark as ready again
@@ -719,6 +745,7 @@ func (s *Service) Start() error {
}
// processQueue processes the observation queue in the background.
// Processes immediately when notified, or every QueueProcessInterval as fallback.
func (s *Service) processQueue() {
defer s.wg.Done()
@@ -729,7 +756,11 @@ func (s *Service) processQueue() {
select {
case <-s.ctx.Done():
return
case <-s.sessionManager.ProcessNotify:
// Immediate processing when observation is queued
s.processAllSessions()
case <-ticker.C:
// Fallback periodic processing
s.processAllSessions()
}
}
+106 -7
View File
@@ -62,20 +62,88 @@ type ActiveSession struct {
generatorActive atomic.Bool
}
// SessionTimeout is how long an inactive session can exist before cleanup.
const SessionTimeout = 30 * time.Minute
// CleanupInterval is how often to check for stale sessions.
const CleanupInterval = 5 * time.Minute
// Manager manages active session lifecycles.
type Manager struct {
sessionStore *sqlite.SessionStore
sessions map[int64]*ActiveSession
mu sync.RWMutex
onCreated func(int64)
onDeleted func(int64)
ctx context.Context
cancel context.CancelFunc
// Global notification channel for immediate processing
ProcessNotify chan struct{}
}
// NewManager creates a new session manager.
func NewManager(sessionStore *sqlite.SessionStore) *Manager {
return &Manager{
sessionStore: sessionStore,
sessions: make(map[int64]*ActiveSession),
ctx, cancel := context.WithCancel(context.Background())
m := &Manager{
sessionStore: sessionStore,
sessions: make(map[int64]*ActiveSession),
ctx: ctx,
cancel: cancel,
ProcessNotify: make(chan struct{}, 1),
}
// Start background cleanup goroutine
go m.cleanupLoop()
return m
}
// cleanupLoop periodically removes stale sessions.
func (m *Manager) cleanupLoop() {
ticker := time.NewTicker(CleanupInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.cleanupStaleSessions()
}
}
}
// cleanupStaleSessions removes sessions that have been inactive too long.
func (m *Manager) cleanupStaleSessions() {
m.mu.RLock()
var staleIDs []int64
now := time.Now()
for id, session := range m.sessions {
// Check if session has been inactive for too long
session.messageMu.Lock()
hasPending := len(session.pendingMessages) > 0
session.messageMu.Unlock()
// Don't delete sessions with pending messages or active processing
if hasPending || session.generatorActive.Load() {
continue
}
// Delete if session is older than timeout
if now.Sub(session.StartTime) > SessionTimeout {
staleIDs = append(staleIDs, id)
}
}
m.mu.RUnlock()
// Delete stale sessions
for _, id := range staleIDs {
log.Info().Int64("sessionId", id).Dur("age", SessionTimeout).Msg("Cleaning up stale session")
m.DeleteSession(id)
}
}
// SetOnSessionCreated sets a callback for when a session is created.
func (m *Manager) SetOnSessionCreated(callback func(int64)) {
m.onCreated = callback
}
// SetOnSessionDeleted sets a callback for when a session is deleted.
@@ -86,7 +154,6 @@ func (m *Manager) SetOnSessionDeleted(callback func(int64)) {
// InitializeSession initializes a session, creating it if needed.
func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, userPrompt string, promptNumber int) (*ActiveSession, error) {
m.mu.Lock()
defer m.mu.Unlock()
// Check if already active
if session, ok := m.sessions[sessionDBID]; ok {
@@ -95,10 +162,12 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user
session.UserPrompt = userPrompt
session.LastPromptNumber = promptNumber
}
m.mu.Unlock()
return session, nil
}
// Fetch from database
// Fetch from database (unlock during DB call to avoid blocking)
m.mu.Unlock()
dbSession, err := m.sessionStore.GetSessionByID(ctx, sessionDBID)
if err != nil {
return nil, err
@@ -135,7 +204,17 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user
cancel: cancel,
}
// Re-acquire lock to add session
m.mu.Lock()
// Double-check another goroutine didn't create it
if existing, ok := m.sessions[sessionDBID]; ok {
m.mu.Unlock()
cancel() // Clean up unused context
return existing, nil
}
m.sessions[sessionDBID] = session
onCreated := m.onCreated
m.mu.Unlock()
log.Info().
Int64("sessionId", sessionDBID).
@@ -143,6 +222,11 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user
Str("claudeSessionId", session.ClaudeSessionID).
Msg("Session initialized")
// Notify callback (outside lock)
if onCreated != nil {
onCreated(sessionDBID)
}
return session, nil
}
@@ -170,12 +254,18 @@ func (m *Manager) QueueObservation(ctx context.Context, sessionDBID int64, data
queueDepth := len(session.pendingMessages)
session.messageMu.Unlock()
// Non-blocking notification
// Non-blocking notification to session
select {
case session.notify <- struct{}{}:
default:
}
// Non-blocking notification to global processor
select {
case m.ProcessNotify <- struct{}{}:
default:
}
log.Info().
Int64("sessionId", sessionDBID).
Str("tool", data.ToolName).
@@ -212,12 +302,18 @@ func (m *Manager) QueueSummarize(ctx context.Context, sessionDBID int64, lastUse
queueDepth := len(session.pendingMessages)
session.messageMu.Unlock()
// Non-blocking notification
// Non-blocking notification to session
select {
case session.notify <- struct{}{}:
default:
}
// Non-blocking notification to global processor
select {
case m.ProcessNotify <- struct{}{}:
default:
}
log.Info().
Int64("sessionId", sessionDBID).
Int("queueDepth", queueDepth).
@@ -255,6 +351,9 @@ func (m *Manager) DeleteSession(sessionDBID int64) {
// ShutdownAll shuts down all active sessions.
func (m *Manager) ShutdownAll(ctx context.Context) {
// Stop cleanup goroutine
m.cancel()
m.mu.Lock()
sessionIDs := make([]int64, 0, len(m.sessions))
for id := range m.sessions {
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "claude-mnemonic-dashboard",
"version": "v0.6.1-5-gcd14ba8-dirty",
"version": "v0.6.1-7-gf0c35ee-dirty",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "claude-mnemonic-dashboard",
"version": "v0.6.1-5-gcd14ba8-dirty",
"version": "v0.6.1-7-gf0c35ee-dirty",
"dependencies": {
"vue": "^3.5.13"
},
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "claude-mnemonic-dashboard",
"version": "v0.6.1-5-gcd14ba8-dirty",
"version": "v0.6.1-7-gf0c35ee-dirty",
"private": true,
"type": "module",
"scripts": {
+2 -8
View File
@@ -1,5 +1,4 @@
<script setup lang="ts">
import { watch } from 'vue'
import { useSSE, useStats, useTimeline, useUpdate, useHealth } from '@/composables'
import Header from '@/components/Header.vue'
import StatsCards from '@/components/StatsCards.vue'
@@ -8,7 +7,7 @@ import Timeline from '@/components/Timeline.vue'
import Sidebar from '@/components/Sidebar.vue'
// Composables
const { isConnected, isProcessing, queueDepth, lastEvent } = useSSE()
const { isConnected, isProcessing, queueDepth } = useSSE()
const { stats } = useStats()
const { updateInfo, updateStatus, isUpdating, applyUpdate } = useUpdate()
const { health } = useHealth()
@@ -29,12 +28,7 @@ const {
setConceptFilter
} = useTimeline()
// Refresh timeline when new events arrive
watch(lastEvent, (event) => {
if (event && (event.type === 'observation' || event.type === 'prompt')) {
refresh()
}
})
// Note: Timeline refresh is handled by useTimeline's SSE watcher
</script>
<template>
+34 -17
View File
@@ -1,18 +1,22 @@
import { ref, onMounted, onUnmounted } from 'vue'
import type { SSEEvent } from '@/types'
export function useSSE() {
const isConnected = ref(false)
const isProcessing = ref(false)
const queueDepth = ref(0)
const lastEvent = ref<SSEEvent | null>(null)
// Singleton state - shared across all useSSE() calls
const isConnected = ref(false)
const isProcessing = ref(false)
const queueDepth = ref(0)
const lastEvent = ref<SSEEvent | null>(null)
let eventSource: EventSource | null = null
let reconnectTimeout: number | null = null
let eventSource: EventSource | null = null
let reconnectTimeout: number | null = null
let connectionCount = 0
export function useSSE() {
const connect = () => {
// Only create connection if not already connected
if (eventSource) {
eventSource.close()
return
}
eventSource = new EventSource('/api/events')
@@ -25,6 +29,12 @@ export function useSSE() {
eventSource.onmessage = (event) => {
try {
const data: SSEEvent = JSON.parse(event.data)
// Debug: log all SSE events
if (data.type !== 'processing_status') {
console.log('[SSE] Event received:', data.type, data)
}
lastEvent.value = data
if (data.type === 'processing_status') {
@@ -82,18 +92,25 @@ export function useSSE() {
}
onMounted(() => {
// Add listeners to close SSE on page refresh/navigation
window.addEventListener('beforeunload', handleBeforeUnload)
window.addEventListener('pagehide', handlePageHide)
window.addEventListener('pageshow', handlePageShow)
connect()
connectionCount++
if (connectionCount === 1) {
// First consumer - add listeners and connect
window.addEventListener('beforeunload', handleBeforeUnload)
window.addEventListener('pagehide', handlePageHide)
window.addEventListener('pageshow', handlePageShow)
connect()
}
})
onUnmounted(() => {
window.removeEventListener('beforeunload', handleBeforeUnload)
window.removeEventListener('pagehide', handlePageHide)
window.removeEventListener('pageshow', handlePageShow)
disconnect()
connectionCount--
if (connectionCount === 0) {
// Last consumer - remove listeners and disconnect
window.removeEventListener('beforeunload', handleBeforeUnload)
window.removeEventListener('pagehide', handlePageHide)
window.removeEventListener('pageshow', handlePageShow)
disconnect()
}
})
return {
+13 -1
View File
@@ -1,12 +1,16 @@
import { ref, onMounted, onUnmounted } from 'vue'
import { ref, onMounted, onUnmounted, watch } from 'vue'
import type { Stats } from '@/types'
import { fetchStats } from '@/utils/api'
import { useSSE } from './useSSE'
export function useStats(pollInterval: number = 5000) {
const stats = ref<Stats | null>(null)
const loading = ref(false)
const error = ref<string | null>(null)
// SSE for real-time session updates
const { lastEvent } = useSSE()
let intervalId: number | null = null
const refresh = async () => {
@@ -35,6 +39,14 @@ export function useStats(pollInterval: number = 5000) {
}
}
// Watch for SSE session events and refresh immediately
watch(lastEvent, (event) => {
if (event && event.type === 'session') {
console.log('[Stats] SSE session event triggered refresh:', event.action)
refresh()
}
})
onMounted(() => {
refresh()
startPolling()
+1
View File
@@ -112,6 +112,7 @@ export function useTimeline() {
// Watch for SSE events and refresh
watch(lastEvent, (event) => {
if (event && (event.type === 'observation' || event.type === 'prompt' || event.type === 'summary')) {
console.log('[Timeline] SSE event triggered refresh:', event.type)
refresh()
}
})