mirror of
https://github.com/lukaszraczylo/graphql-monitoring-proxy.git
synced 2026-06-05 23:03:48 +00:00
Add tests (#2)
* Initial tests draft. * Add tests for parsing jwt token * Further code optimisations
This commit is contained in:
@@ -42,70 +42,90 @@ func healthCheck(c *fiber.Ctx) error {
|
||||
}
|
||||
|
||||
func processGraphQLRequest(c *fiber.Ctx) error {
|
||||
t := time.Now()
|
||||
startTime := time.Now()
|
||||
|
||||
var extracted_user_id string = "-"
|
||||
var extracted_role_name string = "-"
|
||||
var query_cache_hash string
|
||||
// Initialize variables with default values
|
||||
extractedUserID := "-"
|
||||
extractedRoleName := "-"
|
||||
var queryCacheHash string
|
||||
|
||||
authorization := c.Request().Header.Peek("Authorization")
|
||||
if authorization != nil && (len(cfg.Client.JWTUserClaimPath) > 0 || len(cfg.Client.JWTRoleClaimPath) > 0) {
|
||||
extracted_user_id, extracted_role_name = extractClaimsFromJWTHeader(string(authorization))
|
||||
extractedUserID, extractedRoleName = extractClaimsFromJWTHeader(string(authorization))
|
||||
}
|
||||
|
||||
// Implementing rate limiting if enabled
|
||||
if cfg.Client.JWTRoleRateLimit {
|
||||
cfg.Logger.Debug("Rate limiting enabled", map[string]interface{}{"user_id": extracted_user_id, "role_name": extracted_role_name})
|
||||
if !rateLimitedRequest(extracted_user_id, extracted_role_name) {
|
||||
cfg.Logger.Debug("Rate limiting enabled", map[string]interface{}{"user_id": extractedUserID, "role_name": extractedRoleName})
|
||||
if !rateLimitedRequest(extractedUserID, extractedRoleName) {
|
||||
c.Status(429).SendString("Rate limit exceeded, try again later")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
opType, opName, cache_from_query, should_block := parseGraphQLQuery(c)
|
||||
|
||||
if should_block {
|
||||
opType, opName, cacheFromQuery, shouldBlock := parseGraphQLQuery(c)
|
||||
if shouldBlock {
|
||||
return nil
|
||||
}
|
||||
|
||||
was_cached := false
|
||||
wasCached := false
|
||||
|
||||
if cache_from_query || cfg.Cache.CacheEnable {
|
||||
cfg.Logger.Debug("Cache enabled", map[string]interface{}{"via_query": cache_from_query, "via_env": cfg.Cache.CacheEnable})
|
||||
query_cache_hash = calculateHash(c)
|
||||
cachedResponse := cacheLookup(query_cache_hash)
|
||||
if cachedResponse != nil {
|
||||
cfg.Logger.Debug("Cache hit", map[string]interface{}{"hash": query_cache_hash, "user_id": extracted_user_id})
|
||||
// Handling Cache Logic
|
||||
if cacheFromQuery || cfg.Cache.CacheEnable {
|
||||
cfg.Logger.Debug("Cache enabled", map[string]interface{}{"via_query": cacheFromQuery, "via_env": cfg.Cache.CacheEnable})
|
||||
queryCacheHash = calculateHash(c)
|
||||
|
||||
if cachedResponse := cacheLookup(queryCacheHash); cachedResponse != nil {
|
||||
cfg.Logger.Debug("Cache hit", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID})
|
||||
c.Send(cachedResponse)
|
||||
was_cached = true
|
||||
wasCached = true
|
||||
} else {
|
||||
cfg.Logger.Debug("Cache miss", map[string]interface{}{"hash": query_cache_hash, "user_id": extracted_user_id})
|
||||
proxyTheRequest(c)
|
||||
cfg.Cache.CacheClient.Set(query_cache_hash, c.Response().Body(), time.Duration(cfg.Cache.CacheTTL)*time.Second)
|
||||
c.Send(c.Response().Body())
|
||||
cfg.Logger.Debug("Cache miss", map[string]interface{}{"hash": queryCacheHash, "user_id": extractedUserID})
|
||||
proxyAndCacheTheRequest(c, queryCacheHash)
|
||||
}
|
||||
} else {
|
||||
proxyTheRequest(c)
|
||||
}
|
||||
time_taken := time.Since(t)
|
||||
|
||||
if cfg.Server.AccessLog {
|
||||
cfg.Logger.Info("Request processed", map[string]interface{}{"ip": c.IP(), "user_id": extracted_user_id, "op_type": opType, "op_name": opName, "time": time_taken, "cache": was_cached})
|
||||
}
|
||||
cfg.Monitoring.Increment(libpack_monitoring.MetricsSucceeded, nil)
|
||||
timeTaken := time.Since(startTime)
|
||||
|
||||
// Logging & Monitoring
|
||||
logAndMonitorRequest(c, extractedUserID, opType, opName, wasCached, timeTaken, startTime)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Additional helper function to avoid code repetition
|
||||
func proxyAndCacheTheRequest(c *fiber.Ctx, queryCacheHash string) {
|
||||
proxyTheRequest(c)
|
||||
cfg.Cache.CacheClient.Set(queryCacheHash, c.Response().Body(), time.Duration(cfg.Cache.CacheTTL)*time.Second)
|
||||
c.Send(c.Response().Body())
|
||||
}
|
||||
|
||||
func logAndMonitorRequest(c *fiber.Ctx, userID, opType, opName string, wasCached bool, duration time.Duration, startTime time.Time) {
|
||||
labels := map[string]string{
|
||||
"op_type": opType,
|
||||
"op_name": opName,
|
||||
"cached": fmt.Sprintf("%t", was_cached),
|
||||
"user_id": extracted_user_id,
|
||||
"cached": fmt.Sprintf("%t", wasCached),
|
||||
"user_id": userID,
|
||||
}
|
||||
|
||||
if cfg.Server.AccessLog {
|
||||
cfg.Logger.Info("Request processed", map[string]interface{}{
|
||||
"ip": c.IP(),
|
||||
"user_id": userID,
|
||||
"op_type": opType,
|
||||
"op_name": opName,
|
||||
"time": duration,
|
||||
"cache": wasCached,
|
||||
})
|
||||
}
|
||||
|
||||
cfg.Monitoring.Increment(libpack_monitoring.MetricsSucceeded, nil)
|
||||
cfg.Monitoring.Increment("executed_query", labels)
|
||||
|
||||
if !was_cached {
|
||||
cfg.Monitoring.UpdateDuration("timed_query", labels, t)
|
||||
cfg.Monitoring.Update("timed_query", labels, float64(time_taken.Milliseconds()))
|
||||
if !wasCached {
|
||||
cfg.Monitoring.UpdateDuration("timed_query", labels, startTime)
|
||||
cfg.Monitoring.Update("timed_query", labels, float64(duration.Milliseconds()))
|
||||
}
|
||||
// // cfg.Monitoring.Set("timed_query", time_taken.Milliseconds())
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user