mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-14 02:32:10 +00:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7df651c17a | |||
| 7ada94e4fa | |||
| c510c29a8f | |||
| 370602858a | |||
| 6261be6e53 | |||
| 5ae4ea1e25 | |||
| fd30dc0890 | |||
| 2966661054 | |||
| 0f23f10e2f | |||
| ce39dc1bee | |||
| f864e8edcf | |||
| e36cdf099e | |||
| e2c3d03661 | |||
| 9de8b7bcaa | |||
| 163fc5ac42 | |||
| 758412e54e | |||
| 2f3909c5b0 |
@@ -425,6 +425,8 @@ You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_
|
||||
|
||||
You can check out the [example of combined deployment with RW and read-only hasura](static/kubernetes-single-deployment-with-ro.yaml).
|
||||
|
||||
**Important:** When using a read-only Hasura instance connected to a PostgreSQL read replica, you **must** disable event trigger processing on that instance by setting `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=0` in the read-only Hasura container environment variables. This prevents the read-only instance from attempting to process event triggers (which require write access to event log tables), avoiding "cannot set transaction read-write mode during recovery" errors.
|
||||
|
||||
### Resilience
|
||||
|
||||
#### Circuit Breaker Pattern
|
||||
@@ -723,6 +725,8 @@ Following tables are being cleaned:
|
||||
- `hdb_catalog.hdb_cron_event_invocation_logs`
|
||||
- `hdb_catalog.hdb_scheduled_event_invocation_logs`
|
||||
|
||||
**Important for RO/RW setups:** The `HASURA_EVENT_METADATA_DB` connection string must point to the **read-write primary database** where the `hdb_catalog` schema resides. The cleaner executes DELETE operations which require write permissions. Do not point this to a read-only replica.
|
||||
|
||||
|
||||
### Security
|
||||
|
||||
|
||||
@@ -15,12 +15,13 @@ const (
|
||||
)
|
||||
|
||||
// Use parameterized queries to prevent SQL injection
|
||||
// Cast $1 to interval type to allow proper parameterized interval values
|
||||
var delQueries = [...]string{
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - INTERVAL $1",
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
}
|
||||
|
||||
func enableHasuraEventCleaner(ctx context.Context) error {
|
||||
|
||||
@@ -340,8 +340,8 @@ func getDelQueries() []string {
|
||||
// This should return the actual delQueries from the main package
|
||||
// For testing purposes, we return expected parameterized queries
|
||||
return []string{
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - INTERVAL '$1 days'",
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - INTERVAL '$1 days'",
|
||||
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
|
||||
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,11 +19,11 @@ require (
|
||||
github.com/jackc/pgx/v5 v5.7.6
|
||||
github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.78
|
||||
github.com/redis/go-redis/v9 v9.14.1
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.84
|
||||
github.com/redis/go-redis/v9 v9.16.0
|
||||
github.com/sony/gobreaker v1.0.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/valyala/fasthttp v1.67.0
|
||||
github.com/valyala/fasthttp v1.68.0
|
||||
go.opentelemetry.io/otel v1.38.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0
|
||||
go.opentelemetry.io/otel/sdk v1.38.0
|
||||
@@ -36,7 +36,8 @@ require (
|
||||
github.com/andybalholm/brotli v1.2.0 // indirect
|
||||
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/clipperhouse/uax29/v2 v2.2.0 // indirect
|
||||
github.com/clipperhouse/stringish v0.1.1 // indirect
|
||||
github.com/clipperhouse/uax29/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/fasthttp/websocket v1.5.12 // indirect
|
||||
@@ -59,15 +60,15 @@ require (
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.38.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.8.0 // indirect
|
||||
golang.org/x/crypto v0.43.0 // indirect
|
||||
golang.org/x/net v0.46.0 // indirect
|
||||
golang.org/x/sync v0.17.0 // indirect
|
||||
golang.org/x/sys v0.37.0 // indirect
|
||||
golang.org/x/term v0.36.0 // indirect
|
||||
golang.org/x/text v0.30.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251020155222-88f65dc88635 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251020155222-88f65dc88635 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
|
||||
golang.org/x/crypto v0.44.0 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/term v0.37.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect
|
||||
google.golang.org/protobuf v1.36.10 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -16,8 +16,10 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x
|
||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/clipperhouse/uax29/v2 v2.2.0 h1:ChwIKnQN3kcZteTXMgb1wztSgaU+ZemkgWdohwgs8tY=
|
||||
github.com/clipperhouse/uax29/v2 v2.2.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM=
|
||||
github.com/clipperhouse/stringish v0.1.1 h1:+NSqMOr3GR6k1FdRhhnXrLfztGzuG+VuFDfatpWHKCs=
|
||||
github.com/clipperhouse/stringish v0.1.1/go.mod h1:v/WhFtE1q0ovMta2+m+UbpZ+2/HEXNWYXQgCt4hdOzA=
|
||||
github.com/clipperhouse/uax29/v2 v2.3.0 h1:SNdx9DVUqMoBuBoW3iLOj4FQv3dN5mDtuqwuhIGpJy4=
|
||||
github.com/clipperhouse/uax29/v2 v2.3.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@@ -72,8 +74,8 @@ github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9 h1:pL8B9mjv6RPUf
|
||||
github.com/lukaszraczylo/ask v0.0.0-20240916204100-6e9ef53a62d9/go.mod h1:M+UVdyqZs++xtEPrascaVmZdOMhCnxjZ2SgH+xHpR0c=
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12 h1:VO6hHYGw/Jy9JUizXf/bS0AI2QX1ueWWAWckMFVJ/w4=
|
||||
github.com/lukaszraczylo/go-ratecounter v0.1.12/go.mod h1:TqXEOCtFJStk1i0tkipprv1kiDHGon1MVUisjSTBSKM=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.78 h1:Ze+vTC1v3QkVB8++EO1gxyA1f/1DbXRgMFrOQDMtSWk=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.78/go.mod h1:PxQYblQDZISmYYj8sNfazAWxAOh1rhAtU208y+uPV8s=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.84 h1:yP00k8XSYKFYo6PmZFOsDblexLOG6WZzVWhzdstrxiw=
|
||||
github.com/lukaszraczylo/go-simple-graphql v1.2.84/go.mod h1:PxQYblQDZISmYYj8sNfazAWxAOh1rhAtU208y+uPV8s=
|
||||
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
|
||||
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
@@ -82,8 +84,8 @@ github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byF
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
|
||||
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
|
||||
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
|
||||
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 h1:McifyVxygw1d67y6vxUqls2D46J8W9nrki9c8c0eVvE=
|
||||
@@ -97,8 +99,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fasthttp v1.67.0 h1:tqKlJMUP6iuNG8hGjK/s9J4kadH7HLV4ijEcPGsezac=
|
||||
github.com/valyala/fasthttp v1.67.0/go.mod h1:qYSIpqt/0XNmShgo/8Aq8E3UYWVVwNS2QYmzd8WIEPM=
|
||||
github.com/valyala/fasthttp v1.68.0 h1:v12Nx16iepr8r9ySOwqI+5RBJ/DqTxhOy1HrHoDFnok=
|
||||
github.com/valyala/fasthttp v1.68.0/go.mod h1:5EXiRfYQAoiO/khu4oU9VISC/eVY6JqmSpPJoHCKsz4=
|
||||
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
|
||||
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
|
||||
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
|
||||
@@ -123,29 +125,29 @@ go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
|
||||
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
|
||||
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
|
||||
go.opentelemetry.io/proto/otlp v1.8.0 h1:fRAZQDcAFHySxpJ1TwlA1cJ4tvcrw7nXl9xWWC8N5CE=
|
||||
go.opentelemetry.io/proto/otlp v1.8.0/go.mod h1:tIeYOeNBU4cvmPqpaji1P+KbB4Oloai8wN4rWzRrFF0=
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
|
||||
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
|
||||
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
|
||||
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
|
||||
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU=
|
||||
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
|
||||
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
||||
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
|
||||
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
|
||||
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
|
||||
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
|
||||
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
|
||||
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
|
||||
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
|
||||
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
|
||||
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251020155222-88f65dc88635 h1:1wvBeYv+A2zfEbxROscJl69OP0m74S8wGEO+Syat26o=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251020155222-88f65dc88635/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251020155222-88f65dc88635 h1:3uycTxukehWrxH4HtPRtn1PDABTU331ViDjyqrUbaog=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251020155222-88f65dc88635/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba h1:B14OtaXuMaCQsl2deSvNkyPKIzq3BjfxQp8d00QyWx4=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:G5IanEx8/PgI9w6CFcYQf7jMtHQhZruvfM1i3qOqk5U=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
|
||||
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
|
||||
+39
-4
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"github.com/goccy/go-json"
|
||||
fiber "github.com/gofiber/fiber/v2"
|
||||
@@ -37,6 +38,40 @@ var (
|
||||
currentCacheSize int64 // Use atomic operations for this
|
||||
)
|
||||
|
||||
// sanitizeOperationName removes null bytes and other invalid characters from operation names
|
||||
// This prevents panics when creating metrics with invalid label values
|
||||
func sanitizeOperationName(name string) string {
|
||||
if name == "" || name == "undefined" {
|
||||
return name
|
||||
}
|
||||
|
||||
var buf strings.Builder
|
||||
buf.Grow(len(name))
|
||||
|
||||
for _, r := range name {
|
||||
// Skip null bytes entirely
|
||||
if r == '\x00' {
|
||||
continue
|
||||
}
|
||||
// Replace control characters with underscores
|
||||
if r < 32 || r == 127 {
|
||||
buf.WriteByte('_')
|
||||
continue
|
||||
}
|
||||
// Only allow printable characters
|
||||
if unicode.IsPrint(r) {
|
||||
buf.WriteRune(r)
|
||||
}
|
||||
}
|
||||
|
||||
result := buf.String()
|
||||
// Return "undefined" if we ended up with an empty string after sanitization
|
||||
if result == "" {
|
||||
return "undefined"
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func prepareQueriesAndExemptions() {
|
||||
introspectionAllowedQueries = make(map[string]struct{})
|
||||
allowedUrls = make(map[string]struct{})
|
||||
@@ -298,8 +333,8 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
res.operationType = "mutation"
|
||||
if oper.Name != nil {
|
||||
mutationName = oper.Name.Value
|
||||
// Use mutation name immediately
|
||||
res.operationName = mutationName
|
||||
// Use mutation name immediately, sanitized to prevent metric panics
|
||||
res.operationName = sanitizeOperationName(mutationName)
|
||||
}
|
||||
break // Found a mutation, no need to continue first pass
|
||||
}
|
||||
@@ -316,7 +351,7 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
// We already set operation type to mutation in first pass
|
||||
// Only set name if we didn't find a mutation name earlier
|
||||
if res.operationName == "undefined" && oper.Name != nil {
|
||||
res.operationName = oper.Name.Value
|
||||
res.operationName = sanitizeOperationName(oper.Name.Value)
|
||||
}
|
||||
} else {
|
||||
// No mutation found, use the normal logic
|
||||
@@ -325,7 +360,7 @@ func parseGraphQLQuery(c *fiber.Ctx) *parseGraphQLQueryResult {
|
||||
}
|
||||
|
||||
if res.operationName == "undefined" && oper.Name != nil {
|
||||
res.operationName = oper.Name.Value
|
||||
res.operationName = sanitizeOperationName(oper.Name.Value)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+144
-22
@@ -68,20 +68,82 @@ func ensureDefaultLabels(labels *map[string]string, podName string) {
|
||||
}
|
||||
}
|
||||
|
||||
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) {
|
||||
keys := getSortedKeys(labels)
|
||||
for i, k := range keys {
|
||||
if i > 0 {
|
||||
buf.WriteByte(',')
|
||||
// sanitizeLabelValue removes or replaces characters that are invalid in metric labels
|
||||
// This includes null bytes, newlines, carriage returns, quotes, and backslashes
|
||||
func sanitizeLabelValue(value string) string {
|
||||
if value == "" {
|
||||
return value
|
||||
}
|
||||
|
||||
var buf strings.Builder
|
||||
buf.Grow(len(value))
|
||||
|
||||
for _, r := range value {
|
||||
switch r {
|
||||
case '\x00': // null byte
|
||||
continue // Skip null bytes entirely
|
||||
case '\n', '\r', '\t': // newlines, carriage returns, tabs
|
||||
buf.WriteByte(' ') // Replace with space
|
||||
case '"', '\\': // quotes and backslashes need escaping
|
||||
buf.WriteByte('\\')
|
||||
buf.WriteRune(r)
|
||||
default:
|
||||
// Only allow printable ASCII and common unicode characters
|
||||
if unicode.IsPrint(r) {
|
||||
buf.WriteRune(r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) {
|
||||
// Add defer/recover to prevent panics from crashing the application
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Log the panic but don't crash
|
||||
fmt.Fprintf(os.Stderr, "Recovered from panic in appendSortedLabels: %v\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if len(labels) == 0 || buf == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Create a snapshot to avoid concurrent access issues
|
||||
labelsCopy := make(map[string]string, len(labels))
|
||||
for k, v := range labels {
|
||||
if k == "" {
|
||||
continue // Skip empty keys
|
||||
}
|
||||
// Sanitize the label value to remove null bytes and other invalid characters
|
||||
labelsCopy[k] = sanitizeLabelValue(v)
|
||||
}
|
||||
|
||||
if len(labelsCopy) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
keys := getSortedKeys(labelsCopy)
|
||||
for i, k := range keys {
|
||||
if v, ok := labelsCopy[k]; ok {
|
||||
if i > 0 {
|
||||
buf.WriteByte(',')
|
||||
}
|
||||
buf.WriteString(k)
|
||||
buf.WriteString(`="`)
|
||||
buf.WriteString(v)
|
||||
buf.WriteByte('"')
|
||||
}
|
||||
buf.WriteString(k)
|
||||
buf.WriteString(`="`)
|
||||
buf.WriteString(labels[k])
|
||||
buf.WriteByte('"')
|
||||
}
|
||||
}
|
||||
|
||||
func getSortedKeys(labels map[string]string) []string {
|
||||
if labels == nil {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
labelsKey := labelsToString(labels)
|
||||
|
||||
// Check if the sorted keys are already cached
|
||||
@@ -89,7 +151,7 @@ func getSortedKeys(labels map[string]string) []string {
|
||||
return keys.([]string)
|
||||
}
|
||||
|
||||
// Compute the sorted keys
|
||||
// Compute the sorted keys - create a snapshot to avoid concurrent access issues
|
||||
keys := make([]string, 0, len(labels))
|
||||
for k := range labels {
|
||||
keys = append(keys, k)
|
||||
@@ -103,18 +165,51 @@ func getSortedKeys(labels map[string]string) []string {
|
||||
}
|
||||
|
||||
func labelsToString(labels map[string]string) string {
|
||||
keys := make([]string, 0, len(labels))
|
||||
for k := range labels {
|
||||
keys = append(keys, k)
|
||||
// Add defer/recover to prevent panics from crashing the application
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Log the panic but don't crash
|
||||
fmt.Fprintf(os.Stderr, "Recovered from panic in labelsToString: %v\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if len(labels) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Create a snapshot of the map to avoid concurrent access issues
|
||||
keys := make([]string, 0, len(labels))
|
||||
values := make(map[string]string, len(labels))
|
||||
|
||||
for k, v := range labels {
|
||||
if k == "" {
|
||||
continue // Skip empty keys
|
||||
}
|
||||
keys = append(keys, k)
|
||||
values[k] = v
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
sort.Strings(keys)
|
||||
|
||||
// Pre-allocate the builder with estimated capacity to avoid reallocation
|
||||
var sb strings.Builder
|
||||
estimatedSize := 0
|
||||
for _, k := range keys {
|
||||
sb.WriteString(k)
|
||||
sb.WriteByte('=')
|
||||
sb.WriteString(labels[k])
|
||||
sb.WriteByte(';')
|
||||
estimatedSize += len(k) + len(values[k]) + 2 // key + value + '=' + ';'
|
||||
}
|
||||
sb.Grow(estimatedSize)
|
||||
|
||||
for _, k := range keys {
|
||||
if v, ok := values[k]; ok {
|
||||
sb.WriteString(k)
|
||||
sb.WriteByte('=')
|
||||
sb.WriteString(v)
|
||||
sb.WriteByte(';')
|
||||
}
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
@@ -164,17 +259,44 @@ func is_special_rune(r rune) bool {
|
||||
}
|
||||
|
||||
func compile_metrics_with_labels(name string, labels map[string]string) string {
|
||||
// Add defer/recover to prevent panics from crashing the application
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Log the panic but don't crash
|
||||
fmt.Fprintf(os.Stderr, "Recovered from panic in compile_metrics_with_labels: %v\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
buf.WriteString(name)
|
||||
|
||||
keys := getSortedKeys(labels)
|
||||
if len(labels) == 0 {
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// Create a snapshot to avoid concurrent access issues
|
||||
labelsCopy := make(map[string]string, len(labels))
|
||||
for k, v := range labels {
|
||||
if k == "" {
|
||||
continue // Skip empty keys
|
||||
}
|
||||
labelsCopy[k] = v
|
||||
}
|
||||
|
||||
if len(labelsCopy) == 0 {
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
keys := getSortedKeys(labelsCopy)
|
||||
|
||||
for _, k := range keys {
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(k)
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(labels[k])
|
||||
if v, ok := labelsCopy[k]; ok {
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(k)
|
||||
buf.WriteByte('_')
|
||||
buf.WriteString(v)
|
||||
}
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
|
||||
@@ -97,6 +97,9 @@ spec:
|
||||
value: "error"
|
||||
- name: HASURA_GRAPHQL_SERVER_PORT
|
||||
value: "8088"
|
||||
# Disable event trigger processing on read-only instance
|
||||
- name: HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL
|
||||
value: "0"
|
||||
|
||||
- name: graphql-proxy
|
||||
image: ghcr.io/lukaszraczylo/graphql-monitoring-proxy:latest
|
||||
|
||||
+145
-8
@@ -8,6 +8,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/goccy/go-json"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/websocket/v2"
|
||||
gorillaws "github.com/gorilla/websocket"
|
||||
@@ -88,13 +89,39 @@ func (wsp *WebSocketProxy) HandleWebSocket(c *fiber.Ctx) error {
|
||||
return fiber.NewError(fiber.StatusUpgradeRequired, "WebSocket upgrade required")
|
||||
}
|
||||
|
||||
// Capture headers from the upgrade request to forward to backend
|
||||
headers := make(http.Header)
|
||||
var subprotocols []string
|
||||
|
||||
for key, value := range c.Request().Header.All() {
|
||||
keyStr := string(key)
|
||||
// Capture subprotocol separately
|
||||
if keyStr == "Sec-Websocket-Protocol" || keyStr == "Sec-WebSocket-Protocol" {
|
||||
subprotocols = append(subprotocols, string(value))
|
||||
}
|
||||
// Forward important headers including WebSocket subprotocol
|
||||
// Skip only connection-establishment headers that will be regenerated
|
||||
if keyStr != "Connection" && keyStr != "Upgrade" &&
|
||||
keyStr != "Sec-Websocket-Key" && keyStr != "Sec-Websocket-Version" &&
|
||||
keyStr != "Sec-Websocket-Extensions" {
|
||||
headers.Add(keyStr, string(value))
|
||||
}
|
||||
}
|
||||
|
||||
// Configure WebSocket with subprotocol support
|
||||
config := websocket.Config{
|
||||
Subprotocols: subprotocols,
|
||||
}
|
||||
|
||||
return websocket.New(func(clientConn *websocket.Conn) {
|
||||
wsp.handleConnection(c.Context(), clientConn)
|
||||
})(c)
|
||||
// Use background context for long-lived WebSocket connections
|
||||
// The original request context expires after the upgrade
|
||||
wsp.handleConnection(context.Background(), clientConn, headers)
|
||||
}, config)(c)
|
||||
}
|
||||
|
||||
// handleConnection manages a single WebSocket connection
|
||||
func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *websocket.Conn) {
|
||||
func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *websocket.Conn, headers http.Header) {
|
||||
connectionID := fmt.Sprintf("%p", clientConn)
|
||||
startTime := time.Now()
|
||||
|
||||
@@ -115,8 +142,29 @@ func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *web
|
||||
// Set message size limit
|
||||
clientConn.SetReadLimit(wsp.maxMessageSize)
|
||||
|
||||
// Connect to backend WebSocket
|
||||
backendConn, err := wsp.dialBackend(ctx)
|
||||
// Read first message to extract authentication from connection_init payload
|
||||
// This bridges the gap between clients that send auth in payload vs Hasura expecting it in HTTP headers
|
||||
messageType, message, err := clientConn.ReadMessage()
|
||||
if err != nil {
|
||||
wsp.errors.Add(1)
|
||||
if wsp.logger != nil {
|
||||
wsp.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to read first message from client",
|
||||
Pairs: map[string]interface{}{
|
||||
"connection_id": connectionID,
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
clientConn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Try to extract headers from connection_init payload (for GraphQL WebSocket protocols)
|
||||
enrichedHeaders := wsp.extractAuthFromPayload(message, headers)
|
||||
|
||||
// Connect to backend WebSocket with enriched headers
|
||||
backendConn, err := wsp.dialBackend(ctx, enrichedHeaders)
|
||||
if err != nil {
|
||||
wsp.errors.Add(1)
|
||||
if wsp.logger != nil {
|
||||
@@ -133,6 +181,32 @@ func (wsp *WebSocketProxy) handleConnection(ctx context.Context, clientConn *web
|
||||
}
|
||||
defer backendConn.Close()
|
||||
|
||||
// Forward the first message (connection_init) to backend
|
||||
if err := backendConn.WriteMessage(messageType, message); err != nil {
|
||||
wsp.errors.Add(1)
|
||||
if wsp.logger != nil {
|
||||
wsp.logger.Error(&libpack_logger.LogMessage{
|
||||
Message: "Failed to forward connection_init to backend",
|
||||
Pairs: map[string]interface{}{
|
||||
"connection_id": connectionID,
|
||||
"error": err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if wsp.logger != nil {
|
||||
wsp.logger.Debug(&libpack_logger.LogMessage{
|
||||
Message: "Backend WebSocket connection established",
|
||||
Pairs: map[string]interface{}{
|
||||
"connection_id": connectionID,
|
||||
"subprotocol": backendConn.Subprotocol(),
|
||||
"has_authorization": headers.Get("Authorization") != "",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Set up bidirectional proxying
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
@@ -299,8 +373,60 @@ func (wsp *WebSocketProxy) proxyBackendToClient(ctx context.Context, backend *go
|
||||
}
|
||||
}
|
||||
|
||||
// extractAuthFromPayload extracts authentication headers from GraphQL WebSocket connection_init payload
|
||||
// This bridges the gap between clients sending auth in payload and Hasura expecting it in HTTP headers
|
||||
func (wsp *WebSocketProxy) extractAuthFromPayload(message []byte, originalHeaders http.Header) http.Header {
|
||||
// Create a copy of original headers
|
||||
enrichedHeaders := make(http.Header)
|
||||
for k, v := range originalHeaders {
|
||||
enrichedHeaders[k] = v
|
||||
}
|
||||
|
||||
// Try to parse as JSON to extract headers from payload
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(message, &msg); err != nil {
|
||||
// Not JSON or parse error, return original headers
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// Check if this is a connection_init message
|
||||
msgType, ok := msg["type"].(string)
|
||||
if !ok || (msgType != "connection_init" && msgType != "start") {
|
||||
// Not a connection_init, return original headers
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// Extract payload
|
||||
payload, ok := msg["payload"].(map[string]interface{})
|
||||
if !ok {
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// Try to extract headers from payload.headers (graphql-ws format)
|
||||
if payloadHeaders, ok := payload["headers"].(map[string]interface{}); ok {
|
||||
for key, value := range payloadHeaders {
|
||||
if strValue, ok := value.(string); ok {
|
||||
enrichedHeaders.Set(key, strValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Also check top-level payload keys that look like headers (Apollo format)
|
||||
for key, value := range payload {
|
||||
if strValue, ok := value.(string); ok {
|
||||
// Common auth headers
|
||||
if key == "Authorization" || key == "authorization" ||
|
||||
key == "x-hasura-role" || key == "x-hasura-admin-secret" {
|
||||
enrichedHeaders.Set(key, strValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return enrichedHeaders
|
||||
}
|
||||
|
||||
// dialBackend establishes a WebSocket connection to the backend
|
||||
func (wsp *WebSocketProxy) dialBackend(ctx context.Context) (*gorillaws.Conn, error) {
|
||||
func (wsp *WebSocketProxy) dialBackend(ctx context.Context, headers http.Header) (*gorillaws.Conn, error) {
|
||||
// Convert http:// to ws:// or https:// to wss://
|
||||
wsURL := wsp.backendURL
|
||||
if len(wsURL) > 7 && wsURL[:7] == "http://" {
|
||||
@@ -309,13 +435,24 @@ func (wsp *WebSocketProxy) dialBackend(ctx context.Context) (*gorillaws.Conn, er
|
||||
wsURL = "wss://" + wsURL[8:]
|
||||
}
|
||||
|
||||
// Append GraphQL WebSocket path
|
||||
wsURL = wsURL + "/v1/graphql"
|
||||
|
||||
// Extract subprotocols from headers (e.g., graphql-ws, graphql-transport-ws)
|
||||
var subprotocols []string
|
||||
if proto := headers.Get("Sec-WebSocket-Protocol"); proto != "" {
|
||||
subprotocols = []string{proto}
|
||||
// Remove from headers since it will be set via Subprotocols field
|
||||
headers.Del("Sec-WebSocket-Protocol")
|
||||
}
|
||||
|
||||
// Use gorilla websocket dialer
|
||||
dialer := gorillaws.Dialer{
|
||||
HandshakeTimeout: 10 * time.Second,
|
||||
Subprotocols: subprotocols,
|
||||
}
|
||||
|
||||
// Dial the backend with proper headers
|
||||
headers := http.Header{}
|
||||
// Dial the backend with forwarded headers
|
||||
conn, _, err := dialer.DialContext(ctx, wsURL, headers)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to dial backend WebSocket: %w", err)
|
||||
|
||||
+3
-1
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -167,7 +168,8 @@ func TestWebSocketProxy_DialBackend_URLConversion(t *testing.T) {
|
||||
// We can't fully test dialBackend without a real WebSocket server,
|
||||
// but we can verify the URL conversion logic
|
||||
ctx := context.Background()
|
||||
_, err := wsp.dialBackend(ctx)
|
||||
headers := http.Header{}
|
||||
_, err := wsp.dialBackend(ctx, headers)
|
||||
|
||||
// We expect an error since there's no server, but we verify the conversion happened
|
||||
assert.Error(t, err) // Should fail to connect to non-existent server
|
||||
|
||||
Reference in New Issue
Block a user