Files
claude-mnemonic/internal/maintenance/service.go
T
lukaszraczylo b7b82ce22f fix: bound SQLite WAL growth and prevent worker hangs (#49)
The worker's SQLite WAL could grow unbounded (observed at 19 MB) and wedge the
DB, hanging Claude Code on every prompt. No checkpoint ever truncated the
WAL (only the PASSIVE auto-checkpoint ran, and it cannot shrink the file), the
connection-scoped pragmas were set via a single Exec so only one pooled
connection received them (e.g. busy_timeout=0 on the rest), and the
maintenance service that would optimize/checkpoint was never wired up.

- Register a sqlite3 ConnectHook driver so all pragmas (busy_timeout,
  journal_mode, synchronous, cache_size, foreign_keys, journal_size_limit)
  apply to every pooled connection; enable safe connection recycling.
- Add Store.Checkpoint (TRUNCATE), checkpoint-on-Close, and a periodic
  size-gated checkpoint loop with configurable interval/threshold.
- Wire up the previously-dead maintenance service; make trigger_maintenance
  actually run DB maintenance instead of only recalculating scores.
- Harden the user-prompt hook to honor its deadline and fail open so a
  slow worker can never stall a prompt.
- Add regression tests for WAL truncation, checkpoint-on-close, and
  per-connection pragmas.
2026-06-01 16:40:27 +01:00

313 lines
8.2 KiB
Go

// Package maintenance provides scheduled maintenance tasks for claude-mnemonic.
package maintenance
import (
"context"
"sync"
"time"
"github.com/lukaszraczylo/claude-mnemonic/internal/config"
"github.com/lukaszraczylo/claude-mnemonic/internal/db/gorm"
"github.com/rs/zerolog"
)
// Service handles scheduled maintenance tasks.
//
// Start drives the periodic scheduler, Stop/Wait manage its shutdown, and
// RunNow/RunNowSync trigger ad-hoc runs. mu guards running and the run
// metrics (lastRunTime, lastRunDuration, totalCleanedObs, totalOptimizeRun).
//
// NOTE(review): the field order looks layout-optimized (e.g. by the govet
// fieldalignment analyzer) rather than logically grouped; keep that in mind
// when adding fields.
type Service struct {
	// log is the component-scoped logger ("component"="maintenance").
	log zerolog.Logger

	// lastRunTime is when the most recent runMaintenance call finished.
	lastRunTime time.Time

	// promptStore is injected by NewService; the tasks in this file query the
	// database through store instead.
	promptStore *gorm.PromptStore

	// store provides the database handle (GetDB) and Optimize.
	store *gorm.Store

	// vectorCleanupFn, when non-nil, is called with each batch of deleted
	// observation IDs so the matching vector DB entries can be removed too.
	vectorCleanupFn func(ctx context.Context, deletedIDs []int64)

	// config supplies the enable flag, run interval, and retention settings.
	config *config.Config

	// summaryStore is injected by NewService; not used by the tasks in this file.
	summaryStore *gorm.SummaryStore

	// stopCh is closed by Stop to tell the scheduler loop in Start to exit.
	stopCh chan struct{}

	// doneCh is closed when Start returns; Wait blocks on it.
	doneCh chan struct{}

	// observationStore is injected by NewService; not used by the tasks in
	// this file.
	observationStore *gorm.ObservationStore

	// lastRunDuration is how long the most recent run took.
	lastRunDuration time.Duration

	// totalCleanedObs is the cumulative number of observations deleted by runs.
	totalCleanedObs int64

	// totalOptimizeRun counts runs in which Store.Optimize succeeded.
	totalOptimizeRun int64

	// mu guards running and the run metrics above.
	mu sync.Mutex

	// running is true while a Start call is executing.
	running bool
}
// NewService builds a maintenance Service wired to the given stores.
//
// vectorCleanupFn may be nil; when set it is invoked with the IDs of each
// batch of observations deleted during cleanup. No goroutine is started here:
// the returned service stays idle until Start is called. Its stop and done
// channels are created once and never replaced.
func NewService(
	store *gorm.Store,
	observationStore *gorm.ObservationStore,
	summaryStore *gorm.SummaryStore,
	promptStore *gorm.PromptStore,
	vectorCleanupFn func(ctx context.Context, deletedIDs []int64),
	cfg *config.Config,
	log zerolog.Logger,
) *Service {
	scoped := log.With().Str("component", "maintenance").Logger()

	svc := &Service{
		config:           cfg,
		log:              scoped,
		store:            store,
		observationStore: observationStore,
		summaryStore:     summaryStore,
		promptStore:      promptStore,
		vectorCleanupFn:  vectorCleanupFn,
	}
	svc.stopCh = make(chan struct{})
	svc.doneCh = make(chan struct{})

	return svc
}
// Start runs the maintenance scheduler on the calling goroutine and blocks
// until it stops.
//
// A call made while the scheduler is already running returns immediately.
// When MaintenanceEnabled is false it logs and returns. Otherwise it waits
// five minutes (so the rest of the system can stabilize), performs one run,
// then repeats every MaintenanceIntervalHours (never less than one hour)
// until ctx is cancelled or Stop is called. On return it clears the running
// flag and closes doneCh so Wait unblocks.
func (s *Service) Start(ctx context.Context) {
	s.mu.Lock()
	if s.running {
		s.mu.Unlock()
		return
	}
	s.running = true
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.running = false
		// doneCh is created once in NewService and never replaced, so a later
		// Start call (after this one has returned) would otherwise panic on a
		// second close. Checking and closing under mu keeps this atomic.
		select {
		case <-s.doneCh:
		default:
			close(s.doneCh)
		}
	}()

	if !s.config.MaintenanceEnabled {
		s.log.Info().Msg("Maintenance disabled, not starting scheduler")
		return
	}

	interval := max(time.Duration(s.config.MaintenanceIntervalHours)*time.Hour, time.Hour)
	s.log.Info().
		Dur("interval", interval).
		Int("retention_days", s.config.ObservationRetentionDays).
		Bool("cleanup_stale", s.config.CleanupStaleObservations).
		Msg("Starting maintenance scheduler")

	// Initial run after 5 minutes (allow system to stabilize).
	// Use a cancellable timer so shutdown (ctx cancel / Stop) is not blocked for up to 5m.
	const initialRunDelay = 5 * time.Minute
	initialDelay := time.NewTimer(initialRunDelay)
	select {
	case <-ctx.Done():
		initialDelay.Stop()
		s.log.Info().Msg("Maintenance shutting down before initial run (context cancellation)")
		return
	case <-s.stopCh:
		initialDelay.Stop()
		s.log.Info().Msg("Maintenance shutting down before initial run (stop signal)")
		return
	case <-initialDelay.C:
	}

	s.runMaintenance(ctx)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			s.log.Info().Msg("Maintenance shutting down due to context cancellation")
			return
		case <-s.stopCh:
			s.log.Info().Msg("Maintenance shutting down due to stop signal")
			return
		case <-ticker.C:
			s.runMaintenance(ctx)
		}
	}
}
// Stop signals a running maintenance scheduler to exit.
//
// It is a no-op when the scheduler is not running, and it is safe to call
// more than once: stopCh is closed at most once. Stop does not wait for the
// scheduler to finish; call Wait for that.
func (s *Service) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running {
		return
	}
	// running stays true until Start's deferred cleanup runs, so a second Stop
	// can arrive after stopCh is already closed; closing it again would panic.
	// Only Stop closes stopCh, and it does so while holding mu, so this
	// check-then-close cannot race with another Stop.
	select {
	case <-s.stopCh:
	default:
		close(s.stopCh)
	}
}
// Wait blocks until the Start call driving the scheduler has returned.
//
// doneCh is only closed when Start returns, so if Start is never called,
// Wait blocks indefinitely.
func (s *Service) Wait() {
	done := s.doneCh
	<-done
}
// runMaintenance executes every maintenance task once and records run metrics.
//
// The tasks run in this order:
//   - age-based observation cleanup (when ObservationRetentionDays > 0);
//   - superseded ("stale") observation cleanup (when CleanupStaleObservations is set);
//   - old-prompt cleanup;
//   - Store.Optimize.
//
// A failing task is logged and does not stop later tasks. ctx is passed to
// every database call.
func (s *Service) runMaintenance(ctx context.Context) {
	start := time.Now()
	s.log.Info().Msg("Starting maintenance run")

	var totalCleaned int64

	// Task 1: Clean up old observations by age.
	if s.config.ObservationRetentionDays > 0 {
		cleaned, err := s.cleanupOldObservations(ctx)
		// The cleanup helpers report rows deleted by batches that completed
		// before a failure, so count them even when err is non-nil.
		totalCleaned += cleaned
		if err != nil {
			s.log.Error().Err(err).Int64("cleaned", cleaned).Msg("Failed to cleanup old observations")
		} else {
			s.log.Info().Int64("cleaned", cleaned).Msg("Cleaned old observations by age")
		}
	}

	// Task 2: Clean up stale (superseded) observations.
	if s.config.CleanupStaleObservations {
		cleaned, err := s.cleanupStaleObservations(ctx)
		totalCleaned += cleaned
		if err != nil {
			s.log.Error().Err(err).Int64("cleaned", cleaned).Msg("Failed to cleanup stale observations")
		} else {
			s.log.Info().Int64("cleaned", cleaned).Msg("Cleaned stale observations")
		}
	}

	// Task 3: Clean up prompts past their age cutoff (see cleanupOldPrompts).
	// This runs before Optimize so optimization sees the post-deletion state.
	cleanedPrompts, err := s.cleanupOldPrompts(ctx)
	if err != nil {
		s.log.Error().Err(err).Msg("Failed to cleanup old prompts")
	} else if cleanedPrompts > 0 {
		s.log.Info().Int64("cleaned", cleanedPrompts).Msg("Cleaned old prompts")
	}

	// Task 4: Optimize the database last, after every deletion task.
	// NOTE(review): Store.Optimize's internals are not visible here; the
	// ordering assumes it benefits from running after deletes.
	optimized := true
	if optErr := s.store.Optimize(ctx); optErr != nil {
		s.log.Error().Err(optErr).Msg("Failed to optimize database")
		optimized = false
	}

	// Measure once so the recorded metric and the log line agree.
	elapsed := time.Since(start)

	// Update metrics under mu so Stats sees a consistent snapshot.
	s.mu.Lock()
	s.lastRunTime = time.Now()
	s.lastRunDuration = elapsed
	s.totalCleanedObs += totalCleaned
	if optimized {
		s.totalOptimizeRun++
	}
	s.mu.Unlock()

	s.log.Info().
		Dur("duration", elapsed).
		Int64("observations_cleaned", totalCleaned).
		Msg("Maintenance run completed")
}
// cleanupOldObservations deletes observations whose created_at_epoch is older
// than ObservationRetentionDays, in batches of 100 IDs.
//
// After each batch is deleted, vectorCleanupFn (if set) is called with that
// batch's IDs so the vector DB stays in sync. It returns the number of rows
// actually deleted; on error, the count covers only the batches that completed
// before the failure.
func (s *Service) cleanupOldObservations(ctx context.Context) (int64, error) {
	// NOTE(review): the cutoff is in Unix seconds. Confirm created_at_epoch is
	// stored in seconds too; if it holds milliseconds, this filter never matches.
	cutoffEpoch := time.Now().AddDate(0, 0, -s.config.ObservationRetentionDays).Unix()

	// Collect candidate IDs first so each deleted batch can be mirrored to
	// the vector store.
	var ids []int64
	if err := s.store.GetDB().WithContext(ctx).
		Model(&gorm.Observation{}).
		Where("created_at_epoch < ?", cutoffEpoch).
		Pluck("id", &ids).Error; err != nil {
		return 0, err
	}
	if len(ids) == 0 {
		return 0, nil
	}

	// Delete in batches to avoid long transactions.
	const batchSize = 100
	var deleted int64
	for offset := 0; offset < len(ids); offset += batchSize {
		batch := ids[offset:min(offset+batchSize, len(ids))]
		res := s.store.GetDB().WithContext(ctx).
			Where("id IN ?", batch).
			Delete(&gorm.Observation{})
		if res.Error != nil {
			return deleted, res.Error
		}
		// Count rows actually removed: rows can disappear between the Pluck
		// and this Delete (e.g. an overlapping RunNow), so len(batch) may
		// overcount.
		deleted += res.RowsAffected

		// Sync vector DB deletions.
		if s.vectorCleanupFn != nil {
			s.vectorCleanupFn(ctx, batch)
		}
	}
	return deleted, nil
}
// cleanupStaleObservations deletes observations marked as superseded
// (is_superseded = true), in batches of 100 IDs.
//
// After each batch is deleted, vectorCleanupFn (if set) is called with that
// batch's IDs so the vector DB stays in sync. It returns the number of rows
// actually deleted; on error, the count covers only the batches that completed
// before the failure.
func (s *Service) cleanupStaleObservations(ctx context.Context) (int64, error) {
	// Collect candidate IDs first so each deleted batch can be mirrored to
	// the vector store.
	var ids []int64
	if err := s.store.GetDB().WithContext(ctx).
		Model(&gorm.Observation{}).
		Where("is_superseded = ?", true).
		Pluck("id", &ids).Error; err != nil {
		return 0, err
	}
	if len(ids) == 0 {
		return 0, nil
	}

	// Delete in batches to avoid long transactions.
	const batchSize = 100
	var deleted int64
	for offset := 0; offset < len(ids); offset += batchSize {
		batch := ids[offset:min(offset+batchSize, len(ids))]
		res := s.store.GetDB().WithContext(ctx).
			Where("id IN ?", batch).
			Delete(&gorm.Observation{})
		if res.Error != nil {
			return deleted, res.Error
		}
		// Count rows actually removed: rows can disappear between the Pluck
		// and this Delete (e.g. an overlapping RunNow), so len(batch) may
		// overcount.
		deleted += res.RowsAffected

		// Sync vector DB deletions.
		if s.vectorCleanupFn != nil {
			s.vectorCleanupFn(ctx, batch)
		}
	}
	return deleted, nil
}
// cleanupOldPrompts deletes every user prompt whose created_at_epoch is more
// than 30 days old and returns the number of rows removed.
//
// NOTE(review): this does NOT retain a most-recent prompt (or last N prompts)
// per session. The query has no per-session condition, so all prompts past
// the cutoff are deleted regardless of session. Confirm this is the intended
// policy before relying on per-session retention.
// NOTE(review): the cutoff is in Unix seconds; confirm created_at_epoch uses
// the same unit.
func (s *Service) cleanupOldPrompts(ctx context.Context) (int64, error) {
	const promptRetentionDays = 30
	cutoffEpoch := time.Now().AddDate(0, 0, -promptRetentionDays).Unix()

	result := s.store.GetDB().WithContext(ctx).
		Where("created_at_epoch < ?", cutoffEpoch).
		Delete(&gorm.UserPrompt{})
	return result.RowsAffected, result.Error
}
// Stats returns a snapshot of the maintenance configuration and run metrics.
//
// Everything is read while holding mu, so the metrics are consistent with
// concurrent runs. Keys: enabled, interval_hours, retention_days,
// cleanup_stale, last_run, last_duration_ms, total_cleaned_obs,
// total_optimizes, running.
func (s *Service) Stats() map[string]any {
	s.mu.Lock()
	defer s.mu.Unlock()

	cfg := s.config
	snapshot := make(map[string]any, 9)

	// Configuration values.
	snapshot["enabled"] = cfg.MaintenanceEnabled
	snapshot["interval_hours"] = cfg.MaintenanceIntervalHours
	snapshot["retention_days"] = cfg.ObservationRetentionDays
	snapshot["cleanup_stale"] = cfg.CleanupStaleObservations

	// Run metrics written by runMaintenance and Start.
	snapshot["last_run"] = s.lastRunTime
	snapshot["last_duration_ms"] = s.lastRunDuration.Milliseconds()
	snapshot["total_cleaned_obs"] = s.totalCleanedObs
	snapshot["total_optimizes"] = s.totalOptimizeRun
	snapshot["running"] = s.running

	return snapshot
}
// RunNow starts one maintenance run on a new goroutine and returns without
// waiting for it.
//
// The run passes ctx to its database calls, so those calls fail once ctx is
// cancelled (e.g. a request-scoped context ending). The goroutine is not
// tracked by Wait, and nothing here prevents it from overlapping a scheduled
// run.
func (s *Service) RunNow(ctx context.Context) {
	go func() {
		s.runMaintenance(ctx)
	}()
}
// RunNowSync performs one maintenance run on the caller's goroutine and
// returns after every task has finished.
// Use this when the caller needs to report a synchronous result (e.g. an HTTP handler).
func (s *Service) RunNowSync(ctx context.Context) {
	run := s.runMaintenance
	run(ctx)
}