diff --git a/internal/worker/service.go b/internal/worker/service.go index a99f12a..d9efed0 100644 --- a/internal/worker/service.go +++ b/internal/worker/service.go @@ -669,6 +669,9 @@ func (s *Service) Start() error { Addr: fmt.Sprintf(":%d", port), Handler: s.router, ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 0, // Disabled for SSE (long-lived connections) + IdleTimeout: 120 * time.Second, } // Check if we're in restart mode (after update) diff --git a/internal/worker/sse/broadcaster.go b/internal/worker/sse/broadcaster.go index 739b73d..1b2934e 100644 --- a/internal/worker/sse/broadcaster.go +++ b/internal/worker/sse/broadcaster.go @@ -75,6 +75,31 @@ func (b *Broadcaster) RemoveClient(client *Client) { Msg("SSE client disconnected") } +// removeClientByID removes a client by ID (for dead client cleanup). +func (b *Broadcaster) removeClientByID(id string) { + b.mu.Lock() + client, exists := b.clients[id] + if exists { + delete(b.clients, id) + } + clientCount := len(b.clients) + b.mu.Unlock() + + if exists && client.Done != nil { + select { + case <-client.Done: + // Already closed + default: + close(client.Done) + } + } + + log.Debug(). + Str("clientId", id). + Int("totalClients", clientCount). + Msg("Dead SSE client removed") +} + // Broadcast sends a message to all connected clients. func (b *Broadcaster) Broadcast(data interface{}) { jsonData, err := json.Marshal(data) @@ -92,6 +117,9 @@ func (b *Broadcaster) Broadcast(data interface{}) { } b.mu.RUnlock() + // Track dead clients for removal + var deadClients []*Client + for _, client := range clients { select { case <-client.Done: @@ -102,12 +130,18 @@ func (b *Broadcaster) Broadcast(data interface{}) { log.Debug(). Str("clientId", client.ID). Err(err). - Msg("Failed to write to SSE client") + Msg("Failed to write to SSE client, marking for removal") + deadClients = append(deadClients, client) continue } client.Flusher.Flush() } } + + // Remove dead clients outside the iteration + for _, client := range deadClients { + b.removeClientByID(client.ID) + } } // ClientCount returns the number of connected clients.