Move SDK processing to async queue.

This commit is contained in:
2025-12-16 00:29:45 +00:00
parent cd14ba8915
commit 085c2cb3d0
4 changed files with 79 additions and 51 deletions
+23 -8
View File
@@ -9,7 +9,6 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
json "github.com/goccy/go-json"
@@ -39,7 +38,8 @@ type Processor struct {
broadcastFunc BroadcastFunc
syncObservationFunc SyncObservationFunc
syncSummaryFunc SyncSummaryFunc
mu sync.Mutex
// Semaphore to limit concurrent Claude CLI calls (prevents API overload)
sem chan struct{}
}
// SetBroadcastFunc sets the broadcast callback for SSE events.
@@ -64,6 +64,10 @@ func (p *Processor) broadcast(event map[string]interface{}) {
}
}
// MaxConcurrentCLICalls is the maximum number of concurrent Claude CLI calls.
// This prevents overwhelming the API and manages resource usage.
const MaxConcurrentCLICalls = 4
// NewProcessor creates a new SDK processor.
func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlite.SummaryStore) (*Processor, error) {
cfg := config.Get()
@@ -89,6 +93,7 @@ func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlit
model: cfg.Model,
observationStore: observationStore,
summaryStore: summaryStore,
sem: make(chan struct{}, MaxConcurrentCLICalls),
}, nil
}
@@ -100,9 +105,6 @@ func (p *Processor) IsAvailable() bool {
// ProcessObservation processes a single tool observation and extracts insights.
func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, project string, toolName string, toolInput, toolResponse interface{}, promptNumber int, cwd string) error {
p.mu.Lock()
defer p.mu.Unlock()
// Skip certain tools that aren't worth processing
if shouldSkipTool(toolName) {
log.Info().Str("tool", toolName).Msg("Skipping tool (not interesting for memory)")
@@ -136,6 +138,14 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec
}
prompt := BuildObservationPrompt(exec)
// Acquire semaphore slot (limits concurrent CLI calls)
select {
case p.sem <- struct{}{}:
defer func() { <-p.sem }()
case <-ctx.Done():
return ctx.Err()
}
// Call Claude Code CLI
response, err := p.callClaudeCLI(ctx, prompt)
if err != nil {
@@ -226,9 +236,6 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec
// ProcessSummary processes a session summary request.
func (p *Processor) ProcessSummary(ctx context.Context, sessionDBID int64, sdkSessionID, project, userPrompt, lastUserMsg, lastAssistantMsg string) error {
p.mu.Lock()
defer p.mu.Unlock()
// Skip summary generation if there's no meaningful assistant response
// This prevents generic "initial session setup" summaries
if !hasMeaningfulContent(lastAssistantMsg) {
@@ -249,6 +256,14 @@ func (p *Processor) ProcessSummary(ctx context.Context, sessionDBID int64, sdkSe
}
prompt := BuildSummaryPrompt(req)
// Acquire semaphore slot (limits concurrent CLI calls)
select {
case p.sem <- struct{}{}:
defer func() { <-p.sem }()
case <-ctx.Done():
return ctx.Err()
}
// Call Claude Code CLI
response, err := p.callClaudeCLI(ctx, prompt)
if err != nil {