Move SDK processing to async queue.

This commit is contained in:
2025-12-16 00:29:45 +00:00
parent 091af2d21b
commit 0af10165c6
4 changed files with 79 additions and 51 deletions
+23 -8
View File
@@ -9,7 +9,6 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
json "github.com/goccy/go-json"
@@ -39,7 +38,8 @@ type Processor struct {
broadcastFunc BroadcastFunc
syncObservationFunc SyncObservationFunc
syncSummaryFunc SyncSummaryFunc
mu sync.Mutex
// Semaphore to limit concurrent Claude CLI calls (prevents API overload)
sem chan struct{}
}
// SetBroadcastFunc sets the broadcast callback for SSE events.
@@ -64,6 +64,10 @@ func (p *Processor) broadcast(event map[string]interface{}) {
}
}
// MaxConcurrentCLICalls is the maximum number of concurrent Claude CLI calls.
// This prevents overwhelming the API and manages resource usage.
const MaxConcurrentCLICalls = 4
// NewProcessor creates a new SDK processor.
func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlite.SummaryStore) (*Processor, error) {
cfg := config.Get()
@@ -89,6 +93,7 @@ func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlit
model: cfg.Model,
observationStore: observationStore,
summaryStore: summaryStore,
sem: make(chan struct{}, MaxConcurrentCLICalls),
}, nil
}
@@ -100,9 +105,6 @@ func (p *Processor) IsAvailable() bool {
// ProcessObservation processes a single tool observation and extracts insights.
func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, project string, toolName string, toolInput, toolResponse interface{}, promptNumber int, cwd string) error {
p.mu.Lock()
defer p.mu.Unlock()
// Skip certain tools that aren't worth processing
if shouldSkipTool(toolName) {
log.Info().Str("tool", toolName).Msg("Skipping tool (not interesting for memory)")
@@ -136,6 +138,14 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec
}
prompt := BuildObservationPrompt(exec)
// Acquire semaphore slot (limits concurrent CLI calls)
select {
case p.sem <- struct{}{}:
defer func() { <-p.sem }()
case <-ctx.Done():
return ctx.Err()
}
// Call Claude Code CLI
response, err := p.callClaudeCLI(ctx, prompt)
if err != nil {
@@ -226,9 +236,6 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec
// ProcessSummary processes a session summary request.
func (p *Processor) ProcessSummary(ctx context.Context, sessionDBID int64, sdkSessionID, project, userPrompt, lastUserMsg, lastAssistantMsg string) error {
p.mu.Lock()
defer p.mu.Unlock()
// Skip summary generation if there's no meaningful assistant response
// This prevents generic "initial session setup" summaries
if !hasMeaningfulContent(lastAssistantMsg) {
@@ -249,6 +256,14 @@ func (p *Processor) ProcessSummary(ctx context.Context, sessionDBID int64, sdkSe
}
prompt := BuildSummaryPrompt(req)
// Acquire semaphore slot (limits concurrent CLI calls)
select {
case p.sem <- struct{}{}:
defer func() { <-p.sem }()
case <-ctx.Done():
return ctx.Err()
}
// Call Claude Code CLI
response, err := p.callClaudeCLI(ctx, prompt)
if err != nil {
+53 -40
View File
@@ -736,10 +736,14 @@ func (s *Service) processQueue() {
}
// processAllSessions processes pending messages for all active sessions.
// Messages are processed in parallel using goroutines, with concurrency
// limited by the processor's semaphore.
func (s *Service) processAllSessions() {
// Get all sessions with pending messages
sessions := s.sessionManager.GetAllSessions()
var wg sync.WaitGroup
for _, sess := range sessions {
// Get pending messages
messages := s.sessionManager.DrainMessages(sess.SessionDBID)
@@ -747,52 +751,61 @@ func (s *Service) processAllSessions() {
continue
}
// Process each message
// Process each message in a goroutine
for _, msg := range messages {
switch msg.Type {
case session.MessageTypeObservation:
if msg.Observation != nil {
err := s.processor.ProcessObservation(
s.ctx,
sess.SDKSessionID,
sess.Project,
msg.Observation.ToolName,
msg.Observation.ToolInput,
msg.Observation.ToolResponse,
msg.Observation.PromptNumber,
msg.Observation.CWD,
)
if err != nil {
log.Error().Err(err).
Str("tool", msg.Observation.ToolName).
Msg("Failed to process observation")
}
}
wg.Add(1)
go func(sess *session.ActiveSession, msg session.PendingMessage) {
defer wg.Done()
case session.MessageTypeSummarize:
if msg.Summarize != nil {
err := s.processor.ProcessSummary(
s.ctx,
sess.SessionDBID,
sess.SDKSessionID,
sess.Project,
sess.UserPrompt,
msg.Summarize.LastUserMessage,
msg.Summarize.LastAssistantMessage,
)
if err != nil {
log.Error().Err(err).
Int64("sessionId", sess.SessionDBID).
Msg("Failed to process summary")
switch msg.Type {
case session.MessageTypeObservation:
if msg.Observation != nil {
err := s.processor.ProcessObservation(
s.ctx,
sess.SDKSessionID,
sess.Project,
msg.Observation.ToolName,
msg.Observation.ToolInput,
msg.Observation.ToolResponse,
msg.Observation.PromptNumber,
msg.Observation.CWD,
)
if err != nil {
log.Error().Err(err).
Str("tool", msg.Observation.ToolName).
Msg("Failed to process observation")
}
}
case session.MessageTypeSummarize:
if msg.Summarize != nil {
err := s.processor.ProcessSummary(
s.ctx,
sess.SessionDBID,
sess.SDKSessionID,
sess.Project,
sess.UserPrompt,
msg.Summarize.LastUserMessage,
msg.Summarize.LastAssistantMessage,
)
if err != nil {
log.Error().Err(err).
Int64("sessionId", sess.SessionDBID).
Msg("Failed to process summary")
}
// Delete session after summary
s.sessionManager.DeleteSession(sess.SessionDBID)
}
// Delete session after summary
s.sessionManager.DeleteSession(sess.SessionDBID)
}
}
}(sess, msg)
}
s.broadcastProcessingStatus()
}
// Wait for all goroutines to complete
wg.Wait()
// Broadcast status after processing
s.broadcastProcessingStatus()
}
// Shutdown gracefully shuts down the service.
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "claude-mnemonic-dashboard",
"version": "v0.6.1-4-g0a0dfc1-dirty",
"version": "v0.6.1-5-gcd14ba8-dirty",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "claude-mnemonic-dashboard",
"version": "v0.6.1-4-g0a0dfc1-dirty",
"version": "v0.6.1-5-gcd14ba8-dirty",
"dependencies": {
"vue": "^3.5.13"
},
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "claude-mnemonic-dashboard",
"version": "v0.6.1-4-g0a0dfc1-dirty",
"version": "v0.6.1-5-gcd14ba8-dirty",
"private": true,
"type": "module",
"scripts": {