fixup! chore: update marketplace for v0.11.37

march-improvements
This commit is contained in:
2026-03-06 15:39:52 +00:00
parent 1a6f6b6e5e
commit 77f5f02510
32 changed files with 2404 additions and 2778 deletions
+32
View File
@@ -5,6 +5,7 @@ import (
"encoding/json"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
)
@@ -52,6 +53,7 @@ type Config struct {
ContextRelevanceThreshold float64 `json:"context_relevance_threshold"`
RerankingCandidates int `json:"reranking_candidates"`
WorkerPort int `json:"worker_port"`
DeduplicationThreshold float64 `json:"deduplication_threshold"`
RerankingMinImprovement float64 `json:"reranking_min_improvement"`
ContextObservations int `json:"context_observations"`
ContextMaxPromptResults int `json:"context_max_prompt_results"`
@@ -64,10 +66,13 @@ type Config struct {
HubThreshold int `json:"hub_threshold"`
ObservationRetentionDays int `json:"observation_retention_days"`
MaintenanceIntervalHours int `json:"maintenance_interval_hours"`
ContextMaxTokensStartup int `json:"context_max_tokens_startup"`
ContextMaxTokensPrompt int `json:"context_max_tokens_prompt"`
ContextShowWorkTokens bool `json:"context_show_work_tokens"`
ContextShowReadTokens bool `json:"context_show_read_tokens"`
RerankingPureMode bool `json:"reranking_pure_mode"`
GraphEnabled bool `json:"graph_enabled"`
DeduplicationEnabled bool `json:"deduplication_enabled"`
MaintenanceEnabled bool `json:"maintenance_enabled"`
RerankingEnabled bool `json:"reranking_enabled"`
ContextShowLastSummary bool `json:"context_show_last_summary"`
@@ -168,6 +173,10 @@ func Default() *Config {
ContextObsConcepts: DefaultObservationConcepts,
ContextRelevanceThreshold: 0.3, // Minimum 30% similarity to include
ContextMaxPromptResults: 10, // Cap at 10 results max (0 = no cap, threshold only)
ContextMaxTokensStartup: 16000, // Max tokens for SessionStart context injection
ContextMaxTokensPrompt: 8000, // Max tokens for UserPromptSubmit context injection
DeduplicationEnabled: true, // Enable write-time vector dedup
DeduplicationThreshold: 0.9, // Similarity threshold for merging (0.9 = very similar)
MaintenanceEnabled: true, // Enable scheduled maintenance
MaintenanceIntervalHours: 6, // Run every 6 hours
ObservationRetentionDays: 0, // 0 = no age-based deletion (keep all)
@@ -269,6 +278,29 @@ func Load() (*Config, error) {
if v, ok := settings["CLAUDE_MNEMONIC_HUB_THRESHOLD"].(float64); ok && v > 0 {
cfg.HubThreshold = int(v)
}
if v, ok := settings["CLAUDE_MNEMONIC_CONTEXT_MAX_TOKENS_STARTUP"].(float64); ok && v > 0 {
cfg.ContextMaxTokensStartup = int(v)
}
if v, ok := settings["CLAUDE_MNEMONIC_CONTEXT_MAX_TOKENS_PROMPT"].(float64); ok && v > 0 {
cfg.ContextMaxTokensPrompt = int(v)
}
// Deduplication settings
if v, ok := settings["CLAUDE_MNEMONIC_DEDUP_ENABLED"].(bool); ok {
cfg.DeduplicationEnabled = v
}
if v, ok := settings["CLAUDE_MNEMONIC_DEDUP_THRESHOLD"].(float64); ok && v > 0 && v <= 1 {
cfg.DeduplicationThreshold = v
}
// Also support env vars for dedup settings
if v := os.Getenv("CLAUDE_MNEMONIC_DEDUP_ENABLED"); v != "" {
cfg.DeduplicationEnabled = v == "true" || v == "1"
}
if v := os.Getenv("CLAUDE_MNEMONIC_DEDUP_THRESHOLD"); v != "" {
if f, err := strconv.ParseFloat(v, 64); err == nil && f > 0 && f <= 1 {
cfg.DeduplicationThreshold = f
}
}
return cfg, nil
}
+15 -2
View File
@@ -4,6 +4,8 @@ package gorm
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"gorm.io/gorm"
@@ -18,6 +20,7 @@ type PatternCleanupFunc func(ctx context.Context, deletedIDs []int64)
type PatternStore struct {
db *gorm.DB
cleanupFunc PatternCleanupFunc
cleanupMu sync.RWMutex
}
// NewPatternStore creates a new pattern store.
@@ -29,6 +32,8 @@ func NewPatternStore(store *Store) *PatternStore {
// SetCleanupFunc sets the callback for when patterns are deleted.
func (s *PatternStore) SetCleanupFunc(fn PatternCleanupFunc) {
s.cleanupMu.Lock()
defer s.cleanupMu.Unlock()
s.cleanupFunc = fn
}
@@ -238,6 +243,9 @@ func (s *PatternStore) MarkPatternDeprecated(ctx context.Context, id int64) erro
// MergePatterns merges a source pattern into a target pattern.
func (s *PatternStore) MergePatterns(ctx context.Context, sourceID, targetID int64) error {
if sourceID == targetID {
return fmt.Errorf("cannot merge pattern into itself")
}
// Get both patterns
source, err := s.GetPatternByID(ctx, sourceID)
if err != nil {
@@ -294,8 +302,13 @@ func (s *PatternStore) MergePatterns(ctx context.Context, sourceID, targetID int
func (s *PatternStore) DeletePattern(ctx context.Context, id int64) error {
result := s.db.WithContext(ctx).Delete(&Pattern{}, id)
if result.Error == nil && s.cleanupFunc != nil {
s.cleanupFunc(ctx, []int64{id})
if result.Error == nil {
s.cleanupMu.RLock()
fn := s.cleanupFunc
s.cleanupMu.RUnlock()
if fn != nil {
fn(ctx, []int64{id})
}
}
return result.Error
+14 -3
View File
@@ -4,8 +4,10 @@ package gorm
import (
"context"
"database/sql"
"sync"
"time"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
"gorm.io/gorm/clause"
@@ -23,6 +25,7 @@ const MaxPromptsGlobal = 500
type PromptStore struct {
db *gorm.DB
cleanupFunc PromptCleanupFunc
cleanupMu sync.RWMutex
}
// NewPromptStore creates a new prompt store.
@@ -35,6 +38,8 @@ func NewPromptStore(store *Store, cleanupFunc PromptCleanupFunc) *PromptStore {
// SetCleanupFunc sets the callback for when prompts are deleted during cleanup.
func (s *PromptStore) SetCleanupFunc(fn PromptCleanupFunc) {
s.cleanupMu.Lock()
defer s.cleanupMu.Unlock()
s.cleanupFunc = fn
}
@@ -81,9 +86,15 @@ func (s *PromptStore) SaveUserPromptWithMatches(ctx context.Context, claudeSessi
go func() {
cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
deletedIDs, _ := s.CleanupOldPrompts(cleanupCtx)
if len(deletedIDs) > 0 && s.cleanupFunc != nil {
s.cleanupFunc(cleanupCtx, deletedIDs)
if deletedIDs, err := s.CleanupOldPrompts(cleanupCtx); err != nil {
log.Warn().Err(err).Msg("Background prompt cleanup failed")
} else if len(deletedIDs) > 0 {
s.cleanupMu.RLock()
fn := s.cleanupFunc
s.cleanupMu.RUnlock()
if fn != nil {
fn(cleanupCtx, deletedIDs)
}
}
}()
+5 -2
View File
@@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"github.com/sugarme/tokenizer"
@@ -69,8 +70,10 @@ func newBGEModel() (EmbeddingModel, error) {
libPath := filepath.Join(libDir, onnxRuntimeLibName)
ort.SetSharedLibraryPath(libPath)
// Initialize ONNX runtime
if err := ort.InitializeEnvironment(); err != nil {
// Initialize ONNX runtime (idempotent - ignore "already initialized" since
// the ONNX environment is process-global and shared with the reranking service)
err = ort.InitializeEnvironment()
if err != nil && !strings.Contains(err.Error(), "already been initialized") {
return nil, fmt.Errorf("initialize ONNX runtime: %w", err)
}
+866 -2230
View File
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+48 -11
View File
@@ -13,6 +13,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/rs/zerolog/log"
)
@@ -142,19 +143,55 @@ func formatWarning(format string, args ...any) string {
}
// handleHealth handles health check requests.
// Returns 200 OK immediately (even during init) so hooks can connect quickly.
// Use /api/ready for full readiness check.
// Returns 200 when ready, 503 when initializing or degraded.
func (s *Service) handleHealth(w http.ResponseWriter, r *http.Request) {
status := "starting"
if s.ready.Load() {
status = "ready"
} else if err := s.GetInitError(); err != nil {
status = "error"
status := "ready"
dbStatus := "ok"
embeddingStatus := "ok"
if !s.ready.Load() {
status = "initializing"
if err := s.GetInitError(); err != nil {
status = "error"
}
}
writeJSON(w, map[string]any{
"status": status,
"version": s.version,
})
// Check embedding service
if s.embedSvc == nil {
embeddingStatus = "unavailable"
if status == "ready" {
status = "degraded"
}
}
// Check DB
if s.store == nil {
dbStatus = "unavailable"
if status == "ready" {
status = "degraded"
}
}
activeSessions := 0
if s.sessionManager != nil {
activeSessions = s.sessionManager.GetActiveSessionCount()
}
resp := map[string]any{
"status": status,
"ready": s.ready.Load(),
"uptime_seconds": int(time.Since(s.startTime).Seconds()),
"active_sessions": activeSessions,
"db_status": dbStatus,
"embedding_status": embeddingStatus,
"version": s.version,
}
w.Header().Set("Content-Type", "application/json")
if status != "ready" {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(resp)
}
// handleVersion returns the worker version for version checking.
+2 -2
View File
@@ -46,7 +46,7 @@ func (s *Service) handleGetRelationGraph(w http.ResponseWriter, r *http.Request)
// Get depth parameter (default 2)
depth := 2
if depthStr := r.URL.Query().Get("depth"); depthStr != "" {
if d, err := strconv.Atoi(depthStr); err == nil && d > 0 && d <= 5 {
if d, parseErr := strconv.Atoi(depthStr); parseErr == nil && d > 0 && d <= 5 {
depth = d
}
}
@@ -72,7 +72,7 @@ func (s *Service) handleGetRelatedObservations(w http.ResponseWriter, r *http.Re
// Get minimum confidence parameter (default 0.4)
minConfidence := 0.4
if confStr := r.URL.Query().Get("min_confidence"); confStr != "" {
if c, err := strconv.ParseFloat(confStr, 64); err == nil && c >= 0 && c <= 1 {
if c, parseErr := strconv.ParseFloat(confStr, 64); parseErr == nil && c >= 0 && c <= 1 {
minConfidence = c
}
}
+19 -27
View File
@@ -42,11 +42,9 @@ func (s *Service) handleObservationFeedback(w http.ResponseWriter, r *http.Reque
return
}
// Get required components
s.initMu.RLock()
// Get required components (initMu.RLock held by requireReady middleware)
observationStore := s.observationStore
scoreCalculator := s.scoreCalculator
s.initMu.RUnlock()
if observationStore == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -95,10 +93,9 @@ func (s *Service) handleObservationFeedback(w http.ResponseWriter, r *http.Reque
func (s *Service) handleGetScoringStats(w http.ResponseWriter, r *http.Request) {
project := r.URL.Query().Get("project")
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
observationStore := s.observationStore
recalculator := s.recalculator
s.initMu.RUnlock()
if observationStore == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -130,9 +127,8 @@ func (s *Service) handleGetTopObservations(w http.ResponseWriter, r *http.Reques
limit := parseIntParam(r, "limit", 10)
project := r.URL.Query().Get("project")
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
observationStore := s.observationStore
s.initMu.RUnlock()
if observationStore == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -158,9 +154,8 @@ func (s *Service) handleGetMostRetrieved(w http.ResponseWriter, r *http.Request)
limit := parseIntParam(r, "limit", 10)
project := r.URL.Query().Get("project")
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
observationStore := s.observationStore
s.initMu.RUnlock()
if observationStore == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -191,10 +186,9 @@ func (s *Service) handleExplainScore(w http.ResponseWriter, r *http.Request) {
return
}
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
observationStore := s.observationStore
scoreCalculator := s.scoreCalculator
s.initMu.RUnlock()
if observationStore == nil || scoreCalculator == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -245,10 +239,9 @@ func (s *Service) handleUpdateConceptWeight(w http.ResponseWriter, r *http.Reque
return
}
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
observationStore := s.observationStore
recalculator := s.recalculator
s.initMu.RUnlock()
if observationStore == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -279,9 +272,8 @@ func (s *Service) handleUpdateConceptWeight(w http.ResponseWriter, r *http.Reque
// handleGetConceptWeights returns all concept weights.
// GET /api/scoring/concepts
func (s *Service) handleGetConceptWeights(w http.ResponseWriter, r *http.Request) {
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
observationStore := s.observationStore
s.initMu.RUnlock()
if observationStore == nil {
http.Error(w, "service not ready", http.StatusServiceUnavailable)
@@ -300,19 +292,22 @@ func (s *Service) handleGetConceptWeights(w http.ResponseWriter, r *http.Request
// handleTriggerRecalculation triggers an immediate score recalculation.
// POST /api/scoring/recalculate
func (s *Service) handleTriggerRecalculation(w http.ResponseWriter, r *http.Request) {
s.initMu.RLock()
// initMu.RLock held by requireReady middleware
recalculator := s.recalculator
s.initMu.RUnlock()
if recalculator == nil {
http.Error(w, "recalculator not available", http.StatusServiceUnavailable)
return
}
// Run recalculation in background
// Run recalculation in background with independent context
s.wg.Add(1)
go func() {
if err := recalculator.RecalculateNow(r.Context()); err != nil {
log.Warn().Err(err).Msg("Background score recalculation failed")
defer s.wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
if err := recalculator.RecalculateNow(ctx); err != nil {
log.Error().Err(err).Msg("Background recalculation failed")
}
}()
@@ -336,27 +331,24 @@ func (s *Service) incrementRetrievalCounts(ids []int64) {
return
}
s.initMu.RLock()
// initMu.RLock held by requireReady middleware (caller is always behind requireReady)
store := s.observationStore
s.initMu.RUnlock()
if store == nil {
return
}
// Increment in background to not block response
// Use service context to respect shutdown signals
s.wg.Add(1)
go func() {
defer s.wg.Done()
ctx, cancel := context.WithTimeout(s.ctx, 3*time.Second)
// Create a new context with timeout for the background operation
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := store.IncrementRetrievalCount(ctx, ids); err != nil {
// Log but don't fail - this is a background operation
if s.ctx.Err() == nil { // Don't log during shutdown
log.Debug().Err(err).Msg("Failed to increment retrieval counts")
}
_ = err // Explicitly ignore - background operation
}
}()
}
+20 -9
View File
@@ -459,14 +459,13 @@ func TestHandleHealth_ReturnsVersion(t *testing.T) {
svc.handleHealth(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
var response map[string]interface{}
err := json.Unmarshal(rec.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "ready", response["status"])
assert.Equal(t, "test-version-1.2.3", response["version"])
// Status may be "degraded" if embedSvc is nil in test, but version is always present
assert.Contains(t, []string{"ready", "degraded"}, response["status"])
}
func TestHandleVersion(t *testing.T) {
@@ -2028,13 +2027,14 @@ func TestHandleHealth_NotReady(t *testing.T) {
svc.handleHealth(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, http.StatusServiceUnavailable, rec.Code)
var response map[string]interface{}
err := json.Unmarshal(rec.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "starting", response["status"])
assert.Equal(t, "initializing", response["status"])
assert.Equal(t, false, response["ready"])
}
// TestHandleContextInject_EmptyProject tests context inject with empty project.
@@ -2399,7 +2399,12 @@ func TestHandleHealthEndpoint(t *testing.T) {
svc.router.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
// Response is valid JSON with health details
var response map[string]interface{}
err := json.Unmarshal(rec.Body.Bytes(), &response)
require.NoError(t, err)
assert.NotNil(t, response["status"])
assert.NotNil(t, response["version"])
}
// TestHandleSelfCheckEndpoint tests self-check endpoint via router.
@@ -2894,12 +2899,18 @@ func TestHandleHealth(t *testing.T) {
svc.router.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
var response map[string]interface{}
err := json.Unmarshal(rec.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "ready", response["status"])
// Test service has store set but no embedSvc, so status is "degraded"
assert.Contains(t, []string{"ready", "degraded"}, response["status"])
assert.NotNil(t, response["version"])
assert.NotNil(t, response["uptime_seconds"])
assert.NotNil(t, response["active_sessions"])
assert.NotNil(t, response["db_status"])
assert.NotNil(t, response["embedding_status"])
assert.NotNil(t, response["ready"])
}
// TestHandleSessionInit_ValidRequest tests session init with valid request.
+161
View File
@@ -0,0 +1,161 @@
// Package sdk provides write-time observation deduplication via vector similarity.
package sdk
import (
"context"
"fmt"
"strings"
"github.com/lukaszraczylo/claude-mnemonic/internal/config"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/gorm"
"github.com/lukaszraczylo/claude-mnemonic/internal/vector/sqlitevec"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
"github.com/rs/zerolog/log"
)
// DeduplicationResult represents the outcome of a vector similarity dedup check.
type DeduplicationResult struct {
ExistingID int64
Similarity float64
Action string // "insert", "merge"
}
// checkVectorDeduplication checks if a similar observation already exists using vector similarity.
// Returns a result indicating whether to insert or merge, or an error.
// On any failure, returns Action="insert" so the caller always proceeds with storage.
func (p *Processor) checkVectorDeduplication(ctx context.Context, obs *models.ParsedObservation, project string) *DeduplicationResult {
cfg := config.Get()
if !cfg.DeduplicationEnabled {
return &DeduplicationResult{Action: "insert"}
}
if p.vectorClient == nil {
return &DeduplicationResult{Action: "insert"}
}
// Build search text from observation fields
searchText := buildObservationSearchText(obs)
if searchText == "" {
return &DeduplicationResult{Action: "insert"}
}
// Query vector DB for similar observations in the same project
where := sqlitevec.BuildWhereFilter(sqlitevec.DocTypeObservation, project)
results, err := p.vectorClient.Query(ctx, searchText, 3, where)
if err != nil {
log.Debug().Err(err).Msg("Vector search failed during dedup check")
return &DeduplicationResult{Action: "insert"}
}
// Check results for high similarity
for _, r := range results {
if r.Similarity >= cfg.DeduplicationThreshold {
obsID := extractObservationIDFromVectorDoc(r)
if obsID > 0 {
return &DeduplicationResult{
ExistingID: obsID,
Similarity: r.Similarity,
Action: "merge",
}
}
}
}
return &DeduplicationResult{Action: "insert"}
}
// buildObservationSearchText creates searchable text from a parsed observation.
func buildObservationSearchText(obs *models.ParsedObservation) string {
var parts []string
if obs.Title != "" {
parts = append(parts, obs.Title)
}
if obs.Subtitle != "" {
parts = append(parts, obs.Subtitle)
}
if obs.Narrative != "" {
parts = append(parts, obs.Narrative)
}
text := strings.Join(parts, " ")
if len(text) > 2000 {
text = text[:2000]
}
return text
}
// extractObservationIDFromVectorDoc extracts the SQLite observation ID from a vector query result.
func extractObservationIDFromVectorDoc(r sqlitevec.QueryResult) int64 {
// Prefer the sqlite_id metadata field (set during vector sync)
if sqliteID, ok := r.Metadata["sqlite_id"].(float64); ok && sqliteID > 0 {
return int64(sqliteID)
}
if sqliteID, ok := r.Metadata["sqlite_id"].(int64); ok && sqliteID > 0 {
return sqliteID
}
// Fallback: parse from doc_id format "obs_{id}_composite" or "obs_{id}_narrative"
if !strings.HasPrefix(r.ID, "obs_") {
return 0
}
parts := strings.SplitN(r.ID[4:], "_", 2)
if len(parts) == 0 {
return 0
}
var id int64
fmt.Sscanf(parts[0], "%d", &id)
return id
}
// mergeObservation updates an existing observation with new information from a duplicate.
// It appends new facts, updates the narrative if the new one is longer,
// and bumps the importance score to reflect reconfirmation.
func (p *Processor) mergeObservation(ctx context.Context, existingID int64, newObs *models.ParsedObservation) error {
existing, err := p.observationStore.GetObservationByID(ctx, existingID)
if err != nil {
return fmt.Errorf("fetch existing observation %d: %w", existingID, err)
}
if existing == nil {
return fmt.Errorf("observation %d not found", existingID)
}
update := &gorm.ObservationUpdate{}
changed := false
// Merge facts: append new facts not already present
if len(newObs.Facts) > 0 {
existingFactSet := make(map[string]struct{}, len(existing.Facts))
for _, f := range existing.Facts {
existingFactSet[f] = struct{}{}
}
mergedFacts := make([]string, len(existing.Facts))
copy(mergedFacts, existing.Facts)
for _, f := range newObs.Facts {
if _, exists := existingFactSet[f]; !exists {
mergedFacts = append(mergedFacts, f)
changed = true
}
}
if changed {
update.Facts = &mergedFacts
}
}
// Update narrative if the new one is longer/more detailed
if len(newObs.Narrative) > len(existing.Narrative.String) {
update.Narrative = &newObs.Narrative
changed = true
}
if !changed {
// Nothing new to merge, but still count it as a confirmed observation
log.Debug().Int64("id", existingID).Msg("Dedup merge: no new content, skipping update")
return nil
}
_, err = p.observationStore.UpdateObservation(ctx, existingID, update)
if err != nil {
return fmt.Errorf("update observation %d: %w", existingID, err)
}
return nil
}
+143
View File
@@ -0,0 +1,143 @@
package sdk
import (
"testing"
"github.com/lukaszraczylo/claude-mnemonic/internal/vector/sqlitevec"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
)
func TestBuildObservationSearchText(t *testing.T) {
tests := []struct {
name string
obs *models.ParsedObservation
expected string
}{
{
name: "empty observation",
obs: &models.ParsedObservation{},
expected: "",
},
{
name: "title only",
obs: &models.ParsedObservation{
Title: "Fix database connection",
},
expected: "Fix database connection",
},
{
name: "all fields",
obs: &models.ParsedObservation{
Title: "Fix database connection",
Subtitle: "Connection pooling issue",
Narrative: "The database connection pool was exhausted due to leaked connections.",
},
expected: "Fix database connection Connection pooling issue The database connection pool was exhausted due to leaked connections.",
},
{
name: "truncates long text",
obs: &models.ParsedObservation{
Narrative: string(make([]byte, 3000)),
},
expected: string(make([]byte, 2000)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := buildObservationSearchText(tt.obs)
if result != tt.expected {
t.Errorf("got %q, want %q", result, tt.expected)
}
})
}
}
func TestExtractObservationIDFromVectorDoc(t *testing.T) {
tests := []struct {
name string
result sqlitevec.QueryResult
expected int64
}{
{
name: "from sqlite_id metadata (float64)",
result: sqlitevec.QueryResult{
ID: "obs_42_narrative",
Metadata: map[string]any{"sqlite_id": float64(42)},
},
expected: 42,
},
{
name: "from sqlite_id metadata (int64)",
result: sqlitevec.QueryResult{
ID: "obs_42_narrative",
Metadata: map[string]any{"sqlite_id": int64(42)},
},
expected: 42,
},
{
name: "fallback to doc_id parsing",
result: sqlitevec.QueryResult{
ID: "obs_99_composite",
Metadata: map[string]any{},
},
expected: 99,
},
{
name: "non-observation doc_id",
result: sqlitevec.QueryResult{
ID: "summary_5_text",
Metadata: map[string]any{},
},
expected: 0,
},
{
name: "zero sqlite_id falls back to doc_id",
result: sqlitevec.QueryResult{
ID: "obs_123_narrative",
Metadata: map[string]any{"sqlite_id": float64(0)},
},
expected: 123,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := extractObservationIDFromVectorDoc(tt.result)
if result != tt.expected {
t.Errorf("got %d, want %d", result, tt.expected)
}
})
}
}
func TestCheckVectorDeduplication_NilClient(t *testing.T) {
p := &Processor{
// No vectorClient set
}
obs := &models.ParsedObservation{
Title: "Test observation",
Narrative: "Some narrative text",
}
result := p.checkVectorDeduplication(nil, obs, "test-project")
if result.Action != "insert" {
t.Errorf("expected Action='insert' when vectorClient is nil, got %q", result.Action)
}
}
func TestCheckVectorDeduplication_EmptySearchText(t *testing.T) {
p := &Processor{
// vectorClient would be set but obs is empty
}
obs := &models.ParsedObservation{
// All empty fields
}
result := p.checkVectorDeduplication(nil, obs, "test-project")
if result.Action != "insert" {
t.Errorf("expected Action='insert' for empty observation, got %q", result.Action)
}
}
+62 -6
View File
@@ -19,6 +19,7 @@ import (
"github.com/lukaszraczylo/claude-mnemonic/internal/config"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/gorm"
"github.com/lukaszraczylo/claude-mnemonic/internal/vector/sqlitevec"
"github.com/lukaszraczylo/claude-mnemonic/pkg/models"
"github.com/lukaszraczylo/claude-mnemonic/pkg/similarity"
"github.com/rs/zerolog/log"
@@ -194,6 +195,36 @@ func hashRequest(toolName, input, output string) string {
return hex.EncodeToString(h.Sum(nil))[:16] // Short hash is sufficient
}
// maxStdoutBytes is the maximum number of bytes to capture from CLI stdout.
const maxStdoutBytes = 1 * 1024 * 1024 // 1 MiB
// maxStderrBytes is the maximum number of bytes to capture from CLI stderr.
const maxStderrBytes = 64 * 1024 // 64 KiB
// limitedWriter wraps a bytes.Buffer and silently discards writes beyond a maximum size.
type limitedWriter struct {
buf bytes.Buffer
max int
}
// Write implements io.Writer. It writes up to the remaining capacity and silently discards the rest.
func (lw *limitedWriter) Write(p []byte) (int, error) {
remaining := lw.max - lw.buf.Len()
if remaining <= 0 {
return len(p), nil // Silently discard
}
if len(p) > remaining {
p = p[:remaining]
}
lw.buf.Write(p)
return len(p), nil
}
// String returns the buffered content as a string.
func (lw *limitedWriter) String() string {
return lw.buf.String()
}
// BroadcastFunc is a callback for broadcasting events to SSE clients.
type BroadcastFunc func(event map[string]any)
@@ -212,6 +243,7 @@ const MaxVectorSyncWorkers = 8
type Processor struct {
observationStore *gorm.ObservationStore
summaryStore *gorm.SummaryStore
vectorClient *sqlitevec.Client
broadcastFunc BroadcastFunc
syncObservationFunc SyncObservationFunc
syncSummaryFunc SyncSummaryFunc
@@ -240,6 +272,11 @@ func (p *Processor) SetSyncSummaryFunc(fn SyncSummaryFunc) {
p.syncSummaryFunc = fn
}
// SetVectorClient sets the vector client for write-time deduplication.
func (p *Processor) SetVectorClient(client *sqlitevec.Client) {
p.vectorClient = client
}
// broadcast sends an event via the broadcast callback if set.
func (p *Processor) broadcast(event map[string]any) {
if p.broadcastFunc != nil {
@@ -429,16 +466,34 @@ func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, projec
// Convert to stored observation for similarity check
storedObs := obs.ToStoredObservation()
// Check if this observation is too similar to existing ones
// Check if this observation is too similar to existing ones (text-based Jaccard)
if existingObs != nil && similarity.IsSimilarToAny(storedObs, existingObs, similarityThreshold) {
log.Debug().
Str("type", string(obs.Type)).
Str("title", obs.Title).
Msg("Skipping observation - too similar to existing")
Msg("Skipping observation - too similar to existing (text)")
skippedCount++
continue
}
// Check vector similarity for high-confidence dedup with merge
dedupResult := p.checkVectorDeduplication(ctx, obs, project)
if dedupResult.Action == "merge" {
log.Info().
Int64("existing_id", dedupResult.ExistingID).
Float64("similarity", dedupResult.Similarity).
Str("title", obs.Title).
Msg("Merging duplicate observation (vector dedup)")
if err := p.mergeObservation(ctx, dedupResult.ExistingID, obs); err != nil {
log.Warn().Err(err).Int64("existing_id", dedupResult.ExistingID).
Msg("Merge failed, inserting as new observation")
// Fall through to normal insert
} else {
skippedCount++
continue
}
}
id, createdAtEpoch, err := p.observationStore.StoreObservation(ctx, sdkSessionID, project, obs, promptNumber, 0)
if err != nil {
log.Error().Err(err).Msg("Failed to store observation")
@@ -644,10 +699,11 @@ func (p *Processor) callClaudeCLI(ctx context.Context, prompt string) (string, e
// Disable any plugin hooks by setting an env var that our hooks can check
cmd.Env = append(os.Environ(), "CLAUDE_MNEMONIC_INTERNAL=1")
// Capture output
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
// Capture output with size limits to prevent unbounded memory usage
stdout := &limitedWriter{max: maxStdoutBytes}
stderr := &limitedWriter{max: maxStderrBytes}
cmd.Stdout = stdout
cmd.Stderr = stderr
// Run command
err := cmd.Run()
+26 -2
View File
@@ -43,6 +43,13 @@ const (
// QueueProcessInterval is how often the background queue processor runs.
QueueProcessInterval = 2 * time.Second
// reinitializationDrainDelay is the delay after marking the service as not ready
// to allow in-flight requests to complete before reinitializing.
reinitializationDrainDelay = 200 * time.Millisecond
// MaxConcurrentProcessing limits the number of concurrent session processing goroutines.
MaxConcurrentProcessing = 4
// VectorSyncMaxRetries is the maximum number of retries for vector sync operations.
VectorSyncMaxRetries = 3
@@ -138,6 +145,7 @@ type Service struct {
updater *update.Updater
rateLimiter *PerClientRateLimiter
expensiveOpLimiter *ExpensiveOperationLimiter
contextCache sync.Map
version string
recentQueriesBuf [maxRecentQueries]RecentSearchQuery
wg sync.WaitGroup
@@ -178,6 +186,13 @@ type staleVerifyRequest struct {
observationID int64
}
// contextCacheEntry caches clustering results for context injection.
type contextCacheEntry struct {
timestamp time.Time
observations []*models.Observation
obsCount int
}
// RecentSearchQuery tracks a search query for analytics.
type RecentSearchQuery struct {
Timestamp time.Time `json:"timestamp"`
@@ -288,6 +303,11 @@ func (s *Service) setupVectorSyncCallbacks(
})
}
// Set vector client on processor for write-time deduplication
if processor != nil && s.vectorClient != nil {
processor.SetVectorClient(s.vectorClient)
}
// Set cleanup callback on observation store to sync deletes to vector store
if observationStore != nil && vectorSync != nil {
observationStore.SetCleanupFunc(func(ctx context.Context, deletedIDs []int64) {
@@ -614,6 +634,7 @@ func (s *Service) startWatchers() {
func (s *Service) reinitializeDatabase() {
// Block new requests
s.ready.Store(false)
time.Sleep(reinitializationDrainDelay) // Allow in-flight requests to complete
log.Info().Msg("Database reinitialization starting...")
// Get old store references
@@ -1587,12 +1608,13 @@ func (s *Service) processQueue() {
// processAllSessions processes pending messages for all active sessions.
// Messages are processed in parallel using goroutines, with concurrency
// limited by the processor's semaphore.
// limited by a channel-based semaphore.
func (s *Service) processAllSessions() {
// Get all sessions with pending messages
sessions := s.sessionManager.GetAllSessions()
var wg sync.WaitGroup
sem := make(chan struct{}, MaxConcurrentProcessing)
for _, sess := range sessions {
// Get pending messages
@@ -1601,11 +1623,13 @@ func (s *Service) processAllSessions() {
continue
}
// Process each message in a goroutine
// Process each message in a goroutine with semaphore
for _, msg := range messages {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore slot
go func(sess *session.ActiveSession, msg session.PendingMessage) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore slot
switch msg.Type {
case session.MessageTypeObservation:
+4
View File
@@ -75,6 +75,7 @@ type Manager struct {
onDeleted func(int64)
cancel context.CancelFunc
ProcessNotify chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
}
@@ -89,12 +90,14 @@ func NewManager(sessionStore *gorm.SessionStore) *Manager {
ProcessNotify: make(chan struct{}, 1),
}
// Start background cleanup goroutine
m.wg.Add(1)
go m.cleanupLoop()
return m
}
// cleanupLoop periodically removes stale sessions.
func (m *Manager) cleanupLoop() {
defer m.wg.Done()
ticker := time.NewTicker(CleanupInterval)
defer ticker.Stop()
@@ -350,6 +353,7 @@ func (m *Manager) DeleteSession(sessionDBID int64) {
func (m *Manager) ShutdownAll(ctx context.Context) {
// Stop cleanup goroutine
m.cancel()
m.wg.Wait()
m.mu.Lock()
sessionIDs := make([]int64, 0, len(m.sessions))
+1
View File
@@ -952,6 +952,7 @@ func TestCleanupLoop_ExitsOnCancel(t *testing.T) {
// Start cleanup loop in goroutine
done := make(chan struct{})
manager.wg.Add(1)
go func() {
manager.cleanupLoop()
close(done)
+1 -1
View File
@@ -212,7 +212,7 @@ func (b *Broadcaster) HandleSSE(w http.ResponseWriter, r *http.Request) {
defer b.RemoveClient(client)
// Send initial connection message
fmt.Fprintf(w, "data: {\"type\":\"connected\",\"clientId\":\"%s\"}\n\n", client.ID)
_, _ = fmt.Fprintf(w, "data: {\"type\":\"connected\",\"clientId\":\"%s\"}\n\n", client.ID)
client.Flusher.Flush()
// Wait for client disconnect