mirror of
https://github.com/lukaszraczylo/gohoarder.git
synced 2026-06-05 22:53:53 +00:00
c0061b99e3
- [x] Implement GORM V2 metadata store with SQLite, PostgreSQL, and MySQL support - [x] Add database migration system using gormigrate for schema versioning - [x] Create migration CLI tool with support for migrate, rollback, and status commands - [x] Add Docker support for migration container (Dockerfile.migrate) - [x] Implement automatic partition management for PostgreSQL time-series tables - [x] Add background aggregation worker for download statistics - [x] Support connection pooling configuration (max_open_conns, max_idle_conns, conn_max_lifetime) - [x] Add blocking mechanism based on vulnerability thresholds in stats and handlers - [x] Update Helm charts with migration init containers and multi-database configuration - [x] Replace deprecated SQLite store with optimized GORM implementation - [x] Add comprehensive integration tests for MySQL and PostgreSQL - [x] Update frontend to display blocked packages and storage utilization - [x] Add goreleaser configuration for migrate binary and container image - [x] Update configuration examples with database backend options and recommendations
358 lines
11 KiB
Go
358 lines
11 KiB
Go
package gormstore
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// AggregationWorker handles background aggregation of download statistics
|
|
type AggregationWorker struct {
|
|
db *gorm.DB
|
|
stopChan chan struct{}
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
// NewAggregationWorker creates a new aggregation worker
|
|
func NewAggregationWorker(db *gorm.DB) *AggregationWorker {
|
|
return &AggregationWorker{
|
|
db: db,
|
|
stopChan: make(chan struct{}),
|
|
ticker: time.NewTicker(1 * time.Hour), // Run every hour
|
|
}
|
|
}
|
|
|
|
// Start begins the aggregation worker
|
|
func (w *AggregationWorker) Start() {
|
|
log.Info().Msg("Starting aggregation worker")
|
|
|
|
// Run immediately on start
|
|
if err := w.AggregateHourly(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to run initial hourly aggregation")
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-w.ticker.C:
|
|
if err := w.AggregateHourly(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to aggregate hourly stats")
|
|
}
|
|
|
|
// Check if it's time for daily aggregation (run at midnight)
|
|
now := time.Now()
|
|
if now.Hour() == 0 {
|
|
if err := w.AggregateDaily(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to aggregate daily stats")
|
|
}
|
|
}
|
|
|
|
case <-w.stopChan:
|
|
log.Info().Msg("Stopping aggregation worker")
|
|
w.ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop stops the aggregation worker
|
|
func (w *AggregationWorker) Stop() {
|
|
close(w.stopChan)
|
|
}
|
|
|
|
// AggregateHourly aggregates download events into hourly stats
|
|
func (w *AggregationWorker) AggregateHourly() error {
|
|
startTime := time.Now()
|
|
log.Debug().Msg("Starting hourly aggregation")
|
|
|
|
// Get dialect name
|
|
dialectName := w.db.Dialector.Name()
|
|
|
|
// Calculate cutoff time (aggregate events older than 5 minutes to avoid partial data)
|
|
cutoff := time.Now().Add(-5 * time.Minute).Truncate(time.Hour)
|
|
|
|
return w.db.Transaction(func(tx *gorm.DB) error {
|
|
var aggregateSQL string
|
|
|
|
switch dialectName {
|
|
case "postgres":
|
|
// PostgreSQL: Use date_trunc for time bucketing
|
|
aggregateSQL = `
|
|
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
|
|
SELECT
|
|
de.registry_id,
|
|
de.package_id,
|
|
date_trunc('hour', de.downloaded_at) AS time_bucket,
|
|
COUNT(*) AS download_count,
|
|
COUNT(DISTINCT de.ip_address) AS unique_ips,
|
|
COUNT(*) FILTER (WHERE de.authenticated = true) AS auth_downloads,
|
|
NOW() AS created_at,
|
|
NOW() AS updated_at
|
|
FROM download_events de
|
|
WHERE de.downloaded_at < ?
|
|
GROUP BY de.registry_id, de.package_id, time_bucket
|
|
ON CONFLICT (registry_id, COALESCE(package_id, 0), time_bucket)
|
|
DO UPDATE SET
|
|
download_count = download_stats_hourly.download_count + EXCLUDED.download_count,
|
|
unique_ips = GREATEST(download_stats_hourly.unique_ips, EXCLUDED.unique_ips),
|
|
auth_downloads = download_stats_hourly.auth_downloads + EXCLUDED.auth_downloads,
|
|
updated_at = NOW()
|
|
`
|
|
|
|
case "mysql":
|
|
// MySQL: Use DATE_FORMAT for time bucketing
|
|
aggregateSQL = `
|
|
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
|
|
SELECT
|
|
de.registry_id,
|
|
de.package_id,
|
|
DATE_FORMAT(de.downloaded_at, '%Y-%m-%d %H:00:00') AS time_bucket,
|
|
COUNT(*) AS download_count,
|
|
COUNT(DISTINCT de.ip_address) AS unique_ips,
|
|
SUM(CASE WHEN de.authenticated = true THEN 1 ELSE 0 END) AS auth_downloads,
|
|
NOW() AS created_at,
|
|
NOW() AS updated_at
|
|
FROM download_events de
|
|
WHERE de.downloaded_at < ?
|
|
GROUP BY de.registry_id, de.package_id, time_bucket
|
|
ON DUPLICATE KEY UPDATE
|
|
download_count = download_stats_hourly.download_count + VALUES(download_count),
|
|
unique_ips = GREATEST(download_stats_hourly.unique_ips, VALUES(unique_ips)),
|
|
auth_downloads = download_stats_hourly.auth_downloads + VALUES(auth_downloads),
|
|
updated_at = NOW()
|
|
`
|
|
|
|
default: // SQLite
|
|
// SQLite: Use strftime for time bucketing
|
|
// Note: SQLite doesn't support UPSERT as elegantly, need to handle separately
|
|
aggregateSQL = `
|
|
INSERT OR REPLACE INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
|
|
SELECT
|
|
de.registry_id,
|
|
de.package_id,
|
|
strftime('%Y-%m-%d %H:00:00', de.downloaded_at) AS time_bucket,
|
|
COUNT(*) AS download_count,
|
|
COUNT(DISTINCT de.ip_address) AS unique_ips,
|
|
SUM(CASE WHEN de.authenticated = 1 THEN 1 ELSE 0 END) AS auth_downloads,
|
|
datetime('now') AS created_at,
|
|
datetime('now') AS updated_at
|
|
FROM download_events de
|
|
WHERE de.downloaded_at < ?
|
|
GROUP BY de.registry_id, de.package_id, time_bucket
|
|
`
|
|
}
|
|
|
|
// Execute aggregation
|
|
if err := tx.Exec(aggregateSQL, cutoff).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete aggregated events (older than 24 hours to keep recent data for debugging)
|
|
deleteOlder := time.Now().Add(-24 * time.Hour)
|
|
deleteResult := tx.Exec("DELETE FROM download_events WHERE downloaded_at < ?", deleteOlder)
|
|
if deleteResult.Error != nil {
|
|
return deleteResult.Error
|
|
}
|
|
|
|
// Also update package-level stats (NULL package_id = registry totals)
|
|
var registryAggSQL string
|
|
if dialectName == "postgres" {
|
|
registryAggSQL = `
|
|
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
|
|
SELECT
|
|
registry_id,
|
|
NULL as package_id,
|
|
time_bucket,
|
|
SUM(download_count) as download_count,
|
|
SUM(unique_ips) as unique_ips,
|
|
SUM(auth_downloads) as auth_downloads,
|
|
NOW() as created_at,
|
|
NOW() as updated_at
|
|
FROM download_stats_hourly
|
|
WHERE package_id IS NOT NULL
|
|
GROUP BY registry_id, time_bucket
|
|
ON CONFLICT (registry_id, COALESCE(package_id, 0), time_bucket)
|
|
DO UPDATE SET
|
|
download_count = EXCLUDED.download_count,
|
|
unique_ips = EXCLUDED.unique_ips,
|
|
auth_downloads = EXCLUDED.auth_downloads,
|
|
updated_at = NOW()
|
|
`
|
|
} else if dialectName == "mysql" {
|
|
registryAggSQL = `
|
|
INSERT INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
|
|
SELECT
|
|
registry_id,
|
|
NULL as package_id,
|
|
time_bucket,
|
|
SUM(download_count) as download_count,
|
|
SUM(unique_ips) as unique_ips,
|
|
SUM(auth_downloads) as auth_downloads,
|
|
NOW() as created_at,
|
|
NOW() as updated_at
|
|
FROM download_stats_hourly
|
|
WHERE package_id IS NOT NULL
|
|
GROUP BY registry_id, time_bucket
|
|
ON DUPLICATE KEY UPDATE
|
|
download_count = VALUES(download_count),
|
|
unique_ips = VALUES(unique_ips),
|
|
auth_downloads = VALUES(auth_downloads),
|
|
updated_at = NOW()
|
|
`
|
|
} else {
|
|
// SQLite
|
|
registryAggSQL = `
|
|
INSERT OR REPLACE INTO download_stats_hourly (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, created_at, updated_at)
|
|
SELECT
|
|
registry_id,
|
|
NULL as package_id,
|
|
time_bucket,
|
|
SUM(download_count) as download_count,
|
|
SUM(unique_ips) as unique_ips,
|
|
SUM(auth_downloads) as auth_downloads,
|
|
datetime('now') as created_at,
|
|
datetime('now') as updated_at
|
|
FROM download_stats_hourly
|
|
WHERE package_id IS NOT NULL
|
|
GROUP BY registry_id, time_bucket
|
|
`
|
|
}
|
|
|
|
if err := tx.Exec(registryAggSQL).Error; err != nil {
|
|
log.Warn().Err(err).Msg("Failed to aggregate registry totals (continuing anyway)")
|
|
}
|
|
|
|
elapsed := time.Since(startTime)
|
|
log.Info().
|
|
Int64("deleted_events", deleteResult.RowsAffected).
|
|
Dur("duration", elapsed).
|
|
Msg("Completed hourly aggregation")
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// AggregateDaily aggregates hourly stats into daily stats
|
|
func (w *AggregationWorker) AggregateDaily() error {
|
|
startTime := time.Now()
|
|
log.Debug().Msg("Starting daily aggregation")
|
|
|
|
dialectName := w.db.Dialector.Name()
|
|
|
|
// Aggregate yesterday's data
|
|
yesterday := time.Now().AddDate(0, 0, -1).Truncate(24 * time.Hour)
|
|
dayEnd := yesterday.Add(24 * time.Hour)
|
|
|
|
return w.db.Transaction(func(tx *gorm.DB) error {
|
|
var aggregateSQL string
|
|
|
|
switch dialectName {
|
|
case "postgres":
|
|
aggregateSQL = `
|
|
INSERT INTO download_stats_daily (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, top_user_agents, created_at, updated_at)
|
|
SELECT
|
|
registry_id,
|
|
package_id,
|
|
date_trunc('day', time_bucket) AS time_bucket,
|
|
SUM(download_count) AS download_count,
|
|
MAX(unique_ips) AS unique_ips,
|
|
SUM(auth_downloads) AS auth_downloads,
|
|
'{}' AS top_user_agents,
|
|
NOW() AS created_at,
|
|
NOW() AS updated_at
|
|
FROM download_stats_hourly
|
|
WHERE time_bucket >= ? AND time_bucket < ?
|
|
GROUP BY registry_id, package_id, date_trunc('day', time_bucket)
|
|
ON CONFLICT (registry_id, COALESCE(package_id, 0), time_bucket)
|
|
DO UPDATE SET
|
|
download_count = EXCLUDED.download_count,
|
|
unique_ips = EXCLUDED.unique_ips,
|
|
auth_downloads = EXCLUDED.auth_downloads,
|
|
updated_at = NOW()
|
|
`
|
|
|
|
case "mysql":
|
|
aggregateSQL = `
|
|
INSERT INTO download_stats_daily (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, top_user_agents, created_at, updated_at)
|
|
SELECT
|
|
registry_id,
|
|
package_id,
|
|
DATE_FORMAT(time_bucket, '%Y-%m-%d 00:00:00') AS time_bucket,
|
|
SUM(download_count) AS download_count,
|
|
MAX(unique_ips) AS unique_ips,
|
|
SUM(auth_downloads) AS auth_downloads,
|
|
'{}' AS top_user_agents,
|
|
NOW() AS created_at,
|
|
NOW() AS updated_at
|
|
FROM download_stats_hourly
|
|
WHERE time_bucket >= ? AND time_bucket < ?
|
|
GROUP BY registry_id, package_id, DATE_FORMAT(time_bucket, '%Y-%m-%d 00:00:00')
|
|
ON DUPLICATE KEY UPDATE
|
|
download_count = VALUES(download_count),
|
|
unique_ips = VALUES(unique_ips),
|
|
auth_downloads = VALUES(auth_downloads),
|
|
updated_at = NOW()
|
|
`
|
|
|
|
default: // SQLite
|
|
aggregateSQL = `
|
|
INSERT OR REPLACE INTO download_stats_daily (registry_id, package_id, time_bucket, download_count, unique_ips, auth_downloads, top_user_agents, created_at, updated_at)
|
|
SELECT
|
|
registry_id,
|
|
package_id,
|
|
date(time_bucket) AS time_bucket,
|
|
SUM(download_count) AS download_count,
|
|
MAX(unique_ips) AS unique_ips,
|
|
SUM(auth_downloads) AS auth_downloads,
|
|
'{}' AS top_user_agents,
|
|
datetime('now') AS created_at,
|
|
datetime('now') AS updated_at
|
|
FROM download_stats_hourly
|
|
WHERE time_bucket >= ? AND time_bucket < ?
|
|
GROUP BY registry_id, package_id, date(time_bucket)
|
|
`
|
|
}
|
|
|
|
if err := tx.Exec(aggregateSQL, yesterday, dayEnd).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete old hourly stats (keep last 7 days)
|
|
deleteOlder := time.Now().AddDate(0, 0, -7)
|
|
deleteResult := tx.Exec("DELETE FROM download_stats_hourly WHERE time_bucket < ?", deleteOlder)
|
|
if deleteResult.Error != nil {
|
|
return deleteResult.Error
|
|
}
|
|
|
|
elapsed := time.Since(startTime)
|
|
log.Info().
|
|
Int64("deleted_hourly_stats", deleteResult.RowsAffected).
|
|
Dur("duration", elapsed).
|
|
Msg("Completed daily aggregation")
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// UpdatePackageAccessCounts synchronizes package access_count from download stats
|
|
func (w *AggregationWorker) UpdatePackageAccessCounts() error {
|
|
log.Debug().Msg("Updating package access counts")
|
|
|
|
// Update from download_stats_hourly (sum all-time downloads per package)
|
|
updateSQL := `
|
|
UPDATE packages p
|
|
SET access_count = COALESCE((
|
|
SELECT SUM(download_count)
|
|
FROM download_stats_hourly dsh
|
|
WHERE dsh.package_id = p.id
|
|
), 0)
|
|
`
|
|
|
|
if err := w.db.Exec(updateSQL).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info().Msg("Updated package access counts")
|
|
return nil
|
|
}
|