diff --git a/internal/db/sqlite/store.go b/internal/db/sqlite/store.go
index 1cdc670..08053b2 100644
--- a/internal/db/sqlite/store.go
+++ b/internal/db/sqlite/store.go
@@ -132,3 +132,8 @@ func (s *Store) QueryRowContext(ctx context.Context, query string, args ...inter
 	}
 	return stmt.QueryRowContext(ctx, args...)
 }
+
+// Ping checks if the database connection is alive by returning the result of s.db.Ping().
+func (s *Store) Ping() error {
+	return s.db.Ping()
+}
diff --git a/internal/update/update.go b/internal/update/update.go
index 8fcf439..88d660f 100644
--- a/internal/update/update.go
+++ b/internal/update/update.go
@@ -573,21 +573,30 @@ func (u *Updater) Restart() error {
 
 	log.Info().Str("path", workerPath).Msg("Restarting worker with new binary")
 
-	// Start the new process
-	cmd := exec.Command(workerPath) // #nosec G204 -- workerPath is from internal installDir
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	cmd.Env = os.Environ()
+	// Use nohup to start a detached process that survives parent exit
+	// The new worker will retry binding to the port after the old process exits
+	cmd := exec.Command("nohup", workerPath) // #nosec G204 -- workerPath is from internal installDir
+	cmd.Stdout = nil // Detach stdout
+	cmd.Stderr = nil // Detach stderr
+	cmd.Stdin = nil  // Detach stdin
+	cmd.Env = append(os.Environ(), "CLAUDE_MNEMONIC_RESTART=1")
 
+	// Start in background - don't wait
 	if err := cmd.Start(); err != nil {
 		return fmt.Errorf("failed to start new worker: %w", err)
 	}
 
-	// Give the new process time to start
-	time.Sleep(RestartDelay)
+	// Release the child process so it's not a zombie
+	go func() {
+		_ = cmd.Wait()
+	}()
 
-	// Exit current process - the new one is now running
 	log.Info().Int("new_pid", cmd.Process.Pid).Msg("New worker started, exiting old process")
+
+	// Give a moment for the log to flush
+	time.Sleep(100 * time.Millisecond)
+
+	// Exit current process - the new one will bind to the port
 	os.Exit(0)
 
 	return nil // Never reached
diff --git a/internal/vector/chroma/client.go 
b/internal/vector/chroma/client.go
index 35384ca..cc46b4f 100644
--- a/internal/vector/chroma/client.go
+++ b/internal/vector/chroma/client.go
@@ -449,6 +449,14 @@ func (c *Client) nextID() int {
 	return c.requestID
 }
 
+// IsConnected reports whether the client currently considers itself connected
+// to ChromaDB. The flag is read while holding c.mu.
+func (c *Client) IsConnected() bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.connected
+}
+
 // Close closes the connection to ChromaDB.
 func (c *Client) Close() error {
 	c.mu.Lock()
diff --git a/internal/worker/handlers.go b/internal/worker/handlers.go
index 6a8f7b6..1cbabb2 100644
--- a/internal/worker/handlers.go
+++ b/internal/worker/handlers.go
@@ -751,6 +751,116 @@ func (s *Service) handleUpdateStatus(w http.ResponseWriter, r *http.Request) {
 	writeJSON(w, status)
 }
 
+// ComponentHealth represents the health status of a single component.
+type ComponentHealth struct {
+	// Name is the human-readable component name.
+	Name string `json:"name"`
+	// Status is one of "healthy", "degraded" or "unhealthy".
+	Status string `json:"status"`
+	// Message explains a non-healthy status; it is omitted from the JSON when empty.
+	Message string `json:"message,omitempty"`
+}
+
+// SelfCheckResponse contains the health status of all components.
+type SelfCheckResponse struct {
+	// Overall is the worst status found across Components.
+	Overall string `json:"overall"`
+	// Version is the service's version string.
+	Version string `json:"version"`
+	// Uptime is the time since the service start time, rounded to whole seconds.
+	Uptime string `json:"uptime"`
+	// Components holds one entry per checked component, in check order.
+	Components []ComponentHealth `json:"components"`
+}
+
+// handleSelfCheck writes a SelfCheckResponse describing the health of the
+// worker service, SQLite store, ChromaDB client, SDK processor and SSE
+// broadcaster.
+//
+// The overall status is the worst individual status: any "unhealthy"
+// component makes the response "unhealthy"; otherwise any "degraded"
+// component makes it "degraded". Failures are reported in the JSON body;
+// this handler never calls WriteHeader itself.
+//
+// NOTE(review): the service fields are read here without visible
+// synchronization. If a background initializer assigns them while requests
+// are being served, confirm that this is safe.
+func (s *Service) handleSelfCheck(w http.ResponseWriter, r *http.Request) {
+	const (
+		healthy   = "healthy"
+		degraded  = "degraded"
+		unhealthy = "unhealthy"
+	)
+
+	overall := healthy
+	components := make([]ComponentHealth, 0, 5) // one entry per check below
+
+	// report records one component result and folds its status into overall:
+	// "unhealthy" always wins, "degraded" only replaces "healthy".
+	report := func(name, status, message string) {
+		components = append(components, ComponentHealth{Name: name, Status: status, Message: message})
+		switch {
+		case status == unhealthy:
+			overall = unhealthy
+		case status == degraded && overall == healthy:
+			overall = degraded
+		}
+	}
+
+	// Worker Service: while not ready, a recorded init error means startup
+	// failed; no error means initialization is still in progress.
+	if s.ready.Load() {
+		report("Worker Service", healthy, "")
+	} else if err := s.GetInitError(); err != nil {
+		report("Worker Service", unhealthy, err.Error())
+	} else {
+		report("Worker Service", degraded, "Initializing")
+	}
+
+	// SQLite Database: a missing store or a failed ping is unhealthy.
+	if s.store == nil {
+		report("SQLite Database", unhealthy, "Not initialized")
+	} else if err := s.store.Ping(); err != nil {
+		report("SQLite Database", unhealthy, err.Error())
+	} else {
+		report("SQLite Database", healthy, "")
+	}
+
+	// ChromaDB: a missing or disconnected client only degrades the service.
+	switch {
+	case s.chromaClient == nil:
+		report("ChromaDB", degraded, "Not configured")
+	case !s.chromaClient.IsConnected():
+		report("ChromaDB", degraded, "Not connected")
+	default:
+		report("ChromaDB", healthy, "")
+	}
+
+	// SDK Processor: a missing processor or Claude CLI only degrades the service.
+	switch {
+	case s.processor == nil:
+		report("SDK Processor", degraded, "Not initialized")
+	case !s.processor.IsAvailable():
+		report("SDK Processor", degraded, "Claude CLI not available")
+	default:
+		report("SDK Processor", healthy, "")
+	}
+
+	// SSE Broadcaster: a missing broadcaster is unhealthy.
+	if s.sseBroadcaster == nil {
+		report("SSE Broadcaster", unhealthy, "Not initialized")
+	} else {
+		report("SSE Broadcaster", healthy, "")
+	}
+
+	writeJSON(w, SelfCheckResponse{
+		Overall:    overall,
+		Version:    s.version,
+		Uptime:     time.Since(s.startTime).Round(time.Second).String(),
+		Components: components,
+	})
+}
+
 // handleUpdateRestart restarts the worker with the new binary.
 func (s *Service) handleUpdateRestart(w http.ResponseWriter, r *http.Request) {
 	status := s.updater.GetStatus()
diff --git a/internal/worker/sdk/processor.go b/internal/worker/sdk/processor.go
index 20943d5..adfcc7f 100644
--- a/internal/worker/sdk/processor.go
+++ b/internal/worker/sdk/processor.go
@@ -92,6 +92,20 @@ func NewProcessor(observationStore *sqlite.ObservationStore, summaryStore *sqlit
 	}, nil
 }
 
+// IsAvailable reports whether the configured Claude CLI path exists on disk
+// and is not a directory.
+//
+// NOTE(review): assumes claudePath is a filesystem path rather than a bare
+// command name resolved through PATH — confirm against NewProcessor.
+func (p *Processor) IsAvailable() bool {
+	info, err := os.Stat(p.claudePath)
+	if err != nil {
+		return false
+	}
+	// A directory at claudePath cannot be executed as the CLI.
+	return !info.IsDir()
+}
+
 // ProcessObservation processes a single tool observation and extracts insights.
func (p *Processor) ProcessObservation(ctx context.Context, sdkSessionID, project string, toolName string, toolInput, toolResponse interface{}, promptNumber int, cwd string) error { p.mu.Lock() diff --git a/internal/worker/service.go b/internal/worker/service.go index 14d65d2..a99f12a 100644 --- a/internal/worker/service.go +++ b/internal/worker/service.go @@ -602,6 +602,9 @@ func (s *Service) setupRoutes() { s.router.Get("/api/update/status", s.handleUpdateStatus) s.router.Post("/api/update/restart", s.handleUpdateRestart) + // Selfcheck endpoint (works before DB is ready - checks all components) + s.router.Get("/api/selfcheck", s.handleSelfCheck) + // SSE endpoint (works before DB is ready) s.router.Get("/api/events", s.sseBroadcaster.HandleSSE) @@ -668,11 +671,34 @@ func (s *Service) Start() error { ReadHeaderTimeout: 10 * time.Second, } + // Check if we're in restart mode (after update) + isRestart := os.Getenv("CLAUDE_MNEMONIC_RESTART") == "1" + s.wg.Add(1) go func() { defer s.wg.Done() - if err := s.server.ListenAndServe(); err != http.ErrServerClosed { - log.Error().Err(err).Msg("HTTP server error") + + var lastErr error + maxRetries := 1 + if isRestart { + maxRetries = 10 // Retry up to 10 times during restart + } + + for i := 0; i < maxRetries; i++ { + lastErr = s.server.ListenAndServe() + if lastErr == http.ErrServerClosed { + return // Normal shutdown + } + + if i < maxRetries-1 && isRestart { + log.Warn().Err(lastErr).Int("retry", i+1).Msg("Port not ready, retrying...") + time.Sleep(500 * time.Millisecond) + continue + } + } + + if lastErr != nil { + log.Error().Err(lastErr).Msg("HTTP server error") } }() @@ -681,6 +707,7 @@ func (s *Service) Start() error { log.Info(). Int("port", port). Int("pid", getPID()). + Bool("restart_mode", isRestart). 
Msg("Worker HTTP server started (initialization in progress)") return nil diff --git a/internal/worker/static/placeholder.html b/internal/worker/static/placeholder.html index 7fda878..e69de29 100644 --- a/internal/worker/static/placeholder.html +++ b/internal/worker/static/placeholder.html @@ -1 +0,0 @@ - diff --git a/ui/src/App.vue b/ui/src/App.vue index 96e8df3..2b38384 100644 --- a/ui/src/App.vue +++ b/ui/src/App.vue @@ -1,6 +1,6 @@