Compare commits

11 Commits

Author SHA1 Message Date
lukaszraczylo 94c097bc6c fixup! Race condition in parseGraphQLQuery result pooling 2025-11-18 17:27:55 +00:00
lukaszraczylo 4e84cd7461 Race condition in parseGraphQLQuery result pooling
Under high concurrency, the sync.Pool pattern was creating a race condition
where the same result pointer was being reused by multiple concurrent requests.

The bug:
- parseGraphQLQuery() returns a pointer to 'res' from the pool
- The defer statement returns 'res' back to the pool on function exit
- While the caller is still using the returned pointer, another concurrent
  request could get the SAME pointer from the pool and modify it

This caused mutations to randomly get the wrong activeEndpoint value:
- Request A: mutation parsed → activeEndpoint set to :8080 (write)
- Request A: returns pointer to result
- Request A: defer runs → result returned to pool
- Request B: gets SAME pointer from pool
- Request B: query parsed → activeEndpoint overwritten to :8088 (read-only)
- Request A: still holding pointer, now sees :8088 instead of :8080!
- Result: mutation routed to read-only endpoint → database write failure

The fix:
Create a copy of the result before returning, so the pooled object can be
safely reused without affecting the returned value.
2025-11-18 17:03:11 +00:00
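The copy-before-return fix described in this commit can be sketched as follows. This is a minimal illustration, not the proxy's code: `parseResult`, `resultPool`, `parse` and the endpoint URLs are hypothetical stand-ins for `parseGraphQLQueryResult` and its pool.

```go
package main

import (
	"fmt"
	"sync"
)

// parseResult is a hypothetical stand-in for parseGraphQLQueryResult.
type parseResult struct {
	operationType  string
	activeEndpoint string
}

// resultPool hands out reusable scratch objects.
var resultPool = sync.Pool{
	New: func() any { return &parseResult{} },
}

// parse fills a pooled scratch object and returns a pointer to a copy,
// so the caller never holds memory that the pool may hand out again.
func parse(opType, writeURL, readURL string) *parseResult {
	res := resultPool.Get().(*parseResult)
	defer resultPool.Put(res) // after this runs, another goroutine may reuse res

	*res = parseResult{operationType: opType, activeEndpoint: readURL}
	if opType == "mutation" {
		res.activeEndpoint = writeURL
	}

	out := *res // copy while this goroutine still owns res
	return &out
}

func main() {
	a := parse("mutation", "http://localhost:8080", "http://localhost:8088")
	b := parse("query", "http://localhost:8080", "http://localhost:8088")
	// a keeps its own endpoint even though b may have reused the pooled object.
	fmt.Println(a.activeEndpoint, b.activeEndpoint)
}
```

Note that `out := *res` is a shallow copy. If the result struct carried slices or maps, those would still be shared with the pooled object and would need copying as well.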
lukaszraczylo e37a8beaa7 Fix: Move endpoint routing outside loop to prevent mutation misrouting
BUG FIX: The endpoint routing logic was inside the loop that
processes all GraphQL definitions. This caused mutations to be incorrectly
routed to read-only endpoints when followed by other definitions (queries,
fragments, etc).

The bug manifested as: mutations → read-only Hasura → read-only pooler →
PostgreSQL replica → "cannot set transaction read-write mode during recovery"

Changes:
- Move endpoint routing logic AFTER the definition processing loop
- Ensures mutations are ALWAYS routed to write endpoint regardless of
  subsequent definitions in the document
- Add 3 comprehensive regression tests covering:
  1. Mutation with multiple operations
  2. Mutation followed by fragment
  3. Complex main-bot style mutation document

Tests: All pass including new regression tests
Impact: Fixes database write failures in main-bot and other services
2025-11-18 17:03:06 +00:00
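The idea behind this commit, deciding the endpoint once after every definition has been inspected, can be sketched like this. The `definition` type and `pickEndpoint` function are hypothetical simplifications; the real code walks `ast.OperationDefinition` nodes.

```go
package main

import (
	"fmt"
	"strings"
)

// definition is a hypothetical, simplified stand-in for a parsed GraphQL
// definition: kind is "mutation", "query", "subscription" or "fragment".
type definition struct {
	kind string
}

// pickEndpoint scans every definition first and only then chooses the endpoint,
// so a later query or fragment cannot override an earlier mutation.
func pickEndpoint(defs []definition, writeURL, readURL string) string {
	opType := ""
	for _, d := range defs {
		kind := strings.ToLower(d.kind)
		if kind == "mutation" {
			opType = "mutation"
			break // a mutation anywhere in the document wins
		}
		if opType == "" && kind != "fragment" {
			opType = kind
		}
	}

	// Routing happens exactly once, outside the loop.
	if opType != "mutation" && readURL != "" {
		return readURL
	}
	return writeURL
}

func main() {
	defs := []definition{{kind: "mutation"}, {kind: "query"}, {kind: "fragment"}}
	fmt.Println(pickEndpoint(defs, "http://localhost:8080", "http://localhost:8088"))
}
```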
lukaszraczylo 9dd8c11363 CRITICAL: Routing fix for mutations in case of the R/W replicas 2025-11-18 16:28:58 +00:00
lukaszraczylo 9fbee0d9a1 Update go.mod and go.sum
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-11-18 03:21:43 +00:00
lukaszraczylo 7df651c17a Update go.mod and go.sum
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-11-12 03:21:41 +00:00
lukaszraczylo 7ada94e4fa Fix nil pointers + improve the cleanup. 2025-11-11 10:43:07 +00:00
lukaszraczylo c510c29a8f Update go.mod and go.sum
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-11-11 03:22:29 +00:00
lukaszraczylo 370602858a Update go.mod and go.sum
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-11-09 03:21:44 +00:00
lukaszraczylo 6261be6e53 fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! Update go.mod and go.sum 2025-11-06 16:55:12 +00:00
lukaszraczylo 5ae4ea1e25 fixup! fixup! fixup! fixup! fixup! fixup! fixup! Update go.mod and go.sum 2025-11-05 22:55:03 +00:00
12 changed files with 476 additions and 66 deletions
+4
@@ -425,6 +425,8 @@ You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_
You can check out the [example of combined deployment with RW and read-only hasura](static/kubernetes-single-deployment-with-ro.yaml).
**Important:** When using a read-only Hasura instance connected to a PostgreSQL read replica, you **must** disable event trigger processing on that instance by setting `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=0` in the read-only Hasura container environment variables. This prevents the read-only instance from attempting to process event triggers (which require write access to event log tables), avoiding "cannot set transaction read-write mode during recovery" errors.
### Resilience
#### Circuit Breaker Pattern
@@ -723,6 +725,8 @@ Following tables are being cleaned:
- `hdb_catalog.hdb_cron_event_invocation_logs`
- `hdb_catalog.hdb_scheduled_event_invocation_logs`
**Important for RO/RW setups:** The `HASURA_EVENT_METADATA_DB` connection string must point to the **read-write primary database** where the `hdb_catalog` schema resides. The cleaner executes DELETE operations which require write permissions. Do not point this to a read-only replica.
### Security
+143
@@ -0,0 +1,143 @@
package main
import (
"fmt"
"strings"
fiber "github.com/gofiber/fiber/v2"
"github.com/graphql-go/graphql/language/ast"
"github.com/graphql-go/graphql/language/parser"
"github.com/graphql-go/graphql/language/source"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
// debugParseGraphQLQuery provides detailed logging for mutation routing analysis
// This is automatically called when LOG_LEVEL=DEBUG to help identify routing issues
//
// It logs:
// - GraphQL query structure (operations, selections, directives)
// - Final routing decision (which endpoint was chosen)
// - Automatic detection of mutations routed to wrong endpoints
//
// To enable: Set LOG_LEVEL=DEBUG and restart the proxy
func debugParseGraphQLQuery(c *fiber.Ctx, query string) {
if cfg == nil || cfg.Logger == nil {
return
}
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "=== DEBUG: Parsing GraphQL Query ===",
Pairs: map[string]interface{}{
"query_length": len(query),
"query_preview": truncateString(query, 100),
},
})
// Parse the query
src := source.NewSource(&source.Source{
Body: []byte(query),
Name: "Debug GraphQL request",
})
p, err := parser.Parse(parser.ParseParams{Source: src})
if err != nil {
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "DEBUG: Failed to parse query",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "DEBUG: Query parsed successfully",
Pairs: map[string]interface{}{
"definitions_count": len(p.Definitions),
},
})
// Analyze each definition
for i, d := range p.Definitions {
if oper, ok := d.(*ast.OperationDefinition); ok {
operationType := strings.ToLower(oper.Operation)
operationName := "unnamed"
if oper.Name != nil {
operationName = oper.Name.Value
}
// Count selections
selectionCount := 0
if oper.SelectionSet != nil {
selectionCount = len(oper.GetSelectionSet().Selections)
}
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: fmt.Sprintf("DEBUG: Definition #%d (OperationDefinition)", i),
Pairs: map[string]interface{}{
"operation_type": operationType,
"operation_name": operationName,
"selection_count": selectionCount,
"is_mutation": operationType == "mutation",
"directive_count": len(oper.Directives),
},
})
// Log selections for mutations
if operationType == "mutation" && oper.SelectionSet != nil {
for j, sel := range oper.GetSelectionSet().Selections {
if field, ok := sel.(*ast.Field); ok {
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: fmt.Sprintf("DEBUG: Mutation field #%d", j),
Pairs: map[string]interface{}{
"field_name": field.Name.Value,
},
})
}
}
}
} else if frag, ok := d.(*ast.FragmentDefinition); ok {
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: fmt.Sprintf("DEBUG: Definition #%d (FragmentDefinition)", i),
Pairs: map[string]interface{}{
"fragment_name": frag.Name.Value,
},
})
}
}
// Now run the actual parsing to see the result
result := parseGraphQLQuery(c)
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "DEBUG: Final routing decision",
Pairs: map[string]interface{}{
"operation_type": result.operationType,
"operation_name": result.operationName,
"active_endpoint": result.activeEndpoint,
"should_block": result.shouldBlock,
"should_ignore": result.shouldIgnore,
"write_endpoint": cfg.Server.HostGraphQL,
"read_endpoint": cfg.Server.HostGraphQLReadOnly,
"is_using_write": result.activeEndpoint == cfg.Server.HostGraphQL,
},
})
// Check for potential issues
if result.operationType == "mutation" && result.activeEndpoint != cfg.Server.HostGraphQL {
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "DEBUG: ⚠️ BUG DETECTED: Mutation routed to wrong endpoint!",
Pairs: map[string]interface{}{
"expected_endpoint": cfg.Server.HostGraphQL,
"actual_endpoint": result.activeEndpoint,
},
})
}
if result.operationType == "mutation" && strings.Contains(strings.ToLower(result.activeEndpoint), "read") {
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "DEBUG: ⚠️ CRITICAL: Mutation endpoint contains 'read' in URL!",
Pairs: map[string]interface{}{
"endpoint": result.activeEndpoint,
},
})
}
}
+6 -5
@@ -15,12 +15,13 @@ const (
)
// Use parameterized queries to prevent SQL injection
// Cast $1 to interval type to allow proper parameterized interval values
var delQueries = [...]string{
-"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
-"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - INTERVAL $1",
-"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - INTERVAL $1",
-"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
-"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
+"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
+"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
+"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - $1::INTERVAL",
+"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
+"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
}
func enableHasuraEventCleaner(ctx context.Context) error {
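A note on why the cast form is needed: in PostgreSQL, `INTERVAL '...'` is a typed literal and expects a string constant, so a placeholder cannot stand in for it, and a `$1` written inside quotes is plain text rather than a parameter. A cast (`$1::INTERVAL`) accepts a bound text value. The sketch below shows one way the bound value could be built. `intervalArg` and the day-based retention are assumptions for illustration, and how the value is passed to the driver is not shown in this diff.

```go
package main

import "fmt"

// delQuery mirrors one of the updated statements from the diff.
const delQuery = "DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL"

// intervalArg is a hypothetical helper that builds the text bound to $1.
// PostgreSQL parses a value such as "7 days" when it is cast to INTERVAL.
func intervalArg(days int) (string, error) {
	if days <= 0 {
		return "", fmt.Errorf("retention must be positive, got %d", days)
	}
	return fmt.Sprintf("%d days", days), nil
}

func main() {
	arg, err := intervalArg(7)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// With a database handle this would be bound as the first parameter of
	// delQuery (assumption: the exact Exec call depends on the driver in use).
	fmt.Printf("query: %s\narg:   %q\n", delQuery, arg)
}
```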
+2 -2
@@ -340,8 +340,8 @@ func getDelQueries() []string {
// This should return the actual delQueries from the main package
// For testing purposes, we return expected parameterized queries
return []string{
-"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - INTERVAL '$1 days'",
-"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - INTERVAL '$1 days'",
+"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
+"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
}
}
+9 -9
@@ -28,7 +28,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
-google.golang.org/grpc v1.76.0
+google.golang.org/grpc v1.77.0
)
require (
@@ -61,14 +61,14 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
-golang.org/x/crypto v0.43.0 // indirect
-golang.org/x/net v0.46.0 // indirect
-golang.org/x/sync v0.17.0 // indirect
-golang.org/x/sys v0.37.0 // indirect
-golang.org/x/term v0.36.0 // indirect
-golang.org/x/text v0.30.0 // indirect
-google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101 // indirect
-google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 // indirect
+golang.org/x/crypto v0.44.0 // indirect
+golang.org/x/net v0.47.0 // indirect
+golang.org/x/sync v0.18.0 // indirect
+golang.org/x/sys v0.38.0 // indirect
+golang.org/x/term v0.37.0 // indirect
+golang.org/x/text v0.31.0 // indirect
+google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba // indirect
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+18 -18
@@ -129,27 +129,27 @@ go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjce
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU=
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101 h1:vk5TfqZHNn0obhPIYeS+cxIFKFQgser/M2jnI+9c6MM=
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101/go.mod h1:E17fc4PDhkr22dE3RgnH2hEubUaky6ZwW4VhANxyspg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 h1:tRPGkdGHuewF4UisLzzHHr1spKw92qLM98nIzxbC0wY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba h1:B14OtaXuMaCQsl2deSvNkyPKIzq3BjfxQp8d00QyWx4=
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:G5IanEx8/PgI9w6CFcYQf7jMtHQhZruvfM1i3qOqk5U=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+52 -13
@@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"
"time"
"unicode"
"github.com/goccy/go-json"
fiber "github.com/gofiber/fiber/v2"
@@ -37,6 +38,40 @@ var (
currentCacheSize int64 // Use atomic operations for this
)
// sanitizeOperationName removes null bytes and other invalid characters from operation names
// This prevents panics when creating metrics with invalid label values
func sanitizeOperationName(name string) string {
if name == "" || name == "undefined" {
return name
}
var buf strings.Builder
buf.Grow(len(name))
for _, r := range name {
// Skip null bytes entirely
if r == '\x00' {
continue
}
// Replace control characters with underscores
if r < 32 || r == 127 {
buf.WriteByte('_')
continue
}
// Only allow printable characters
if unicode.IsPrint(r) {
buf.WriteRune(r)
}
}
result := buf.String()
// Return "undefined" if we ended up with an empty string after sanitization
if result == "" {
return "undefined"
}
return result
}
func prepareQueriesAndExemptions() {
introspectionAllowedQueries = make(map[string]struct{})
allowedUrls = make(map[string]struct{})
@@ -298,8 +333,8 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
res.operationType = "mutation"
if oper.Name != nil {
mutationName = oper.Name.Value
-// Use mutation name immediately
-res.operationName = mutationName
+// Use mutation name immediately, sanitized to prevent metric panics
+res.operationName = sanitizeOperationName(mutationName)
}
break // Found a mutation, no need to continue first pass
}
@@ -316,7 +351,7 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
// We already set operation type to mutation in first pass
// Only set name if we didn't find a mutation name earlier
if res.operationName == "undefined" && oper.Name != nil {
-res.operationName = oper.Name.Value
+res.operationName = sanitizeOperationName(oper.Name.Value)
}
} else {
// No mutation found, use the normal logic
@@ -325,18 +360,10 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
}
if res.operationName == "undefined" && oper.Name != nil {
-res.operationName = oper.Name.Value
+res.operationName = sanitizeOperationName(oper.Name.Value)
}
}
-// Handle endpoint routing - always use write endpoint for mutations
-if res.operationType == "mutation" {
-res.activeEndpoint = cfg.Server.HostGraphQL
-} else if cfg.Server.HostGraphQLReadOnly != "" {
-// Use read-only endpoint for non-mutation operations
-res.activeEndpoint = cfg.Server.HostGraphQLReadOnly
-}
// Block mutations in read-only mode
if res.operationType == "mutation" && cfg.Server.ReadOnlyMode {
if ifNotInTest() {
@@ -359,13 +386,25 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
}
}
+// Handle endpoint routing AFTER processing all definitions
+// This ensures mutations are always routed to the write endpoint
+if res.operationType == "mutation" {
+res.activeEndpoint = cfg.Server.HostGraphQL
+} else if cfg.Server.HostGraphQLReadOnly != "" {
+// Use read-only endpoint for non-mutation operations
+res.activeEndpoint = cfg.Server.HostGraphQLReadOnly
+}
// Track parsing time
if ifNotInTest() && cfg.Monitoring != nil {
parseTime := float64(time.Since(startTime).Milliseconds())
cfg.Monitoring.IncrementFloat(libpack_monitoring.MetricsGraphQLParsingTime, nil, parseTime)
}
-return res
+// Create a copy to return, since the original will be returned to the pool
+// This prevents race conditions where concurrent requests could modify the same result
+result := *res
+return &result
}
// processDirectives extracts caching directives from the operation
+107 -17
@@ -68,26 +68,74 @@ func ensureDefaultLabels(labels *map[string]string, podName string) {
}
}
// sanitizeLabelValue removes or replaces characters that are invalid in metric labels
// This includes null bytes, newlines, carriage returns, quotes, and backslashes
func sanitizeLabelValue(value string) string {
if value == "" {
return value
}
var buf strings.Builder
buf.Grow(len(value))
for _, r := range value {
switch r {
case '\x00': // null byte
continue // Skip null bytes entirely
case '\n', '\r', '\t': // newlines, carriage returns, tabs
buf.WriteByte(' ') // Replace with space
case '"', '\\': // quotes and backslashes need escaping
buf.WriteByte('\\')
buf.WriteRune(r)
default:
// Only allow printable ASCII and common unicode characters
if unicode.IsPrint(r) {
buf.WriteRune(r)
}
}
}
return buf.String()
}
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) {
if len(labels) == 0 {
// Add defer/recover to prevent panics from crashing the application
defer func() {
if r := recover(); r != nil {
// Log the panic but don't crash
fmt.Fprintf(os.Stderr, "Recovered from panic in appendSortedLabels: %v\n", r)
}
}()
if len(labels) == 0 || buf == nil {
return
}
// Create a snapshot to avoid concurrent access issues
labelsCopy := make(map[string]string, len(labels))
for k, v := range labels {
labelsCopy[k] = v
if k == "" {
continue // Skip empty keys
}
// Sanitize the label value to remove null bytes and other invalid characters
labelsCopy[k] = sanitizeLabelValue(v)
}
if len(labelsCopy) == 0 {
return
}
keys := getSortedKeys(labelsCopy)
for i, k := range keys {
if i > 0 {
buf.WriteByte(',')
if v, ok := labelsCopy[k]; ok {
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(k)
buf.WriteString(`="`)
buf.WriteString(v)
buf.WriteByte('"')
}
buf.WriteString(k)
buf.WriteString(`="`)
buf.WriteString(labelsCopy[k])
buf.WriteByte('"')
}
}
@@ -117,7 +165,15 @@ func getSortedKeys(labels map[string]string) []string {
}
func labelsToString(labels map[string]string) string {
if labels == nil {
// Add defer/recover to prevent panics from crashing the application
defer func() {
if r := recover(); r != nil {
// Log the panic but don't crash
fmt.Fprintf(os.Stderr, "Recovered from panic in labelsToString: %v\n", r)
}
}()
if len(labels) == 0 {
return ""
}
@@ -126,17 +182,34 @@ func labelsToString(labels map[string]string) string {
values := make(map[string]string, len(labels))
for k, v := range labels {
if k == "" {
continue // Skip empty keys
}
keys = append(keys, k)
values[k] = v
}
if len(keys) == 0 {
return ""
}
sort.Strings(keys)
// Pre-allocate the builder with estimated capacity to avoid reallocation
var sb strings.Builder
estimatedSize := 0
for _, k := range keys {
sb.WriteString(k)
sb.WriteByte('=')
sb.WriteString(values[k])
sb.WriteByte(';')
estimatedSize += len(k) + len(values[k]) + 2 // key + value + '=' + ';'
}
sb.Grow(estimatedSize)
for _, k := range keys {
if v, ok := values[k]; ok {
sb.WriteString(k)
sb.WriteByte('=')
sb.WriteString(v)
sb.WriteByte(';')
}
}
return sb.String()
}
@@ -186,6 +259,14 @@ func is_special_rune(r rune) bool {
}
func compile_metrics_with_labels(name string, labels map[string]string) string {
// Add defer/recover to prevent panics from crashing the application
defer func() {
if r := recover(); r != nil {
// Log the panic but don't crash
fmt.Fprintf(os.Stderr, "Recovered from panic in compile_metrics_with_labels: %v\n", r)
}
}()
var buf bytes.Buffer
buf.WriteString(name)
@@ -197,16 +278,25 @@ func compile_metrics_with_labels(name string, labels map[string]string) string {
// Create a snapshot to avoid concurrent access issues
labelsCopy := make(map[string]string, len(labels))
for k, v := range labels {
if k == "" {
continue // Skip empty keys
}
labelsCopy[k] = v
}
if len(labelsCopy) == 0 {
return buf.String()
}
keys := getSortedKeys(labelsCopy)
for _, k := range keys {
buf.WriteByte('_')
buf.WriteString(k)
buf.WriteByte('_')
buf.WriteString(labelsCopy[k])
if v, ok := labelsCopy[k]; ok {
buf.WriteByte('_')
buf.WriteString(k)
buf.WriteByte('_')
buf.WriteString(v)
}
}
return buf.String()
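A note on the `defer`/`recover` guards added in this file: after a recovered panic, a Go function returns the current values of its named results, or zero values if the results are unnamed. The functions in this diff use unnamed results, so they return an empty string once the panic is recovered. The sketch below, with a hypothetical `render` function, shows how a named result makes that fallback explicit.

```go
package main

import (
	"fmt"
	"os"
)

// render is a hypothetical formatter; format may panic on bad input.
// The named result lets the deferred function pick the fallback value.
func render(labels map[string]string, format func(k, v string) string) (out string) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Fprintf(os.Stderr, "recovered from panic in render: %v\n", r)
			out = "" // discard any partially built output
		}
	}()

	for k, v := range labels {
		out += format(k, v)
	}
	return out
}

func main() {
	ok := render(map[string]string{"pod": "a"}, func(k, v string) string {
		return k + "=" + v + ";"
	})
	bad := render(map[string]string{"pod": "a"}, func(k, v string) string {
		panic("invalid label")
	})
	fmt.Printf("%q %q\n", ok, bad)
}
```

Recovering keeps the process alive, but it also hides the failure from the caller, so the stderr message is the only trace that a metric was dropped.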
+30
@@ -82,6 +82,36 @@ func (suite *Tests) Test_proxyTheRequest() {
wantErr: false,
wantEndpoint: "https://telegram-bot.app/",
},
{
name: "Test mutation with multiple operations (bug fix regression test)",
body: `{"query":"mutation getOrCreateUser { insert_tg_users_one(object: {id: 123}) { id } } query otherQuery { users { id } }"}`,
host: "https://telegram-bot.app/",
hostRO: "https://google.com/",
path: "/v1/graphql",
headers: supplied_headers,
wantErr: false,
wantEndpoint: "https://telegram-bot.app/",
},
{
name: "Test mutation followed by fragment (bug fix regression test)",
body: `{"query":"mutation insertUser { insert_users_one(object: {name: \"test\"}) { ...userFields } } fragment userFields on users { id name }"}`,
host: "https://telegram-bot.app/",
hostRO: "https://google.com/",
path: "/v1/graphql",
headers: supplied_headers,
wantErr: false,
wantEndpoint: "https://telegram-bot.app/",
},
{
name: "Test complex mutation document (main-bot style)",
body: `{"query":"mutation getOrCreateUser($user_id: bigint!, $group_id: bigint!) { insert_tg_users_one(object: {id: $user_id}, on_conflict: {constraint: tg_users_pkey, update_columns: last_seen}) { id } insert_tg_groups_one(object: {id: $group_id}, on_conflict: {constraint: tg_groups_pkey, update_columns: last_seen}) { id } }"}`,
host: "https://telegram-bot.app/",
hostRO: "https://google.com/",
path: "/v1/graphql",
headers: supplied_headers,
wantErr: false,
wantEndpoint: "https://telegram-bot.app/",
},
}
for _, tt := range tests {
+11
@@ -272,6 +272,17 @@ func processGraphQLRequest(c *fiber.Ctx) error {
// Parse the GraphQL query
parsedResult := parseGraphQLQuery(c)
// Debug logging for mutation routing analysis (enabled when LOG_LEVEL=DEBUG)
if cfg.LogLevel == "DEBUG" {
var m map[string]interface{}
if err := json.Unmarshal(c.Body(), &m); err == nil {
if query, ok := m["query"].(string); ok {
debugParseGraphQLQuery(c, query)
}
}
}
if parsedResult.shouldBlock {
return c.Status(fiber.StatusForbidden).SendString("Request blocked")
}
@@ -97,6 +97,9 @@ spec:
value: "error"
- name: HASURA_GRAPHQL_SERVER_PORT
value: "8088"
# Disable event trigger processing on read-only instance
- name: HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL
value: "0"
- name: graphql-proxy
image: ghcr.io/lukaszraczylo/graphql-monitoring-proxy:latest
+91 -2
@@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"
"github.com/goccy/go-json"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
gorillaws "github.com/gorilla/websocket"
@@ -141,8 +142,29 @@ func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *web
// Set message size limit
clientConn.SetReadLimit(wsp.maxMessageSize)
-// Connect to backend WebSocket with forwarded headers
-backendConn, err := wsp.dialBackend(ctx, headers)
+// Read first message to extract authentication from connection_init payload
+// This bridges the gap between clients that send auth in payload vs Hasura expecting it in HTTP headers
+messageType, message, err := clientConn.ReadMessage()
+if err != nil {
+wsp.errors.Add(1)
+if wsp.logger != nil {
+wsp.logger.Error(&libpack_logger.LogMessage{
+Message: "Failed to read first message from client",
+Pairs: map[string]interface{}{
+"connection_id": connectionID,
+"error": err.Error(),
+},
+})
+}
+clientConn.Close()
+return
+}
+// Try to extract headers from connection_init payload (for GraphQL WebSocket protocols)
+enrichedHeaders := wsp.extractAuthFromPayload(message, headers)
+// Connect to backend WebSocket with enriched headers
+backendConn, err := wsp.dialBackend(ctx, enrichedHeaders)
if err != nil {
wsp.errors.Add(1)
if wsp.logger != nil {
@@ -159,6 +181,21 @@ func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *web
}
defer backendConn.Close()
// Forward the first message (connection_init) to backend
if err := backendConn.WriteMessage(messageType, message); err != nil {
wsp.errors.Add(1)
if wsp.logger != nil {
wsp.logger.Error(&libpack_logger.LogMessage{
Message: "Failed to forward connection_init to backend",
Pairs: map[string]interface{}{
"connection_id": connectionID,
"error": err.Error(),
},
})
}
return
}
if wsp.logger != nil {
wsp.logger.Debug(&libpack_logger.LogMessage{
Message: "Backend WebSocket connection established",
@@ -336,6 +373,58 @@ func (wsp *WebSocketProxy) proxyBackendToClient(ctx context.Context, backend *go
}
}
// extractAuthFromPayload extracts authentication headers from GraphQL WebSocket connection_init payload
// This bridges the gap between clients sending auth in payload and Hasura expecting it in HTTP headers
func (wsp *WebSocketProxy) extractAuthFromPayload(message []byte, originalHeaders http.Header) http.Header {
// Create a copy of original headers
enrichedHeaders := make(http.Header)
for k, v := range originalHeaders {
enrichedHeaders[k] = v
}
// Try to parse as JSON to extract headers from payload
var msg map[string]interface{}
if err := json.Unmarshal(message, &msg); err != nil {
// Not JSON or parse error, return original headers
return enrichedHeaders
}
// Check if this is a connection_init message
msgType, ok := msg["type"].(string)
if !ok || (msgType != "connection_init" && msgType != "start") {
// Not a connection_init, return original headers
return enrichedHeaders
}
// Extract payload
payload, ok := msg["payload"].(map[string]interface{})
if !ok {
return enrichedHeaders
}
// Try to extract headers from payload.headers (graphql-ws format)
if payloadHeaders, ok := payload["headers"].(map[string]interface{}); ok {
for key, value := range payloadHeaders {
if strValue, ok := value.(string); ok {
enrichedHeaders.Set(key, strValue)
}
}
}
// Also check top-level payload keys that look like headers (Apollo format)
for key, value := range payload {
if strValue, ok := value.(string); ok {
// Common auth headers
if key == "Authorization" || key == "authorization" ||
key == "x-hasura-role" || key == "x-hasura-admin-secret" {
enrichedHeaders.Set(key, strValue)
}
}
}
return enrichedHeaders
}
// dialBackend establishes a WebSocket connection to the backend
func (wsp *WebSocketProxy) dialBackend(ctx context.Context, headers http.Header) (*gorillaws.Conn, error) {
// Convert http:// to ws:// or https:// to wss://
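The header extraction performed by `extractAuthFromPayload` can be tried in isolation with a small sketch. Assumptions: `headersFromInit` is a hypothetical name, the sketch uses the standard library `encoding/json` rather than `goccy/go-json`, it handles only the `payload.headers` form, and it copies headers with `http.Header.Clone` (a deep copy) instead of the per-key assignment used in the diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// headersFromInit returns a copy of base, enriched with string-valued entries
// found under payload.headers of a connection_init message. Any other input
// leaves the copied headers unchanged.
func headersFromInit(message []byte, base http.Header) http.Header {
	out := base.Clone() // deep copy; returns nil when base is nil
	if out == nil {
		out = make(http.Header)
	}

	var msg struct {
		Type    string         `json:"type"`
		Payload map[string]any `json:"payload"`
	}
	if err := json.Unmarshal(message, &msg); err != nil || msg.Type != "connection_init" {
		return out
	}

	if hdrs, ok := msg.Payload["headers"].(map[string]any); ok {
		for k, v := range hdrs {
			if s, ok := v.(string); ok {
				out.Set(k, s) // Set canonicalizes the header name
			}
		}
	}
	return out
}

func main() {
	init := []byte(`{"type":"connection_init","payload":{"headers":{"Authorization":"Bearer abc"}}}`)
	h := headersFromInit(init, http.Header{"Origin": {"https://example.test"}})
	fmt.Println(h.Get("Authorization"), h.Get("Origin"))
}
```

Because payload values override headers from the HTTP upgrade request, a deployment that injects trusted headers upstream may want to restrict which keys a client payload is allowed to set.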