diff --git a/internal/worker/sdk/processor.go b/internal/worker/sdk/processor.go
index c23b23c..1415628 100644
--- a/internal/worker/sdk/processor.go
+++ b/internal/worker/sdk/processor.go
@@ -326,7 +326,16 @@ func (p *Processor) callClaudeCLI(ctx context.Context, prompt string) (string, e
// Use claude CLI with --print flag for non-interactive output
// and -p for prompt input
- cmd := exec.CommandContext(ctx, p.claudePath, "--print", "-p", fullPrompt) // #nosec G204 -- claudePath is from config, fullPrompt is internal
+ // Add --tools "" to disable tools (we only need text analysis)
+ // Add --strict-mcp-config to skip loading MCP servers
+ // Add --disable-slash-commands to skip command loading
+ // These flags significantly speed up processing by avoiding plugin/MCP initialization
+ cmd := exec.CommandContext(ctx, p.claudePath,
+ "--print",
+ "--tools", "",
+ "--strict-mcp-config",
+ "--disable-slash-commands",
+ "-p", fullPrompt) // #nosec G204 -- claudePath is from config, fullPrompt is internal
// Set model if specified (use haiku for cost efficiency)
if p.model != "" {
diff --git a/internal/worker/service.go b/internal/worker/service.go
index cb05974..c5b1ff4 100644
--- a/internal/worker/service.go
+++ b/internal/worker/service.go
@@ -258,9 +258,22 @@ func (s *Service) initializeAsync() {
})
}
- // Set callback for session deletion
+ // Set callbacks for session lifecycle events
+ sessionManager.SetOnSessionCreated(func(id int64) {
+ s.broadcastProcessingStatus()
+ s.sseBroadcaster.Broadcast(map[string]interface{}{
+ "type": "session",
+ "action": "created",
+ "id": id,
+ })
+ })
sessionManager.SetOnSessionDeleted(func(id int64) {
s.broadcastProcessingStatus()
+ s.sseBroadcaster.Broadcast(map[string]interface{}{
+ "type": "session",
+ "action": "deleted",
+ "id": id,
+ })
})
// Mark as ready
@@ -448,9 +461,22 @@ func (s *Service) reinitializeDatabase() {
})
}
- // Set callback for session deletion
+ // Set callbacks for session lifecycle events
+ sessionManager.SetOnSessionCreated(func(id int64) {
+ s.broadcastProcessingStatus()
+ s.sseBroadcaster.Broadcast(map[string]interface{}{
+ "type": "session",
+ "action": "created",
+ "id": id,
+ })
+ })
sessionManager.SetOnSessionDeleted(func(id int64) {
s.broadcastProcessingStatus()
+ s.sseBroadcaster.Broadcast(map[string]interface{}{
+ "type": "session",
+ "action": "deleted",
+ "id": id,
+ })
})
// Mark as ready again
@@ -719,6 +745,7 @@ func (s *Service) Start() error {
}
// processQueue processes the observation queue in the background.
+// Processes immediately when notified, or every QueueProcessInterval as fallback.
func (s *Service) processQueue() {
defer s.wg.Done()
@@ -729,7 +756,11 @@ func (s *Service) processQueue() {
select {
case <-s.ctx.Done():
return
+ case <-s.sessionManager.ProcessNotify:
+ // Immediate processing when observation is queued
+ s.processAllSessions()
case <-ticker.C:
+ // Fallback periodic processing
s.processAllSessions()
}
}
diff --git a/internal/worker/session/manager.go b/internal/worker/session/manager.go
index d050de4..c965fea 100644
--- a/internal/worker/session/manager.go
+++ b/internal/worker/session/manager.go
@@ -62,20 +62,88 @@ type ActiveSession struct {
generatorActive atomic.Bool
}
+// SessionTimeout is how long an inactive session can exist before cleanup.
+const SessionTimeout = 30 * time.Minute
+
+// CleanupInterval is how often to check for stale sessions.
+const CleanupInterval = 5 * time.Minute
+
// Manager manages active session lifecycles.
type Manager struct {
sessionStore *sqlite.SessionStore
sessions map[int64]*ActiveSession
mu sync.RWMutex
+ onCreated func(int64)
onDeleted func(int64)
+ ctx context.Context
+ cancel context.CancelFunc
+ // Global notification channel for immediate processing
+ ProcessNotify chan struct{}
}
// NewManager creates a new session manager.
func NewManager(sessionStore *sqlite.SessionStore) *Manager {
- return &Manager{
- sessionStore: sessionStore,
- sessions: make(map[int64]*ActiveSession),
+ ctx, cancel := context.WithCancel(context.Background())
+ m := &Manager{
+ sessionStore: sessionStore,
+ sessions: make(map[int64]*ActiveSession),
+ ctx: ctx,
+ cancel: cancel,
+ ProcessNotify: make(chan struct{}, 1),
}
+ // Start background cleanup goroutine
+ go m.cleanupLoop()
+ return m
+}
+
+// cleanupLoop periodically removes stale sessions.
+func (m *Manager) cleanupLoop() {
+ ticker := time.NewTicker(CleanupInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-m.ctx.Done():
+ return
+ case <-ticker.C:
+ m.cleanupStaleSessions()
+ }
+ }
+}
+
+// cleanupStaleSessions removes sessions that have been inactive too long.
+func (m *Manager) cleanupStaleSessions() {
+ m.mu.RLock()
+ var staleIDs []int64
+ now := time.Now()
+ for id, session := range m.sessions {
+ // Check if session has been inactive for too long
+ session.messageMu.Lock()
+ hasPending := len(session.pendingMessages) > 0
+ session.messageMu.Unlock()
+
+ // Don't delete sessions with pending messages or active processing
+ if hasPending || session.generatorActive.Load() {
+ continue
+ }
+
+ // Delete if session is older than timeout
+ if now.Sub(session.StartTime) > SessionTimeout {
+ staleIDs = append(staleIDs, id)
+ }
+ }
+ m.mu.RUnlock()
+
+ // Delete stale sessions
+ for _, id := range staleIDs {
+ log.Info().Int64("sessionId", id).Dur("age", SessionTimeout).Msg("Cleaning up stale session")
+ m.DeleteSession(id)
+ }
+}
+
+// SetOnSessionCreated sets a callback for when a session is created.
+func (m *Manager) SetOnSessionCreated(callback func(int64)) {
+ m.onCreated = callback
}
// SetOnSessionDeleted sets a callback for when a session is deleted.
@@ -86,7 +154,6 @@ func (m *Manager) SetOnSessionDeleted(callback func(int64)) {
// InitializeSession initializes a session, creating it if needed.
func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, userPrompt string, promptNumber int) (*ActiveSession, error) {
m.mu.Lock()
- defer m.mu.Unlock()
// Check if already active
if session, ok := m.sessions[sessionDBID]; ok {
@@ -95,10 +162,12 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user
session.UserPrompt = userPrompt
session.LastPromptNumber = promptNumber
}
+ m.mu.Unlock()
return session, nil
}
- // Fetch from database
+ // Fetch from database (unlock during DB call to avoid blocking)
+ m.mu.Unlock()
dbSession, err := m.sessionStore.GetSessionByID(ctx, sessionDBID)
if err != nil {
return nil, err
@@ -135,7 +204,17 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user
cancel: cancel,
}
+ // Re-acquire lock to add session
+ m.mu.Lock()
+ // Double-check another goroutine didn't create it
+ if existing, ok := m.sessions[sessionDBID]; ok {
+ m.mu.Unlock()
+ cancel() // Clean up unused context
+ return existing, nil
+ }
m.sessions[sessionDBID] = session
+ onCreated := m.onCreated
+ m.mu.Unlock()
log.Info().
Int64("sessionId", sessionDBID).
@@ -143,6 +222,11 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user
Str("claudeSessionId", session.ClaudeSessionID).
Msg("Session initialized")
+ // Notify callback (outside lock)
+ if onCreated != nil {
+ onCreated(sessionDBID)
+ }
+
return session, nil
}
@@ -170,12 +254,18 @@ func (m *Manager) QueueObservation(ctx context.Context, sessionDBID int64, data
queueDepth := len(session.pendingMessages)
session.messageMu.Unlock()
- // Non-blocking notification
+ // Non-blocking notification to session
select {
case session.notify <- struct{}{}:
default:
}
+ // Non-blocking notification to global processor
+ select {
+ case m.ProcessNotify <- struct{}{}:
+ default:
+ }
+
log.Info().
Int64("sessionId", sessionDBID).
Str("tool", data.ToolName).
@@ -212,12 +302,18 @@ func (m *Manager) QueueSummarize(ctx context.Context, sessionDBID int64, lastUse
queueDepth := len(session.pendingMessages)
session.messageMu.Unlock()
- // Non-blocking notification
+ // Non-blocking notification to session
select {
case session.notify <- struct{}{}:
default:
}
+ // Non-blocking notification to global processor
+ select {
+ case m.ProcessNotify <- struct{}{}:
+ default:
+ }
+
log.Info().
Int64("sessionId", sessionDBID).
Int("queueDepth", queueDepth).
@@ -255,6 +351,9 @@ func (m *Manager) DeleteSession(sessionDBID int64) {
// ShutdownAll shuts down all active sessions.
func (m *Manager) ShutdownAll(ctx context.Context) {
+ // Stop cleanup goroutine
+ m.cancel()
+
m.mu.Lock()
sessionIDs := make([]int64, 0, len(m.sessions))
for id := range m.sessions {
diff --git a/ui/package-lock.json b/ui/package-lock.json
index 892a505..aa80f31 100644
--- a/ui/package-lock.json
+++ b/ui/package-lock.json
@@ -1,12 +1,12 @@
{
"name": "claude-mnemonic-dashboard",
- "version": "v0.6.1-5-gcd14ba8-dirty",
+ "version": "v0.6.1-7-gf0c35ee-dirty",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "claude-mnemonic-dashboard",
- "version": "v0.6.1-5-gcd14ba8-dirty",
+ "version": "v0.6.1-7-gf0c35ee-dirty",
"dependencies": {
"vue": "^3.5.13"
},
diff --git a/ui/package.json b/ui/package.json
index dbeb571..e563a3c 100644
--- a/ui/package.json
+++ b/ui/package.json
@@ -1,6 +1,6 @@
{
"name": "claude-mnemonic-dashboard",
- "version": "v0.6.1-5-gcd14ba8-dirty",
+ "version": "v0.6.1-7-gf0c35ee-dirty",
"private": true,
"type": "module",
"scripts": {
diff --git a/ui/src/App.vue b/ui/src/App.vue
index 2b38384..5cfe4df 100644
--- a/ui/src/App.vue
+++ b/ui/src/App.vue
@@ -1,5 +1,4 @@
diff --git a/ui/src/composables/useSSE.ts b/ui/src/composables/useSSE.ts
index 035d8c1..97a8164 100644
--- a/ui/src/composables/useSSE.ts
+++ b/ui/src/composables/useSSE.ts
@@ -1,18 +1,22 @@
import { ref, onMounted, onUnmounted } from 'vue'
import type { SSEEvent } from '@/types'
-export function useSSE() {
- const isConnected = ref(false)
- const isProcessing = ref(false)
- const queueDepth = ref(0)
- const lastEvent = ref(null)
+// Singleton state - shared across all useSSE() calls
+const isConnected = ref(false)
+const isProcessing = ref(false)
+const queueDepth = ref(0)
+const lastEvent = ref(null)
- let eventSource: EventSource | null = null
- let reconnectTimeout: number | null = null
+let eventSource: EventSource | null = null
+let reconnectTimeout: number | null = null
+let connectionCount = 0
+
+export function useSSE() {
const connect = () => {
+ // Only create connection if not already connected
if (eventSource) {
- eventSource.close()
+ return
}
eventSource = new EventSource('/api/events')
@@ -25,6 +29,12 @@ export function useSSE() {
eventSource.onmessage = (event) => {
try {
const data: SSEEvent = JSON.parse(event.data)
+
+ // Debug: log all SSE events
+ if (data.type !== 'processing_status') {
+ console.log('[SSE] Event received:', data.type, data)
+ }
+
lastEvent.value = data
if (data.type === 'processing_status') {
@@ -82,18 +92,25 @@ export function useSSE() {
}
onMounted(() => {
- // Add listeners to close SSE on page refresh/navigation
- window.addEventListener('beforeunload', handleBeforeUnload)
- window.addEventListener('pagehide', handlePageHide)
- window.addEventListener('pageshow', handlePageShow)
- connect()
+ connectionCount++
+ if (connectionCount === 1) {
+ // First consumer - add listeners and connect
+ window.addEventListener('beforeunload', handleBeforeUnload)
+ window.addEventListener('pagehide', handlePageHide)
+ window.addEventListener('pageshow', handlePageShow)
+ connect()
+ }
})
onUnmounted(() => {
- window.removeEventListener('beforeunload', handleBeforeUnload)
- window.removeEventListener('pagehide', handlePageHide)
- window.removeEventListener('pageshow', handlePageShow)
- disconnect()
+ connectionCount--
+ if (connectionCount === 0) {
+ // Last consumer - remove listeners and disconnect
+ window.removeEventListener('beforeunload', handleBeforeUnload)
+ window.removeEventListener('pagehide', handlePageHide)
+ window.removeEventListener('pageshow', handlePageShow)
+ disconnect()
+ }
})
return {
diff --git a/ui/src/composables/useStats.ts b/ui/src/composables/useStats.ts
index 244806f..ed1f6c8 100644
--- a/ui/src/composables/useStats.ts
+++ b/ui/src/composables/useStats.ts
@@ -1,12 +1,16 @@
-import { ref, onMounted, onUnmounted } from 'vue'
+import { ref, onMounted, onUnmounted, watch } from 'vue'
import type { Stats } from '@/types'
import { fetchStats } from '@/utils/api'
+import { useSSE } from './useSSE'
export function useStats(pollInterval: number = 5000) {
const stats = ref(null)
const loading = ref(false)
const error = ref(null)
+ // SSE for real-time session updates
+ const { lastEvent } = useSSE()
+
let intervalId: number | null = null
const refresh = async () => {
@@ -35,6 +39,14 @@ export function useStats(pollInterval: number = 5000) {
}
}
+ // Watch for SSE session events and refresh immediately
+ watch(lastEvent, (event) => {
+ if (event && event.type === 'session') {
+ console.log('[Stats] SSE session event triggered refresh:', event.action)
+ refresh()
+ }
+ })
+
onMounted(() => {
refresh()
startPolling()
diff --git a/ui/src/composables/useTimeline.ts b/ui/src/composables/useTimeline.ts
index 6784c83..4f8466b 100644
--- a/ui/src/composables/useTimeline.ts
+++ b/ui/src/composables/useTimeline.ts
@@ -112,6 +112,7 @@ export function useTimeline() {
// Watch for SSE events and refresh
watch(lastEvent, (event) => {
if (event && (event.type === 'observation' || event.type === 'prompt' || event.type === 'summary')) {
+ console.log('[Timeline] SSE event triggered refresh:', event.type)
refresh()
}
})