Clean up the code and basic improvements.

This commit is contained in:
2025-12-18 00:17:02 +00:00
parent 0de9397a2d
commit fb6498c4be
15 changed files with 154 additions and 437 deletions
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/go-logr/logr"
@@ -13,8 +14,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -28,6 +30,7 @@ type ClusterImageReconciler struct {
Scheme *runtime.Scheme
MaxParallelJobs int
ActiveJobs int
activeJobsMu sync.Mutex // protects ActiveJobs counter
KubeClient *kubernetes.Clientset
}
@@ -66,7 +69,7 @@ func (r *ClusterImageReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err := r.Get(ctx, req.NamespacedName, latest); err != nil {
return ctrl.Result{}, err
}
latest.Status.Progress = shared.STATUS_PENDING
if err := r.Status().Update(ctx, latest); err != nil {
if errors.IsConflict(err) {
@@ -80,7 +83,10 @@ func (r *ClusterImageReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
// If we've reached the maximum number of parallel jobs, requeue
if r.ActiveJobs >= r.MaxParallelJobs && clusterImage.Status.Progress == shared.STATUS_PENDING {
r.activeJobsMu.Lock()
activeJobs := r.ActiveJobs
r.activeJobsMu.Unlock()
if activeJobs >= r.MaxParallelJobs && clusterImage.Status.Progress == shared.STATUS_PENDING {
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
}
@@ -117,7 +123,7 @@ func (r *ClusterImageReconciler) handlePendingClusterImage(ctx context.Context,
if err := r.Get(ctx, types.NamespacedName{Name: clusterImage.Name, Namespace: clusterImage.Namespace}, latest); err != nil {
return ctrl.Result{}, err
}
latest.Status.Progress = shared.STATUS_PRESENT
if err := r.Status().Update(ctx, latest); err != nil {
if errors.IsConflict(err) {
@@ -151,7 +157,9 @@ func (r *ClusterImageReconciler) handlePendingClusterImage(ctx context.Context,
}
// Increment the active jobs count
r.activeJobsMu.Lock()
r.ActiveJobs++
r.activeJobsMu.Unlock()
return ctrl.Result{Requeue: true}, nil
}
@@ -217,7 +225,9 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
}
latest.Status.Progress = shared.STATUS_SUCCESS
r.activeJobsMu.Lock()
r.ActiveJobs--
r.activeJobsMu.Unlock()
// Update the status before cleaning up the job
if err := r.Status().Update(ctx, latest); err != nil {
if errors.IsConflict(err) {
@@ -232,7 +242,9 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
return ctrl.Result{}, err
}
} else if existingJob.Status.Failed > 0 {
r.activeJobsMu.Lock()
r.ActiveJobs--
r.activeJobsMu.Unlock()
if clusterImage.Status.RetryCount < 3 {
// Cleanup the failed job before retrying
if err := r.cleanupJobAndPods(ctx, existingJob); err != nil {
@@ -300,8 +312,22 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
return r.updateClusterImageExportStatus(ctx, clusterImage)
}
func (r *ClusterImageReconciler) cleanupJobAndPods(ctx context.Context, job *v1batch.Job) error {
// Add a short delay to allow status updates to propagate
time.Sleep(2 * time.Second)
// Wait for job status to propagate before deletion
jobKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
currentJob := &v1batch.Job{}
if err := r.Get(ctx, jobKey, currentJob); err != nil {
if errors.IsNotFound(err) {
return true, nil // Job already deleted
}
return false, nil // Retry on transient errors
}
// Job status has been updated, proceed with deletion
return currentJob.Status.Active == 0, nil
})
if err != nil && !errors.IsNotFound(err) && err != context.DeadlineExceeded {
return fmt.Errorf("failed to wait for job status: %w", err)
}
// Delete the job
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
@@ -374,8 +400,8 @@ func (r *ClusterImageReconciler) createBackupJob(ctx context.Context, clusterIma
Kind: clusterImage.Kind,
Name: clusterImage.Name,
UID: clusterImage.UID,
BlockOwnerDeletion: pointer.Bool(true),
Controller: pointer.Bool(true),
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
},
},
}