mirror of
https://github.com/lukaszraczylo/kubernetes-images-sync-operator.git
synced 2026-06-10 23:29:11 +00:00
General improvements
This commit is contained in:
@@ -179,32 +179,37 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
} else if existingJob.Status.Failed > 0 {
|
||||
r.ActiveJobs--
|
||||
if clusterImage.Status.RetryCount < 3 {
|
||||
clusterImage.Status.Progress = shared.STATUS_RETRYING
|
||||
clusterImage.Status.RetryCount++
|
||||
// Update the status before cleaning up the job
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
l.Error(err, "unable to update ClusterImage status for retry")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
// Cleanup the failed job before retrying
|
||||
if err := r.cleanupJobAndPods(ctx, existingJob); err != nil {
|
||||
l.Error(err, "unable to cleanup failed job and pods for retry")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
r.ActiveJobs--
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
} else {
|
||||
clusterImage.Status.Progress = shared.STATUS_FAILED
|
||||
r.ActiveJobs--
|
||||
// Update the status before cleaning up the job
|
||||
|
||||
// Update status and retry count
|
||||
clusterImage.Status.Progress = shared.STATUS_RETRYING
|
||||
clusterImage.Status.RetryCount++
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
l.Error(err, "unable to update ClusterImage status to FAILED")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
if err := r.cleanupJobAndPods(ctx, existingJob); err != nil {
|
||||
l.Error(err, "unable to cleanup failed job and pods")
|
||||
l.Error(err, "unable to update ClusterImage status for retry")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Requeue immediately to create a new job
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
|
||||
// Max retries reached, mark as failed
|
||||
clusterImage.Status.Progress = shared.STATUS_FAILED
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
l.Error(err, "unable to update ClusterImage status to FAILED")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Cleanup the failed job
|
||||
if err := r.cleanupJobAndPods(ctx, existingJob); err != nil {
|
||||
l.Error(err, "unable to cleanup failed job and pods")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,12 +320,16 @@ func (r *ClusterImageReconciler) updateClusterImageExportStatus(ctx context.Cont
|
||||
allCompleted := true
|
||||
anyFailed := false
|
||||
anyRunning := false
|
||||
anyMaxRetries := false
|
||||
|
||||
for _, ci := range clusterImageList.Items {
|
||||
switch ci.Status.Progress {
|
||||
case shared.STATUS_SUCCESS, shared.STATUS_PRESENT:
|
||||
// These statuses are considered completed
|
||||
case shared.STATUS_FAILED:
|
||||
if ci.Status.RetryCount >= 3 {
|
||||
anyMaxRetries = true
|
||||
}
|
||||
anyFailed = true
|
||||
allCompleted = false
|
||||
case shared.STATUS_RUNNING, shared.STATUS_RETRYING:
|
||||
@@ -334,8 +343,12 @@ func (r *ClusterImageReconciler) updateClusterImageExportStatus(ctx context.Cont
|
||||
var newStatus string
|
||||
if allCompleted {
|
||||
newStatus = shared.STATUS_SUCCESS
|
||||
} else if anyFailed {
|
||||
} else if anyMaxRetries {
|
||||
// Only mark as failed if at least one job has reached max retries
|
||||
newStatus = shared.STATUS_FAILED
|
||||
} else if anyFailed {
|
||||
// If there are failures but no max retries reached, keep running
|
||||
newStatus = shared.STATUS_RUNNING
|
||||
} else if anyRunning {
|
||||
newStatus = shared.STATUS_RUNNING
|
||||
} else {
|
||||
@@ -360,29 +373,45 @@ func (r *ClusterImageReconciler) updateClusterImageExportStatus(ctx context.Cont
|
||||
}
|
||||
|
||||
func (r *ClusterImageReconciler) handleJobRestarts(ctx context.Context, job *v1batch.Job, clusterImage *raczylocomv1.ClusterImage) error {
|
||||
l := log.FromContext(ctx)
|
||||
podList := &v1.PodList{}
|
||||
if err := r.List(ctx, podList, client.InNamespace(job.Namespace), client.MatchingLabels(job.Spec.Selector.MatchLabels)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
totalRestarts := 0
|
||||
for _, pod := range podList.Items {
|
||||
for _, containerStatus := range pod.Status.ContainerStatuses {
|
||||
if containerStatus.RestartCount > 0 {
|
||||
clusterImage.Status.RetryCount += int(containerStatus.RestartCount)
|
||||
if clusterImage.Status.RetryCount >= 3 {
|
||||
clusterImage.Status.Progress = shared.STATUS_FAILED
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
return err
|
||||
}
|
||||
return r.removeAllJobsAndContainers(ctx, clusterImage.Namespace)
|
||||
} else {
|
||||
clusterImage.Status.Progress = shared.STATUS_RETRYING
|
||||
totalRestarts += int(containerStatus.RestartCount)
|
||||
}
|
||||
}
|
||||
|
||||
if totalRestarts > 0 {
|
||||
// Only count new restarts
|
||||
newRestarts := totalRestarts - clusterImage.Status.RetryCount
|
||||
if newRestarts > 0 {
|
||||
l.Info("Container restarts detected", "job", job.Name, "newRestarts", newRestarts, "totalRestarts", totalRestarts)
|
||||
|
||||
// Update retry count with new restarts
|
||||
clusterImage.Status.RetryCount += newRestarts
|
||||
|
||||
if clusterImage.Status.RetryCount >= 3 {
|
||||
// Max retries reached
|
||||
clusterImage.Status.Progress = shared.STATUS_FAILED
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
return fmt.Errorf("failed to update status to FAILED: %w", err)
|
||||
}
|
||||
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
return err
|
||||
// Cleanup all related resources
|
||||
if err := r.removeAllJobsAndContainers(ctx, clusterImage.Namespace); err != nil {
|
||||
return fmt.Errorf("failed to cleanup resources: %w", err)
|
||||
}
|
||||
} else {
|
||||
// Still have retries left
|
||||
clusterImage.Status.Progress = shared.STATUS_RETRYING
|
||||
if err := r.Status().Update(ctx, clusterImage); err != nil {
|
||||
return fmt.Errorf("failed to update status to RETRYING: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,10 @@ import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
v1batch "k8s.io/api/batch/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -56,9 +54,23 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
return r.handleDeletion(ctx, clusterImageExport)
|
||||
}
|
||||
|
||||
// Add finalizer if it doesn't exist
|
||||
// Add finalizer and creation timestamp annotation if they don't exist
|
||||
needsUpdate := false
|
||||
if !controllerutil.ContainsFinalizer(clusterImageExport, clusterImageExportFinalizer) {
|
||||
controllerutil.AddFinalizer(clusterImageExport, clusterImageExportFinalizer)
|
||||
needsUpdate = true
|
||||
}
|
||||
|
||||
// Add creation timestamp annotation if it doesn't exist
|
||||
if clusterImageExport.Annotations == nil {
|
||||
clusterImageExport.Annotations = make(map[string]string)
|
||||
}
|
||||
if _, exists := clusterImageExport.Annotations["export.raczylo.com/creation-timestamp"]; !exists {
|
||||
clusterImageExport.Annotations["export.raczylo.com/creation-timestamp"] = clusterImageExport.CreationTimestamp.String()
|
||||
needsUpdate = true
|
||||
}
|
||||
|
||||
if needsUpdate {
|
||||
if err := r.Update(ctx, clusterImageExport); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
@@ -104,7 +116,9 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
}
|
||||
|
||||
for _, image := range fullImagesList.Containers {
|
||||
nameHash := fmt.Sprintf("%x", md5.Sum([]byte(clusterImageExport.Name+image.Image+image.Tag+image.Sha)))[:14]
|
||||
// Include creation timestamp in the hash to differentiate between exports with the same name
|
||||
nameHash := fmt.Sprintf("%x", md5.Sum([]byte(clusterImageExport.Name+image.Image+image.Tag+image.Sha+
|
||||
clusterImageExport.Annotations["export.raczylo.com/creation-timestamp"])))[:14]
|
||||
|
||||
// Check if the ClusterImage already exists
|
||||
clusterImage := &raczylocomv1.ClusterImage{}
|
||||
@@ -196,6 +210,14 @@ func (r *ClusterImageExportReconciler) checkAllClusterImagesCompleted(ctx contex
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
|
||||
func (r *ClusterImageExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
// Set up field indexing for ClusterImage exportName
|
||||
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &raczylocomv1.ClusterImage{}, "spec.exportName", func(obj client.Object) []string {
|
||||
clusterImage := obj.(*raczylocomv1.ClusterImage)
|
||||
return []string{clusterImage.Spec.ExportName}
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed setting up field indexer: %w", err)
|
||||
}
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&raczylocomv1.ClusterImageExport{}).
|
||||
Owns(&raczylocomv1.ClusterImage{}).
|
||||
@@ -243,13 +265,16 @@ func (r *ClusterImageExportReconciler) handleDeletion(ctx context.Context, clust
|
||||
l := log.FromContext(ctx)
|
||||
|
||||
if controllerutil.ContainsFinalizer(clusterImageExport, clusterImageExportFinalizer) {
|
||||
// Run the cleanup job
|
||||
if err := r.runCleanupJob(ctx, clusterImageExport); err != nil {
|
||||
l.Error(err, "Failed to run cleanup job")
|
||||
return ctrl.Result{}, err
|
||||
// Delete all associated ClusterImages first
|
||||
if err := r.deleteAssociatedClusterImages(ctx, clusterImageExport); err != nil {
|
||||
l.Error(err, "Failed to delete associated ClusterImages")
|
||||
// Continue with deletion even if cleanup fails
|
||||
}
|
||||
|
||||
// Remove the finalizer
|
||||
// Attempt to run cleanup job but don't block on errors
|
||||
r.runCleanupJob(ctx, clusterImageExport)
|
||||
|
||||
// Remove the finalizer regardless of cleanup job status
|
||||
controllerutil.RemoveFinalizer(clusterImageExport, clusterImageExportFinalizer)
|
||||
if err := r.Update(ctx, clusterImageExport); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
@@ -259,9 +284,29 @@ func (r *ClusterImageExportReconciler) handleDeletion(ctx context.Context, clust
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) error {
|
||||
func (r *ClusterImageExportReconciler) deleteAssociatedClusterImages(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) error {
|
||||
l := log.FromContext(ctx)
|
||||
|
||||
// List all ClusterImages associated with this export
|
||||
clusterImageList := &raczylocomv1.ClusterImageList{}
|
||||
if err := r.List(ctx, clusterImageList, client.InNamespace(clusterImageExport.Namespace),
|
||||
client.MatchingFields{"spec.exportName": clusterImageExport.Name}); err != nil {
|
||||
return fmt.Errorf("failed to list ClusterImages: %w", err)
|
||||
}
|
||||
|
||||
// Delete each ClusterImage
|
||||
for _, ci := range clusterImageList.Items {
|
||||
if err := r.Delete(ctx, &ci); err != nil && !errors.IsNotFound(err) {
|
||||
l.Error(err, "Failed to delete ClusterImage", "name", ci.Name)
|
||||
// Continue deleting other ClusterImages even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) {
|
||||
l := log.FromContext(ctx)
|
||||
normalisedImageName := "cleanup-" + shared.NormalizeImageName(clusterImageExport.Name)
|
||||
|
||||
defaultCommands := []string{}
|
||||
@@ -291,55 +336,11 @@ func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, cluste
|
||||
|
||||
cleanupJob := shared.CreateJob(jobParams, func(raczylocomv1.ClusterImageExport) []string { return nil })
|
||||
|
||||
// Try to create the cleanup job but don't block on errors
|
||||
if err := r.Create(ctx, cleanupJob); err != nil {
|
||||
l.Error(err, "Failed to create cleanup job")
|
||||
return err
|
||||
l.Error(err, "Failed to create cleanup job, continuing with deletion anyway")
|
||||
return
|
||||
}
|
||||
|
||||
l.Info("Created cleanup job")
|
||||
|
||||
go func() {
|
||||
if err := r.waitForJobCompletionAndDelete(ctx, cleanupJob); err != nil {
|
||||
l.Error(err, "Failed to wait for job completion and delete")
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClusterImageExportReconciler) waitForJobCompletionAndDelete(ctx context.Context, job *v1batch.Job) error {
|
||||
l := log.FromContext(ctx)
|
||||
key := client.ObjectKeyFromObject(job)
|
||||
|
||||
// Wait for the job to complete
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
if err := r.Get(ctx, key, job); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if job.Status.Succeeded > 0 {
|
||||
// Job completed successfully, delete it
|
||||
l.Info("Cleanup job completed, deleting", "job", job.Name)
|
||||
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if job.Status.Failed > 0 {
|
||||
// Job failed, log the error but still delete the job
|
||||
l.Error(nil, "Cleanup job failed", "job", job.Name)
|
||||
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("cleanup job failed: %s", job.Name)
|
||||
}
|
||||
|
||||
// Job still running, wait and check again
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user