Files
gohoarder/pkg/metadata/gormstore/aggregation_worker.go
T
lukaszraczylo c0061b99e3 chore(schema): migrate to GORM V2 with multi-database support
- [x] Implement GORM V2 metadata store with SQLite, PostgreSQL, and MySQL support
- [x] Add database migration system using gormigrate for schema versioning
- [x] Create migration CLI tool with support for migrate, rollback, and status commands
- [x] Add Docker support for migration container (Dockerfile.migrate)
- [x] Implement automatic partition management for PostgreSQL time-series tables
- [x] Add background aggregation worker for download statistics
- [x] Support connection pooling configuration (max_open_conns, max_idle_conns, conn_max_lifetime)
- [x] Add blocking mechanism based on vulnerability thresholds in stats and handlers
- [x] Update Helm charts with migration init containers and multi-database configuration
- [x] Replace deprecated SQLite store with optimized GORM implementation
- [x] Add comprehensive integration tests for MySQL and PostgreSQL
- [x] Update frontend to display blocked packages and storage utilization
- [x] Add goreleaser configuration for migrate binary and container image
- [x] Update configuration examples with database backend options and recommendations
2026-01-03 20:44:23 +00:00

358 lines
11 KiB
Go

package gormstore
import (
"time"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
)
// AggregationWorker periodically rolls raw download events up into the
// hourly and daily statistics tables. Build one with NewAggregationWorker,
// run Start (it blocks until Stop is called), and call Stop to end the loop.
type AggregationWorker struct {
// db is the GORM handle every aggregation statement is executed through.
db *gorm.DB
// stopChan is closed by Stop to make the Start loop return.
stopChan chan struct{}
// ticker paces the Start loop; NewAggregationWorker arms it with a one-hour period.
ticker *time.Ticker
}
// NewAggregationWorker builds an AggregationWorker bound to db.
//
// The returned worker does nothing until Start is called. Its ticker is armed
// here with a one-hour period, so the first tick is due one hour after
// construction, not one hour after Start.
func NewAggregationWorker(db *gorm.DB) *AggregationWorker {
	worker := new(AggregationWorker)
	worker.db = db
	worker.stopChan = make(chan struct{})
	worker.ticker = time.NewTicker(time.Hour) // one aggregation pass per hour
	return worker
}
// Start runs the aggregation loop on the calling goroutine and returns only
// after Stop has closed the stop channel; callers that must not block should
// invoke it with `go`.
//
// One hourly aggregation is executed immediately, then one per ticker tick.
// A tick that lands in hour 0 of the process-local clock additionally runs
// the daily aggregation. Aggregation errors are logged and never terminate
// the loop.
func (w *AggregationWorker) Start() {
	log.Info().Msg("Starting aggregation worker")

	// Kick off one pass right away instead of waiting for the first tick.
	if err := w.AggregateHourly(); err != nil {
		log.Error().Err(err).Msg("Failed to run initial hourly aggregation")
	}

	// onTick is the work performed for every ticker event.
	onTick := func() {
		if err := w.AggregateHourly(); err != nil {
			log.Error().Err(err).Msg("Failed to aggregate hourly stats")
		}

		// The daily roll-up piggybacks on the tick that falls in the midnight hour.
		if time.Now().Hour() != 0 {
			return
		}
		if err := w.AggregateDaily(); err != nil {
			log.Error().Err(err).Msg("Failed to aggregate daily stats")
		}
	}

	for {
		select {
		case <-w.stopChan:
			log.Info().Msg("Stopping aggregation worker")
			w.ticker.Stop()
			return
		case <-w.ticker.C:
			onTick()
		}
	}
}
// Stop signals the Start loop to exit by closing the stop channel.
//
// Calling Stop again after it has already run is a no-op instead of a
// "close of closed channel" panic. Stop only signals: it does not wait for
// the loop to return or for an in-flight aggregation to finish.
//
// The guard below protects against repeated sequential calls only; it is not
// a synchronization primitive, so first-time calls racing from several
// goroutines must still be serialized by the caller.
func (w *AggregationWorker) Stop() {
	select {
	case <-w.stopChan:
		// Nothing ever sends on stopChan, so a successful receive means an
		// earlier Stop already closed it.
	default:
		close(w.stopChan)
	}
}
// AggregateHourly rolls raw rows from download_events up into
// download_stats_hourly, prunes old events, and refreshes the per-registry
// totals. Everything runs inside a single transaction.
//
// Steps:
//  1. Group every event older than the cutoff (the start of the hour that
//     contains "now minus five minutes") by registry, package and hour, and
//     upsert one row per group.
//  2. Delete events older than 24 hours, aligned down to a whole hour.
//  3. Recompute the registry-total rows (package_id IS NULL) from the
//     per-package rows. This step is best effort: a failure is logged and
//     does not fail the run.
//
// Events are deliberately kept for about a day after they were aggregated,
// and step 1 has no lower time bound, so every run re-reads events that
// earlier runs already counted. The upsert therefore has to be idempotent:
// it stores the recomputed bucket value (never below what is already stored)
// instead of adding it to the existing row, which would count each event
// once per run.
//
// It returns the first error from step 1 or step 2, or the error from
// committing the transaction.
func (w *AggregationWorker) AggregateHourly() error {
	startTime := time.Now()
	log.Debug().Msg("Starting hourly aggregation")

	dialectName := w.db.Dialector.Name()

	// Only whole, finished hours are aggregated. The five-minute grace period
	// keeps the most recent hour out until shortly after it has ended.
	cutoff := time.Now().Add(-5 * time.Minute).Truncate(time.Hour)

	return w.db.Transaction(func(tx *gorm.DB) error {
		var aggregateSQL string
		switch dialectName {
		case "postgres":
			// PostgreSQL: date_trunc buckets by hour. GREATEST keeps the
			// upsert idempotent across runs and ignores a recount made from
			// a partially pruned bucket.
			aggregateSQL = `
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
SELECT
de.registry_id,
de.package_id,
date_trunc('hour', de.downloaded_at) AS time_bucket,
COUNT(*) AS download_count,
COUNT(DISTINCT de.ip_address) AS unique_ips,
COUNT(*) FILTER (WHERE de.authenticated = true) AS auth_downloads,
NOW() AS created_at,
NOW() AS updated_at
FROM download_events de
WHERE de.downloaded_at < ?
GROUP BY de.registry_id, de.package_id, time_bucket
ON CONFLICT (registry_id, COALESCE(package_id, 0), time_bucket)
DO UPDATE SET
download_count = GREATEST(download_stats_hourly.download_count, EXCLUDED.download_count),
unique_ips = GREATEST(download_stats_hourly.unique_ips, EXCLUDED.unique_ips),
auth_downloads = GREATEST(download_stats_hourly.auth_downloads, EXCLUDED.auth_downloads),
updated_at = NOW()
`
		case "mysql":
			// MySQL: DATE_FORMAT buckets by hour; same idempotent upsert as
			// the PostgreSQL branch.
			aggregateSQL = `
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
SELECT
de.registry_id,
de.package_id,
DATE_FORMAT(de.downloaded_at, '%Y-%m-%d %H:00:00') AS time_bucket,
COUNT(*) AS download_count,
COUNT(DISTINCT de.ip_address) AS unique_ips,
SUM(CASE WHEN de.authenticated = true THEN 1 ELSE 0 END) AS auth_downloads,
NOW() AS created_at,
NOW() AS updated_at
FROM download_events de
WHERE de.downloaded_at < ?
GROUP BY de.registry_id, de.package_id, time_bucket
ON DUPLICATE KEY UPDATE
download_count = GREATEST(download_stats_hourly.download_count, VALUES(download_count)),
unique_ips = GREATEST(download_stats_hourly.unique_ips, VALUES(unique_ips)),
auth_downloads = GREATEST(download_stats_hourly.auth_downloads, VALUES(auth_downloads)),
updated_at = NOW()
`
		default: // SQLite
			// SQLite: strftime buckets by hour. INSERT OR REPLACE overwrites
			// the row with a fresh recount, which is only correct while every
			// event of the bucket is still present; the hour-aligned
			// retention cutoff below guarantees that.
			// NOTE(review): relies on a unique key over the bucket columns;
			// the table definition is not visible here, confirm.
			aggregateSQL = `
INSERT OR REPLACE INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
SELECT
de.registry_id,
de.package_id,
strftime('%Y-%m-%d %H:00:00', de.downloaded_at) AS time_bucket,
COUNT(*) AS download_count,
COUNT(DISTINCT de.ip_address) AS unique_ips,
SUM(CASE WHEN de.authenticated = 1 THEN 1 ELSE 0 END) AS auth_downloads,
datetime('now') AS created_at,
datetime('now') AS updated_at
FROM download_events de
WHERE de.downloaded_at < ?
GROUP BY de.registry_id, de.package_id, time_bucket
`
		}

		if err := tx.Exec(aggregateSQL, cutoff).Error; err != nil {
			return err
		}

		// Keep roughly the last 24 hours of raw events for debugging. The
		// cutoff is truncated to the hour so a bucket is removed as a whole
		// and a later recount never sees half of it.
		deleteOlder := time.Now().Add(-24 * time.Hour).Truncate(time.Hour)
		deleteResult := tx.Exec("DELETE FROM download_events WHERE downloaded_at < ?", deleteOlder)
		if deleteResult.Error != nil {
			return deleteResult.Error
		}

		// Registry-level totals: one row per registry and hour with a NULL
		// package_id, recomputed from all per-package rows. unique_ips is the
		// sum of the per-package values, so an address that fetched several
		// packages is counted once per package.
		var registryAggSQL string
		switch dialectName {
		case "postgres":
			registryAggSQL = `
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
SELECT
registry_id,
NULL as package_id,
time_bucket,
SUM(download_count) as download_count,
SUM(unique_ips) as unique_ips,
SUM(auth_downloads) as auth_downloads,
NOW() as created_at,
NOW() as updated_at
FROM download_stats_hourly
WHERE package_id IS NOT NULL
GROUP BY registry_id, time_bucket
ON CONFLICT (registry_id, COALESCE(package_id, 0), time_bucket)
DO UPDATE SET
download_count = EXCLUDED.download_count,
unique_ips = EXCLUDED.unique_ips,
auth_downloads = EXCLUDED.auth_downloads,
updated_at = NOW()
`
		case "mysql":
			registryAggSQL = `
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
SELECT
registry_id,
NULL as package_id,
time_bucket,
SUM(download_count) as download_count,
SUM(unique_ips) as unique_ips,
SUM(auth_downloads) as auth_downloads,
NOW() as created_at,
NOW() as updated_at
FROM download_stats_hourly
WHERE package_id IS NOT NULL
GROUP BY registry_id, time_bucket
ON DUPLICATE KEY UPDATE
download_count = VALUES(download_count),
unique_ips = VALUES(unique_ips),
auth_downloads = VALUES(auth_downloads),
updated_at = NOW()
`
		default: // SQLite
			registryAggSQL = `
INSERT OR REPLACE INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
SELECT
registry_id,
NULL as package_id,
time_bucket,
SUM(download_count) as download_count,
SUM(unique_ips) as unique_ips,
SUM(auth_downloads) as auth_downloads,
datetime('now') as created_at,
datetime('now') as updated_at
FROM download_stats_hourly
WHERE package_id IS NOT NULL
GROUP BY registry_id, time_bucket
`
		}

		// Run the totals in a nested transaction. GORM wraps it in a
		// savepoint, so a failure is rolled back on its own and the outer
		// transaction can still commit. Without it, PostgreSQL marks the whole
		// transaction as aborted after a failed statement and "continuing
		// anyway" would lose the aggregation above.
		if err := tx.Transaction(func(sp *gorm.DB) error {
			return sp.Exec(registryAggSQL).Error
		}); err != nil {
			log.Warn().Err(err).Msg("Failed to aggregate registry totals (continuing anyway)")
		}

		elapsed := time.Since(startTime)
		log.Info().
			Int64("deleted_events", deleteResult.RowsAffected).
			Dur("duration", elapsed).
			Msg("Completed hourly aggregation")
		return nil
	})
}
// AggregateDaily folds the previous day's rows of download_stats_hourly into
// download_stats_daily and then prunes hourly rows older than seven days,
// both inside one transaction.
//
// The day window is [yesterday, yesterday+24h), where yesterday is "now minus
// one day" truncated to a multiple of 24 hours. Truncate works on absolute
// time, so the boundaries are UTC midnights.
//
// Per registry/package/day the statement stores the summed download_count and
// auth_downloads, the largest hourly unique_ips value of the day (not a true
// distinct count), and '{}' as top_user_agents. On PostgreSQL and MySQL an
// existing daily row has its three counters overwritten; SQLite replaces the
// whole row.
//
// It returns the first statement error, or the error from committing the
// transaction.
func (w *AggregationWorker) AggregateDaily() error {
	began := time.Now()
	log.Debug().Msg("Starting daily aggregation")

	dialect := w.db.Dialector.Name()

	// Window covering the previous day.
	windowStart := time.Now().AddDate(0, 0, -1).Truncate(24 * time.Hour)
	windowEnd := windowStart.Add(24 * time.Hour)

	// Statement per dialect; anything not listed here is treated as SQLite.
	byDialect := map[string]string{
		"postgres": `
INSERT INTO download_stats_daily (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, top_user_agents, created_at, updated_at)
SELECT
registry_id,
package_id,
date_trunc('day', time_bucket) AS time_bucket,
SUM(download_count) AS download_count,
MAX(unique_ips) AS unique_ips,
SUM(auth_downloads) AS auth_downloads,
'{}' AS top_user_agents,
NOW() AS created_at,
NOW() AS updated_at
FROM download_stats_hourly
WHERE time_bucket >= ? AND time_bucket < ?
GROUP BY registry_id, package_id, date_trunc('day', time_bucket)
ON CONFLICT (registry_id, COALESCE(package_id, 0), time_bucket)
DO UPDATE SET
download_count = EXCLUDED.download_count,
unique_ips = EXCLUDED.unique_ips,
auth_downloads = EXCLUDED.auth_downloads,
updated_at = NOW()
`,
		"mysql": `
INSERT INTO download_stats_daily (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, top_user_agents, created_at, updated_at)
SELECT
registry_id,
package_id,
DATE_FORMAT(time_bucket, '%Y-%m-%d 00:00:00') AS time_bucket,
SUM(download_count) AS download_count,
MAX(unique_ips) AS unique_ips,
SUM(auth_downloads) AS auth_downloads,
'{}' AS top_user_agents,
NOW() AS created_at,
NOW() AS updated_at
FROM download_stats_hourly
WHERE time_bucket >= ? AND time_bucket < ?
GROUP BY registry_id, package_id, DATE_FORMAT(time_bucket, '%Y-%m-%d 00:00:00')
ON DUPLICATE KEY UPDATE
download_count = VALUES(download_count),
unique_ips = VALUES(unique_ips),
auth_downloads = VALUES(auth_downloads),
updated_at = NOW()
`,
	}

	rollupSQL, known := byDialect[dialect]
	if !known {
		// SQLite
		rollupSQL = `
INSERT OR REPLACE INTO download_stats_daily (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, top_user_agents, created_at, updated_at)
SELECT
registry_id,
package_id,
date(time_bucket) AS time_bucket,
SUM(download_count) AS download_count,
MAX(unique_ips) AS unique_ips,
SUM(auth_downloads) AS auth_downloads,
'{}' AS top_user_agents,
datetime('now') AS created_at,
datetime('now') AS updated_at
FROM download_stats_hourly
WHERE time_bucket >= ? AND time_bucket < ?
GROUP BY registry_id, package_id, date(time_bucket)
`
	}

	rollup := func(tx *gorm.DB) error {
		if res := tx.Exec(rollupSQL, windowStart, windowEnd); res.Error != nil {
			return res.Error
		}

		// Hourly rows are only needed for a week once the daily row exists.
		retention := time.Now().AddDate(0, 0, -7)
		pruned := tx.Exec("DELETE FROM download_stats_hourly WHERE time_bucket < ?", retention)
		if pruned.Error != nil {
			return pruned.Error
		}

		log.Info().
			Int64("deleted_hourly_stats", pruned.RowsAffected).
			Dur("duration", time.Since(began)).
			Msg("Completed daily aggregation")
		return nil
	}

	return w.db.Transaction(rollup)
}
// UpdatePackageAccessCounts overwrites packages.access_count for every
// package with the sum of download_count over that package's rows in
// download_stats_hourly; packages without any hourly row are set to 0.
//
// The result covers only the hourly rows that currently exist. AggregateDaily
// deletes hourly rows older than seven days, so this is not an all-time
// total.
//
// It returns the error from executing the UPDATE, if any.
func (w *AggregationWorker) UpdatePackageAccessCounts() error {
	log.Debug().Msg("Updating package access counts")

	// The target table is referenced by its full name rather than through an
	// alias: alias syntax on an UPDATE target differs between dialects
	// (SQLite, for one, does not accept a bare alias without AS), while the
	// unaliased form is accepted by SQLite, PostgreSQL and MySQL alike.
	updateSQL := `
UPDATE packages
SET access_count = COALESCE((
SELECT SUM(download_count)
FROM download_stats_hourly dsh
WHERE dsh.package_id = packages.id
), 0)
`
	if err := w.db.Exec(updateSQL).Error; err != nil {
		return err
	}

	log.Info().Msg("Updated package access counts")
	return nil
}