Compare commits

...

2 Commits

Author SHA1 Message Date
lukaszraczylo 61d7a45d00 Update cache library, use miniredis for testing, add additional benchmarks. (#14)
Update cache library,
Update logging library,
use miniredis for testing, add additional benchmarks.
2024-06-19 23:10:36 +01:00
Chris Clayton 12e4237997 divide long functions, replace strings.builder with bytes.buffer. (#13)
Co-authored-by: Chris Clayton <chris.clayton@contino.io>
2024-06-17 10:23:41 +01:00
36 changed files with 1704 additions and 624 deletions
+73
View File
@@ -0,0 +1,73 @@
name: Test and release
on:
workflow_dispatch:
schedule:
- cron: "0 3 * * *"
env:
GO_VERSION: ">=1.21"
jobs:
# This job is responsible for preparation of the build
# environment variables.
prepare:
name: Preparing build context
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
id: cache
with:
go-version: ${{env.GO_VERSION}}
cache-dependency-path: "**/*.sum"
- name: Go get dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: |
go get ./...
# This job is responsible for running tests and linting the codebase
test:
name: "Unit testing"
# needs: [prepare]
runs-on: ubuntu-latest
container: golang:1
# container: github/super-linter:v4
needs: [prepare]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{env.GO_VERSION}}
cache-dependency-path: "**/*.sum"
- name: Install dependencies
run: |
apt-get update
apt-get install ca-certificates make -y
update-ca-certificates
go mod tidy
get -u -v ./...
go mod tidy -v
- name: Run unit tests
run: |
CI_RUN=${CI} make test
# if go.mod or go.sum have changed then commit the changes to the repository
- name: Commit changes
run: |
git config --global user.email "github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
git add go.mod go.sum
git commit -m "Update go.mod and go.sum"
git push
+14 -19
View File
@@ -47,20 +47,20 @@ jobs:
# container: github/super-linter:v4
needs: [prepare]
services:
# Label used to access the service container
redis:
# Docker Hub image
image: redis
# Set health checks to wait until redis has started
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
# Maps the container port to the host machine
- 6379:6379
# services:
# # Label used to access the service container
# redis:
# # Docker Hub image
# image: redis
# # Set health checks to wait until redis has started
# options: >-
# --health-cmd "redis-cli ping"
# --health-interval 10s
# --health-timeout 5s
# --health-retries 5
# ports:
# # Maps the container port to the host machine
# - 6379:6379
steps:
- name: Checkout repository
@@ -80,10 +80,5 @@ jobs:
go mod tidy
- name: Run unit tests
env:
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_SERVER: "redis:6379"
run: |
export REDIS_SERVER="$REDIS_HOST:$REDIS_PORT"
CI_RUN=${CI} make test
+1
View File
@@ -1,2 +1,3 @@
graphql-proxy
test.sh
banned.json*
+5 -5
View File
@@ -1,9 +1,9 @@
CI_RUN?=false
ADDITIONAL_BUILD_FLAGS=""
# ADDITIONAL_BUILD_FLAGS=""
ifeq ($(CI_RUN), true)
ADDITIONAL_BUILD_FLAGS="-test.short"
endif
# ifeq ($(CI_RUN), true)
# ADDITIONAL_BUILD_FLAGS="-test.short"
# endif
.PHONY: help
help: ## display this help
@@ -19,7 +19,7 @@ build: ## build the binary
.PHONY: test
test: ## run tests on library
@LOG_LEVEL=debug go test $(ADDITIONAL_BUILD_FLAGS) -v -cover ./... -race
@LOG_LEVEL=info go test -v -cover -race ./...
.PHONY: test-packages
test-packages: ## run tests on packages
+84 -23
View File
@@ -8,7 +8,9 @@ import (
"github.com/goccy/go-json"
fiber "github.com/gofiber/fiber/v2"
"github.com/gofrs/flock"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
var bannedUsersIDs map[string]string = make(map[string]string)
@@ -29,7 +31,10 @@ func enableApi() {
go periodicallyReloadBannedUsers()
err := apiserver.Listen(fmt.Sprintf(":%d", cfg.Server.ApiPort))
if err != nil {
cfg.Logger.Critical("Can't start the service", map[string]interface{}{"error": err.Error()})
cfg.Logger.Critical(&libpack_logger.LogMessage{
Message: "Can't start the service",
Pairs: map[string]interface{}{"port": cfg.Server.ApiPort},
})
}
}
}
@@ -37,35 +42,52 @@ func enableApi() {
func periodicallyReloadBannedUsers() {
for {
loadBannedUsers()
cfg.Logger.Debug("Banned users reloaded", map[string]interface{}{"users": bannedUsersIDs})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Banned users reloaded",
Pairs: map[string]interface{}{"users": bannedUsersIDs},
})
<-time.After(10 * time.Second)
}
}
func checkIfUserIsBanned(c *fiber.Ctx, userID string) bool {
_, found := bannedUsersIDs[userID]
cfg.Logger.Debug("Checking if user is banned", map[string]interface{}{"user_id": userID, "found": found})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Checking if user is banned",
Pairs: map[string]interface{}{"user_id": userID, "found": found},
})
if found {
cfg.Logger.Info("User is banned", map[string]interface{}{"user_id": userID})
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "User is banned",
Pairs: map[string]interface{}{"user_id": userID},
})
c.Status(403).SendString("User is banned")
}
return found
}
func apiClearCache(c *fiber.Ctx) error {
cfg.Logger.Debug("Clearing cache via API", nil)
cacheClear()
cfg.Logger.Info("Cache cleared via API", nil)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Clearing cache via API",
Pairs: nil,
})
libpack_cache.CacheClear()
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Cache cleared via API",
Pairs: nil,
})
c.Status(200).SendString("OK: cache cleared")
return nil
}
func apiCacheStats(c *fiber.Ctx) error {
stats := getCacheStats()
cfg.Logger.Debug("Getting cache stats via API", map[string]interface{}{"stats": stats})
stats := libpack_cache.GetCacheStats()
err := c.JSON(stats)
if err != nil {
cfg.Logger.Error("Can't marshal cache stats", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't marshal cache stats",
Pairs: map[string]interface{}{"error": err.Error()},
})
return err
}
return nil
@@ -80,11 +102,17 @@ func apiBanUser(c *fiber.Ctx) error {
var req apiBanUserRequest
err := c.BodyParser(&req)
if err != nil {
cfg.Logger.Error("Can't parse the ban user request", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the ban user request",
Pairs: map[string]interface{}{"error": err.Error()},
})
return err
}
bannedUsersIDs[req.UserID] = req.Reason
cfg.Logger.Info("Banned user", map[string]interface{}{"user_id": req.UserID, "reason": req.Reason})
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Banned user",
Pairs: map[string]interface{}{"user_id": req.UserID, "reason": req.Reason},
})
storeBannedUsers()
c.Status(200).SendString("OK: user banned")
return nil
@@ -94,11 +122,17 @@ func apiUnbanUser(c *fiber.Ctx) error {
var req apiBanUserRequest
err := c.BodyParser(&req)
if err != nil {
cfg.Logger.Error("Can't parse the unban user request", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the unban user request",
Pairs: map[string]interface{}{"error": err.Error()},
})
return err
}
delete(bannedUsersIDs, req.UserID)
cfg.Logger.Info("Unbanned user", map[string]interface{}{"user_id": req.UserID})
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Unbanned user",
Pairs: map[string]interface{}{"user_id": req.UserID},
})
storeBannedUsers()
c.Status(200).SendString("OK: user unbanned")
return nil
@@ -108,34 +142,52 @@ func storeBannedUsers() {
fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile))
err := fileLock.Lock()
if err != nil {
cfg.Logger.Error("Can't lock the file", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't lock the file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
defer fileLock.Unlock()
data, err := json.Marshal(bannedUsersIDs)
if err != nil {
cfg.Logger.Error("Can't marshal banned users", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't marshal banned users",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
err = os.WriteFile(cfg.Api.BannedUsersFile, data, 0644)
if err != nil {
cfg.Logger.Error("Can't write banned users to file", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't write banned users to file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
}
func loadBannedUsers() {
if _, err := os.Stat(cfg.Api.BannedUsersFile); os.IsNotExist(err) {
cfg.Logger.Info("Banned users file doesn't exist - creating it", map[string]interface{}{"file": cfg.Api.BannedUsersFile})
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Banned users file doesn't exist - creating it",
Pairs: map[string]interface{}{"file": cfg.Api.BannedUsersFile},
})
_, err := os.Create(cfg.Api.BannedUsersFile)
if err != nil {
cfg.Logger.Error("Can't create the file", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't create the file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
// write empty json to the file
err = os.WriteFile(cfg.Api.BannedUsersFile, []byte("{}"), 0644)
if err != nil {
cfg.Logger.Error("Can't write to the file", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't write to the file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
}
@@ -143,19 +195,28 @@ func loadBannedUsers() {
fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile))
err := fileLock.RLock() // Use RLock for read lock
if err != nil {
cfg.Logger.Error("Can't lock the file [load]", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't lock the file [load]",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
defer fileLock.Unlock()
data, err := os.ReadFile(cfg.Api.BannedUsersFile)
if err != nil {
cfg.Logger.Error("Can't read banned users from file", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't read banned users from file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
err = json.Unmarshal(data, &bannedUsersIDs)
if err != nil {
cfg.Logger.Error("Can't unmarshal banned users", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't unmarshal banned users",
Pairs: map[string]interface{}{"error": err.Error()},
})
return
}
}
-95
View File
@@ -1,95 +0,0 @@
package main
import (
"time"
fiber "github.com/gofiber/fiber/v2"
"github.com/gookit/goutil/strutil"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
)
type CacheStats struct {
CachedQueries int `json:"cached_queries"`
CacheHits int `json:"cache_hits"`
CacheMisses int `json:"cache_misses"`
}
type CacheClient interface {
Set(key string, value []byte, ttl time.Duration)
Get(key string) ([]byte, bool)
Delete(key string)
Clear()
CountQueries() int
}
var (
cacheStats *CacheStats
)
func calculateHash(c *fiber.Ctx) string {
return strutil.Md5(c.Body())
}
func enableCache() {
cacheStats = &CacheStats{}
if shouldUseRedisCache() {
cfg.Logger.Info("Using Redis cache", nil)
cfg.Cache.Client = libpack_redis.NewClient(&libpack_redis.RedisClientConfig{
RedisDB: cfg.Cache.CacheRedisDB,
RedisServer: cfg.Cache.CacheRedisURL,
RedisPassword: cfg.Cache.CacheRedisPassword,
})
} else {
cfg.Logger.Info("Using in-memory cache", nil)
cfg.Cache.Client = libpack_cache.New(time.Duration(cfg.Cache.CacheTTL) * time.Second)
}
}
func cacheLookup(hash string) []byte {
obj, found := cfg.Cache.Client.Get(hash)
if found {
cacheStats.CacheHits++
return obj
}
cacheStats.CacheMisses++
return nil
}
func cacheDelete(hash string) {
cfg.Logger.Debug("Deleting data from cache", map[string]interface{}{"hash": hash})
cacheStats.CachedQueries--
cfg.Cache.Client.Delete(hash)
}
func cacheStore(hash string, data []byte) {
cfg.Logger.Debug("Storing data in cache", map[string]interface{}{"hash": hash})
cacheStats.CachedQueries++
cfg.Cache.Client.Set(hash, data, time.Duration(cfg.Cache.CacheTTL)*time.Second)
}
func cacheStoreWithTTL(hash string, data []byte, ttl time.Duration) {
cfg.Logger.Debug("Storing data in cache with TTL", map[string]interface{}{"hash": hash, "ttl": ttl})
cacheStats.CachedQueries++
cfg.Cache.Client.Set(hash, data, ttl)
}
func cacheGetQueries() int {
cfg.Logger.Debug("Counting cache queries", nil)
return cfg.Cache.Client.CountQueries()
}
func cacheClear() {
cfg.Cache.Client.Clear()
cacheStats = &CacheStats{}
}
func getCacheStats() *CacheStats {
cfg.Logger.Debug("Getting cache stats", nil)
cacheStats.CachedQueries = cacheGetQueries()
return cacheStats
}
func shouldUseRedisCache() bool {
return cfg.Cache.CacheRedisEnable
}
+134
View File
@@ -0,0 +1,134 @@
package libpack_cache
import (
"sync/atomic"
"time"
fiber "github.com/gofiber/fiber/v2"
"github.com/gookit/goutil/strutil"
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
type CacheConfig struct {
Logger *libpack_logger.Logger
Client CacheClient
Redis struct {
Enable bool `json:"enable"`
URL string `json:"url"`
Password string `json:"password"`
DB int `json:"db"`
}
TTL int `json:"ttl"`
}
type CacheStats struct {
CachedQueries int64 `json:"cached_queries"`
CacheHits int64 `json:"cache_hits"`
CacheMisses int64 `json:"cache_misses"`
}
type CacheClient interface {
Set(key string, value []byte, ttl time.Duration)
Get(key string) ([]byte, bool)
Delete(key string)
Clear()
CountQueries() int64
}
var (
cacheStats *CacheStats
config *CacheConfig
)
func CalculateHash(c *fiber.Ctx) string {
return strutil.Md5(c.Body())
}
func EnableCache(cfg *CacheConfig) {
if cfg.Logger == nil {
cfg.Logger = libpack_logger.New()
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Initializing in-module logger",
})
}
cacheStats = &CacheStats{}
if ShouldUseRedisCache(cfg) {
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Using Redis cache",
})
cfg.Client = libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisDB: cfg.Redis.DB,
RedisServer: cfg.Redis.URL,
RedisPassword: cfg.Redis.Password,
})
} else {
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Using in-memory cache",
})
cfg.Client = libpack_cache_memory.New(time.Duration(cfg.TTL) * time.Second)
}
config = cfg
}
func CacheLookup(hash string) []byte {
obj, found := config.Client.Get(hash)
if found {
atomic.AddInt64(&cacheStats.CacheHits, 1)
return obj
}
atomic.AddInt64(&cacheStats.CacheMisses, 1)
return nil
}
func CacheDelete(hash string) {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Deleting data from cache",
Pairs: map[string]interface{}{"hash": hash},
})
atomic.AddInt64(&cacheStats.CachedQueries, -1)
config.Client.Delete(hash)
}
func CacheStore(hash string, data []byte) {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Storing data in cache",
Pairs: map[string]interface{}{"hash": hash},
})
atomic.AddInt64(&cacheStats.CachedQueries, 1)
config.Client.Set(hash, data, time.Duration(config.TTL)*time.Second)
}
func CacheStoreWithTTL(hash string, data []byte, ttl time.Duration) {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Storing data in cache with TTL",
Pairs: map[string]interface{}{"hash": hash, "ttl": ttl},
})
atomic.AddInt64(&cacheStats.CachedQueries, 1)
config.Client.Set(hash, data, ttl)
}
func CacheGetQueries() int64 {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Counting cache queries",
})
return config.Client.CountQueries()
}
func CacheClear() {
config.Client.Clear()
cacheStats = &CacheStats{}
}
func GetCacheStats() *CacheStats {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Getting cache stats",
})
cacheStats.CachedQueries = CacheGetQueries()
return cacheStats
}
func ShouldUseRedisCache(cfg *CacheConfig) bool {
return cfg.Redis.Enable
}
+107
View File
@@ -0,0 +1,107 @@
package libpack_cache
import (
"testing"
"time"
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
const (
Parallelism = 4
RequestPerSec = 10000
)
func BenchmarkCacheLookupInMemory(b *testing.B) {
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: libpack_cache_memory.New(5 * time.Minute),
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
CacheStore(hash, data)
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheLookup(hash)
}
})
}
func BenchmarkCacheLookupRedis(b *testing.B) {
mockedCache := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisServer: redisMockServer.Addr(),
RedisDB: 0,
})
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: mockedCache,
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
CacheStore(hash, data)
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheLookup(hash)
}
})
}
func BenchmarkCacheStoreInMemory(b *testing.B) {
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: libpack_cache_memory.New(5 * time.Minute),
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheStore(hash, data)
}
})
}
func BenchmarkCacheStoreRedis(b *testing.B) {
mockedCache := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisServer: redisMockServer.Addr(),
RedisDB: 0,
})
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: mockedCache,
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheStore(hash, data)
}
})
}
+34
View File
@@ -0,0 +1,34 @@
package libpack_cache
import (
"testing"
"github.com/alicebob/miniredis/v2"
assertions "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type Tests struct {
suite.Suite
}
var (
assert *assertions.Assertions
redisMockServer, _ = miniredis.Run()
)
func (suite *Tests) BeforeTest(suiteName, testName string) {
}
func (suite *Tests) SetupTest() {
cacheStats = &CacheStats{}
assert = assertions.New(suite.T())
}
// TearDownTest is run after each test to clean up
func (suite *Tests) TearDownTest() {
}
func TestSuite(t *testing.T) {
suite.Run(t, new(Tests))
}
+32 -12
View File
@@ -1,11 +1,20 @@
package main
package libpack_cache
import (
"github.com/gookit/goutil/envutil"
libpack_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
"time"
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
func (suite *Tests) Test_cacheLookupInmemory() {
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: libpack_cache_memory.New(5 * time.Minute),
TTL: 5,
}
type args struct {
hash string
}
@@ -40,22 +49,33 @@ func (suite *Tests) Test_cacheLookupInmemory() {
for _, tt := range tests {
suite.Run(tt.name, func() {
if tt.addCache.data != nil {
cacheStore(tt.args.hash, tt.addCache.data)
CacheStore(tt.args.hash, tt.addCache.data)
}
got := cacheLookup(tt.args.hash)
got := CacheLookup(tt.args.hash)
assert.Equal(tt.want, got, "Unexpected cache lookup result")
})
}
}
func (suite *Tests) Test_cacheLookupRedis() {
redis_server := envutil.Getenv("REDIS_SERVER", "localhost:6379")
cfg.Cache.Client = libpack_redis.NewClient(&libpack_redis.RedisClientConfig{
RedisServer: redis_server,
RedisPassword: "",
RedisDB: 0,
// redis_server := envutil.Getenv("REDIS_SERVER", "localhost:6379")
// config.Client = libpack_cache_redis.NewClient(&libpack_cache_redis.RedisClientConfig{
// RedisServer: redis_server,
// RedisPassword: "",
// RedisDB: 0,
// })
mockedCache := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisServer: redisMockServer.Addr(),
RedisDB: 0,
})
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: mockedCache,
TTL: 5,
}
type args struct {
hash string
}
@@ -90,9 +110,9 @@ func (suite *Tests) Test_cacheLookupRedis() {
for _, tt := range tests {
suite.Run(tt.name, func() {
if tt.addCache.data != nil {
cacheStore(tt.args.hash, tt.addCache.data)
CacheStore(tt.args.hash, tt.addCache.data)
}
got := cacheLookup(tt.args.hash)
got := CacheLookup(tt.args.hash)
assert.Equal(tt.want, got, "Unexpected cache lookup result")
})
}
+10 -45
View File
@@ -1,4 +1,4 @@
package libpack_cache
package libpack_cache_memory
import (
"bytes"
@@ -19,7 +19,7 @@ type Cache struct {
decompressPool sync.Pool
entries sync.Map
globalTTL time.Duration
mu sync.RWMutex // Added sync.RWMutex field for locking
sync.RWMutex
}
func New(globalTTL time.Duration) *Cache {
@@ -52,9 +52,6 @@ func (c *Cache) cleanupRoutine(globalTTL time.Duration) {
}
func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
c.lock()
defer c.unlock()
expiresAt := time.Now().Add(ttl)
compressedValue, err := c.compress(value)
@@ -71,9 +68,6 @@ func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
}
func (c *Cache) Get(key string) ([]byte, bool) {
c.rlock()
defer c.runlock()
entry, ok := c.entries.Load(key)
if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) {
return nil, false
@@ -88,39 +82,32 @@ func (c *Cache) Get(key string) ([]byte, bool) {
}
func (c *Cache) Delete(key string) {
c.lock()
defer c.unlock()
c.entries.Delete(key)
}
func (c *Cache) Clear() {
c.lock()
defer c.unlock()
c.entries.Range(func(key, value interface{}) bool {
c.entries.Delete(key)
return true
})
}
func (c *Cache) CountQueries() int {
c.rlock()
defer c.runlock()
func (c *Cache) CountQueries() int64 {
var count int
c.entries.Range(func(_, _ interface{}) bool {
count++
return true
})
return count
return int64(count)
}
func (c *Cache) compress(data []byte) ([]byte, error) {
w := c.compressPool.Get().(*gzip.Writer)
defer c.compressPool.Put(w)
var buf bytes.Buffer
w := c.compressPool.Get().(*gzip.Writer)
defer func() {
w.Close()
c.compressPool.Put(w)
}()
w.Reset(&buf)
if _, err := w.Write(data); err != nil {
return nil, err
@@ -149,11 +136,7 @@ func (c *Cache) decompress(data []byte) ([]byte, error) {
c.decompressPool.Put(r)
}()
decompressedData, err := io.ReadAll(r)
if err != nil {
return nil, err
}
return decompressedData, nil
return io.ReadAll(r)
}
func (c *Cache) CleanExpiredEntries() {
@@ -166,21 +149,3 @@ func (c *Cache) CleanExpiredEntries() {
return true
})
}
// Private methods to handle locking
func (c *Cache) lock() {
c.mu.Lock()
}
func (c *Cache) unlock() {
c.mu.Unlock()
}
func (c *Cache) rlock() {
c.mu.RLock()
}
func (c *Cache) runlock() {
c.mu.RUnlock()
}
@@ -1,4 +1,4 @@
package libpack_cache
package libpack_cache_memory
import (
"testing"
@@ -7,7 +7,7 @@ import (
// Assume that New function initializes the cache and it is defined somewhere in the libpack_cache package.
func BenchmarkCacheSet(b *testing.B) {
func BenchmarkMemCacheSet(b *testing.B) {
cache := New(30 * time.Second) // Initializing the cache with a TTL of 30 seconds
key := "benchmark-key"
value := []byte("benchmark-value")
@@ -19,7 +19,7 @@ func BenchmarkCacheSet(b *testing.B) {
}
}
func BenchmarkCacheGet(b *testing.B) {
func BenchmarkMemCacheGet(b *testing.B) {
cache := New(30 * time.Second) // Initializing the cache
key := "benchmark-key"
value := []byte("benchmark-value")
@@ -32,7 +32,7 @@ func BenchmarkCacheGet(b *testing.B) {
}
}
func BenchmarkCacheExpire(b *testing.B) {
func BenchmarkMemCacheExpire(b *testing.B) {
key := "benchmark-expire-key"
value := []byte("benchmark-value")
ttl := 5 * time.Millisecond // Setting a short TTL for quick expiration
@@ -45,7 +45,7 @@ func BenchmarkCacheExpire(b *testing.B) {
}
}
func BenchmarkCacheStats(b *testing.B) {
func BenchmarkMemCacheStats(b *testing.B) {
cache := New(30 * time.Second) // Initializing the cache
key := "benchmark-key"
value := []byte("benchmark-value")
+8 -8
View File
@@ -1,4 +1,4 @@
package libpack_cache
package libpack_cache_memory
import (
"testing"
@@ -7,25 +7,25 @@ import (
"github.com/stretchr/testify/suite"
)
type CacheTestSuite struct {
type MemoryTestSuite struct {
suite.Suite
}
func (suite *CacheTestSuite) SetupTest() {
func (suite *MemoryTestSuite) SetupTest() {
}
func TestCachingTestSuite(t *testing.T) {
suite.Run(t, new(CacheTestSuite))
suite.Run(t, new(MemoryTestSuite))
}
func (suite *CacheTestSuite) Test_New() {
func (suite *MemoryTestSuite) Test_New() {
suite.T().Run("should return a new cache", func(t *testing.T) {
cache := New(2 * time.Second)
suite.NotNil(cache)
})
}
func (suite *CacheTestSuite) Test_CacheUse() {
func (suite *MemoryTestSuite) Test_CacheUse() {
cache := New(30 * time.Second)
tests := []struct {
name string
@@ -50,7 +50,7 @@ func (suite *CacheTestSuite) Test_CacheUse() {
}
}
func (suite *CacheTestSuite) Test_CacheDelete() {
func (suite *MemoryTestSuite) Test_CacheDelete() {
cache := New(30 * time.Second)
tests := []struct {
name string
@@ -79,7 +79,7 @@ func (suite *CacheTestSuite) Test_CacheDelete() {
}
}
func (suite *CacheTestSuite) Test_CacheExpire() {
func (suite *MemoryTestSuite) Test_CacheExpire() {
cache := New(30 * time.Second)
tests := []struct {
name string
-77
View File
@@ -1,77 +0,0 @@
package libpack_redis
import (
"context"
"time"
redis "github.com/redis/go-redis/v9"
)
var ()
type RedisConfig struct {
client *redis.Client
ctx context.Context
}
func prependKeyName(key string) string {
return "gmp_cache:" + key
}
type RedisClientConfig struct {
RedisServer string
RedisPassword string
RedisDB int
}
func NewClient(redisClientConfig *RedisClientConfig) *RedisConfig {
c := &RedisConfig{
client: redis.NewClient(&redis.Options{
Addr: redisClientConfig.RedisServer,
Password: redisClientConfig.RedisPassword,
DB: redisClientConfig.RedisDB,
}),
ctx: context.Background(),
}
_, err := c.client.Ping(c.ctx).Result()
if err != nil {
panic(err)
}
return c
}
func (c *RedisConfig) Set(key string, value []byte, ttl time.Duration) {
c.client.Set(c.ctx, prependKeyName(key), value, ttl)
}
func (c *RedisConfig) Get(key string) ([]byte, bool) {
val, err := c.client.Get(c.ctx, prependKeyName(key)).Result()
if err == redis.Nil || err != nil {
return nil, false
}
return []byte(val), true
}
func (c *RedisConfig) Delete(key string) {
c.client.Del(c.ctx, prependKeyName(key))
}
func (c *RedisConfig) Clear() {
c.client.FlushDB(c.ctx)
}
func (c *RedisConfig) CountQueries() int {
keys, err := c.client.Keys(c.ctx, prependKeyName("*")).Result()
if err != nil {
return 0
}
return len(keys)
}
func (c *RedisConfig) CountQueriesWithPattern(pattern string) int {
keys, err := c.client.Keys(c.ctx, prependKeyName(pattern)).Result()
if err != nil {
return 0
}
return len(keys)
}
+96
View File
@@ -0,0 +1,96 @@
package libpack_cache_redis
import (
"context"
"strings"
"time"
"sync"
redis "github.com/redis/go-redis/v9"
)
type RedisConfig struct {
client *redis.Client
ctx context.Context
prefix string
builderPool *sync.Pool
}
func (c *RedisConfig) prependKeyName(key string) string {
builder := c.builderPool.Get().(*strings.Builder)
defer c.builderPool.Put(builder)
builder.Reset()
builder.WriteString(c.prefix)
builder.WriteString(key)
return builder.String()
}
type RedisClientConfig struct {
RedisServer string
RedisPassword string
RedisDB int
Prefix string
}
func New(redisClientConfig *RedisClientConfig) *RedisConfig {
c := &RedisConfig{
client: redis.NewClient(&redis.Options{
Addr: redisClientConfig.RedisServer,
Password: redisClientConfig.RedisPassword,
DB: redisClientConfig.RedisDB,
}),
ctx: context.Background(),
prefix: redisClientConfig.Prefix,
builderPool: &sync.Pool{
New: func() interface{} {
return &strings.Builder{}
},
},
}
_, err := c.client.Ping(c.ctx).Result()
if err != nil {
panic(err)
}
return c
}
func (c *RedisConfig) Set(key string, value []byte, ttl time.Duration) {
c.client.Set(c.ctx, c.prependKeyName(key), value, ttl)
}
func (c *RedisConfig) Get(key string) ([]byte, bool) {
val, err := c.client.Get(c.ctx, c.prependKeyName(key)).Result()
if err == redis.Nil {
return nil, false
}
if err != nil {
return nil, false
}
return []byte(val), true
}
func (c *RedisConfig) Delete(key string) {
c.client.Del(c.ctx, c.prependKeyName(key))
}
func (c *RedisConfig) Clear() {
c.client.FlushDB(c.ctx)
}
func (c *RedisConfig) CountQueries() int64 {
keys, err := c.client.Keys(c.ctx, c.prependKeyName("*")).Result()
if err != nil {
return 0
}
return int64(len(keys))
}
func (c *RedisConfig) CountQueriesWithPattern(pattern string) int {
keys, err := c.client.Keys(c.ctx, c.prependKeyName(pattern)).Result()
if err != nil {
return 0
}
return len(keys)
}
+22 -17
View File
@@ -1,23 +1,24 @@
package libpack_redis
package libpack_cache_redis
import (
"testing"
"time"
"github.com/gookit/goutil/envutil"
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type RedisConfigSuite struct {
suite.Suite
redisConfig *RedisConfig
redisConfig *RedisConfig
redis_server *miniredis.Miniredis
}
func (suite *RedisConfigSuite) SetupTest() {
redis_server := envutil.Getenv("REDIS_SERVER", "localhost:6379")
suite.redisConfig = NewClient(&RedisClientConfig{
RedisServer: redis_server,
suite.redis_server, _ = miniredis.Run()
suite.redisConfig = New(&RedisClientConfig{
RedisServer: suite.redis_server.Addr(),
RedisPassword: "",
RedisDB: 0,
})
@@ -29,7 +30,7 @@ func TestRedisConfigSuite(t *testing.T) {
}
func (suite *RedisConfigSuite) TestSet() {
key := "testkey"
key := "testkeyset"
value := []byte("testvalue")
suite.redisConfig.Delete(key) // Ensure the key is deleted before the test
@@ -50,9 +51,9 @@ func (suite *RedisConfigSuite) TestSet() {
}
func (suite *RedisConfigSuite) TestSetWithExpiry() {
key := "testkey"
value := []byte("testvalue")
expiry := 1 * time.Second
key := "testkey_with_expiry"
value := []byte("testvaluewithexpiry")
expiry := 2 * time.Second
suite.redisConfig.Delete(key) // Ensure the key is deleted before the test
// Test writing a new key-value pair
@@ -60,17 +61,19 @@ func (suite *RedisConfigSuite) TestSetWithExpiry() {
storedValue, found := suite.redisConfig.Get(key)
assert.True(suite.T(), found)
assert.Equal(suite.T(), value, storedValue)
_, found = suite.redisConfig.Get(key)
assert.True(suite.T(), found, "Key should exist")
// Test that key expires after the specified time
time.Sleep(2 * time.Second)
suite.redis_server.FastForward(3 * time.Second)
_, found = suite.redisConfig.Get(key)
assert.False(suite.T(), found)
assert.False(suite.T(), found, "Key should have expired after 2 seconds")
suite.redisConfig.Delete(key) // Clean up after the test
}
func (suite *RedisConfigSuite) TestGet() {
key := "testkey"
key := "testkeyget"
value := []byte("testvalue")
suite.redisConfig.Set(key, value, 0) // Set the key-value pair
storedValue, found := suite.redisConfig.Get(key)
@@ -79,7 +82,7 @@ func (suite *RedisConfigSuite) TestGet() {
}
func (suite *RedisConfigSuite) TestDeleteKey() {
key := "testkey"
key := "testkeydelete"
value := []byte("testvalue")
suite.redisConfig.Set(key, value, 0) // Set the key-value pair
suite.redisConfig.Delete(key)
@@ -89,7 +92,7 @@ func (suite *RedisConfigSuite) TestDeleteKey() {
func (suite *RedisConfigSuite) TestCheckIfKeyExists() {
ttl := time.Duration(10) * time.Second
key := "testkey"
key := "testkeyifexists"
value := []byte("testvalue")
suite.redisConfig.Set(key, value, ttl) // Set the key-value pair
_, found := suite.redisConfig.Get(key)
@@ -106,8 +109,8 @@ func (suite *RedisConfigSuite) TestGetKeys() {
suite.redisConfig.Set("testkey2", []byte("testvalue2"), ttl)
suite.redisConfig.Set("otherkey", []byte("othervalue"), ttl)
keys, _ := suite.redisConfig.client.Keys(suite.redisConfig.ctx, prependKeyName("testkey*")).Result()
expectedKeys := []string{prependKeyName("testkey1"), prependKeyName("testkey2")}
keys, _ := suite.redisConfig.client.Keys(suite.redisConfig.ctx, "testkey*").Result()
expectedKeys := []string{"testkey1", "testkey2"}
assert.ElementsMatch(suite.T(), expectedKeys, keys)
suite.redisConfig.client.Del(suite.redisConfig.ctx, "testkey1", "testkey2", "otherkey")
@@ -120,6 +123,8 @@ func (suite *RedisConfigSuite) TestGetKeysCount() {
suite.redisConfig.Set("otherkey", []byte("othervalue"), ttl)
assert.Equal(suite.T(), 2, suite.redisConfig.CountQueriesWithPattern("testkey*"))
assert.Equal(suite.T(), 1, suite.redisConfig.CountQueriesWithPattern("otherkey*"))
assert.Equal(suite.T(), int64(3), suite.redisConfig.CountQueries())
suite.redisConfig.client.Del(suite.redisConfig.ctx, "testkey1", "testkey2", "otherkey")
}
+5 -1
View File
@@ -7,6 +7,7 @@ import (
"github.com/goccy/go-json"
"github.com/lukaszraczylo/ask"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
)
@@ -15,7 +16,10 @@ func extractClaimsFromJWTHeader(authorization string) (usr string, role string)
handleError := func(msg string, details map[string]interface{}) {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
cfg.Logger.Error(msg, details)
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: msg,
Pairs: details,
})
}
tokenParts := strings.Split(authorization, ".")
+25 -6
View File
@@ -6,27 +6,40 @@ import (
"time"
"github.com/jackc/pgx/v5"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
func enableHasuraEventCleaner() {
if cfg.HasuraEventCleaner.Enable {
if cfg.HasuraEventCleaner.EventMetadataDb == "" {
cfg.Logger.Warning("Event metadata db URL not specified, event cleaner not active", nil)
cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Event metadata db URL not specified, event cleaner not active",
Pairs: nil,
})
return
}
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
cfg.Logger.Info("Event cleaner enabled", map[string]interface{}{"interval_in_days": cfg.HasuraEventCleaner.ClearOlderThan})
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Event cleaner enabled",
Pairs: map[string]interface{}{"interval_in_days": cfg.HasuraEventCleaner.ClearOlderThan},
})
time.Sleep(60 * time.Second) // wait for everything to start and settle down
cfg.Logger.Info("Initial cleanup of old events", nil)
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Initial cleanup of old events",
Pairs: nil,
})
cleanEvents()
for {
select {
case <-ticker.C:
cfg.Logger.Info("Cleaning up old events", nil)
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Cleaning up old events",
Pairs: nil,
})
cleanEvents()
}
}
@@ -36,7 +49,10 @@ func enableHasuraEventCleaner() {
func cleanEvents() {
conn, err := pgx.Connect(context.Background(), cfg.HasuraEventCleaner.EventMetadataDb)
if err != nil {
cfg.Logger.Error("Failed to connect to event metadata db", map[string]interface{}{"error": err})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Failed to connect to event metadata db",
Pairs: map[string]interface{}{"error": err},
})
return
}
defer conn.Close(context.Background())
@@ -52,7 +68,10 @@ func cleanEvents() {
for _, query := range delQueries {
_, err := conn.Exec(context.Background(), query)
if err != nil {
cfg.Logger.Debug("Failed to execute query", map[string]interface{}{"query": query, "error": err})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Failed to execute query",
Pairs: map[string]interface{}{"query": query, "error": err},
})
}
}
}
+6 -3
View File
@@ -4,6 +4,7 @@ go 1.21
require (
github.com/VictoriaMetrics/metrics v1.33.1
github.com/alicebob/miniredis/v2 v2.33.0
github.com/avast/retry-go/v4 v4.6.0
github.com/goccy/go-json v0.10.3
github.com/gofiber/fiber/v2 v2.52.4
@@ -16,12 +17,12 @@ require (
github.com/lukaszraczylo/go-ratecounter v0.1.8
github.com/lukaszraczylo/go-simple-graphql v1.2.14
github.com/redis/go-redis/v9 v9.5.3
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.9.0
github.com/valyala/fasthttp v1.54.0
github.com/valyala/fasthttp v1.55.0
)
require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -37,13 +38,15 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
+10 -4
View File
@@ -1,5 +1,9 @@
github.com/VictoriaMetrics/metrics v1.33.1 h1:CNV3tfm2Kpv7Y9W3ohmvqgFWPR55tV2c7M2U6OIo+UM=
github.com/VictoriaMetrics/metrics v1.33.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
@@ -82,8 +86,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0=
github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8=
github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
@@ -92,10 +96,12 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+25 -6
View File
@@ -8,6 +8,7 @@ import (
fiber "github.com/gofiber/fiber/v2"
"github.com/graphql-go/graphql/language/ast"
"github.com/graphql-go/graphql/language/parser"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
)
@@ -71,7 +72,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
m := make(map[string]interface{})
err := json.Unmarshal(c.Body(), &m)
if err != nil {
cfg.Logger.Error("Can't unmarshal the request", map[string]interface{}{"error": err.Error(), "body": string(c.Body())})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't unmarshal the request",
Pairs: map[string]interface{}{"error": err.Error(), "body": string(c.Body())},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
@@ -80,7 +84,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
// get the query
query, ok := m["query"].(string)
if !ok {
cfg.Logger.Error("Can't find the query", map[string]interface{}{"query": query, "m_val": m})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't find the query",
Pairs: map[string]interface{}{"m_val": m},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
@@ -89,7 +96,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
p, err := parser.Parse(parser.ParseParams{Source: query})
if err != nil {
cfg.Logger.Error("Can't parse the query", map[string]interface{}{"query": query, "m_val": m})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the query",
Pairs: map[string]interface{}{"query": query, "m_val": m},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
@@ -115,7 +125,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
}
if res.operationType == "mutation" && cfg.Server.ReadOnlyMode {
cfg.Logger.Warning("Mutation blocked", m)
cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Mutation blocked",
Pairs: map[string]interface{}{"query": query},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
@@ -131,7 +144,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
if arg.Name.Value == "ttl" {
res.cacheTime, err = strconv.Atoi(arg.Value.GetValue().(string))
if err != nil {
cfg.Logger.Error("Can't parse the ttl, using global", map[string]interface{}{"bad_ttl": arg.Value.GetValue().(string)})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the ttl, using global",
Pairs: map[string]interface{}{"bad_ttl": arg.Value.GetValue().(string)},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
@@ -184,7 +200,10 @@ func checkIfContainsIntrospection(c *fiber.Ctx, whatever string) (shouldBlock bo
if len(cfg.Security.IntrospectionAllowed) > 0 {
if _, allowed_exists := introspectionAllowedQueries[whateverLower]; allowed_exists {
cfg.Logger.Debug("Introspection query allowed, passing through", map[string]interface{}{"query": whatever})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Introspection query allowed, passing through",
Pairs: map[string]interface{}{"query": whatever},
})
got_exemption = true
shouldBlock = false
}
+204
View File
@@ -0,0 +1,204 @@
package libpack_logger
import (
"bytes"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/goccy/go-json"
)
const (
_ = iota
LEVEL_DEBUG
LEVEL_INFO
LEVEL_WARN
LEVEL_ERROR
LEVEL_FATAL
)
var LevelNames = [...]string{
"none",
"debug",
"info",
"warn",
"error",
"fatal",
}
const (
defaultFormat = time.RFC3339
defaultMinLevel = LEVEL_INFO
defaultShowCaller = false
)
var defaultOutput = os.Stdout
type Logger struct {
format string
minLogLevel int
showCaller bool
output io.Writer
}
type LogMessage struct {
Message string
Pairs map[string]any
output io.Writer
}
func (m *LogMessage) String() string {
return m.Message
}
var fieldNames = map[string]string{
"timestamp": "timestamp",
"level": "level",
"message": "message",
}
func New() *Logger {
return &Logger{
format: defaultFormat,
minLogLevel: defaultMinLevel,
output: defaultOutput,
showCaller: defaultShowCaller,
}
}
func (l *Logger) SetOutput(output io.Writer) *Logger {
l.output = output
return l
}
var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
var defaultPairs = make(map[string]any)
func GetLogLevel(level string) int {
for i, name := range LevelNames {
if name == strings.ToLower(level) {
return i
}
}
return defaultMinLevel
}
func (l *Logger) log(level int, m *LogMessage) {
if m.Pairs == nil {
m.Pairs = defaultPairs
}
m.Pairs[fieldNames["timestamp"]] = time.Now().Format(l.format)
m.Pairs[fieldNames["level"]] = LevelNames[level]
m.Pairs[fieldNames["message"]] = m.Message
if l.showCaller {
m.Pairs["caller"] = getCaller()
}
buffer := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buffer)
buffer.Reset()
var encoder = json.NewEncoder(buffer)
err := encoder.Encode(m.Pairs)
if err != nil {
fmt.Println("Error marshalling log message:", err)
return
}
// if not running in test - use stderr and stdout, otherwise - use logger's output setting
if flag.Lookup("test.v") != nil {
m.output = os.Stdout
if level >= LEVEL_ERROR {
m.output = os.Stderr
}
}
// Use logger's output setting instead of os.Stdout or os.Stderr
l.output.Write(buffer.Bytes())
}
func (l *Logger) Debug(m *LogMessage) {
if l.shouldLog(LEVEL_DEBUG) {
l.log(LEVEL_DEBUG, m)
}
}
func (l *Logger) Info(m *LogMessage) {
if l.shouldLog(LEVEL_INFO) {
l.log(LEVEL_INFO, m)
}
}
func (l *Logger) Warn(m *LogMessage) {
if l.shouldLog(LEVEL_WARN) {
l.log(LEVEL_WARN, m)
}
}
func (l *Logger) Warning(m *LogMessage) {
l.Warn(m)
}
func (l *Logger) Error(m *LogMessage) {
if l.shouldLog(LEVEL_ERROR) {
l.log(LEVEL_ERROR, m)
}
}
func (l *Logger) Fatal(m *LogMessage) {
if l.shouldLog(LEVEL_FATAL) {
l.log(LEVEL_FATAL, m)
}
}
func (l *Logger) Critical(m *LogMessage) {
l.Fatal(m)
os.Exit(1)
}
func (l *Logger) shouldLog(level int) bool {
return level >= l.minLogLevel
}
func (l *Logger) SetFormat(format string) *Logger {
l.format = format
return l
}
func (l *Logger) SetMinLogLevel(level int) *Logger {
l.minLogLevel = level
return l
}
func (l *Logger) SetFieldName(field, name string) *Logger {
fieldNames[field] = name
return l
}
func (l *Logger) SetShowCaller(show bool) *Logger {
l.showCaller = show
return l
}
func getCaller() string {
_, file, line, ok := runtime.Caller(3)
if !ok {
return "unknown:0"
}
file = filepath.Base(file)
return fmt.Sprintf("%s:%d", file, line)
}
+140
View File
@@ -0,0 +1,140 @@
package libpack_logger
import (
"bytes"
"testing"
"time"
)
func Benchmark_NewLogger(b *testing.B) {
type triggers struct {
ModFormat struct {
Format string
}
ModLevel struct {
Level int
}
}
tests := []struct {
name string
triggers triggers
}{
{
name: "BenchmarkNew",
},
{
name: "BenchmarkNewChangeTimeFormat",
triggers: triggers{
ModFormat: struct{ Format string }{
Format: time.RFC3339Nano,
},
},
},
{
name: "BenchmarkNewChangeLogLevel",
triggers: triggers{
ModLevel: struct{ Level int }{
Level: LEVEL_DEBUG,
},
},
},
{
name: "BenchmarkNewChangeTimeFormatAndLogLevel",
triggers: triggers{
ModFormat: struct{ Format string }{
Format: time.RFC3339Nano,
},
ModLevel: struct{ Level int }{
Level: LEVEL_DEBUG,
},
},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
got := New()
if tt.triggers.ModFormat.Format != "" {
got = got.SetFormat(tt.triggers.ModFormat.Format)
}
if tt.triggers.ModLevel.Level != 0 {
got = got.SetMinLogLevel(tt.triggers.ModLevel.Level)
}
}
})
}
}
func Benchmark_Log_Debug(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_DEBUG).SetOutput(output)
msg := &LogMessage{
Message: "debug message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Debug(msg)
}
}
func Benchmark_Log_Info(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_INFO).SetOutput(output)
msg := &LogMessage{
Message: "info message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Info(msg)
}
}
func Benchmark_Log_Warn(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_WARN).SetOutput(output)
msg := &LogMessage{
Message: "warn message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Warn(msg)
}
}
func Benchmark_Log_Error(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_ERROR).SetOutput(output)
msg := &LogMessage{
Message: "error message",
Pairs: map[string]any{"key": "value"},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Error(msg)
}
}
func Benchmark_Log_Fatal(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_FATAL).SetOutput(output)
msg := &LogMessage{
Message: "fatal message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Fatal(msg)
}
}
+31
View File
@@ -0,0 +1,31 @@
package libpack_logger
import (
"testing"
assertions "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type LoggerTestSuite struct {
suite.Suite
}
var (
assert *assertions.Assertions
)
func (suite *LoggerTestSuite) BeforeTest(suiteName, testName string) {
}
func (suite *LoggerTestSuite) SetupTest() {
assert = assertions.New(suite.T())
}
// TearDownTest is run after each test to clean up
func (suite *LoggerTestSuite) TearDownTest() {
}
func TestSuite(t *testing.T) {
suite.Run(t, new(LoggerTestSuite))
}
+182
View File
@@ -0,0 +1,182 @@
package libpack_logger
import (
"bytes"
"fmt"
"os"
"reflect"
"testing"
"time"
"github.com/goccy/go-json"
)
func captureStderr(f func()) string {
originalStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
f()
w.Close()
var buf bytes.Buffer
buf.ReadFrom(r)
os.Stderr = originalStderr
return buf.String()
}
func captureStdOut(f func()) string {
originalStdout := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w
f()
w.Close()
var buf bytes.Buffer
buf.ReadFrom(r)
os.Stdout = originalStdout
return buf.String()
}
func (suite *LoggerTestSuite) Test_LogMessageString() {
msg := &LogMessage{
Message: "test message",
}
assert.Equal("test message", msg.String())
}
func callLoggerMethod(logger *Logger, methodName string, message *LogMessage) {
// Get the method by name using reflection
method := reflect.ValueOf(logger).MethodByName(methodName)
if method.IsValid() {
// Call the method with the message as an argument
method.Call([]reflect.Value{reflect.ValueOf(message)})
} else {
fmt.Printf("Method %s does not exist on Logger\n", methodName)
}
}
func (suite *LoggerTestSuite) Test_LogsLevelsPrint() {
output := &bytes.Buffer{}
logger := New().SetOutput(output)
tests := []struct {
name string
method string
loggerMinLevel int
messageLogLevel int
message string
pairs map[string]any
wantOutput bool // Whether we expect output to be written
}{
{
name: "Log: Debug, Level: Debug - no pairs",
method: "Debug",
loggerMinLevel: LEVEL_DEBUG,
messageLogLevel: LEVEL_DEBUG,
message: "debug message",
wantOutput: true,
},
{
name: "Log: Info, Level: Info - one pair",
method: "Info",
loggerMinLevel: LEVEL_INFO,
messageLogLevel: LEVEL_INFO,
message: "info message",
pairs: map[string]any{
"key": "value",
},
wantOutput: true,
},
{
name: "Log: Info, Level: Warn - with pairs",
method: "Info",
loggerMinLevel: LEVEL_WARN,
messageLogLevel: LEVEL_INFO,
message: "warn message",
pairs: map[string]any{
"key1": "value1",
"key2": "value2",
},
wantOutput: false,
},
{
name: "Log: Warn, Level: Info - with 500 pairs",
method: "Warn",
loggerMinLevel: LEVEL_INFO,
messageLogLevel: LEVEL_WARN,
message: "warn message with 500 pairs",
pairs: func() map[string]any {
pairs := make(map[string]any)
for i := 0; i < 500; i++ {
pairs[fmt.Sprintf("key%d", i)] = fmt.Sprintf("value%d", i)
}
return pairs
}(),
wantOutput: true,
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
msg := &LogMessage{
Message: tt.message,
Pairs: tt.pairs,
}
output.Reset()
// Set logger's minimum log level
logger.SetMinLogLevel(tt.loggerMinLevel)
fmt.Println("Logger min log level:", LevelNames[logger.minLogLevel])
// Call the logging method
callLoggerMethod(logger, tt.method, msg)
logOutput := output.String()
fmt.Println("Output:", logOutput)
if tt.wantOutput {
var loggedMessage map[string]any
err := json.Unmarshal([]byte(logOutput), &loggedMessage)
if err != nil {
t.Fatalf("Error unmarshalling log message: %v\nLog output: %s", err, logOutput)
}
if !containsLogMessage(logOutput, tt.message) {
t.Errorf("Expected log message %q, but got %q", tt.message, logOutput)
}
assert.Equal(LevelNames[tt.messageLogLevel], loggedMessage["level"])
if tt.pairs != nil {
for k, v := range tt.pairs {
assert.Equal(v, loggedMessage[k])
}
}
} else {
assert.Equal("", logOutput)
}
})
}
}
func containsLogMessage(logOutput, expectedMessage string) bool {
return bytes.Contains([]byte(logOutput), []byte(expectedMessage))
}
func (suite *LoggerTestSuite) Test_SetFormat() {
logger := New().SetFormat(time.RFC3339Nano)
assert.Equal(time.RFC3339Nano, logger.format)
}
func (suite *LoggerTestSuite) Test_SetMinLogLevel() {
logger := New().SetMinLogLevel(LEVEL_DEBUG)
assert.Equal(LEVEL_DEBUG, logger.minLogLevel)
}
func (suite *LoggerTestSuite) Test_ShouldLog() {
logger := New().SetMinLogLevel(LEVEL_WARN)
assert.True(logger.shouldLog(LEVEL_WARN))
assert.True(logger.shouldLog(LEVEL_ERROR))
assert.False(logger.shouldLog(LEVEL_INFO))
assert.False(logger.shouldLog(LEVEL_DEBUG))
}
-123
View File
@@ -1,123 +0,0 @@
package libpack_logging
import (
"io"
"os"
"sync"
"time"
"github.com/gookit/goutil/envutil"
"github.com/rs/zerolog"
)
type LogConfig struct {
logger zerolog.Logger
}
var (
baseLogger zerolog.Logger
eventPool = sync.Pool{
New: func() interface{} {
return new(zerolog.Event)
},
}
fieldMapPool = sync.Pool{
New: func() interface{} {
return make(map[string]interface{})
},
}
)
func init() {
zerolog.TimeFieldFormat = time.RFC3339
zerolog.MessageFieldName = "short_message"
zerolog.TimestampFieldName = "timestamp"
zerolog.LevelFieldName = "level"
zerolog.LevelFatalValue = "critical"
baseLogger = zerolog.New(os.Stdout).With().Timestamp().Logger()
switch logLevel := envutil.Getenv("LOG_LEVEL", "info"); logLevel {
case "debug":
baseLogger = baseLogger.Level(zerolog.DebugLevel)
case "warn":
baseLogger = baseLogger.Level(zerolog.WarnLevel)
case "error":
baseLogger = baseLogger.Level(zerolog.ErrorLevel)
default:
baseLogger = baseLogger.Level(zerolog.InfoLevel)
}
}
func NewLogger() *LogConfig {
return &LogConfig{logger: baseLogger}
}
func (lw *LogConfig) log(w io.Writer, level zerolog.Level, message string, fields map[string]interface{}) {
logger := lw.logger.Output(w)
event := logger.WithLevel(level).CallerSkipFrame(3)
for k, val := range fields {
switch v := val.(type) {
case string:
event = event.Str(k, v)
case int:
event = event.Int(k, v)
case float64:
event = event.Float64(k, v)
default:
event = event.Interface(k, val)
}
}
event.Msg(message)
}
func (lw *LogConfig) logWithLevel(level zerolog.Level, message string, fields map[string]interface{}) {
if lw.logger.GetLevel() > level {
return
}
if lw.logger.GetLevel() <= level {
w := os.Stdout
if level >= zerolog.ErrorLevel {
w = os.Stderr
}
lw.log(w, level, message, fields)
}
}
func (lw *LogConfig) Debug(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.DebugLevel, message, fields)
}
func (lw *LogConfig) Info(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.InfoLevel, message, fields)
}
func (lw *LogConfig) Warning(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.WarnLevel, message, fields)
}
func (lw *LogConfig) Error(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.ErrorLevel, message, fields)
}
func (lw *LogConfig) Critical(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.FatalLevel, message, fields)
os.Exit(1)
}
// Helper function to get a new fields map from the pool
func getFieldsMap() map[string]interface{} {
return fieldMapPool.Get().(map[string]interface{})
}
// Helper function to put a used fields map back into the pool
func putFieldsMap(fields map[string]interface{}) {
for k := range fields {
delete(fields, k)
}
fieldMapPool.Put(fields)
}
-32
View File
@@ -1,32 +0,0 @@
package libpack_logging
import (
"os"
"testing"
)
func BenchmarkNewLogger(b *testing.B) {
for i := 0; i < b.N; i++ {
NewLogger()
}
}
func BenchmarkInfoLog(b *testing.B) {
oldEnv := os.Getenv("LOG_LEVEL")
os.Setenv("LOG_LEVEL", "info")
oldStdout := os.Stdout
oldStderr := os.Stderr
os.Stdout, _ = os.Open(os.DevNull)
os.Stderr, _ = os.Open(os.DevNull)
defer func() {
os.Stdout = oldStdout
os.Stderr = oldStderr
os.Setenv("LOG_LEVEL", oldEnv)
}()
testsLogger := NewLogger()
b.ResetTimer()
for i := 0; i < b.N; i++ {
testsLogger.Info("test", map[string]interface{}{"test": "test"})
}
}
+16 -2
View File
@@ -9,6 +9,7 @@ import (
"github.com/gofiber/fiber/v2/middleware/proxy"
"github.com/gookit/goutil/envutil"
graphql "github.com/lukaszraczylo/go-simple-graphql"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
@@ -62,7 +63,8 @@ func parseConfig() {
}
return strings.Split(urls, ",")
}()
c.Logger = libpack_logging.NewLogger()
c.LogLevel = strings.ToUpper(getDetailsFromEnv("LOG_LEVEL", "info"))
c.Logger = libpack_logging.New().SetMinLogLevel(libpack_logging.GetLogLevel(c.LogLevel)).SetFieldName("timestamp", "ts").SetFieldName("message", "msg").SetShowCaller(true)
c.Server.HealthcheckGraphQL = getDetailsFromEnv("HEALTHCHECK_GRAPHQL_URL", "")
c.Client.GQLClient = graphql.NewConnection()
c.Client.GQLClient.SetEndpoint(c.Server.HealthcheckGraphQL)
@@ -88,7 +90,19 @@ func parseConfig() {
c.HasuraEventCleaner.EventMetadataDb = getDetailsFromEnv("HASURA_EVENT_METADATA_DB", "")
cfg = &c
enableCache() // takes close to no resources, but can be used with dynamic query cache
if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
cacheConfig := &libpack_cache.CacheConfig{
Logger: cfg.Logger,
TTL: cfg.Cache.CacheTTL,
}
if cfg.Cache.CacheRedisEnable {
cacheConfig.Redis.Enable = true
cacheConfig.Redis.URL = cfg.Cache.CacheRedisURL
cacheConfig.Redis.Password = cfg.Cache.CacheRedisPassword
cacheConfig.Redis.DB = cfg.Cache.CacheRedisDB
}
libpack_cache.EnableCache(cacheConfig)
}
loadRatelimitConfig()
once.Do(func() {
go enableApi()
+2 -3
View File
@@ -34,14 +34,13 @@ func (suite *Tests) SetupTest() {
JSONDecoder: json.Unmarshal,
},
)
cacheStats = &CacheStats{}
// Initialize a simple in-memory cache client for testing purposes
cfg.Cache.Client = libpack_cache.New(5 * time.Minute)
libpack_cache.New(5 * time.Minute)
parseConfig()
enableApi()
StartMonitoringServer()
cfg.Logger = libpack_logging.NewLogger()
cfg.Logger = libpack_logging.New().SetMinLogLevel(libpack_logging.GetLogLevel(getDetailsFromEnv("LOG_LEVEL", "info")))
// Setup environment variables here if needed
os.Setenv("GMP_TEST_STRING", "testValue")
os.Setenv("GMP_TEST_INT", "123")
+136 -72
View File
@@ -1,129 +1,193 @@
package libpack_monitoring
import (
"bytes"
"fmt"
"os"
"sort"
"strings"
"sync"
"unicode"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
)
func (ms *MetricsSetup) get_metrics_name(name string, labels map[string]string) (complete_name string) {
// Cache for sorted label keys to avoid repeated sorting
var sortedLabelKeysCache = struct {
sync.RWMutex
m map[string][]string
}{m: make(map[string][]string)}
func (ms *MetricsSetup) get_metrics_name(name string, labels map[string]string) string {
const unknownPodName = "unknown"
var sb strings.Builder
var buf bytes.Buffer
// Prepare default labels without initializing a new map
podName := unknownPodName
if hn, err := os.Hostname(); err == nil {
podName = hn
}
podName := getPodName()
if labels == nil {
labels = map[string]string{
"microservice": libpack_config.PKG_NAME,
"pod": podName,
}
labels = defaultLabels(podName)
} else {
if _, exists := labels["microservice"]; !exists {
labels["microservice"] = libpack_config.PKG_NAME
}
if _, exists := labels["pod"]; !exists {
labels["pod"] = podName
}
ensureDefaultLabels(&labels, podName)
}
// Prefix handling
if ms.metrics_prefix != "" {
sb.WriteString(ms.metrics_prefix)
sb.WriteString("_")
buf.WriteString(ms.metrics_prefix)
buf.WriteString("_")
}
sb.WriteString(name)
buf.WriteString(name)
// Append labels if any
if len(labels) > 0 {
sb.WriteString("{")
buf.WriteString("{")
appendSortedLabels(&buf, labels)
buf.WriteString("}")
}
keys := make([]string, 0, len(labels))
return buf.String()
}
func getPodName() string {
const unknownPodName = "unknown"
if hn, err := os.Hostname(); err == nil {
return hn
}
return unknownPodName
}
func defaultLabels(podName string) map[string]string {
return map[string]string{
"microservice": libpack_config.PKG_NAME,
"pod": podName,
}
}
func ensureDefaultLabels(labels *map[string]string, podName string) {
if *labels == nil {
*labels = make(map[string]string)
}
if _, exists := (*labels)["microservice"]; !exists {
(*labels)["microservice"] = libpack_config.PKG_NAME
}
if _, exists := (*labels)["pod"]; !exists {
(*labels)["pod"] = podName
}
}
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) {
keys := getSortedKeys(labels)
for i, k := range keys {
if i > 0 {
buf.WriteString(",")
}
buf.WriteString(k)
buf.WriteString("=\"")
buf.WriteString(labels[k])
buf.WriteString("\"")
}
}
func getSortedKeys(labels map[string]string) []string {
labelsKey := labelsToString(labels)
sortedLabelKeysCache.RLock()
keys, exists := sortedLabelKeysCache.m[labelsKey]
sortedLabelKeysCache.RUnlock()
if !exists {
keys = make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
for i, k := range keys {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(k)
sb.WriteString("=\"")
sb.WriteString(labels[k])
sb.WriteString("\"")
}
sb.WriteString("}")
sortedLabelKeysCache.Lock()
sortedLabelKeysCache.m[labelsKey] = keys
sortedLabelKeysCache.Unlock()
}
return keys
}
func labelsToString(labels map[string]string) string {
var sb strings.Builder
for k, v := range labels {
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(v)
sb.WriteString(";")
}
return sb.String()
}
// validate_metrics_name validates the name of the metric to adhere to the Prometheus naming conventions
// https://prometheus.io/docs/practices/naming/
func validate_metrics_name(name string) error {
var sb strings.Builder // Use strings.Builder for efficient string concatenation
cleanedName := clean_metric_name(name)
// Track if the last character was an underscore to avoid duplicate underscores
lastWasUnderscore := false
for _, r := range name {
// Convert spaces to underscores and skip non-alphanumeric characters except underscores
if r == ' ' || (unicode.IsLetter(r) || unicode.IsDigit(r) || r == '_') {
if r == ' ' || r == '_' {
if lastWasUnderscore {
continue // Skip if the previous character was also an underscore
}
r = '_' // Convert spaces to underscores
lastWasUnderscore = true
} else {
lastWasUnderscore = false
}
sb.WriteRune(r) // Add valid characters to the builder
}
}
// Trim leading and trailing underscores
name_new := strings.Trim(sb.String(), "_")
finalName := strings.Trim(cleanedName, "_")
// Check if the processed name matches the original input
if name_new != name {
return fmt.Errorf("Invalid metric name: %s, expected %s", name, name_new)
if finalName != name {
return fmt.Errorf("Invalid metric name: %s, expected %s", name, finalName)
}
return nil
}
func compile_metrics_with_labels(name string, labels map[string]string) string {
var totalLength int
totalLength += len(name)
for k, v := range labels {
totalLength += len(k) + len(v) + 2
// clean_metric_name processes the metric name according to Prometheus naming conventions
func clean_metric_name(name string) string {
var buf bytes.Buffer
lastWasUnderscore := false
for _, r := range name {
if is_allowed_rune(r) {
if is_special_rune(r) {
if lastWasUnderscore {
continue // Skip if the previous character was also an underscore
}
r = '_' // Convert spaces and special characters to underscores
lastWasUnderscore = true
} else {
lastWasUnderscore = false
}
buf.WriteRune(r)
} else if !lastWasUnderscore {
buf.WriteRune('_')
lastWasUnderscore = true
}
}
var sb strings.Builder
sb.Grow(totalLength + 1)
// Remove trailing underscore
result := buf.String()
return strings.Trim(result, "_")
}
sb.WriteString(name)
// is_allowed_rune checks if the rune is allowed in the metric name
func is_allowed_rune(r rune) bool {
return unicode.IsLetter(r) || unicode.IsDigit(r) || r == ' ' || r == '_'
}
// is_special_rune checks if the rune is a space or an underscore
func is_special_rune(r rune) bool {
return r == ' ' || r == '_'
}
func compile_metrics_with_labels(name string, labels map[string]string) string {
var buf bytes.Buffer
buf.WriteString(name)
// Collect keys and sort them
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
keys := getSortedKeys(labels)
// Append sorted key-value pairs to the builder
// Append sorted key-value pairs to the buffer
for _, k := range keys {
sb.WriteString("_")
sb.WriteString(k)
sb.WriteString("_")
sb.WriteString(labels[k])
buf.WriteString("_")
buf.WriteString(k)
buf.WriteString("_")
buf.WriteString(labels[k])
}
return sb.String()
return buf.String()
}
+91 -6
View File
@@ -1,7 +1,6 @@
package libpack_monitoring
import (
"os"
"testing"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
@@ -134,10 +133,96 @@ func TestValidateMetricsName(t *testing.T) {
}
}
func getPodName() string {
podName, err := os.Hostname()
if err != nil {
return "unknown"
func TestCleanMetricName(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"valid metric name", "valid_metric_name"},
{"valid@metric#name!", "valid_metric_name"},
{"__valid__metric__name__", "valid_metric_name"},
{" valid metric name ", "valid_metric_name"},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
assert.Equal(t, tt.expected, clean_metric_name(tt.input))
})
}
}
func TestDefaultLabels(t *testing.T) {
podName := "test-pod"
libpack_config.PKG_NAME = "example_microservice"
expected := map[string]string{
"microservice": "example_microservice",
"pod": podName,
}
assert.Equal(t, expected, defaultLabels(podName))
}
func TestEnsureDefaultLabels(t *testing.T) {
podName := "test-pod"
libpack_config.PKG_NAME = "example_microservice"
tests := []struct {
name string
inputLabels map[string]string
expectedLabels map[string]string
}{
{
name: "Nil labels",
inputLabels: nil,
expectedLabels: map[string]string{"microservice": "example_microservice", "pod": podName},
},
{
name: "Empty labels",
inputLabels: map[string]string{},
expectedLabels: map[string]string{"microservice": "example_microservice", "pod": podName},
},
{
name: "Partial labels",
inputLabels: map[string]string{"microservice": "test_service"},
expectedLabels: map[string]string{"microservice": "test_service", "pod": podName},
},
{
name: "Complete labels",
inputLabels: map[string]string{"microservice": "test_service", "pod": "custom_pod"},
expectedLabels: map[string]string{"microservice": "test_service", "pod": "custom_pod"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ensureDefaultLabels(&tt.inputLabels, podName)
assert.Equal(t, tt.expectedLabels, tt.inputLabels)
})
}
}
func TestLabelsToString(t *testing.T) {
tests := []struct {
labels map[string]string
expected string
}{
{
labels: map[string]string{"key1": "value1", "key2": "value2"},
expected: "key1=value1;key2=value2;",
},
{
labels: map[string]string{"a": "1", "b": "2"},
expected: "a=1;b=2;",
},
{
labels: map[string]string{},
expected: "",
},
}
for _, tt := range tests {
t.Run(tt.expected, func(t *testing.T) {
assert.Equal(t, tt.expected, labelsToString(tt.labels))
})
}
return podName
}
+23 -8
View File
@@ -12,7 +12,7 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gookit/goutil/envutil"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
type MetricsSetup struct {
@@ -23,7 +23,7 @@ type MetricsSetup struct {
}
var (
log *logging.LogConfig
log *libpack_logger.Logger
)
type InitConfig struct {
@@ -32,7 +32,7 @@ type InitConfig struct {
}
func NewMonitoring(ic *InitConfig) *MetricsSetup {
log = logging.NewLogger()
log = libpack_logger.New().SetMinLogLevel(libpack_logger.LEVEL_INFO)
ms := &MetricsSetup{ic: ic}
ms.metrics_set = metrics.NewSet()
ms.metrics_set_custom = metrics.NewSet()
@@ -86,7 +86,10 @@ func (ms *MetricsSetup) ListActiveMetrics() []string {
func (ms *MetricsSetup) RegisterMetricsGauge(metric_name string, labels map[string]string, val float64) *metrics.Gauge {
if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsGauge() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name})
log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsGauge() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil
}
return ms.metrics_set_custom.GetOrCreateGauge(ms.get_metrics_name(metric_name, labels), func() float64 {
@@ -97,7 +100,10 @@ func (ms *MetricsSetup) RegisterMetricsGauge(metric_name string, labels map[stri
func (ms *MetricsSetup) RegisterMetricsCounter(metric_name string, labels map[string]string) *metrics.Counter {
if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsCounter() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name})
log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsCounter() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil
}
if metric_name == MetricsSucceeded || metric_name == MetricsFailed || metric_name == MetricsSkipped {
@@ -108,7 +114,10 @@ func (ms *MetricsSetup) RegisterMetricsCounter(metric_name string, labels map[st
func (ms *MetricsSetup) RegisterFloatCounter(metric_name string, labels map[string]string) *metrics.FloatCounter {
if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterFloatCounter() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name})
log.Critical(&libpack_logger.LogMessage{
Message: "RegisterFloatCounter() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil
}
return ms.metrics_set_custom.GetOrCreateFloatCounter(ms.get_metrics_name(metric_name, labels))
@@ -116,7 +125,10 @@ func (ms *MetricsSetup) RegisterFloatCounter(metric_name string, labels map[stri
func (ms *MetricsSetup) RegisterMetricsSummary(metric_name string, labels map[string]string) *metrics.Summary {
if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsSummary() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name})
log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsSummary() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil
}
return ms.metrics_set_custom.GetOrCreateSummary(ms.get_metrics_name(metric_name, labels))
@@ -124,7 +136,10 @@ func (ms *MetricsSetup) RegisterMetricsSummary(metric_name string, labels map[st
func (ms *MetricsSetup) RegisterMetricsHistogram(metric_name string, labels map[string]string) *metrics.Histogram {
if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsHistogram() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name})
log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsHistogram() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil
}
return ms.metrics_set_custom.GetOrCreateHistogram(ms.get_metrics_name(metric_name, labels))
+49 -6
View File
@@ -8,6 +8,7 @@ import (
"github.com/avast/retry-go/v4"
fiber "github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/proxy"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
"github.com/valyala/fasthttp"
)
@@ -30,7 +31,10 @@ func createFasthttpClient(timeout int) *fasthttp.Client {
func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
if !checkAllowedURLs(c) {
cfg.Logger.Error("Request blocked", map[string]interface{}{"path": c.Path()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Request blocked",
Pairs: map[string]interface{}{"path": c.Path()},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
}
@@ -42,13 +46,30 @@ func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
c.Request().Header.Add(fiber.HeaderXForwardedFor, string(c.Request().Header.Peek("X-Forwarded-For")))
c.Request().Header.Del(fiber.HeaderAcceptEncoding)
cfg.Logger.Debug("Proxying the request", map[string]interface{}{"path": c.Path(), "body": string(c.Request().Body()), "headers": c.GetReqHeaders(), "request_uuid": c.Locals("request_uuid")})
// added dummy check for the log level because it executes additional functions which could
// potentially slow down the execution.
if cfg.LogLevel == "debug" {
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Proxying the request",
Pairs: map[string]interface{}{
"path": c.Path(),
"body": string(c.Request().Body()),
"headers": c.GetReqHeaders(),
"request_uuid": c.Locals("request_uuid"),
},
})
}
err := retry.Do(
func() error {
errInt := proxy.DoRedirects(c, currentEndpoint+c.Path(), 3, cfg.Client.FastProxyClient)
if errInt != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": errInt.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{
"error": errInt.Error(),
},
})
if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
}
@@ -57,7 +78,13 @@ func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
return nil
},
retry.OnRetry(func(n uint, err error) {
cfg.Logger.Warning("Retrying the request", map[string]interface{}{"path": c.Path(), "error": err.Error()})
cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Retrying the request",
Pairs: map[string]interface{}{
"path": c.Path(),
"error": err.Error(),
},
})
}),
retry.Attempts(uint(3)),
retry.DelayType(retry.BackOffDelay),
@@ -66,11 +93,27 @@ func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
)
if err != nil {
cfg.Logger.Warning("Can't proxy the request", map[string]interface{}{"error": err.Error()})
cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{
"error": err.Error(),
},
})
return err
}
cfg.Logger.Debug("Received proxied response", map[string]interface{}{"path": c.Path(), "response_body": string(c.Response().Body()), "response_code": c.Response().StatusCode(), "headers": c.GetRespHeaders(), "request_uuid": c.Locals("request_uuid")})
if cfg.LogLevel == "debug" {
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Received proxied response",
Pairs: map[string]interface{}{
"path": c.Path(),
"response_body": string(c.Response().Body()),
"response_code": c.Response().StatusCode(),
"headers": c.GetRespHeaders(),
"request_uuid": c.Locals("request_uuid"),
},
})
}
if c.Response().StatusCode() != 200 {
if ifNotInTest() {
+46 -14
View File
@@ -7,6 +7,7 @@ import (
"github.com/goccy/go-json"
goratecounter "github.com/lukaszraczylo/go-ratecounter"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
type RateLimitConfig struct {
@@ -34,10 +35,17 @@ func loadRatelimitConfig() error {
if err == nil {
return nil
}
cfg.Logger.Debug("Failed to load config", map[string]interface{}{"path": path, "error": err})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Failed to load config",
Pairs: map[string]interface{}{"path": path, "error": err},
})
}
cfg.Logger.Error("Rate limit config not found", map[string]interface{}{"paths": paths})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Rate limit config not found",
Pairs: map[string]interface{}{"paths": paths},
})
return os.ErrNotExist
}
@@ -61,35 +69,53 @@ func loadConfigFromPath(path string) error {
value.RateCounterTicker = goratecounter.NewRateCounter().WithConfig(goratecounter.RateCounterConfig{
Interval: time.Duration(value.Req) * ratelimit_intervals[value.Interval],
})
cfg.Logger.Debug("Setting ratelimit config for role", map[string]interface{}{
"role": key,
"interval_provided": value.Interval,
"interval_used": ratelimit_intervals[value.Interval],
"ratelimit": value.Req,
})
if cfg.LogLevel == "debug" {
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Setting ratelimit config for role",
Pairs: map[string]interface{}{
"role": key,
"interval_provided": value.Interval,
"interval_used": ratelimit_intervals[value.Interval],
"ratelimit": value.Req,
},
})
}
config.RateLimit[key] = value
}
rateLimits = config.RateLimit
cfg.Logger.Debug("Rate limit config loaded", map[string]interface{}{"ratelimit": rateLimits})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit config loaded",
Pairs: map[string]interface{}{"ratelimit": rateLimits},
})
return nil
}
func rateLimitedRequest(userID string, userRole string) (shouldAllow bool) {
if rateLimits == nil {
cfg.Logger.Debug("Rate limit config not found", map[string]interface{}{"user_role": userRole})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit config not found",
Pairs: map[string]interface{}{"user_role": userRole},
})
return true
}
// Fetch role config once to avoid multiple map lookups
roleConfig, ok := rateLimits[userRole]
if !ok {
cfg.Logger.Warning("Rate limit role not found", map[string]interface{}{"user_role": userRole})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit role not found",
Pairs: map[string]interface{}{"user_role": userRole},
})
return true
}
if roleConfig.RateCounterTicker == nil {
cfg.Logger.Warning("Rate limit ticker not found", map[string]interface{}{"user_role": userRole})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit ticker not found",
Pairs: map[string]interface{}{"user_role": userRole},
})
return true
}
@@ -104,10 +130,16 @@ func rateLimitedRequest(userID string, userRole string) (shouldAllow bool) {
"interval": roleConfig.Interval,
}
cfg.Logger.Debug("Rate limit ticker", logDetails)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit ticker",
Pairs: map[string]interface{}{"log_details": logDetails},
})
if tickerRate > float64(roleConfig.Req) {
cfg.Logger.Debug("Rate limit exceeded", logDetails)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit exceeded",
Pairs: map[string]interface{}{"log_details": logDetails},
})
return false
}
+86 -30
View File
@@ -10,13 +10,18 @@ import (
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/google/uuid"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
)
// StartHTTPProxy starts the HTTP and points it to the GraphQL server.
func StartHTTPProxy() {
cfg.Logger.Debug("Starting the HTTP proxy", nil)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Starting the HTTP proxy",
Pairs: nil,
})
server := fiber.New(fiber.Config{
DisableStartupMessage: true,
AppName: fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION),
@@ -40,10 +45,16 @@ func StartHTTPProxy() {
server.Post("/*", processGraphQLRequest)
server.Get("/*", proxyTheRequestToDefault)
cfg.Logger.Info("GraphQL query proxy started", map[string]interface{}{"port": cfg.Server.PortGraphQL})
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "GraphQL proxy started",
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
})
err := server.Listen(fmt.Sprintf(":%d", cfg.Server.PortGraphQL))
if err != nil {
cfg.Logger.Critical("Can't start the service", map[string]interface{}{"error": err.Error()})
cfg.Logger.Critical(&libpack_logger.LogMessage{
Message: "Can't start the service",
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
})
}
}
@@ -66,17 +77,26 @@ func checkAllowedURLs(c *fiber.Ctx) bool {
func healthCheck(c *fiber.Ctx) error {
if len(cfg.Server.HealthcheckGraphQL) > 0 {
cfg.Logger.Debug("Health check enabled", map[string]interface{}{"url": cfg.Server.HealthcheckGraphQL})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Health check enabled",
Pairs: map[string]interface{}{"url": cfg.Server.HealthcheckGraphQL},
})
query := `{ __typename }`
_, err := cfg.Client.GQLClient.Query(query, nil, nil)
if err != nil {
cfg.Logger.Error("Can't reach the GraphQL server", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't reach the GraphQL server",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't reach the GraphQL server with {__typename} query")
return err
}
}
cfg.Logger.Debug("Health check returning OK", nil)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Health check returning OK",
Pairs: nil,
})
c.Status(200).SendString("Health check OK")
return nil
}
@@ -108,7 +128,10 @@ func processGraphQLRequest(c *fiber.Ctx) error {
// Implementing rate limiting if enabled
if cfg.Client.RoleRateLimit {
cfg.Logger.Debug("Rate limiting enabled", map[string]interface{}{"user_id": extractedUserID, "role_name": extractedRoleName})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limiting enabled",
Pairs: map[string]interface{}{"user_id": extractedUserID, "role_name": extractedRoleName},
})
if !rateLimitedRequest(extractedUserID, extractedRoleName) {
c.Status(429).SendString("Rate limit exceeded, try again later")
return nil
@@ -122,20 +145,29 @@ func processGraphQLRequest(c *fiber.Ctx) error {
}
if parsedResult.shouldIgnore {
cfg.Logger.Debug("Request passed as-is - probably not a GraphQL", nil)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Request passed as-is - probably not a GraphQL",
Pairs: nil,
})
return proxyTheRequest(c, parsedResult.activeEndpoint)
}
calculatedQueryHash := calculateHash(c)
calculatedQueryHash := libpack_cache.CalculateHash(c)
if parsedResult.cacheTime > 0 {
cfg.Logger.Debug("Cache time set via query", map[string]interface{}{"cacheTime": parsedResult.cacheTime})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache time set via query",
Pairs: map[string]interface{}{"cacheTime": parsedResult.cacheTime},
})
} else {
// If not set via query, try setting via header
cacheQuery := c.Request().Header.Peek("X-Cache-Graphql-Query")
if cacheQuery != nil {
parsedResult.cacheTime, _ = strconv.Atoi(string(cacheQuery))
cfg.Logger.Debug("Cache time set via header", map[string]interface{}{"cacheTime": parsedResult.cacheTime})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache time set via header",
Pairs: map[string]interface{}{"cacheTime": parsedResult.cacheTime},
})
} else {
parsedResult.cacheTime = cfg.Cache.CacheTTL
}
@@ -144,35 +176,53 @@ func processGraphQLRequest(c *fiber.Ctx) error {
wasCached := false
if parsedResult.cacheRefresh {
cfg.Logger.Debug("Cache refresh requested via query", map[string]interface{}{"user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")})
cacheDelete(calculatedQueryHash)
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache refresh requested via query",
Pairs: map[string]interface{}{"user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")},
})
libpack_cache.CacheDelete(calculatedQueryHash)
}
// Handling Cache Logic
if parsedResult.cacheRequest || cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
cfg.Logger.Debug("Cache enabled", map[string]interface{}{"via_query": parsedResult.cacheRequest, "via_env": cfg.Cache.CacheEnable})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache enabled",
Pairs: map[string]interface{}{"via_query": parsedResult.cacheRequest, "via_env": cfg.Cache.CacheEnable},
})
queryCacheHash = calculatedQueryHash
if cachedResponse := cacheLookup(queryCacheHash); cachedResponse != nil {
if cachedResponse := libpack_cache.CacheLookup(queryCacheHash); cachedResponse != nil {
cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheHit, nil)
cfg.Logger.Debug("Cache hit", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache hit",
Pairs: map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")},
})
c.Request().Header.Add("X-Cache-Hit", "true")
err := c.Send(cachedResponse)
if err != nil {
cfg.Logger.Error("Can't send the cached response", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't send the cached response",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't send the cached response - try again later")
}
wasCached = true
} else {
cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheMiss, nil)
cfg.Logger.Debug("Cache miss", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")})
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache miss",
Pairs: map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")},
})
proxyAndCacheTheRequest(c, queryCacheHash, parsedResult.cacheTime, parsedResult.activeEndpoint)
}
} else {
err := proxyTheRequest(c, parsedResult.activeEndpoint)
if err != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't proxy the request - try again later")
return nil
@@ -191,12 +241,15 @@ func processGraphQLRequest(c *fiber.Ctx) error {
func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string, cacheTime int, currentEndpoint string) {
err := proxyTheRequest(c, currentEndpoint)
if err != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": err.Error()})
cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't proxy the request - try again later")
return
}
cacheStoreWithTTL(queryCacheHash, c.Response().Body(), time.Duration(cacheTime)*time.Second)
libpack_cache.CacheStoreWithTTL(queryCacheHash, c.Response().Body(), time.Duration(cacheTime)*time.Second)
cfg.Monitoring.Increment(libpack_monitoring.MetricsQueriesCached, nil)
c.Send(c.Response().Body())
}
@@ -210,15 +263,18 @@ func logAndMonitorRequest(c *fiber.Ctx, userID, opType, opName string, wasCached
}
if cfg.Server.AccessLog {
cfg.Logger.Info("Request processed", map[string]interface{}{
"ip": c.IP(),
"fwd-ip": string(c.Request().Header.Peek("X-Forwarded-For")),
"user_id": userID,
"op_type": opType,
"op_name": opName,
"time": duration,
"cache": wasCached,
"request_uuid": c.Locals("request_uuid"),
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Request processed",
Pairs: map[string]interface{}{
"ip": c.IP(),
"fwd-ip": string(c.Request().Header.Peek("X-Forwarded-For")),
"user_id": userID,
"op_type": opType,
"op_name": opName,
"time": duration,
"cache": wasCached,
"request_uuid": c.Locals("request_uuid"),
},
})
}
+2 -2
View File
@@ -9,7 +9,8 @@ import (
// config is a struct that holds the configuration of the application.
type config struct {
Logger *libpack_logging.LogConfig
Logger *libpack_logging.Logger
LogLevel string
Monitoring *libpack_monitoring.MetricsSetup
Api struct{ BannedUsersFile string }
Client struct {
@@ -32,7 +33,6 @@ type config struct {
Enable bool
}
Cache struct {
Client CacheClient
CacheRedisURL string
CacheRedisPassword string
CacheTTL int