Compare commits

...

3 Commits

Author SHA1 Message Date
lukaszraczylo e495cf23d9 Read only endpoint support (#10)
* This change introduces ability to set additional endpoint leading to the
instance of the graphql server connected to the read only database.
If regular query is detected and endpoint for `HOST_GRAPHQL_READONLY` value is set,
the query will be proxied to it. Mutations and non-graphql will be sent
to the `HOST_GRAPHQL` endpoint.
2024-03-12 11:16:35 +00:00
lukaszraczylo ba1fef9b57 Improve stats gathering (#9) 2024-03-05 22:42:30 +00:00
lukaszraczylo 3a18e0e935 Improve stats gathering and tests improvements. (#8) 2024-03-05 22:40:06 +00:00
19 changed files with 511 additions and 130 deletions
+81
View File
@@ -0,0 +1,81 @@
name: Run tests on PR
on:
pull_request:
branches:
- "main"
push:
paths-ignore:
- "**/**.md"
- "**/**.yaml"
- "static/**"
branches:
- "!main"
env:
GO_VERSION: ">=1.21"
jobs:
# This job is responsible for preparation of the build
# environment variables.
prepare:
name: Preparing build context
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
id: cache
with:
go-version: ${{env.GO_VERSION}}
cache-dependency-path: "**/*.sum"
- name: Go get dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: |
go get ./...
# This job is responsible for running tests and linting the codebase
test:
name: "Unit testing"
# needs: [prepare]
runs-on: ubuntu-latest
# container: github/super-linter:v4
needs: [prepare]
services:
# Label used to access the service container
redis:
# Docker Hub image
image: redis
# Set health checks to wait until redis has started
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{env.GO_VERSION}}
cache-dependency-path: "**/*.sum"
- name: Install dependencies
run: |
go mod tidy
- name: Run unit tests
env:
REDIS_HOST: redis
REDIS_PORT: 6379
run: |
export REDIS_SERVER="$REDIS_HOST:$REDIS_PORT"
CI_RUN=${CI} make test
+9
View File
@@ -15,6 +15,7 @@ This project is in active use by [telegram-bot.app](https://telegram-bot.app), a
- [Configuration](#configuration)
- [Speed](#speed)
- [Caching](#caching)
- [Read-only endpoint](#read-only-endpoint)
- [Security](#security)
- [Role-based rate limiting](#role-based-rate-limiting)
- [Read-only mode](#read-only-mode)
@@ -93,6 +94,7 @@ In this case, both proxy and websockets will be available under the `/v1/graphql
| monitor | Extracting the query name and type and adding it as a label to metrics|
| monitor | Calculating the query duration and adding it to the metrics |
| speed | Caching the queries, together with per-query cache and TTL |
| speed | Support for READ ONLY graphql endpoint |
| security | Blocking schema introspection |
| security | Rate limiting queries based on user role |
| security | Blocking mutations in read-only mode |
@@ -111,6 +113,7 @@ You can still use the non-prefixed environment variables in the spirit of the ba
| `MONITORING_PORT` | The port to expose the metrics endpoint | `9393` |
| `PORT_GRAPHQL` | The port to expose the graphql endpoint | `8080` |
| `HOST_GRAPHQL` | The host to proxy the graphql endpoint | `http://localhost/` |
| `HOST_GRAPHQL_READONLY` | The host to proxy the read-only graphql endpoint | `` |
| `HEALTHCHECK_GRAPHQL_URL` | The URL to check the health of the graphql endpoint | `` |
| `JWT_USER_CLAIM_PATH` | Path to the user claim in the JWT token | `` |
| `JWT_ROLE_CLAIM_PATH` | Path to the role claim in the JWT token | `` |
@@ -156,6 +159,12 @@ query MyProducts @cached(refresh: true) {
Since version `0.5.30` the cache is gzipped in the memory, which should optimise the memory usage quite significantly.
#### Read-only endpoint
You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_READONLY` environment variable. The default value is empty, preventing the proxy from using the read-only endpoint for the queries and directing all the requests to the main endpoint specified as `HOST_GRAPHQL`. If the `HOST_GRAPHQL_READONLY` is set, the proxy will use the read-only endpoint for the queries with the `query` type and the main endpoint for the `mutation` type queries. Format of the read-only endpoint is the same as `HOST_GRAPHQL` endpoint, for example `http://localhost:8080/`.
You can check out the [example of combined deployment with RW and read-only hasura](static/kubernetes-single-deployment-with-ro.yaml).
### Security
#### Role-based rate limiting
+1 -1
View File
@@ -13,7 +13,7 @@ func calculateHash(c *fiber.Ctx) string {
}
func enableCache() {
cfg.Cache.CacheClient = libpack_cache.New(time.Duration(cfg.Cache.CacheTTL) * time.Second * 100)
cfg.Cache.CacheClient = libpack_cache.New(time.Duration(cfg.Cache.CacheTTL) * time.Second)
}
func cacheLookup(hash string) []byte {
+1 -2
View File
@@ -1,7 +1,6 @@
package main
import (
"testing"
"time"
)
@@ -38,7 +37,7 @@ func (suite *Tests) Test_cacheLookup() {
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
suite.Run(tt.name, func() {
if tt.addCache.data != nil {
cfg.Cache.CacheClient.Set(tt.args.hash, tt.addCache.data, time.Duration(90*time.Second))
}
+1 -3
View File
@@ -1,7 +1,5 @@
package main
import "testing"
func (suite *Tests) Test_extractClaimsFromJWTHeader() {
jwt_token_for_tests := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiSGFzdXJhIjp7IngtaGFzdXJhLWFsbG93ZWQtcm9sZXMiOlsiZ3Vlc3QiLCJ1c2VyIiwiZ3JvdXBhZG1pbiIsInBheWFkbWluIl0sIngtaGFzdXJhLWRlZmF1bHQtcm9sZSI6Imd1ZXN0IiwieC1oYXN1cmEtdXNlci1pZCI6IjE2NyIsIngtaGFzdXJhLXVzZXItdXVpZCI6ImRkM2U2ZTM1LTA0MDktNDNiMC1iZmYxLWNlZjNjNmVkNWYxMCJ9LCJpc3MiOiJBdXRoU2VydmljZSIsImV4cCI6MTY5NjgwMTcyNiwibmJmIjoxNjk2NTg1NzI2LCJpYXQiOjE2OTY1ODU3MjZ9.dsJ5JKzG5tXOlqeZ_Gfe2XC-vyrcwtYwOGfhvt8q9UY"
@@ -68,7 +66,7 @@ func (suite *Tests) Test_extractClaimsFromJWTHeader() {
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
suite.Run(tt.name, func() {
if len(tt.jwt_token_path) > 0 {
cfg.Client.JWTUserClaimPath = tt.jwt_token_path
}
+40 -43
View File
@@ -1,7 +1,6 @@
package main
import (
"flag"
"strconv"
"strings"
@@ -41,43 +40,30 @@ var introspectionQuerySet = map[string]struct{}{}
var introspectionAllowedQueries = map[string]struct{}{}
var allowedUrls = map[string]struct{}{}
// Utility function to convert a slice of strings to a map for O(1) lookups.
func sliceToMap(slice []string) map[string]struct{} {
resultMap := make(map[string]struct{}, len(slice))
for _, item := range slice {
resultMap[strings.ToLower(item)] = struct{}{}
}
return resultMap
}
func prepareQueriesAndExemptions() {
introspectionQuerySet = map[string]struct{}{}
introspectionQuerySet = func() map[string]struct{} {
rsqs := make(map[string]struct{}, len(introspection_queries))
for _, query := range introspection_queries {
rsqs[strings.ToLower(query)] = struct{}{}
}
return rsqs
}()
introspectionAllowedQueries = map[string]struct{}{}
introspectionAllowedQueries = func() map[string]struct{} {
rsqs := make(map[string]struct{}, len(cfg.Security.IntrospectionAllowed))
for _, query := range cfg.Security.IntrospectionAllowed {
rsqs[strings.ToLower(query)] = struct{}{}
}
return rsqs
}()
allowedUrls = map[string]struct{}{}
allowedUrls = func() map[string]struct{} {
rsqs := make(map[string]struct{}, len(cfg.Server.AllowURLs))
for _, query := range cfg.Server.AllowURLs {
rsqs[strings.ToLower(query)] = struct{}{}
}
return rsqs
}()
introspectionQuerySet = sliceToMap(introspection_queries)
introspectionAllowedQueries = sliceToMap(cfg.Security.IntrospectionAllowed)
allowedUrls = sliceToMap(cfg.Server.AllowURLs)
}
type parseGraphQLQueryResult struct {
operationType string
operationName string
cacheRequest bool
cacheTime int
cacheRefresh bool
shouldBlock bool
shouldIgnore bool
operationType string
operationName string
activeEndpoint string
cacheTime int
cacheRequest bool
cacheRefresh bool
shouldBlock bool
shouldIgnore bool
}
func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
@@ -85,8 +71,8 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
m := make(map[string]interface{})
err := json.Unmarshal(c.Body(), &m)
if err != nil {
cfg.Logger.Debug("Can't unmarshal the request", map[string]interface{}{"error": err.Error(), "body": string(c.Body())})
if flag.Lookup("test.v") == nil {
cfg.Logger.Error("Can't unmarshal the request", map[string]interface{}{"error": err.Error(), "body": string(c.Body())})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
return
@@ -95,7 +81,7 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
query, ok := m["query"].(string)
if !ok {
cfg.Logger.Error("Can't find the query", map[string]interface{}{"query": query, "m_val": m})
if flag.Lookup("test.v") == nil {
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
return
@@ -104,7 +90,7 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
p, err := parser.Parse(parser.ParseParams{Source: query})
if err != nil {
cfg.Logger.Error("Can't parse the query", map[string]interface{}{"query": query, "m_val": m})
if flag.Lookup("test.v") == nil {
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
return
@@ -112,17 +98,25 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
res.shouldIgnore = false
res.operationName = "undefined"
res.activeEndpoint = cfg.Server.HostGraphQL
for _, d := range p.Definitions {
if oper, ok := d.(*ast.OperationDefinition); ok {
res.operationType = oper.Operation
res.operationType = strings.ToLower(oper.Operation)
if oper.Name != nil {
res.operationName = oper.Name.Value
}
if strings.ToLower(res.operationType) == "mutation" && cfg.Server.ReadOnlyMode {
// If the query is a mutation then direct it to the RW endpoint,
// otherwise direct it to the RO endpoint if it's set.
if cfg.Server.HostGraphQLReadOnly != "" && res.operationType != "mutation" {
res.activeEndpoint = cfg.Server.HostGraphQLReadOnly
}
if res.operationType == "mutation" && cfg.Server.ReadOnlyMode {
cfg.Logger.Warning("Mutation blocked", m)
if flag.Lookup("test.v") == nil {
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
c.Status(403).SendString("The server is in read-only mode")
@@ -138,7 +132,7 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
res.cacheTime, err = strconv.Atoi(arg.Value.GetValue().(string))
if err != nil {
cfg.Logger.Error("Can't parse the ttl, using global", map[string]interface{}{"bad_ttl": arg.Value.GetValue().(string)})
if flag.Lookup("test.v") == nil {
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
return
@@ -184,8 +178,11 @@ func checkSelections(c *fiber.Ctx, selections []ast.Selection) bool {
func checkIfContainsIntrospection(c *fiber.Ctx, whatever string) (shouldBlock bool) {
whateverLower := strings.ToLower(whatever)
got_exemption := false
// If the query is an introspection query, we need to check if it's allowed.
if _, exists := introspectionQuerySet[whateverLower]; exists {
if len(cfg.Security.IntrospectionAllowed) > 0 {
if _, allowed_exists := introspectionAllowedQueries[whateverLower]; allowed_exists {
cfg.Logger.Debug("Introspection query allowed, passing through", map[string]interface{}{"query": whatever})
got_exemption = true
@@ -197,7 +194,7 @@ func checkIfContainsIntrospection(c *fiber.Ctx, whatever string) (shouldBlock bo
}
}
if shouldBlock {
if flag.Lookup("test.v") == nil {
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
c.Status(403).SendString("Introspection queries are not allowed")
+18 -27
View File
@@ -1,10 +1,6 @@
package main
import (
"testing"
fiber "github.com/gofiber/fiber/v2"
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
"github.com/valyala/fasthttp"
)
@@ -166,6 +162,7 @@ func (suite *Tests) Test_parseGraphQLQuery() {
{
name: "test mutation query with config: read only",
suppliedSettings: func() *config {
parseConfig()
cfg.Server.ReadOnlyMode = true
return cfg
}(),
@@ -199,6 +196,7 @@ func (suite *Tests) Test_parseGraphQLQuery() {
{
name: "test simple query with introspection __schema config: block introspection",
suppliedSettings: func() *config {
parseConfig()
cfg.Security.BlockIntrospection = true
return cfg
}(),
@@ -221,7 +219,6 @@ func (suite *Tests) Test_parseGraphQLQuery() {
parseConfig()
cfg.Security.BlockIntrospection = true
cfg.Security.IntrospectionAllowed = []string{}
prepareQueriesAndExemptions()
return cfg
}(),
suppliedQuery: queries{
@@ -243,7 +240,6 @@ func (suite *Tests) Test_parseGraphQLQuery() {
parseConfig()
cfg.Security.BlockIntrospection = true
cfg.Security.IntrospectionAllowed = []string{"__schema"}
prepareQueriesAndExemptions()
return cfg
}(),
suppliedQuery: queries{
@@ -275,15 +271,9 @@ func (suite *Tests) Test_parseGraphQLQuery() {
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
suite.Run(tt.name, func() {
cfg = &config{}
cfg.Logger = libpack_logging.NewLogger()
defer func() {
cfg = &config{}
}()
app := fiber.New()
parseConfig()
ctx_headers := func() *fasthttp.RequestHeader {
h := fasthttp.RequestHeader{}
for k, v := range tt.suppliedQuery.headers {
@@ -298,28 +288,29 @@ func (suite *Tests) Test_parseGraphQLQuery() {
ctx_request.AppendBody([]byte(tt.suppliedQuery.body))
ctx := app.AcquireCtx(&fasthttp.RequestCtx{
ctx := suite.app.AcquireCtx(&fasthttp.RequestCtx{
Request: ctx_request,
})
defer app.ReleaseCtx(ctx)
// defer func() {
// cfg = &config{}
// parseConfig()
// suite.app.ReleaseCtx(ctx)
// }()
assert.NotNil(ctx, "Fiber context is nil")
if tt.suppliedSettings != nil {
cfg = tt.suppliedSettings
}
defer func() {
cfg = &config{}
}()
prepareQueriesAndExemptions()
parseResult := parseGraphQLQuery(ctx)
assert.Equal(tt.wantResults.op_type, parseResult.operationType, "Unexpected operation type", tt.name)
assert.Equal(tt.wantResults.op_name, parseResult.operationName, "Unexpected operation name", tt.name)
assert.Equal(tt.wantResults.is_cached, parseResult.cacheRequest, "Unexpected cache value", tt.name)
assert.Equal(tt.wantResults.cached_ttl, parseResult.cacheTime, "Unexpected cache TTL value", tt.name)
assert.Equal(tt.wantResults.shouldBlock, parseResult.shouldBlock, "Unexpected block value", tt.name)
assert.Equal(tt.wantResults.shouldIgnore, parseResult.shouldIgnore, "Unexpected ignore value", tt.name)
assert.Equal(tt.wantResults.op_type, parseResult.operationType, "Unexpected operation type "+tt.name)
assert.Equal(tt.wantResults.op_name, parseResult.operationName, "Unexpected operation name "+tt.name)
assert.Equal(tt.wantResults.is_cached, parseResult.cacheRequest, "Unexpected cache value "+tt.name)
assert.Equal(tt.wantResults.cached_ttl, parseResult.cacheTime, "Unexpected cache TTL value "+tt.name)
assert.Equal(tt.wantResults.shouldBlock, parseResult.shouldBlock, "Unexpected block value "+tt.name)
assert.Equal(tt.wantResults.shouldIgnore, parseResult.shouldIgnore, "Unexpected ignore value "+tt.name)
if tt.wantResults.returnCode > 0 {
assert.Equal(tt.wantResults.returnCode, ctx.Response().StatusCode(), "Unexpected return code", tt.name)
+6
View File
@@ -59,10 +59,16 @@ func (lw *LogConfig) log(w io.Writer, level zerolog.Level, message string, v map
}
func (lw *LogConfig) Debug(message string, v ...map[string]interface{}) {
if !lw.logger.Debug().Enabled() {
return
}
lw.log(os.Stdout, zerolog.DebugLevel, message, mergeMaps(v))
}
func (lw *LogConfig) Info(message string, v ...map[string]interface{}) {
if !lw.logger.Info().Enabled() {
return
}
lw.log(os.Stdout, zerolog.InfoLevel, message, mergeMaps(v))
}
+2 -2
View File
@@ -183,7 +183,7 @@ func (suite *LoggingTestSuite) TestLogConfig_AllHandlers() {
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
suite.Run(tt.name, func() {
if tt.envMinLogLevel != "" {
os.Setenv("LOG_LEVEL", tt.envMinLogLevel)
defer os.Unsetenv("LOG_LEVEL")
@@ -274,7 +274,7 @@ func (suite *LoggingTestSuite) TestFullMessage() {
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
suite.Run(tt.name, func() {
if tt.envMinLogLevel != "" {
os.Setenv("LOG_LEVEL", tt.envMinLogLevel)
defer os.Unsetenv("LOG_LEVEL")
+6
View File
@@ -1,6 +1,7 @@
package main
import (
"flag"
"os"
"strings"
@@ -38,6 +39,7 @@ func parseConfig() {
c.Server.PortGraphQL = getDetailsFromEnv("PORT_GRAPHQL", 8080)
c.Server.PortMonitoring = getDetailsFromEnv("MONITORING_PORT", 9393)
c.Server.HostGraphQL = getDetailsFromEnv("HOST_GRAPHQL", "http://localhost/")
c.Server.HostGraphQLReadOnly = getDetailsFromEnv("HOST_GRAPHQL_READONLY", "")
c.Client.JWTUserClaimPath = getDetailsFromEnv("JWT_USER_CLAIM_PATH", "")
c.Client.JWTRoleClaimPath = getDetailsFromEnv("JWT_ROLE_CLAIM_PATH", "")
c.Client.RoleFromHeader = getDetailsFromEnv("ROLE_FROM_HEADER", "")
@@ -86,3 +88,7 @@ func main() {
StartMonitoringServer()
StartHTTPProxy()
}
func ifNotInTest() bool {
return flag.Lookup("test.v") == nil
}
+17 -3
View File
@@ -4,12 +4,16 @@ import (
"os"
"testing"
"github.com/goccy/go-json"
"github.com/gofiber/fiber/v2"
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
assertions "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type Tests struct {
suite.Suite
app *fiber.App
}
var (
@@ -21,6 +25,16 @@ func (suite *Tests) BeforeTest(suiteName, testName string) {
func (suite *Tests) SetupTest() {
assert = assertions.New(suite.T())
suite.app = fiber.New(
fiber.Config{
DisableStartupMessage: true,
JSONEncoder: json.Marshal,
JSONDecoder: json.Unmarshal,
},
)
parseConfig()
StartMonitoringServer()
cfg.Logger = libpack_logging.NewLogger()
// Setup environment variables here if needed
os.Setenv("GMP_TEST_STRING", "testValue")
os.Setenv("GMP_TEST_INT", "123")
@@ -48,10 +62,10 @@ func TestSuite(t *testing.T) {
func (suite *Tests) Test_envVariableSetting() {
tests := []struct {
name string
envKey string
defaultValue any
expected any
name string
envKey string
}{
{
name: "test_string",
@@ -86,7 +100,7 @@ func (suite *Tests) Test_envVariableSetting() {
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
suite.Run(tt.name, func() {
result := getDetailsFromEnv(tt.envKey, tt.defaultValue)
assert.Equal(tt.expected, result)
})
+1 -1
View File
@@ -5,7 +5,7 @@ import (
)
func StartMonitoringServer() {
cfg.Monitoring = libpack_monitoring.NewMonitoring(cfg.Server.PurgeOnCrawl, cfg.Server.PurgeEvery)
cfg.Monitoring = libpack_monitoring.NewMonitoring(&libpack_monitoring.InitConfig{PurgeOnCrawl: cfg.Server.PurgeOnCrawl, PurgeEvery: cfg.Server.PurgeEvery})
cfg.Monitoring.AddMetricsPrefix("graphql_proxy")
cfg.Monitoring.RegisterDefaultMetrics()
}
+23 -16
View File
@@ -4,6 +4,7 @@
package libpack_monitoring
import (
"flag"
"fmt"
"time"
@@ -17,31 +18,37 @@ import (
type MetricsSetup struct {
metrics_set *metrics.Set
metrics_set_custom *metrics.Set
ic *InitConfig
metrics_prefix string
}
var (
log *logging.LogConfig
purgeMetricsOnCrawl bool
purgeMetricsEvery int
log *logging.LogConfig
)
func NewMonitoring(purgeOnCrawl bool, purgeEvery int) *MetricsSetup {
purgeMetricsOnCrawl = purgeOnCrawl
purgeMetricsEvery = purgeEvery
type InitConfig struct {
PurgeOnCrawl bool
PurgeEvery int
}
func NewMonitoring(ic *InitConfig) *MetricsSetup {
log = logging.NewLogger()
ms := &MetricsSetup{}
ms := &MetricsSetup{ic: ic}
ms.metrics_set = metrics.NewSet()
ms.metrics_set_custom = metrics.NewSet()
go ms.startPrometheusEndpoint()
// if not testing, start the prometheus endpoint
if purgeEvery > 0 {
ticker := time.NewTicker(time.Duration(purgeEvery) * time.Second)
go func() {
for range ticker.C {
ms.PurgeMetrics()
}
}()
if flag.Lookup("test.v") == nil {
go ms.startPrometheusEndpoint()
if ic.PurgeEvery > 0 {
ticker := time.NewTicker(time.Duration(ic.PurgeEvery) * time.Second)
go func() {
for range ticker.C {
ms.PurgeMetrics()
}
}()
}
}
return ms
@@ -63,7 +70,7 @@ func (ms *MetricsSetup) metricsEndpoint(c *fiber.Ctx) error {
ms.metrics_set.WritePrometheus(c.Response().BodyWriter())
ms.metrics_set_custom.WritePrometheus(c.Response().BodyWriter())
if purgeMetricsOnCrawl && purgeMetricsEvery == 0 {
if ms.ic.PurgeOnCrawl && ms.ic.PurgeEvery == 0 {
ms.PurgeMetrics()
}
return nil
+6 -4
View File
@@ -1,8 +1,10 @@
package libpack_monitoring
const (
MetricsSucceeded = "requests_succesful"
MetricsFailed = "requests_failed"
MetricsDuration = "requests_duration"
MetricsSkipped = "requests_skipped"
MetricsSucceeded = "requests_succesful"
MetricsFailed = "requests_failed"
MetricsDuration = "requests_duration"
MetricsSkipped = "requests_skipped"
MetricsExecutedQuery = "executed_query"
MetricsTimedQuery = "timed_query"
)
+14 -8
View File
@@ -28,10 +28,12 @@ func createFasthttpClient(timeout int) *fasthttp.Client {
}
}
func proxyTheRequest(c *fiber.Ctx) error {
func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
if !checkAllowedURLs(c) {
cfg.Logger.Error("Request blocked", map[string]interface{}{"path": c.Path()})
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
c.Status(403).SendString("Request blocked - not allowed URL")
return nil
}
@@ -44,11 +46,13 @@ func proxyTheRequest(c *fiber.Ctx) error {
err := retry.Do(
func() error {
err := proxy.DoRedirects(c, cfg.Server.HostGraphQL+c.Path(), 3, cfg.Client.FastProxyClient)
if err != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": err.Error()})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
return err
errInt := proxy.DoRedirects(c, currentEndpoint+c.Path(), 3, cfg.Client.FastProxyClient)
if errInt != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": errInt.Error()})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
return errInt
}
return nil
},
@@ -69,7 +73,9 @@ func proxyTheRequest(c *fiber.Ctx) error {
cfg.Logger.Debug("Received proxied response", map[string]interface{}{"path": c.Path(), "response_body": string(c.Response().Body()), "response_code": c.Response().StatusCode(), "headers": c.GetRespHeaders(), "request_uuid": c.Locals("request_uuid")})
if c.Response().StatusCode() != 200 {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
return fmt.Errorf("Received non-200 response from the GraphQL server: %d", c.Response().StatusCode())
}
+97
View File
@@ -0,0 +1,97 @@
package main
import (
"github.com/valyala/fasthttp"
)
func (suite *Tests) Test_proxyTheRequest() {
supplied_headers := map[string]string{
"X-Forwarded-For": "127.0.0.1",
"Content-Type": "application/json",
}
tests := []struct {
headers map[string]string
name string
body string
host string
hostRO string
path string
wantErr bool
}{
{
name: "test_empty",
body: `{"query":"query {\n __type(name: \"Query\") {\n name\n }\n }"}`,
host: "https://telegram-bot.app/",
path: "/v1/graphql",
headers: supplied_headers,
wantErr: false,
},
{
name: "test_wrong_url",
body: `{"query":"query {\n __type(name: \"Query\") {\n name\n }\n }"}`,
host: "https://google.com/",
path: "/v1/wrongURL",
headers: supplied_headers,
wantErr: true,
},
{
name: "Test read only mode",
body: `{"query":"query {\n __type(name: \"Query\") {\n name\n }\n }"}`,
host: "https://google.com/",
hostRO: "https://telegram-bot.app/",
path: "/v1/graphql",
headers: supplied_headers,
wantErr: false,
},
{
name: "Test read only mode wrong host",
body: `{"query":"query {\n __type(name: \"Query\") {\n name\n }\n }"}`,
host: "https://telegram-bot.app/",
hostRO: "https://google.com/",
path: "/v1/graphql",
headers: supplied_headers,
wantErr: true,
},
}
for _, tt := range tests {
suite.Run(tt.name, func() {
cfg = &config{}
parseConfig()
cfg.Server.HostGraphQL = tt.host
if tt.hostRO != "" {
cfg.Server.HostGraphQLReadOnly = tt.hostRO
}
ctx_headers := func() *fasthttp.RequestHeader {
h := fasthttp.RequestHeader{}
for k, v := range tt.headers {
h.Add(k, v)
}
return &h
}()
ctx_request := fasthttp.Request{
Header: *ctx_headers,
}
ctx_request.SetBody([]byte(tt.body))
ctx_request.SetRequestURI(tt.path)
ctx_request.Header.SetMethod("POST")
ctx := suite.app.AcquireCtx(&fasthttp.RequestCtx{
Request: ctx_request,
})
res := parseGraphQLQuery(ctx)
assert.NotNil(ctx, "Fiber context is nil", tt.name)
err := proxyTheRequest(ctx, res.activeEndpoint)
if tt.wantErr {
assert.NotNil(err, "Error is nil", tt.name)
} else {
assert.Nil(err, "Error is not nil", tt.name)
}
})
}
}
+14 -9
View File
@@ -37,7 +37,7 @@ func StartHTTPProxy() {
server.Get("/livez", healthCheck)
server.Post("/*", processGraphQLRequest)
server.Get("/*", proxyTheRequest)
server.Get("/*", proxyTheRequestToDefault)
cfg.Logger.Info("GraphQL query proxy started", map[string]interface{}{"port": cfg.Server.PortGraphQL})
err := server.Listen(fmt.Sprintf(":%d", cfg.Server.PortGraphQL))
@@ -46,6 +46,10 @@ func StartHTTPProxy() {
}
}
func proxyTheRequestToDefault(c *fiber.Ctx) error {
return proxyTheRequest(c, cfg.Server.HostGraphQL)
}
func AddRequestUUID(c *fiber.Ctx) error {
c.Locals("request_uuid", uuid.NewString())
return c.Next()
@@ -118,7 +122,7 @@ func processGraphQLRequest(c *fiber.Ctx) error {
if parsedResult.shouldIgnore {
cfg.Logger.Debug("Request passed as-is - probably not a GraphQL")
return proxyTheRequest(c)
return proxyTheRequest(c, parsedResult.activeEndpoint)
}
if parsedResult.cacheTime > 0 {
@@ -148,14 +152,15 @@ func processGraphQLRequest(c *fiber.Ctx) error {
if cachedResponse := cacheLookup(queryCacheHash); cachedResponse != nil {
cfg.Logger.Debug("Cache hit", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")})
c.Request().Header.Add("X-Cache-Hit", "true")
c.Send(cachedResponse)
wasCached = true
} else {
cfg.Logger.Debug("Cache miss", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")})
proxyAndCacheTheRequest(c, queryCacheHash, parsedResult.cacheTime)
proxyAndCacheTheRequest(c, queryCacheHash, parsedResult.cacheTime, parsedResult.activeEndpoint)
}
} else {
proxyTheRequest(c)
proxyTheRequest(c, parsedResult.activeEndpoint)
}
timeTaken := time.Since(startTime)
@@ -167,8 +172,8 @@ func processGraphQLRequest(c *fiber.Ctx) error {
}
// Additional helper function to avoid code repetition
func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string, cacheTime int) {
err := proxyTheRequest(c)
func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string, cacheTime int, currentEndpoint string) {
err := proxyTheRequest(c, currentEndpoint)
if err != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": err.Error()})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
@@ -201,10 +206,10 @@ func logAndMonitorRequest(c *fiber.Ctx, userID, opType, opName string, wasCached
}
cfg.Monitoring.Increment(libpack_monitoring.MetricsSucceeded, nil)
cfg.Monitoring.Increment("executed_query", labels)
cfg.Monitoring.Increment(libpack_monitoring.MetricsExecutedQuery, labels)
if !wasCached {
cfg.Monitoring.UpdateDuration("timed_query", labels, startTime)
cfg.Monitoring.Update("timed_query", labels, float64(duration.Milliseconds()))
cfg.Monitoring.UpdateDuration(libpack_monitoring.MetricsTimedQuery, labels, startTime)
cfg.Monitoring.Update(libpack_monitoring.MetricsTimedQuery, labels, float64(duration.Milliseconds()))
}
}
@@ -0,0 +1,162 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: hasura-w-proxy-internal
labels:
app: hasura-w-proxy-internal
type: support
spec:
replicas: 2
selector:
matchLabels:
app: hasura-w-proxy-internal
type: support
template:
metadata:
labels:
app: hasura-w-proxy-internal
type: support
spec:
securityContext:
runAsUser: 65534 # nobody
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node-role.kubernetes.io/worker
operator: Exists
containers:
- name: hasura
image: hasura/graphql-engine:v2.33.1-ce
ports:
- name: hasura-internal
containerPort: 8080
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 30
resources:
limits:
cpu: "1"
memory: "640Mi"
requests:
cpu: "0.75"
memory: "512Mi"
env:
- name: HASURA_GRAPHQL_DATABASE_URL
value: postgres://postgres:xxx@yyy:5432/postgres
- name: HASURA_GRAPHQL_ENABLE_CONSOLE
value: "true"
- name: HASURA_GRAPHQL_DEV_MODE
value: "true"
- name: HASURA_GRAPHQL_ENABLE_TELEMETRY
value: "false"
- name: HASURA_GRAPHQL_EXPERIMENTAL_FEATURES
value: "inherited_roles"
- name: HASURA_GRAPHQL_PG_CONNECTIONS
value: "20"
- name: HASURA_GRAPHQL_LOG_LEVEL
value: "error"
- name: hasura-ro
image: hasura/graphql-engine:v2.33.1-ce
ports:
- name: hasura-internal-ro
containerPort: 8088
livenessProbe:
httpGet:
path: /healthz
port: 8088
initialDelaySeconds: 30
resources:
limits:
cpu: "1"
memory: "640Mi"
requests:
cpu: "0.75"
memory: "512Mi"
env:
- name: HASURA_GRAPHQL_DATABASE_URL
value: postgres://postgres:xxx@yyy.read-only:5432/postgres
- name: HASURA_GRAPHQL_ENABLE_CONSOLE
value: "true"
- name: HASURA_GRAPHQL_DEV_MODE
value: "true"
- name: HASURA_GRAPHQL_ENABLE_TELEMETRY
value: "false"
- name: HASURA_GRAPHQL_EXPERIMENTAL_FEATURES
value: "inherited_roles"
- name: HASURA_GRAPHQL_PG_CONNECTIONS
value: "20"
- name: HASURA_GRAPHQL_LOG_LEVEL
value: "error"
- name: HASURA_PORT
value: "8088"
- name: graphql-proxy
image: ghcr.io/lukaszraczylo/graphql-monitoring-proxy:latest
imagePullPolicy: Always
resources:
limits:
cpu: "1"
memory: "640Mi"
requests:
cpu: "0.75"
memory: "128Mi"
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
timeoutSeconds: 5
ports:
- name: web
containerPort: 8181
- name: monitoring
containerPort: 9393
env:
- name: PORT_GRAPHQL
value: "8181"
- name: MONITORING_PORT
value: "9393"
- name: HOST_GRAPHQL
value: http://localhost:8080/
- name: HOST_GRAPHQL_READONLY
value: http://localhost:8088/
- name: ENABLE_GLOBAL_CACHE
value: "true"
- name: CACHE_TTL
value: "10"
---
apiVersion: v1
kind: Service
metadata:
name: hasura-w-proxy-internal
labels:
app: hasura-w-proxy-internal
type: support
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9393"
prometheus.io/path: "/metrics"
spec:
ports:
- name: hasura
port: 8080
targetPort: 8080
- name: hasura-ro
port: 8088
targetPort: 8088
- name: proxy
port: 8181
targetPort: 8181
- name: monitoring
port: 9393
targetPort: 9393
selector:
app: hasura-w-proxy-internal
type: support
type: ClusterIP
+12 -11
View File
@@ -33,16 +33,17 @@ type config struct {
BlockIntrospection bool
}
Server struct {
HostGraphQL string
HealthcheckGraphQL string
AllowURLs []string
PortGraphQL int
PortMonitoring int
ApiPort int
PurgeEvery int
AccessLog bool
ReadOnlyMode bool
EnableApi bool
PurgeOnCrawl bool
HostGraphQL string
HostGraphQLReadOnly string
HealthcheckGraphQL string
AllowURLs []string
PortGraphQL int
PortMonitoring int
ApiPort int
PurgeEvery int
AccessLog bool
ReadOnlyMode bool
EnableApi bool
PurgeOnCrawl bool
}
}