Update cache library, use miniredis for testing, add additional benchmarks. (#14)

Update cache library,
Update logging library,
use miniredis for testing, add additional benchmarks.
This commit is contained in:
2024-06-19 23:10:36 +01:00
committed by GitHub
parent 12e4237997
commit 61d7a45d00
34 changed files with 1477 additions and 546 deletions
+73
View File
@@ -0,0 +1,73 @@
name: Test and release
on:
workflow_dispatch:
schedule:
- cron: "0 3 * * *"
env:
GO_VERSION: ">=1.21"
jobs:
# This job is responsible for preparation of the build
# environment variables.
prepare:
name: Preparing build context
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
id: cache
with:
go-version: ${{env.GO_VERSION}}
cache-dependency-path: "**/*.sum"
- name: Go get dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: |
go get ./...
# This job is responsible for running tests and linting the codebase
test:
name: "Unit testing"
# needs: [prepare]
runs-on: ubuntu-latest
container: golang:1
# container: github/super-linter:v4
needs: [prepare]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{env.GO_VERSION}}
cache-dependency-path: "**/*.sum"
- name: Install dependencies
run: |
apt-get update
apt-get install ca-certificates make -y
update-ca-certificates
go mod tidy
get -u -v ./...
go mod tidy -v
- name: Run unit tests
run: |
CI_RUN=${CI} make test
# if go.mod or go.sum have changed then commit the changes to the repository
- name: Commit changes
run: |
git config --global user.email "github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
git add go.mod go.sum
git commit -m "Update go.mod and go.sum"
git push
+14 -19
View File
@@ -47,20 +47,20 @@ jobs:
# container: github/super-linter:v4 # container: github/super-linter:v4
needs: [prepare] needs: [prepare]
services: # services:
# Label used to access the service container # # Label used to access the service container
redis: # redis:
# Docker Hub image # # Docker Hub image
image: redis # image: redis
# Set health checks to wait until redis has started # # Set health checks to wait until redis has started
options: >- # options: >-
--health-cmd "redis-cli ping" # --health-cmd "redis-cli ping"
--health-interval 10s # --health-interval 10s
--health-timeout 5s # --health-timeout 5s
--health-retries 5 # --health-retries 5
ports: # ports:
# Maps the container port to the host machine # # Maps the container port to the host machine
- 6379:6379 # - 6379:6379
steps: steps:
- name: Checkout repository - name: Checkout repository
@@ -80,10 +80,5 @@ jobs:
go mod tidy go mod tidy
- name: Run unit tests - name: Run unit tests
env:
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_SERVER: "redis:6379"
run: | run: |
export REDIS_SERVER="$REDIS_HOST:$REDIS_PORT"
CI_RUN=${CI} make test CI_RUN=${CI} make test
+1
View File
@@ -1,2 +1,3 @@
graphql-proxy graphql-proxy
test.sh test.sh
banned.json*
+5 -5
View File
@@ -1,9 +1,9 @@
CI_RUN?=false CI_RUN?=false
ADDITIONAL_BUILD_FLAGS="" # ADDITIONAL_BUILD_FLAGS=""
ifeq ($(CI_RUN), true) # ifeq ($(CI_RUN), true)
ADDITIONAL_BUILD_FLAGS="-test.short" # ADDITIONAL_BUILD_FLAGS="-test.short"
endif # endif
.PHONY: help .PHONY: help
help: ## display this help help: ## display this help
@@ -19,7 +19,7 @@ build: ## build the binary
.PHONY: test .PHONY: test
test: ## run tests on library test: ## run tests on library
@LOG_LEVEL=debug go test $(ADDITIONAL_BUILD_FLAGS) -v -cover ./... -race @LOG_LEVEL=info go test -v -cover -race ./...
.PHONY: test-packages .PHONY: test-packages
test-packages: ## run tests on packages test-packages: ## run tests on packages
+84 -23
View File
@@ -8,7 +8,9 @@ import (
"github.com/goccy/go-json" "github.com/goccy/go-json"
fiber "github.com/gofiber/fiber/v2" fiber "github.com/gofiber/fiber/v2"
"github.com/gofrs/flock" "github.com/gofrs/flock"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config" libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
) )
var bannedUsersIDs map[string]string = make(map[string]string) var bannedUsersIDs map[string]string = make(map[string]string)
@@ -29,7 +31,10 @@ func enableApi() {
go periodicallyReloadBannedUsers() go periodicallyReloadBannedUsers()
err := apiserver.Listen(fmt.Sprintf(":%d", cfg.Server.ApiPort)) err := apiserver.Listen(fmt.Sprintf(":%d", cfg.Server.ApiPort))
if err != nil { if err != nil {
cfg.Logger.Critical("Can't start the service", map[string]interface{}{"error": err.Error()}) cfg.Logger.Critical(&libpack_logger.LogMessage{
Message: "Can't start the service",
Pairs: map[string]interface{}{"port": cfg.Server.ApiPort},
})
} }
} }
} }
@@ -37,35 +42,52 @@ func enableApi() {
func periodicallyReloadBannedUsers() { func periodicallyReloadBannedUsers() {
for { for {
loadBannedUsers() loadBannedUsers()
cfg.Logger.Debug("Banned users reloaded", map[string]interface{}{"users": bannedUsersIDs}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Banned users reloaded",
Pairs: map[string]interface{}{"users": bannedUsersIDs},
})
<-time.After(10 * time.Second) <-time.After(10 * time.Second)
} }
} }
func checkIfUserIsBanned(c *fiber.Ctx, userID string) bool { func checkIfUserIsBanned(c *fiber.Ctx, userID string) bool {
_, found := bannedUsersIDs[userID] _, found := bannedUsersIDs[userID]
cfg.Logger.Debug("Checking if user is banned", map[string]interface{}{"user_id": userID, "found": found}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Checking if user is banned",
Pairs: map[string]interface{}{"user_id": userID, "found": found},
})
if found { if found {
cfg.Logger.Info("User is banned", map[string]interface{}{"user_id": userID}) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "User is banned",
Pairs: map[string]interface{}{"user_id": userID},
})
c.Status(403).SendString("User is banned") c.Status(403).SendString("User is banned")
} }
return found return found
} }
func apiClearCache(c *fiber.Ctx) error { func apiClearCache(c *fiber.Ctx) error {
cfg.Logger.Debug("Clearing cache via API", nil) cfg.Logger.Debug(&libpack_logger.LogMessage{
cacheClear() Message: "Clearing cache via API",
cfg.Logger.Info("Cache cleared via API", nil) Pairs: nil,
})
libpack_cache.CacheClear()
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Cache cleared via API",
Pairs: nil,
})
c.Status(200).SendString("OK: cache cleared") c.Status(200).SendString("OK: cache cleared")
return nil return nil
} }
func apiCacheStats(c *fiber.Ctx) error { func apiCacheStats(c *fiber.Ctx) error {
stats := getCacheStats() stats := libpack_cache.GetCacheStats()
cfg.Logger.Debug("Getting cache stats via API", map[string]interface{}{"stats": stats})
err := c.JSON(stats) err := c.JSON(stats)
if err != nil { if err != nil {
cfg.Logger.Error("Can't marshal cache stats", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't marshal cache stats",
Pairs: map[string]interface{}{"error": err.Error()},
})
return err return err
} }
return nil return nil
@@ -80,11 +102,17 @@ func apiBanUser(c *fiber.Ctx) error {
var req apiBanUserRequest var req apiBanUserRequest
err := c.BodyParser(&req) err := c.BodyParser(&req)
if err != nil { if err != nil {
cfg.Logger.Error("Can't parse the ban user request", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the ban user request",
Pairs: map[string]interface{}{"error": err.Error()},
})
return err return err
} }
bannedUsersIDs[req.UserID] = req.Reason bannedUsersIDs[req.UserID] = req.Reason
cfg.Logger.Info("Banned user", map[string]interface{}{"user_id": req.UserID, "reason": req.Reason}) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Banned user",
Pairs: map[string]interface{}{"user_id": req.UserID, "reason": req.Reason},
})
storeBannedUsers() storeBannedUsers()
c.Status(200).SendString("OK: user banned") c.Status(200).SendString("OK: user banned")
return nil return nil
@@ -94,11 +122,17 @@ func apiUnbanUser(c *fiber.Ctx) error {
var req apiBanUserRequest var req apiBanUserRequest
err := c.BodyParser(&req) err := c.BodyParser(&req)
if err != nil { if err != nil {
cfg.Logger.Error("Can't parse the unban user request", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the unban user request",
Pairs: map[string]interface{}{"error": err.Error()},
})
return err return err
} }
delete(bannedUsersIDs, req.UserID) delete(bannedUsersIDs, req.UserID)
cfg.Logger.Info("Unbanned user", map[string]interface{}{"user_id": req.UserID}) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Unbanned user",
Pairs: map[string]interface{}{"user_id": req.UserID},
})
storeBannedUsers() storeBannedUsers()
c.Status(200).SendString("OK: user unbanned") c.Status(200).SendString("OK: user unbanned")
return nil return nil
@@ -108,34 +142,52 @@ func storeBannedUsers() {
fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile)) fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile))
err := fileLock.Lock() err := fileLock.Lock()
if err != nil { if err != nil {
cfg.Logger.Error("Can't lock the file", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't lock the file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
defer fileLock.Unlock() defer fileLock.Unlock()
data, err := json.Marshal(bannedUsersIDs) data, err := json.Marshal(bannedUsersIDs)
if err != nil { if err != nil {
cfg.Logger.Error("Can't marshal banned users", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't marshal banned users",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
err = os.WriteFile(cfg.Api.BannedUsersFile, data, 0644) err = os.WriteFile(cfg.Api.BannedUsersFile, data, 0644)
if err != nil { if err != nil {
cfg.Logger.Error("Can't write banned users to file", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't write banned users to file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
} }
func loadBannedUsers() { func loadBannedUsers() {
if _, err := os.Stat(cfg.Api.BannedUsersFile); os.IsNotExist(err) { if _, err := os.Stat(cfg.Api.BannedUsersFile); os.IsNotExist(err) {
cfg.Logger.Info("Banned users file doesn't exist - creating it", map[string]interface{}{"file": cfg.Api.BannedUsersFile}) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Banned users file doesn't exist - creating it",
Pairs: map[string]interface{}{"file": cfg.Api.BannedUsersFile},
})
_, err := os.Create(cfg.Api.BannedUsersFile) _, err := os.Create(cfg.Api.BannedUsersFile)
if err != nil { if err != nil {
cfg.Logger.Error("Can't create the file", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't create the file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
// write empty json to the file // write empty json to the file
err = os.WriteFile(cfg.Api.BannedUsersFile, []byte("{}"), 0644) err = os.WriteFile(cfg.Api.BannedUsersFile, []byte("{}"), 0644)
if err != nil { if err != nil {
cfg.Logger.Error("Can't write to the file", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't write to the file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
} }
@@ -143,19 +195,28 @@ func loadBannedUsers() {
fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile)) fileLock := flock.New(fmt.Sprintf("%s.lock", cfg.Api.BannedUsersFile))
err := fileLock.RLock() // Use RLock for read lock err := fileLock.RLock() // Use RLock for read lock
if err != nil { if err != nil {
cfg.Logger.Error("Can't lock the file [load]", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't lock the file [load]",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
defer fileLock.Unlock() defer fileLock.Unlock()
data, err := os.ReadFile(cfg.Api.BannedUsersFile) data, err := os.ReadFile(cfg.Api.BannedUsersFile)
if err != nil { if err != nil {
cfg.Logger.Error("Can't read banned users from file", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't read banned users from file",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
err = json.Unmarshal(data, &bannedUsersIDs) err = json.Unmarshal(data, &bannedUsersIDs)
if err != nil { if err != nil {
cfg.Logger.Error("Can't unmarshal banned users", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't unmarshal banned users",
Pairs: map[string]interface{}{"error": err.Error()},
})
return return
} }
} }
-95
View File
@@ -1,95 +0,0 @@
package main
import (
"time"
fiber "github.com/gofiber/fiber/v2"
"github.com/gookit/goutil/strutil"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
)
type CacheStats struct {
CachedQueries int `json:"cached_queries"`
CacheHits int `json:"cache_hits"`
CacheMisses int `json:"cache_misses"`
}
type CacheClient interface {
Set(key string, value []byte, ttl time.Duration)
Get(key string) ([]byte, bool)
Delete(key string)
Clear()
CountQueries() int
}
var (
cacheStats *CacheStats
)
func calculateHash(c *fiber.Ctx) string {
return strutil.Md5(c.Body())
}
func enableCache() {
cacheStats = &CacheStats{}
if shouldUseRedisCache() {
cfg.Logger.Info("Using Redis cache", nil)
cfg.Cache.Client = libpack_redis.NewClient(&libpack_redis.RedisClientConfig{
RedisDB: cfg.Cache.CacheRedisDB,
RedisServer: cfg.Cache.CacheRedisURL,
RedisPassword: cfg.Cache.CacheRedisPassword,
})
} else {
cfg.Logger.Info("Using in-memory cache", nil)
cfg.Cache.Client = libpack_cache.New(time.Duration(cfg.Cache.CacheTTL) * time.Second)
}
}
func cacheLookup(hash string) []byte {
obj, found := cfg.Cache.Client.Get(hash)
if found {
cacheStats.CacheHits++
return obj
}
cacheStats.CacheMisses++
return nil
}
func cacheDelete(hash string) {
cfg.Logger.Debug("Deleting data from cache", map[string]interface{}{"hash": hash})
cacheStats.CachedQueries--
cfg.Cache.Client.Delete(hash)
}
func cacheStore(hash string, data []byte) {
cfg.Logger.Debug("Storing data in cache", map[string]interface{}{"hash": hash})
cacheStats.CachedQueries++
cfg.Cache.Client.Set(hash, data, time.Duration(cfg.Cache.CacheTTL)*time.Second)
}
func cacheStoreWithTTL(hash string, data []byte, ttl time.Duration) {
cfg.Logger.Debug("Storing data in cache with TTL", map[string]interface{}{"hash": hash, "ttl": ttl})
cacheStats.CachedQueries++
cfg.Cache.Client.Set(hash, data, ttl)
}
func cacheGetQueries() int {
cfg.Logger.Debug("Counting cache queries", nil)
return cfg.Cache.Client.CountQueries()
}
func cacheClear() {
cfg.Cache.Client.Clear()
cacheStats = &CacheStats{}
}
func getCacheStats() *CacheStats {
cfg.Logger.Debug("Getting cache stats", nil)
cacheStats.CachedQueries = cacheGetQueries()
return cacheStats
}
func shouldUseRedisCache() bool {
return cfg.Cache.CacheRedisEnable
}
+134
View File
@@ -0,0 +1,134 @@
package libpack_cache
import (
"sync/atomic"
"time"
fiber "github.com/gofiber/fiber/v2"
"github.com/gookit/goutil/strutil"
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
type CacheConfig struct {
Logger *libpack_logger.Logger
Client CacheClient
Redis struct {
Enable bool `json:"enable"`
URL string `json:"url"`
Password string `json:"password"`
DB int `json:"db"`
}
TTL int `json:"ttl"`
}
type CacheStats struct {
CachedQueries int64 `json:"cached_queries"`
CacheHits int64 `json:"cache_hits"`
CacheMisses int64 `json:"cache_misses"`
}
type CacheClient interface {
Set(key string, value []byte, ttl time.Duration)
Get(key string) ([]byte, bool)
Delete(key string)
Clear()
CountQueries() int64
}
var (
cacheStats *CacheStats
config *CacheConfig
)
func CalculateHash(c *fiber.Ctx) string {
return strutil.Md5(c.Body())
}
func EnableCache(cfg *CacheConfig) {
if cfg.Logger == nil {
cfg.Logger = libpack_logger.New()
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Initializing in-module logger",
})
}
cacheStats = &CacheStats{}
if ShouldUseRedisCache(cfg) {
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Using Redis cache",
})
cfg.Client = libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisDB: cfg.Redis.DB,
RedisServer: cfg.Redis.URL,
RedisPassword: cfg.Redis.Password,
})
} else {
cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Using in-memory cache",
})
cfg.Client = libpack_cache_memory.New(time.Duration(cfg.TTL) * time.Second)
}
config = cfg
}
func CacheLookup(hash string) []byte {
obj, found := config.Client.Get(hash)
if found {
atomic.AddInt64(&cacheStats.CacheHits, 1)
return obj
}
atomic.AddInt64(&cacheStats.CacheMisses, 1)
return nil
}
func CacheDelete(hash string) {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Deleting data from cache",
Pairs: map[string]interface{}{"hash": hash},
})
atomic.AddInt64(&cacheStats.CachedQueries, -1)
config.Client.Delete(hash)
}
func CacheStore(hash string, data []byte) {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Storing data in cache",
Pairs: map[string]interface{}{"hash": hash},
})
atomic.AddInt64(&cacheStats.CachedQueries, 1)
config.Client.Set(hash, data, time.Duration(config.TTL)*time.Second)
}
func CacheStoreWithTTL(hash string, data []byte, ttl time.Duration) {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Storing data in cache with TTL",
Pairs: map[string]interface{}{"hash": hash, "ttl": ttl},
})
atomic.AddInt64(&cacheStats.CachedQueries, 1)
config.Client.Set(hash, data, ttl)
}
func CacheGetQueries() int64 {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Counting cache queries",
})
return config.Client.CountQueries()
}
func CacheClear() {
config.Client.Clear()
cacheStats = &CacheStats{}
}
func GetCacheStats() *CacheStats {
config.Logger.Debug(&libpack_logger.LogMessage{
Message: "Getting cache stats",
})
cacheStats.CachedQueries = CacheGetQueries()
return cacheStats
}
func ShouldUseRedisCache(cfg *CacheConfig) bool {
return cfg.Redis.Enable
}
+107
View File
@@ -0,0 +1,107 @@
package libpack_cache
import (
"testing"
"time"
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
const (
Parallelism = 4
RequestPerSec = 10000
)
func BenchmarkCacheLookupInMemory(b *testing.B) {
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: libpack_cache_memory.New(5 * time.Minute),
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
CacheStore(hash, data)
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheLookup(hash)
}
})
}
func BenchmarkCacheLookupRedis(b *testing.B) {
mockedCache := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisServer: redisMockServer.Addr(),
RedisDB: 0,
})
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: mockedCache,
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
CacheStore(hash, data)
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheLookup(hash)
}
})
}
func BenchmarkCacheStoreInMemory(b *testing.B) {
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: libpack_cache_memory.New(5 * time.Minute),
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheStore(hash, data)
}
})
}
func BenchmarkCacheStoreRedis(b *testing.B) {
mockedCache := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisServer: redisMockServer.Addr(),
RedisDB: 0,
})
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: mockedCache,
TTL: 5,
}
hash := "00000000000000000000000000000000001337"
data := []byte("it's fine.")
b.SetParallelism(Parallelism)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
CacheStore(hash, data)
}
})
}
+34
View File
@@ -0,0 +1,34 @@
package libpack_cache
import (
"testing"
"github.com/alicebob/miniredis/v2"
assertions "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type Tests struct {
suite.Suite
}
var (
assert *assertions.Assertions
redisMockServer, _ = miniredis.Run()
)
func (suite *Tests) BeforeTest(suiteName, testName string) {
}
func (suite *Tests) SetupTest() {
cacheStats = &CacheStats{}
assert = assertions.New(suite.T())
}
// TearDownTest is run after each test to clean up
func (suite *Tests) TearDownTest() {
}
func TestSuite(t *testing.T) {
suite.Run(t, new(Tests))
}
+32 -12
View File
@@ -1,11 +1,20 @@
package main package libpack_cache
import ( import (
"github.com/gookit/goutil/envutil" "time"
libpack_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_cache_memory "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/memory"
libpack_cache_redis "github.com/lukaszraczylo/graphql-monitoring-proxy/cache/redis"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
) )
func (suite *Tests) Test_cacheLookupInmemory() { func (suite *Tests) Test_cacheLookupInmemory() {
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: libpack_cache_memory.New(5 * time.Minute),
TTL: 5,
}
type args struct { type args struct {
hash string hash string
} }
@@ -40,22 +49,33 @@ func (suite *Tests) Test_cacheLookupInmemory() {
for _, tt := range tests { for _, tt := range tests {
suite.Run(tt.name, func() { suite.Run(tt.name, func() {
if tt.addCache.data != nil { if tt.addCache.data != nil {
cacheStore(tt.args.hash, tt.addCache.data) CacheStore(tt.args.hash, tt.addCache.data)
} }
got := cacheLookup(tt.args.hash) got := CacheLookup(tt.args.hash)
assert.Equal(tt.want, got, "Unexpected cache lookup result") assert.Equal(tt.want, got, "Unexpected cache lookup result")
}) })
} }
} }
func (suite *Tests) Test_cacheLookupRedis() { func (suite *Tests) Test_cacheLookupRedis() {
redis_server := envutil.Getenv("REDIS_SERVER", "localhost:6379") // redis_server := envutil.Getenv("REDIS_SERVER", "localhost:6379")
cfg.Cache.Client = libpack_redis.NewClient(&libpack_redis.RedisClientConfig{ // config.Client = libpack_cache_redis.NewClient(&libpack_cache_redis.RedisClientConfig{
RedisServer: redis_server, // RedisServer: redis_server,
RedisPassword: "", // RedisPassword: "",
RedisDB: 0, // RedisDB: 0,
// })
mockedCache := libpack_cache_redis.New(&libpack_cache_redis.RedisClientConfig{
RedisServer: redisMockServer.Addr(),
RedisDB: 0,
}) })
config = &CacheConfig{
Logger: libpack_logger.New(),
Client: mockedCache,
TTL: 5,
}
type args struct { type args struct {
hash string hash string
} }
@@ -90,9 +110,9 @@ func (suite *Tests) Test_cacheLookupRedis() {
for _, tt := range tests { for _, tt := range tests {
suite.Run(tt.name, func() { suite.Run(tt.name, func() {
if tt.addCache.data != nil { if tt.addCache.data != nil {
cacheStore(tt.args.hash, tt.addCache.data) CacheStore(tt.args.hash, tt.addCache.data)
} }
got := cacheLookup(tt.args.hash) got := CacheLookup(tt.args.hash)
assert.Equal(tt.want, got, "Unexpected cache lookup result") assert.Equal(tt.want, got, "Unexpected cache lookup result")
}) })
} }
+10 -45
View File
@@ -1,4 +1,4 @@
package libpack_cache package libpack_cache_memory
import ( import (
"bytes" "bytes"
@@ -19,7 +19,7 @@ type Cache struct {
decompressPool sync.Pool decompressPool sync.Pool
entries sync.Map entries sync.Map
globalTTL time.Duration globalTTL time.Duration
mu sync.RWMutex // Added sync.RWMutex field for locking sync.RWMutex
} }
func New(globalTTL time.Duration) *Cache { func New(globalTTL time.Duration) *Cache {
@@ -52,9 +52,6 @@ func (c *Cache) cleanupRoutine(globalTTL time.Duration) {
} }
func (c *Cache) Set(key string, value []byte, ttl time.Duration) { func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
c.lock()
defer c.unlock()
expiresAt := time.Now().Add(ttl) expiresAt := time.Now().Add(ttl)
compressedValue, err := c.compress(value) compressedValue, err := c.compress(value)
@@ -71,9 +68,6 @@ func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
} }
func (c *Cache) Get(key string) ([]byte, bool) { func (c *Cache) Get(key string) ([]byte, bool) {
c.rlock()
defer c.runlock()
entry, ok := c.entries.Load(key) entry, ok := c.entries.Load(key)
if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) { if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) {
return nil, false return nil, false
@@ -88,39 +82,32 @@ func (c *Cache) Get(key string) ([]byte, bool) {
} }
func (c *Cache) Delete(key string) { func (c *Cache) Delete(key string) {
c.lock()
defer c.unlock()
c.entries.Delete(key) c.entries.Delete(key)
} }
func (c *Cache) Clear() { func (c *Cache) Clear() {
c.lock()
defer c.unlock()
c.entries.Range(func(key, value interface{}) bool { c.entries.Range(func(key, value interface{}) bool {
c.entries.Delete(key) c.entries.Delete(key)
return true return true
}) })
} }
func (c *Cache) CountQueries() int { func (c *Cache) CountQueries() int64 {
c.rlock()
defer c.runlock()
var count int var count int
c.entries.Range(func(_, _ interface{}) bool { c.entries.Range(func(_, _ interface{}) bool {
count++ count++
return true return true
}) })
return count return int64(count)
} }
func (c *Cache) compress(data []byte) ([]byte, error) { func (c *Cache) compress(data []byte) ([]byte, error) {
w := c.compressPool.Get().(*gzip.Writer)
defer c.compressPool.Put(w)
var buf bytes.Buffer var buf bytes.Buffer
w := c.compressPool.Get().(*gzip.Writer)
defer func() {
w.Close()
c.compressPool.Put(w)
}()
w.Reset(&buf) w.Reset(&buf)
if _, err := w.Write(data); err != nil { if _, err := w.Write(data); err != nil {
return nil, err return nil, err
@@ -149,11 +136,7 @@ func (c *Cache) decompress(data []byte) ([]byte, error) {
c.decompressPool.Put(r) c.decompressPool.Put(r)
}() }()
decompressedData, err := io.ReadAll(r) return io.ReadAll(r)
if err != nil {
return nil, err
}
return decompressedData, nil
} }
func (c *Cache) CleanExpiredEntries() { func (c *Cache) CleanExpiredEntries() {
@@ -166,21 +149,3 @@ func (c *Cache) CleanExpiredEntries() {
return true return true
}) })
} }
// Private methods to handle locking
func (c *Cache) lock() {
c.mu.Lock()
}
func (c *Cache) unlock() {
c.mu.Unlock()
}
func (c *Cache) rlock() {
c.mu.RLock()
}
func (c *Cache) runlock() {
c.mu.RUnlock()
}
@@ -1,4 +1,4 @@
package libpack_cache package libpack_cache_memory
import ( import (
"testing" "testing"
@@ -7,7 +7,7 @@ import (
// Assume that New function initializes the cache and it is defined somewhere in the libpack_cache package. // Assume that New function initializes the cache and it is defined somewhere in the libpack_cache package.
func BenchmarkCacheSet(b *testing.B) { func BenchmarkMemCacheSet(b *testing.B) {
cache := New(30 * time.Second) // Initializing the cache with a TTL of 30 seconds cache := New(30 * time.Second) // Initializing the cache with a TTL of 30 seconds
key := "benchmark-key" key := "benchmark-key"
value := []byte("benchmark-value") value := []byte("benchmark-value")
@@ -19,7 +19,7 @@ func BenchmarkCacheSet(b *testing.B) {
} }
} }
func BenchmarkCacheGet(b *testing.B) { func BenchmarkMemCacheGet(b *testing.B) {
cache := New(30 * time.Second) // Initializing the cache cache := New(30 * time.Second) // Initializing the cache
key := "benchmark-key" key := "benchmark-key"
value := []byte("benchmark-value") value := []byte("benchmark-value")
@@ -32,7 +32,7 @@ func BenchmarkCacheGet(b *testing.B) {
} }
} }
func BenchmarkCacheExpire(b *testing.B) { func BenchmarkMemCacheExpire(b *testing.B) {
key := "benchmark-expire-key" key := "benchmark-expire-key"
value := []byte("benchmark-value") value := []byte("benchmark-value")
ttl := 5 * time.Millisecond // Setting a short TTL for quick expiration ttl := 5 * time.Millisecond // Setting a short TTL for quick expiration
@@ -45,7 +45,7 @@ func BenchmarkCacheExpire(b *testing.B) {
} }
} }
func BenchmarkCacheStats(b *testing.B) { func BenchmarkMemCacheStats(b *testing.B) {
cache := New(30 * time.Second) // Initializing the cache cache := New(30 * time.Second) // Initializing the cache
key := "benchmark-key" key := "benchmark-key"
value := []byte("benchmark-value") value := []byte("benchmark-value")
+8 -8
View File
@@ -1,4 +1,4 @@
package libpack_cache package libpack_cache_memory
import ( import (
"testing" "testing"
@@ -7,25 +7,25 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
type CacheTestSuite struct { type MemoryTestSuite struct {
suite.Suite suite.Suite
} }
func (suite *CacheTestSuite) SetupTest() { func (suite *MemoryTestSuite) SetupTest() {
} }
func TestCachingTestSuite(t *testing.T) { func TestCachingTestSuite(t *testing.T) {
suite.Run(t, new(CacheTestSuite)) suite.Run(t, new(MemoryTestSuite))
} }
func (suite *CacheTestSuite) Test_New() { func (suite *MemoryTestSuite) Test_New() {
suite.T().Run("should return a new cache", func(t *testing.T) { suite.T().Run("should return a new cache", func(t *testing.T) {
cache := New(2 * time.Second) cache := New(2 * time.Second)
suite.NotNil(cache) suite.NotNil(cache)
}) })
} }
func (suite *CacheTestSuite) Test_CacheUse() { func (suite *MemoryTestSuite) Test_CacheUse() {
cache := New(30 * time.Second) cache := New(30 * time.Second)
tests := []struct { tests := []struct {
name string name string
@@ -50,7 +50,7 @@ func (suite *CacheTestSuite) Test_CacheUse() {
} }
} }
func (suite *CacheTestSuite) Test_CacheDelete() { func (suite *MemoryTestSuite) Test_CacheDelete() {
cache := New(30 * time.Second) cache := New(30 * time.Second)
tests := []struct { tests := []struct {
name string name string
@@ -79,7 +79,7 @@ func (suite *CacheTestSuite) Test_CacheDelete() {
} }
} }
func (suite *CacheTestSuite) Test_CacheExpire() { func (suite *MemoryTestSuite) Test_CacheExpire() {
cache := New(30 * time.Second) cache := New(30 * time.Second)
tests := []struct { tests := []struct {
name string name string
-77
View File
@@ -1,77 +0,0 @@
package libpack_redis
import (
"context"
"time"
redis "github.com/redis/go-redis/v9"
)
var ()
type RedisConfig struct {
client *redis.Client
ctx context.Context
}
func prependKeyName(key string) string {
return "gmp_cache:" + key
}
type RedisClientConfig struct {
RedisServer string
RedisPassword string
RedisDB int
}
func NewClient(redisClientConfig *RedisClientConfig) *RedisConfig {
c := &RedisConfig{
client: redis.NewClient(&redis.Options{
Addr: redisClientConfig.RedisServer,
Password: redisClientConfig.RedisPassword,
DB: redisClientConfig.RedisDB,
}),
ctx: context.Background(),
}
_, err := c.client.Ping(c.ctx).Result()
if err != nil {
panic(err)
}
return c
}
func (c *RedisConfig) Set(key string, value []byte, ttl time.Duration) {
c.client.Set(c.ctx, prependKeyName(key), value, ttl)
}
func (c *RedisConfig) Get(key string) ([]byte, bool) {
val, err := c.client.Get(c.ctx, prependKeyName(key)).Result()
if err == redis.Nil || err != nil {
return nil, false
}
return []byte(val), true
}
func (c *RedisConfig) Delete(key string) {
c.client.Del(c.ctx, prependKeyName(key))
}
func (c *RedisConfig) Clear() {
c.client.FlushDB(c.ctx)
}
func (c *RedisConfig) CountQueries() int {
keys, err := c.client.Keys(c.ctx, prependKeyName("*")).Result()
if err != nil {
return 0
}
return len(keys)
}
func (c *RedisConfig) CountQueriesWithPattern(pattern string) int {
keys, err := c.client.Keys(c.ctx, prependKeyName(pattern)).Result()
if err != nil {
return 0
}
return len(keys)
}
+96
View File
@@ -0,0 +1,96 @@
package libpack_cache_redis
import (
"context"
"strings"
"time"
"sync"
redis "github.com/redis/go-redis/v9"
)
type RedisConfig struct {
client *redis.Client
ctx context.Context
prefix string
builderPool *sync.Pool
}
func (c *RedisConfig) prependKeyName(key string) string {
builder := c.builderPool.Get().(*strings.Builder)
defer c.builderPool.Put(builder)
builder.Reset()
builder.WriteString(c.prefix)
builder.WriteString(key)
return builder.String()
}
type RedisClientConfig struct {
RedisServer string
RedisPassword string
RedisDB int
Prefix string
}
func New(redisClientConfig *RedisClientConfig) *RedisConfig {
c := &RedisConfig{
client: redis.NewClient(&redis.Options{
Addr: redisClientConfig.RedisServer,
Password: redisClientConfig.RedisPassword,
DB: redisClientConfig.RedisDB,
}),
ctx: context.Background(),
prefix: redisClientConfig.Prefix,
builderPool: &sync.Pool{
New: func() interface{} {
return &strings.Builder{}
},
},
}
_, err := c.client.Ping(c.ctx).Result()
if err != nil {
panic(err)
}
return c
}
func (c *RedisConfig) Set(key string, value []byte, ttl time.Duration) {
c.client.Set(c.ctx, c.prependKeyName(key), value, ttl)
}
func (c *RedisConfig) Get(key string) ([]byte, bool) {
val, err := c.client.Get(c.ctx, c.prependKeyName(key)).Result()
if err == redis.Nil {
return nil, false
}
if err != nil {
return nil, false
}
return []byte(val), true
}
func (c *RedisConfig) Delete(key string) {
c.client.Del(c.ctx, c.prependKeyName(key))
}
func (c *RedisConfig) Clear() {
c.client.FlushDB(c.ctx)
}
func (c *RedisConfig) CountQueries() int64 {
keys, err := c.client.Keys(c.ctx, c.prependKeyName("*")).Result()
if err != nil {
return 0
}
return int64(len(keys))
}
func (c *RedisConfig) CountQueriesWithPattern(pattern string) int {
keys, err := c.client.Keys(c.ctx, c.prependKeyName(pattern)).Result()
if err != nil {
return 0
}
return len(keys)
}
+22 -17
View File
@@ -1,23 +1,24 @@
package libpack_redis package libpack_cache_redis
import ( import (
"testing" "testing"
"time" "time"
"github.com/gookit/goutil/envutil" "github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
type RedisConfigSuite struct { type RedisConfigSuite struct {
suite.Suite suite.Suite
redisConfig *RedisConfig redisConfig *RedisConfig
redis_server *miniredis.Miniredis
} }
func (suite *RedisConfigSuite) SetupTest() { func (suite *RedisConfigSuite) SetupTest() {
redis_server := envutil.Getenv("REDIS_SERVER", "localhost:6379") suite.redis_server, _ = miniredis.Run()
suite.redisConfig = NewClient(&RedisClientConfig{ suite.redisConfig = New(&RedisClientConfig{
RedisServer: redis_server, RedisServer: suite.redis_server.Addr(),
RedisPassword: "", RedisPassword: "",
RedisDB: 0, RedisDB: 0,
}) })
@@ -29,7 +30,7 @@ func TestRedisConfigSuite(t *testing.T) {
} }
func (suite *RedisConfigSuite) TestSet() { func (suite *RedisConfigSuite) TestSet() {
key := "testkey" key := "testkeyset"
value := []byte("testvalue") value := []byte("testvalue")
suite.redisConfig.Delete(key) // Ensure the key is deleted before the test suite.redisConfig.Delete(key) // Ensure the key is deleted before the test
@@ -50,9 +51,9 @@ func (suite *RedisConfigSuite) TestSet() {
} }
func (suite *RedisConfigSuite) TestSetWithExpiry() { func (suite *RedisConfigSuite) TestSetWithExpiry() {
key := "testkey" key := "testkey_with_expiry"
value := []byte("testvalue") value := []byte("testvaluewithexpiry")
expiry := 1 * time.Second expiry := 2 * time.Second
suite.redisConfig.Delete(key) // Ensure the key is deleted before the test suite.redisConfig.Delete(key) // Ensure the key is deleted before the test
// Test writing a new key-value pair // Test writing a new key-value pair
@@ -60,17 +61,19 @@ func (suite *RedisConfigSuite) TestSetWithExpiry() {
storedValue, found := suite.redisConfig.Get(key) storedValue, found := suite.redisConfig.Get(key)
assert.True(suite.T(), found) assert.True(suite.T(), found)
assert.Equal(suite.T(), value, storedValue) assert.Equal(suite.T(), value, storedValue)
_, found = suite.redisConfig.Get(key)
assert.True(suite.T(), found, "Key should exist")
// Test that key expires after the specified time // Test that key expires after the specified time
time.Sleep(2 * time.Second) suite.redis_server.FastForward(3 * time.Second)
_, found = suite.redisConfig.Get(key) _, found = suite.redisConfig.Get(key)
assert.False(suite.T(), found) assert.False(suite.T(), found, "Key should have expired after 2 seconds")
suite.redisConfig.Delete(key) // Clean up after the test suite.redisConfig.Delete(key) // Clean up after the test
} }
func (suite *RedisConfigSuite) TestGet() { func (suite *RedisConfigSuite) TestGet() {
key := "testkey" key := "testkeyget"
value := []byte("testvalue") value := []byte("testvalue")
suite.redisConfig.Set(key, value, 0) // Set the key-value pair suite.redisConfig.Set(key, value, 0) // Set the key-value pair
storedValue, found := suite.redisConfig.Get(key) storedValue, found := suite.redisConfig.Get(key)
@@ -79,7 +82,7 @@ func (suite *RedisConfigSuite) TestGet() {
} }
func (suite *RedisConfigSuite) TestDeleteKey() { func (suite *RedisConfigSuite) TestDeleteKey() {
key := "testkey" key := "testkeydelete"
value := []byte("testvalue") value := []byte("testvalue")
suite.redisConfig.Set(key, value, 0) // Set the key-value pair suite.redisConfig.Set(key, value, 0) // Set the key-value pair
suite.redisConfig.Delete(key) suite.redisConfig.Delete(key)
@@ -89,7 +92,7 @@ func (suite *RedisConfigSuite) TestDeleteKey() {
func (suite *RedisConfigSuite) TestCheckIfKeyExists() { func (suite *RedisConfigSuite) TestCheckIfKeyExists() {
ttl := time.Duration(10) * time.Second ttl := time.Duration(10) * time.Second
key := "testkey" key := "testkeyifexists"
value := []byte("testvalue") value := []byte("testvalue")
suite.redisConfig.Set(key, value, ttl) // Set the key-value pair suite.redisConfig.Set(key, value, ttl) // Set the key-value pair
_, found := suite.redisConfig.Get(key) _, found := suite.redisConfig.Get(key)
@@ -106,8 +109,8 @@ func (suite *RedisConfigSuite) TestGetKeys() {
suite.redisConfig.Set("testkey2", []byte("testvalue2"), ttl) suite.redisConfig.Set("testkey2", []byte("testvalue2"), ttl)
suite.redisConfig.Set("otherkey", []byte("othervalue"), ttl) suite.redisConfig.Set("otherkey", []byte("othervalue"), ttl)
keys, _ := suite.redisConfig.client.Keys(suite.redisConfig.ctx, prependKeyName("testkey*")).Result() keys, _ := suite.redisConfig.client.Keys(suite.redisConfig.ctx, "testkey*").Result()
expectedKeys := []string{prependKeyName("testkey1"), prependKeyName("testkey2")} expectedKeys := []string{"testkey1", "testkey2"}
assert.ElementsMatch(suite.T(), expectedKeys, keys) assert.ElementsMatch(suite.T(), expectedKeys, keys)
suite.redisConfig.client.Del(suite.redisConfig.ctx, "testkey1", "testkey2", "otherkey") suite.redisConfig.client.Del(suite.redisConfig.ctx, "testkey1", "testkey2", "otherkey")
@@ -120,6 +123,8 @@ func (suite *RedisConfigSuite) TestGetKeysCount() {
suite.redisConfig.Set("otherkey", []byte("othervalue"), ttl) suite.redisConfig.Set("otherkey", []byte("othervalue"), ttl)
assert.Equal(suite.T(), 2, suite.redisConfig.CountQueriesWithPattern("testkey*")) assert.Equal(suite.T(), 2, suite.redisConfig.CountQueriesWithPattern("testkey*"))
assert.Equal(suite.T(), 1, suite.redisConfig.CountQueriesWithPattern("otherkey*"))
assert.Equal(suite.T(), int64(3), suite.redisConfig.CountQueries())
suite.redisConfig.client.Del(suite.redisConfig.ctx, "testkey1", "testkey2", "otherkey") suite.redisConfig.client.Del(suite.redisConfig.ctx, "testkey1", "testkey2", "otherkey")
} }
+5 -1
View File
@@ -7,6 +7,7 @@ import (
"github.com/goccy/go-json" "github.com/goccy/go-json"
"github.com/lukaszraczylo/ask" "github.com/lukaszraczylo/ask"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring" libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
) )
@@ -15,7 +16,10 @@ func extractClaimsFromJWTHeader(authorization string) (usr string, role string)
handleError := func(msg string, details map[string]interface{}) { handleError := func(msg string, details map[string]interface{}) {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
cfg.Logger.Error(msg, details) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: msg,
Pairs: details,
})
} }
tokenParts := strings.Split(authorization, ".") tokenParts := strings.Split(authorization, ".")
+25 -6
View File
@@ -6,27 +6,40 @@ import (
"time" "time"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
) )
func enableHasuraEventCleaner() { func enableHasuraEventCleaner() {
if cfg.HasuraEventCleaner.Enable { if cfg.HasuraEventCleaner.Enable {
if cfg.HasuraEventCleaner.EventMetadataDb == "" { if cfg.HasuraEventCleaner.EventMetadataDb == "" {
cfg.Logger.Warning("Event metadata db URL not specified, event cleaner not active", nil) cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Event metadata db URL not specified, event cleaner not active",
Pairs: nil,
})
return return
} }
ticker := time.NewTicker(1 * time.Hour) ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop() defer ticker.Stop()
cfg.Logger.Info("Event cleaner enabled", map[string]interface{}{"interval_in_days": cfg.HasuraEventCleaner.ClearOlderThan}) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Event cleaner enabled",
Pairs: map[string]interface{}{"interval_in_days": cfg.HasuraEventCleaner.ClearOlderThan},
})
time.Sleep(60 * time.Second) // wait for everything to start and settle down time.Sleep(60 * time.Second) // wait for everything to start and settle down
cfg.Logger.Info("Initial cleanup of old events", nil) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Initial cleanup of old events",
Pairs: nil,
})
cleanEvents() cleanEvents()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
cfg.Logger.Info("Cleaning up old events", nil) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "Cleaning up old events",
Pairs: nil,
})
cleanEvents() cleanEvents()
} }
} }
@@ -36,7 +49,10 @@ func enableHasuraEventCleaner() {
func cleanEvents() { func cleanEvents() {
conn, err := pgx.Connect(context.Background(), cfg.HasuraEventCleaner.EventMetadataDb) conn, err := pgx.Connect(context.Background(), cfg.HasuraEventCleaner.EventMetadataDb)
if err != nil { if err != nil {
cfg.Logger.Error("Failed to connect to event metadata db", map[string]interface{}{"error": err}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Failed to connect to event metadata db",
Pairs: map[string]interface{}{"error": err},
})
return return
} }
defer conn.Close(context.Background()) defer conn.Close(context.Background())
@@ -52,7 +68,10 @@ func cleanEvents() {
for _, query := range delQueries { for _, query := range delQueries {
_, err := conn.Exec(context.Background(), query) _, err := conn.Exec(context.Background(), query)
if err != nil { if err != nil {
cfg.Logger.Debug("Failed to execute query", map[string]interface{}{"query": query, "error": err}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Failed to execute query",
Pairs: map[string]interface{}{"query": query, "error": err},
})
} }
} }
} }
+6 -3
View File
@@ -4,6 +4,7 @@ go 1.21
require ( require (
github.com/VictoriaMetrics/metrics v1.33.1 github.com/VictoriaMetrics/metrics v1.33.1
github.com/alicebob/miniredis/v2 v2.33.0
github.com/avast/retry-go/v4 v4.6.0 github.com/avast/retry-go/v4 v4.6.0
github.com/goccy/go-json v0.10.3 github.com/goccy/go-json v0.10.3
github.com/gofiber/fiber/v2 v2.52.4 github.com/gofiber/fiber/v2 v2.52.4
@@ -16,12 +17,12 @@ require (
github.com/lukaszraczylo/go-ratecounter v0.1.8 github.com/lukaszraczylo/go-ratecounter v0.1.8
github.com/lukaszraczylo/go-simple-graphql v1.2.14 github.com/lukaszraczylo/go-simple-graphql v1.2.14
github.com/redis/go-redis/v9 v9.5.3 github.com/redis/go-redis/v9 v9.5.3
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
github.com/valyala/fasthttp v1.54.0 github.com/valyala/fasthttp v1.55.0
) )
require ( require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.1.0 // indirect github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
@@ -37,13 +38,15 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect github.com/valyala/histogram v1.2.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/crypto v0.24.0 // indirect golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.26.0 // indirect golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect golang.org/x/sys v0.21.0 // indirect
+10 -4
View File
@@ -1,5 +1,9 @@
github.com/VictoriaMetrics/metrics v1.33.1 h1:CNV3tfm2Kpv7Y9W3ohmvqgFWPR55tV2c7M2U6OIo+UM= github.com/VictoriaMetrics/metrics v1.33.1 h1:CNV3tfm2Kpv7Y9W3ohmvqgFWPR55tV2c7M2U6OIo+UM=
github.com/VictoriaMetrics/metrics v1.33.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/VictoriaMetrics/metrics v1.33.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
@@ -82,8 +86,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0= github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8=
github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
@@ -92,10 +96,12 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+25 -6
View File
@@ -8,6 +8,7 @@ import (
fiber "github.com/gofiber/fiber/v2" fiber "github.com/gofiber/fiber/v2"
"github.com/graphql-go/graphql/language/ast" "github.com/graphql-go/graphql/language/ast"
"github.com/graphql-go/graphql/language/parser" "github.com/graphql-go/graphql/language/parser"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring" libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
) )
@@ -71,7 +72,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
m := make(map[string]interface{}) m := make(map[string]interface{})
err := json.Unmarshal(c.Body(), &m) err := json.Unmarshal(c.Body(), &m)
if err != nil { if err != nil {
cfg.Logger.Error("Can't unmarshal the request", map[string]interface{}{"error": err.Error(), "body": string(c.Body())}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't unmarshal the request",
Pairs: map[string]interface{}{"error": err.Error(), "body": string(c.Body())},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
} }
@@ -80,7 +84,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
// get the query // get the query
query, ok := m["query"].(string) query, ok := m["query"].(string)
if !ok { if !ok {
cfg.Logger.Error("Can't find the query", map[string]interface{}{"query": query, "m_val": m}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't find the query",
Pairs: map[string]interface{}{"m_val": m},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
} }
@@ -89,7 +96,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
p, err := parser.Parse(parser.ParseParams{Source: query}) p, err := parser.Parse(parser.ParseParams{Source: query})
if err != nil { if err != nil {
cfg.Logger.Error("Can't parse the query", map[string]interface{}{"query": query, "m_val": m}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the query",
Pairs: map[string]interface{}{"query": query, "m_val": m},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
} }
@@ -115,7 +125,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
} }
if res.operationType == "mutation" && cfg.Server.ReadOnlyMode { if res.operationType == "mutation" && cfg.Server.ReadOnlyMode {
cfg.Logger.Warning("Mutation blocked", m) cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Mutation blocked",
Pairs: map[string]interface{}{"query": query},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
} }
@@ -131,7 +144,10 @@ func parseGraphQLQuery(c *fiber.Ctx) (res *parseGraphQLQueryResult) {
if arg.Name.Value == "ttl" { if arg.Name.Value == "ttl" {
res.cacheTime, err = strconv.Atoi(arg.Value.GetValue().(string)) res.cacheTime, err = strconv.Atoi(arg.Value.GetValue().(string))
if err != nil { if err != nil {
cfg.Logger.Error("Can't parse the ttl, using global", map[string]interface{}{"bad_ttl": arg.Value.GetValue().(string)}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't parse the ttl, using global",
Pairs: map[string]interface{}{"bad_ttl": arg.Value.GetValue().(string)},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
} }
@@ -184,7 +200,10 @@ func checkIfContainsIntrospection(c *fiber.Ctx, whatever string) (shouldBlock bo
if len(cfg.Security.IntrospectionAllowed) > 0 { if len(cfg.Security.IntrospectionAllowed) > 0 {
if _, allowed_exists := introspectionAllowedQueries[whateverLower]; allowed_exists { if _, allowed_exists := introspectionAllowedQueries[whateverLower]; allowed_exists {
cfg.Logger.Debug("Introspection query allowed, passing through", map[string]interface{}{"query": whatever}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Introspection query allowed, passing through",
Pairs: map[string]interface{}{"query": whatever},
})
got_exemption = true got_exemption = true
shouldBlock = false shouldBlock = false
} }
+204
View File
@@ -0,0 +1,204 @@
package libpack_logger
import (
"bytes"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/goccy/go-json"
)
const (
_ = iota
LEVEL_DEBUG
LEVEL_INFO
LEVEL_WARN
LEVEL_ERROR
LEVEL_FATAL
)
var LevelNames = [...]string{
"none",
"debug",
"info",
"warn",
"error",
"fatal",
}
const (
defaultFormat = time.RFC3339
defaultMinLevel = LEVEL_INFO
defaultShowCaller = false
)
var defaultOutput = os.Stdout
type Logger struct {
format string
minLogLevel int
showCaller bool
output io.Writer
}
type LogMessage struct {
Message string
Pairs map[string]any
output io.Writer
}
func (m *LogMessage) String() string {
return m.Message
}
var fieldNames = map[string]string{
"timestamp": "timestamp",
"level": "level",
"message": "message",
}
func New() *Logger {
return &Logger{
format: defaultFormat,
minLogLevel: defaultMinLevel,
output: defaultOutput,
showCaller: defaultShowCaller,
}
}
func (l *Logger) SetOutput(output io.Writer) *Logger {
l.output = output
return l
}
var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
var defaultPairs = make(map[string]any)
func GetLogLevel(level string) int {
for i, name := range LevelNames {
if name == strings.ToLower(level) {
return i
}
}
return defaultMinLevel
}
func (l *Logger) log(level int, m *LogMessage) {
if m.Pairs == nil {
m.Pairs = defaultPairs
}
m.Pairs[fieldNames["timestamp"]] = time.Now().Format(l.format)
m.Pairs[fieldNames["level"]] = LevelNames[level]
m.Pairs[fieldNames["message"]] = m.Message
if l.showCaller {
m.Pairs["caller"] = getCaller()
}
buffer := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buffer)
buffer.Reset()
var encoder = json.NewEncoder(buffer)
err := encoder.Encode(m.Pairs)
if err != nil {
fmt.Println("Error marshalling log message:", err)
return
}
// if not running in test - use stderr and stdout, otherwise - use logger's output setting
if flag.Lookup("test.v") != nil {
m.output = os.Stdout
if level >= LEVEL_ERROR {
m.output = os.Stderr
}
}
// Use logger's output setting instead of os.Stdout or os.Stderr
l.output.Write(buffer.Bytes())
}
func (l *Logger) Debug(m *LogMessage) {
if l.shouldLog(LEVEL_DEBUG) {
l.log(LEVEL_DEBUG, m)
}
}
func (l *Logger) Info(m *LogMessage) {
if l.shouldLog(LEVEL_INFO) {
l.log(LEVEL_INFO, m)
}
}
func (l *Logger) Warn(m *LogMessage) {
if l.shouldLog(LEVEL_WARN) {
l.log(LEVEL_WARN, m)
}
}
func (l *Logger) Warning(m *LogMessage) {
l.Warn(m)
}
func (l *Logger) Error(m *LogMessage) {
if l.shouldLog(LEVEL_ERROR) {
l.log(LEVEL_ERROR, m)
}
}
func (l *Logger) Fatal(m *LogMessage) {
if l.shouldLog(LEVEL_FATAL) {
l.log(LEVEL_FATAL, m)
}
}
func (l *Logger) Critical(m *LogMessage) {
l.Fatal(m)
os.Exit(1)
}
func (l *Logger) shouldLog(level int) bool {
return level >= l.minLogLevel
}
func (l *Logger) SetFormat(format string) *Logger {
l.format = format
return l
}
func (l *Logger) SetMinLogLevel(level int) *Logger {
l.minLogLevel = level
return l
}
func (l *Logger) SetFieldName(field, name string) *Logger {
fieldNames[field] = name
return l
}
func (l *Logger) SetShowCaller(show bool) *Logger {
l.showCaller = show
return l
}
func getCaller() string {
_, file, line, ok := runtime.Caller(3)
if !ok {
return "unknown:0"
}
file = filepath.Base(file)
return fmt.Sprintf("%s:%d", file, line)
}
+140
View File
@@ -0,0 +1,140 @@
package libpack_logger
import (
"bytes"
"testing"
"time"
)
func Benchmark_NewLogger(b *testing.B) {
type triggers struct {
ModFormat struct {
Format string
}
ModLevel struct {
Level int
}
}
tests := []struct {
name string
triggers triggers
}{
{
name: "BenchmarkNew",
},
{
name: "BenchmarkNewChangeTimeFormat",
triggers: triggers{
ModFormat: struct{ Format string }{
Format: time.RFC3339Nano,
},
},
},
{
name: "BenchmarkNewChangeLogLevel",
triggers: triggers{
ModLevel: struct{ Level int }{
Level: LEVEL_DEBUG,
},
},
},
{
name: "BenchmarkNewChangeTimeFormatAndLogLevel",
triggers: triggers{
ModFormat: struct{ Format string }{
Format: time.RFC3339Nano,
},
ModLevel: struct{ Level int }{
Level: LEVEL_DEBUG,
},
},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
got := New()
if tt.triggers.ModFormat.Format != "" {
got = got.SetFormat(tt.triggers.ModFormat.Format)
}
if tt.triggers.ModLevel.Level != 0 {
got = got.SetMinLogLevel(tt.triggers.ModLevel.Level)
}
}
})
}
}
func Benchmark_Log_Debug(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_DEBUG).SetOutput(output)
msg := &LogMessage{
Message: "debug message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Debug(msg)
}
}
func Benchmark_Log_Info(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_INFO).SetOutput(output)
msg := &LogMessage{
Message: "info message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Info(msg)
}
}
func Benchmark_Log_Warn(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_WARN).SetOutput(output)
msg := &LogMessage{
Message: "warn message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Warn(msg)
}
}
func Benchmark_Log_Error(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_ERROR).SetOutput(output)
msg := &LogMessage{
Message: "error message",
Pairs: map[string]any{"key": "value"},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Error(msg)
}
}
func Benchmark_Log_Fatal(b *testing.B) {
output := &bytes.Buffer{}
logger := New().SetMinLogLevel(LEVEL_FATAL).SetOutput(output)
msg := &LogMessage{
Message: "fatal message",
Pairs: make(map[string]any),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
logger.Fatal(msg)
}
}
+31
View File
@@ -0,0 +1,31 @@
package libpack_logger
import (
"testing"
assertions "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type LoggerTestSuite struct {
suite.Suite
}
var (
assert *assertions.Assertions
)
func (suite *LoggerTestSuite) BeforeTest(suiteName, testName string) {
}
func (suite *LoggerTestSuite) SetupTest() {
assert = assertions.New(suite.T())
}
// TearDownTest is run after each test to clean up
func (suite *LoggerTestSuite) TearDownTest() {
}
func TestSuite(t *testing.T) {
suite.Run(t, new(LoggerTestSuite))
}
+182
View File
@@ -0,0 +1,182 @@
package libpack_logger
import (
"bytes"
"fmt"
"os"
"reflect"
"testing"
"time"
"github.com/goccy/go-json"
)
func captureStderr(f func()) string {
originalStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
f()
w.Close()
var buf bytes.Buffer
buf.ReadFrom(r)
os.Stderr = originalStderr
return buf.String()
}
func captureStdOut(f func()) string {
originalStdout := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w
f()
w.Close()
var buf bytes.Buffer
buf.ReadFrom(r)
os.Stdout = originalStdout
return buf.String()
}
func (suite *LoggerTestSuite) Test_LogMessageString() {
msg := &LogMessage{
Message: "test message",
}
assert.Equal("test message", msg.String())
}
func callLoggerMethod(logger *Logger, methodName string, message *LogMessage) {
// Get the method by name using reflection
method := reflect.ValueOf(logger).MethodByName(methodName)
if method.IsValid() {
// Call the method with the message as an argument
method.Call([]reflect.Value{reflect.ValueOf(message)})
} else {
fmt.Printf("Method %s does not exist on Logger\n", methodName)
}
}
func (suite *LoggerTestSuite) Test_LogsLevelsPrint() {
output := &bytes.Buffer{}
logger := New().SetOutput(output)
tests := []struct {
name string
method string
loggerMinLevel int
messageLogLevel int
message string
pairs map[string]any
wantOutput bool // Whether we expect output to be written
}{
{
name: "Log: Debug, Level: Debug - no pairs",
method: "Debug",
loggerMinLevel: LEVEL_DEBUG,
messageLogLevel: LEVEL_DEBUG,
message: "debug message",
wantOutput: true,
},
{
name: "Log: Info, Level: Info - one pair",
method: "Info",
loggerMinLevel: LEVEL_INFO,
messageLogLevel: LEVEL_INFO,
message: "info message",
pairs: map[string]any{
"key": "value",
},
wantOutput: true,
},
{
name: "Log: Info, Level: Warn - with pairs",
method: "Info",
loggerMinLevel: LEVEL_WARN,
messageLogLevel: LEVEL_INFO,
message: "warn message",
pairs: map[string]any{
"key1": "value1",
"key2": "value2",
},
wantOutput: false,
},
{
name: "Log: Warn, Level: Info - with 500 pairs",
method: "Warn",
loggerMinLevel: LEVEL_INFO,
messageLogLevel: LEVEL_WARN,
message: "warn message with 500 pairs",
pairs: func() map[string]any {
pairs := make(map[string]any)
for i := 0; i < 500; i++ {
pairs[fmt.Sprintf("key%d", i)] = fmt.Sprintf("value%d", i)
}
return pairs
}(),
wantOutput: true,
},
}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
msg := &LogMessage{
Message: tt.message,
Pairs: tt.pairs,
}
output.Reset()
// Set logger's minimum log level
logger.SetMinLogLevel(tt.loggerMinLevel)
fmt.Println("Logger min log level:", LevelNames[logger.minLogLevel])
// Call the logging method
callLoggerMethod(logger, tt.method, msg)
logOutput := output.String()
fmt.Println("Output:", logOutput)
if tt.wantOutput {
var loggedMessage map[string]any
err := json.Unmarshal([]byte(logOutput), &loggedMessage)
if err != nil {
t.Fatalf("Error unmarshalling log message: %v\nLog output: %s", err, logOutput)
}
if !containsLogMessage(logOutput, tt.message) {
t.Errorf("Expected log message %q, but got %q", tt.message, logOutput)
}
assert.Equal(LevelNames[tt.messageLogLevel], loggedMessage["level"])
if tt.pairs != nil {
for k, v := range tt.pairs {
assert.Equal(v, loggedMessage[k])
}
}
} else {
assert.Equal("", logOutput)
}
})
}
}
func containsLogMessage(logOutput, expectedMessage string) bool {
return bytes.Contains([]byte(logOutput), []byte(expectedMessage))
}
func (suite *LoggerTestSuite) Test_SetFormat() {
logger := New().SetFormat(time.RFC3339Nano)
assert.Equal(time.RFC3339Nano, logger.format)
}
func (suite *LoggerTestSuite) Test_SetMinLogLevel() {
logger := New().SetMinLogLevel(LEVEL_DEBUG)
assert.Equal(LEVEL_DEBUG, logger.minLogLevel)
}
func (suite *LoggerTestSuite) Test_ShouldLog() {
logger := New().SetMinLogLevel(LEVEL_WARN)
assert.True(logger.shouldLog(LEVEL_WARN))
assert.True(logger.shouldLog(LEVEL_ERROR))
assert.False(logger.shouldLog(LEVEL_INFO))
assert.False(logger.shouldLog(LEVEL_DEBUG))
}
-123
View File
@@ -1,123 +0,0 @@
package libpack_logging
import (
"io"
"os"
"sync"
"time"
"github.com/gookit/goutil/envutil"
"github.com/rs/zerolog"
)
type LogConfig struct {
logger zerolog.Logger
}
var (
baseLogger zerolog.Logger
eventPool = sync.Pool{
New: func() interface{} {
return new(zerolog.Event)
},
}
fieldMapPool = sync.Pool{
New: func() interface{} {
return make(map[string]interface{})
},
}
)
func init() {
zerolog.TimeFieldFormat = time.RFC3339
zerolog.MessageFieldName = "short_message"
zerolog.TimestampFieldName = "timestamp"
zerolog.LevelFieldName = "level"
zerolog.LevelFatalValue = "critical"
baseLogger = zerolog.New(os.Stdout).With().Timestamp().Logger()
switch logLevel := envutil.Getenv("LOG_LEVEL", "info"); logLevel {
case "debug":
baseLogger = baseLogger.Level(zerolog.DebugLevel)
case "warn":
baseLogger = baseLogger.Level(zerolog.WarnLevel)
case "error":
baseLogger = baseLogger.Level(zerolog.ErrorLevel)
default:
baseLogger = baseLogger.Level(zerolog.InfoLevel)
}
}
func NewLogger() *LogConfig {
return &LogConfig{logger: baseLogger}
}
func (lw *LogConfig) log(w io.Writer, level zerolog.Level, message string, fields map[string]interface{}) {
logger := lw.logger.Output(w)
event := logger.WithLevel(level).CallerSkipFrame(3)
for k, val := range fields {
switch v := val.(type) {
case string:
event = event.Str(k, v)
case int:
event = event.Int(k, v)
case float64:
event = event.Float64(k, v)
default:
event = event.Interface(k, val)
}
}
event.Msg(message)
}
func (lw *LogConfig) logWithLevel(level zerolog.Level, message string, fields map[string]interface{}) {
if lw.logger.GetLevel() > level {
return
}
if lw.logger.GetLevel() <= level {
w := os.Stdout
if level >= zerolog.ErrorLevel {
w = os.Stderr
}
lw.log(w, level, message, fields)
}
}
func (lw *LogConfig) Debug(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.DebugLevel, message, fields)
}
func (lw *LogConfig) Info(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.InfoLevel, message, fields)
}
func (lw *LogConfig) Warning(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.WarnLevel, message, fields)
}
func (lw *LogConfig) Error(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.ErrorLevel, message, fields)
}
func (lw *LogConfig) Critical(message string, fields map[string]interface{}) {
lw.logWithLevel(zerolog.FatalLevel, message, fields)
os.Exit(1)
}
// Helper function to get a new fields map from the pool
func getFieldsMap() map[string]interface{} {
return fieldMapPool.Get().(map[string]interface{})
}
// Helper function to put a used fields map back into the pool
func putFieldsMap(fields map[string]interface{}) {
for k := range fields {
delete(fields, k)
}
fieldMapPool.Put(fields)
}
-32
View File
@@ -1,32 +0,0 @@
package libpack_logging
import (
"os"
"testing"
)
func BenchmarkNewLogger(b *testing.B) {
for i := 0; i < b.N; i++ {
NewLogger()
}
}
func BenchmarkInfoLog(b *testing.B) {
oldEnv := os.Getenv("LOG_LEVEL")
os.Setenv("LOG_LEVEL", "info")
oldStdout := os.Stdout
oldStderr := os.Stderr
os.Stdout, _ = os.Open(os.DevNull)
os.Stderr, _ = os.Open(os.DevNull)
defer func() {
os.Stdout = oldStdout
os.Stderr = oldStderr
os.Setenv("LOG_LEVEL", oldEnv)
}()
testsLogger := NewLogger()
b.ResetTimer()
for i := 0; i < b.N; i++ {
testsLogger.Info("test", map[string]interface{}{"test": "test"})
}
}
+16 -2
View File
@@ -9,6 +9,7 @@ import (
"github.com/gofiber/fiber/v2/middleware/proxy" "github.com/gofiber/fiber/v2/middleware/proxy"
"github.com/gookit/goutil/envutil" "github.com/gookit/goutil/envutil"
graphql "github.com/lukaszraczylo/go-simple-graphql" graphql "github.com/lukaszraczylo/go-simple-graphql"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config" libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging" libpack_logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
) )
@@ -62,7 +63,8 @@ func parseConfig() {
} }
return strings.Split(urls, ",") return strings.Split(urls, ",")
}() }()
c.Logger = libpack_logging.NewLogger() c.LogLevel = strings.ToUpper(getDetailsFromEnv("LOG_LEVEL", "info"))
c.Logger = libpack_logging.New().SetMinLogLevel(libpack_logging.GetLogLevel(c.LogLevel)).SetFieldName("timestamp", "ts").SetFieldName("message", "msg").SetShowCaller(true)
c.Server.HealthcheckGraphQL = getDetailsFromEnv("HEALTHCHECK_GRAPHQL_URL", "") c.Server.HealthcheckGraphQL = getDetailsFromEnv("HEALTHCHECK_GRAPHQL_URL", "")
c.Client.GQLClient = graphql.NewConnection() c.Client.GQLClient = graphql.NewConnection()
c.Client.GQLClient.SetEndpoint(c.Server.HealthcheckGraphQL) c.Client.GQLClient.SetEndpoint(c.Server.HealthcheckGraphQL)
@@ -88,7 +90,19 @@ func parseConfig() {
c.HasuraEventCleaner.EventMetadataDb = getDetailsFromEnv("HASURA_EVENT_METADATA_DB", "") c.HasuraEventCleaner.EventMetadataDb = getDetailsFromEnv("HASURA_EVENT_METADATA_DB", "")
cfg = &c cfg = &c
enableCache() // takes close to no resources, but can be used with dynamic query cache if cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
cacheConfig := &libpack_cache.CacheConfig{
Logger: cfg.Logger,
TTL: cfg.Cache.CacheTTL,
}
if cfg.Cache.CacheRedisEnable {
cacheConfig.Redis.Enable = true
cacheConfig.Redis.URL = cfg.Cache.CacheRedisURL
cacheConfig.Redis.Password = cfg.Cache.CacheRedisPassword
cacheConfig.Redis.DB = cfg.Cache.CacheRedisDB
}
libpack_cache.EnableCache(cacheConfig)
}
loadRatelimitConfig() loadRatelimitConfig()
once.Do(func() { once.Do(func() {
go enableApi() go enableApi()
+2 -3
View File
@@ -34,14 +34,13 @@ func (suite *Tests) SetupTest() {
JSONDecoder: json.Unmarshal, JSONDecoder: json.Unmarshal,
}, },
) )
cacheStats = &CacheStats{}
// Initialize a simple in-memory cache client for testing purposes // Initialize a simple in-memory cache client for testing purposes
cfg.Cache.Client = libpack_cache.New(5 * time.Minute) libpack_cache.New(5 * time.Minute)
parseConfig() parseConfig()
enableApi() enableApi()
StartMonitoringServer() StartMonitoringServer()
cfg.Logger = libpack_logging.NewLogger() cfg.Logger = libpack_logging.New().SetMinLogLevel(libpack_logging.GetLogLevel(getDetailsFromEnv("LOG_LEVEL", "info")))
// Setup environment variables here if needed // Setup environment variables here if needed
os.Setenv("GMP_TEST_STRING", "testValue") os.Setenv("GMP_TEST_STRING", "testValue")
os.Setenv("GMP_TEST_INT", "123") os.Setenv("GMP_TEST_INT", "123")
+23 -8
View File
@@ -12,7 +12,7 @@ import (
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gookit/goutil/envutil" "github.com/gookit/goutil/envutil"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config" libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
logging "github.com/lukaszraczylo/graphql-monitoring-proxy/logging" libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
) )
type MetricsSetup struct { type MetricsSetup struct {
@@ -23,7 +23,7 @@ type MetricsSetup struct {
} }
var ( var (
log *logging.LogConfig log *libpack_logger.Logger
) )
type InitConfig struct { type InitConfig struct {
@@ -32,7 +32,7 @@ type InitConfig struct {
} }
func NewMonitoring(ic *InitConfig) *MetricsSetup { func NewMonitoring(ic *InitConfig) *MetricsSetup {
log = logging.NewLogger() log = libpack_logger.New().SetMinLogLevel(libpack_logger.LEVEL_INFO)
ms := &MetricsSetup{ic: ic} ms := &MetricsSetup{ic: ic}
ms.metrics_set = metrics.NewSet() ms.metrics_set = metrics.NewSet()
ms.metrics_set_custom = metrics.NewSet() ms.metrics_set_custom = metrics.NewSet()
@@ -86,7 +86,10 @@ func (ms *MetricsSetup) ListActiveMetrics() []string {
func (ms *MetricsSetup) RegisterMetricsGauge(metric_name string, labels map[string]string, val float64) *metrics.Gauge { func (ms *MetricsSetup) RegisterMetricsGauge(metric_name string, labels map[string]string, val float64) *metrics.Gauge {
if validate_metrics_name(metric_name) != nil { if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsGauge() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name}) log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsGauge() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil return nil
} }
return ms.metrics_set_custom.GetOrCreateGauge(ms.get_metrics_name(metric_name, labels), func() float64 { return ms.metrics_set_custom.GetOrCreateGauge(ms.get_metrics_name(metric_name, labels), func() float64 {
@@ -97,7 +100,10 @@ func (ms *MetricsSetup) RegisterMetricsGauge(metric_name string, labels map[stri
func (ms *MetricsSetup) RegisterMetricsCounter(metric_name string, labels map[string]string) *metrics.Counter { func (ms *MetricsSetup) RegisterMetricsCounter(metric_name string, labels map[string]string) *metrics.Counter {
if validate_metrics_name(metric_name) != nil { if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsCounter() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name}) log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsCounter() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil return nil
} }
if metric_name == MetricsSucceeded || metric_name == MetricsFailed || metric_name == MetricsSkipped { if metric_name == MetricsSucceeded || metric_name == MetricsFailed || metric_name == MetricsSkipped {
@@ -108,7 +114,10 @@ func (ms *MetricsSetup) RegisterMetricsCounter(metric_name string, labels map[st
func (ms *MetricsSetup) RegisterFloatCounter(metric_name string, labels map[string]string) *metrics.FloatCounter { func (ms *MetricsSetup) RegisterFloatCounter(metric_name string, labels map[string]string) *metrics.FloatCounter {
if validate_metrics_name(metric_name) != nil { if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterFloatCounter() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name}) log.Critical(&libpack_logger.LogMessage{
Message: "RegisterFloatCounter() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil return nil
} }
return ms.metrics_set_custom.GetOrCreateFloatCounter(ms.get_metrics_name(metric_name, labels)) return ms.metrics_set_custom.GetOrCreateFloatCounter(ms.get_metrics_name(metric_name, labels))
@@ -116,7 +125,10 @@ func (ms *MetricsSetup) RegisterFloatCounter(metric_name string, labels map[stri
func (ms *MetricsSetup) RegisterMetricsSummary(metric_name string, labels map[string]string) *metrics.Summary { func (ms *MetricsSetup) RegisterMetricsSummary(metric_name string, labels map[string]string) *metrics.Summary {
if validate_metrics_name(metric_name) != nil { if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsSummary() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name}) log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsSummary() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil return nil
} }
return ms.metrics_set_custom.GetOrCreateSummary(ms.get_metrics_name(metric_name, labels)) return ms.metrics_set_custom.GetOrCreateSummary(ms.get_metrics_name(metric_name, labels))
@@ -124,7 +136,10 @@ func (ms *MetricsSetup) RegisterMetricsSummary(metric_name string, labels map[st
func (ms *MetricsSetup) RegisterMetricsHistogram(metric_name string, labels map[string]string) *metrics.Histogram { func (ms *MetricsSetup) RegisterMetricsHistogram(metric_name string, labels map[string]string) *metrics.Histogram {
if validate_metrics_name(metric_name) != nil { if validate_metrics_name(metric_name) != nil {
log.Critical("RegisterMetricsHistogram() error", map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name}) log.Critical(&libpack_logger.LogMessage{
Message: "RegisterMetricsHistogram() error",
Pairs: map[string]interface{}{"_error": "Invalid metric name", "_metric_name": metric_name},
})
return nil return nil
} }
return ms.metrics_set_custom.GetOrCreateHistogram(ms.get_metrics_name(metric_name, labels)) return ms.metrics_set_custom.GetOrCreateHistogram(ms.get_metrics_name(metric_name, labels))
+49 -6
View File
@@ -8,6 +8,7 @@ import (
"github.com/avast/retry-go/v4" "github.com/avast/retry-go/v4"
fiber "github.com/gofiber/fiber/v2" fiber "github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/proxy" "github.com/gofiber/fiber/v2/middleware/proxy"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring" libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
@@ -30,7 +31,10 @@ func createFasthttpClient(timeout int) *fasthttp.Client {
func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error { func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
if !checkAllowedURLs(c) { if !checkAllowedURLs(c) {
cfg.Logger.Error("Request blocked", map[string]interface{}{"path": c.Path()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Request blocked",
Pairs: map[string]interface{}{"path": c.Path()},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsSkipped, nil)
} }
@@ -42,13 +46,30 @@ func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
c.Request().Header.Add(fiber.HeaderXForwardedFor, string(c.Request().Header.Peek("X-Forwarded-For"))) c.Request().Header.Add(fiber.HeaderXForwardedFor, string(c.Request().Header.Peek("X-Forwarded-For")))
c.Request().Header.Del(fiber.HeaderAcceptEncoding) c.Request().Header.Del(fiber.HeaderAcceptEncoding)
cfg.Logger.Debug("Proxying the request", map[string]interface{}{"path": c.Path(), "body": string(c.Request().Body()), "headers": c.GetReqHeaders(), "request_uuid": c.Locals("request_uuid")}) // added dummy check for the log level because it executes additional functions which could
// potentially slow down the execution.
if cfg.LogLevel == "debug" {
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Proxying the request",
Pairs: map[string]interface{}{
"path": c.Path(),
"body": string(c.Request().Body()),
"headers": c.GetReqHeaders(),
"request_uuid": c.Locals("request_uuid"),
},
})
}
err := retry.Do( err := retry.Do(
func() error { func() error {
errInt := proxy.DoRedirects(c, currentEndpoint+c.Path(), 3, cfg.Client.FastProxyClient) errInt := proxy.DoRedirects(c, currentEndpoint+c.Path(), 3, cfg.Client.FastProxyClient)
if errInt != nil { if errInt != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": errInt.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{
"error": errInt.Error(),
},
})
if ifNotInTest() { if ifNotInTest() {
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
} }
@@ -57,7 +78,13 @@ func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
return nil return nil
}, },
retry.OnRetry(func(n uint, err error) { retry.OnRetry(func(n uint, err error) {
cfg.Logger.Warning("Retrying the request", map[string]interface{}{"path": c.Path(), "error": err.Error()}) cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Retrying the request",
Pairs: map[string]interface{}{
"path": c.Path(),
"error": err.Error(),
},
})
}), }),
retry.Attempts(uint(3)), retry.Attempts(uint(3)),
retry.DelayType(retry.BackOffDelay), retry.DelayType(retry.BackOffDelay),
@@ -66,11 +93,27 @@ func proxyTheRequest(c *fiber.Ctx, currentEndpoint string) error {
) )
if err != nil { if err != nil {
cfg.Logger.Warning("Can't proxy the request", map[string]interface{}{"error": err.Error()}) cfg.Logger.Warning(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{
"error": err.Error(),
},
})
return err return err
} }
cfg.Logger.Debug("Received proxied response", map[string]interface{}{"path": c.Path(), "response_body": string(c.Response().Body()), "response_code": c.Response().StatusCode(), "headers": c.GetRespHeaders(), "request_uuid": c.Locals("request_uuid")}) if cfg.LogLevel == "debug" {
cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Received proxied response",
Pairs: map[string]interface{}{
"path": c.Path(),
"response_body": string(c.Response().Body()),
"response_code": c.Response().StatusCode(),
"headers": c.GetRespHeaders(),
"request_uuid": c.Locals("request_uuid"),
},
})
}
if c.Response().StatusCode() != 200 { if c.Response().StatusCode() != 200 {
if ifNotInTest() { if ifNotInTest() {
+46 -14
View File
@@ -7,6 +7,7 @@ import (
"github.com/goccy/go-json" "github.com/goccy/go-json"
goratecounter "github.com/lukaszraczylo/go-ratecounter" goratecounter "github.com/lukaszraczylo/go-ratecounter"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
) )
type RateLimitConfig struct { type RateLimitConfig struct {
@@ -34,10 +35,17 @@ func loadRatelimitConfig() error {
if err == nil { if err == nil {
return nil return nil
} }
cfg.Logger.Debug("Failed to load config", map[string]interface{}{"path": path, "error": err}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Failed to load config",
Pairs: map[string]interface{}{"path": path, "error": err},
})
} }
cfg.Logger.Error("Rate limit config not found", map[string]interface{}{"paths": paths}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Rate limit config not found",
Pairs: map[string]interface{}{"paths": paths},
})
return os.ErrNotExist return os.ErrNotExist
} }
@@ -61,35 +69,53 @@ func loadConfigFromPath(path string) error {
value.RateCounterTicker = goratecounter.NewRateCounter().WithConfig(goratecounter.RateCounterConfig{ value.RateCounterTicker = goratecounter.NewRateCounter().WithConfig(goratecounter.RateCounterConfig{
Interval: time.Duration(value.Req) * ratelimit_intervals[value.Interval], Interval: time.Duration(value.Req) * ratelimit_intervals[value.Interval],
}) })
cfg.Logger.Debug("Setting ratelimit config for role", map[string]interface{}{
"role": key, if cfg.LogLevel == "debug" {
"interval_provided": value.Interval, cfg.Logger.Debug(&libpack_logger.LogMessage{
"interval_used": ratelimit_intervals[value.Interval], Message: "Setting ratelimit config for role",
"ratelimit": value.Req, Pairs: map[string]interface{}{
}) "role": key,
"interval_provided": value.Interval,
"interval_used": ratelimit_intervals[value.Interval],
"ratelimit": value.Req,
},
})
}
config.RateLimit[key] = value config.RateLimit[key] = value
} }
rateLimits = config.RateLimit rateLimits = config.RateLimit
cfg.Logger.Debug("Rate limit config loaded", map[string]interface{}{"ratelimit": rateLimits}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit config loaded",
Pairs: map[string]interface{}{"ratelimit": rateLimits},
})
return nil return nil
} }
func rateLimitedRequest(userID string, userRole string) (shouldAllow bool) { func rateLimitedRequest(userID string, userRole string) (shouldAllow bool) {
if rateLimits == nil { if rateLimits == nil {
cfg.Logger.Debug("Rate limit config not found", map[string]interface{}{"user_role": userRole}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit config not found",
Pairs: map[string]interface{}{"user_role": userRole},
})
return true return true
} }
// Fetch role config once to avoid multiple map lookups // Fetch role config once to avoid multiple map lookups
roleConfig, ok := rateLimits[userRole] roleConfig, ok := rateLimits[userRole]
if !ok { if !ok {
cfg.Logger.Warning("Rate limit role not found", map[string]interface{}{"user_role": userRole}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit role not found",
Pairs: map[string]interface{}{"user_role": userRole},
})
return true return true
} }
if roleConfig.RateCounterTicker == nil { if roleConfig.RateCounterTicker == nil {
cfg.Logger.Warning("Rate limit ticker not found", map[string]interface{}{"user_role": userRole}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit ticker not found",
Pairs: map[string]interface{}{"user_role": userRole},
})
return true return true
} }
@@ -104,10 +130,16 @@ func rateLimitedRequest(userID string, userRole string) (shouldAllow bool) {
"interval": roleConfig.Interval, "interval": roleConfig.Interval,
} }
cfg.Logger.Debug("Rate limit ticker", logDetails) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit ticker",
Pairs: map[string]interface{}{"log_details": logDetails},
})
if tickerRate > float64(roleConfig.Req) { if tickerRate > float64(roleConfig.Req) {
cfg.Logger.Debug("Rate limit exceeded", logDetails) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limit exceeded",
Pairs: map[string]interface{}{"log_details": logDetails},
})
return false return false
} }
+86 -30
View File
@@ -10,13 +10,18 @@ import (
"github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/cors"
"github.com/google/uuid" "github.com/google/uuid"
libpack_cache "github.com/lukaszraczylo/graphql-monitoring-proxy/cache"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config" libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring" libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
) )
// StartHTTPProxy starts the HTTP and points it to the GraphQL server. // StartHTTPProxy starts the HTTP and points it to the GraphQL server.
func StartHTTPProxy() { func StartHTTPProxy() {
cfg.Logger.Debug("Starting the HTTP proxy", nil) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Starting the HTTP proxy",
Pairs: nil,
})
server := fiber.New(fiber.Config{ server := fiber.New(fiber.Config{
DisableStartupMessage: true, DisableStartupMessage: true,
AppName: fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION), AppName: fmt.Sprintf("GraphQL Monitoring Proxy - %s v%s", libpack_config.PKG_NAME, libpack_config.PKG_VERSION),
@@ -40,10 +45,16 @@ func StartHTTPProxy() {
server.Post("/*", processGraphQLRequest) server.Post("/*", processGraphQLRequest)
server.Get("/*", proxyTheRequestToDefault) server.Get("/*", proxyTheRequestToDefault)
cfg.Logger.Info("GraphQL query proxy started", map[string]interface{}{"port": cfg.Server.PortGraphQL}) cfg.Logger.Info(&libpack_logger.LogMessage{
Message: "GraphQL proxy started",
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
})
err := server.Listen(fmt.Sprintf(":%d", cfg.Server.PortGraphQL)) err := server.Listen(fmt.Sprintf(":%d", cfg.Server.PortGraphQL))
if err != nil { if err != nil {
cfg.Logger.Critical("Can't start the service", map[string]interface{}{"error": err.Error()}) cfg.Logger.Critical(&libpack_logger.LogMessage{
Message: "Can't start the service",
Pairs: map[string]interface{}{"port": cfg.Server.PortGraphQL},
})
} }
} }
@@ -66,17 +77,26 @@ func checkAllowedURLs(c *fiber.Ctx) bool {
func healthCheck(c *fiber.Ctx) error { func healthCheck(c *fiber.Ctx) error {
if len(cfg.Server.HealthcheckGraphQL) > 0 { if len(cfg.Server.HealthcheckGraphQL) > 0 {
cfg.Logger.Debug("Health check enabled", map[string]interface{}{"url": cfg.Server.HealthcheckGraphQL}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Health check enabled",
Pairs: map[string]interface{}{"url": cfg.Server.HealthcheckGraphQL},
})
query := `{ __typename }` query := `{ __typename }`
_, err := cfg.Client.GQLClient.Query(query, nil, nil) _, err := cfg.Client.GQLClient.Query(query, nil, nil)
if err != nil { if err != nil {
cfg.Logger.Error("Can't reach the GraphQL server", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't reach the GraphQL server",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't reach the GraphQL server with {__typename} query") c.Status(500).SendString("Can't reach the GraphQL server with {__typename} query")
return err return err
} }
} }
cfg.Logger.Debug("Health check returning OK", nil) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Health check returning OK",
Pairs: nil,
})
c.Status(200).SendString("Health check OK") c.Status(200).SendString("Health check OK")
return nil return nil
} }
@@ -108,7 +128,10 @@ func processGraphQLRequest(c *fiber.Ctx) error {
// Implementing rate limiting if enabled // Implementing rate limiting if enabled
if cfg.Client.RoleRateLimit { if cfg.Client.RoleRateLimit {
cfg.Logger.Debug("Rate limiting enabled", map[string]interface{}{"user_id": extractedUserID, "role_name": extractedRoleName}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Rate limiting enabled",
Pairs: map[string]interface{}{"user_id": extractedUserID, "role_name": extractedRoleName},
})
if !rateLimitedRequest(extractedUserID, extractedRoleName) { if !rateLimitedRequest(extractedUserID, extractedRoleName) {
c.Status(429).SendString("Rate limit exceeded, try again later") c.Status(429).SendString("Rate limit exceeded, try again later")
return nil return nil
@@ -122,20 +145,29 @@ func processGraphQLRequest(c *fiber.Ctx) error {
} }
if parsedResult.shouldIgnore { if parsedResult.shouldIgnore {
cfg.Logger.Debug("Request passed as-is - probably not a GraphQL", nil) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Request passed as-is - probably not a GraphQL",
Pairs: nil,
})
return proxyTheRequest(c, parsedResult.activeEndpoint) return proxyTheRequest(c, parsedResult.activeEndpoint)
} }
calculatedQueryHash := calculateHash(c) calculatedQueryHash := libpack_cache.CalculateHash(c)
if parsedResult.cacheTime > 0 { if parsedResult.cacheTime > 0 {
cfg.Logger.Debug("Cache time set via query", map[string]interface{}{"cacheTime": parsedResult.cacheTime}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache time set via query",
Pairs: map[string]interface{}{"cacheTime": parsedResult.cacheTime},
})
} else { } else {
// If not set via query, try setting via header // If not set via query, try setting via header
cacheQuery := c.Request().Header.Peek("X-Cache-Graphql-Query") cacheQuery := c.Request().Header.Peek("X-Cache-Graphql-Query")
if cacheQuery != nil { if cacheQuery != nil {
parsedResult.cacheTime, _ = strconv.Atoi(string(cacheQuery)) parsedResult.cacheTime, _ = strconv.Atoi(string(cacheQuery))
cfg.Logger.Debug("Cache time set via header", map[string]interface{}{"cacheTime": parsedResult.cacheTime}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache time set via header",
Pairs: map[string]interface{}{"cacheTime": parsedResult.cacheTime},
})
} else { } else {
parsedResult.cacheTime = cfg.Cache.CacheTTL parsedResult.cacheTime = cfg.Cache.CacheTTL
} }
@@ -144,35 +176,53 @@ func processGraphQLRequest(c *fiber.Ctx) error {
wasCached := false wasCached := false
if parsedResult.cacheRefresh { if parsedResult.cacheRefresh {
cfg.Logger.Debug("Cache refresh requested via query", map[string]interface{}{"user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")}) cfg.Logger.Debug(&libpack_logger.LogMessage{
cacheDelete(calculatedQueryHash) Message: "Cache refresh requested via query",
Pairs: map[string]interface{}{"user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")},
})
libpack_cache.CacheDelete(calculatedQueryHash)
} }
// Handling Cache Logic // Handling Cache Logic
if parsedResult.cacheRequest || cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable { if parsedResult.cacheRequest || cfg.Cache.CacheEnable || cfg.Cache.CacheRedisEnable {
cfg.Logger.Debug("Cache enabled", map[string]interface{}{"via_query": parsedResult.cacheRequest, "via_env": cfg.Cache.CacheEnable}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache enabled",
Pairs: map[string]interface{}{"via_query": parsedResult.cacheRequest, "via_env": cfg.Cache.CacheEnable},
})
queryCacheHash = calculatedQueryHash queryCacheHash = calculatedQueryHash
if cachedResponse := cacheLookup(queryCacheHash); cachedResponse != nil { if cachedResponse := libpack_cache.CacheLookup(queryCacheHash); cachedResponse != nil {
cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheHit, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheHit, nil)
cfg.Logger.Debug("Cache hit", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache hit",
Pairs: map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")},
})
c.Request().Header.Add("X-Cache-Hit", "true") c.Request().Header.Add("X-Cache-Hit", "true")
err := c.Send(cachedResponse) err := c.Send(cachedResponse)
if err != nil { if err != nil {
cfg.Logger.Error("Can't send the cached response", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't send the cached response",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't send the cached response - try again later") c.Status(500).SendString("Can't send the cached response - try again later")
} }
wasCached = true wasCached = true
} else { } else {
cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheMiss, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsCacheMiss, nil)
cfg.Logger.Debug("Cache miss", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")}) cfg.Logger.Debug(&libpack_logger.LogMessage{
Message: "Cache miss",
Pairs: map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID, "request_uuid": c.Locals("request_uuid")},
})
proxyAndCacheTheRequest(c, queryCacheHash, parsedResult.cacheTime, parsedResult.activeEndpoint) proxyAndCacheTheRequest(c, queryCacheHash, parsedResult.cacheTime, parsedResult.activeEndpoint)
} }
} else { } else {
err := proxyTheRequest(c, parsedResult.activeEndpoint) err := proxyTheRequest(c, parsedResult.activeEndpoint)
if err != nil { if err != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't proxy the request - try again later") c.Status(500).SendString("Can't proxy the request - try again later")
return nil return nil
@@ -191,12 +241,15 @@ func processGraphQLRequest(c *fiber.Ctx) error {
func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string, cacheTime int, currentEndpoint string) { func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string, cacheTime int, currentEndpoint string) {
err := proxyTheRequest(c, currentEndpoint) err := proxyTheRequest(c, currentEndpoint)
if err != nil { if err != nil {
cfg.Logger.Error("Can't proxy the request", map[string]interface{}{"error": err.Error()}) cfg.Logger.Error(&libpack_logger.LogMessage{
Message: "Can't proxy the request",
Pairs: map[string]interface{}{"error": err.Error()},
})
cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsFailed, nil)
c.Status(500).SendString("Can't proxy the request - try again later") c.Status(500).SendString("Can't proxy the request - try again later")
return return
} }
cacheStoreWithTTL(queryCacheHash, c.Response().Body(), time.Duration(cacheTime)*time.Second) libpack_cache.CacheStoreWithTTL(queryCacheHash, c.Response().Body(), time.Duration(cacheTime)*time.Second)
cfg.Monitoring.Increment(libpack_monitoring.MetricsQueriesCached, nil) cfg.Monitoring.Increment(libpack_monitoring.MetricsQueriesCached, nil)
c.Send(c.Response().Body()) c.Send(c.Response().Body())
} }
@@ -210,15 +263,18 @@ func logAndMonitorRequest(c *fiber.Ctx, userID, opType, opName string, wasCached
} }
if cfg.Server.AccessLog { if cfg.Server.AccessLog {
cfg.Logger.Info("Request processed", map[string]interface{}{ cfg.Logger.Info(&libpack_logger.LogMessage{
"ip": c.IP(), Message: "Request processed",
"fwd-ip": string(c.Request().Header.Peek("X-Forwarded-For")), Pairs: map[string]interface{}{
"user_id": userID, "ip": c.IP(),
"op_type": opType, "fwd-ip": string(c.Request().Header.Peek("X-Forwarded-For")),
"op_name": opName, "user_id": userID,
"time": duration, "op_type": opType,
"cache": wasCached, "op_name": opName,
"request_uuid": c.Locals("request_uuid"), "time": duration,
"cache": wasCached,
"request_uuid": c.Locals("request_uuid"),
},
}) })
} }
+2 -2
View File
@@ -9,7 +9,8 @@ import (
// config is a struct that holds the configuration of the application. // config is a struct that holds the configuration of the application.
type config struct { type config struct {
Logger *libpack_logging.LogConfig Logger *libpack_logging.Logger
LogLevel string
Monitoring *libpack_monitoring.MetricsSetup Monitoring *libpack_monitoring.MetricsSetup
Api struct{ BannedUsersFile string } Api struct{ BannedUsersFile string }
Client struct { Client struct {
@@ -32,7 +33,6 @@ type config struct {
Enable bool Enable bool
} }
Cache struct { Cache struct {
Client CacheClient
CacheRedisURL string CacheRedisURL string
CacheRedisPassword string CacheRedisPassword string
CacheTTL int CacheTTL int