From 0af10165c679e4453bc4aacd142508dad3fe82e1 Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Tue, 16 Dec 2025 00:29:45 +0000 Subject: [PATCH] Move SDK processing to async queue. --- internal/worker/sdk/processor.go | 31 ++++++++--- internal/worker/service.go | 93 ++++++++++++++++++-------------- ui/package-lock.json | 4 +- ui/package.json | 2 +- 4 files changed, 79 insertions(+), 51 deletions(-) diff --git a/internal/worker/sdk/processor.go b/internal/worker/sdk/processor.go index adfcc7f..c23b23c 100644 --- a/internal/worker/sdk/processor.go +++ b/internal/worker/sdk/processor.go @@ -9,7 +9,6 @@ import ( "os/exec" "path/filepath" "strings" - "sync" "time" json "github.com/goccy/go-json" @@ -39,7 +38,8 @@ type Processor struct { broadcastFunc BroadcastFunc syncObservationFunc SyncObservationFunc syncSummaryFunc SyncSummaryFunc - mu sync.Mutex + // Semaphore to limit concurrent Claude CLI calls (prevents API overload) + sem chan struct{} } // SetBroadcastFunc sets the broadcast callback for SSE events. @@ -64,6 +64,10 @@ func (p *Processor) broadcast(event map[string]interface{}) { } } +// MaxConcurrentCLICalls is the maximum number of concurrent Claude CLI calls. +// This prevents overwhelming the API and manages resource usage. +const MaxConcurrentCLICalls = 4 + // NewProcessor creates a new SDK processor. func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlite.SummaryStore) (*Processor, error) { cfg := config.Get() @@ -89,6 +93,7 @@ func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlit model: cfg.Model, observationStore: observationStore, summaryStore: summaryStore, + sem: make(chan struct{}, MaxConcurrentCLICalls), }, nil } @@ -100,9 +105,6 @@ func (p *Processor) IsAvailable() bool { // ProcessObservation processes a single tool observation and extracts insights. func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, project string, toolName string, toolInput, toolResponse interface{}, promptNumber int, cwd string) error { - p.mu.Lock() - defer p.mu.Unlock() - // Skip certain tools that aren't worth processing if shouldSkipTool(toolName) { log.Info().Str("tool", toolName).Msg("Skipping tool (not interesting for memory)") @@ -136,6 +138,14 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec } prompt := BuildObservationPrompt(exec) + // Acquire semaphore slot (limits concurrent CLI calls) + select { + case p.sem <- struct{}{}: + defer func() { <-p.sem }() + case <-ctx.Done(): + return ctx.Err() + } + // Call Claude Code CLI response, err := p.callClaudeCLI(ctx, prompt) if err != nil { @@ -226,9 +236,6 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec // ProcessSummary processes a session summary request. func (p *Processor) ProcessSummary(ctx context.Context, sessionDBID int64, sdkSessionID, project, userPrompt, lastUserMsg, lastAssistantMsg string) error { - p.mu.Lock() - defer p.mu.Unlock() - // Skip summary generation if there's no meaningful assistant response // This prevents generic "initial session setup" summaries if !hasMeaningfulContent(lastAssistantMsg) { @@ -249,6 +256,14 @@ func (p *Processor) ProcessSummary(ctx context.Context, sessionDBID int64, sdkSe } prompt := BuildSummaryPrompt(req) + // Acquire semaphore slot (limits concurrent CLI calls) + select { + case p.sem <- struct{}{}: + defer func() { <-p.sem }() + case <-ctx.Done(): + return ctx.Err() + } + // Call Claude Code CLI response, err := p.callClaudeCLI(ctx, prompt) if err != nil { diff --git a/internal/worker/service.go b/internal/worker/service.go index b099c1b..cb05974 100644 --- a/internal/worker/service.go +++ b/internal/worker/service.go @@ -736,10 +736,14 @@ func (s *Service) processQueue() { } // processAllSessions processes pending messages for all active sessions. +// Messages are processed in parallel using goroutines, with concurrency +// limited by the processor's semaphore. func (s *Service) processAllSessions() { // Get all sessions with pending messages sessions := s.sessionManager.GetAllSessions() + var wg sync.WaitGroup + for _, sess := range sessions { // Get pending messages messages := s.sessionManager.DrainMessages(sess.SessionDBID) @@ -747,52 +751,61 @@ func (s *Service) processAllSessions() { continue } - // Process each message + // Process each message in a goroutine for _, msg := range messages { - switch msg.Type { - case session.MessageTypeObservation: - if msg.Observation != nil { - err := s.processor.ProcessObservation( - s.ctx, - sess.SDKSessionID, - sess.Project, - msg.Observation.ToolName, - msg.Observation.ToolInput, - msg.Observation.ToolResponse, - msg.Observation.PromptNumber, - msg.Observation.CWD, - ) - if err != nil { - log.Error().Err(err). - Str("tool", msg.Observation.ToolName). - Msg("Failed to process observation") - } - } + wg.Add(1) + go func(sess *session.ActiveSession, msg session.PendingMessage) { + defer wg.Done() - case session.MessageTypeSummarize: - if msg.Summarize != nil { - err := s.processor.ProcessSummary( - s.ctx, - sess.SessionDBID, - sess.SDKSessionID, - sess.Project, - sess.UserPrompt, - msg.Summarize.LastUserMessage, - msg.Summarize.LastAssistantMessage, - ) - if err != nil { - log.Error().Err(err). - Int64("sessionId", sess.SessionDBID). - Msg("Failed to process summary") + switch msg.Type { + case session.MessageTypeObservation: + if msg.Observation != nil { + err := s.processor.ProcessObservation( + s.ctx, + sess.SDKSessionID, + sess.Project, + msg.Observation.ToolName, + msg.Observation.ToolInput, + msg.Observation.ToolResponse, + msg.Observation.PromptNumber, + msg.Observation.CWD, + ) + if err != nil { + log.Error().Err(err). + Str("tool", msg.Observation.ToolName). + Msg("Failed to process observation") + } + } + + case session.MessageTypeSummarize: + if msg.Summarize != nil { + err := s.processor.ProcessSummary( + s.ctx, + sess.SessionDBID, + sess.SDKSessionID, + sess.Project, + sess.UserPrompt, + msg.Summarize.LastUserMessage, + msg.Summarize.LastAssistantMessage, + ) + if err != nil { + log.Error().Err(err). + Int64("sessionId", sess.SessionDBID). + Msg("Failed to process summary") + } + // Delete session after summary + s.sessionManager.DeleteSession(sess.SessionDBID) } - // Delete session after summary - s.sessionManager.DeleteSession(sess.SessionDBID) } - } + }(sess, msg) } - - s.broadcastProcessingStatus() } + + // Wait for all goroutines to complete + wg.Wait() + + // Broadcast status after processing + s.broadcastProcessingStatus() } // Shutdown gracefully shuts down the service. diff --git a/ui/package-lock.json b/ui/package-lock.json index ac97210..892a505 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-4-g0a0dfc1-dirty", + "version": "v0.6.1-5-gcd14ba8-dirty", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-4-g0a0dfc1-dirty", + "version": "v0.6.1-5-gcd14ba8-dirty", "dependencies": { "vue": "^3.5.13" }, diff --git a/ui/package.json b/ui/package.json index d72f755..dbeb571 100644 --- a/ui/package.json +++ b/ui/package.json @@ -1,6 +1,6 @@ { "name": "claude-mnemonic-dashboard", - "version": "v0.6.1-4-g0a0dfc1-dirty", + "version": "v0.6.1-5-gcd14ba8-dirty", "private": true, "type": "module", "scripts": {