Compare commits

6 Commits

7 changed files with 334 additions and 20 deletions
+51 -4
@@ -6,10 +6,10 @@ This project is in active use by [telegram-bot.app](https://telegram-bot.app), a
![Example of monitoring dashboard](static/monitoring-at-glance.png?raw=true)
You can find the example of the Kubernetes manifest in the [example deployment](static/kubernetes-deployment.yaml) file.
- [graphql monitoring proxy](#graphql-monitoring-proxy)
- [Why this project exists](#why-this-project-exists)
- [How to deploy](#how-to-deploy)
- [Note on websocket support](#note-on-websocket-support)
- [Endpoints](#endpoints)
- [Features](#features)
- [Configuration](#configuration)
@@ -26,11 +26,55 @@ You can find the example of the Kubernetes manifest in the [example deployment](
- [Healthcheck](#healthcheck)
- [Monitoring endpoint](#monitoring-endpoint)
### Why this project exists
I wanted to monitor the queries and responses of our graphql endpoint. Still, we didn't want to pay the price of the graphql server itself (and I will not point fingers at a particular well-known project), as monitoring and basic security features should be standard, free functionality.
### How to deploy
You can find examples of the Kubernetes manifest in the [example standalone deployment](static/kubernetes-deployment.yaml) and [example combined deployment](static/kubernetes-single-deployment.yaml) files. The observed advantage of the combined deployment is that network requests between the proxy and the graphql server travel via localhost without leaving the pod, which brings a significant network performance boost.
#### Note on websocket support
The proxy in its current version, 0.5.30, does not support websockets. If you need to proxy websocket requests, you can use the following trick when setting up the proxy. As I'm a big fan of Traefik, here is an example that works with the combined deployment mentioned above.
<details>
<summary>Click to show working Traefik Ingress Route example.</summary>
```yaml
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: hasura-internal
spec:
  entryPoints:
    - websecure
  routes:
    # NON WEBSOCKET CONNECTION
    - kind: Rule
      match: Host(`example.com`) && PathPrefix(`/v1/graphql`) && !HeadersRegexp(`Upgrade`, `websocket`)
      services:
        - name: hasura-w-proxy-internal
          port: proxy
      middlewares:
        - name: compression
          namespace: default
    # WEBSOCKET CONNECTION
    - kind: Rule
      match: Host(`example.com`) && PathPrefix(`/v1/graphql`) && HeadersRegexp(`Upgrade`, `websocket`)
      services:
        - name: hasura-w-proxy-internal
          port: hasura
      middlewares:
        - name: compression
          namespace: default
```
In this case, both regular graphql requests and websockets will be available under the `/v1/graphql` path. Regular requests go through the proxy, while websocket connections are routed directly to the hasura service, bypassing the proxy.
</details>
### Endpoints
* `:8080/*` - the graphql passthrough endpoint
@@ -78,6 +122,7 @@ I wanted to monitor the queries and responses of our graphql endpoint. Still, we
| `ENABLE_API` | Enable the monitoring API | `false` |
| `API_PORT` | The port to expose the monitoring API | `9090` |
| `BANNED_USERS_FILE` | The path to the file with banned users | `/go/src/app/banned_users.json` |
| `PROXIED_CLIENT_TIMEOUT` | The timeout for the proxied client in seconds | `120` |
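
As a rough illustration of how a seconds-based setting such as `PROXIED_CLIENT_TIMEOUT` can become a `time.Duration`, here is a minimal sketch using only the standard library. The helper name `parseTimeout` and the fallback behaviour for invalid or non-positive values are assumptions made for this example; the proxy itself reads the value through its own env helper, which may behave differently.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// parseTimeout is a hypothetical helper: it converts a raw string holding a
// number of seconds into a time.Duration. Empty, non-numeric, or
// non-positive input falls back to defaultSeconds (an assumption of this
// sketch, not confirmed behaviour of the proxy).
func parseTimeout(raw string, defaultSeconds int) time.Duration {
	seconds := defaultSeconds
	if parsed, err := strconv.Atoi(raw); err == nil && parsed > 0 {
		seconds = parsed
	}
	return time.Duration(seconds) * time.Second
}

func main() {
	// 120 mirrors the documented default for PROXIED_CLIENT_TIMEOUT.
	timeout := parseTimeout(os.Getenv("PROXIED_CLIENT_TIMEOUT"), 120)
	fmt.Println("proxied client timeout:", timeout)
}
```

The resulting duration could then be used for both the read and write timeouts of the HTTP client.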
### Speed
@@ -89,6 +134,8 @@ You can then start using the cache by setting the `ENABLE_GLOBAL_CACHE` environm
In the case of the `@cached` you can add additional parameters to the directive which will set the cache for specific queries to the provided time.
For example, `query MyCachedQuery @cached(ttl: 90) ....` will set the cache for the query to 90 seconds.
Since version `0.5.30`, cached entries are stored gzip-compressed in memory, which should reduce memory usage significantly.
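
To get a feel for what gzip-compressing cache entries means in practice, here is a minimal round-trip sketch built on the standard `compress/gzip` package. The function names `gzipBytes` and `gunzipBytes` and the sample payload are made up for this example; it is not the proxy's cache code.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// gzipBytes compresses data using the default gzip compression level.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	// Close flushes pending data and writes the gzip footer.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gunzipBytes restores the original bytes from gzip-compressed data.
func gunzipBytes(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	// A repetitive JSON-like payload, similar in spirit to a list response.
	payload := []byte(strings.Repeat(`{"id":1,"name":"example"},`, 200))

	packed, err := gzipBytes(payload)
	if err != nil {
		panic(err)
	}
	restored, err := gunzipBytes(packed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("raw=%d bytes, gzipped=%d bytes, roundtrip ok=%v\n",
		len(payload), len(packed), bytes.Equal(payload, restored))
}
```

Repetitive JSON responses tend to compress well, while very small payloads can end up slightly larger because of the gzip header and footer.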
### Security
#### Role-based rate limiting
@@ -180,7 +227,7 @@ Ban details will be stored in the `banned_users.json` file, which you can mount
#### Healthcheck
If you'd like the `/healthz` endpoint to perform an actual check of the connectivity to the graphql endpoint, set the `HEALTHCHECK_GRAPHQL_URL` environment variable to the exact URL of the graphql endpoint. The query executed will be `query { __typename }`, and if the response is not `200 OK`, the healthcheck will fail.
If you'd like the `/healthz` endpoint to perform an actual check of the connectivity to the graphql endpoint, set the `HEALTHCHECK_GRAPHQL_URL` environment variable to the exact URL of the graphql endpoint. The query executed will be `query { __typename }`, and if the response is not `200 OK`, the healthcheck will fail. Remember that this variable is the full URL you'd like to check, so it should include the protocol, host and path, for example `http://localhost:8080/v1/graphql`. It is NOT the same as the value of the `HOST_GRAPHQL` environment variable, which should provide only the host, without a path, ending with a slash.
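
The check described above can be sketched with the standard `net/http` client. This is only an illustration under assumptions: the helper names `healthcheckBody` and `checkGraphQL` are hypothetical, the 5 second timeout is arbitrary, and the proxy's own implementation may use a different HTTP client.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// healthcheckBody builds the JSON payload carrying `query { __typename }`.
func healthcheckBody() ([]byte, error) {
	return json.Marshal(map[string]string{"query": "query { __typename }"})
}

// checkGraphQL posts the probe query to url and returns an error unless the
// response status is 200 OK.
func checkGraphQL(client *http.Client, url string) error {
	body, err := healthcheckBody()
	if err != nil {
		return err
	}
	resp, err := client.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

func main() {
	client := &http.Client{Timeout: 5 * time.Second}
	// Full URL with protocol, host and path, as HEALTHCHECK_GRAPHQL_URL expects.
	if err := checkGraphQL(client, "http://localhost:8080/v1/graphql"); err != nil {
		fmt.Println("unhealthy:", err)
		return
	}
	fmt.Println("healthy")
}
```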
#### Monitoring endpoint
+43 -10
@@ -1,6 +1,9 @@
package libpack_cache
import (
"bytes"
"compress/gzip"
"io"
"sync"
"time"
)
@@ -46,15 +49,20 @@ func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
defer c.Unlock()
expiresAt := time.Now().Add(ttl)
// Get a byte slice from the pool and ensure it's properly sized.
b := c.bytePool.Get().([]byte)
if cap(b) < len(value) {
b = make([]byte, len(value))
} else {
b = b[:len(value)]
compressedValue, err := c.compress(value)
if err != nil {
return
}
copy(b, value)
// Get a byte slice from the pool and ensure it's properly sized.
b := c.bytePool.Get().([]byte)
if cap(b) < len(compressedValue) {
b = make([]byte, len(compressedValue))
} else {
b = b[:len(compressedValue)]
}
copy(b, compressedValue)
entry := CacheEntry{
Value: b,
@@ -71,10 +79,12 @@ func (c *Cache) Get(key string) ([]byte, bool) {
if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) {
return nil, false
}
compressedValue := entry.(CacheEntry).Value
value, err := c.decompress(compressedValue)
if err != nil {
return nil, false
}
// Copy the value from the byte slice.
value := make([]byte, len(entry.(CacheEntry).Value))
copy(value, entry.(CacheEntry).Value)
return value, true
}
@@ -110,3 +120,26 @@ func (c *Cache) CleanExpiredEntries() {
return true
})
}
// compress gzips data with the default compression level.
func (c *Cache) compress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, err := w.Write(data)
	if err != nil {
		return nil, err
	}
	// Close flushes pending data and writes the gzip footer.
	err = w.Close()
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress restores the original bytes from gzip-compressed data.
func (c *Cache) decompress(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewBuffer(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}
+112
@@ -0,0 +1,112 @@
package libpack_cache
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
)
type CacheTestSuite struct {
suite.Suite
}
func (suite *CacheTestSuite) SetupTest() {
}
func TestCachingTestSuite(t *testing.T) {
suite.Run(t, new(CacheTestSuite))
}
func (suite *CacheTestSuite) Test_New() {
suite.T().Run("should return a new cache", func(t *testing.T) {
cache := New(2 * time.Second)
suite.NotNil(cache)
})
}
func (suite *CacheTestSuite) Test_CacheUse() {
cache := New(30 * time.Second)
tests := []struct {
name string
cache_value string
}{
{
name: "test1",
cache_value: "test1-123",
},
{
name: "test2",
cache_value: "test2-123",
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
cache.Set(tt.name, []byte(tt.name), 5*time.Second)
c, ok := cache.Get(tt.name)
suite.Equal(true, ok)
suite.Equal(tt.name, string(c))
})
}
}
func (suite *CacheTestSuite) Test_CacheDelete() {
cache := New(30 * time.Second)
tests := []struct {
name string
cache_value string
}{
{
name: "test1",
cache_value: "test1-123",
},
{
name: "test2",
cache_value: "test2-123",
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
cache.Set(tt.name, []byte(tt.name), 5*time.Second)
c, ok := cache.Get(tt.name)
suite.Equal(true, ok)
suite.Equal(tt.name, string(c))
cache.Delete(tt.name)
c, ok = cache.Get(tt.name)
suite.Equal(false, ok)
suite.Equal("", string(c))
})
}
}
func (suite *CacheTestSuite) Test_CacheExpire() {
cache := New(30 * time.Second)
tests := []struct {
name string
cache_value string
ttl time.Duration
}{
{
name: "test1",
cache_value: "test1-123",
ttl: 2 * time.Second,
},
{
name: "test2",
cache_value: "test2-123",
ttl: 5 * time.Second,
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
cache.Set(tt.name, []byte(tt.name), tt.ttl)
c, ok := cache.Get(tt.name)
suite.Equal(true, ok)
suite.Equal(tt.name, string(c))
time.Sleep(tt.ttl)
c, ok = cache.Get(tt.name)
suite.Equal(false, ok)
suite.Equal("", string(c))
})
}
}
+2 -1
@@ -50,7 +50,8 @@ func parseConfig() {
}
return strings.Split(urls, ",")
}()
c.Client.FastProxyClient = createFasthttpClient()
c.Client.ClientTimeout = envutil.GetInt("PROXIED_CLIENT_TIMEOUT", 120)
c.Client.FastProxyClient = createFasthttpClient(c.Client.ClientTimeout)
c.Server.EnableApi = envutil.GetBool("ENABLE_API", false)
c.Server.ApiPort = envutil.GetInt("API_PORT", 9090)
c.Api.BannedUsersFile = envutil.Getenv("BANNED_USERS_FILE", "/go/src/app/banned_users.json")
+4 -5
@@ -11,17 +11,16 @@ import (
"github.com/valyala/fasthttp"
)
func createFasthttpClient() *fasthttp.Client {
func createFasthttpClient(timeout int) *fasthttp.Client {
return &fasthttp.Client{
Name: "graphql_proxy",
NoDefaultUserAgentHeader: true,
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxConnsPerHost: 100,
MaxIdleConnDuration: 2 * time.Minute,
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 10,
MaxConnsPerHost: 200,
ReadTimeout: time.Second * time.Duration(timeout),
WriteTimeout: time.Second * time.Duration(timeout),
DisableHeaderNamesNormalizing: true,
}
}
+121
@@ -0,0 +1,121 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hasura-w-proxy-internal
  labels:
    app: hasura-w-proxy-internal
    type: support
spec:
  replicas: 2
  selector:
    matchLabels:
      app: hasura-w-proxy-internal
      type: support
  template:
    metadata:
      labels:
        app: hasura-w-proxy-internal
        type: support
    spec:
      securityContext:
        runAsUser: 65534 # nobody
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-role.kubernetes.io/worker
                    operator: Exists
      containers:
        - name: hasura
          image: hasura/graphql-engine:v2.33.1-ce
          ports:
            - name: hasura-internal
              containerPort: 8080
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 30
          resources:
            limits:
              cpu: "1"
              memory: "640Mi"
            requests:
              cpu: "0.75"
              memory: "512Mi"
          env:
            - name: HASURA_GRAPHQL_DATABASE_URL
              value: postgres://postgres:xxx@yyy:5432/postgres
            - name: HASURA_GRAPHQL_ENABLE_CONSOLE
              value: "true"
            - name: HASURA_GRAPHQL_DEV_MODE
              value: "true"
            - name: HASURA_GRAPHQL_ENABLE_TELEMETRY
              value: "false"
            - name: HASURA_GRAPHQL_EXPERIMENTAL_FEATURES
              value: "inherited_roles"
            - name: HASURA_GRAPHQL_PG_CONNECTIONS
              value: "20"
            - name: HASURA_GRAPHQL_LOG_LEVEL
              value: "error"
        - name: graphql-proxy
          image: ghcr.io/lukaszraczylo/graphql-monitoring-proxy:latest
          imagePullPolicy: Always
          resources:
            limits:
              cpu: "1"
              memory: "640Mi"
            requests:
              cpu: "0.75"
              memory: "128Mi"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 5
            timeoutSeconds: 5
          ports:
            - name: web
              containerPort: 8181
            - name: monitoring
              containerPort: 9393
          env:
            - name: PORT_GRAPHQL
              value: "8181"
            - name: MONITORING_PORT
              value: "9393"
            - name: HOST_GRAPHQL
              value: http://localhost:8080/
            - name: ENABLE_GLOBAL_CACHE
              value: "true"
            - name: CACHE_TTL
              value: "10"
---
apiVersion: v1
kind: Service
metadata:
  name: hasura-w-proxy-internal
  labels:
    app: hasura-w-proxy-internal
    type: support
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9393"
    prometheus.io/path: "/metrics"
spec:
  ports:
    - name: hasura
      port: 8080
      targetPort: 8080
    - name: proxy
      port: 8181
      targetPort: 8181
    - name: monitoring
      port: 9393
      targetPort: 9393
  selector:
    app: hasura-w-proxy-internal
    type: support
  type: ClusterIP
+1
@@ -34,6 +34,7 @@ type config struct {
GQLClient *graphql.BaseClient
FastProxyClient *fasthttp.Client
proxy string
ClientTimeout int
}
Cache struct {