Files
claude-mnemonic/internal/worker/sse/broadcaster.go
T
lukaszraczylo 38b3786942 HTTP and SSE dead client fix
Fix 1: HTTP Server timeouts (service.go)
  - Added IdleTimeout: 120s - closes idle keep-alive connections
  - Added ReadTimeout: 30s - prevents hung connections waiting for request data

  Fix 2: SSE dead client cleanup (broadcaster.go) - This was the real leak
  - Bug: When Write() failed on a disconnected client, it just logged and continued - the dead client stayed in the clients map forever
  - Effect: Dead clients accumulated and every Broadcast() call would try (and fail) to write to them
  - Fix: Now tracks failed writes and removes dead clients from the map

  The SSE bug was likely the main cause. Every time a browser tab closed or connection dropped, the client stayed registered. On each broadcast (processing status updates happen frequently), it would try to write to dead connections, fail, but never clean up.
2025-12-19 02:17:07 +00:00

176 lines
3.8 KiB
Go

// Package sse provides Server-Sent Events broadcasting for claude-mnemonic.
package sse
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"github.com/rs/zerolog/log"
)
// Client represents a connected SSE client.
//
// Instances are created and registered by Broadcaster.AddClient.
type Client struct {
// ID is the registry key; AddClient assigns it in the form "client-<n>".
ID string
// Writer is the HTTP response the event stream is written to.
Writer http.ResponseWriter
// Flusher is Writer viewed as an http.Flusher; it is flushed after each
// event so the data is pushed out immediately.
Flusher http.Flusher
// Done is closed when the client is removed from the broadcaster;
// Broadcast skips clients whose Done channel is already closed.
Done chan struct{}
}
// Broadcaster manages SSE client connections and message broadcasting.
//
// Use NewBroadcaster to construct one: the clients map must be initialised
// before AddClient can store into it.
type Broadcaster struct {
// clients holds the currently registered clients, keyed by Client.ID.
clients map[string]*Client
// mu guards clients and nextID.
mu sync.RWMutex
// nextID is incremented for every AddClient call and used to build the
// next client ID.
nextID int
}
// NewBroadcaster returns a Broadcaster with an empty, ready-to-use client
// registry.
func NewBroadcaster() *Broadcaster {
	b := new(Broadcaster)
	b.clients = map[string]*Client{}
	return b
}
// AddClient registers w as a new SSE client and returns its handle.
//
// It fails with an error when w does not implement http.Flusher, because
// events could not be pushed to the peer as they are produced. On success the
// client receives a fresh "client-<n>" ID and is stored in the registry.
func (b *Broadcaster) AddClient(w http.ResponseWriter) (*Client, error) {
	f, canFlush := w.(http.Flusher)
	if !canFlush {
		return nil, fmt.Errorf("streaming not supported")
	}

	c := &Client{
		Writer:  w,
		Flusher: f,
		Done:    make(chan struct{}),
	}

	// The ID counter and the registry are both protected by mu, so the ID is
	// assigned and the client stored inside one critical section.
	b.mu.Lock()
	b.nextID++
	c.ID = fmt.Sprintf("client-%d", b.nextID)
	b.clients[c.ID] = c
	total := len(b.clients)
	b.mu.Unlock()

	log.Debug().
		Str("clientId", c.ID).
		Int("totalClients", total).
		Msg("SSE client connected")

	return c, nil
}
// RemoveClient unregisters client from the broadcaster and closes its Done
// channel.
//
// The call is idempotent: Done is closed only by the call that actually
// deletes the client from the registry, and that decision is made while
// holding the lock. This matters because Broadcast may already have evicted
// the client after a failed write (closing Done in the process) while
// HandleSSE still runs its deferred RemoveClient afterwards; closing the
// channel a second time would panic with "close of closed channel".
//
// A client that is not (or no longer) registered is left untouched.
func (b *Broadcaster) RemoveClient(client *Client) {
	b.mu.Lock()
	registered, found := b.clients[client.ID]
	// Compare pointers as well as IDs so that only the exact registered
	// instance is ever removed and closed.
	owned := found && registered == client
	if owned {
		delete(b.clients, client.ID)
	}
	clientCount := len(b.clients)
	b.mu.Unlock()

	if !owned {
		// Already removed (and its Done channel closed) by someone else.
		return
	}

	if client.Done != nil {
		close(client.Done)
	}

	log.Debug().
		Str("clientId", client.ID).
		Int("totalClients", clientCount).
		Msg("SSE client disconnected")
}
// removeClientByID removes the client registered under id and closes its Done
// channel. It is used by Broadcast to evict clients whose write failed.
//
// When no client is registered under id (for example because a concurrent
// Broadcast or RemoveClient got there first) the call is a no-op and nothing
// is logged, so the log never reports a removal that did not happen.
func (b *Broadcaster) removeClientByID(id string) {
	b.mu.Lock()
	client, exists := b.clients[id]
	if exists {
		delete(b.clients, id)
	}
	clientCount := len(b.clients)
	b.mu.Unlock()

	if !exists {
		return
	}

	if client.Done != nil {
		// Best-effort guard against a channel that was already closed
		// elsewhere. The check and the close are not atomic; the map deletion
		// above is what ensures only one removeClientByID call reaches this
		// point for a given client.
		select {
		case <-client.Done:
			// Already closed.
		default:
			close(client.Done)
		}
	}

	log.Debug().
		Str("clientId", id).
		Int("totalClients", clientCount).
		Msg("Dead SSE client removed")
}
// Broadcast JSON-encodes data and sends it to every registered client as one
// SSE event of the form "data: <json>\n\n".
//
// If data cannot be marshalled the error is logged and nothing is sent.
// Clients whose Done channel is already closed are skipped. A client whose
// write fails is treated as dead and removed from the registry once the send
// loop has finished.
//
// NOTE(review): nothing here serialises writes to a single client's Writer,
// so concurrent Broadcast calls (or a Broadcast racing with the initial write
// in HandleSSE) may write to the same http.ResponseWriter at the same time —
// confirm whether callers guarantee exclusion or a per-client lock is needed.
func (b *Broadcaster) Broadcast(data interface{}) {
	jsonData, err := json.Marshal(data)
	if err != nil {
		log.Error().Err(err).Msg("Failed to marshal SSE data")
		return
	}

	// Build the wire frame once; it is identical for every client, so there is
	// no reason to re-allocate it per write.
	payload := []byte(fmt.Sprintf("data: %s\n\n", jsonData))

	// Snapshot the registry so the lock is not held during network writes.
	b.mu.RLock()
	clients := make([]*Client, 0, len(b.clients))
	for _, client := range b.clients {
		clients = append(clients, client)
	}
	b.mu.RUnlock()

	// IDs of clients whose write failed; they are removed after the loop.
	var deadIDs []string

	for _, client := range clients {
		// Skip clients that were removed after the snapshot was taken.
		select {
		case <-client.Done:
			continue
		default:
		}

		if _, err := client.Writer.Write(payload); err != nil {
			log.Debug().
				Str("clientId", client.ID).
				Err(err).
				Msg("Failed to write to SSE client, marking for removal")
			deadIDs = append(deadIDs, client.ID)
			continue
		}
		client.Flusher.Flush()
	}

	// Remove dead clients outside the iteration.
	for _, id := range deadIDs {
		b.removeClientByID(id)
	}
}
// ClientCount reports how many clients are currently registered.
func (b *Broadcaster) ClientCount() int {
	b.mu.RLock()
	n := len(b.clients)
	b.mu.RUnlock()
	return n
}
// HandleSSE serves one SSE connection.
//
// It sets the event-stream response headers, registers the response writer as
// a client, sends an initial {"type":"connected"} event carrying the client
// ID, and then blocks until the request context is cancelled. The client is
// unregistered when the handler returns.
//
// If the writer cannot stream (AddClient fails) the request is answered with
// 500. If the initial event cannot be written the handler returns immediately
// instead of parking on a connection that is already known to be broken.
func (b *Broadcaster) HandleSSE(w http.ResponseWriter, r *http.Request) {
	// Set SSE headers.
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	client, err := b.AddClient(w)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer b.RemoveClient(client)

	// Send the initial connection message; a failure here means the peer is
	// already gone, so bail out and let the deferred cleanup run.
	if _, err := fmt.Fprintf(w, "data: {\"type\":\"connected\",\"clientId\":\"%s\"}\n\n", client.ID); err != nil {
		log.Debug().
			Str("clientId", client.ID).
			Err(err).
			Msg("Failed to send initial SSE message")
		return
	}
	client.Flusher.Flush()

	// Wait for client disconnect.
	<-r.Context().Done()
}