Release dec 2025 (#15)

* Resolves issue #13

- Switched model to bge-small-en-v1.5
- Added lazy re-embedding
- Added model version tracking per vector
- Added conversion of vectors to the new model

* Add lfs support to the workflow.

* Implements importance scoring with decay + voting #6

* Resolves issue #5 by marking observations as superseded and scheduling them for deletion

* Implement pattern detection #7

* Improve injections and observations accuracy

- Session start: Recent observations for project context (recency-based)
- User prompt: Semantically relevant observations (similarity-based with threshold)

* Added two stage retrieval with bi and cross encoder #8

* Implement query expansion and reformulation #9

* Knowledge graph and relationships ( resolves #4 )

- File Overlap Detection: Detects relationships when observations modify/read the same files
- Concept Overlap Detection: Detects relationships based on shared semantic concepts
- Type Progression Detection: Infers relationships from natural observation type progressions (e.g., discovery → bugfix = "fixes")
- Temporal Proximity Detection: Detects relationships between observations in the same session within 5 minutes
- Narrative Mention Detection: Detects explicit relationship language in narratives (e.g., "fixes", "depends on", "supersedes")

* Add visualisation of the relations to the dashboard.

* fixup! Add visualisation of the relations to the dashboard.

* Update documentation with new settings and screenshots.
This commit is contained in:
2025-12-19 17:57:11 +00:00
committed by GitHub
parent 48957a6c81
commit f79782a008
69 changed files with 43967 additions and 194 deletions
+238 -4
View File
@@ -4,13 +4,18 @@ package worker
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/sqlite"
"github.com/lukaszraczylo/claude-mnemonic/internal/embedding"
"github.com/lukaszraczylo/claude-mnemonic/internal/privacy"
"github.com/lukaszraczylo/claude-mnemonic/internal/reranking"
"github.com/lukaszraczylo/claude-mnemonic/internal/search/expansion"
"github.com/lukaszraczylo/claude-mnemonic/internal/vector/sqlitevec"
"github.com/lukaszraczylo/claude-mnemonic/internal/worker/sdk"
"github.com/lukaszraczylo/claude-mnemonic/internal/worker/session"
@@ -641,6 +646,18 @@ func (s *Service) handleGetTypes(w http.ResponseWriter, r *http.Request) {
})
}
// handleGetModels returns available embedding models.
//
// The response carries three keys: "models" (the result of
// embedding.ListModels), "default" (embedding.GetDefaultModel) and "current"
// (the version reported by the active embedding service). "current" is left
// nil when no embedding service is configured, mirroring the nil guard that
// handleGetStats applies before touching s.embedSvc.
func (s *Service) handleGetModels(w http.ResponseWriter, _ *http.Request) {
	// Guard against a missing embedding service instead of dereferencing it.
	var current interface{}
	if s.embedSvc != nil {
		current = s.embedSvc.Version()
	}

	writeJSON(w, map[string]interface{}{
		"models":  embedding.ListModels(),
		"default": embedding.GetDefaultModel(),
		"current": current,
	})
}
// handleGetStats returns worker statistics.
func (s *Service) handleGetStats(w http.ResponseWriter, r *http.Request) {
project := r.URL.Query().Get("project")
@@ -658,6 +675,22 @@ func (s *Service) handleGetStats(w http.ResponseWriter, r *http.Request) {
"ready": s.ready.Load(),
}
// Add embedding model info
if s.embedSvc != nil {
response["embeddingModel"] = map[string]interface{}{
"name": s.embedSvc.Name(),
"version": s.embedSvc.Version(),
"dimensions": s.embedSvc.Dimensions(),
}
}
// Add vector count
if s.vectorClient != nil {
if count, err := s.vectorClient.Count(r.Context()); err == nil {
response["vectorCount"] = count
}
}
// Include project-specific observation count if project is specified
if project != "" {
count, err := s.observationStore.GetObservationCount(r.Context(), project)
@@ -715,15 +748,69 @@ func (s *Service) handleSearchByPrompt(w http.ResponseWriter, r *http.Request) {
var observations []*models.Observation
var err error
var usedVector bool
similarityScores := make(map[int64]float64) // Track similarity per observation
// Get threshold settings from config
threshold := s.config.ContextRelevanceThreshold
maxResults := s.config.ContextMaxPromptResults
// Generate expanded queries if query expander is available
var expandedQueries []expansion.ExpandedQuery
var detectedIntent string
if s.queryExpander != nil {
cfg := expansion.DefaultConfig()
cfg.EnableVocabularyExpansion = false // Vocabulary expansion is optional
expandedQueries = s.queryExpander.Expand(r.Context(), query, cfg)
if len(expandedQueries) > 0 {
detectedIntent = string(expandedQueries[0].Intent)
}
}
if len(expandedQueries) == 0 {
// Fallback to just the original query
expandedQueries = []expansion.ExpandedQuery{
{Query: query, Weight: 1.0, Source: "original"},
}
}
// Try vector search first if available
if s.vectorClient != nil && s.vectorClient.IsConnected() {
where := sqlitevec.BuildWhereFilter(sqlitevec.DocTypeObservation, "")
vectorResults, vecErr := s.vectorClient.Query(r.Context(), query, limit*2, where)
if vecErr == nil && len(vectorResults) > 0 {
// Search with each expanded query and merge results
allVectorResults := make([]sqlitevec.QueryResult, 0)
queryWeights := make(map[string]float64) // Track weights for score merging
for _, eq := range expandedQueries {
vectorResults, vecErr := s.vectorClient.Query(r.Context(), eq.Query, limit*2, where)
if vecErr == nil && len(vectorResults) > 0 {
// Apply weight to similarity scores before merging
for i := range vectorResults {
vectorResults[i].Similarity *= eq.Weight
}
allVectorResults = append(allVectorResults, vectorResults...)
queryWeights[eq.Query] = eq.Weight
}
}
if len(allVectorResults) > 0 {
// Filter by relevance threshold before extracting IDs
// Use a slightly lower threshold for expanded queries
effectiveThreshold := threshold * 0.9 // Allow slightly lower scores for expanded queries
filteredResults := sqlitevec.FilterByThreshold(allVectorResults, effectiveThreshold, 0)
// Build similarity map for filtered results (keeping highest weighted score per observation)
for _, vr := range filteredResults {
if sqliteID, ok := vr.Metadata["sqlite_id"].(float64); ok {
id := int64(sqliteID)
// Keep the highest score for each observation
if existing, exists := similarityScores[id]; !exists || vr.Similarity > existing {
similarityScores[id] = vr.Similarity
}
}
}
// Extract observation IDs with project/scope filtering using shared helper
obsIDs := sqlitevec.ExtractObservationIDs(vectorResults, project)
obsIDs := sqlitevec.ExtractObservationIDs(filteredResults, project)
if len(obsIDs) > 0 {
// Fetch full observations from SQLite
@@ -773,23 +860,132 @@ func (s *Service) handleSearchByPrompt(w http.ResponseWriter, r *http.Request) {
freshObservations = append(freshObservations, obs)
}
// Apply cross-encoder reranking if available
var reranked bool
if s.reranker != nil && len(freshObservations) > 0 && usedVector {
// Build candidates from observations with their bi-encoder scores
candidates := make([]reranking.Candidate, len(freshObservations))
for i, obs := range freshObservations {
content := obs.Title.String
if obs.Narrative.Valid && obs.Narrative.String != "" {
content = content + " " + obs.Narrative.String
}
candidates[i] = reranking.Candidate{
ID: fmt.Sprintf("%d", obs.ID),
Content: content,
Score: similarityScores[obs.ID],
Metadata: map[string]any{"obs_idx": i},
}
}
// Rerank using cross-encoder - use pure mode or combined scores
var rerankResults []reranking.RerankResult
var rerankErr error
if s.config.RerankingPureMode {
rerankResults, rerankErr = s.reranker.RerankByScore(query, candidates, s.config.RerankingResults)
} else {
rerankResults, rerankErr = s.reranker.Rerank(query, candidates, s.config.RerankingResults)
}
if rerankErr != nil {
log.Warn().Err(rerankErr).Msg("Cross-encoder reranking failed, using original order")
} else if len(rerankResults) > 0 {
// Update similarity scores with reranked scores
for _, rr := range rerankResults {
if id, err := strconv.ParseInt(rr.ID, 10, 64); err == nil {
similarityScores[id] = rr.CombinedScore
}
}
// Reorder observations based on rerank results
reorderedObs := make([]*models.Observation, 0, len(rerankResults))
obsMap := make(map[int64]*models.Observation)
for _, obs := range freshObservations {
obsMap[obs.ID] = obs
}
for _, rr := range rerankResults {
if id, err := strconv.ParseInt(rr.ID, 10, 64); err == nil {
if obs, ok := obsMap[id]; ok {
reorderedObs = append(reorderedObs, obs)
}
}
}
freshObservations = reorderedObs
reranked = true
log.Debug().
Int("candidates", len(candidates)).
Int("returned", len(rerankResults)).
Msg("Cross-encoder reranking complete")
}
}
// Cluster similar observations to remove duplicates
clusteredObservations := clusterObservations(freshObservations, 0.4)
// Sort by similarity score (highest first) if we have scores and didn't rerank
if len(similarityScores) > 0 && len(clusteredObservations) > 0 && !reranked {
sort.Slice(clusteredObservations, func(i, j int) bool {
scoreI := similarityScores[clusteredObservations[i].ID]
scoreJ := similarityScores[clusteredObservations[j].ID]
return scoreI > scoreJ
})
}
// Apply max results cap if configured
if maxResults > 0 && len(clusteredObservations) > maxResults {
clusteredObservations = clusteredObservations[:maxResults]
}
// Record retrieval stats (no verification done, so verified=0, deleted=0)
s.recordRetrievalStats(project, int64(len(clusteredObservations)), 0, 0, true)
// Increment retrieval counts for scoring (async, non-blocking)
if len(clusteredObservations) > 0 {
ids := make([]int64, len(clusteredObservations))
for i, obs := range clusteredObservations {
ids[i] = obs.ID
}
s.incrementRetrievalCounts(ids)
}
log.Info().
Str("project", project).
Str("query", query).
Str("intent", detectedIntent).
Int("expansions", len(expandedQueries)).
Int("found", len(clusteredObservations)).
Int("stale_excluded", staleCount).
Float64("threshold", threshold).
Msg("Prompt-based observation search")
// Build response with similarity scores
obsWithScores := make([]map[string]interface{}, len(clusteredObservations))
for i, obs := range clusteredObservations {
obsMap := obs.ToMap()
if score, ok := similarityScores[obs.ID]; ok {
obsMap["similarity"] = score
}
obsWithScores[i] = obsMap
}
// Build expansion info for response
expansionInfo := make([]map[string]interface{}, len(expandedQueries))
for i, eq := range expandedQueries {
expansionInfo[i] = map[string]interface{}{
"query": eq.Query,
"weight": eq.Weight,
"source": eq.Source,
}
}
writeJSON(w, map[string]interface{}{
"project": project,
"query": query,
"observations": clusteredObservations,
"intent": detectedIntent,
"expansions": expansionInfo,
"observations": obsWithScores,
"threshold": threshold,
"max_results": maxResults,
})
}
@@ -857,6 +1053,15 @@ func (s *Service) handleContextInject(w http.ResponseWriter, r *http.Request) {
// Record retrieval stats (no verification done)
s.recordRetrievalStats(project, int64(len(clusteredObservations)), 0, 0, false)
// Increment retrieval counts for scoring (async, non-blocking)
if len(clusteredObservations) > 0 {
ids := make([]int64, len(clusteredObservations))
for i, obs := range clusteredObservations {
ids[i] = obs.ID
}
s.incrementRetrievalCounts(ids)
}
log.Info().
Str("project", project).
Int("total", len(observations)).
@@ -1015,6 +1220,35 @@ func (s *Service) handleSelfCheck(w http.ResponseWriter, r *http.Request) {
}
components = append(components, sseStatus)
// Check Cross-Encoder Reranker
rerankerStatus := ComponentHealth{Name: "Cross-Encoder Reranker", Status: "healthy"}
if !s.config.RerankingEnabled {
rerankerStatus.Status = "degraded"
rerankerStatus.Message = "Disabled in config"
if overall == "healthy" {
overall = "degraded"
}
} else if s.reranker == nil {
rerankerStatus.Status = "degraded"
rerankerStatus.Message = "Not initialized"
if overall == "healthy" {
overall = "degraded"
}
} else {
// Verify reranker is functional using Score
_, normalizedScore, err := s.reranker.Score("test query", "test document")
if err != nil {
rerankerStatus.Status = "unhealthy"
rerankerStatus.Message = fmt.Sprintf("Score check failed: %v", err)
if overall == "healthy" {
overall = "degraded"
}
} else {
rerankerStatus.Message = fmt.Sprintf("Score check passed (%.4f)", normalizedScore)
}
}
components = append(components, rerankerStatus)
// Calculate uptime
uptime := time.Since(s.startTime).Round(time.Second).String()
+292
View File
@@ -0,0 +1,292 @@
// Package worker provides the main worker service for claude-mnemonic.
package worker
import (
"encoding/json"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
)
// DefaultPatternsLimit is the default number of patterns to return.
// The pattern list and search handlers fall back to it when the "limit"
// query parameter is absent, not an integer, or not positive.
const DefaultPatternsLimit = 100
// handleGetPatterns returns all active patterns, optionally filtered by type or project.
//
// Query parameters:
//   - limit:   positive integer; anything else falls back to DefaultPatternsLimit.
//   - type:    when non-empty, patterns are fetched by type.
//   - project: when non-empty (and type is empty), patterns are fetched by project.
//
// The filters are mutually exclusive: if both "type" and "project" are given,
// "type" wins and "project" is ignored. Responds 503 while the pattern store
// is not initialized and 500 when the store call fails.
func (s *Service) handleGetPatterns(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	store := s.patternStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	params := r.URL.Query()

	limit := DefaultPatternsLimit
	if l := params.Get("limit"); l != "" {
		if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 {
			limit = parsed
		}
	}

	patternType := params.Get("type")
	project := params.Get("project")

	var (
		patterns []*models.Pattern
		err      error
	)
	switch {
	case patternType != "":
		patterns, err = store.GetPatternsByType(r.Context(), models.PatternType(patternType), limit)
	case project != "":
		patterns, err = store.GetPatternsByProject(r.Context(), project, limit)
	default:
		patterns, err = store.GetActivePatterns(r.Context(), limit)
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Normalize a nil result to an empty slice, as the relation and scoring
	// list handlers do, so clients always receive a list value.
	if patterns == nil {
		patterns = []*models.Pattern{}
	}

	writeJSON(w, patterns)
}
// handleGetPatternStats writes the aggregate pattern statistics reported by
// the pattern store. It answers 503 while the store is not initialized and
// 500 when the statistics query fails.
func (s *Service) handleGetPatternStats(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	summary, statsErr := patternStore.GetPatternStats(r.Context())
	if statsErr != nil {
		http.Error(w, statsErr.Error(), http.StatusInternalServerError)
		return
	}

	writeJSON(w, summary)
}
// handleGetPatternByID looks up one pattern using the {id} URL parameter.
// Status codes: 503 store not initialized, 400 unparsable ID, 500 store
// error, 404 when the store returns no pattern.
func (s *Service) handleGetPatternByID(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	patternID, parseErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if parseErr != nil {
		http.Error(w, "invalid pattern ID", http.StatusBadRequest)
		return
	}

	found, lookupErr := patternStore.GetPatternByID(r.Context(), patternID)
	switch {
	case lookupErr != nil:
		http.Error(w, lookupErr.Error(), http.StatusInternalServerError)
	case found == nil:
		http.Error(w, "pattern not found", http.StatusNotFound)
	default:
		writeJSON(w, found)
	}
}
// handleGetPatternInsight asks the pattern detector for the insight text of
// the pattern named by the {id} URL parameter and returns it under the
// "insight" key. Status codes: 503 detector not initialized, 400 unparsable
// ID, 500 detector error.
func (s *Service) handleGetPatternInsight(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternDetector := s.patternDetector
	s.initMu.RUnlock()

	if patternDetector == nil {
		http.Error(w, "pattern detector not initialized", http.StatusServiceUnavailable)
		return
	}

	patternID, parseErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if parseErr != nil {
		http.Error(w, "invalid pattern ID", http.StatusBadRequest)
		return
	}

	text, insightErr := patternDetector.GetPatternInsight(r.Context(), patternID)
	if insightErr != nil {
		http.Error(w, insightErr.Error(), http.StatusInternalServerError)
		return
	}

	payload := map[string]string{"insight": text}
	writeJSON(w, payload)
}
// handleDeletePattern removes the pattern named by the {id} URL parameter and
// replies with {"status": "deleted"}. Status codes: 503 store not
// initialized, 400 unparsable ID, 500 store error.
func (s *Service) handleDeletePattern(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	patternID, parseErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if parseErr != nil {
		http.Error(w, "invalid pattern ID", http.StatusBadRequest)
		return
	}

	deleteErr := patternStore.DeletePattern(r.Context(), patternID)
	if deleteErr != nil {
		http.Error(w, deleteErr.Error(), http.StatusInternalServerError)
		return
	}

	writeJSON(w, map[string]string{"status": "deleted"})
}
// handleDeprecatePattern flags the pattern named by the {id} URL parameter as
// deprecated and replies with {"status": "deprecated"}. Status codes: 503
// store not initialized, 400 unparsable ID, 500 store error.
func (s *Service) handleDeprecatePattern(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	patternID, parseErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if parseErr != nil {
		http.Error(w, "invalid pattern ID", http.StatusBadRequest)
		return
	}

	deprecateErr := patternStore.MarkPatternDeprecated(r.Context(), patternID)
	if deprecateErr != nil {
		http.Error(w, deprecateErr.Error(), http.StatusInternalServerError)
		return
	}

	writeJSON(w, map[string]string{"status": "deprecated"})
}
// MergePatternsRequest is the request body for merging patterns.
// handleMergePatterns rejects the request when either ID is zero or when
// both IDs are equal.
type MergePatternsRequest struct {
	// SourceID identifies the pattern that is merged into the target.
	SourceID int64 `json:"source_id"`
	// TargetID identifies the pattern that receives the merge.
	TargetID int64 `json:"target_id"`
}
// handleSearchPatterns runs the store's full-text pattern search for the
// required "q" query parameter. An optional positive "limit" overrides
// DefaultPatternsLimit; any other value is ignored. Status codes: 503 store
// not initialized, 400 missing "q", 500 store error.
func (s *Service) handleSearchPatterns(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	searchText := r.URL.Query().Get("q")
	if searchText == "" {
		http.Error(w, "query parameter 'q' is required", http.StatusBadRequest)
		return
	}

	maxHits := DefaultPatternsLimit
	rawLimit := r.URL.Query().Get("limit")
	if rawLimit != "" {
		n, convErr := strconv.Atoi(rawLimit)
		if convErr == nil && n > 0 {
			maxHits = n
		}
	}

	hits, searchErr := patternStore.SearchPatternsFTS(r.Context(), searchText, maxHits)
	if searchErr != nil {
		http.Error(w, searchErr.Error(), http.StatusInternalServerError)
		return
	}

	writeJSON(w, hits)
}
// handleGetPatternByName fetches one pattern using the required "name" query
// parameter. Status codes: 503 store not initialized, 400 missing name, 500
// store error, 404 when the store returns no pattern.
func (s *Service) handleGetPatternByName(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	patternName := r.URL.Query().Get("name")
	if patternName == "" {
		http.Error(w, "query parameter 'name' is required", http.StatusBadRequest)
		return
	}

	found, lookupErr := patternStore.GetPatternByName(r.Context(), patternName)
	switch {
	case lookupErr != nil:
		http.Error(w, lookupErr.Error(), http.StatusInternalServerError)
	case found == nil:
		http.Error(w, "pattern not found", http.StatusNotFound)
	default:
		writeJSON(w, found)
	}
}
// handleMergePatterns decodes a MergePatternsRequest from the request body
// and asks the pattern store to merge the source pattern into the target.
// It replies with {"status": "merged"} on success. Status codes: 503 store
// not initialized, 400 for an undecodable body, a zero ID, or identical IDs,
// and 500 when the merge fails.
func (s *Service) handleMergePatterns(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	patternStore := s.patternStore
	s.initMu.RUnlock()

	if patternStore == nil {
		http.Error(w, "pattern store not initialized", http.StatusServiceUnavailable)
		return
	}

	var body MergePatternsRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		http.Error(w, decodeErr.Error(), http.StatusBadRequest)
		return
	}

	switch {
	case body.SourceID == 0 || body.TargetID == 0:
		http.Error(w, "source_id and target_id are required", http.StatusBadRequest)
		return
	case body.SourceID == body.TargetID:
		http.Error(w, "source_id and target_id cannot be the same", http.StatusBadRequest)
		return
	}

	mergeErr := patternStore.MergePatterns(r.Context(), body.SourceID, body.TargetID)
	if mergeErr != nil {
		http.Error(w, mergeErr.Error(), http.StatusInternalServerError)
		return
	}

	writeJSON(w, map[string]string{"status": "merged"})
}
+174
View File
@@ -0,0 +1,174 @@
// Package worker provides the main worker service for claude-mnemonic.
package worker
import (
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
)
// DefaultRelationsLimit is the default number of relations to return.
// handleGetRelationsByType uses it when the "limit" query parameter is
// absent, not an integer, or outside the range 1..100.
const DefaultRelationsLimit = 50
// handleGetRelations returns relations for an observation.
//
// The observation is identified by the {id} URL parameter. Status codes:
// 400 when the ID is not a base-10 int64, 503 while the relation store is not
// initialized, 500 when the store lookup fails. A nil result is replaced by
// an empty slice before it is written.
func (s *Service) handleGetRelations(w http.ResponseWriter, r *http.Request) {
	id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if err != nil {
		http.Error(w, "invalid observation id", http.StatusBadRequest)
		return
	}

	// Snapshot the store under initMu and refuse to serve before it exists,
	// matching the pattern and scoring handlers in this package.
	s.initMu.RLock()
	store := s.relationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "relation store not initialized", http.StatusServiceUnavailable)
		return
	}

	relations, err := store.GetRelationsWithDetails(r.Context(), id)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if relations == nil {
		relations = []*models.RelationWithDetails{}
	}

	writeJSON(w, relations)
}
// handleGetRelationGraph returns the relation graph for an observation.
//
// The observation is identified by the {id} URL parameter. The optional
// "depth" query parameter is honoured only when it is an integer in 1..5;
// otherwise the default depth of 2 is used. Status codes: 400 for an
// unparsable ID, 503 while the relation store is not initialized, 500 when
// the store call fails.
func (s *Service) handleGetRelationGraph(w http.ResponseWriter, r *http.Request) {
	id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if err != nil {
		http.Error(w, "invalid observation id", http.StatusBadRequest)
		return
	}

	// Snapshot the store under initMu and refuse to serve before it exists,
	// matching the pattern and scoring handlers in this package.
	s.initMu.RLock()
	store := s.relationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "relation store not initialized", http.StatusServiceUnavailable)
		return
	}

	// Get depth parameter (default 2); out-of-range values are ignored.
	depth := 2
	if depthStr := r.URL.Query().Get("depth"); depthStr != "" {
		if d, err := strconv.Atoi(depthStr); err == nil && d > 0 && d <= 5 {
			depth = d
		}
	}

	graph, err := store.GetRelationGraph(r.Context(), id, depth)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	writeJSON(w, graph)
}
// handleGetRelatedObservations returns observations related to a given one.
//
// The source observation is identified by the {id} URL parameter. The
// optional "min_confidence" query parameter is honoured only when it parses
// as a float in [0, 1]; otherwise 0.4 is used. Related IDs come from the
// relation store and the full records from the observation store, ordered by
// "importance" and limited to 50. Status codes: 400 for an unparsable ID,
// 503 while either store is not initialized, 500 when a store call fails.
// An empty list is written when nothing is related.
func (s *Service) handleGetRelatedObservations(w http.ResponseWriter, r *http.Request) {
	id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if err != nil {
		http.Error(w, "invalid observation id", http.StatusBadRequest)
		return
	}

	// Snapshot both stores under initMu and refuse to serve before they
	// exist, matching the pattern and scoring handlers in this package.
	s.initMu.RLock()
	relationStore := s.relationStore
	observationStore := s.observationStore
	s.initMu.RUnlock()

	if relationStore == nil || observationStore == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	// Get minimum confidence parameter (default 0.4)
	minConfidence := 0.4
	if confStr := r.URL.Query().Get("min_confidence"); confStr != "" {
		if c, err := strconv.ParseFloat(confStr, 64); err == nil && c >= 0 && c <= 1 {
			minConfidence = c
		}
	}

	// Get related observation IDs
	relatedIDs, err := relationStore.GetRelatedObservationIDs(r.Context(), id, minConfidence)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if len(relatedIDs) == 0 {
		writeJSON(w, []*models.Observation{})
		return
	}

	// Fetch full observations
	observations, err := observationStore.GetObservationsByIDs(r.Context(), relatedIDs, "importance", 50)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if observations == nil {
		observations = []*models.Observation{}
	}

	writeJSON(w, observations)
}
// handleGetRelationsByType returns all relations of a specific type.
//
// The {type} URL parameter must match one of models.AllRelationTypes. The
// optional "limit" query parameter is honoured only when it is an integer in
// 1..100; otherwise DefaultRelationsLimit is used. Status codes: 400 for an
// unknown type, 503 while the relation store is not initialized, 500 when the
// store call fails. A nil result is replaced by an empty slice.
func (s *Service) handleGetRelationsByType(w http.ResponseWriter, r *http.Request) {
	relType := chi.URLParam(r, "type")

	// Validate relation type
	validType := false
	for _, t := range models.AllRelationTypes {
		if string(t) == relType {
			validType = true
			break
		}
	}
	if !validType {
		http.Error(w, "invalid relation type", http.StatusBadRequest)
		return
	}

	// Snapshot the store under initMu and refuse to serve before it exists,
	// matching the pattern and scoring handlers in this package.
	s.initMu.RLock()
	store := s.relationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "relation store not initialized", http.StatusServiceUnavailable)
		return
	}

	limit := DefaultRelationsLimit
	if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
		if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 100 {
			limit = l
		}
	}

	relations, err := store.GetRelationsByType(r.Context(), models.RelationType(relType), limit)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if relations == nil {
		relations = []*models.ObservationRelation{}
	}

	writeJSON(w, relations)
}
// handleGetRelationStats returns statistics about relations.
//
// The response contains "total_count" (from the store), "high_confidence"
// (number of relations returned for a 0.7 confidence threshold), "by_type"
// (number of relations returned per relation type) and "min_confidence_used".
// Both the high-confidence and per-type figures are derived from the length
// of a result set requested with a limit of 1000, so each is capped at that
// value. A type whose lookup fails is left out of "by_type" rather than
// failing the whole request. Status codes: 503 while the relation store is
// not initialized, 500 when the total or high-confidence query fails.
func (s *Service) handleGetRelationStats(w http.ResponseWriter, r *http.Request) {
	const (
		highConfidenceThreshold = 0.7  // threshold passed to GetHighConfidenceRelations
		sampleLimit             = 1000 // maximum rows fetched per counting query
	)

	// Snapshot the store under initMu and refuse to serve before it exists,
	// matching the pattern and scoring handlers in this package.
	s.initMu.RLock()
	store := s.relationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "relation store not initialized", http.StatusServiceUnavailable)
		return
	}

	// Get total relation count
	totalCount, err := store.GetTotalRelationCount(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Get high confidence relations count
	highConfRelations, err := store.GetHighConfidenceRelations(r.Context(), highConfidenceThreshold, sampleLimit)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Count by relation type; a failing type is skipped (best effort).
	typeCounts := make(map[string]int)
	for _, t := range models.AllRelationTypes {
		relations, err := store.GetRelationsByType(r.Context(), t, sampleLimit)
		if err != nil {
			continue
		}
		typeCounts[string(t)] = len(relations)
	}

	writeJSON(w, map[string]interface{}{
		"total_count":     totalCount,
		"high_confidence": len(highConfRelations),
		"by_type":         typeCounts,
		// NOTE(review): 0.4 is not used by any query in this handler; it
		// presumably reports the default relation confidence - confirm.
		"min_confidence_used": 0.4,
	})
}
+354
View File
@@ -0,0 +1,354 @@
// Package worker provides the main worker service for claude-mnemonic.
package worker
import (
"context"
"encoding/json"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
)
// FeedbackRequest represents a user feedback submission.
// handleObservationFeedback rejects any Feedback value outside -1..1.
type FeedbackRequest struct {
	Feedback int `json:"feedback"` // -1 (thumbs down), 0 (neutral), 1 (thumbs up)
}
// handleObservationFeedback records a thumbs up/down vote for one observation.
// POST /api/observations/{id}/feedback
//
// Flow: parse the {id} URL parameter, decode a FeedbackRequest, check that the
// vote is within -1..1, persist it, then (when a score calculator is present)
// recompute and store the importance score. The outcome is broadcast over SSE
// and echoed in the response. The reported score stays 0 when no calculator
// is available or the observation could not be re-read.
// Status codes: 400 for a bad ID, body, or vote; 503 when the observation
// store is missing; 500 when the vote cannot be persisted.
func (s *Service) handleObservationFeedback(w http.ResponseWriter, r *http.Request) {
	obsID, parseErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if parseErr != nil {
		http.Error(w, "invalid observation id", http.StatusBadRequest)
		return
	}

	var payload FeedbackRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&payload); decodeErr != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}

	vote := payload.Feedback
	if vote < -1 || vote > 1 {
		http.Error(w, "feedback must be -1, 0, or 1", http.StatusBadRequest)
		return
	}

	// Take a consistent snapshot of the components this handler needs.
	s.initMu.RLock()
	store, calc := s.observationStore, s.scoreCalculator
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	if saveErr := store.UpdateObservationFeedback(r.Context(), obsID, vote); saveErr != nil {
		http.Error(w, "failed to update feedback", http.StatusInternalServerError)
		return
	}

	var score float64
	if calc != nil {
		if current, getErr := store.GetObservationByID(r.Context(), obsID); getErr == nil && current != nil {
			// Score against the vote that was just submitted.
			current.UserFeedback = vote
			score = calc.Calculate(current, time.Now())
			// Best effort: the vote is already stored, and a failed score
			// write is left for the next recalculation cycle.
			_ = store.UpdateImportanceScore(r.Context(), obsID, score)
		}
	}

	// Notify SSE subscribers.
	s.sseBroadcaster.Broadcast(map[string]interface{}{
		"type":     "observation_feedback",
		"id":       obsID,
		"feedback": vote,
		"score":    score,
	})

	writeJSON(w, map[string]interface{}{
		"status":   "ok",
		"id":       obsID,
		"feedback": vote,
		"score":    score,
	})
}
// handleGetScoringStats reports feedback statistics and, when a recalculator
// is present, its stats as well.
// GET /api/scoring/stats
//
// The optional "project" query parameter is passed through to the feedback
// statistics query. Status codes: 503 when the observation store is missing,
// 500 when the statistics query fails.
func (s *Service) handleGetScoringStats(w http.ResponseWriter, r *http.Request) {
	projectName := r.URL.Query().Get("project")

	s.initMu.RLock()
	store, recalc := s.observationStore, s.recalculator
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	stats, statsErr := store.GetObservationFeedbackStats(r.Context(), projectName)
	if statsErr != nil {
		http.Error(w, "failed to get feedback stats", http.StatusInternalServerError)
		return
	}

	body := map[string]interface{}{"feedback": stats}
	if recalc != nil {
		body["recalculator"] = recalc.GetStats()
	}

	writeJSON(w, body)
}
// handleGetTopObservations lists the observations with the highest scores.
// GET /api/observations/top
//
// Query parameters: "limit" (positive integer, default 10) and "project"
// (passed through to the store). Status codes: 503 when the observation
// store is missing, 500 when the query fails. A nil result is written as an
// empty slice.
func (s *Service) handleGetTopObservations(w http.ResponseWriter, r *http.Request) {
	maxItems := parseIntParam(r, "limit", 10)
	projectName := r.URL.Query().Get("project")

	s.initMu.RLock()
	store := s.observationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	top, queryErr := store.GetTopScoringObservations(r.Context(), projectName, maxItems)
	switch {
	case queryErr != nil:
		http.Error(w, "failed to get top observations", http.StatusInternalServerError)
		return
	case top == nil:
		top = []*models.Observation{}
	}

	writeJSON(w, top)
}
// handleGetMostRetrieved lists the observations that were retrieved most often.
// GET /api/observations/most-retrieved
//
// Query parameters: "limit" (positive integer, default 10) and "project"
// (passed through to the store). Status codes: 503 when the observation
// store is missing, 500 when the query fails. A nil result is written as an
// empty slice.
func (s *Service) handleGetMostRetrieved(w http.ResponseWriter, r *http.Request) {
	maxItems := parseIntParam(r, "limit", 10)
	projectName := r.URL.Query().Get("project")

	s.initMu.RLock()
	store := s.observationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	popular, queryErr := store.GetMostRetrievedObservations(r.Context(), projectName, maxItems)
	switch {
	case queryErr != nil:
		http.Error(w, "failed to get most retrieved observations", http.StatusInternalServerError)
		return
	case popular == nil:
		popular = []*models.Observation{}
	}

	writeJSON(w, popular)
}
// handleExplainScore returns the score components for one observation along
// with the calculator configuration.
// GET /api/observations/{id}/score
//
// Status codes: 400 for an unparsable ID, 503 when the observation store or
// score calculator is missing, 500 when the observation cannot be loaded,
// 404 when it does not exist.
func (s *Service) handleExplainScore(w http.ResponseWriter, r *http.Request) {
	obsID, parseErr := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if parseErr != nil {
		http.Error(w, "invalid observation id", http.StatusBadRequest)
		return
	}

	s.initMu.RLock()
	store, calc := s.observationStore, s.scoreCalculator
	s.initMu.RUnlock()

	if store == nil || calc == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	target, loadErr := store.GetObservationByID(r.Context(), obsID)
	switch {
	case loadErr != nil:
		http.Error(w, "failed to get observation", http.StatusInternalServerError)
		return
	case target == nil:
		http.Error(w, "observation not found", http.StatusNotFound)
		return
	}

	// Components are evaluated at the current time.
	breakdown := calc.CalculateComponents(target, time.Now())

	writeJSON(w, map[string]interface{}{
		"id":         obsID,
		"components": breakdown,
		"config":     calc.GetConfig(),
	})
}
// handleUpdateConceptWeight stores a new weight for the concept named in the URL.
// PUT /api/scoring/concepts/{concept}
//
// The JSON body must contain "weight" in the range [0, 1]. After the weight
// is saved, the recalculator (when present) is asked to refresh its concept
// weights; a refresh failure does not fail the request. Status codes: 400 for
// a missing concept, undecodable body, or out-of-range weight; 503 when the
// observation store is missing; 500 when the weight cannot be saved.
func (s *Service) handleUpdateConceptWeight(w http.ResponseWriter, r *http.Request) {
	conceptName := chi.URLParam(r, "concept")
	if conceptName == "" {
		http.Error(w, "concept is required", http.StatusBadRequest)
		return
	}

	var body struct {
		Weight float64 `json:"weight"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}

	if body.Weight < 0 || body.Weight > 1 {
		http.Error(w, "weight must be between 0 and 1", http.StatusBadRequest)
		return
	}

	s.initMu.RLock()
	store, recalc := s.observationStore, s.recalculator
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	if saveErr := store.UpdateConceptWeight(r.Context(), conceptName, body.Weight); saveErr != nil {
		http.Error(w, "failed to update concept weight", http.StatusInternalServerError)
		return
	}

	if recalc != nil {
		// Best effort: the weight is already saved, so a failed refresh is
		// ignored.
		_ = recalc.RefreshConceptWeights(r.Context())
	}

	writeJSON(w, map[string]interface{}{
		"status":  "ok",
		"concept": conceptName,
		"weight":  body.Weight,
	})
}
// handleGetConceptWeights writes every concept weight known to the
// observation store.
// GET /api/scoring/concepts
//
// Status codes: 503 when the observation store is missing, 500 when the
// query fails.
func (s *Service) handleGetConceptWeights(w http.ResponseWriter, r *http.Request) {
	s.initMu.RLock()
	store := s.observationStore
	s.initMu.RUnlock()

	if store == nil {
		http.Error(w, "service not ready", http.StatusServiceUnavailable)
		return
	}

	conceptWeights, queryErr := store.GetConceptWeights(r.Context())
	if queryErr != nil {
		http.Error(w, "failed to get concept weights", http.StatusInternalServerError)
		return
	}

	writeJSON(w, conceptWeights)
}
// handleTriggerRecalculation triggers an immediate score recalculation.
// POST /api/scoring/recalculate
//
// The recalculation runs in a background goroutine and the handler replies
// right away with {"status": "recalculation triggered"}; the outcome of the
// run is not reported to the caller. Responds 503 when no recalculator is
// available.
func (s *Service) handleTriggerRecalculation(w http.ResponseWriter, _ *http.Request) {
	s.initMu.RLock()
	recalculator := s.recalculator
	s.initMu.RUnlock()

	if recalculator == nil {
		http.Error(w, "recalculator not available", http.StatusServiceUnavailable)
		return
	}

	// Run recalculation in background. The request context must not be used
	// here: net/http cancels it as soon as this handler returns, which would
	// abort the recalculation almost immediately. Use a context that is
	// detached from the request instead.
	go func() {
		// Best effort: the response has already been sent, so an error
		// cannot be surfaced to the caller.
		_ = recalculator.RecalculateNow(context.Background())
	}()

	writeJSON(w, map[string]string{"status": "recalculation triggered"})
}
// parseIntParam reads the query parameter called name from r and returns it as
// an int. defaultVal is returned when the parameter is absent or empty, is not
// a valid integer, or is not strictly positive.
func parseIntParam(r *http.Request, name string, defaultVal int) int {
	raw := r.URL.Query().Get(name)
	if raw == "" {
		return defaultVal
	}

	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return defaultVal
	}
	return n
}
// incrementRetrievalCounts bumps the retrieval counter of every observation in
// ids. It is meant to be called after search results have been returned, to
// track how often observations are retrieved.
//
// The update is fire-and-forget: it runs in its own goroutine with a 3 second
// timeout and its error is discarded. Nothing happens when ids is empty or the
// observation store is not set.
func (s *Service) incrementRetrievalCounts(ids []int64) {
	if len(ids) == 0 {
		return
	}

	s.initMu.RLock()
	observationStore := s.observationStore
	s.initMu.RUnlock()

	if observationStore == nil {
		return
	}

	// Run detached from the caller so the response is not delayed.
	go func() {
		// Fresh, bounded context: the background write must not depend on the
		// caller's context.
		bgCtx, release := context.WithTimeout(context.Background(), 3*time.Second)
		defer release()

		// Best effort - a failed counter update is intentionally ignored.
		_ = observationStore.IncrementRetrievalCount(bgCtx, ids)
	}()
}
+490 -1
View File
@@ -15,6 +15,10 @@ import (
"github.com/lukaszraczylo/claude-mnemonic/internal/config"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/sqlite"
"github.com/lukaszraczylo/claude-mnemonic/internal/embedding"
"github.com/lukaszraczylo/claude-mnemonic/internal/pattern"
"github.com/lukaszraczylo/claude-mnemonic/internal/reranking"
"github.com/lukaszraczylo/claude-mnemonic/internal/scoring"
"github.com/lukaszraczylo/claude-mnemonic/internal/search/expansion"
"github.com/lukaszraczylo/claude-mnemonic/internal/update"
"github.com/lukaszraczylo/claude-mnemonic/internal/vector/sqlitevec"
"github.com/lukaszraczylo/claude-mnemonic/internal/watcher"
@@ -64,6 +68,12 @@ type Service struct {
observationStore *sqlite.ObservationStore
summaryStore *sqlite.SummaryStore
promptStore *sqlite.PromptStore
conflictStore *sqlite.ConflictStore
patternStore *sqlite.PatternStore
relationStore *sqlite.RelationStore
// Pattern detection
patternDetector *pattern.Detector
// Domain services
sessionManager *session.Manager
@@ -75,6 +85,16 @@ type Service struct {
vectorClient *sqlitevec.Client
vectorSync *sqlitevec.Sync
// Cross-encoder reranking (for improved search relevance)
reranker *reranking.Service
// Query expansion (for improved search recall)
queryExpander *expansion.Expander
// Importance scoring
scoreCalculator *scoring.Calculator
recalculator *scoring.Recalculator
// HTTP server
router *chi.Mux
server *http.Server
@@ -177,6 +197,15 @@ func (s *Service) initializeAsync() {
observationStore := sqlite.NewObservationStore(store)
summaryStore := sqlite.NewSummaryStore(store)
promptStore := sqlite.NewPromptStore(store)
conflictStore := sqlite.NewConflictStore(store)
patternStore := sqlite.NewPatternStore(store)
relationStore := sqlite.NewRelationStore(store)
// Enable conflict detection by linking stores
observationStore.SetConflictStore(conflictStore)
// Enable relation detection by linking stores
observationStore.SetRelationStore(relationStore)
// Create session manager
sessionManager := session.NewManager(sessionStore)
@@ -186,6 +215,8 @@ func (s *Service) initializeAsync() {
var vectorClient *sqlitevec.Client
var vectorSync *sqlitevec.Sync
var reranker *reranking.Service
emb, err := embedding.NewService()
if err != nil {
log.Warn().Err(err).Msg("Embedding service creation failed - vector search disabled")
@@ -200,8 +231,32 @@ func (s *Service) initializeAsync() {
} else {
vectorClient = client
vectorSync = sqlitevec.NewSync(client)
log.Info().Msg("sqlite-vec vector search enabled")
log.Info().
Str("model", embedSvc.Version()).
Msg("sqlite-vec vector search enabled")
}
// Create cross-encoder reranking service if enabled
if s.config.RerankingEnabled {
rerankCfg := reranking.DefaultConfig()
if s.config.RerankingAlpha > 0 && s.config.RerankingAlpha <= 1 {
rerankCfg.Alpha = s.config.RerankingAlpha
}
ranker, err := reranking.NewService(rerankCfg)
if err != nil {
log.Warn().Err(err).Msg("Cross-encoder reranking service creation failed - reranking disabled")
} else {
reranker = ranker
log.Info().
Float64("alpha", rerankCfg.Alpha).
Msg("Cross-encoder reranking enabled")
}
}
// Create query expander for improved search recall
s.queryExpander = expansion.NewExpander(embedSvc)
log.Info().Msg("Query expansion enabled")
}
// Create SDK processor (optional - will be nil if Claude CLI not available)
@@ -225,11 +280,38 @@ func (s *Service) initializeAsync() {
s.observationStore = observationStore
s.summaryStore = summaryStore
s.promptStore = promptStore
s.conflictStore = conflictStore
s.patternStore = patternStore
s.relationStore = relationStore
s.sessionManager = sessionManager
s.processor = processor
s.embedSvc = embedSvc
s.vectorClient = vectorClient
s.vectorSync = vectorSync
s.reranker = reranker
s.initMu.Unlock()
// Initialize pattern detector
patternDetector := pattern.NewDetector(patternStore, observationStore, pattern.DefaultConfig())
// Set pattern sync callback if vector sync is available
if vectorSync != nil {
patternDetector.SetSyncFunc(func(p *models.Pattern) {
if err := vectorSync.SyncPattern(s.ctx, p); err != nil {
log.Warn().Err(err).Int64("id", p.ID).Msg("Failed to sync pattern to sqlite-vec")
}
})
// Set cleanup callback for pattern deletions
patternStore.SetCleanupFunc(func(ctx context.Context, deletedIDs []int64) {
if err := vectorSync.DeletePatterns(ctx, deletedIDs); err != nil {
log.Warn().Err(err).Ints64("ids", deletedIDs).Msg("Failed to delete patterns from sqlite-vec")
}
})
}
s.initMu.Lock()
s.patternDetector = patternDetector
s.initMu.Unlock()
// Set vector sync callbacks on processor if both are available
@@ -238,6 +320,22 @@ func (s *Service) initializeAsync() {
if err := vectorSync.SyncObservation(s.ctx, obs); err != nil {
log.Warn().Err(err).Int64("id", obs.ID).Msg("Failed to sync observation to sqlite-vec")
}
// Trigger pattern detection for the new observation
if patternDetector != nil {
go func(observation *models.Observation) {
detectCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if result, err := patternDetector.AnalyzeObservation(detectCtx, observation); err != nil {
log.Warn().Err(err).Int64("obs_id", observation.ID).Msg("Pattern detection failed")
} else if result.MatchedPattern != nil {
log.Debug().
Int64("pattern_id", result.MatchedPattern.ID).
Str("pattern_name", result.MatchedPattern.Name).
Bool("is_new", result.IsNewPattern).
Msg("Pattern matched for observation")
}
}(obs)
}
})
processor.SetSyncSummaryFunc(func(summary *models.SessionSummary) {
if err := vectorSync.SyncSummary(s.ctx, summary); err != nil {
@@ -282,6 +380,37 @@ func (s *Service) initializeAsync() {
})
})
// Initialize importance scoring system
scoringConfig := models.DefaultScoringConfig()
// Load concept weights from database if available
if weights, err := observationStore.GetConceptWeights(s.ctx); err == nil && len(weights) > 0 {
scoringConfig.ConceptWeights = weights
log.Info().Int("count", len(weights)).Msg("Loaded concept weights from database")
}
scoreCalculator := scoring.NewCalculator(scoringConfig)
recalculator := scoring.NewRecalculator(observationStore, scoreCalculator, log.Logger)
s.initMu.Lock()
s.scoreCalculator = scoreCalculator
s.recalculator = recalculator
s.initMu.Unlock()
// Start background recalculator
s.wg.Add(1)
go func() {
defer s.wg.Done()
recalculator.Start(s.ctx)
}()
log.Info().Msg("Importance scoring system initialized")
// Start pattern detector background analysis
if patternDetector != nil {
patternDetector.Start()
log.Info().Msg("Pattern recognition engine started")
}
// Mark as ready
s.ready.Store(true)
log.Info().Msg("Async initialization complete - service ready")
@@ -294,6 +423,27 @@ func (s *Service) initializeAsync() {
// Start file watchers for auto-recreation on deletion
s.startWatchers()
// Check if vectors need rebuilding (empty or model version mismatch) and trigger background rebuild
if vectorClient != nil && vectorSync != nil {
needsRebuild, reason := vectorClient.NeedsRebuild(s.ctx)
if needsRebuild {
log.Info().
Str("reason", reason).
Str("model", vectorClient.ModelVersion()).
Msg("Vector rebuild required")
if reason == "empty" {
// Full rebuild - vectors table is empty
s.wg.Add(1)
go s.rebuildAllVectors(observationStore, summaryStore, promptStore, vectorSync)
} else {
// Granular rebuild - only rebuild vectors with mismatched model versions
s.wg.Add(1)
go s.rebuildStaleVectors(observationStore, summaryStore, promptStore, vectorClient, vectorSync)
}
}
}
}
// startWatchers initializes and starts file watchers for database and config.
@@ -384,6 +534,15 @@ func (s *Service) reinitializeDatabase() {
observationStore := sqlite.NewObservationStore(store)
summaryStore := sqlite.NewSummaryStore(store)
promptStore := sqlite.NewPromptStore(store)
conflictStore := sqlite.NewConflictStore(store)
patternStore := sqlite.NewPatternStore(store)
relationStore := sqlite.NewRelationStore(store)
// Enable conflict detection by linking stores
observationStore.SetConflictStore(conflictStore)
// Enable relation detection by linking stores
observationStore.SetRelationStore(relationStore)
// Create new session manager
sessionManager := session.NewManager(sessionStore)
@@ -393,6 +552,8 @@ func (s *Service) reinitializeDatabase() {
var vectorClient *sqlitevec.Client
var vectorSync *sqlitevec.Sync
var reranker *reranking.Service
emb, err := embedding.NewService()
if err != nil {
log.Warn().Err(err).Msg("Embedding service creation failed after reinit")
@@ -408,6 +569,34 @@ func (s *Service) reinitializeDatabase() {
vectorSync = sqlitevec.NewSync(client)
log.Info().Msg("sqlite-vec reconnected after reinit")
}
// Recreate cross-encoder reranking service if enabled
if s.config.RerankingEnabled {
rerankCfg := reranking.DefaultConfig()
if s.config.RerankingAlpha > 0 && s.config.RerankingAlpha <= 1 {
rerankCfg.Alpha = s.config.RerankingAlpha
}
ranker, err := reranking.NewService(rerankCfg)
if err != nil {
log.Warn().Err(err).Msg("Cross-encoder reranking service creation failed after reinit")
} else {
reranker = ranker
log.Info().Msg("Cross-encoder reranking reconnected after reinit")
}
}
// Recreate query expander
s.queryExpander = expansion.NewExpander(embedSvc)
log.Info().Msg("Query expansion reconnected after reinit")
}
// Close old reranker if exists
s.initMu.RLock()
oldReranker := s.reranker
s.initMu.RUnlock()
if oldReranker != nil {
_ = oldReranker.Close()
}
// Recreate SDK processor with new stores
@@ -422,6 +611,30 @@ func (s *Service) reinitializeDatabase() {
})
}
// Stop old pattern detector if it exists
if s.patternDetector != nil {
s.patternDetector.Stop()
}
// Create new pattern detector
patternDetector := pattern.NewDetector(patternStore, observationStore, pattern.DefaultConfig())
// Set pattern sync callback if vector sync is available
if vectorSync != nil {
patternDetector.SetSyncFunc(func(p *models.Pattern) {
if err := vectorSync.SyncPattern(s.ctx, p); err != nil {
log.Warn().Err(err).Int64("id", p.ID).Msg("Failed to sync pattern to sqlite-vec")
}
})
// Set cleanup callback for pattern deletions
patternStore.SetCleanupFunc(func(ctx context.Context, deletedIDs []int64) {
if err := vectorSync.DeletePatterns(ctx, deletedIDs); err != nil {
log.Warn().Err(err).Ints64("ids", deletedIDs).Msg("Failed to delete patterns from sqlite-vec")
}
})
}
// Atomically swap all components
s.initMu.Lock()
s.store = store
@@ -429,20 +642,44 @@ func (s *Service) reinitializeDatabase() {
s.observationStore = observationStore
s.summaryStore = summaryStore
s.promptStore = promptStore
s.conflictStore = conflictStore
s.patternStore = patternStore
s.relationStore = relationStore
s.patternDetector = patternDetector
s.sessionManager = sessionManager
s.processor = processor
s.embedSvc = embedSvc
s.vectorClient = vectorClient
s.vectorSync = vectorSync
s.reranker = reranker
s.initError = nil
s.initMu.Unlock()
// Start pattern detector
patternDetector.Start()
// Set vector sync callbacks on processor if both are available
if processor != nil && vectorSync != nil {
processor.SetSyncObservationFunc(func(obs *models.Observation) {
if err := vectorSync.SyncObservation(s.ctx, obs); err != nil {
log.Warn().Err(err).Int64("id", obs.ID).Msg("Failed to sync observation to sqlite-vec")
}
// Trigger pattern detection for the new observation
if patternDetector != nil {
go func(observation *models.Observation) {
detectCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if result, err := patternDetector.AnalyzeObservation(detectCtx, observation); err != nil {
log.Warn().Err(err).Int64("obs_id", observation.ID).Msg("Pattern detection failed")
} else if result.MatchedPattern != nil {
log.Debug().
Int64("pattern_id", result.MatchedPattern.ID).
Str("pattern_name", result.MatchedPattern.Name).
Bool("is_new", result.IsNewPattern).
Msg("Pattern matched for observation")
}
}(obs)
}
})
processor.SetSyncSummaryFunc(func(summary *models.SessionSummary) {
if err := vectorSync.SyncSummary(s.ctx, summary); err != nil {
@@ -565,6 +802,210 @@ func (s *Service) processStaleQueue() {
}
}
// rebuildAllVectors rebuilds all vectors from observations, summaries, and prompts.
// Called when the vectors table is empty (e.g., after migration 20 drops all vectors).
//
// It is meant to run as a goroutine registered on s.wg (it calls s.wg.Done on
// exit). Each document type is fetched in full and synced item by item; a
// failure to fetch one type or to sync one item is logged and does not stop
// the rest of the rebuild. The loop stops early once s.ctx is cancelled, so a
// cancelled service context does not have to wait for every remaining row.
func (s *Service) rebuildAllVectors(
	observationStore *sqlite.ObservationStore,
	summaryStore *sqlite.SummaryStore,
	promptStore *sqlite.PromptStore,
	vectorSync *sqlitevec.Sync,
) {
	defer s.wg.Done()

	log.Info().Msg("Starting full vector rebuild...")
	start := time.Now()
	ctx := s.ctx

	var totalSynced int
	var syncErrors int

	// interrupted reports whether the service context has been cancelled and,
	// if so, logs how far the rebuild got before giving up.
	interrupted := func() bool {
		err := ctx.Err()
		if err == nil {
			return false
		}
		log.Warn().
			Err(err).
			Int("total_synced", totalSynced).
			Int("errors", syncErrors).
			Dur("elapsed", time.Since(start)).
			Msg("Full vector rebuild interrupted")
		return true
	}

	// Rebuild observations
	observations, err := observationStore.GetAllObservations(ctx)
	if err != nil {
		log.Error().Err(err).Msg("Failed to fetch observations for vector rebuild")
	} else {
		for _, obs := range observations {
			if interrupted() {
				return
			}
			if err := vectorSync.SyncObservation(ctx, obs); err != nil {
				log.Warn().Err(err).Int64("id", obs.ID).Msg("Failed to sync observation during rebuild")
				syncErrors++
			} else {
				totalSynced++
			}
		}
		log.Info().Int("count", len(observations)).Msg("Rebuilt observation vectors")
	}

	// Rebuild summaries
	if interrupted() {
		return
	}
	summaries, err := summaryStore.GetAllSummaries(ctx)
	if err != nil {
		log.Error().Err(err).Msg("Failed to fetch summaries for vector rebuild")
	} else {
		for _, summary := range summaries {
			if interrupted() {
				return
			}
			if err := vectorSync.SyncSummary(ctx, summary); err != nil {
				log.Warn().Err(err).Int64("id", summary.ID).Msg("Failed to sync summary during rebuild")
				syncErrors++
			} else {
				totalSynced++
			}
		}
		log.Info().Int("count", len(summaries)).Msg("Rebuilt summary vectors")
	}

	// Rebuild user prompts
	if interrupted() {
		return
	}
	prompts, err := promptStore.GetAllPrompts(ctx)
	if err != nil {
		log.Error().Err(err).Msg("Failed to fetch prompts for vector rebuild")
	} else {
		for _, prompt := range prompts {
			if interrupted() {
				return
			}
			if err := vectorSync.SyncUserPrompt(ctx, prompt); err != nil {
				log.Warn().Err(err).Int64("id", prompt.ID).Msg("Failed to sync prompt during rebuild")
				syncErrors++
			} else {
				totalSynced++
			}
		}
		log.Info().Int("count", len(prompts)).Msg("Rebuilt prompt vectors")
	}

	log.Info().
		Int("total_synced", totalSynced).
		Int("errors", syncErrors).
		Dur("elapsed", time.Since(start)).
		Msg("Full vector rebuild complete")
}
// rebuildStaleVectors rebuilds only vectors with mismatched or unknown model versions.
// This is more efficient than rebuilding all vectors when only some need updating.
//
// It is meant to run as a goroutine registered on s.wg (it calls s.wg.Done on
// exit). The stale vectors reported by vectorClient are grouped by document
// type, all of them are deleted, and the matching observations, summaries and
// prompts are then re-synced through vectorSync. Fetch and sync failures are
// logged and do not stop the remaining work.
//
// The rebuild stops early once s.ctx is cancelled. Cancellation is checked
// before the delete so that an already-cancelled context does not start the
// destructive step.
// NOTE(review): vectors that were deleted but not yet re-synced when the run
// is interrupted are not restored here - confirm they are picked up elsewhere.
func (s *Service) rebuildStaleVectors(
	observationStore *sqlite.ObservationStore,
	summaryStore *sqlite.SummaryStore,
	promptStore *sqlite.PromptStore,
	vectorClient *sqlitevec.Client,
	vectorSync *sqlitevec.Sync,
) {
	defer s.wg.Done()

	log.Info().Msg("Starting granular vector rebuild for stale vectors...")
	start := time.Now()
	ctx := s.ctx

	// Get all stale vectors
	staleVectors, err := vectorClient.GetStaleVectors(ctx)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get stale vectors")
		return
	}
	if len(staleVectors) == 0 {
		log.Info().Msg("No stale vectors found")
		return
	}
	log.Info().Int("stale_count", len(staleVectors)).Msg("Found stale vectors to rebuild")

	// Group stale vectors by doc_type and sqlite_id. Sets are used because
	// several stale vectors can point at the same source row.
	staleObsIDs := make(map[int64]struct{})
	staleSummaryIDs := make(map[int64]struct{})
	stalePromptIDs := make(map[int64]struct{})
	unhandledTypes := make(map[string]int)
	staleDocIDs := make([]string, 0, len(staleVectors))
	for _, sv := range staleVectors {
		staleDocIDs = append(staleDocIDs, sv.DocID)
		switch sv.DocType {
		case "observation":
			staleObsIDs[sv.SQLiteID] = struct{}{}
		case "summary":
			staleSummaryIDs[sv.SQLiteID] = struct{}{}
		case "prompt":
			stalePromptIDs[sv.SQLiteID] = struct{}{}
		default:
			// Still deleted below, but this function has no way to rebuild it.
			unhandledTypes[string(sv.DocType)]++
		}
	}
	for docType, count := range unhandledTypes {
		log.Warn().
			Str("doc_type", docType).
			Int("count", count).
			Msg("Stale vectors of unhandled type will be deleted without rebuild")
	}

	// Do not start the destructive step if the service context is already cancelled.
	if err := ctx.Err(); err != nil {
		log.Warn().Err(err).Msg("Granular vector rebuild cancelled before deleting stale vectors")
		return
	}

	// Delete stale vectors before re-syncing
	if err := vectorClient.DeleteVectorsByDocIDs(ctx, staleDocIDs); err != nil {
		log.Error().Err(err).Msg("Failed to delete stale vectors")
		return
	}

	var totalSynced int
	var syncErrors int

	// interrupted reports whether the service context has been cancelled and,
	// if so, logs how far the rebuild got before giving up.
	interrupted := func() bool {
		err := ctx.Err()
		if err == nil {
			return false
		}
		log.Warn().
			Err(err).
			Int("total_synced", totalSynced).
			Int("errors", syncErrors).
			Dur("elapsed", time.Since(start)).
			Msg("Granular vector rebuild interrupted")
		return true
	}

	// idsOf flattens an ID set into a slice for the *ByIDs store queries.
	idsOf := func(set map[int64]struct{}) []int64 {
		ids := make([]int64, 0, len(set))
		for id := range set {
			ids = append(ids, id)
		}
		return ids
	}

	// Rebuild stale observations
	if len(staleObsIDs) > 0 {
		observations, err := observationStore.GetObservationsByIDs(ctx, idsOf(staleObsIDs), "date_desc", 0)
		if err != nil {
			log.Error().Err(err).Msg("Failed to fetch observations for rebuild")
		} else {
			for _, obs := range observations {
				if interrupted() {
					return
				}
				if err := vectorSync.SyncObservation(ctx, obs); err != nil {
					log.Warn().Err(err).Int64("id", obs.ID).Msg("Failed to sync observation during rebuild")
					syncErrors++
				} else {
					totalSynced++
				}
			}
			log.Info().Int("count", len(observations)).Msg("Rebuilt stale observation vectors")
		}
	}

	// Rebuild stale summaries
	if len(staleSummaryIDs) > 0 {
		if interrupted() {
			return
		}
		summaries, err := summaryStore.GetSummariesByIDs(ctx, idsOf(staleSummaryIDs), "date_desc", 0)
		if err != nil {
			log.Error().Err(err).Msg("Failed to fetch summaries for rebuild")
		} else {
			for _, summary := range summaries {
				if interrupted() {
					return
				}
				if err := vectorSync.SyncSummary(ctx, summary); err != nil {
					log.Warn().Err(err).Int64("id", summary.ID).Msg("Failed to sync summary during rebuild")
					syncErrors++
				} else {
					totalSynced++
				}
			}
			log.Info().Int("count", len(summaries)).Msg("Rebuilt stale summary vectors")
		}
	}

	// Rebuild stale prompts
	if len(stalePromptIDs) > 0 {
		if interrupted() {
			return
		}
		prompts, err := promptStore.GetPromptsByIDs(ctx, idsOf(stalePromptIDs), "date_desc", 0)
		if err != nil {
			log.Error().Err(err).Msg("Failed to fetch prompts for rebuild")
		} else {
			for _, prompt := range prompts {
				if interrupted() {
					return
				}
				if err := vectorSync.SyncUserPrompt(ctx, prompt); err != nil {
					log.Warn().Err(err).Int64("id", prompt.ID).Msg("Failed to sync prompt during rebuild")
					syncErrors++
				} else {
					totalSynced++
				}
			}
			log.Info().Int("count", len(prompts)).Msg("Rebuilt stale prompt vectors")
		}
	}

	log.Info().
		Int("total_synced", totalSynced).
		Int("errors", syncErrors).
		Dur("elapsed", time.Since(start)).
		Msg("Granular vector rebuild complete")
}
// verifyStaleObservation verifies a single stale observation in the background.
func (s *Service) verifyStaleObservation(req staleVerifyRequest) {
// Wait for service to be ready
@@ -667,11 +1108,42 @@ func (s *Service) setupRoutes() {
r.Get("/api/stats", s.handleGetStats)
r.Get("/api/stats/retrieval", s.handleGetRetrievalStats)
r.Get("/api/types", s.handleGetTypes)
r.Get("/api/models", s.handleGetModels)
// Observation scoring and feedback routes
r.Post("/api/observations/{id}/feedback", s.handleObservationFeedback)
r.Get("/api/observations/{id}/score", s.handleExplainScore)
r.Get("/api/observations/top", s.handleGetTopObservations)
r.Get("/api/observations/most-retrieved", s.handleGetMostRetrieved)
// Scoring configuration routes
r.Get("/api/scoring/stats", s.handleGetScoringStats)
r.Get("/api/scoring/concepts", s.handleGetConceptWeights)
r.Put("/api/scoring/concepts/{concept}", s.handleUpdateConceptWeight)
r.Post("/api/scoring/recalculate", s.handleTriggerRecalculation)
// Context injection
r.Get("/api/context/count", s.handleContextCount)
r.Get("/api/context/inject", s.handleContextInject)
r.Get("/api/context/search", s.handleSearchByPrompt)
// Pattern routes
r.Get("/api/patterns", s.handleGetPatterns)
r.Get("/api/patterns/stats", s.handleGetPatternStats)
r.Get("/api/patterns/search", s.handleSearchPatterns)
r.Get("/api/patterns/by-name", s.handleGetPatternByName)
r.Get("/api/patterns/{id}", s.handleGetPatternByID)
r.Get("/api/patterns/{id}/insight", s.handleGetPatternInsight)
r.Delete("/api/patterns/{id}", s.handleDeletePattern)
r.Post("/api/patterns/{id}/deprecate", s.handleDeprecatePattern)
r.Post("/api/patterns/merge", s.handleMergePatterns)
// Relation routes (knowledge graph)
r.Get("/api/relations/stats", s.handleGetRelationStats)
r.Get("/api/relations/type/{type}", s.handleGetRelationsByType)
r.Get("/api/observations/{id}/relations", s.handleGetRelations)
r.Get("/api/observations/{id}/graph", s.handleGetRelationGraph)
r.Get("/api/observations/{id}/related", s.handleGetRelatedObservations)
})
}
@@ -894,6 +1366,16 @@ func (s *Service) Shutdown(ctx context.Context) error {
_ = s.configWatcher.Stop()
}
// Stop background recalculator
if s.recalculator != nil {
s.recalculator.Stop()
}
// Stop pattern detector
if s.patternDetector != nil {
s.patternDetector.Stop()
}
// Shutdown all sessions
s.sessionManager.ShutdownAll(ctx)
@@ -904,6 +1386,13 @@ func (s *Service) Shutdown(ctx context.Context) error {
}
}
// Close reranking service
if s.reranker != nil {
if err := s.reranker.Close(); err != nil {
log.Error().Err(err).Msg("Reranking service close error")
}
}
// Close embedding service
if s.embedSvc != nil {
if err := s.embedSvc.Close(); err != nil {