mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-15 02:47:52 +00:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 28223b40da | |||
| ee5618c699 | |||
| 94c097bc6c | |||
| 4e84cd7461 | |||
| e37a8beaa7 | |||
| 9dd8c11363 | |||
| 9fbee0d9a1 | |||
| 7df651c17a | |||
| 7ada94e4fa | |||
| c510c29a8f | |||
| 370602858a | |||
| 6261be6e53 | |||
| 5ae4ea1e25 |
@@ -425,6 +425,8 @@ You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_
|
||||
|
||||
You can check out the [example of combined deployment with RW and read-only hasura](static/kubernetes-single-deployment-with-ro.yaml).
|
||||
|
||||
**Important:** When using a read-only Hasura instance connected to a PostgreSQL read replica, you **must** disable event trigger processing on that instance by setting `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=0` in the read-only Hasura container environment variables. This prevents the read-only instance from attempting to process event triggers (which require write access to event log tables), avoiding "cannot set transaction read-write mode during recovery" errors.
|
||||
|
||||
### Resilience
|
||||
|
||||
#### Circuit Breaker Pattern
|
||||
@@ -723,6 +725,8 @@ Following tables are being cleaned:
|
||||
- `hdb_catalog.hdb_cron_event_invocation_logs`
|
||||
- `hdb_catalog.hdb_scheduled_event_invocation_logs`
|
||||
|
||||
**Important for RO/RW setups:** The `HASURA_EVENT_METADATA_DB` connection string must point to the **read-write primary database** where the `hdb_catalog` schema resides. The cleaner executes DELETE operations which require write permissions. Do not point this to a read-only replica.
|
||||
|
||||
|
||||
### Security
|
||||
|
||||
|
||||
@@ -0,0 +1,143 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
fiber "github.com/gofiber/fiber/v2"
|
||||
"github.com/graphql-go/graphql/language/ast"
|
||||
"github.com/graphql-go/graphql/language/parser"
|
||||
"github.com/graphql-go/graphql/language/source"
|
||||
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
|
||||
)
|
||||
|
||||
// debugParseGraphQLQuery provides detailed logging for mutation routing analysis
|
||||
// This is automatically called when LOG_LEVEL=DEBUG to help identify routing issues
|
||||
//
|
||||
// It logs:
|
||||
// - GraphQL query structure (operations, selections, directives)
|
||||
// - Final routing decision (which endpoint was chosen)
|
||||
// - Automatic detection of mutations routed to wrong endpoints
|
||||
//
|
||||
// To enable: Set LOG_LEVEL=DEBUG and restart the proxy
|
||||
func debugParseGraphQLQuery(c *fiber.Ctx, query string) {
|
||||
if cfg == nil || cfg.Logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
cfg.Logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "=== DEBUG: Parsing GraphQL Query ===",
|
||||
Pairs: map[string]interface{}{
|
||||
"query_length": len(query),
|
||||
"query_preview": truncateString(query, 100),
|
||||
},
|
||||
})
|
||||
|
||||
// Parse the query
|
||||
src := source.NewSource(&source.Source{
|
||||
Body: []byte(query),
|
||||
Name: "Debug GraphQL request",
|
||||
})
|
||||
|
||||
p, err := parser.Parse(parser.ParseParams{Source: src})
|
||||
if err != nil {
|
||||
cfg.Logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "DEBUG: Failed to parse query",
|
||||
Pairs: map[string]interface{}{"error": err.Error()},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
cfg.Logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "DEBUG: Query parsed successfully",
|
||||
Pairs: map[string]interface{}{
|
||||
"definitions_count": len(p.Definitions),
|
||||
},
|
||||
})
|
||||
|
||||
// Analyze each definition
|
||||
for i, d := range p.Definitions {
|
||||
if oper, ok := d.(*ast.OperationDefinition); ok {
|
||||
operationType := strings.ToLower(oper.Operation)
|
||||
operationName := "unnamed"
|
||||
if oper.Name != nil {
|
||||
operationName = oper.Name.Value
|
||||
}
|
||||
|
||||
// Count selections
|
||||
selectionCount := 0
|
||||
if oper.SelectionSet != nil {
|
||||
selectionCount = len(oper.GetSelectionSet().Selections)
|
||||
}
|
||||
|
||||
cfg.Logger.Info(&libpack_logger.LogMessage{
|
||||
Message: fmt.Sprintf("DEBUG: Definition #%d (OperationDefinition)", i),
|
||||
Pairs: map[string]interface{}{
|
||||
"operation_type": operationType,
|
||||
"operation_name": operationName,
|
||||
"selection_count": selectionCount,
|
||||
"is_mutation": operationType == "mutation",
|
||||
"directive_count": len(oper.Directives),
|
||||
},
|
||||
})
|
||||
|
||||
// Log selections for mutations
|
||||
if operationType == "mutation" && oper.SelectionSet != nil {
|
||||
for j, sel := range oper.GetSelectionSet().Selections {
|
||||
if field, ok := sel.(*ast.Field); ok {
|
||||
cfg.Logger.Info(&libpack_logger.LogMessage{
|
||||
Message: fmt.Sprintf("DEBUG: Mutation field #%d", j),
|
||||
Pairs: map[string]interface{}{
|
||||
"field_name": field.Name.Value,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if frag, ok := d.(*ast.FragmentDefinition); ok {
|
||||
cfg.Logger.Info(&libpack_logger.LogMessage{
|
||||
Message: fmt.Sprintf("DEBUG: Definition #%d (FragmentDefinition)", i),
|
||||
Pairs: map[string]interface{}{
|
||||
"fragment_name": frag.Name.Value,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Now run the actual parsing to see the result
|
||||
result := parseGraphQLQuery(c)
|
||||
|
||||
cfg.Logger.Info(&libpack_logger.LogMessage{
|
||||
Message: "DEBUG: Final routing decision",
|
||||
Pairs: map[string]interface{}{
|
||||
"operation_type": result.operationType,
|
||||
"operation_name": result.operationName,
|
||||
"active_endpoint": result.activeEndpoint,
|
||||
"should_block": result.shouldBlock,
|
||||
"should_ignore": result.shouldIgnore,
|
||||
"write_endpoint": cfg.Server.HostGraphQL,
|
||||
"read_endpoint": cfg.Server.HostGraphQLReadOnly,
|
||||
"is_using_write": result.activeEndpoint == cfg.Server.HostGraphQL,
|
||||
},
|
||||
})
|
||||
|
||||
// Check for potential issues
|
||||
if result.operationType == "mutation" && result.activeEndpoint != cfg.Server.HostGraphQL {
|
||||
cfg.Logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "DEBUG: ⚠️ BUG DETECTED: Mutation routed to wrong endpoint!",
|
||||
Pairs: map[string]interface{}{
|
||||
"expected_endpoint": cfg.Server.HostGraphQL,
|
||||
"actual_endpoint": result.activeEndpoint,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if result.operationType == "mutation" && strings.Contains(strings.ToLower(result.activeEndpoint), "read") {
|
||||
cfg.Logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "DEBUG: ⚠️ CRITICAL: Mutation endpoint contains 'read' in URL!",
|
||||
Pairs: map[string]interface{}{
|
||||
"endpoint": result.activeEndpoint,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -15,12 +15,13 @@ const (
|
||||
)
|
||||
|
||||
// Use parameterized queries to prevent SQL injection
|
||||
// Cast $1 to interval type to allow proper parameterized interval values
|
||||
var delQueries = [...]string{
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
}
|
||||
|
||||
func enableHasuraEventCleaner(ctx context.Context) error {
|
||||
|
||||
@@ -340,8 +340,8 @@ func getDelQueries() []string {
|
||||
// This should return the actual delQueries from the main package
|
||||
// For testing purposes, we return expected parameterized queries
|
||||
return []string{
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - INTERVAL '$1 days'",
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - INTERVAL '$1 days'",
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,18 +9,18 @@ require (
|
||||
github.com/alicebob/miniredis/v2 v2.33.0
|
||||
github.com/avast/retry-go/v4 v4.7.0
|
||||
github.com/goccy/go-json v0.10.5
|
||||
github.com/gofiber/fiber/v2 v2.52.9
|
||||
github.com/gofiber/fiber/v2 v2.52.10
|
||||
github.com/gofiber/websocket/v2 v2.2.1
|
||||
github.com/gofrs/flock v0.13.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gookit/goutil v0.7.1
|
||||
github.com/gookit/goutil v0.7.2
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/graphql-go/graphql v0.8.1
|
||||
github.com/jackc/pgx/v5 v5.7.6
|
||||
github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.84
|
||||
github.com/redis/go-redis/v9 v9.16.0
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.89
|
||||
github.com/redis/go-redis/v9 v9.17.0
|
||||
github.com/sony/gobreaker v1.0.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/valyala/fasthttp v1.68.0
|
||||
@@ -28,7 +28,7 @@ require (
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0
|
||||
go.opentelemetry.io/otel/sdk v1.38.0
|
||||
go.opentelemetry.io/otel/trace v1.38.0
|
||||
google.golang.org/grpc v1.76.0
|
||||
google.golang.org/grpc v1.77.0
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -61,14 +61,14 @@ require (
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.38.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
|
||||
golang.org/x/crypto v0.43.0 // indirect
|
||||
golang.org/x/net v0.46.0 // indirect
|
||||
golang.org/x/sync v0.17.0 // indirect
|
||||
golang.org/x/sys v0.37.0 // indirect
|
||||
golang.org/x/term v0.36.0 // indirect
|
||||
golang.org/x/text v0.30.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 // indirect
|
||||
golang.org/x/crypto v0.45.0 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/term v0.37.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect
|
||||
google.golang.org/protobuf v1.36.10 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -36,8 +36,8 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
|
||||
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
|
||||
github.com/goccy/go-reflect v1.2.0 h1:O0T8rZCuNmGXewnATuKYnkL0xm6o8UNOJZd/gOkb9ms=
|
||||
github.com/goccy/go-reflect v1.2.0/go.mod h1:n0oYZn8VcV2CkWTxi8B9QjkCoq6GTtCEdfmR66YhFtE=
|
||||
github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw=
|
||||
github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
|
||||
github.com/gofiber/fiber/v2 v2.52.10 h1:jRHROi2BuNti6NYXmZ6gbNSfT3zj/8c0xy94GOU5elY=
|
||||
github.com/gofiber/fiber/v2 v2.52.10/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
|
||||
github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w=
|
||||
github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU=
|
||||
github.com/gofrs/flock v0.13.0 h1:95JolYOvGMqeH31+FC7D2+uULf6mG61mEZ/A8dRYMzw=
|
||||
@@ -48,8 +48,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gookit/goutil v0.7.1 h1:AaFJPN9mrdeYBv8HOybri26EHGCC34WJVT7jUStGJsI=
|
||||
github.com/gookit/goutil v0.7.1/go.mod h1:vJS9HXctYTCLtCsZot5L5xF+O1oR17cDYO9R0HxBmnU=
|
||||
github.com/gookit/goutil v0.7.2 h1:NSiqWWY+BT0MwIlKDeSVPfQmr9xTkkAqwDjhplobdgo=
|
||||
github.com/gookit/goutil v0.7.2/go.mod h1:vJS9HXctYTCLtCsZot5L5xF+O1oR17cDYO9R0HxBmnU=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuMMgc=
|
||||
@@ -74,8 +74,8 @@ github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9 h1:pL8B9mjv6RPUf
|
||||
github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9/go.mod h1:M+UVdyqZs++xtEPrascaVmZdOMhCnxjZ2SgH+xHpR0c=
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12 h1:VO6hHYGw/Jy9JUizXf/bS0AI2QX1ueWWAWckMFVJ/w4=
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12/go.mod h1:TqXEOCtFJStk1i0tkipprv1kiDHGon1MVUisjSTBSKM=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.84 h1:yP00k8XSYKFYo6PmZFOsDblexLOG6WZzVWhzdstrxiw=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.84/go.mod h1:PxQYblQDZISmYYj8sNfazAWxAOh1rhAtU208y+uPV8s=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.89 h1:Xbu1Ny+a0lT2Sr2SaSC8mcHmGQDwGD4TJKk4DDd+PwA=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.89/go.mod h1:PxQYblQDZISmYYj8sNfazAWxAOh1rhAtU208y+uPV8s=
|
||||
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
|
||||
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
@@ -84,8 +84,8 @@ github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byF
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
|
||||
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/redis/go-redis/v9 v9.17.0 h1:K6E+ZlYN95KSMmZeEQPbU/c++wfmEvfFB17yEAq/VhM=
|
||||
github.com/redis/go-redis/v9 v9.17.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 h1:McifyVxygw1d67y6vxUqls2D46J8W9nrki9c8c0eVvE=
|
||||
@@ -129,27 +129,27 @@ go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjce
|
||||
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
|
||||
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
|
||||
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
|
||||
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
|
||||
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
|
||||
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
||||
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
|
||||
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
|
||||
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
|
||||
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
|
||||
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
|
||||
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
|
||||
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
|
||||
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
|
||||
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101 h1:vk5TfqZHNn0obhPIYeS+cxIFKFQgser/M2jnI+9c6MM=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101/go.mod h1:E17fc4PDhkr22dE3RgnH2hEubUaky6ZwW4VhANxyspg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 h1:tRPGkdGHuewF4UisLzzHHr1spKw92qLM98nIzxbC0wY=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
|
||||
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba h1:B14OtaXuMaCQsl2deSvNkyPKIzq3BjfxQp8d00QyWx4=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:G5IanEx8/PgI9w6CFcYQf7jMtHQhZruvfM1i3qOqk5U=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
|
||||
google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
+52
-13
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"github.com/goccy/go-json"
|
||||
fiber "github.com/gofiber/fiber/v2"
|
||||
@@ -37,6 +38,40 @@ var (
|
||||
currentCacheSize int64 // Use atomic operations for this
|
||||
)
|
||||
|
||||
// sanitizeOperationName removes null bytes and other invalid characters from operation names
|
||||
// This prevents panics when creating metrics with invalid label values
|
||||
func sanitizeOperationName(name string) string {
|
||||
if name == "" || name == "undefined" {
|
||||
return name
|
||||
}
|
||||
|
||||
var buf strings.Builder
|
||||
buf.Grow(len(name))
|
||||
|
||||
for _, r := range name {
|
||||
// Skip null bytes entirely
|
||||
if r == '\x00' {
|
||||
continue
|
||||
}
|
||||
// Replace control characters with underscores
|
||||
if r < 32 || r == 127 {
|
||||
buf.WriteByte('_')
|
||||
continue
|
||||
}
|
||||
// Only allow printable characters
|
||||
if unicode.IsPrint(r) {
|
||||
buf.WriteRune(r)
|
||||
}
|
||||
}
|
||||
|
||||
result := buf.String()
|
||||
// Return "undefined" if we ended up with an empty string after sanitization
|
||||
if result == "" {
|
||||
return "undefined"
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func prepareQueriesAndExemptions() {
|
||||
introspectionAllowedQueries = make(map[string]struct{})
|
||||
allowedUrls = make(map[string]struct{})
|
||||
@@ -298,8 +333,8 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
res.operationType = "mutation"
|
||||
if oper.Name != nil {
|
||||
mutationName = oper.Name.Value
|
||||
// Use mutation name immediately
|
||||
res.operationName = mutationName
|
||||
// Use mutation name immediately, sanitized to prevent metric panics
|
||||
res.operationName = sanitizeOperationName(mutationName)
|
||||
}
|
||||
break // Found a mutation, no need to continue first pass
|
||||
}
|
||||
@@ -316,7 +351,7 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
// We already set operation type to mutation in first pass
|
||||
// Only set name if we didn't find a mutation name earlier
|
||||
if res.operationName == "undefined" && oper.Name != nil {
|
||||
res.operationName = oper.Name.Value
|
||||
res.operationName = sanitizeOperationName(oper.Name.Value)
|
||||
}
|
||||
} else {
|
||||
// No mutation found, use the normal logic
|
||||
@@ -325,18 +360,10 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
}
|
||||
|
||||
if res.operationName == "undefined" && oper.Name != nil {
|
||||
res.operationName = oper.Name.Value
|
||||
res.operationName = sanitizeOperationName(oper.Name.Value)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle endpoint routing - always use write endpoint for mutations
|
||||
if res.operationType == "mutation" {
|
||||
res.activeEndpoint = cfg.Server.HostGraphQL
|
||||
} else if cfg.Server.HostGraphQLReadOnly != "" {
|
||||
// Use read-only endpoint for non-mutation operations
|
||||
res.activeEndpoint = cfg.Server.HostGraphQLReadOnly
|
||||
}
|
||||
|
||||
// Block mutations in read-only mode
|
||||
if res.operationType == "mutation" && cfg.Server.ReadOnlyMode {
|
||||
if ifNotInTest() {
|
||||
@@ -359,13 +386,25 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
}
|
||||
}
|
||||
|
||||
// Handle endpoint routing AFTER processing all definitions
|
||||
// This ensures mutations are always routed to the write endpoint
|
||||
if res.operationType == "mutation" {
|
||||
res.activeEndpoint = cfg.Server.HostGraphQL
|
||||
} else if cfg.Server.HostGraphQLReadOnly != "" {
|
||||
// Use read-only endpoint for non-mutation operations
|
||||
res.activeEndpoint = cfg.Server.HostGraphQLReadOnly
|
||||
}
|
||||
|
||||
// Track parsing time
|
||||
if ifNotInTest() && cfg.Monitoring != nil {
|
||||
parseTime := float64(time.Since(startTime).Milliseconds())
|
||||
cfg.Monitoring.IncrementFloat(libpack_monitoring.MetricsGraphQLParsingTime, nil, parseTime)
|
||||
}
|
||||
|
||||
return res
|
||||
// Create a copy to return, since the original will be returned to the pool
|
||||
// This prevents race conditions where concurrent requests could modify the same result
|
||||
result := *res
|
||||
return &result
|
||||
}
|
||||
|
||||
// processDirectives extracts caching directives from the operation
|
||||
|
||||
+107
-17
@@ -68,26 +68,74 @@ func ensureDefaultLabels(labels *map[string]string, podName string) {
|
||||
}
|
||||
}
|
||||
|
||||
// sanitizeLabelValue removes or replaces characters that are invalid in metric labels
|
||||
// This includes null bytes, newlines, carriage returns, quotes, and backslashes
|
||||
func sanitizeLabelValue(value string) string {
|
||||
if value == "" {
|
||||
return value
|
||||
}
|
||||
|
||||
var buf strings.Builder
|
||||
buf.Grow(len(value))
|
||||
|
||||
for _, r := range value {
|
||||
switch r {
|
||||
case '\x00': // null byte
|
||||
continue // Skip null bytes entirely
|
||||
case '\n', '\r', '\t': // newlines, carriage returns, tabs
|
||||
buf.WriteByte(' ') // Replace with space
|
||||
case '"', '\\': // quotes and backslashes need escaping
|
||||
buf.WriteByte('\\')
|
||||
buf.WriteRune(r)
|
||||
default:
|
||||
// Only allow printable ASCII and common unicode characters
|
||||
if unicode.IsPrint(r) {
|
||||
buf.WriteRune(r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) {
|
||||
if len(labels) == 0 {
|
||||
// Add defer/recover to prevent panics from crashing the application
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Log the panic but don't crash
|
||||
fmt.Fprintf(os.Stderr, "Recovered from panic in appendSortedLabels: %v\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if len(labels) == 0 || buf == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Create a snapshot to avoid concurrent access issues
|
||||
labelsCopy := make(map[string]string, len(labels))
|
||||
for k, v := range labels {
|
||||
labelsCopy[k] = v
|
||||
if k == "" {
|
||||
continue // Skip empty keys
|
||||
}
|
||||
// Sanitize the label value to remove null bytes and other invalid characters
|
||||
labelsCopy[k] = sanitizeLabelValue(v)
|
||||
}
|
||||
|
||||
if len(labelsCopy) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
keys := getSortedKeys(labelsCopy)
|
||||
for i, k := range keys {
|
||||
if i > 0 {
|
||||
buf.WriteByte(',')
|
||||
if v, ok := labelsCopy[k]; ok {
|
||||
if i > 0 {
|
||||
buf.WriteByte(',')
|
||||
}
|
||||
buf.WriteString(k)
|
||||
buf.WriteString(`="`)
|
||||
buf.WriteString(v)
|
||||
buf.WriteByte('"')
|
||||
}
|
||||
buf.WriteString(k)
|
||||
buf.WriteString(`="`)
|
||||
buf.WriteString(labelsCopy[k])
|
||||
buf.WriteByte('"')
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,7 +165,15 @@ func getSortedKeys(labels map[string]string) []string {
|
||||
}
|
||||
|
||||
func labelsToString(labels map[string]string) string {
|
||||
if labels == nil {
|
||||
// Add defer/recover to prevent panics from crashing the application
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Log the panic but don't crash
|
||||
fmt.Fprintf(os.Stderr, "Recovered from panic in labelsToString: %v\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if len(labels) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -126,17 +182,34 @@ func labelsToString(labels map[string]string) string {
|
||||
values := make(map[string]string, len(labels))
|
||||
|
||||
for k, v := range labels {
|
||||
if k == "" {
|
||||
continue // Skip empty keys
|
||||
}
|
||||
keys = append(keys, k)
|
||||
values[k] = v
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
sort.Strings(keys)
|
||||
|
||||
// Pre-allocate the builder with estimated capacity to avoid reallocation
|
||||
var sb strings.Builder
|
||||
estimatedSize := 0
|
||||
for _, k := range keys {
|
||||
sb.WriteString(k)
|
||||
sb.WriteByte('=')
|
||||
sb.WriteString(values[k])
|
||||
sb.WriteByte(';')
|
||||
estimatedSize += len(k) + len(values[k]) + 2 // key + value + '=' + ';'
|
||||
}
|
||||
sb.Grow(estimatedSize)
|
||||
|
||||
for _, k := range keys {
|
||||
if v, ok := values[k]; ok {
|
||||
sb.WriteString(k)
|
||||
sb.WriteByte('=')
|
||||
sb.WriteString(v)
|
||||
sb.WriteByte(';')
|
||||
}
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
@@ -186,6 +259,14 @@ func is_special_rune(r rune) bool {
|
||||
}
|
||||
|
||||
func compile_metrics_with_labels(name string, labels map[string]string) string {
|
||||
// Add defer/recover to prevent panics from crashing the application
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Log the panic but don't crash
|
||||
fmt.Fprintf(os.Stderr, "Recovered from panic in compile_metrics_with_labels: %v\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
buf.WriteString(name)
|
||||
@@ -197,16 +278,25 @@ func compile_metrics_with_labels(name string, labels map[string]string) string {
|
||||
// Create a snapshot to avoid concurrent access issues
|
||||
labelsCopy := make(map[string]string, len(labels))
|
||||
for k, v := range labels {
|
||||
if k == "" {
|
||||
continue // Skip empty keys
|
||||
}
|
||||
labelsCopy[k] = v
|
||||
}
|
||||
|
||||
if len(labelsCopy) == 0 {
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
keys := getSortedKeys(labelsCopy)
|
||||
|
||||
for _, k := range keys {
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(k)
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(labelsCopy[k])
|
||||
if v, ok := labelsCopy[k]; ok {
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(k)
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(v)
|
||||
}
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
|
||||
@@ -82,6 +82,36 @@ func (suite *Tests) Test_proxyTheRequest() {
|
||||
wantErr: false,
|
||||
wantEndpoint: "https://telegram-bot.app/",
|
||||
},
|
||||
{
|
||||
name: "Test mutation with multiple operations (bug fix regression test)",
|
||||
body: `{"query":"mutation getOrCreateUser { insert_tg_users_one(object: {id: 123}) { id } } query otherQuery { users { id } }"}`,
|
||||
host: "https://telegram-bot.app/",
|
||||
hostRO: "https://google.com/",
|
||||
path: "/v1/graphql",
|
||||
headers: supplied_headers,
|
||||
wantErr: false,
|
||||
wantEndpoint: "https://telegram-bot.app/",
|
||||
},
|
||||
{
|
||||
name: "Test mutation followed by fragment (bug fix regression test)",
|
||||
body: `{"query":"mutation insertUser { insert_users_one(object: {name: \"test\"}) { ...userFields } } fragment userFields on users { id name }"}`,
|
||||
host: "https://telegram-bot.app/",
|
||||
hostRO: "https://google.com/",
|
||||
path: "/v1/graphql",
|
||||
headers: supplied_headers,
|
||||
wantErr: false,
|
||||
wantEndpoint: "https://telegram-bot.app/",
|
||||
},
|
||||
{
|
||||
name: "Test complex mutation document (main-bot style)",
|
||||
body: `{"query":"mutation getOrCreateUser($user_id: bigint!, $group_id: bigint!) { insert_tg_users_one(object: {id: $user_id}, on_conflict: {constraint: tg_users_pkey, update_columns: last_seen}) { id } insert_tg_groups_one(object: {id: $group_id}, on_conflict: {constraint: tg_groups_pkey, update_columns: last_seen}) { id } }"}`,
|
||||
host: "https://telegram-bot.app/",
|
||||
hostRO: "https://google.com/",
|
||||
path: "/v1/graphql",
|
||||
headers: supplied_headers,
|
||||
wantErr: false,
|
||||
wantEndpoint: "https://telegram-bot.app/",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
@@ -272,6 +272,17 @@ func processGraphQLRequest(c *fiber.Ctx) error {
|
||||
|
||||
// Parse the GraphQL query
|
||||
parsedResult := parseGraphQLQuery(c)
|
||||
|
||||
// Debug logging for mutation routing analysis (enabled when LOG_LEVEL=DEBUG)
|
||||
if cfg.LogLevel == "DEBUG" {
|
||||
var m map[string]interface{}
|
||||
if err := json.Unmarshal(c.Body(), &m); err == nil {
|
||||
if query, ok := m["query"].(string); ok {
|
||||
debugParseGraphQLQuery(c, query)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if parsedResult.shouldBlock {
|
||||
return c.Status(fiber.StatusForbidden).SendString("Request blocked")
|
||||
}
|
||||
|
||||
@@ -97,6 +97,9 @@ spec:
|
||||
value: "error"
|
||||
- name: HASURA_GRAPHQL_SERVER_PORT
|
||||
value: "8088"
|
||||
# Disable event trigger processing on read-only instance
|
||||
- name: HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL
|
||||
value: "0"
|
||||
|
||||
- name: graphql-proxy
|
||||
image: ghcr.io/lukaszraczylo/graphql-monitoring-proxy:latest
|
||||
|
||||
+91
-2
@@ -8,6 +8,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/goccy/go-json"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/websocket/v2"
|
||||
gorillaws "github.com/gorilla/websocket"
|
||||
@@ -141,8 +142,29 @@ func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *web
|
||||
// Set message size limit
|
||||
clientConn.SetReadLimit(wsp.maxMessageSize)
|
||||
|
||||
// Connect to backend WebSocket with forwarded headers
|
||||
backendConn, err := wsp.dialBackend(ctx, headers)
|
||||
// Read first message to extract authentication from connection_init payload
|
||||
// This bridges the gap between clients that send auth in payload vs Hasura expecting it in HTTP headers
|
||||
messageType, message, err := clientConn.ReadMessage()
|
||||
if err != nil {
|
||||
wsp.errors.Add(1)
|
||||
if wsp.logger != nil {
|
||||
wsp.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to read first message from client",
|
||||
Pairs: map[string]interface{}{
|
||||
"connection_id": connectionID,
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
clientConn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Try to extract headers from connection_init payload (for GraphQL WebSocket protocols)
|
||||
enrichedHeaders := wsp.extractAuthFromPayload(message, headers)
|
||||
|
||||
// Connect to backend WebSocket with enriched headers
|
||||
backendConn, err := wsp.dialBackend(ctx, enrichedHeaders)
|
||||
if err != nil {
|
||||
wsp.errors.Add(1)
|
||||
if wsp.logger != nil {
|
||||
@@ -159,6 +181,21 @@ func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *web
|
||||
}
|
||||
defer backendConn.Close()
|
||||
|
||||
// Forward the first message (connection_init) to backend
|
||||
if err := backendConn.WriteMessage(messageType, message); err != nil {
|
||||
wsp.errors.Add(1)
|
||||
if wsp.logger != nil {
|
||||
wsp.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to forward connection_init to backend",
|
||||
Pairs: map[string]interface{}{
|
||||
"connection_id": connectionID,
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if wsp.logger != nil {
|
||||
wsp.logger.Debug(&libpack_logger.LogMessage{
|
||||
Message: "Backend WebSocket connection established",
|
||||
@@ -336,6 +373,58 @@ func (wsp *WebSocketProxy) proxyBackendToClient(ctx context.Context, backend *go
|
||||
}
|
||||
}
|
||||
|
||||
// extractAuthFromPayload extracts authentication headers from GraphQL WebSocket connection_init payload
|
||||
// This bridges the gap between clients sending auth in payload and Hasura expecting it in HTTP headers
|
||||
func (wsp *WebSocketProxy) extractAuthFromPayload(message []byte, originalHeaders http.Header) http.Header {
|
||||
// Create a copy of original headers
|
||||
enrichedHeaders := make(http.Header)
|
||||
for k, v := range originalHeaders {
|
||||
enrichedHeaders[k] = v
|
||||
}
|
||||
|
||||
// Try to parse as JSON to extract headers from payload
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(message, &msg); err != nil {
|
||||
// Not JSON or parse error, return original headers
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// Check if this is a connection_init message
|
||||
msgType, ok := msg["type"].(string)
|
||||
if !ok || (msgType != "connection_init" && msgType != "start") {
|
||||
// Not a connection_init, return original headers
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// Extract payload
|
||||
payload, ok := msg["payload"].(map[string]interface{})
|
||||
if !ok {
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// Try to extract headers from payload.headers (graphql-ws format)
|
||||
if payloadHeaders, ok := payload["headers"].(map[string]interface{}); ok {
|
||||
for key, value := range payloadHeaders {
|
||||
if strValue, ok := value.(string); ok {
|
||||
enrichedHeaders.Set(key, strValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Also check top-level payload keys that look like headers (Apollo format)
|
||||
for key, value := range payload {
|
||||
if strValue, ok := value.(string); ok {
|
||||
// Common auth headers
|
||||
if key == "Authorization" || key == "authorization" ||
|
||||
key == "x-hasura-role" || key == "x-hasura-admin-secret" {
|
||||
enrichedHeaders.Set(key, strValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// dialBackend establishes a WebSocket connection to the backend
|
||||
func (wsp *WebSocketProxy) dialBackend(ctx context.Context, headers http.Header) (*gorillaws.Conn, error) {
|
||||
// Convert http:// to ws:// or https:// to wss://
|
||||
|
||||
Reference in New Issue
Block a user