From a270290023010a5fef7f78282a81455b03c88256 Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Sun, 11 Jan 2026 01:22:30 +0000 Subject: [PATCH] fix: reorganize struct fields and config parameters for consistency - [x] Reorder Config struct fields alphabetically and by related functionality - [x] Reorganize Observation model fields with archival fields grouped together - [x] Reorder ObservationStore fields to group related members - [x] Reorder Store struct fields with health check caching grouped - [x] Reorganize HealthInfo and PoolMetrics struct field order - [x] Reorder maintenance Service struct fields logically - [x] Reorganize MCP server handler parameter structs alphabetically - [x] Reorder pattern detector candidate tracking fields - [x] Reorganize search Manager struct fields by functionality - [x] Reorder vector Client struct fields with mutex protections grouped - [x] Reorganize handler request/response struct fields - [x] Update handlers_test.go to expect wrapped response format - [x] Reorder middleware TokenAuth and rate limiter fields - [x] Reorganize Service struct fields with grouped functionality - [x] Fix RateLimiter field ordering for clarity - [x] Reorder CircuitBreaker metrics fields --- internal/config/config.go | 22 ++- internal/db/gorm/models.go | 16 +- internal/db/gorm/observation_store.go | 20 +-- internal/db/gorm/store.go | 42 +++-- internal/maintenance/service.go | 16 +- internal/mcp/server.go | 52 +++--- internal/pattern/detector.go | 4 +- internal/search/manager.go | 88 +++++----- internal/vector/sqlitevec/client.go | 84 ++++------ internal/worker/handlers_data.go | 6 +- internal/worker/handlers_import_export.go | 28 ++-- internal/worker/handlers_sessions.go | 2 +- internal/worker/handlers_test.go | 37 ++-- internal/worker/middleware.go | 10 +- internal/worker/middleware_test.go | 2 +- internal/worker/ratelimit.go | 37 ++-- internal/worker/sdk/processor.go | 26 +-- internal/worker/service.go | 195 +++++++++------------- 18 files changed, 312 insertions(+), 375 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 68e9274..fea40fd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -46,34 +46,32 @@ type Config struct { VectorStorageStrategy string `json:"vector_storage_strategy"` ContextObsConcepts []string `json:"context_obs_concepts"` ContextObsTypes []string `json:"context_obs_types"` - ContextMaxPromptResults int `json:"context_max_prompt_results"` - RerankingResults int `json:"reranking_results"` + ContextFullCount int `json:"context_full_count"` + GraphBranchFactor int `json:"graph_branch_factor"` GraphEdgeWeight float64 `json:"graph_edge_weight"` ContextRelevanceThreshold float64 `json:"context_relevance_threshold"` RerankingCandidates int `json:"reranking_candidates"` WorkerPort int `json:"worker_port"` RerankingMinImprovement float64 `json:"reranking_min_improvement"` ContextObservations int `json:"context_observations"` - ContextFullCount int `json:"context_full_count"` + ContextMaxPromptResults int `json:"context_max_prompt_results"` ContextSessionCount int `json:"context_session_count"` MaxConns int `json:"max_conns"` RerankingAlpha float64 `json:"reranking_alpha"` GraphMaxHops int `json:"graph_max_hops"` - GraphBranchFactor int `json:"graph_branch_factor"` + RerankingResults int `json:"reranking_results"` GraphRebuildIntervalMin int `json:"graph_rebuild_interval_min"` HubThreshold int `json:"hub_threshold"` - ContextShowLastSummary bool `json:"context_show_last_summary"` - RerankingEnabled bool `json:"reranking_enabled"` + ObservationRetentionDays int `json:"observation_retention_days"` + MaintenanceIntervalHours int `json:"maintenance_interval_hours"` ContextShowWorkTokens bool `json:"context_show_work_tokens"` ContextShowReadTokens bool `json:"context_show_read_tokens"` RerankingPureMode bool `json:"reranking_pure_mode"` GraphEnabled bool `json:"graph_enabled"` - - // Maintenance settings - MaintenanceEnabled bool `json:"maintenance_enabled"` // Enable scheduled maintenance - MaintenanceIntervalHours int `json:"maintenance_interval_hours"` // How often to run maintenance (default 6 hours) - ObservationRetentionDays int `json:"observation_retention_days"` // Delete observations older than N days (0 = no age-based deletion) - CleanupStaleObservations bool `json:"cleanup_stale_observations"` // Auto-cleanup stale observations during maintenance + MaintenanceEnabled bool `json:"maintenance_enabled"` + RerankingEnabled bool `json:"reranking_enabled"` + ContextShowLastSummary bool `json:"context_show_last_summary"` + CleanupStaleObservations bool `json:"cleanup_stale_observations"` } var ( diff --git a/internal/db/gorm/models.go b/internal/db/gorm/models.go index c0e071f..d71f8f1 100644 --- a/internal/db/gorm/models.go +++ b/internal/db/gorm/models.go @@ -53,16 +53,18 @@ type Observation struct { Scope models.ObservationScope `gorm:"type:text;default:'project';check:scope IN ('project', 'global');index:idx_observations_scope;index:idx_observations_project_scope,priority:2"` Type models.ObservationType `gorm:"type:text;check:type IN ('decision', 'bugfix', 'feature', 'refactor', 'discovery', 'change');index;not null"` CreatedAt string `gorm:"not null"` - Title sql.NullString `gorm:"type:text"` + Facts models.JSONStringArray `gorm:"type:text"` Narrative sql.NullString `gorm:"type:text"` Concepts models.JSONStringArray `gorm:"type:text"` FilesRead models.JSONStringArray `gorm:"type:text"` FilesModified models.JSONStringArray `gorm:"type:text"` Subtitle sql.NullString `gorm:"type:text"` - Facts models.JSONStringArray `gorm:"type:text"` - LastRetrievedAt sql.NullInt64 `gorm:"column:last_retrieved_at_epoch"` - PromptNumber sql.NullInt64 + Title sql.NullString `gorm:"type:text"` + ArchivedReason sql.NullString ScoreUpdatedAt sql.NullInt64 `gorm:"column:score_updated_at_epoch;index:idx_observations_score_updated"` + PromptNumber sql.NullInt64 + ArchivedAt sql.NullInt64 `gorm:"column:archived_at_epoch"` + LastRetrievedAt sql.NullInt64 `gorm:"column:last_retrieved_at_epoch"` ID int64 `gorm:"primaryKey;autoIncrement"` ImportanceScore float64 `gorm:"type:real;default:1.0;index:idx_observations_importance,priority:1,sort:desc"` UserFeedback int `gorm:"default:0"` @@ -70,11 +72,7 @@ type Observation struct { CreatedAtEpoch int64 `gorm:"index:idx_observations_created,sort:desc;index:idx_observations_project_created,priority:2,sort:desc;not null"` DiscoveryTokens int64 `gorm:"default:0"` IsSuperseded int `gorm:"default:0;index:idx_observations_superseded;index:idx_observations_active,priority:2"` - - // Archival fields - IsArchived int `gorm:"default:0;index:idx_observations_archived;index:idx_observations_active,priority:1"` - ArchivedAt sql.NullInt64 `gorm:"column:archived_at_epoch"` - ArchivedReason sql.NullString + IsArchived int `gorm:"default:0;index:idx_observations_archived;index:idx_observations_active,priority:1"` } func (Observation) TableName() string { return "observations" } diff --git a/internal/db/gorm/observation_store.go b/internal/db/gorm/observation_store.go index 9e8bbde..e409cec 100644 --- a/internal/db/gorm/observation_store.go +++ b/internal/db/gorm/observation_store.go @@ -41,18 +41,16 @@ type CleanupFunc func(ctx context.Context, deletedIDs []int64) // ObservationStore provides observation-related database operations using GORM. type ObservationStore struct { - db *gorm.DB - rawDB *sql.DB - cleanupFunc CleanupFunc - conflictStore any // Placeholder for ConflictStore (Phase 4) - relationStore any // Placeholder for RelationStore (Phase 4) - - // Cleanup queue for async observation cleanup with proper error handling + conflictStore any + relationStore any + db *gorm.DB + rawDB *sql.DB + cleanupFunc CleanupFunc cleanupQueue chan string + stopCleanup chan struct{} cleanupWg sync.WaitGroup cleanupOnce sync.Once - cleanupStarted atomic.Bool // tracks if cleanup worker was started - stopCleanup chan struct{} + cleanupStarted atomic.Bool } // NewObservationStore creates a new observation store. @@ -854,10 +852,10 @@ func (s *ObservationStore) GetArchivalStats(ctx context.Context, project string) // Use a single query with conditional aggregation to get all stats at once. // This is much faster than 4 separate queries (saves 3 round trips). type statsResult struct { + OldestEpoch *int64 + NewestEpoch *int64 TotalCount int64 ArchivedCount int64 - OldestEpoch *int64 // Pointer to handle NULL - NewestEpoch *int64 } var result statsResult diff --git a/internal/db/gorm/store.go b/internal/db/gorm/store.go index ee43a37..17c8a69 100644 --- a/internal/db/gorm/store.go +++ b/internal/db/gorm/store.go @@ -19,15 +19,13 @@ import ( // Store represents the GORM database connection with sqlite-vec support. type Store struct { - DB *gorm.DB - sqlDB *sql.DB // For FTS5 and sqlite-vec operations that require raw SQL - metrics *PoolMetrics - - // Health check caching to reduce database load from frequent monitoring - healthCacheMu sync.RWMutex - cachedHealth *HealthInfo healthCacheTime time.Time - healthCacheTTL time.Duration // Default: 5 seconds + DB *gorm.DB + sqlDB *sql.DB + metrics *PoolMetrics + cachedHealth *HealthInfo + healthCacheTTL time.Duration + healthCacheMu sync.RWMutex } // Config holds database configuration. @@ -313,13 +311,13 @@ func (s *Store) performHealthCheck(ctx context.Context) *HealthInfo { // HealthInfo contains database health check results. type HealthInfo struct { - Status string `json:"status"` // healthy, degraded, unhealthy Timestamp time.Time `json:"timestamp"` - QueryLatency time.Duration `json:"query_latency_ns"` - PoolStats PoolStats `json:"pool_stats"` - HistoricalMetrics MetricsSummary `json:"historical_metrics,omitempty"` + Status string `json:"status"` Error string `json:"error,omitempty"` Warning string `json:"warning,omitempty"` + HistoricalMetrics MetricsSummary `json:"historical_metrics,omitempty"` + PoolStats PoolStats `json:"pool_stats"` + QueryLatency time.Duration `json:"query_latency_ns"` } // PoolStats contains connection pool statistics. @@ -345,16 +343,16 @@ const ( // PoolMetrics tracks historical connection pool metrics with a sliding window. type PoolMetrics struct { + lastSampleTime time.Time + latencySamples []time.Duration + latencyIdx int + latencyCount int + totalQueries int64 + totalWaitTime time.Duration + peakInUse int + peakWaitCount int64 + windowSize int mu sync.RWMutex - latencySamples []time.Duration // Circular buffer of latency samples - latencyIdx int // Current index in circular buffer - latencyCount int // Number of samples collected - totalQueries int64 // Total queries executed - totalWaitTime time.Duration // Cumulative wait time for connections - peakInUse int // Peak concurrent connections in use - peakWaitCount int64 // Peak wait count observed - lastSampleTime time.Time // Last time a sample was recorded - windowSize int // Size of sliding window } // NewPoolMetrics creates a new pool metrics collector with the given window size. @@ -450,6 +448,7 @@ func (m *PoolMetrics) GetMetricsSummary() MetricsSummary { // MetricsSummary contains aggregated pool metrics. type MetricsSummary struct { + LastSampleTime time.Time `json:"last_sample_time"` TotalQueries int64 `json:"total_queries"` SampleCount int `json:"sample_count"` AvgLatency time.Duration `json:"avg_latency_ns"` @@ -459,7 +458,6 @@ type MetricsSummary struct { PeakInUse int `json:"peak_in_use"` PeakWaitCount int64 `json:"peak_wait_count"` TotalWaitTime time.Duration `json:"total_wait_time_ns"` - LastSampleTime time.Time `json:"last_sample_time"` } // GetMetrics returns the current metrics without performing a health check. diff --git a/internal/maintenance/service.go b/internal/maintenance/service.go index 0456d85..994fcdf 100644 --- a/internal/maintenance/service.go +++ b/internal/maintenance/service.go @@ -13,23 +13,21 @@ import ( // Service handles scheduled maintenance tasks. type Service struct { - store *gorm.Store - observationStore *gorm.ObservationStore - summaryStore *gorm.SummaryStore + log zerolog.Logger + lastRunTime time.Time promptStore *gorm.PromptStore + store *gorm.Store vectorCleanupFn func(ctx context.Context, deletedIDs []int64) config *config.Config - log zerolog.Logger + summaryStore *gorm.SummaryStore stopCh chan struct{} doneCh chan struct{} - mu sync.Mutex - running bool - - // Metrics - lastRunTime time.Time + observationStore *gorm.ObservationStore lastRunDuration time.Duration totalCleanedObs int64 totalOptimizeRun int64 + mu sync.Mutex + running bool } // NewService creates a new maintenance service. diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 32c7791..5864be4 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -1585,15 +1585,15 @@ func (s *Server) handleGetObservation(ctx context.Context, args json.RawMessage) // handleEditObservation updates an existing observation with provided fields. func (s *Server) handleEditObservation(ctx context.Context, args json.RawMessage) (string, error) { var params struct { - ID int64 `json:"id"` Title *string `json:"title,omitempty"` Subtitle *string `json:"subtitle,omitempty"` Narrative *string `json:"narrative,omitempty"` + Scope *string `json:"scope,omitempty"` Facts []string `json:"facts,omitempty"` Concepts []string `json:"concepts,omitempty"` FilesRead []string `json:"files_read,omitempty"` FilesModified []string `json:"files_modified,omitempty"` - Scope *string `json:"scope,omitempty"` + ID int64 `json:"id"` } if err := json.Unmarshal(args, ¶ms); err != nil { return "", fmt.Errorf("invalid arguments: %w", err) @@ -1808,9 +1808,9 @@ func (s *Server) handleSuggestConsolidations(ctx context.Context, args json.RawM // Find similar pairs using vector search if available type consolidationGroup struct { Primary *models.Observation `json:"primary"` + Reason string `json:"reason"` Similar []*models.Observation `json:"similar"` Similarity float64 `json:"avg_similarity"` - Reason string `json:"reason"` } groups := []consolidationGroup{} @@ -1920,9 +1920,9 @@ func (s *Server) handleSuggestConsolidations(ctx context.Context, args json.RawM // handleTagObservation adds, removes, or sets tags on an observation. func (s *Server) handleTagObservation(ctx context.Context, args json.RawMessage) (string, error) { var params struct { - ID int64 `json:"id"` - Tags []string `json:"tags"` Mode string `json:"mode"` + Tags []string `json:"tags"` + ID int64 `json:"id"` } params.Mode = "add" // default @@ -2073,8 +2073,8 @@ func (s *Server) handleGetObservationsByTag(ctx context.Context, args json.RawMe func (s *Server) handleGetTemporalTrends(ctx context.Context, args json.RawMessage) (string, error) { var params struct { Project string `json:"project"` - Days int `json:"days"` GroupBy string `json:"group_by"` + Days int `json:"days"` } params.Days = 30 params.GroupBy = "day" @@ -2347,10 +2347,10 @@ func (s *Server) handleGetDataQualityReport(ctx context.Context, args json.RawMe func (s *Server) handleBatchTagByPattern(ctx context.Context, args json.RawMessage) (string, error) { var params struct { Pattern string `json:"pattern"` - Tags []string `json:"tags"` Project string `json:"project"` - DryRun bool `json:"dry_run"` + Tags []string `json:"tags"` MaxMatches int `json:"max_matches"` + DryRun bool `json:"dry_run"` } params.DryRun = true params.MaxMatches = 100 @@ -2485,14 +2485,14 @@ func (s *Server) handleExplainSearchRanking(ctx context.Context, args json.RawMe // Build detailed explanations for each result type RankExplanation struct { - Rank int `json:"rank"` - ID int64 `json:"id"` + ScoreBreakdown map[string]float64 `json:"score_breakdown"` + Metadata map[string]any `json:"metadata,omitempty"` Title string `json:"title"` Type string `json:"type"` - Score float64 `json:"score"` - ScoreBreakdown map[string]float64 `json:"score_breakdown"` MatchedFields []string `json:"matched_fields"` - Metadata map[string]any `json:"metadata,omitempty"` + Rank int `json:"rank"` + ID int64 `json:"id"` + Score float64 `json:"score"` } explanations := make([]RankExplanation, 0, len(result.Results)) @@ -2562,10 +2562,10 @@ func (s *Server) handleExportObservations(ctx context.Context, args json.RawMess var params struct { Format string `json:"format"` Project string `json:"project"` + ObsType string `json:"obs_type"` Limit int `json:"limit"` DateStart int64 `json:"date_start"` DateEnd int64 `json:"date_end"` - ObsType string `json:"obs_type"` } params.Format = "json" params.Limit = 100 @@ -2705,11 +2705,11 @@ func (s *Server) handleCheckSystemHealth(ctx context.Context) (string, error) { } type HealthReport struct { - OverallStatus string `json:"overall_status"` - HealthScore int `json:"health_score"` Timestamp time.Time `json:"timestamp"` Subsystems map[string]*SubsystemHealth `json:"subsystems"` + OverallStatus string `json:"overall_status"` Actions []string `json:"recommended_actions,omitempty"` + HealthScore int `json:"health_score"` } report := &HealthReport{ @@ -2902,19 +2902,19 @@ func (s *Server) handleAnalyzeSearchPatterns(ctx context.Context, args json.RawM type QueryPattern struct { Query string `json:"query"` + LastUsed string `json:"last_used"` Count int `json:"count"` AvgResults float64 `json:"avg_results"` ZeroResults int `json:"zero_result_count"` - LastUsed string `json:"last_used"` } type PatternAnalysis struct { Period string `json:"period"` - TotalSearches int `json:"total_searches"` - UniqueQueries int `json:"unique_queries"` TopQueries []QueryPattern `json:"top_queries"` ZeroResultQueries []string `json:"zero_result_queries,omitempty"` Insights []string `json:"insights,omitempty"` + TotalSearches int `json:"total_searches"` + UniqueQueries int `json:"unique_queries"` } analysis := &PatternAnalysis{ @@ -3033,23 +3033,23 @@ func (s *Server) handleGetObservationRelationships(ctx context.Context, args jso // Build response with additional context type RelationInfo struct { - ID int64 `json:"id"` - SourceID int64 `json:"source_id"` - TargetID int64 `json:"target_id"` Type string `json:"type"` - Confidence float64 `json:"confidence"` SourceTitle string `json:"source_title,omitempty"` TargetTitle string `json:"target_title,omitempty"` SourceType string `json:"source_type,omitempty"` TargetType string `json:"target_type,omitempty"` + ID int64 `json:"id"` + SourceID int64 `json:"source_id"` + TargetID int64 `json:"target_id"` + Confidence float64 `json:"confidence"` } type GraphResponse struct { + Relations []RelationInfo `json:"relations"` + UniqueNodes []int64 `json:"unique_nodes"` CenterID int64 `json:"center_id"` MaxDepth int `json:"max_depth"` TotalRelations int `json:"total_relations"` - Relations []RelationInfo `json:"relations"` - UniqueNodes []int64 `json:"unique_nodes"` } // Collect unique node IDs @@ -3162,10 +3162,10 @@ func (s *Server) handleGetObservationScoringBreakdown(ctx context.Context, args // handleAnalyzeObservationImportance returns importance analysis for a project's observations. func (s *Server) handleAnalyzeObservationImportance(ctx context.Context, args json.RawMessage) (string, error) { var params struct { - Project string `json:"project"` IncludeTopScored *bool `json:"include_top_scored"` IncludeMostRetrieved *bool `json:"include_most_retrieved"` IncludeConceptWeights *bool `json:"include_concept_weights"` + Project string `json:"project"` Limit int `json:"limit"` } if err := json.Unmarshal(args, ¶ms); err != nil { diff --git a/internal/pattern/detector.go b/internal/pattern/detector.go index 512a520..2d2888e 100644 --- a/internal/pattern/detector.go +++ b/internal/pattern/detector.go @@ -340,8 +340,8 @@ func (d *Detector) cleanupOldCandidates() { if d.config.MaxCandidates > 0 && len(d.candidates) > d.config.MaxCandidates { // Find oldest candidates to evict using O(n log n) sort instead of O(n²) selection sort type keyAge struct { - key string - age int64 + key string + age int64 } candidates := make([]keyAge, 0, len(d.candidates)) for k, c := range d.candidates { diff --git a/internal/search/manager.go b/internal/search/manager.go index eaaacb8..5896556 100644 --- a/internal/search/manager.go +++ b/internal/search/manager.go @@ -26,20 +26,20 @@ var multiSpaceRegex = regexp.MustCompile(`\s+`) // Search configuration constants. const ( // Cache configuration - defaultCacheTTL = 30 * time.Second // Short TTL for freshness - defaultCacheMaxSize = 200 // Max cached results - cacheEvictionPercent = 10 // Evict 10% when cache is full - cacheEvictionThreshold = 80 // Start eviction scan at 80% capacity + defaultCacheTTL = 30 * time.Second // Short TTL for freshness + defaultCacheMaxSize = 200 // Max cached results + cacheEvictionPercent = 10 // Evict 10% when cache is full + cacheEvictionThreshold = 80 // Start eviction scan at 80% capacity // Latency tracking - latencyHistogramCap = 1000 // Max latency samples for histogram - slowQueryThresholdNs = 100 * 1e6 // 100ms threshold for slow query logging + latencyHistogramCap = 1000 // Max latency samples for histogram + slowQueryThresholdNs = 100 * 1e6 // 100ms threshold for slow query logging // Query frequency tracking - maxFrequencyEntries = 1000 // Max queries to track for warming - frequencyEvictionBatch = 100 // Remove 10% when frequency map is full - staleQueryThreshold = 24 * time.Hour // Remove queries older than 24 hours - recentQueryWindow = time.Hour // Only consider queries from last hour for warming + maxFrequencyEntries = 1000 // Max queries to track for warming + frequencyEvictionBatch = 100 // Remove 10% when frequency map is full + staleQueryThreshold = 24 * time.Hour // Remove queries older than 24 hours + recentQueryWindow = time.Hour // Only consider queries from last hour for warming // Cache warming configuration cacheWarmingInitDelay = 30 * time.Second // Delay before starting warming @@ -63,19 +63,17 @@ const ( // SearchMetrics tracks search performance statistics. type SearchMetrics struct { - TotalSearches int64 // Total number of searches performed - VectorSearches int64 // Searches using vector search - FilterSearches int64 // Searches using filter/FTS search - TotalLatencyNs int64 // Cumulative latency in nanoseconds - VectorLatencyNs int64 // Cumulative vector search latency - FilterLatencyNs int64 // Cumulative filter search latency - CacheHits int64 // Number of result cache hits - CoalescedRequests int64 // Number of requests served via singleflight coalescing - SearchErrors int64 // Number of search errors - - // Percentile tracking (approximate using reservoir sampling) - latencyHistogram []int64 // Recent latency samples - histogramMu sync.Mutex + latencyHistogram []int64 + TotalSearches int64 + VectorSearches int64 + FilterSearches int64 + TotalLatencyNs int64 + VectorLatencyNs int64 + FilterLatencyNs int64 + CacheHits int64 + CoalescedRequests int64 + SearchErrors int64 + histogramMu sync.Mutex } // GetStats returns the current search statistics. @@ -117,36 +115,28 @@ func (m *SearchMetrics) GetStats() map[string]any { // Manager provides unified search across SQLite and sqlite-vec. type Manager struct { - observationStore *gorm.ObservationStore - summaryStore *gorm.SummaryStore - promptStore *gorm.PromptStore + ctx context.Context + searchGroup singleflight.Group + cancel context.CancelFunc vectorClient *sqlitevec.Client metrics *SearchMetrics - - // Context for graceful shutdown of background goroutines - ctx context.Context - cancel context.CancelFunc - - // Request coalescing for concurrent identical queries - searchGroup singleflight.Group - - // Result cache for repeated queries (short TTL) - resultCache map[string]*cachedResult - resultCacheMu sync.RWMutex - cacheTTL time.Duration - cacheMaxSize int - - // Query frequency tracking for cache warming + promptStore *gorm.PromptStore + observationStore *gorm.ObservationStore + summaryStore *gorm.SummaryStore + resultCache map[string]*cachedResult queryFrequency map[string]*queryFrequencyInfo + cacheTTL time.Duration + cacheMaxSize int + resultCacheMu sync.RWMutex queryFrequencyMu sync.RWMutex } // queryFrequencyInfo tracks how often a query is used. type queryFrequencyInfo struct { - params SearchParams - count int64 - lastUsed time.Time + lastUsed time.Time lastCached time.Time + params SearchParams + count int64 } // cachedResult stores a cached search result with expiry. @@ -270,8 +260,8 @@ func (m *Manager) warmFrequentQueries() { m.queryFrequencyMu.RLock() // Find top N most frequent queries that aren't recently cached type queryScore struct { - key string info *queryFrequencyInfo + key string score float64 } candidates := make([]queryScore, 0, len(m.queryFrequency)) @@ -359,8 +349,8 @@ func (m *Manager) trackQueryFrequency(params SearchParams) { // Collect keys and times for eviction (still under lock, but fast) type entry struct { - key string lastUsed time.Time + key string } entries := make([]entry, 0, mapLen) for k, v := range m.queryFrequency { @@ -390,11 +380,11 @@ func (m *Manager) trackQueryFrequency(params SearchParams) { // RecentQuery represents a recently executed search query. type RecentQuery struct { + LastUsed time.Time `json:"last_used"` Query string `json:"query"` Project string `json:"project,omitempty"` - Type string `json:"type,omitempty"` // observations, sessions, prompts - Count int64 `json:"count"` // Number of times executed - LastUsed time.Time `json:"last_used"` + Type string `json:"type,omitempty"` + Count int64 `json:"count"` } // GetRecentQueries returns the most recent search queries, sorted by last used time. diff --git a/internal/vector/sqlitevec/client.go b/internal/vector/sqlitevec/client.go index fbb001d..76f194a 100644 --- a/internal/vector/sqlitevec/client.go +++ b/internal/vector/sqlitevec/client.go @@ -25,51 +25,39 @@ type embeddingCacheEntry struct { // resultCacheEntry stores cached query results. type resultCacheEntry struct { + queryHash string results []QueryResult - timestamp int64 // Unix nano - queryHash string // Hash of query + filters for validation + timestamp int64 } // Client provides vector operations via sqlite-vec. type Client struct { - db *sql.DB - embedSvc *embedding.Service - - // Separate mutexes for read and write operations to reduce contention - writeMu sync.Mutex // Protects write operations (AddDocuments, DeleteDocuments) - readMu sync.RWMutex // Protects read operations (Query, Count) - - // Embedding cache for query deduplication - queryCache map[string]embeddingCacheEntry - queryCacheMu sync.RWMutex - cacheMaxSize int - cacheTTLNano int64 // Cache TTL in nanoseconds - - // Result cache for repeated searches - resultCache map[string]resultCacheEntry - resultCacheMu sync.RWMutex + embeddingGroup singleflight.Group + resultCache map[string]resultCacheEntry + db *sql.DB + embedSvc *embedding.Service + queryCache map[string]embeddingCacheEntry + stopCleanup chan struct{} + stats CacheStats + cleanupWg sync.WaitGroup + resultCacheTTLNano int64 + cacheTTLNano int64 resultCacheMaxSize int - resultCacheTTLNano int64 // Shorter TTL for results (data changes more often) - - // Cache statistics - stats CacheStats - - // Background cleanup control - stopCleanup chan struct{} - cleanupWg sync.WaitGroup - - // Singleflight to deduplicate concurrent embedding computations - embeddingGroup singleflight.Group + cacheMaxSize int + resultCacheMu sync.RWMutex + queryCacheMu sync.RWMutex + readMu sync.RWMutex + writeMu sync.Mutex } // CacheStats tracks cache performance metrics using atomic counters for lock-free updates. type CacheStats struct { - embeddingHits atomic.Int64 - embeddingMisses atomic.Int64 - resultHits atomic.Int64 - resultMisses atomic.Int64 + embeddingHits atomic.Int64 + embeddingMisses atomic.Int64 + resultHits atomic.Int64 + resultMisses atomic.Int64 embeddingEvictions atomic.Int64 - resultEvictions atomic.Int64 + resultEvictions atomic.Int64 } // CacheStatsSnapshot is the exported version of CacheStats for JSON marshaling. @@ -139,8 +127,8 @@ func NewClient(cfg Config, embedSvc *embedding.Service) (*Client, error) { cacheMaxSize: 500, // Cache up to 500 query embeddings cacheTTLNano: 5 * 60 * 1e9, // 5 minute TTL for embeddings resultCache: make(map[string]resultCacheEntry), - resultCacheMaxSize: 200, // Cache up to 200 search results - resultCacheTTLNano: 60 * 1e9, // 1 minute TTL for results (shorter since data changes) + resultCacheMaxSize: 200, // Cache up to 200 search results + resultCacheTTLNano: 60 * 1e9, // 1 minute TTL for results (shorter since data changes) stopCleanup: make(chan struct{}), } @@ -442,9 +430,9 @@ func (c *Client) cleanupExpiredCaches() { // BatchQueryResult holds results from a batch query operation. type BatchQueryResult struct { - Query string // Original query string - Results []QueryResult // Results for this query - Error error // Error if query failed + Error error + Query string + Results []QueryResult } // QueryBatch performs multiple vector searches concurrently. @@ -691,15 +679,15 @@ func (c *Client) GetStaleVectors(ctx context.Context) ([]StaleVectorInfo, error) // VectorHealthStats contains comprehensive health information about the vector store. type VectorHealthStats struct { - TotalVectors int64 `json:"total_vectors"` - StaleVectors int64 `json:"stale_vectors"` - CurrentModel string `json:"current_model"` - NeedsRebuild bool `json:"needs_rebuild"` - RebuildReason string `json:"rebuild_reason,omitempty"` - CoverageByType map[string]int64 `json:"coverage_by_type"` - ModelVersions map[string]int64 `json:"model_versions"` - ProjectCounts map[string]int64 `json:"project_counts"` - EmbeddingCache CacheStatsSnapshot `json:"embedding_cache"` + CoverageByType map[string]int64 `json:"coverage_by_type"` + ModelVersions map[string]int64 `json:"model_versions"` + ProjectCounts map[string]int64 `json:"project_counts"` + CurrentModel string `json:"current_model"` + RebuildReason string `json:"rebuild_reason,omitempty"` + EmbeddingCache CacheStatsSnapshot `json:"embedding_cache"` + TotalVectors int64 `json:"total_vectors"` + StaleVectors int64 `json:"stale_vectors"` + NeedsRebuild bool `json:"needs_rebuild"` } // GetHealthStats returns comprehensive health statistics about the vector store. diff --git a/internal/worker/handlers_data.go b/internal/worker/handlers_data.go index a0b0863..932b912 100644 --- a/internal/worker/handlers_data.go +++ b/internal/worker/handlers_data.go @@ -484,11 +484,11 @@ type UpdateObservationRequest struct { Title *string `json:"title,omitempty"` Subtitle *string `json:"subtitle,omitempty"` Narrative *string `json:"narrative,omitempty"` + Scope *string `json:"scope,omitempty"` Facts []string `json:"facts,omitempty"` Concepts []string `json:"concepts,omitempty"` FilesRead []string `json:"files_read,omitempty"` FilesModified []string `json:"files_modified,omitempty"` - Scope *string `json:"scope,omitempty"` } // handleUpdateObservation updates an existing observation. @@ -562,8 +562,8 @@ func (s *Service) handleUpdateObservation(w http.ResponseWriter, r *http.Request // Broadcast update event s.sseBroadcaster.Broadcast(map[string]any{ - "type": "observation_updated", - "id": id, + "type": "observation_updated", + "id": id, }) writeJSON(w, map[string]any{ diff --git a/internal/worker/handlers_import_export.go b/internal/worker/handlers_import_export.go index 7430b46..a3dc351 100644 --- a/internal/worker/handlers_import_export.go +++ b/internal/worker/handlers_import_export.go @@ -29,23 +29,23 @@ type BulkImportRequest struct { // BulkObservationInput represents a single observation in bulk import. type BulkObservationInput struct { - Type string `json:"type"` // bugfix, feature, refactor, etc. + Type string `json:"type"` Title string `json:"title"` Subtitle string `json:"subtitle,omitempty"` - Facts []string `json:"facts,omitempty"` Narrative string `json:"narrative,omitempty"` + Scope string `json:"scope,omitempty"` + Facts []string `json:"facts,omitempty"` Concepts []string `json:"concepts,omitempty"` FilesRead []string `json:"files_read,omitempty"` FilesModified []string `json:"files_modified,omitempty"` - Scope string `json:"scope,omitempty"` // project or global } // BulkImportResponse contains the result of a bulk import operation. type BulkImportResponse struct { + Errors []string `json:"errors,omitempty"` Imported int `json:"imported"` Failed int `json:"failed"` SkippedDuplicates int `json:"skipped_duplicates,omitempty"` - Errors []string `json:"errors,omitempty"` } // handleBulkImport handles bulk import of observations. @@ -208,10 +208,10 @@ func (s *Service) handleBulkImport(w http.ResponseWriter, r *http.Request) { // ArchiveRequest is the request body for archiving observations. type ArchiveRequest struct { - IDs []int64 `json:"ids,omitempty"` // Specific IDs to archive - Project string `json:"project,omitempty"` // Archive all in project older than max_age_days - MaxAgeDays int `json:"max_age_days,omitempty"` // Only used with project - Reason string `json:"reason,omitempty"` // Optional reason for archival + Project string `json:"project,omitempty"` + Reason string `json:"reason,omitempty"` + IDs []int64 `json:"ids,omitempty"` + MaxAgeDays int `json:"max_age_days,omitempty"` } // handleArchiveObservations archives observations by ID or by age. @@ -233,8 +233,8 @@ func (s *Service) handleArchiveObservations(w http.ResponseWriter, r *http.Reque if len(req.IDs) > 5 { // Use parallel archival for batches larger than 5 type archiveResult struct { - id int64 err error + id int64 } results := make(chan archiveResult, len(req.IDs)) @@ -384,8 +384,8 @@ func (s *Service) handleExportObservations(w http.ResponseWriter, r *http.Reques if format == "" { format = "json" } - scope := r.URL.Query().Get("scope") // project, global, or empty for all - obsType := r.URL.Query().Get("type") // bugfix, feature, etc. + scope := r.URL.Query().Get("scope") // project, global, or empty for all + obsType := r.URL.Query().Get("type") // bugfix, feature, etc. limit := gorm.ParseLimitParamWithMax(r, 1000, 5000) // Higher limit for exports, capped at 5000 // Validate format @@ -491,10 +491,10 @@ func escapeCsvField(s string) string { // BulkStatusRequest represents a request to update status for multiple observations. type BulkStatusRequest struct { - IDs []int64 `json:"ids"` - Action string `json:"action"` // "supersede", "archive", "set_feedback" + Action string `json:"action"` Reason string `json:"reason,omitempty"` - Feedback int `json:"feedback,omitempty"` // -1, 0, 1 for set_feedback action + IDs []int64 `json:"ids"` + Feedback int `json:"feedback,omitempty"` } // handleBulkStatusUpdate updates status for multiple observations in one request. diff --git a/internal/worker/handlers_sessions.go b/internal/worker/handlers_sessions.go index 837f7a3..d85f374 100644 --- a/internal/worker/handlers_sessions.go +++ b/internal/worker/handlers_sessions.go @@ -25,10 +25,10 @@ type SessionInitRequest struct { // SessionInitResponse is the response for session initialization. type SessionInitResponse struct { + Reason string `json:"reason,omitempty"` SessionDBID int64 `json:"sessionDbId"` PromptNumber int `json:"promptNumber"` Skipped bool `json:"skipped,omitempty"` - Reason string `json:"reason,omitempty"` } // DuplicatePromptWindowSeconds is the time window for detecting duplicate prompt submissions. diff --git a/internal/worker/handlers_test.go b/internal/worker/handlers_test.go index 89ed2c0..7b27162 100644 --- a/internal/worker/handlers_test.go +++ b/internal/worker/handlers_test.go @@ -66,6 +66,7 @@ func testService(t *testing.T) (*Service, func()) { cancel: cancel, startTime: time.Now(), retrievalStats: make(map[string]*RetrievalStats), + cachedObsCounts: make(map[string]cachedCount), } svc.setupRoutes() @@ -345,11 +346,13 @@ func TestHandleGetObservations_Limit(t *testing.T) { assert.Equal(t, http.StatusOK, rec.Code) - // Parse as generic JSON array since the model uses custom marshaling - var observations []map[string]interface{} - err := json.Unmarshal(rec.Body.Bytes(), &observations) + // Parse as object with observations key (API returns wrapped response) + var response map[string]interface{} + err := json.Unmarshal(rec.Body.Bytes(), &response) require.NoError(t, err) + observations, ok := response["observations"].([]interface{}) + require.True(t, ok, "expected observations array in response") assert.Len(t, observations, 10) } @@ -1135,10 +1138,13 @@ func TestHandleGetObservations_DefaultLimit(t *testing.T) { assert.Equal(t, http.StatusOK, rec.Code) - var observations []map[string]interface{} - err := json.Unmarshal(rec.Body.Bytes(), &observations) + var response map[string]interface{} + err := json.Unmarshal(rec.Body.Bytes(), &response) require.NoError(t, err) + observations, ok := response["observations"].([]interface{}) + require.True(t, ok, "expected observations array in response") + // Should return default limit (100) assert.LessOrEqual(t, len(observations), DefaultObservationsLimit) } @@ -1159,10 +1165,12 @@ func TestHandleGetObservations_FilterByProject(t *testing.T) { assert.Equal(t, http.StatusOK, rec.Code) - var observations []map[string]interface{} - err := json.Unmarshal(rec.Body.Bytes(), &observations) + var response map[string]interface{} + err := json.Unmarshal(rec.Body.Bytes(), &response) require.NoError(t, err) + observations, ok := response["observations"].([]interface{}) + require.True(t, ok, "expected observations array in response") assert.Len(t, observations, 2) } @@ -1412,10 +1420,12 @@ func TestHandleGetObservations(t *testing.T) { assert.Equal(t, http.StatusOK, rec.Code) - var observations []map[string]interface{} - err := json.Unmarshal(rec.Body.Bytes(), &observations) + var response map[string]interface{} + err := json.Unmarshal(rec.Body.Bytes(), &response) require.NoError(t, err) + observations, ok := response["observations"].([]interface{}) + require.True(t, ok, "expected observations array in response") assert.GreaterOrEqual(t, len(observations), 2) } @@ -2697,10 +2707,13 @@ func TestHandleGetObservations_EmptyResult(t *testing.T) { assert.Equal(t, http.StatusOK, rec.Code) - // Should return empty array, not null - var obs []interface{} - err := json.Unmarshal(rec.Body.Bytes(), &obs) + // Should return empty array within observations key, not null + var response map[string]interface{} + err := json.Unmarshal(rec.Body.Bytes(), &response) require.NoError(t, err) + + obs, ok := response["observations"].([]interface{}) + require.True(t, ok, "expected observations array in response") assert.NotNil(t, obs) } diff --git a/internal/worker/middleware.go b/internal/worker/middleware.go index cd42eb7..4c6fee2 100644 --- a/internal/worker/middleware.go +++ b/internal/worker/middleware.go @@ -91,12 +91,10 @@ func MaxBodySize(maxBytes int64) func(http.Handler) http.Handler { // TokenAuth provides simple token-based authentication for localhost services. // The token is generated at startup and must be provided in the X-Auth-Token header. type TokenAuth struct { - token string - enabled bool - mu sync.RWMutex - - // ExemptPaths are paths that don't require authentication (e.g., health checks) ExemptPaths map[string]bool + token string + mu sync.RWMutex + enabled bool } // NewTokenAuth creates a new TokenAuth with a randomly generated token. @@ -176,7 +174,7 @@ func (ta *TokenAuth) Middleware(next http.Handler) http.Handler { // It wraps the base per-client rate limiter with additional per-operation limits. type ExpensiveOperationLimiter struct { // Track last execution time per operation type - lastRebuild int64 // Unix timestamp + lastRebuild int64 // Unix timestamp rebuildCooldown int64 // Minimum seconds between rebuilds mu sync.Mutex diff --git a/internal/worker/middleware_test.go b/internal/worker/middleware_test.go index 28477a4..0fe6e3b 100644 --- a/internal/worker/middleware_test.go +++ b/internal/worker/middleware_test.go @@ -42,8 +42,8 @@ func TestSecurityHeaders_CORS(t *testing.T) { tests := []struct { name string origin string - expectCORS bool expectedOrigin string + expectCORS bool }{ { name: "localhost:37778 origin allowed", diff --git a/internal/worker/ratelimit.go b/internal/worker/ratelimit.go index 6e1ec63..0a49ead 100644 --- a/internal/worker/ratelimit.go +++ b/internal/worker/ratelimit.go @@ -9,13 +9,13 @@ import ( // RateLimiter implements a token bucket rate limiter. type RateLimiter struct { - rate float64 // tokens per second - burst int // maximum burst size - mu sync.Mutex // protects following fields - tokens float64 // current tokens - lastUpdate time.Time // last token update time - requests int64 // total requests - rejected int64 // rejected requests + lastUpdate time.Time + rate float64 + burst int + tokens float64 + requests int64 + rejected int64 + mu sync.Mutex } // LastUpdateTime returns the last update time. @@ -77,12 +77,12 @@ func (rl *RateLimiter) Stats() map[string]any { defer rl.mu.Unlock() return map[string]any{ - "rate": rl.rate, - "burst": rl.burst, - "current_tokens": rl.tokens, - "total_requests": rl.requests, - "rejected": rl.rejected, - "rejection_rate": float64(rl.rejected) / max(float64(rl.requests), 1), + "rate": rl.rate, + "burst": rl.burst, + "current_tokens": rl.tokens, + "total_requests": rl.requests, + "rejected": rl.rejected, + "rejection_rate": float64(rl.rejected) / max(float64(rl.requests), 1), } } @@ -102,14 +102,13 @@ func RateLimitMiddleware(limiter *RateLimiter) func(http.Handler) http.Handler { // PerClientRateLimiter implements per-client rate limiting. type PerClientRateLimiter struct { - rate float64 - burst int - clients map[string]*RateLimiter - mu sync.Mutex - // Cleanup settings + lastCleanup time.Time + clients map[string]*RateLimiter + rate float64 + burst int cleanupInterval time.Duration maxIdleTime time.Duration - lastCleanup time.Time + mu sync.Mutex } // NewPerClientRateLimiter creates a new per-client rate limiter. diff --git a/internal/worker/sdk/processor.go b/internal/worker/sdk/processor.go index d4b6f93..c83d805 100644 --- a/internal/worker/sdk/processor.go +++ b/internal/worker/sdk/processor.go @@ -26,11 +26,11 @@ import ( // CircuitBreaker implements a simple circuit breaker pattern for CLI calls. type CircuitBreaker struct { - failures int64 // Current failure count - lastFailure int64 // Unix timestamp of last failure - threshold int64 // Number of failures before opening - resetTimeout int64 // Seconds to wait before trying again - state int32 // 0=closed, 1=open, 2=half-open + failures int64 // Current failure count + lastFailure int64 // Unix timestamp of last failure + threshold int64 // Number of failures before opening + resetTimeout int64 // Seconds to wait before trying again + state int32 // 0=closed, 1=open, 2=half-open } const ( @@ -100,12 +100,12 @@ func (cb *CircuitBreaker) State() string { // CircuitBreakerMetrics contains metrics about the circuit breaker state. type CircuitBreakerMetrics struct { - State string `json:"state"` - Failures int64 `json:"failures"` - Threshold int64 `json:"threshold"` - ResetTimeoutSecs int64 `json:"reset_timeout_secs"` - LastFailureUnix int64 `json:"last_failure_unix,omitempty"` - SecondsUntilReset int64 `json:"seconds_until_reset,omitempty"` + State string `json:"state"` + Failures int64 `json:"failures"` + Threshold int64 `json:"threshold"` + ResetTimeoutSecs int64 `json:"reset_timeout_secs"` + LastFailureUnix int64 `json:"last_failure_unix,omitempty"` + SecondsUntilReset int64 `json:"seconds_until_reset,omitempty"` } // Metrics returns the current metrics of the circuit breaker. @@ -277,8 +277,8 @@ func NewProcessor(observationStore *gorm.ObservationStore, summaryStore *gorm.Su observationStore: observationStore, summaryStore: summaryStore, sem: make(chan struct{}, MaxConcurrentCLICalls), - circuitBreaker: NewCircuitBreaker(5, 60), // Open after 5 failures, reset after 60s - deduplicator: NewRequestDeduplicator(300, 1000), // 5-minute TTL, 1000 max entries + circuitBreaker: NewCircuitBreaker(5, 60), // Open after 5 failures, reset after 60s + deduplicator: NewRequestDeduplicator(300, 1000), // 5-minute TTL, 1000 max entries vectorSyncChan: make(chan *models.Observation, MaxVectorSyncWorkers*2), // Buffered channel vectorSyncDone: make(chan struct{}), }, nil diff --git a/internal/worker/service.go b/internal/worker/service.go index ba40292..2088ee1 100644 --- a/internal/worker/service.go +++ b/internal/worker/service.go @@ -78,16 +78,16 @@ func retryWithBackoff(ctx context.Context, maxRetries int, initialBackoff time.D // RetrievalStats tracks observation retrieval metrics. type RetrievalStats struct { - TotalRequests int64 // Total retrieval requests (inject + search) - ObservationsServed int64 // Observations returned to clients - VerifiedStale int64 // Stale observations that passed verification - DeletedInvalid int64 // Invalid observations deleted - SearchRequests int64 // Semantic search requests - ContextInjections int64 // Session-start context injections - StaleExcluded int64 // Observations excluded due to staleness check - FreshCount int64 // Observations that passed staleness check - DuplicatesRemoved int64 // Observations removed by clustering - LastUpdated int64 // Unix timestamp of last update (atomic) + TotalRequests int64 // Total retrieval requests (inject + search) + ObservationsServed int64 // Observations returned to clients + VerifiedStale int64 // Stale observations that passed verification + DeletedInvalid int64 // Invalid observations deleted + SearchRequests int64 // Semantic search requests + ContextInjections int64 // Session-start context injections + StaleExcluded int64 // Observations excluded due to staleness check + FreshCount int64 // Observations that passed staleness check + DuplicatesRemoved int64 // Observations removed by clustering + LastUpdated int64 // Unix timestamp of last update (atomic) } // maxRetrievalStatsProjects limits the number of projects tracked to prevent unbounded memory growth. @@ -101,132 +101,91 @@ const maxRecentQueries = 100 // Service is the main worker service orchestrator. type Service struct { - // Version of the worker binary - version string - - // Configuration - config *config.Config - - // Database - store *gorm.Store - sessionStore *gorm.SessionStore - observationStore *gorm.ObservationStore - summaryStore *gorm.SummaryStore - promptStore *gorm.PromptStore - conflictStore *gorm.ConflictStore - patternStore *gorm.PatternStore - relationStore *gorm.RelationStore - - // Pattern detection - patternDetector *pattern.Detector - - // Domain services - sessionManager *session.Manager - sseBroadcaster *sse.Broadcaster - processor *sdk.Processor - - // Vector database (sqlite-vec with local embeddings) - embedSvc *embedding.Service - vectorClient *sqlitevec.Client - vectorSync *sqlitevec.Sync - - // Cross-encoder reranking (for improved search relevance) - reranker *reranking.Service - - // Query expansion (for improved search recall) - queryExpander *expansion.Expander - - // Importance scoring - scoreCalculator *scoring.Calculator - recalculator *scoring.Recalculator - - // HTTP server - router *chi.Mux - server *http.Server - startTime time.Time - - // Retrieval statistics (per-project) - retrievalStats map[string]*RetrievalStats - retrievalStatsMu sync.RWMutex - - // Lifecycle - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - // Initialization state (for deferred init) - ready atomic.Bool - initError error - initMu sync.RWMutex - - // Background verification queue for stale observations - staleQueue chan staleVerifyRequest - staleQueueOnce sync.Once - - // File watchers for auto-recreation on deletion - dbWatcher *watcher.Watcher - configWatcher *watcher.Watcher - - // Self-updater - updater *update.Updater - - // Rate limiting - rateLimiter *PerClientRateLimiter - expensiveOpLimiter *ExpensiveOperationLimiter - bulkOpLimiter *BulkOperationLimiter - - // Rebuild status tracking - rebuildStatus *RebuildStatus - rebuildStatusMu sync.RWMutex - - // Recent search query tracking (circular buffer for O(1) insertion) - recentQueriesBuf [maxRecentQueries]RecentSearchQuery // fixed-size circular buffer - recentQueriesHead int // index of most recent (newest) - recentQueriesLen int // current number of items - recentQueriesMu sync.RWMutex - - // Stats caching to reduce database load - cachedObsCounts map[string]cachedCount // per-project observation counts - cachedObsCountsMu sync.RWMutex - statsCacheTTL time.Duration - - // Vector sync worker pool - limits concurrent vector sync goroutines - vectorSyncSem chan struct{} // semaphore for rate limiting + startTime time.Time + ctx context.Context + initError error + server *http.Server + reranker *reranking.Service + observationStore *gorm.ObservationStore + summaryStore *gorm.SummaryStore + promptStore *gorm.PromptStore + conflictStore *gorm.ConflictStore + patternStore *gorm.PatternStore + relationStore *gorm.RelationStore + patternDetector *pattern.Detector + sessionManager *session.Manager + sseBroadcaster *sse.Broadcaster + processor *sdk.Processor + embedSvc *embedding.Service + vectorClient *sqlitevec.Client + vectorSync *sqlitevec.Sync + vectorSyncSem chan struct{} + queryExpander *expansion.Expander + scoreCalculator *scoring.Calculator + recalculator *scoring.Recalculator + router *chi.Mux + store *gorm.Store + retrievalStats map[string]*RetrievalStats + sessionStore *gorm.SessionStore + cancel context.CancelFunc + cachedObsCounts map[string]cachedCount + config *config.Config + rebuildStatus *RebuildStatus + staleQueue chan staleVerifyRequest + bulkOpLimiter *BulkOperationLimiter + dbWatcher *watcher.Watcher + configWatcher *watcher.Watcher + updater *update.Updater + rateLimiter *PerClientRateLimiter + expensiveOpLimiter *ExpensiveOperationLimiter + version string + recentQueriesBuf [maxRecentQueries]RecentSearchQuery + wg sync.WaitGroup + recentQueriesLen int + recentQueriesHead int + statsCacheTTL time.Duration + initMu sync.RWMutex + rebuildStatusMu sync.RWMutex + retrievalStatsMu sync.RWMutex + recentQueriesMu sync.RWMutex + cachedObsCountsMu sync.RWMutex + staleQueueOnce sync.Once + ready atomic.Bool } // cachedCount stores a cached count value with expiration. type cachedCount struct { - count int timestamp time.Time + count int } // RebuildStatus tracks the progress of vector rebuild operations. type RebuildStatus struct { - InProgress bool `json:"in_progress"` - StartTime time.Time `json:"start_time,omitempty"` - Phase string `json:"phase,omitempty"` // "observations", "summaries", "prompts", "complete" - TotalSynced int `json:"total_synced"` - TotalErrors int `json:"total_errors"` - CurrentPhase int `json:"current_phase"` // 1, 2, 3 for the three phases - TotalPhases int `json:"total_phases"` // 3 - ElapsedMs int64 `json:"elapsed_ms,omitempty"` - EstimatedPct float64 `json:"estimated_pct,omitempty"` // 0-100 + StartTime time.Time `json:"start_time,omitempty"` + Phase string `json:"phase,omitempty"` + TotalSynced int `json:"total_synced"` + TotalErrors int `json:"total_errors"` + CurrentPhase int `json:"current_phase"` + TotalPhases int `json:"total_phases"` + ElapsedMs int64 `json:"elapsed_ms,omitempty"` + EstimatedPct float64 `json:"estimated_pct,omitempty"` + InProgress bool `json:"in_progress"` } // staleVerifyRequest represents a request to verify a stale observation in background type staleVerifyRequest struct { - observationID int64 cwd string + observationID int64 } // RecentSearchQuery tracks a search query for analytics. type RecentSearchQuery struct { - Query string `json:"query"` - Project string `json:"project,omitempty"` - Type string `json:"type,omitempty"` // observations, summaries, prompts - Results int `json:"results"` - UsedVector bool `json:"used_vector"` - Timestamp time.Time `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` + Query string `json:"query"` + Project string `json:"project,omitempty"` + Type string `json:"type,omitempty"` + Results int `json:"results"` + UsedVector bool `json:"used_vector"` } // asyncVectorSync executes a vector sync operation with rate limiting.