From 38b37869429b677562d50aa737e5d5978c432f68 Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Mon, 15 Dec 2025 13:16:55 +0000 Subject: [PATCH] HTTP and SSE dead client fix Fix 1: HTTP Server timeouts (service.go) - Added IdleTimeout: 120s - closes idle keep-alive connections - Added ReadTimeout: 30s - prevents hung connections waiting for request data Fix 2: SSE dead client cleanup (broadcaster.go) - This was the real leak - Bug: When Write() failed on a disconnected client, it just logged and continued - the dead client stayed in the clients map forever - Effect: Dead clients accumulated and every Broadcast() call would try (and fail) to write to them - Fix: Now tracks failed writes and removes dead clients from the map The SSE bug was likely the main cause. Every time a browser tab closed or connection dropped, the client stayed registered. On each broadcast (processing status updates happen frequently), it would try to write to dead connections, fail, but never clean up. --- internal/worker/service.go | 3 +++ internal/worker/sse/broadcaster.go | 36 +++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/internal/worker/service.go b/internal/worker/service.go index a99f12a..d9efed0 100644 --- a/internal/worker/service.go +++ b/internal/worker/service.go @@ -669,6 +669,9 @@ func (s *Service) Start() error { Addr: fmt.Sprintf(":%d", port), Handler: s.router, ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 0, // Disabled for SSE (long-lived connections) + IdleTimeout: 120 * time.Second, } // Check if we're in restart mode (after update) diff --git a/internal/worker/sse/broadcaster.go b/internal/worker/sse/broadcaster.go index 739b73d..1b2934e 100644 --- a/internal/worker/sse/broadcaster.go +++ b/internal/worker/sse/broadcaster.go @@ -75,6 +75,31 @@ func (b *Broadcaster) RemoveClient(client *Client) { Msg("SSE client disconnected") } +// removeClientByID removes a client by ID (for dead client cleanup). +func (b *Broadcaster) removeClientByID(id string) { + b.mu.Lock() + client, exists := b.clients[id] + if exists { + delete(b.clients, id) + } + clientCount := len(b.clients) + b.mu.Unlock() + + if exists && client.Done != nil { + select { + case <-client.Done: + // Already closed + default: + close(client.Done) + } + } + + log.Debug(). + Str("clientId", id). + Int("totalClients", clientCount). + Msg("Dead SSE client removed") +} + // Broadcast sends a message to all connected clients. func (b *Broadcaster) Broadcast(data interface{}) { jsonData, err := json.Marshal(data) @@ -92,6 +117,9 @@ func (b *Broadcaster) Broadcast(data interface{}) { } b.mu.RUnlock() + // Track dead clients for removal + var deadClients []*Client + for _, client := range clients { select { case <-client.Done: @@ -102,12 +130,18 @@ func (b *Broadcaster) Broadcast(data interface{}) { log.Debug(). Str("clientId", client.ID). Err(err). - Msg("Failed to write to SSE client") + Msg("Failed to write to SSE client, marking for removal") + deadClients = append(deadClients, client) continue } client.Flusher.Flush() } } + + // Remove dead clients outside the iteration + for _, client := range deadClients { + b.removeClientByID(client.ID) + } } // ClientCount returns the number of connected clients.