Compare commits

...

7 Commits

Author SHA1 Message Date
Chris Clayton 12e4237997 divide long functions, replace strings.builder with bytes.buffer. (#13)
Co-authored-by: Chris Clayton <chris.clayton@contino.io>
2024-06-17 10:23:41 +01:00
lukaszraczylo de31912d2f increase error handling and mutex encapsulation (#12)
* increase error handling and mutex encapsulation

* undo method rename for now

* set cant return error

---------

Co-authored-by: Chris Clayton <chris.clayton@contino.io>
2024-06-15 10:21:49 +01:00
lukaszraczylo e0e9b4278f Release: Improve documentation and number of logs cleaned. 2024-06-12 12:59:54 +01:00
lukaszraczylo 9a7635bd35 fixup! fixup! Add cleaning up action logs as well. 2024-06-12 12:46:13 +01:00
lukaszraczylo e8b07d2e01 fixup! Add cleaning up action logs as well. 2024-06-12 12:27:13 +01:00
lukaszraczylo efdd2de035 Add cleaning up action logs as well. 2024-06-12 12:23:14 +01:00
lukaszraczylo 57d2fd8e80 Update documentation. 2024-06-12 12:12:25 +01:00
5 changed files with 303 additions and 115 deletions
+16
View File
@@ -16,6 +16,8 @@ This project is in active use by [telegram-bot.app](https://telegram-bot.app), a
- [Speed](#speed)
- [Caching](#caching)
- [Read-only endpoint](#read-only-endpoint)
- [Maintenance](#maintenance)
- [Hasura event cleaner](#hasura-event-cleaner)
- [Security](#security)
- [Role-based rate limiting](#role-based-rate-limiting)
- [Read-only mode](#read-only-mode)
@@ -175,6 +177,20 @@ You can now specify the read-only GraphQL endpoint by setting the `HOST_GRAPHQL_
You can check out the [example of combined deployment with RW and read-only hasura](static/kubernetes-single-deployment-with-ro.yaml).
### Maintenance
#### Hasura event cleaner
When enabled via `HASURA_EVENT_CLEANER=true` - proxy needs to have a direct access to the database to execute simple delete queries on schedule. You can specify number of days the logs should be kept for using `HASURA_EVENT_CLEANER_OLDER_THAN`, for example `HASURA_EVENT_CLEANER_OLDER_THAN=14` will keep 14 days of event execution logs. Ticker managing the cleaner routine will be executed every hour.
Following tables are being cleaned:
- `hdb_catalog.event_invocation_logs`
- `hdb_catalog.event_log`
- `hdb_catalog.hdb_action_log`
- `hdb_catalog.hdb_cron_event_invocation_logs`
- `hdb_catalog.hdb_scheduled_event_invocation_logs`
### Security
#### Role-based rate limiting
+46 -28
View File
@@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"io"
"log"
"sync"
"time"
)
@@ -18,7 +19,7 @@ type Cache struct {
decompressPool sync.Pool
entries sync.Map
globalTTL time.Duration
sync.RWMutex
mu sync.RWMutex // Added sync.RWMutex field for locking
}
func New(globalTTL time.Duration) *Cache {
@@ -26,13 +27,11 @@ func New(globalTTL time.Duration) *Cache {
globalTTL: globalTTL,
compressPool: sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(nil)
return w
return gzip.NewWriter(nil)
},
},
decompressPool: sync.Pool{
New: func() interface{} {
// Ensure that new is returning a new reader initialized with an empty byte buffer
r, _ := gzip.NewReader(bytes.NewReader([]byte{}))
return r
},
@@ -53,13 +52,14 @@ func (c *Cache) cleanupRoutine(globalTTL time.Duration) {
}
func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
c.Lock() // use the lock
defer c.Unlock()
c.lock()
defer c.unlock()
expiresAt := time.Now().Add(ttl)
compressedValue, err := c.compress(value)
if err != nil {
log.Printf("Error compressing value for key %s: %v", key, err)
return
}
@@ -71,8 +71,8 @@ func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
}
func (c *Cache) Get(key string) ([]byte, bool) {
c.RLock() // use the read lock
defer c.RUnlock()
c.rlock()
defer c.runlock()
entry, ok := c.entries.Load(key)
if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) {
@@ -81,30 +81,33 @@ func (c *Cache) Get(key string) ([]byte, bool) {
compressedValue := entry.(CacheEntry).Value
value, err := c.decompress(compressedValue)
if err != nil {
log.Printf("Error decompressing value for key %s: %v", key, err)
return nil, false
}
return value, true
}
func (c *Cache) Delete(key string) {
c.Lock()
defer c.Unlock()
_, ok := c.entries.Load(key)
if !ok {
return
}
c.lock()
defer c.unlock()
c.entries.Delete(key)
}
func (c *Cache) Clear() {
c.entries = sync.Map{}
c.lock()
defer c.unlock()
c.entries.Range(func(key, value interface{}) bool {
c.entries.Delete(key)
return true
})
}
func (c *Cache) CountQueries() int {
c.RLock()
defer c.RUnlock()
c.rlock()
defer c.runlock()
var count int
c.entries.Range(func(_, _ interface{}) bool {
count++
@@ -131,27 +134,24 @@ func (c *Cache) compress(data []byte) ([]byte, error) {
func (c *Cache) decompress(data []byte) ([]byte, error) {
r, ok := c.decompressPool.Get().(*gzip.Reader)
if !ok || r == nil {
// If r is nil or type assertion fails, create a new gzip.Reader
var err error
r, err = gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err // Handle the error if gzip.NewReader fails
return nil, err
}
} else {
// Reset the existing reader with new data
if err := r.Reset(bytes.NewReader(data)); err != nil {
return nil, err // Handle the error if Reset fails
return nil, err
}
}
defer r.Close()
defer func() {
r.Close()
c.decompressPool.Put(r)
}()
// Ensure the reader is returned to the pool
defer c.decompressPool.Put(r)
// Read all the data from the reader
decompressedData, err := io.ReadAll(r)
if err != nil {
return nil, err // Handle the error if reading fails
return nil, err
}
return decompressedData, nil
}
@@ -166,3 +166,21 @@ func (c *Cache) CleanExpiredEntries() {
return true
})
}
// Private methods to handle locking
func (c *Cache) lock() {
c.mu.Lock()
}
func (c *Cache) unlock() {
c.mu.Unlock()
}
func (c *Cache) rlock() {
c.mu.RLock()
}
func (c *Cache) runlock() {
c.mu.RUnlock()
}
+14 -9
View File
@@ -19,15 +19,17 @@ func enableHasuraEventCleaner() {
defer ticker.Stop()
cfg.Logger.Info("Event cleaner enabled", map[string]interface{}{"interval_in_days": cfg.HasuraEventCleaner.ClearOlderThan})
go func() {
for {
select {
case <-ticker.C:
cfg.Logger.Info("Cleaning up old events", nil)
cleanEvents()
}
time.Sleep(60 * time.Second) // wait for everything to start and settle down
cfg.Logger.Info("Initial cleanup of old events", nil)
cleanEvents()
for {
select {
case <-ticker.C:
cfg.Logger.Info("Cleaning up old events", nil)
cleanEvents()
}
}()
}
}
}
@@ -42,12 +44,15 @@ func cleanEvents() {
delQueries := []string{
fmt.Sprintf("DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < now() - interval '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
fmt.Sprintf("DELETE FROM hdb_catalog.event_log WHERE created_at < now() - interval '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
fmt.Sprintf("DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - INTERVAL '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
fmt.Sprintf("DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - INTERVAL '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
fmt.Sprintf("DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - INTERVAL '%d days';", cfg.HasuraEventCleaner.ClearOlderThan),
}
for _, query := range delQueries {
_, err := conn.Exec(context.Background(), query)
if err != nil {
cfg.Logger.Error("Failed to execute query", map[string]interface{}{"query": query, "error": err})
cfg.Logger.Debug("Failed to execute query", map[string]interface{}{"query": query, "error": err})
}
}
}
+136 -72
View File
@@ -1,129 +1,193 @@
package libpack_monitoring
import (
"bytes"
"fmt"
"os"
"sort"
"strings"
"sync"
"unicode"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
)
func (ms *MetricsSetup) get_metrics_name(name string, labels map[string]string) (complete_name string) {
// Cache for sorted label keys to avoid repeated sorting
var sortedLabelKeysCache = struct {
sync.RWMutex
m map[string][]string
}{m: make(map[string][]string)}
func (ms *MetricsSetup) get_metrics_name(name string, labels map[string]string) string {
const unknownPodName = "unknown"
var sb strings.Builder
var buf bytes.Buffer
// Prepare default labels without initializing a new map
podName := unknownPodName
if hn, err := os.Hostname(); err == nil {
podName = hn
}
podName := getPodName()
if labels == nil {
labels = map[string]string{
"microservice": libpack_config.PKG_NAME,
"pod": podName,
}
labels = defaultLabels(podName)
} else {
if _, exists := labels["microservice"]; !exists {
labels["microservice"] = libpack_config.PKG_NAME
}
if _, exists := labels["pod"]; !exists {
labels["pod"] = podName
}
ensureDefaultLabels(&labels, podName)
}
// Prefix handling
if ms.metrics_prefix != "" {
sb.WriteString(ms.metrics_prefix)
sb.WriteString("_")
buf.WriteString(ms.metrics_prefix)
buf.WriteString("_")
}
sb.WriteString(name)
buf.WriteString(name)
// Append labels if any
if len(labels) > 0 {
sb.WriteString("{")
buf.WriteString("{")
appendSortedLabels(&buf, labels)
buf.WriteString("}")
}
keys := make([]string, 0, len(labels))
return buf.String()
}
func getPodName() string {
const unknownPodName = "unknown"
if hn, err := os.Hostname(); err == nil {
return hn
}
return unknownPodName
}
func defaultLabels(podName string) map[string]string {
return map[string]string{
"microservice": libpack_config.PKG_NAME,
"pod": podName,
}
}
func ensureDefaultLabels(labels *map[string]string, podName string) {
if *labels == nil {
*labels = make(map[string]string)
}
if _, exists := (*labels)["microservice"]; !exists {
(*labels)["microservice"] = libpack_config.PKG_NAME
}
if _, exists := (*labels)["pod"]; !exists {
(*labels)["pod"] = podName
}
}
func appendSortedLabels(buf *bytes.Buffer, labels map[string]string) {
keys := getSortedKeys(labels)
for i, k := range keys {
if i > 0 {
buf.WriteString(",")
}
buf.WriteString(k)
buf.WriteString("=\"")
buf.WriteString(labels[k])
buf.WriteString("\"")
}
}
func getSortedKeys(labels map[string]string) []string {
labelsKey := labelsToString(labels)
sortedLabelKeysCache.RLock()
keys, exists := sortedLabelKeysCache.m[labelsKey]
sortedLabelKeysCache.RUnlock()
if !exists {
keys = make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
for i, k := range keys {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(k)
sb.WriteString("=\"")
sb.WriteString(labels[k])
sb.WriteString("\"")
}
sb.WriteString("}")
sortedLabelKeysCache.Lock()
sortedLabelKeysCache.m[labelsKey] = keys
sortedLabelKeysCache.Unlock()
}
return keys
}
func labelsToString(labels map[string]string) string {
var sb strings.Builder
for k, v := range labels {
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(v)
sb.WriteString(";")
}
return sb.String()
}
// validate_metrics_name validates the name of the metric to adhere to the Prometheus naming conventions
// https://prometheus.io/docs/practices/naming/
func validate_metrics_name(name string) error {
var sb strings.Builder // Use strings.Builder for efficient string concatenation
cleanedName := clean_metric_name(name)
// Track if the last character was an underscore to avoid duplicate underscores
lastWasUnderscore := false
for _, r := range name {
// Convert spaces to underscores and skip non-alphanumeric characters except underscores
if r == ' ' || (unicode.IsLetter(r) || unicode.IsDigit(r) || r == '_') {
if r == ' ' || r == '_' {
if lastWasUnderscore {
continue // Skip if the previous character was also an underscore
}
r = '_' // Convert spaces to underscores
lastWasUnderscore = true
} else {
lastWasUnderscore = false
}
sb.WriteRune(r) // Add valid characters to the builder
}
}
// Trim leading and trailing underscores
name_new := strings.Trim(sb.String(), "_")
finalName := strings.Trim(cleanedName, "_")
// Check if the processed name matches the original input
if name_new != name {
return fmt.Errorf("Invalid metric name: %s, expected %s", name, name_new)
if finalName != name {
return fmt.Errorf("Invalid metric name: %s, expected %s", name, finalName)
}
return nil
}
func compile_metrics_with_labels(name string, labels map[string]string) string {
var totalLength int
totalLength += len(name)
for k, v := range labels {
totalLength += len(k) + len(v) + 2
// clean_metric_name processes the metric name according to Prometheus naming conventions
func clean_metric_name(name string) string {
var buf bytes.Buffer
lastWasUnderscore := false
for _, r := range name {
if is_allowed_rune(r) {
if is_special_rune(r) {
if lastWasUnderscore {
continue // Skip if the previous character was also an underscore
}
r = '_' // Convert spaces and special characters to underscores
lastWasUnderscore = true
} else {
lastWasUnderscore = false
}
buf.WriteRune(r)
} else if !lastWasUnderscore {
buf.WriteRune('_')
lastWasUnderscore = true
}
}
var sb strings.Builder
sb.Grow(totalLength + 1)
// Remove trailing underscore
result := buf.String()
return strings.Trim(result, "_")
}
sb.WriteString(name)
// is_allowed_rune checks if the rune is allowed in the metric name
func is_allowed_rune(r rune) bool {
return unicode.IsLetter(r) || unicode.IsDigit(r) || r == ' ' || r == '_'
}
// is_special_rune checks if the rune is a space or an underscore
func is_special_rune(r rune) bool {
return r == ' ' || r == '_'
}
func compile_metrics_with_labels(name string, labels map[string]string) string {
var buf bytes.Buffer
buf.WriteString(name)
// Collect keys and sort them
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
keys := getSortedKeys(labels)
// Append sorted key-value pairs to the builder
// Append sorted key-value pairs to the buffer
for _, k := range keys {
sb.WriteString("_")
sb.WriteString(k)
sb.WriteString("_")
sb.WriteString(labels[k])
buf.WriteString("_")
buf.WriteString(k)
buf.WriteString("_")
buf.WriteString(labels[k])
}
return sb.String()
return buf.String()
}
+91 -6
View File
@@ -1,7 +1,6 @@
package libpack_monitoring
import (
"os"
"testing"
libpack_config "github.com/lukaszraczylo/graphql-monitoring-proxy/config"
@@ -134,10 +133,96 @@ func TestValidateMetricsName(t *testing.T) {
}
}
func getPodName() string {
podName, err := os.Hostname()
if err != nil {
return "unknown"
func TestCleanMetricName(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"valid metric name", "valid_metric_name"},
{"valid@metric#name!", "valid_metric_name"},
{"__valid__metric__name__", "valid_metric_name"},
{" valid metric name ", "valid_metric_name"},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
assert.Equal(t, tt.expected, clean_metric_name(tt.input))
})
}
}
func TestDefaultLabels(t *testing.T) {
podName := "test-pod"
libpack_config.PKG_NAME = "example_microservice"
expected := map[string]string{
"microservice": "example_microservice",
"pod": podName,
}
assert.Equal(t, expected, defaultLabels(podName))
}
func TestEnsureDefaultLabels(t *testing.T) {
podName := "test-pod"
libpack_config.PKG_NAME = "example_microservice"
tests := []struct {
name string
inputLabels map[string]string
expectedLabels map[string]string
}{
{
name: "Nil labels",
inputLabels: nil,
expectedLabels: map[string]string{"microservice": "example_microservice", "pod": podName},
},
{
name: "Empty labels",
inputLabels: map[string]string{},
expectedLabels: map[string]string{"microservice": "example_microservice", "pod": podName},
},
{
name: "Partial labels",
inputLabels: map[string]string{"microservice": "test_service"},
expectedLabels: map[string]string{"microservice": "test_service", "pod": podName},
},
{
name: "Complete labels",
inputLabels: map[string]string{"microservice": "test_service", "pod": "custom_pod"},
expectedLabels: map[string]string{"microservice": "test_service", "pod": "custom_pod"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ensureDefaultLabels(&tt.inputLabels, podName)
assert.Equal(t, tt.expectedLabels, tt.inputLabels)
})
}
}
func TestLabelsToString(t *testing.T) {
tests := []struct {
labels map[string]string
expected string
}{
{
labels: map[string]string{"key1": "value1", "key2": "value2"},
expected: "key1=value1;key2=value2;",
},
{
labels: map[string]string{"a": "1", "b": "2"},
expected: "a=1;b=2;",
},
{
labels: map[string]string{},
expected: "",
},
}
for _, tt := range tests {
t.Run(tt.expected, func(t *testing.T) {
assert.Equal(t, tt.expected, labelsToString(tt.labels))
})
}
return podName
}