Files
claude-mnemonic/internal/worker/session/manager.go
lukaszraczylo 4f4b4ac70f feat(chunking): add AST-aware code chunking for Go, Python, TypeScript
- [x] Add language-specific chunkers with AST parsing (Go, Python, TypeScript)
- [x] Implement chunking manager to dispatch files to appropriate chunkers
- [x] Integrate code chunks into vector sync for semantic search
- [x] Add tree-sitter dependency for Python/TypeScript parsing
- [x] Reorder struct fields for consistency across codebase
- [x] Rename error variables to follow Go conventions (err → unmarshalErr, etc.)
- [x] Add code chunk metadata to vector documents (language, symbol name, line ranges)
- [x] Update worker service to initialize chunking pipeline with all three languages
2026-01-07 13:19:58 +00:00

443 lines
11 KiB
Go

// Package session provides session lifecycle management for claude-mnemonic.
package session
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/sqlite"
"github.com/rs/zerolog/log"
)
// MessageType represents the type of pending message.
type MessageType int
const (
MessageTypeObservation MessageType = iota
MessageTypeSummarize
)
// ObservationData contains data for a tool observation.
type ObservationData struct {
ToolInput interface{}
ToolResponse interface{}
ToolName string
CWD string
PromptNumber int
}
// SummarizeData contains data for a summarize request.
type SummarizeData struct {
LastUserMessage string
LastAssistantMessage string
}
// PendingMessage represents a message queued for SDK processing.
type PendingMessage struct {
Observation *ObservationData
Summarize *SummarizeData
Type MessageType
}
// ActiveSession represents an in-memory active session being processed.
type ActiveSession struct {
StartTime time.Time
ctx context.Context
cancel context.CancelFunc
notify chan struct{}
Project string
UserPrompt string
SDKSessionID string
ClaudeSessionID string
pendingMessages []PendingMessage
LastPromptNumber int
CumulativeInputTokens int64
CumulativeOutputTokens int64
SessionDBID int64
messageMu sync.Mutex
generatorActive atomic.Bool
}
// SessionTimeout is how long an inactive session can exist before cleanup.
const SessionTimeout = 30 * time.Minute
// CleanupInterval is how often to check for stale sessions.
const CleanupInterval = 5 * time.Minute
// Manager manages active session lifecycles.
type Manager struct {
ctx context.Context
sessionStore *sqlite.SessionStore
sessions map[int64]*ActiveSession
onCreated func(int64)
onDeleted func(int64)
cancel context.CancelFunc
ProcessNotify chan struct{}
mu sync.RWMutex
}
// NewManager creates a new session manager.
func NewManager(sessionStore *sqlite.SessionStore) *Manager {
ctx, cancel := context.WithCancel(context.Background())
m := &Manager{
sessionStore: sessionStore,
sessions: make(map[int64]*ActiveSession),
ctx: ctx,
cancel: cancel,
ProcessNotify: make(chan struct{}, 1),
}
// Start background cleanup goroutine
go m.cleanupLoop()
return m
}
// cleanupLoop periodically removes stale sessions.
func (m *Manager) cleanupLoop() {
ticker := time.NewTicker(CleanupInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.cleanupStaleSessions()
}
}
}
// cleanupStaleSessions removes sessions that have been inactive too long.
func (m *Manager) cleanupStaleSessions() {
m.mu.RLock()
var staleIDs []int64
now := time.Now()
for id, session := range m.sessions {
// Check if session has been inactive for too long
session.messageMu.Lock()
hasPending := len(session.pendingMessages) > 0
session.messageMu.Unlock()
// Don't delete sessions with pending messages or active processing
if hasPending || session.generatorActive.Load() {
continue
}
// Delete if session is older than timeout
if now.Sub(session.StartTime) > SessionTimeout {
staleIDs = append(staleIDs, id)
}
}
m.mu.RUnlock()
// Delete stale sessions
for _, id := range staleIDs {
log.Info().Int64("sessionId", id).Dur("age", SessionTimeout).Msg("Cleaning up stale session")
m.DeleteSession(id)
}
}
// SetOnSessionCreated sets a callback for when a session is created.
func (m *Manager) SetOnSessionCreated(callback func(int64)) {
m.onCreated = callback
}
// SetOnSessionDeleted sets a callback for when a session is deleted.
func (m *Manager) SetOnSessionDeleted(callback func(int64)) {
m.onDeleted = callback
}
// InitializeSession initializes a session, creating it if needed.
func (m *Manager) InitializeSession(ctx context.Context, sessionDBID int64, userPrompt string, promptNumber int) (*ActiveSession, error) {
m.mu.Lock()
// Check if already active
if session, ok := m.sessions[sessionDBID]; ok {
// Update user prompt for continuation
if userPrompt != "" {
session.UserPrompt = userPrompt
session.LastPromptNumber = promptNumber
}
m.mu.Unlock()
return session, nil
}
// Fetch from database (unlock during DB call to avoid blocking)
m.mu.Unlock()
dbSession, err := m.sessionStore.GetSessionByID(ctx, sessionDBID)
if err != nil {
return nil, err
}
if dbSession == nil {
return nil, nil
}
// Use provided userPrompt or fall back to database
prompt := userPrompt
if prompt == "" && dbSession.UserPrompt.Valid {
prompt = dbSession.UserPrompt.String
}
// Get prompt counter if not provided
if promptNumber <= 0 {
promptNumber, _ = m.sessionStore.GetPromptCounter(ctx, sessionDBID)
}
// Create session context
sessionCtx, cancel := context.WithCancel(context.Background())
session := &ActiveSession{
SessionDBID: sessionDBID,
ClaudeSessionID: dbSession.ClaudeSessionID,
SDKSessionID: dbSession.SDKSessionID.String,
Project: dbSession.Project,
UserPrompt: prompt,
LastPromptNumber: promptNumber,
StartTime: time.Now(),
pendingMessages: make([]PendingMessage, 0, 32),
notify: make(chan struct{}, 1),
ctx: sessionCtx,
cancel: cancel,
}
// Re-acquire lock to add session
m.mu.Lock()
// Double-check another goroutine didn't create it
if existing, ok := m.sessions[sessionDBID]; ok {
m.mu.Unlock()
cancel() // Clean up unused context
return existing, nil
}
m.sessions[sessionDBID] = session
onCreated := m.onCreated
m.mu.Unlock()
log.Info().
Int64("sessionId", sessionDBID).
Str("project", session.Project).
Str("claudeSessionId", session.ClaudeSessionID).
Msg("Session initialized")
// Notify callback (outside lock)
if onCreated != nil {
onCreated(sessionDBID)
}
return session, nil
}
// QueueObservation queues an observation for SDK processing.
func (m *Manager) QueueObservation(ctx context.Context, sessionDBID int64, data ObservationData) error {
m.mu.Lock()
session, ok := m.sessions[sessionDBID]
if !ok {
// Auto-initialize from database
m.mu.Unlock()
var err error
session, err = m.InitializeSession(ctx, sessionDBID, "", 0)
if err != nil || session == nil {
return err
}
} else {
m.mu.Unlock()
}
session.messageMu.Lock()
session.pendingMessages = append(session.pendingMessages, PendingMessage{
Type: MessageTypeObservation,
Observation: &data,
})
queueDepth := len(session.pendingMessages)
session.messageMu.Unlock()
// Non-blocking notification to session
select {
case session.notify <- struct{}{}:
default:
}
// Non-blocking notification to global processor
select {
case m.ProcessNotify <- struct{}{}:
default:
}
log.Info().
Int64("sessionId", sessionDBID).
Str("tool", data.ToolName).
Int("queueDepth", queueDepth).
Msg("Observation queued")
return nil
}
// QueueSummarize queues a summarize request for SDK processing.
func (m *Manager) QueueSummarize(ctx context.Context, sessionDBID int64, lastUserMessage, lastAssistantMessage string) error {
m.mu.Lock()
session, ok := m.sessions[sessionDBID]
if !ok {
// Auto-initialize from database
m.mu.Unlock()
var err error
session, err = m.InitializeSession(ctx, sessionDBID, "", 0)
if err != nil || session == nil {
return err
}
} else {
m.mu.Unlock()
}
session.messageMu.Lock()
session.pendingMessages = append(session.pendingMessages, PendingMessage{
Type: MessageTypeSummarize,
Summarize: &SummarizeData{
LastUserMessage: lastUserMessage,
LastAssistantMessage: lastAssistantMessage,
},
})
queueDepth := len(session.pendingMessages)
session.messageMu.Unlock()
// Non-blocking notification to session
select {
case session.notify <- struct{}{}:
default:
}
// Non-blocking notification to global processor
select {
case m.ProcessNotify <- struct{}{}:
default:
}
log.Info().
Int64("sessionId", sessionDBID).
Int("queueDepth", queueDepth).
Msg("Summarize request queued")
return nil
}
// DeleteSession removes a session and cleans up resources.
func (m *Manager) DeleteSession(sessionDBID int64) {
m.mu.Lock()
session, ok := m.sessions[sessionDBID]
if !ok {
m.mu.Unlock()
return
}
delete(m.sessions, sessionDBID)
m.mu.Unlock()
// Cancel context to stop generator
session.cancel()
duration := time.Since(session.StartTime)
log.Info().
Int64("sessionId", sessionDBID).
Str("project", session.Project).
Dur("duration", duration).
Msg("Session deleted")
// Trigger callback
if m.onDeleted != nil {
m.onDeleted(sessionDBID)
}
}
// ShutdownAll shuts down all active sessions.
func (m *Manager) ShutdownAll(ctx context.Context) {
// Stop cleanup goroutine
m.cancel()
m.mu.Lock()
sessionIDs := make([]int64, 0, len(m.sessions))
for id := range m.sessions {
sessionIDs = append(sessionIDs, id)
}
m.mu.Unlock()
for _, id := range sessionIDs {
m.DeleteSession(id)
}
log.Info().
Int("count", len(sessionIDs)).
Msg("All sessions shut down")
}
// GetActiveSessionCount returns the number of active sessions.
func (m *Manager) GetActiveSessionCount() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.sessions)
}
// GetTotalQueueDepth returns the total queue depth across all sessions.
func (m *Manager) GetTotalQueueDepth() int {
m.mu.RLock()
defer m.mu.RUnlock()
total := 0
for _, session := range m.sessions {
session.messageMu.Lock()
total += len(session.pendingMessages)
session.messageMu.Unlock()
}
return total
}
// IsAnySessionProcessing returns true if any session is actively processing.
func (m *Manager) IsAnySessionProcessing() bool {
m.mu.RLock()
defer m.mu.RUnlock()
for _, session := range m.sessions {
// Check for pending messages
session.messageMu.Lock()
hasPending := len(session.pendingMessages) > 0
session.messageMu.Unlock()
if hasPending {
return true
}
// Check for active generator
if session.generatorActive.Load() {
return true
}
}
return false
}
// GetAllSessions returns a copy of all active sessions.
func (m *Manager) GetAllSessions() []*ActiveSession {
m.mu.RLock()
defer m.mu.RUnlock()
sessions := make([]*ActiveSession, 0, len(m.sessions))
for _, session := range m.sessions {
sessions = append(sessions, session)
}
return sessions
}
// DrainMessages drains and returns all pending messages for a session.
func (m *Manager) DrainMessages(sessionDBID int64) []PendingMessage {
m.mu.RLock()
session, ok := m.sessions[sessionDBID]
m.mu.RUnlock()
if !ok {
return nil
}
session.messageMu.Lock()
messages := make([]PendingMessage, len(session.pendingMessages))
copy(messages, session.pendingMessages)
session.pendingMessages = session.pendingMessages[:0]
session.messageMu.Unlock()
return messages
}