Files
lukaszraczylo a81482d06a fix: address 15 additional hang vectors found during deep audit (#45)
MCP server (5 fixes):
- Move semaphore acquisition inside goroutine so main loop stays
  responsive when all slots are taken
- Add 10s write timeout to sendResponse to prevent pipe deadlock
  when Claude Code pauses reading stdout
- Send fallback JSON-RPC error when json.Marshal fails instead of
  silently swallowing the error and leaving caller waiting forever
- Silence unknown notification methods (req.ID == nil) instead of
  sending unsolicited error responses that may desync the host
- Return MCP isError content for tool failures instead of top-level
  JSON-RPC error, matching the MCP specification

Vector/embedding (3 fixes):
- Move EmbedBatchWithContext call before writeMu.Lock in AddDocuments
  so ONNX inference runs outside the write lock
- Replace singleflight.Do with DoChan + ctx select in both
  getOrComputeEmbedding and UnifiedSearch so callers can bail out
  independently when their context expires
- Add activeQueries atomic counter; skip cache warming when user
  queries are in-flight; reduce warming timeout from 5s to 2s

Hooks (4 fixes):
- Cap EnsureWorkerRunning to 15s hard deadline with context; reduce
  StartupTimeout from 30s to 10s; reduce port-in-use retries
- Fix nil dereference panic in user-prompt hook when initResult is
  nil (non-JSON worker response); use comma-ok assertions
- Use package-level hookClient/healthClient with DisableKeepAlives
  to prevent FD leaks in short-lived hook processes
- Set SysProcAttr{Setpgid: true} to detach worker from hook process
  group, preventing kill-cascade from Claude Code

Worker/DB (3 fixes):
- Replace os.Exit(0) in MCP config watcher with context cancellation
  for clean protocol shutdown
- Add 60s context.WithTimeout around ProcessObservation calls in
  processAllSessions to prevent hung CLI subprocesses from blocking
  the queue processor forever
- Set explicit PRAGMA wal_autocheckpoint=1000 and add PASSIVE WAL
  checkpoint to Optimize() to prevent checkpoint stalls

Adds 20+ regression tests across all fix areas.
2026-05-26 14:29:34 +01:00

986 lines
29 KiB
Go

// Package search provides unified search capabilities for claude-mnemonic.
package search
import (
"context"
"hash/fnv"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/gorm"
"github.com/lukaszraczylo/claude-mnemonic/internal/vector/sqlitevec"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
"github.com/rs/zerolog/log"
"golang.org/x/sync/singleflight"
)
// multiSpaceRegex matches multiple consecutive whitespace characters.
// Pre-compiled for performance in normalizeQuery.
var multiSpaceRegex = regexp.MustCompile(`\s+`)
// Search configuration constants.
const (
// Cache configuration
defaultCacheTTL = 30 * time.Second // Short TTL for freshness
defaultCacheMaxSize = 200 // Max cached results
cacheEvictionPercent = 10 // Evict 10% when cache is full
cacheEvictionThreshold = 80 // Start eviction scan at 80% capacity
// Latency tracking
latencyHistogramCap = 1000 // Max latency samples for histogram
slowQueryThresholdNs = 100 * 1e6 // 100ms threshold for slow query logging
// Query frequency tracking
maxFrequencyEntries = 1000 // Max queries to track for warming
frequencyEvictionBatch = 100 // Remove 10% when frequency map is full
staleQueryThreshold = 24 * time.Hour // Remove queries older than 24 hours
recentQueryWindow = time.Hour // Only consider queries from last hour for warming
// Cache warming configuration
cacheWarmingInitDelay = 30 * time.Second // Delay before starting warming
cacheWarmingInterval = 20 * time.Second // Run warming cycle every 20 seconds
frequencyCleanupInterval = 5 * time.Minute // Cleanup stale entries every 5 minutes
cacheCleanupInterval = time.Minute // Cleanup expired cache every minute
warmingQueryTimeout = 2 * time.Second // Timeout for warming queries (kept short to minimize mutex hold)
warmingBatchSize = 5 // Warm top 5 queries per cycle
minRecencyFactor = 0.1 // Minimum recency factor for scoring
// Default query limits
defaultQueryLimit = 20
maxQueryLimit = 100
defaultOrderBy = "date_desc"
// Truncation lengths
queryLogTruncateLen = 50 // Truncate query in logs
titleTruncateLen = 100 // Truncate titles in results
warmingLogTruncateLen = 30 // Truncate query in warming logs
)
// SearchMetrics tracks search performance statistics.
type SearchMetrics struct {
latencyHistogram []int64
TotalSearches int64
VectorSearches int64
FilterSearches int64
TotalLatencyNs int64
VectorLatencyNs int64
FilterLatencyNs int64
CacheHits int64
CoalescedRequests int64
SearchErrors int64
histogramMu sync.Mutex
}
// GetStats returns the current search statistics.
func (m *SearchMetrics) GetStats() map[string]any {
totalSearches := atomic.LoadInt64(&m.TotalSearches)
totalLatency := atomic.LoadInt64(&m.TotalLatencyNs)
vectorSearches := atomic.LoadInt64(&m.VectorSearches)
vectorLatency := atomic.LoadInt64(&m.VectorLatencyNs)
filterSearches := atomic.LoadInt64(&m.FilterSearches)
filterLatency := atomic.LoadInt64(&m.FilterLatencyNs)
avgLatencyMs := float64(0)
if totalSearches > 0 {
avgLatencyMs = float64(totalLatency) / float64(totalSearches) / 1e6
}
avgVectorLatencyMs := float64(0)
if vectorSearches > 0 {
avgVectorLatencyMs = float64(vectorLatency) / float64(vectorSearches) / 1e6
}
avgFilterLatencyMs := float64(0)
if filterSearches > 0 {
avgFilterLatencyMs = float64(filterLatency) / float64(filterSearches) / 1e6
}
return map[string]any{
"total_searches": totalSearches,
"vector_searches": vectorSearches,
"filter_searches": filterSearches,
"cache_hits": atomic.LoadInt64(&m.CacheHits),
"coalesced_requests": atomic.LoadInt64(&m.CoalescedRequests),
"search_errors": atomic.LoadInt64(&m.SearchErrors),
"avg_latency_ms": avgLatencyMs,
"avg_vector_latency_ms": avgVectorLatencyMs,
"avg_filter_latency_ms": avgFilterLatencyMs,
}
}
// Manager provides unified search across SQLite and sqlite-vec.
type Manager struct {
ctx context.Context
searchGroup singleflight.Group
cancel context.CancelFunc
vectorClient *sqlitevec.Client
metrics *SearchMetrics
promptStore *gorm.PromptStore
observationStore *gorm.ObservationStore
summaryStore *gorm.SummaryStore
resultCache map[string]*cachedResult
queryFrequency map[string]*queryFrequencyInfo
cacheTTL time.Duration
cacheMaxSize int
activeQueries atomic.Int32 // tracks in-flight user queries to yield embedding mutex
resultCacheMu sync.RWMutex
queryFrequencyMu sync.RWMutex
}
// queryFrequencyInfo tracks how often a query is used.
type queryFrequencyInfo struct {
lastUsed time.Time
lastCached time.Time
params SearchParams
count int64
}
// cachedResult stores a cached search result with expiry.
type cachedResult struct {
result *UnifiedSearchResult
expiresAt time.Time
}
// NewManager creates a new search manager.
func NewManager(
observationStore *gorm.ObservationStore,
summaryStore *gorm.SummaryStore,
promptStore *gorm.PromptStore,
vectorClient *sqlitevec.Client,
) *Manager {
ctx, cancel := context.WithCancel(context.Background())
m := &Manager{
observationStore: observationStore,
summaryStore: summaryStore,
promptStore: promptStore,
vectorClient: vectorClient,
metrics: &SearchMetrics{latencyHistogram: make([]int64, 0, latencyHistogramCap)},
ctx: ctx,
cancel: cancel,
resultCache: make(map[string]*cachedResult),
cacheTTL: defaultCacheTTL,
cacheMaxSize: defaultCacheMaxSize,
queryFrequency: make(map[string]*queryFrequencyInfo),
}
// Start cache cleanup goroutine
go m.cleanupCacheLoop()
// Start cache warming goroutine
go m.cacheWarmingLoop()
return m
}
// Close stops background goroutines and cleans up resources.
func (m *Manager) Close() {
if m.cancel != nil {
m.cancel()
}
}
// cleanupCacheLoop periodically removes expired cache entries.
func (m *Manager) cleanupCacheLoop() {
ticker := time.NewTicker(cacheCleanupInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.cleanupExpiredCache()
}
}
}
// cleanupExpiredCache removes expired entries from the cache.
func (m *Manager) cleanupExpiredCache() {
m.resultCacheMu.Lock()
defer m.resultCacheMu.Unlock()
now := time.Now()
for key, cached := range m.resultCache {
if now.After(cached.expiresAt) {
delete(m.resultCache, key)
}
}
}
// cacheWarmingLoop periodically warms the cache for frequently used queries.
func (m *Manager) cacheWarmingLoop() {
// Wait a bit before starting to allow system to stabilize
select {
case <-m.ctx.Done():
return
case <-time.After(cacheWarmingInitDelay):
}
warmingTicker := time.NewTicker(cacheWarmingInterval)
cleanupTicker := time.NewTicker(frequencyCleanupInterval)
defer warmingTicker.Stop()
defer cleanupTicker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-warmingTicker.C:
m.warmFrequentQueries()
case <-cleanupTicker.C:
m.cleanupStaleFrequencyEntries()
}
}
}
// cleanupStaleFrequencyEntries removes query frequency entries older than staleQueryThreshold.
// This prevents memory bloat from queries that haven't been used in a long time.
func (m *Manager) cleanupStaleFrequencyEntries() {
m.queryFrequencyMu.Lock()
now := time.Now()
var keysToDelete []string
for k, v := range m.queryFrequency {
if now.Sub(v.lastUsed) > staleQueryThreshold {
keysToDelete = append(keysToDelete, k)
}
}
for _, k := range keysToDelete {
delete(m.queryFrequency, k)
}
m.queryFrequencyMu.Unlock()
if len(keysToDelete) > 0 {
log.Debug().Int("removed", len(keysToDelete)).Msg("Cleaned up stale query frequency entries")
}
}
// warmFrequentQueries pre-executes frequently used queries to warm the cache.
// Skips warming entirely when user queries are in-flight to avoid competing
// for the embedding model mutex on throttled hardware.
func (m *Manager) warmFrequentQueries() {
if m.activeQueries.Load() > 0 {
log.Debug().Msg("Skipping cache warming: user queries in flight")
return
}
m.queryFrequencyMu.RLock()
// Find top N most frequent queries that aren't recently cached
type queryScore struct {
info *queryFrequencyInfo
key string
score float64
}
candidates := make([]queryScore, 0, len(m.queryFrequency))
now := time.Now()
for key, info := range m.queryFrequency {
// Only consider queries used recently
if now.Sub(info.lastUsed) > recentQueryWindow {
continue
}
// Only warm if not recently cached (cache about to expire or already expired)
timeSinceLastCache := now.Sub(info.lastCached)
if timeSinceLastCache < m.cacheTTL/2 {
continue
}
// Score: frequency * recency factor
recencyFactor := 1.0 - (now.Sub(info.lastUsed).Seconds() / recentQueryWindow.Seconds())
if recencyFactor < minRecencyFactor {
recencyFactor = minRecencyFactor
}
score := float64(info.count) * recencyFactor
candidates = append(candidates, queryScore{key: key, info: info, score: score})
}
m.queryFrequencyMu.RUnlock()
// Sort by score descending using O(n log n) algorithm
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].score > candidates[j].score
})
// Warm top queries
warmCount := min(warmingBatchSize, len(candidates))
for i := range warmCount {
candidate := candidates[i]
ctx, cancel := context.WithTimeout(context.Background(), warmingQueryTimeout)
result, err := m.executeSearch(ctx, candidate.info.params)
cancel()
if err == nil && result != nil {
cacheKey := m.getCacheKey(candidate.info.params)
m.putInCache(cacheKey, result)
// Update last cached time
m.queryFrequencyMu.Lock()
if info, ok := m.queryFrequency[candidate.key]; ok {
info.lastCached = time.Now()
}
m.queryFrequencyMu.Unlock()
log.Debug().
Str("query", truncate(candidate.info.params.Query, warmingLogTruncateLen)).
Float64("score", candidate.score).
Msg("Cache warmed for frequent query")
}
}
}
// trackQueryFrequency records query usage for cache warming decisions.
func (m *Manager) trackQueryFrequency(params SearchParams) {
key := m.getCacheKey(params)
m.queryFrequencyMu.Lock()
if info, ok := m.queryFrequency[key]; ok {
info.count++
info.lastUsed = time.Now()
m.queryFrequencyMu.Unlock()
return // Fast path: no eviction needed
}
m.queryFrequency[key] = &queryFrequencyInfo{
params: params,
count: 1,
lastUsed: time.Now(),
}
// Limit frequency map size to prevent memory bloat
mapLen := len(m.queryFrequency)
if mapLen <= maxFrequencyEntries {
m.queryFrequencyMu.Unlock()
return // Fast path: no eviction needed
}
// Collect keys and times for eviction (still under lock, but fast)
type entry struct {
lastUsed time.Time
key string
}
entries := make([]entry, 0, mapLen)
for k, v := range m.queryFrequency {
entries = append(entries, entry{key: k, lastUsed: v.lastUsed})
}
m.queryFrequencyMu.Unlock()
// Sort outside lock to reduce contention (O(n log n))
sort.Slice(entries, func(i, j int) bool {
return entries[i].lastUsed.Before(entries[j].lastUsed)
})
// Collect keys to delete
deleteCount := min(frequencyEvictionBatch, len(entries))
keysToDelete := make([]string, deleteCount)
for i := range deleteCount {
keysToDelete[i] = entries[i].key
}
// Re-acquire lock only for deletion (brief critical section)
m.queryFrequencyMu.Lock()
for _, k := range keysToDelete {
delete(m.queryFrequency, k)
}
m.queryFrequencyMu.Unlock()
}
// RecentQuery represents a recently executed search query.
type RecentQuery struct {
LastUsed time.Time `json:"last_used"`
Query string `json:"query"`
Project string `json:"project,omitempty"`
Type string `json:"type,omitempty"`
Count int64 `json:"count"`
}
// GetRecentQueries returns the most recent search queries, sorted by last used time.
func (m *Manager) GetRecentQueries(limit int) []RecentQuery {
if limit <= 0 {
limit = defaultQueryLimit
}
if limit > maxQueryLimit {
limit = maxQueryLimit
}
m.queryFrequencyMu.RLock()
defer m.queryFrequencyMu.RUnlock()
// Collect all queries
queries := make([]RecentQuery, 0, len(m.queryFrequency))
for _, info := range m.queryFrequency {
queries = append(queries, RecentQuery{
Query: info.params.Query,
Project: info.params.Project,
Type: info.params.Type,
Count: info.count,
LastUsed: info.lastUsed,
})
}
// Sort by last used (most recent first)
sort.Slice(queries, func(i, j int) bool {
return queries[i].LastUsed.After(queries[j].LastUsed)
})
// Limit results
if len(queries) > limit {
queries = queries[:limit]
}
return queries
}
// GetFrequentQueries returns the most frequently used search queries.
func (m *Manager) GetFrequentQueries(limit int) []RecentQuery {
if limit <= 0 {
limit = defaultQueryLimit
}
if limit > maxQueryLimit {
limit = maxQueryLimit
}
m.queryFrequencyMu.RLock()
defer m.queryFrequencyMu.RUnlock()
// Only include queries used recently
now := time.Now()
queries := make([]RecentQuery, 0, len(m.queryFrequency))
for _, info := range m.queryFrequency {
if now.Sub(info.lastUsed) > recentQueryWindow {
continue
}
queries = append(queries, RecentQuery{
Query: info.params.Query,
Project: info.params.Project,
Type: info.params.Type,
Count: info.count,
LastUsed: info.lastUsed,
})
}
// Sort by count (highest first)
sort.Slice(queries, func(i, j int) bool {
return queries[i].Count > queries[j].Count
})
// Limit results
if len(queries) > limit {
queries = queries[:limit]
}
return queries
}
// normalizeQuery normalizes a search query for consistent cache keys.
// Converts to lowercase, trims whitespace, and collapses multiple spaces.
// Uses pre-compiled regex for O(n) performance instead of O(n*m) loop.
func normalizeQuery(query string) string {
// Lowercase for case-insensitive matching
query = strings.ToLower(query)
// Collapse multiple whitespace into single space using pre-compiled regex
query = multiSpaceRegex.ReplaceAllString(query, " ")
// Trim leading/trailing whitespace (after collapsing)
return strings.TrimSpace(query)
}
// getCacheKey generates a cache key from search params.
// Uses direct string concatenation instead of JSON marshal for better performance.
// Queries are normalized for consistent cache hits across whitespace variations.
func (m *Manager) getCacheKey(params SearchParams) string {
// Normalize query for consistent cache keys
normalizedQuery := normalizeQuery(params.Query)
// Hash directly without intermediate string allocation.
// FNV-64a is fast and collision-safe for cache keys.
h := fnv.New64a()
// Write each field directly to the hasher with separators
h.Write([]byte(normalizedQuery))
h.Write([]byte{'|'})
h.Write([]byte(params.Type))
h.Write([]byte{'|'})
h.Write([]byte(params.Project))
h.Write([]byte{'|'})
h.Write([]byte(params.ObsType))
h.Write([]byte{'|'})
h.Write([]byte(params.Concepts))
h.Write([]byte{'|'})
h.Write([]byte(params.Files))
h.Write([]byte{'|'})
h.Write([]byte(strconv.FormatInt(params.DateStart, 10)))
h.Write([]byte{'|'})
h.Write([]byte(strconv.FormatInt(params.DateEnd, 10)))
h.Write([]byte{'|'})
h.Write([]byte(params.OrderBy))
h.Write([]byte{'|'})
h.Write([]byte(strconv.Itoa(params.Limit)))
h.Write([]byte{'|'})
h.Write([]byte(strconv.Itoa(params.Offset)))
h.Write([]byte{'|'})
h.Write([]byte(params.Format))
h.Write([]byte{'|'})
h.Write([]byte(params.Scope))
h.Write([]byte{'|'})
if params.IncludeGlobal {
h.Write([]byte{'1'})
} else {
h.Write([]byte{'0'})
}
h.Write([]byte{'|'})
if params.ExcludeSuperseded {
h.Write([]byte{'1'})
} else {
h.Write([]byte{'0'})
}
return strconv.FormatUint(h.Sum64(), 36) // Base36 for compact representation
}
// getFromCache retrieves a result from cache if valid.
func (m *Manager) getFromCache(key string) (*UnifiedSearchResult, bool) {
m.resultCacheMu.RLock()
defer m.resultCacheMu.RUnlock()
if cached, ok := m.resultCache[key]; ok {
if time.Now().Before(cached.expiresAt) {
atomic.AddInt64(&m.metrics.CacheHits, 1)
return cached.result, true
}
}
return nil, false
}
// putInCache stores a result in the cache.
// Optimized to skip expensive scans when cache is below capacity threshold.
func (m *Manager) putInCache(key string, result *UnifiedSearchResult) {
m.resultCacheMu.Lock()
defer m.resultCacheMu.Unlock()
now := time.Now()
cacheLen := len(m.resultCache)
// Only scan for expired entries when at threshold+ capacity (amortized cleanup)
evictionThreshold := (m.cacheMaxSize * cacheEvictionThreshold) / 100
if cacheLen >= evictionThreshold {
// Evict expired entries
for k, v := range m.resultCache {
if now.After(v.expiresAt) {
delete(m.resultCache, k)
}
}
cacheLen = len(m.resultCache) // Update after eviction
}
// If still at capacity after removing expired, use simple FIFO-style eviction
// Go map iteration order is random, which provides good cache behavior
if cacheLen >= m.cacheMaxSize {
// Evict percentage using random-order iteration (O(n) single pass)
evictCount := max(m.cacheMaxSize*cacheEvictionPercent/100, 1)
evicted := 0
for k := range m.resultCache {
delete(m.resultCache, k)
evicted++
if evicted >= evictCount {
break
}
}
}
m.resultCache[key] = &cachedResult{
result: result,
expiresAt: now.Add(m.cacheTTL),
}
}
// Metrics returns the search metrics for monitoring.
func (m *Manager) Metrics() *SearchMetrics {
return m.metrics
}
// CacheStats returns current cache statistics.
func (m *Manager) CacheStats() map[string]any {
m.resultCacheMu.RLock()
cacheSize := len(m.resultCache)
m.resultCacheMu.RUnlock()
return map[string]any{
"size": cacheSize,
"max_size": m.cacheMaxSize,
"ttl_sec": m.cacheTTL.Seconds(),
}
}
// ClearCache clears the result cache. Useful for testing or after data changes.
func (m *Manager) ClearCache() {
m.resultCacheMu.Lock()
m.resultCache = make(map[string]*cachedResult)
m.resultCacheMu.Unlock()
}
// SearchParams contains parameters for unified search.
type SearchParams struct {
Format string
Type string
Project string
ObsType string
Concepts string
Files string
Query string
Scope string
OrderBy string
DateStart int64
Offset int
Limit int
DateEnd int64
IncludeGlobal bool
ExcludeSuperseded bool
}
// SearchResult represents a unified search result.
// Field order optimized for memory alignment (fieldalignment).
type SearchResult struct {
Metadata map[string]any `json:"metadata,omitempty"`
Type string `json:"type"`
Title string `json:"title,omitempty"`
Content string `json:"content,omitempty"`
Project string `json:"project"`
Scope string `json:"scope,omitempty"`
ID int64 `json:"id"`
CreatedAt int64 `json:"created_at_epoch"`
Score float64 `json:"score,omitempty"`
}
// UnifiedSearchResult contains the combined search results.
type UnifiedSearchResult struct {
Query string `json:"query,omitempty"`
Results []SearchResult `json:"results"`
TotalCount int `json:"total_count"`
}
// UnifiedSearch performs a unified search across all document types.
// Uses caching and request coalescing for optimal performance.
func (m *Manager) UnifiedSearch(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
start := time.Now()
defer func() {
latency := time.Since(start).Nanoseconds()
atomic.AddInt64(&m.metrics.TotalSearches, 1)
atomic.AddInt64(&m.metrics.TotalLatencyNs, latency)
// Sample latency for histogram (reservoir sampling)
m.metrics.histogramMu.Lock()
if len(m.metrics.latencyHistogram) < latencyHistogramCap {
m.metrics.latencyHistogram = append(m.metrics.latencyHistogram, latency)
}
m.metrics.histogramMu.Unlock()
// Log slow queries
if latency > slowQueryThresholdNs {
log.Warn().
Str("query", truncate(params.Query, queryLogTruncateLen)).
Dur("latency", time.Duration(latency)).
Str("type", params.Type).
Msg("Slow search query")
}
}()
if params.Limit <= 0 {
params.Limit = defaultQueryLimit
}
if params.Limit > maxQueryLimit {
params.Limit = maxQueryLimit
}
if params.OrderBy == "" {
params.OrderBy = defaultOrderBy
}
// Track active user queries so cache warming can yield the embedding mutex
m.activeQueries.Add(1)
defer m.activeQueries.Add(-1)
// Check cache first
cacheKey := m.getCacheKey(params)
if cached, ok := m.getFromCache(cacheKey); ok {
return cached, nil
}
// Use singleflight DoChan to coalesce concurrent identical requests.
// DoChan + select allows per-caller context cancellation: waiting callers
// can bail out when their context expires without blocking on a slow first call.
ch := m.searchGroup.DoChan(cacheKey, func() (any, error) {
return m.executeSearch(ctx, params)
})
select {
case res := <-ch:
if res.Err != nil {
return nil, res.Err
}
searchResult := res.Val.(*UnifiedSearchResult)
// Cache the result
m.putInCache(cacheKey, searchResult)
// Track query frequency for cache warming
m.trackQueryFrequency(params)
return searchResult, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// executeSearch performs the actual search without caching/coalescing.
func (m *Manager) executeSearch(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
// If query is provided and vector client is available, use vector search
if params.Query != "" && m.vectorClient != nil && m.vectorClient.IsConnected() {
return m.vectorSearch(ctx, params)
}
// Otherwise fall back to structured filter search
return m.filterSearch(ctx, params)
}
// vectorSearch performs semantic search via sqlite-vec.
func (m *Manager) vectorSearch(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
start := time.Now()
defer func() {
latency := time.Since(start).Nanoseconds()
atomic.AddInt64(&m.metrics.VectorSearches, 1)
atomic.AddInt64(&m.metrics.VectorLatencyNs, latency)
}()
// Build where filter based on search type
var docType sqlitevec.DocType
switch params.Type {
case "observations":
docType = sqlitevec.DocTypeObservation
case "sessions":
docType = sqlitevec.DocTypeSessionSummary
case "prompts":
docType = sqlitevec.DocTypeUserPrompt
}
where := sqlitevec.BuildWhereFilter(docType, params.Project)
// Query sqlite-vec
vectorResults, err := m.vectorClient.Query(ctx, params.Query, params.Limit*2, where)
if err != nil {
atomic.AddInt64(&m.metrics.SearchErrors, 1)
// Fall back to filter search on error
return m.filterSearch(ctx, params)
}
// Extract IDs grouped by document type using shared helper
extracted := sqlitevec.ExtractIDsByDocType(vectorResults)
obsIDs := extracted.ObservationIDs
summaryIDs := extracted.SummaryIDs
promptIDs := extracted.PromptIDs
// Fetch full records from SQLite
var results []SearchResult
if len(obsIDs) > 0 && (params.Type == "" || params.Type == "observations") {
obs, err := m.observationStore.GetObservationsByIDs(ctx, obsIDs, params.OrderBy, 0)
if err != nil {
log.Warn().Err(err).Int("count", len(obsIDs)).Msg("Failed to fetch observations by IDs in vector search")
} else {
for _, o := range obs {
// Skip superseded observations when requested
if params.ExcludeSuperseded && o.IsSuperseded {
continue
}
results = append(results, m.observationToResult(o, params.Format))
}
}
}
if len(summaryIDs) > 0 && (params.Type == "" || params.Type == "sessions") {
summaries, err := m.summaryStore.GetSummariesByIDs(ctx, summaryIDs, params.OrderBy, 0)
if err != nil {
log.Warn().Err(err).Int("count", len(summaryIDs)).Msg("Failed to fetch summaries by IDs in vector search")
} else {
for _, s := range summaries {
results = append(results, m.summaryToResult(s, params.Format))
}
}
}
if len(promptIDs) > 0 && (params.Type == "" || params.Type == "prompts") {
prompts, err := m.promptStore.GetPromptsByIDs(ctx, promptIDs, params.OrderBy, 0)
if err != nil {
log.Warn().Err(err).Int("count", len(promptIDs)).Msg("Failed to fetch prompts by IDs in vector search")
} else {
for _, p := range prompts {
results = append(results, m.promptToResult(p, params.Format))
}
}
}
// Apply limit
if len(results) > params.Limit {
results = results[:params.Limit]
}
return &UnifiedSearchResult{
Results: results,
TotalCount: len(results),
Query: params.Query,
}, nil
}
// filterSearch performs structured filter search via SQLite.
func (m *Manager) filterSearch(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
start := time.Now()
defer func() {
latency := time.Since(start).Nanoseconds()
atomic.AddInt64(&m.metrics.FilterSearches, 1)
atomic.AddInt64(&m.metrics.FilterLatencyNs, latency)
}()
var results []SearchResult
// Search observations
if params.Type == "" || params.Type == "observations" {
var obs []*models.Observation
var err error
// Use active observations (excluding superseded) when requested
if params.ExcludeSuperseded {
obs, err = m.observationStore.GetActiveObservations(ctx, params.Project, params.Limit)
} else {
obs, err = m.observationStore.GetRecentObservations(ctx, params.Project, params.Limit)
}
if err != nil {
log.Warn().Err(err).Str("project", params.Project).Msg("Failed to fetch observations in filter search")
} else {
for _, o := range obs {
results = append(results, m.observationToResult(o, params.Format))
}
}
}
// Search summaries
if params.Type == "" || params.Type == "sessions" {
summaries, err := m.summaryStore.GetRecentSummaries(ctx, params.Project, params.Limit)
if err != nil {
log.Warn().Err(err).Str("project", params.Project).Msg("Failed to fetch summaries in filter search")
} else {
for _, s := range summaries {
results = append(results, m.summaryToResult(s, params.Format))
}
}
}
// Apply limit
if len(results) > params.Limit {
results = results[:params.Limit]
}
return &UnifiedSearchResult{
Results: results,
TotalCount: len(results),
}, nil
}
// Decisions performs a semantic search optimized for finding decisions.
func (m *Manager) Decisions(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
// Boost query with decision-related keywords
if params.Query != "" {
params.Query = params.Query + " decision chose architecture"
}
params.Type = "observations"
return m.UnifiedSearch(ctx, params)
}
// Changes performs a semantic search optimized for finding code changes.
func (m *Manager) Changes(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
// Boost query with change-related keywords
if params.Query != "" {
params.Query = params.Query + " changed modified refactored"
}
params.Type = "observations"
return m.UnifiedSearch(ctx, params)
}
// HowItWorks performs a semantic search optimized for understanding architecture.
func (m *Manager) HowItWorks(ctx context.Context, params SearchParams) (*UnifiedSearchResult, error) {
// Boost query with architecture-related keywords
if params.Query != "" {
params.Query = params.Query + " architecture design pattern implements"
}
params.Type = "observations"
return m.UnifiedSearch(ctx, params)
}
// Helper methods
func (m *Manager) observationToResult(obs *models.Observation, format string) SearchResult {
result := SearchResult{
Type: "observation",
ID: obs.ID,
Project: obs.Project,
Scope: string(obs.Scope),
CreatedAt: obs.CreatedAtEpoch,
Metadata: map[string]any{
"obs_type": string(obs.Type),
"scope": string(obs.Scope),
},
}
if obs.Title.Valid {
result.Title = obs.Title.String
}
if format == "full" && obs.Narrative.Valid {
result.Content = obs.Narrative.String
}
return result
}
func (m *Manager) summaryToResult(summary *models.SessionSummary, format string) SearchResult {
result := SearchResult{
Type: "session",
ID: summary.ID,
Project: summary.Project,
CreatedAt: summary.CreatedAtEpoch,
}
if summary.Request.Valid {
result.Title = truncate(summary.Request.String, titleTruncateLen)
}
if format == "full" && summary.Learned.Valid {
result.Content = summary.Learned.String
}
return result
}
func (m *Manager) promptToResult(prompt *models.UserPromptWithSession, format string) SearchResult {
result := SearchResult{
Type: "prompt",
ID: prompt.ID,
Project: prompt.Project,
CreatedAt: prompt.CreatedAtEpoch,
}
result.Title = truncate(prompt.PromptText, titleTruncateLen)
if format == "full" {
result.Content = prompt.PromptText
}
return result
}
func truncate(s string, maxLen int) string {
s = strings.TrimSpace(s)
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}