From cded6bb5323a934d13253c5369d1d6253825fec9 Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Tue, 16 Dec 2025 00:49:35 +0000 Subject: [PATCH] Improvements to the queue processing. --- internal/worker/sdk/processor.go | 11 ++- internal/worker/service.go | 35 ++++++++- internal/worker/session/manager.go | 113 +++++++++++++++++++++++++++-- ui/package-lock.json | 4 +- ui/package.json | 2 +- ui/src/App.vue | 10 +-- ui/src/composables/useSSE.ts | 51 ++++++++----- ui/src/composables/useStats.ts | 14 +++- ui/src/composables/useTimeline.ts | 1 + 9 files changed, 202 insertions(+), 39 deletions(-) diff --git a/internal/worker/sdk/processor.go b/internal/worker/sdk/processor.go index c23b23c..1415628 100644 --- a/internal/worker/sdk/processor.go +++ b/internal/worker/sdk/processor.go @@ -326,7 +326,16 @@ func (p *Processor) callClaudeCLI(ctx context.Context, prompt string) (string, e // Use claude CLI with --print flag for non-interactive output // and -p for prompt input - cmd := exec.CommandContext(ctx, p.claudePath, "--print", "-p", fullPrompt) // #nosec G204 -- claudePath is from config, fullPrompt is internal + // Add --tools "" to disable tools (we only need text analysis) + // Add --strict-mcp-config to skip loading MCP servers + // Add --disable-slash-commands to skip command loading + // These flags significantly speed up processing by avoiding plugin/MCP initialization + cmd := exec.CommandContext(ctx, p.claudePath, + "--print", + "--tools", "", + "--strict-mcp-config", + "--disable-slash-commands", + "-p", fullPrompt) // #nosec G204 -- claudePath is from config, fullPrompt is internal // Set model if specified (use haiku for cost efficiency) if p.model != "" { diff --git a/internal/worker/service.go b/internal/worker/service.go index cb05974..c5b1ff4 100644 --- a/internal/worker/service.go +++ b/internal/worker/service.go @@ -258,9 +258,22 @@ func (s *Service) initializeAsync() { }) } - // Set callback for session deletion + // Set callbacks for session lifecycle events + sessionManager.SetOnSessionCreated(func(id int64) { + s.broadcastProcessingStatus() + s.sseBroadcaster.Broadcast(map[string]interface{}{ + "type": "session", + "action": "created", + "id": id, + }) + }) sessionManager.SetOnSessionDeleted(func(id int64) { s.broadcastProcessingStatus() + s.sseBroadcaster.Broadcast(map[string]interface{}{ + "type": "session", + "action": "deleted", + "id": id, + }) }) // Mark as ready @@ -448,9 +461,22 @@ func (s *Service) reinitializeDatabase() { }) } - // Set callback for session deletion + // Set callbacks for session lifecycle events + sessionManager.SetOnSessionCreated(func(id int64) { + s.broadcastProcessingStatus() + s.sseBroadcaster.Broadcast(map[string]interface{}{ + "type": "session", + "action": "created", + "id": id, + }) + }) sessionManager.SetOnSessionDeleted(func(id int64) { s.broadcastProcessingStatus() + s.sseBroadcaster.Broadcast(map[string]interface{}{ + "type": "session", + "action": "deleted", + "id": id, + }) }) // Mark as ready again @@ -719,6 +745,7 @@ func (s *Service) Start() error { } // processQueue processes the observation queue in the background. +// Processes immediately when notified, or every QueueProcessInterval as fallback. func (s *Service) processQueue() { defer s.wg.Done() @@ -729,7 +756,11 @@ func (s *Service) processQueue() { select { case <-s.ctx.Done(): return + case <-s.sessionManager.ProcessNotify: + // Immediate processing when observation is queued + s.processAllSessions() case <-ticker.C: + // Fallback periodic processing s.processAllSessions() } } diff --git a/internal/worker/session/manager.go b/internal/worker/session/manager.go index d050de4..c965fea 100644 --- a/internal/worker/session/manager.go +++ b/internal/worker/session/manager.go @@ -62,20 +62,88 @@ type ActiveSession struct { generatorActive atomic.Bool } +// SessionTimeout is how long an inactive session can exist before cleanup. +const SessionTimeout = 30 * time.Minute + +// CleanupInterval is how often to check for stale sessions. +const CleanupInterval = 5 * time.Minute + // Manager manages active session lifecycles. type Manager struct { sessionStore *sqlite.SessionStore sessions map[int64]*ActiveSession mu sync.RWMutex + onCreated func(int64) onDeleted func(int64) + ctx context.Context + cancel context.CancelFunc + // Global notification channel for immediate processing + ProcessNotify chan struct{} } // NewManager creates a new session manager. func NewManager(sessionStore *sqlite.SessionStore) *Manager { - return &Manager{ - sessionStore: sessionStore, - sessions: make(map[int64]*ActiveSession), + ctx, cancel := context.WithCancel(context.Background()) + m := &Manager{ + sessionStore: sessionStore, + sessions: make(map[int64]*ActiveSession), + ctx: ctx, + cancel: cancel, + ProcessNotify: make(chan struct{}, 1), } + // Start background cleanup goroutine + go m.cleanupLoop() + return m +} + +// cleanupLoop periodically removes stale sessions. +func (m *Manager) cleanupLoop() { + ticker := time.NewTicker(CleanupInterval) + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.cleanupStaleSessions() + } + } +} + +// cleanupStaleSessions removes sessions that have been inactive too long. +func (m *Manager) cleanupStaleSessions() { + m.mu.RLock() + var staleIDs []int64 + now := time.Now() + for id, session := range m.sessions { + // Check if session has been inactive for too long + session.messageMu.Lock() + hasPending := len(session.pendingMessages) > 0 + session.messageMu.Unlock() + + // Don't delete sessions with pending messages or active processing + if hasPending || session.generatorActive.Load() { + continue + } + + // Delete if session is older than timeout + if now.Sub(session.StartTime) > SessionTimeout { + staleIDs = append(staleIDs, id) + } + } + m.mu.RUnlock() + + // Delete stale sessions + for _, id := range staleIDs { + log.Info().Int64("sessionId", id).Dur("age", SessionTimeout).Msg("Cleaning up stale session") + m.DeleteSession(id) + } +} + +// SetOnSessionCreated sets a callback for when a session is created. +func (m *Manager) SetOnSessionCreated(callback func(int64)) { + m.onCreated = callback } // SetOnSessionDeleted sets a callback for when a session is deleted. @@ -86,7 +154,6 @@ func (m *Manager) SetOnSessionDeleted(callback func(int64)) { // InitializeSession initializes a session, creating it if needed. func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, userPrompt string, promptNumber int) (*ActiveSession, error) { m.mu.Lock() - defer m.mu.Unlock() // Check if already active if session, ok := m.sessions[sessionDBID]; ok { @@ -95,10 +162,12 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user session.UserPrompt = userPrompt session.LastPromptNumber = promptNumber } + m.mu.Unlock() return session, nil } - // Fetch from database + // Fetch from database (unlock during DB call to avoid blocking) + m.mu.Unlock() dbSession, err := m.sessionStore.GetSessionByID(ctx, sessionDBID) if err != nil { return nil, err @@ -135,7 +204,17 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user cancel: cancel, } + // Re-acquire lock to add session + m.mu.Lock() + // Double-check another goroutine didn't create it + if existing, ok := m.sessions[sessionDBID]; ok { + m.mu.Unlock() + cancel() // Clean up unused context + return existing, nil + } m.sessions[sessionDBID] = session + onCreated := m.onCreated + m.mu.Unlock() log.Info(). Int64("sessionId", sessionDBID). @@ -143,6 +222,11 @@ func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, user Str("claudeSessionId", session.ClaudeSessionID). Msg("Session initialized") + // Notify callback (outside lock) + if onCreated != nil { + onCreated(sessionDBID) + } + return session, nil } @@ -170,12 +254,18 @@ func (m *Manager) QueueObservation(ctx context.Context, sessionDBID int64, data queueDepth := len(session.pendingMessages) session.messageMu.Unlock() - // Non-blocking notification + // Non-blocking notification to session select { case session.notify <- struct{}{}: default: } + // Non-blocking notification to global processor + select { + case m.ProcessNotify <- struct{}{}: + default: + } + log.Info(). Int64("sessionId", sessionDBID). Str("tool", data.ToolName). @@ -212,12 +302,18 @@ func (m *Manager) QueueSummarize(ctx context.Context, sessionDBID int64, lastUse queueDepth := len(session.pendingMessages) session.messageMu.Unlock() - // Non-blocking notification + // Non-blocking notification to session select { case session.notify <- struct{}{}: default: } + // Non-blocking notification to global processor + select { + case m.ProcessNotify <- struct{}{}: + default: + } + log.Info(). Int64("sessionId", sessionDBID). Int("queueDepth", queueDepth). @@ -255,6 +351,9 @@ func (m *Manager) DeleteSession(sessionDBID int64) { // ShutdownAll shuts down all active sessions. func (m *Manager) ShutdownAll(ctx context.Context) { + // Stop cleanup goroutine + m.cancel() + m.mu.Lock() sessionIDs := make([]int64, 0, len(m.sessions)) for id := range m.sessions { diff --git a/ui/package-lock.json b/ui/package-lock.json index 892a505..aa80f31 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-5-gcd14ba8-dirty", + "version": "v0.6.1-7-gf0c35ee-dirty", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-5-gcd14ba8-dirty", + "version": "v0.6.1-7-gf0c35ee-dirty", "dependencies": { "vue": "^3.5.13" }, diff --git a/ui/package.json b/ui/package.json index dbeb571..e563a3c 100644 --- a/ui/package.json +++ b/ui/package.json @@ -1,6 +1,6 @@ { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-5-gcd14ba8-dirty", + "version": "v0.6.1-7-gf0c35ee-dirty", "private": true, "type": "module", "scripts": { diff --git a/ui/src/App.vue b/ui/src/App.vue index 2b38384..5cfe4df 100644 --- a/ui/src/App.vue +++ b/ui/src/App.vue @@ -1,5 +1,4 @@