Files
kubernetes-images-sync-oper…/internal/controller/raczylo.com/clusterimage_controller.go
T

609 lines
21 KiB
Go

package raczylocom
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/go-logr/logr"
v1batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
raczylocomv1 "github.com/lukaszraczylo/kubernetes-images-sync-operator/api/raczylo.com/v1"
"github.com/lukaszraczylo/kubernetes-images-sync-operator/internal/shared"
)
// ClusterImageReconciler reconciles ClusterImage objects: it creates an export
// Job per image, tracks the Job's outcome in the image's status, and rolls the
// per-image results up into the owning ClusterImageExport's status.
type ClusterImageReconciler struct {
client.Client
Scheme *runtime.Scheme
// MaxParallelJobs is overwritten by Reconcile with the owning
// ClusterImageExport's Spec.MaxConcurrentJobs; PENDING images are requeued
// while ActiveJobs is at or above this value.
MaxParallelJobs int
// ActiveJobs is an in-memory count of export Jobs this reconciler believes
// are running. It is not persisted anywhere, so it starts from zero whenever
// the process starts.
ActiveJobs int
activeJobsMu sync.Mutex // protects ActiveJobs counter
// KubeClient is a typed clientset created in SetupWithManager; here it is
// used for the pod DeleteCollection call during Job cleanup.
KubeClient *kubernetes.Clientset
}
// +kubebuilder:rbac:groups=raczylo.com,resources=*,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=raczylo.com,resources=*/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=raczylo.com,resources=*/finalizers,verbs=update
// # additional RBAC rules - create and manage jobs
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete;deletecollection
// add access to secrets
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
// Reconcile drives a single ClusterImage through its lifecycle.
//
// A freshly created image (empty Progress) is first moved to PENDING and
// requeued. PENDING images wait (requeue after 30s) while the in-memory
// ActiveJobs count is at or above the export's MaxConcurrentJobs; otherwise
// they are handed to handlePendingClusterImage. RUNNING and RETRYING images
// are tracked by handleRunningClusterImage. Every other state (SUCCESS,
// FAILED, PRESENT, or an unrecognised value) needs no work.
//
// NOTE(review): MaxParallelJobs is copied verbatim from the export spec, so a
// MaxConcurrentJobs of 0 keeps every PENDING image waiting indefinitely.
func (r *ClusterImageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	image := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, req.NamespacedName, image); err != nil {
		if !errors.IsNotFound(err) {
			logger.Error(err, "unable to fetch ClusterImage")
			return ctrl.Result{}, err
		}
		// The object is gone; there is nothing left to reconcile.
		return ctrl.Result{}, nil
	}

	export := &raczylocomv1.ClusterImageExport{}
	exportKey := types.NamespacedName{Namespace: image.Namespace, Name: image.Spec.ExportName}
	if err := r.Get(ctx, exportKey, export); err != nil {
		logger.Error(err, "unable to fetch ClusterImageExport")
		return ctrl.Result{}, err
	}
	r.MaxParallelJobs = export.Spec.MaxConcurrentJobs

	if image.Status.Progress == "" {
		// Re-read right before writing so the update starts from the freshest
		// cached copy.
		fresh := &raczylocomv1.ClusterImage{}
		if err := r.Get(ctx, req.NamespacedName, fresh); err != nil {
			return ctrl.Result{}, err
		}
		fresh.Status.Progress = shared.STATUS_PENDING
		switch err := r.Status().Update(ctx, fresh); {
		case err == nil, errors.IsConflict(err):
			// Requeue either way: on success to continue with the PENDING
			// state, on a conflict to retry against the newer version.
			return ctrl.Result{Requeue: true}, nil
		default:
			logger.Error(err, "unable to update ClusterImage status to PENDING")
			return ctrl.Result{}, err
		}
	}

	r.activeJobsMu.Lock()
	inFlight := r.ActiveJobs
	r.activeJobsMu.Unlock()

	switch image.Status.Progress {
	case shared.STATUS_PENDING:
		if inFlight >= r.MaxParallelJobs {
			// At the parallel-job limit; look again later.
			return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
		}
		return r.handlePendingClusterImage(ctx, image, logger)
	case shared.STATUS_RUNNING, shared.STATUS_RETRYING:
		return r.handleRunningClusterImage(ctx, image, logger)
	default:
		// SUCCESS, FAILED, PRESENT and unrecognised states: nothing to do.
		return ctrl.Result{}, nil
	}
}
// handlePendingClusterImage starts the export of a PENDING ClusterImage.
//
// If checkImageExists reports the image as already handled (this image is
// SUCCESS, or another ClusterImage for the same image is SUCCESS, PRESENT or
// RUNNING), the image is marked PRESENT and no Job is created. Otherwise the
// export Job is created, the image is moved to RUNNING and the in-memory
// ActiveJobs counter is incremented.
//
// Creating the Job and persisting RUNNING are separate API writes. If an
// earlier attempt created the Job but failed to persist RUNNING, the next
// attempt gets AlreadyExists from Create. That is treated as "Job started" so
// the image still reaches RUNNING instead of failing on every retry.
func (r *ClusterImageReconciler) handlePendingClusterImage(ctx context.Context, clusterImage *raczylocomv1.ClusterImage, l logr.Logger) (ctrl.Result, error) {
	key := types.NamespacedName{Name: clusterImage.Name, Namespace: clusterImage.Namespace}

	// Work on the latest cached copy rather than the caller's snapshot.
	latest := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, key, latest); err != nil {
		return ctrl.Result{}, err
	}
	clusterImage = latest

	exists, err := r.checkImageExists(ctx, clusterImage)
	if err != nil {
		l.Error(err, "unable to check if image exists")
		return ctrl.Result{}, err
	}
	if exists {
		clusterImage.Status.Progress = shared.STATUS_PRESENT
		if err := r.Status().Update(ctx, clusterImage); err != nil {
			if errors.IsConflict(err) {
				// Resource was modified, requeue and try again.
				return ctrl.Result{Requeue: true}, nil
			}
			l.Error(err, "unable to update ClusterImage status")
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	clusterImageExport := &raczylocomv1.ClusterImageExport{}
	if err := r.Get(ctx, types.NamespacedName{Name: clusterImage.Spec.ExportName, Namespace: clusterImage.Namespace}, clusterImageExport); err != nil {
		l.Error(err, "unable to fetch ClusterImageExport")
		return ctrl.Result{}, err
	}

	if err := r.createBackupJob(ctx, clusterImage, clusterImageExport, l); err != nil {
		switch {
		case errors.IsAlreadyExists(err):
			// A previous attempt already created the Job; record RUNNING below.
			l.Info("Backup job already exists; marking ClusterImage as running", "clusterImage", clusterImage.Name)
		case errors.IsConflict(err):
			// The Job was created but the RUNNING status write lost a race.
			// The retry hits AlreadyExists and takes the branch above.
			return ctrl.Result{Requeue: true}, nil
		default:
			l.Error(err, "unable to create backup job")
			return ctrl.Result{}, err
		}
	}

	// createBackupJob already records RUNNING when it creates the Job, in which
	// case this write is a no-op; it matters for the AlreadyExists path.
	clusterImage.Status.Progress = shared.STATUS_RUNNING
	if err := r.Status().Update(ctx, clusterImage); err != nil {
		if errors.IsConflict(err) {
			return ctrl.Result{Requeue: true}, nil
		}
		l.Error(err, "unable to update ClusterImage status to RUNNING")
		return ctrl.Result{}, err
	}

	r.activeJobsMu.Lock()
	r.ActiveJobs++
	r.activeJobsMu.Unlock()
	return ctrl.Result{Requeue: true}, nil
}
// handleRunningClusterImage tracks the export Job of a RUNNING or RETRYING
// ClusterImage and moves the image to its next state:
//
//   - Job missing: see handleMissingExportJob.
//   - Job succeeded: image -> SUCCESS, Job cleaned up.
//   - Job failed: see finishFailedExportJob.
//   - Job still active: container restarts are folded into RetryCount by
//     handleJobRestarts.
//
// Terminal transitions return right after the export-status roll-up. They
// must not continue into the "still active" handling: that would write the
// pre-transition copy of the object back, which conflicts and skips the
// roll-up.
func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context, clusterImage *raczylocomv1.ClusterImage, l logr.Logger) (ctrl.Result, error) {
	key := types.NamespacedName{Name: clusterImage.Name, Namespace: clusterImage.Namespace}

	// Work on the latest cached copy rather than the caller's snapshot.
	latest := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, key, latest); err != nil {
		return ctrl.Result{}, err
	}
	clusterImage = latest

	existingJob := &v1batch.Job{}
	jobName := fmt.Sprintf("img-export-%s", clusterImage.Name)
	if err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: clusterImage.Namespace}, existingJob); err != nil {
		if errors.IsNotFound(err) {
			return r.handleMissingExportJob(ctx, key, jobName, l)
		}
		l.Error(err, "unable to check for existing job")
		return ctrl.Result{}, err
	}

	switch {
	case existingJob.Status.Succeeded > 0:
		return r.finishSucceededExportJob(ctx, key, existingJob, l)
	case existingJob.Status.Failed > 0:
		return r.finishFailedExportJob(ctx, key, existingJob, l)
	}

	// The Job is still active. handleJobRestarts re-reads and writes the
	// status itself, so nothing is written from the local copy here.
	if err := r.handleJobRestarts(ctx, existingJob, clusterImage); err != nil {
		l.Error(err, "unable to handle job restarts")
		return ctrl.Result{}, err
	}
	return r.updateClusterImageExportStatus(ctx, clusterImage)
}

// handleMissingExportJob decides the next state of a RUNNING/RETRYING image
// whose export Job was not found in the informer cache.
//
// The miss is first confirmed with a live API read (when KubeClient is set),
// because a Job created moments ago may not be in the cache yet. Then:
//   - RETRYING: the failed attempt's Job has been removed, so the image goes
//     back to PENDING, where handlePendingClusterImage creates the next Job
//     under the parallel-job limit. RetryCount was already incremented when
//     the failure was recorded.
//   - RUNNING: the Job vanished without reporting a result. This counts as a
//     failed attempt (RETRYING while RetryCount < 3, FAILED afterwards), and
//     the Job's ActiveJobs slot is released.
//   - anything else: already settled elsewhere; nothing to do.
func (r *ClusterImageReconciler) handleMissingExportJob(ctx context.Context, key types.NamespacedName, jobName string, l logr.Logger) (ctrl.Result, error) {
	if r.KubeClient != nil {
		_, err := r.KubeClient.BatchV1().Jobs(key.Namespace).Get(ctx, jobName, metav1.GetOptions{})
		switch {
		case err == nil:
			l.Info("Job exists but is not yet visible in the cache; requeueing", "job", jobName)
			return ctrl.Result{RequeueAfter: time.Second}, nil
		case !errors.IsNotFound(err):
			l.Error(err, "unable to check for existing job")
			return ctrl.Result{}, err
		}
	}

	latest := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, key, latest); err != nil {
		return ctrl.Result{}, err
	}

	wasRunning := false
	switch latest.Status.Progress {
	case shared.STATUS_RETRYING:
		l.Info("Job not found for retrying image; scheduling a new attempt", "job", jobName)
		latest.Status.Progress = shared.STATUS_PENDING
	case shared.STATUS_RUNNING:
		wasRunning = true
		l.Info("Job not found for running image; counting it as a failed attempt", "job", jobName)
		if latest.Status.RetryCount < 3 {
			latest.Status.Progress = shared.STATUS_RETRYING
			latest.Status.RetryCount++
		} else {
			latest.Status.Progress = shared.STATUS_FAILED
		}
	default:
		return ctrl.Result{}, nil
	}

	if err := r.Status().Update(ctx, latest); err != nil {
		if errors.IsConflict(err) {
			// Resource was modified, requeue and try again.
			return ctrl.Result{Requeue: true}, nil
		}
		l.Error(err, "unable to update ClusterImage status after job not found")
		return ctrl.Result{}, err
	}

	if wasRunning {
		// The vanished Job no longer occupies a slot.
		r.activeJobsMu.Lock()
		r.ActiveJobs--
		r.activeJobsMu.Unlock()
	}
	if latest.Status.Progress == shared.STATUS_FAILED {
		return r.updateClusterImageExportStatus(ctx, latest)
	}
	return ctrl.Result{Requeue: true}, nil
}

// finishSucceededExportJob records SUCCESS for a completed export, releases
// its ActiveJobs slot and removes the Job.
func (r *ClusterImageReconciler) finishSucceededExportJob(ctx context.Context, key types.NamespacedName, job *v1batch.Job, l logr.Logger) (ctrl.Result, error) {
	latest := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, key, latest); err != nil {
		return ctrl.Result{}, err
	}

	// Persist the result before deleting the Job. If the Job were removed
	// first and this write failed, the next pass would find no Job and count
	// a successful export as a failed attempt.
	latest.Status.Progress = shared.STATUS_SUCCESS
	if err := r.Status().Update(ctx, latest); err != nil {
		if errors.IsConflict(err) {
			return ctrl.Result{Requeue: true}, nil
		}
		l.Error(err, "unable to update ClusterImage status to SUCCESS")
		return ctrl.Result{}, err
	}

	// Release the slot only once SUCCESS is stored: a conflict retry above
	// sees the same Job again and must not decrement twice.
	r.activeJobsMu.Lock()
	r.ActiveJobs--
	r.activeJobsMu.Unlock()

	// Best effort: Reconcile does not revisit SUCCESS images, so returning an
	// error here would not get the cleanup retried; log it and carry on.
	if err := r.cleanupJobAndPods(ctx, job); err != nil {
		l.Error(err, "unable to cleanup job and pods")
	}
	return r.updateClusterImageExportStatus(ctx, latest)
}

// finishFailedExportJob handles an export Job that reports failed pods. While
// RetryCount < 3 the Job is deleted and the image is marked RETRYING with
// RetryCount+1; the missing-Job path then sends it back through PENDING for a
// new attempt. Otherwise the image is marked FAILED and the Job is deleted.
func (r *ClusterImageReconciler) finishFailedExportJob(ctx context.Context, key types.NamespacedName, job *v1batch.Job, l logr.Logger) (ctrl.Result, error) {
	current := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, key, current); err != nil {
		return ctrl.Result{}, err
	}

	if current.Status.RetryCount < 3 {
		// Delete the failed Job first so the next attempt can reuse its name.
		if err := r.cleanupJobAndPods(ctx, job); err != nil {
			l.Error(err, "unable to cleanup failed job and pods for retry")
			return ctrl.Result{}, err
		}
		// The Job is deleted, so its slot is free whatever happens to the
		// status write below.
		r.activeJobsMu.Lock()
		r.ActiveJobs--
		r.activeJobsMu.Unlock()

		// cleanupJobAndPods may wait for several seconds; re-read before writing.
		latest := &raczylocomv1.ClusterImage{}
		if err := r.Get(ctx, key, latest); err != nil {
			return ctrl.Result{}, err
		}
		latest.Status.Progress = shared.STATUS_RETRYING
		latest.Status.RetryCount++
		if err := r.Status().Update(ctx, latest); err != nil {
			if errors.IsConflict(err) {
				return ctrl.Result{Requeue: true}, nil
			}
			l.Error(err, "unable to update ClusterImage status for retry")
			return ctrl.Result{}, err
		}
		// Requeue so the RETRYING image is scheduled for a new Job.
		return ctrl.Result{Requeue: true}, nil
	}

	// Max retries reached.
	current.Status.Progress = shared.STATUS_FAILED
	if err := r.Status().Update(ctx, current); err != nil {
		if errors.IsConflict(err) {
			return ctrl.Result{Requeue: true}, nil
		}
		l.Error(err, "unable to update ClusterImage status to FAILED")
		return ctrl.Result{}, err
	}

	r.activeJobsMu.Lock()
	r.ActiveJobs--
	r.activeJobsMu.Unlock()

	// Best effort, for the same reason as in finishSucceededExportJob: FAILED
	// images are not revisited by Reconcile.
	if err := r.cleanupJobAndPods(ctx, job); err != nil {
		l.Error(err, "unable to cleanup failed job and pods")
	}
	return r.updateClusterImageExportStatus(ctx, current)
}
// cleanupJobAndPods deletes an export Job and the pods selected by its label
// selector.
//
// Before deleting, it polls every 100ms for at most 5s until the Job reports
// no active pods or is already gone. Running out of time is not an error: the
// Job is deleted regardless. The Job is deleted with background propagation.
// Its pods are then also removed explicitly with a DeleteCollection call
// through KubeClient. That call is skipped when the Job has no usable
// selector, so it can never match every pod in the namespace.
func (r *ClusterImageReconciler) cleanupJobAndPods(ctx context.Context, job *v1batch.Job) error {
	jobKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
	err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		current := &v1batch.Job{}
		if err := r.Get(ctx, jobKey, current); err != nil {
			// Already deleted counts as done; any other error is retried.
			return errors.IsNotFound(err), nil
		}
		return current.Status.Active == 0, nil
	})
	// wait.Interrupted covers both a context deadline and wait's own timeout
	// error, so this check does not depend on which of the two the installed
	// apimachinery version reports when the poll runs out of time.
	if err != nil && !wait.Interrupted(err) {
		return fmt.Errorf("failed to wait for job status: %w", err)
	}

	if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
		return fmt.Errorf("failed to delete job: %w", err)
	}

	if job.Spec.Selector == nil {
		return nil
	}
	// Use the whole selector (MatchLabels and MatchExpressions) to pick pods.
	selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
	if err != nil {
		return fmt.Errorf("failed to build pod selector for job: %w", err)
	}
	if selector.Empty() {
		// An empty selector matches everything; never bulk-delete on it.
		return nil
	}
	listOptions := metav1.ListOptions{LabelSelector: selector.String()}
	if err := r.KubeClient.CoreV1().Pods(job.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions); err != nil {
		return fmt.Errorf("failed to delete pods: %w", err)
	}
	return nil
}
// createBackupJob creates the export Job for clusterImage and then records
// STATUS_RUNNING on the image. If Create fails, its error is returned and the
// status is not touched.
//
// The Job's command list does the following, in order:
//  1. Pulls the image with podman.
//  2. Saves it to /tmp/<normalised name>.tar.
//  3. Uploads the tarball with "./worker export", to S3 or to a file path
//     depending on Spec.Storage.
//  4. Removes the tarball.
//
// Annotations are merged with increasing precedence: ClusterImageExport
// metadata, then ClusterImage metadata, then ClusterImage.Spec.JobAnnotations.
// The ClusterImage is set as the Job's controlling owner.
//
// NOTE(review): the image name, export path and export name are spliced into
// the commands without escaping; a value containing a single quote would
// break the quoting. Confirm these fields are validated upstream.
func (r *ClusterImageReconciler) createBackupJob(ctx context.Context, clusterImage *raczylocomv1.ClusterImage, clusterImageExport *raczylocomv1.ClusterImageExport, l logr.Logger) error {
	image := clusterImage.Spec.FullName
	normalised := shared.NormalizeImageName(image)
	tarball := "/tmp/" + normalised + ".tar"
	target := clusterImage.Spec.ExportPath + "/" + clusterImage.Spec.ExportName + "/" + normalised + ".tar"

	commands := []string{
		"podman pull " + image,
		"podman save --quiet -o " + tarball + " " + image,
	}
	switch clusterImage.Spec.Storage {
	case shared.STORAGE_S3:
		s3 := clusterImageExport.Spec.Storage.S3
		params := strings.Join(shared.SetupS3Params(s3), " ")
		commands = append(commands, "./worker export "+params+" '"+tarball+"' 's3://"+s3.Bucket+target+"'")
	case shared.STORAGE_FILE:
		commands = append(commands, "./worker export '"+tarball+"' '"+target+"'")
	}
	commands = append(commands, "rm -f "+tarball)

	annotations := make(map[string]string)
	// Later sources overwrite earlier ones on key collisions.
	for _, source := range []map[string]string{
		clusterImageExport.Annotations,
		clusterImage.Annotations,
		clusterImage.Spec.JobAnnotations,
	} {
		for key, value := range source {
			annotations[key] = value
		}
	}

	owner := metav1.OwnerReference{
		APIVersion:         clusterImage.APIVersion,
		Kind:               clusterImage.Kind,
		Name:               clusterImage.Name,
		UID:                clusterImage.UID,
		BlockOwnerDeletion: ptr.To(true),
		Controller:         ptr.To(true),
	}
	params := shared.JobParams{
		Name:             "img-export-" + clusterImage.Name,
		Namespace:        clusterImage.Namespace,
		Image:            shared.BACKUP_JOB_IMAGE,
		Annotations:      annotations,
		Commands:         commands,
		ServiceAccount:   "",
		ImagePullSecrets: clusterImage.Spec.ImagePullSecrets,
		OwnerReferences:  []metav1.OwnerReference{owner},
	}
	job := shared.CreateJob(params, func(raczylocomv1.ClusterImageExport) []string { return nil })

	if err := r.Create(ctx, job); err != nil {
		return err
	}
	clusterImage.Status.Progress = shared.STATUS_RUNNING
	return r.Status().Update(ctx, clusterImage)
}
// updateClusterImageExportStatus recomputes the owning ClusterImageExport's
// Progress from every ClusterImage in the namespace whose spec.exportName
// matches. It writes the new Progress if it changed, and requeues while any
// image is not yet SUCCESS or PRESENT.
//
// Precedence of the resulting export status:
//   - every image SUCCESS/PRESENT (or no images at all) -> SUCCESS
//   - any image FAILED with RetryCount >= 3             -> FAILED
//   - any image RUNNING, RETRYING, or FAILED below that -> RUNNING
//   - otherwise (PENDING, not yet initialised, unknown) -> PENDING
//
// An image whose Progress is empty or unrecognised still has work ahead of it
// and must not count as completed.
//
// NOTE(review): the list relies on a "spec.exportName" field index. Confirm it
// is registered with the manager; SetupWithManager in this file does not add it.
func (r *ClusterImageReconciler) updateClusterImageExportStatus(ctx context.Context, clusterImage *raczylocomv1.ClusterImage) (ctrl.Result, error) {
	l := log.FromContext(ctx)

	clusterImageExport := &raczylocomv1.ClusterImageExport{}
	if err := r.Get(ctx, types.NamespacedName{Name: clusterImage.Spec.ExportName, Namespace: clusterImage.Namespace}, clusterImageExport); err != nil {
		l.Error(err, "unable to fetch ClusterImageExport")
		return ctrl.Result{}, err
	}

	clusterImageList := &raczylocomv1.ClusterImageList{}
	if err := r.List(ctx, clusterImageList, client.InNamespace(clusterImage.Namespace), client.MatchingFields{"spec.exportName": clusterImage.Spec.ExportName}); err != nil {
		l.Error(err, "unable to list ClusterImages")
		return ctrl.Result{}, err
	}

	allCompleted, inFlight, anyMaxRetries := true, false, false
	for i := range clusterImageList.Items {
		ci := &clusterImageList.Items[i]
		switch ci.Status.Progress {
		case shared.STATUS_SUCCESS, shared.STATUS_PRESENT:
			// Completed.
		case shared.STATUS_FAILED:
			allCompleted = false
			if ci.Status.RetryCount >= 3 {
				anyMaxRetries = true
			} else {
				// Failed below the retry limit keeps the export RUNNING.
				inFlight = true
			}
		case shared.STATUS_RUNNING, shared.STATUS_RETRYING:
			allCompleted = false
			inFlight = true
		default:
			// PENDING, not yet initialised (""), or unrecognised.
			allCompleted = false
		}
	}

	var newStatus string
	switch {
	case allCompleted:
		newStatus = shared.STATUS_SUCCESS
	case anyMaxRetries:
		newStatus = shared.STATUS_FAILED
	case inFlight:
		newStatus = shared.STATUS_RUNNING
	default:
		newStatus = shared.STATUS_PENDING
	}

	if clusterImageExport.Status.Progress != newStatus {
		clusterImageExport.Status.Progress = newStatus
		if err := r.Status().Update(ctx, clusterImageExport); err != nil {
			if errors.IsConflict(err) {
				// The export changed underneath us; recompute on the next pass.
				return ctrl.Result{Requeue: true}, nil
			}
			l.Error(err, "unable to update ClusterImageExport status")
			return ctrl.Result{}, err
		}
		l.Info("Updated ClusterImageExport status", "ExportName", clusterImageExport.Name, "NewStatus", newStatus)
	}

	// Keep polling while any image still has work left.
	if !allCompleted {
		return ctrl.Result{Requeue: true}, nil
	}
	return ctrl.Result{}, nil
}
// handleJobRestarts folds container restarts of a still-active export Job
// into the ClusterImage's RetryCount.
//
// Restarts are summed over every container of the pods matched by the Job's
// MatchLabels. Only the amount by which that sum exceeds the stored RetryCount
// is treated as new.
//   - If RetryCount then reaches 3, the image is marked FAILED, its
//     ActiveJobs slot is released and only this image's Job is deleted
//     (foreground propagation).
//   - Otherwise the image is marked RETRYING.
//
// Status conflicts are swallowed (nil is returned) so a later reconcile can
// retry. Other images' export Jobs in the namespace are left running: they
// are unrelated to this failure.
//
// NOTE(review): RetryCount is also incremented for failed or vanished Jobs,
// so comparing it with this Job's restart total mixes two different counters.
func (r *ClusterImageReconciler) handleJobRestarts(ctx context.Context, job *v1batch.Job, clusterImage *raczylocomv1.ClusterImage) error {
	l := log.FromContext(ctx)

	// Work on the latest cached copy rather than the caller's snapshot.
	latest := &raczylocomv1.ClusterImage{}
	if err := r.Get(ctx, types.NamespacedName{Name: clusterImage.Name, Namespace: clusterImage.Namespace}, latest); err != nil {
		return err
	}
	clusterImage = latest

	// Without labels there is no way to tell which pods belong to the Job;
	// an empty label match would sum restarts of every pod in the namespace.
	if job.Spec.Selector == nil || len(job.Spec.Selector.MatchLabels) == 0 {
		return nil
	}

	podList := &v1.PodList{}
	if err := r.List(ctx, podList, client.InNamespace(job.Namespace), client.MatchingLabels(job.Spec.Selector.MatchLabels)); err != nil {
		return err
	}

	totalRestarts := 0
	for i := range podList.Items {
		for _, containerStatus := range podList.Items[i].Status.ContainerStatuses {
			totalRestarts += int(containerStatus.RestartCount)
		}
	}
	newRestarts := totalRestarts - clusterImage.Status.RetryCount
	if totalRestarts <= 0 || newRestarts <= 0 {
		return nil
	}
	l.Info("Container restarts detected", "job", job.Name, "newRestarts", newRestarts, "totalRestarts", totalRestarts)

	clusterImage.Status.RetryCount += newRestarts
	if clusterImage.Status.RetryCount < 3 {
		// Still have retries left.
		clusterImage.Status.Progress = shared.STATUS_RETRYING
		if err := r.Status().Update(ctx, clusterImage); err != nil {
			if errors.IsConflict(err) {
				// Resource was modified; the next reconcile re-evaluates.
				return nil
			}
			return fmt.Errorf("failed to update status to RETRYING: %w", err)
		}
		return nil
	}

	// Max retries reached.
	clusterImage.Status.Progress = shared.STATUS_FAILED
	if err := r.Status().Update(ctx, clusterImage); err != nil {
		if errors.IsConflict(err) {
			// Resource was modified; the next reconcile re-evaluates.
			return nil
		}
		return fmt.Errorf("failed to update status to FAILED: %w", err)
	}

	// FAILED is terminal and Reconcile will not revisit this image, so its
	// slot has to be released here.
	r.activeJobsMu.Lock()
	r.ActiveJobs--
	r.activeJobsMu.Unlock()

	if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil && !errors.IsNotFound(err) {
		return fmt.Errorf("failed to cleanup resources: %w", err)
	}
	return nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *ClusterImageReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Create a Kubernetes clientset
var err error
config := mgr.GetConfig()
r.KubeClient, err = kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("unable to create Kubernetes client: %w", err)
}
return ctrl.NewControllerManagedBy(mgr).
For(&raczylocomv1.ClusterImage{}).
Owns(&v1batch.Job{}).
Complete(r)
}
// removeAllJobsAndContainers deletes, with foreground propagation, every Job
// in namespace labelled app=image-export. Jobs that are already gone are
// skipped; the first other error stops the sweep and is returned.
func (r *ClusterImageReconciler) removeAllJobsAndContainers(ctx context.Context, namespace string) error {
	jobs := &v1batch.JobList{}
	if err := r.List(ctx, jobs, client.InNamespace(namespace), client.MatchingLabels{"app": "image-export"}); err != nil {
		return err
	}

	foreground := client.PropagationPolicy(metav1.DeletePropagationForeground)
	for i := range jobs.Items {
		err := r.Delete(ctx, &jobs.Items[i], foreground)
		if err == nil || errors.IsNotFound(err) {
			continue
		}
		return err
	}
	return nil
}
// checkImageExists reports whether clusterImage needs no new export Job.
// That is the case when the image itself is already SUCCESS, or when another
// ClusterImage (listed across all namespaces) with the same Spec.FullName is
// SUCCESS, PRESENT or RUNNING.
func (r *ClusterImageReconciler) checkImageExists(ctx context.Context, clusterImage *raczylocomv1.ClusterImage) (bool, error) {
	// Cheap check first: no need to list anything if this image is done.
	if clusterImage.Status.Progress == shared.STATUS_SUCCESS {
		return true, nil
	}

	clusterImageList := &raczylocomv1.ClusterImageList{}
	if err := r.List(ctx, clusterImageList); err != nil {
		return false, err
	}

	for i := range clusterImageList.Items {
		ci := &clusterImageList.Items[i]
		// Identify "this same object" by namespace and name. The list spans
		// all namespaces, so a bare name can match an unrelated ClusterImage
		// elsewhere.
		if ci.Namespace == clusterImage.Namespace && ci.Name == clusterImage.Name {
			continue
		}
		if ci.Spec.FullName != clusterImage.Spec.FullName {
			continue
		}
		switch ci.Status.Progress {
		case shared.STATUS_SUCCESS, shared.STATUS_PRESENT, shared.STATUS_RUNNING:
			return true, nil
		}
	}
	return false, nil
}