mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-13 02:17:35 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| de31912d2f | |||
| e0e9b4278f | |||
| 9a7635bd35 | |||
| e8b07d2e01 | |||
| efdd2de035 | |||
| 57d2fd8e80 |
@@ -16,6 +16,8 @@ This project is in active use by [telegram-bot.app](https://telegram-bot.app), a
|
||||
- [Speed](#speed)
|
||||
- [Caching](#caching)
|
||||
- [Read-only endpoint](#read-only-endpoint)
|
||||
- [Maintenance](#maintenance)
|
||||
- [Hasura event cleaner](#hasura-event-cleaner)
|
||||
- [Security](#security)
|
||||
- [Role-based rate limiting](#role-based-rate-limiting)
|
||||
- [Read-only mode](#read-only-mode)
|
||||
@@ -175,6 +177,20 @@ You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_
|
||||
|
||||
You can check out the [example of combined deployment with RW and read-only hasura](static/kubernetes-single-deployment-with-ro.yaml).
|
||||
|
||||
### Maintenance
|
||||
|
||||
#### Hasura event cleaner
|
||||
|
||||
When enabled via `HASURA_EVENT_CLEANER=true` - proxy needs to have a direct access to the database to execute simple delete queries on schedule. You can specify number of days the logs should be kept for using `HASURA_EVENT_CLEANER_OLDER_THAN`, for example `HASURA_EVENT_CLEANER_OLDER_THAN=14` will keep 14 days of event execution logs. Ticker managing the cleaner routine will be executed every hour.
|
||||
|
||||
Following tables are being cleaned:
|
||||
- `hdb_catalog.event_invocation_logs`
|
||||
- `hdb_catalog.event_log`
|
||||
- `hdb_catalog.hdb_action_log`
|
||||
- `hdb_catalog.hdb_cron_event_invocation_logs`
|
||||
- `hdb_catalog.hdb_scheduled_event_invocation_logs`
|
||||
|
||||
|
||||
### Security
|
||||
|
||||
#### Role-based rate limiting
|
||||
|
||||
Vendored
+46
-28
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -18,7 +19,7 @@ type Cache struct {
|
||||
decompressPool sync.Pool
|
||||
entries sync.Map
|
||||
globalTTL time.Duration
|
||||
sync.RWMutex
|
||||
mu sync.RWMutex // Added sync.RWMutex field for locking
|
||||
}
|
||||
|
||||
func New(globalTTL time.Duration) *Cache {
|
||||
@@ -26,13 +27,11 @@ func New(globalTTL time.Duration) *Cache {
|
||||
globalTTL: globalTTL,
|
||||
compressPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
w := gzip.NewWriter(nil)
|
||||
return w
|
||||
return gzip.NewWriter(nil)
|
||||
},
|
||||
},
|
||||
decompressPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
// Ensure that new is returning a new reader initialized with an empty byte buffer
|
||||
r, _ := gzip.NewReader(bytes.NewReader([]byte{}))
|
||||
return r
|
||||
},
|
||||
@@ -53,13 +52,14 @@ func (c *Cache) cleanupRoutine(globalTTL time.Duration) {
|
||||
}
|
||||
|
||||
func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
|
||||
c.Lock() // use the lock
|
||||
defer c.Unlock()
|
||||
c.lock()
|
||||
defer c.unlock()
|
||||
|
||||
expiresAt := time.Now().Add(ttl)
|
||||
|
||||
compressedValue, err := c.compress(value)
|
||||
if err != nil {
|
||||
log.Printf("Error compressing value for key %s: %v", key, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -71,8 +71,8 @@ func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
|
||||
}
|
||||
|
||||
func (c *Cache) Get(key string) ([]byte, bool) {
|
||||
c.RLock() // use the read lock
|
||||
defer c.RUnlock()
|
||||
c.rlock()
|
||||
defer c.runlock()
|
||||
|
||||
entry, ok := c.entries.Load(key)
|
||||
if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) {
|
||||
@@ -81,30 +81,33 @@ func (c *Cache) Get(key string) ([]byte, bool) {
|
||||
compressedValue := entry.(CacheEntry).Value
|
||||
value, err := c.decompress(compressedValue)
|
||||
if err != nil {
|
||||
log.Printf("Error decompressing value for key %s: %v", key, err)
|
||||
return nil, false
|
||||
}
|
||||
return value, true
|
||||
}
|
||||
|
||||
func (c *Cache) Delete(key string) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
_, ok := c.entries.Load(key)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
c.lock()
|
||||
defer c.unlock()
|
||||
|
||||
c.entries.Delete(key)
|
||||
}
|
||||
|
||||
func (c *Cache) Clear() {
|
||||
c.entries = sync.Map{}
|
||||
c.lock()
|
||||
defer c.unlock()
|
||||
|
||||
c.entries.Range(func(key, value interface{}) bool {
|
||||
c.entries.Delete(key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Cache) CountQueries() int {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
c.rlock()
|
||||
defer c.runlock()
|
||||
|
||||
var count int
|
||||
c.entries.Range(func(_, _ interface{}) bool {
|
||||
count++
|
||||
@@ -131,27 +134,24 @@ func (c *Cache) compress(data []byte) ([]byte, error) {
|
||||
func (c *Cache) decompress(data []byte) ([]byte, error) {
|
||||
r, ok := c.decompressPool.Get().(*gzip.Reader)
|
||||
if !ok || r == nil {
|
||||
// If r is nil or type assertion fails, create a new gzip.Reader
|
||||
var err error
|
||||
r, err = gzip.NewReader(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return nil, err // Handle the error if gzip.NewReader fails
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// Reset the existing reader with new data
|
||||
if err := r.Reset(bytes.NewReader(data)); err != nil {
|
||||
return nil, err // Handle the error if Reset fails
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
defer r.Close()
|
||||
defer func() {
|
||||
r.Close()
|
||||
c.decompressPool.Put(r)
|
||||
}()
|
||||
|
||||
// Ensure the reader is returned to the pool
|
||||
defer c.decompressPool.Put(r)
|
||||
|
||||
// Read all the data from the reader
|
||||
decompressedData, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return nil, err // Handle the error if reading fails
|
||||
return nil, err
|
||||
}
|
||||
return decompressedData, nil
|
||||
}
|
||||
@@ -166,3 +166,21 @@ func (c *Cache) CleanExpiredEntries() {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// Private methods to handle locking
|
||||
|
||||
func (c *Cache) lock() {
|
||||
c.mu.Lock()
|
||||
}
|
||||
|
||||
func (c *Cache) unlock() {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *Cache) rlock() {
|
||||
c.mu.RLock()
|
||||
}
|
||||
|
||||
func (c *Cache) runlock() {
|
||||
c.mu.RUnlock()
|
||||
}
|
||||
|
||||
@@ -19,15 +19,17 @@ func enableHasuraEventCleaner() {
|
||||
defer ticker.Stop()
|
||||
cfg.Logger.Info("Event cleaner enabled", map[string]interface{}{"interval_in_days": cfg.HasuraEventCleaner.ClearOlderThan})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
cfg.Logger.Info("Cleaning up old events", nil)
|
||||
cleanEvents()
|
||||
}
|
||||
time.Sleep(60 * time.Second) // wait for everything to start and settle down
|
||||
cfg.Logger.Info("Initial cleanup of old events", nil)
|
||||
cleanEvents()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
cfg.Logger.Info("Cleaning up old events", nil)
|
||||
cleanEvents()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,12 +44,15 @@ func cleanEvents() {
|
||||
delQueries := []string{
|
||||
fmt.Sprintf("DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < now() - interval '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
|
||||
fmt.Sprintf("DELETE FROM hdb_catalog.event_log WHERE created_at < now() - interval '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
|
||||
fmt.Sprintf("DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - INTERVAL '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
|
||||
fmt.Sprintf("DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - INTERVAL '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
|
||||
fmt.Sprintf("DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - INTERVAL '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
|
||||
}
|
||||
|
||||
for _, query := range delQueries {
|
||||
_, err := conn.Exec(context.Background(), query)
|
||||
if err != nil {
|
||||
cfg.Logger.Error("Failed to execute query", map[string]interface{}{"query": query, "error": err})
|
||||
cfg.Logger.Debug("Failed to execute query", map[string]interface{}{"query": query, "error": err})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user