fix: plugin no longer vanishes after Claude Code updates

Root cause: plugin registered as directory source in known_marketplaces.json,
which gets wiped on CLI updates. Now registers in extraKnownMarketplaces
(settings.json) as a GitHub source — same mechanism caveman/context-mode use.

Binaries install to ~/.claude-mnemonic/bin/ instead of the Claude-managed
plugins directory. Thin wrapper scripts in the repo let the marketplace
clone find them. Nothing gets cleaned up when Claude refreshes its cache.

Also fixed along the way:
- ONNX Runtime 1.24.3 → 1.26.0 (API v25 mismatch broke all embedding tests)
- Vector client leaked on DB reinit, processQueue had a race on sessionManager
- reloadConfig called os.Exit(0) bypassing graceful shutdown
- Removed dead QueryRowWithTimeout that leaked contexts
- Added tests for graph/watcher/maintenance/update (all were at 0%)
This commit is contained in:
2026-05-24 01:15:23 +01:00
parent cfc95c9ce4
commit f07875ee82
32 changed files with 3217 additions and 127 deletions
+42 -23
View File
@@ -8,6 +8,7 @@ import (
"os"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/go-chi/chi/v5"
@@ -366,9 +367,9 @@ func NewService(version string) (*Service, error) {
router := chi.NewRouter()
sseBroadcaster := sse.NewBroadcaster()
// Determine install directory (plugin location)
// Determine install directory (stable binary location, survives Claude Code updates)
homeDir, _ := os.UserHomeDir()
installDir := fmt.Sprintf("%s/.claude/plugins/marketplaces/claude-mnemonic", homeDir)
installDir := fmt.Sprintf("%s/.claude-mnemonic/bin", homeDir)
// Create rate limiter with generous limits (100 req/sec, burst of 200)
// These limits are per-client and allow for intensive CLI usage
@@ -736,10 +737,14 @@ func (s *Service) reinitializeDatabase() {
log.Info().Msg("Query expansion reconnected after reinit")
}
// Close old reranker if exists
// Close old vector client and reranker before swapping
s.initMu.RLock()
oldVectorClient := s.vectorClient
oldReranker := s.reranker
s.initMu.RUnlock()
if oldVectorClient != nil {
_ = oldVectorClient.Close()
}
if oldReranker != nil {
_ = oldReranker.Close()
}
@@ -815,8 +820,11 @@ func (s *Service) reloadConfig() {
// Give SSE clients a moment to receive the message
time.Sleep(100 * time.Millisecond)
// Exit cleanly - hooks will restart us with new config
os.Exit(0)
// Send SIGTERM to self for graceful shutdown (hooks will restart us)
p, err := os.FindProcess(os.Getpid())
if err == nil {
_ = p.Signal(syscall.SIGTERM)
}
}
// setInitError records an initialization error.
@@ -1592,15 +1600,17 @@ func (s *Service) processQueue() {
ticker := time.NewTicker(QueueProcessInterval)
defer ticker.Stop()
s.initMu.RLock()
notify := s.sessionManager.ProcessNotify
s.initMu.RUnlock()
for {
select {
case <-s.ctx.Done():
return
case <-s.sessionManager.ProcessNotify:
// Immediate processing when observation is queued
case <-notify:
s.processAllSessions()
case <-ticker.C:
// Fallback periodic processing
s.processAllSessions()
}
}
@@ -1610,31 +1620,36 @@ func (s *Service) processQueue() {
// Messages are processed in parallel using goroutines, with concurrency
// limited by a channel-based semaphore.
func (s *Service) processAllSessions() {
// Get all sessions with pending messages
sessions := s.sessionManager.GetAllSessions()
s.initMu.RLock()
mgr := s.sessionManager
proc := s.processor
s.initMu.RUnlock()
if mgr == nil || proc == nil {
return
}
sessions := mgr.GetAllSessions()
var wg sync.WaitGroup
sem := make(chan struct{}, MaxConcurrentProcessing)
for _, sess := range sessions {
// Get pending messages
messages := s.sessionManager.DrainMessages(sess.SessionDBID)
messages := mgr.DrainMessages(sess.SessionDBID)
if len(messages) == 0 {
continue
}
// Process each message in a goroutine with semaphore
for _, msg := range messages {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore slot
sem <- struct{}{}
go func(sess *session.ActiveSession, msg session.PendingMessage) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore slot
defer func() { <-sem }()
switch msg.Type {
case session.MessageTypeObservation:
if msg.Observation != nil {
err := s.processor.ProcessObservation(
err := proc.ProcessObservation(
s.ctx,
sess.SDKSessionID,
sess.Project,
@@ -1653,7 +1668,7 @@ func (s *Service) processAllSessions() {
case session.MessageTypeSummarize:
if msg.Summarize != nil {
err := s.processor.ProcessSummary(
err := proc.ProcessSummary(
s.ctx,
sess.SessionDBID,
sess.SDKSessionID,
@@ -1667,18 +1682,15 @@ func (s *Service) processAllSessions() {
Int64("sessionId", sess.SessionDBID).
Msg("Failed to process summary")
}
// Delete session after summary
s.sessionManager.DeleteSession(sess.SessionDBID)
mgr.DeleteSession(sess.SessionDBID)
}
}
}(sess, msg)
}
}
// Wait for all goroutines to complete
wg.Wait()
// Broadcast status after processing
s.broadcastProcessingStatus()
}
@@ -1787,8 +1799,15 @@ func (s *Service) Shutdown(ctx context.Context) error {
// broadcastProcessingStatus broadcasts the current processing status.
func (s *Service) broadcastProcessingStatus() {
isProcessing := s.sessionManager.IsAnySessionProcessing()
queueDepth := s.sessionManager.GetTotalQueueDepth()
s.initMu.RLock()
mgr := s.sessionManager
s.initMu.RUnlock()
if mgr == nil {
return
}
isProcessing := mgr.IsAnySessionProcessing()
queueDepth := mgr.GetTotalQueueDepth()
s.sseBroadcaster.Broadcast(map[string]any{
"type": "processing_status",