mirror of
https://github.com/lukaszraczylo/kubernetes-images-sync-operator.git
synced 2026-06-13 00:30:14 +00:00
new year update (#4)
* Bring operator to the brand new world of build and deployments. * Clean up the code and basic improvements. * More fixes, moving from python to golang worker. * fixup! More fixes, moving from python to golang worker. * fixup! fixup! More fixes, moving from python to golang worker. * fixup! fixup! fixup! More fixes, moving from python to golang worker. * fixup! fixup! fixup! fixup! More fixes, moving from python to golang worker. * fixup! fixup! fixup! fixup! fixup! More fixes, moving from python to golang worker. * fixup! fixup! fixup! fixup! fixup! fixup! More fixes, moving from python to golang worker.
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
@@ -13,8 +14,9 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
@@ -28,6 +30,7 @@ type ClusterImageReconciler struct {
|
||||
Scheme *runtime.Scheme
|
||||
MaxParallelJobs int
|
||||
ActiveJobs int
|
||||
activeJobsMu sync.Mutex // protects ActiveJobs counter
|
||||
KubeClient *kubernetes.Clientset
|
||||
}
|
||||
|
||||
@@ -66,7 +69,7 @@ func (r *ClusterImageReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
if err := r.Get(ctx, req.NamespacedName, latest); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
|
||||
latest.Status.Progress = shared.STATUS_PENDING
|
||||
if err := r.Status().Update(ctx, latest); err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
@@ -80,7 +83,10 @@ func (r *ClusterImageReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
}
|
||||
|
||||
// If we've reached the maximum number of parallel jobs, requeue
|
||||
if r.ActiveJobs >= r.MaxParallelJobs && clusterImage.Status.Progress == shared.STATUS_PENDING {
|
||||
r.activeJobsMu.Lock()
|
||||
activeJobs := r.ActiveJobs
|
||||
r.activeJobsMu.Unlock()
|
||||
if activeJobs >= r.MaxParallelJobs && clusterImage.Status.Progress == shared.STATUS_PENDING {
|
||||
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
|
||||
}
|
||||
|
||||
@@ -117,7 +123,7 @@ func (r *ClusterImageReconciler) handlePendingClusterImage(ctx context.Context,
|
||||
if err := r.Get(ctx, types.NamespacedName{Name: clusterImage.Name, Namespace: clusterImage.Namespace}, latest); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
|
||||
latest.Status.Progress = shared.STATUS_PRESENT
|
||||
if err := r.Status().Update(ctx, latest); err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
@@ -151,7 +157,9 @@ func (r *ClusterImageReconciler) handlePendingClusterImage(ctx context.Context,
|
||||
}
|
||||
|
||||
// Increment the active jobs count
|
||||
r.activeJobsMu.Lock()
|
||||
r.ActiveJobs++
|
||||
r.activeJobsMu.Unlock()
|
||||
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
@@ -217,7 +225,9 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
|
||||
}
|
||||
|
||||
latest.Status.Progress = shared.STATUS_SUCCESS
|
||||
r.activeJobsMu.Lock()
|
||||
r.ActiveJobs--
|
||||
r.activeJobsMu.Unlock()
|
||||
// Update the status before cleaning up the job
|
||||
if err := r.Status().Update(ctx, latest); err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
@@ -232,7 +242,9 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
} else if existingJob.Status.Failed > 0 {
|
||||
r.activeJobsMu.Lock()
|
||||
r.ActiveJobs--
|
||||
r.activeJobsMu.Unlock()
|
||||
if clusterImage.Status.RetryCount < 3 {
|
||||
// Cleanup the failed job before retrying
|
||||
if err := r.cleanupJobAndPods(ctx, existingJob); err != nil {
|
||||
@@ -300,8 +312,22 @@ func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context,
|
||||
return r.updateClusterImageExportStatus(ctx, clusterImage)
|
||||
}
|
||||
func (r *ClusterImageReconciler) cleanupJobAndPods(ctx context.Context, job *v1batch.Job) error {
|
||||
// Add a short delay to allow status updates to propagate
|
||||
time.Sleep(2 * time.Second)
|
||||
// Wait for job status to propagate before deletion
|
||||
jobKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
|
||||
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
currentJob := &v1batch.Job{}
|
||||
if err := r.Get(ctx, jobKey, currentJob); err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return true, nil // Job already deleted
|
||||
}
|
||||
return false, nil // Retry on transient errors
|
||||
}
|
||||
// Job status has been updated, proceed with deletion
|
||||
return currentJob.Status.Active == 0, nil
|
||||
})
|
||||
if err != nil && !errors.IsNotFound(err) && err != context.DeadlineExceeded {
|
||||
return fmt.Errorf("failed to wait for job status: %w", err)
|
||||
}
|
||||
|
||||
// Delete the job
|
||||
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
|
||||
@@ -334,12 +360,12 @@ func (r *ClusterImageReconciler) createBackupJob(ctx context.Context, clusterIma
|
||||
if clusterImage.Spec.Storage == shared.STORAGE_S3 {
|
||||
s3Params := shared.SetupS3Params(clusterImageExport.Spec.Storage.S3)
|
||||
additionalCommands := []string{
|
||||
"./export.py " + strings.Join(s3Params, " ") + " '/tmp/" + normalisedImageName + ".tar' " + "'s3://" + clusterImageExport.Spec.Storage.S3.Bucket + clusterImage.Spec.ExportPath + "/" + clusterImage.Spec.ExportName + "/" + normalisedImageName + ".tar'",
|
||||
"./worker export " + strings.Join(s3Params, " ") + " '/tmp/" + normalisedImageName + ".tar' " + "'s3://" + clusterImageExport.Spec.Storage.S3.Bucket + clusterImage.Spec.ExportPath + "/" + clusterImage.Spec.ExportName + "/" + normalisedImageName + ".tar'",
|
||||
}
|
||||
defaultCommands = append(defaultCommands, additionalCommands...)
|
||||
} else if clusterImage.Spec.Storage == shared.STORAGE_FILE {
|
||||
additionalCommands := []string{
|
||||
"./export.py /tmp/" + normalisedImageName + ".tar" + " " + clusterImage.Spec.ExportPath + "/" + clusterImage.Spec.ExportName + "/" + normalisedImageName + ".tar",
|
||||
"./worker export '/tmp/" + normalisedImageName + ".tar' '" + clusterImage.Spec.ExportPath + "/" + clusterImage.Spec.ExportName + "/" + normalisedImageName + ".tar'",
|
||||
}
|
||||
defaultCommands = append(defaultCommands, additionalCommands...)
|
||||
}
|
||||
@@ -374,8 +400,8 @@ func (r *ClusterImageReconciler) createBackupJob(ctx context.Context, clusterIma
|
||||
Kind: clusterImage.Kind,
|
||||
Name: clusterImage.Name,
|
||||
UID: clusterImage.UID,
|
||||
BlockOwnerDeletion: pointer.Bool(true),
|
||||
Controller: pointer.Bool(true),
|
||||
BlockOwnerDeletion: ptr.To(true),
|
||||
Controller: ptr.To(true),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -33,38 +33,73 @@ import (
|
||||
var _ = Describe("ClusterImage Controller", func() {
|
||||
Context("When reconciling a resource", func() {
|
||||
const resourceName = "test-resource"
|
||||
const exportName = "test-export"
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
typeNamespacedName := types.NamespacedName{
|
||||
Name: resourceName,
|
||||
Namespace: "default", // TODO(user):Modify as needed
|
||||
Namespace: "default",
|
||||
}
|
||||
exportNamespacedName := types.NamespacedName{
|
||||
Name: exportName,
|
||||
Namespace: "default",
|
||||
}
|
||||
clusterimage := &raczylocomv1.ClusterImage{}
|
||||
|
||||
BeforeEach(func() {
|
||||
By("creating the ClusterImageExport that the ClusterImage references")
|
||||
export := &raczylocomv1.ClusterImageExport{}
|
||||
err := k8sClient.Get(ctx, exportNamespacedName, export)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
exportResource := &raczylocomv1.ClusterImageExport{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: exportName,
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: raczylocomv1.ClusterImageExportSpec{
|
||||
Name: exportName,
|
||||
BasePath: "/backups/test",
|
||||
MaxConcurrentJobs: 1,
|
||||
Storage: raczylocomv1.ClusterImageStorageSpec{
|
||||
StorageTarget: "FILE",
|
||||
},
|
||||
},
|
||||
}
|
||||
Expect(k8sClient.Create(ctx, exportResource)).To(Succeed())
|
||||
}
|
||||
|
||||
By("creating the custom resource for the Kind ClusterImage")
|
||||
err := k8sClient.Get(ctx, typeNamespacedName, clusterimage)
|
||||
err = k8sClient.Get(ctx, typeNamespacedName, clusterimage)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
resource := &raczylocomv1.ClusterImage{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: resourceName,
|
||||
Namespace: "default",
|
||||
},
|
||||
// TODO(user): Specify other spec details if needed.
|
||||
Spec: raczylocomv1.ClusterImageSpec{
|
||||
ExportName: exportName,
|
||||
Image: "nginx:latest",
|
||||
},
|
||||
}
|
||||
Expect(k8sClient.Create(ctx, resource)).To(Succeed())
|
||||
}
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
// TODO(user): Cleanup logic after each test, like removing the resource instance.
|
||||
By("Cleanup the specific resource instance ClusterImage")
|
||||
resource := &raczylocomv1.ClusterImage{}
|
||||
err := k8sClient.Get(ctx, typeNamespacedName, resource)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if err == nil {
|
||||
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
|
||||
}
|
||||
|
||||
By("Cleanup the specific resource instance ClusterImage")
|
||||
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
|
||||
By("Cleanup the ClusterImageExport")
|
||||
export := &raczylocomv1.ClusterImageExport{}
|
||||
err = k8sClient.Get(ctx, exportNamespacedName, export)
|
||||
if err == nil {
|
||||
Expect(k8sClient.Delete(ctx, export)).To(Succeed())
|
||||
}
|
||||
})
|
||||
It("should successfully reconcile the resource", func() {
|
||||
By("Reconciling the created resource")
|
||||
|
||||
@@ -2,9 +2,10 @@ package raczylocom
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"crypto/md5" // #nosec G501 - MD5 used for non-cryptographic unique identifiers only
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
@@ -14,7 +15,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
@@ -27,7 +28,7 @@ import (
|
||||
// ClusterImageExportReconciler reconciles a ClusterImageExport object
|
||||
type ClusterImageExportReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
Scheme *runtime.Scheme
|
||||
podAnnotations map[string]string
|
||||
}
|
||||
|
||||
@@ -61,6 +62,19 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
return r.handleDeletion(ctx, clusterImageExport)
|
||||
}
|
||||
|
||||
// Check if this export should be deleted by TTL
|
||||
if r.shouldDeleteByTTL(clusterImageExport) {
|
||||
l.Info("Deleting export due to TTL expiration",
|
||||
"export", clusterImageExport.Name,
|
||||
"ttlDays", *clusterImageExport.Spec.TTLDaysAfterFinished,
|
||||
"completedAt", clusterImageExport.Status.CompletedAt)
|
||||
if err := r.Delete(ctx, clusterImageExport); err != nil && !errors.IsNotFound(err) {
|
||||
l.Error(err, "Failed to delete export by TTL")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// Add finalizer and creation timestamp annotation if they don't exist
|
||||
needsUpdate := false
|
||||
if !controllerutil.ContainsFinalizer(clusterImageExport, clusterImageExportFinalizer) {
|
||||
@@ -121,6 +135,7 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
|
||||
for _, image := range fullImagesList.Containers {
|
||||
// Include creation timestamp in the hash to differentiate between exports with the same name
|
||||
// #nosec G401 - MD5 used for non-cryptographic unique identifier generation, not security
|
||||
nameHash := fmt.Sprintf("%x", md5.Sum([]byte(clusterImageExport.Name+image.Image+image.Tag+image.Sha+
|
||||
clusterImageExport.Annotations["export.raczylo.com/creation-timestamp"])))[:14]
|
||||
|
||||
@@ -156,7 +171,7 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
Kind: clusterImageExport.Kind,
|
||||
Name: clusterImageExport.Name,
|
||||
UID: clusterImageExport.UID,
|
||||
Controller: pointer.Bool(true),
|
||||
Controller: ptr.To(true),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -185,7 +200,7 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
failedCount := 0
|
||||
pendingCount := 0
|
||||
clusterImageList := &raczylocomv1.ClusterImageList{}
|
||||
if err := r.List(ctx, clusterImageList, client.InNamespace(clusterImageExport.Namespace),
|
||||
if err := r.List(ctx, clusterImageList, client.InNamespace(clusterImageExport.Namespace),
|
||||
client.MatchingFields{"spec.exportName": clusterImageExport.Name}); err != nil {
|
||||
l.Error(err, "unable to list ClusterImages")
|
||||
return ctrl.Result{}, err
|
||||
@@ -213,6 +228,11 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
} else {
|
||||
export.Status.Progress = shared.STATUS_SUCCESS
|
||||
}
|
||||
// Set CompletedAt timestamp when export completes
|
||||
if export.Status.CompletedAt == nil {
|
||||
now := metav1.Now()
|
||||
export.Status.CompletedAt = &now
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
@@ -220,6 +240,15 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// If export is complete, run retention cleanup
|
||||
if clusterImageExport.Status.Progress == shared.STATUS_SUCCESS ||
|
||||
clusterImageExport.Status.Progress == shared.STATUS_FAILED {
|
||||
if err := r.cleanupByRetention(ctx, clusterImageExport); err != nil {
|
||||
l.Error(err, "Failed to cleanup by retention policy")
|
||||
// Don't return error - this is non-critical
|
||||
}
|
||||
}
|
||||
|
||||
// If there are still pending images, requeue
|
||||
if pendingCount > 0 {
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
@@ -250,21 +279,6 @@ func (r *ClusterImageExportReconciler) updateStatusWithRetry(ctx context.Context
|
||||
})
|
||||
}
|
||||
|
||||
func (r *ClusterImageExportReconciler) checkAllClusterImagesCompleted(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) (bool, error) {
|
||||
clusterImageList := &raczylocomv1.ClusterImageList{}
|
||||
if err := r.List(ctx, clusterImageList, client.InNamespace(clusterImageExport.Namespace), client.MatchingFields{"spec.exportName": clusterImageExport.Name}); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, ci := range clusterImageList.Items {
|
||||
if ci.Status.Progress != shared.STATUS_SUCCESS && ci.Status.Progress != shared.STATUS_PRESENT {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
|
||||
func (r *ClusterImageExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
@@ -398,18 +412,18 @@ func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, cluste
|
||||
if clusterImageExport.Spec.Storage.StorageTarget == shared.STORAGE_S3 {
|
||||
s3Params := shared.SetupS3Params(clusterImageExport.Spec.Storage.S3)
|
||||
additionalCommands := []string{
|
||||
"./cleanup.py " + strings.Join(s3Params, " ") + " 's3://" + clusterImageExport.Spec.Storage.S3.Bucket + clusterImageExport.Spec.BasePath + "/" + clusterImageExport.ObjectMeta.Name + "/'",
|
||||
"./worker cleanup " + strings.Join(s3Params, " ") + " 's3://" + clusterImageExport.Spec.Storage.S3.Bucket + clusterImageExport.Spec.BasePath + "/" + clusterImageExport.ObjectMeta.Name + "/'",
|
||||
}
|
||||
defaultCommands = append(defaultCommands, additionalCommands...)
|
||||
} else if clusterImageExport.Spec.Storage.StorageTarget == shared.STORAGE_FILE {
|
||||
additionalCommands := []string{
|
||||
"./cleanup.py" + "'" + clusterImageExport.Spec.BasePath + "/" + clusterImageExport.ObjectMeta.Name + "/'",
|
||||
"./worker cleanup '" + clusterImageExport.Spec.BasePath + "/" + clusterImageExport.ObjectMeta.Name + "/'",
|
||||
}
|
||||
defaultCommands = append(defaultCommands, additionalCommands...)
|
||||
}
|
||||
|
||||
// Set up the cleanup job with retry limits and TTL
|
||||
backoffLimit := int32(2) // 3 total attempts (initial + 2 retries)
|
||||
backoffLimit := int32(2) // 3 total attempts (initial + 2 retries)
|
||||
ttlSecondsAfterFinished := int32(300) // Delete job 5 minutes after completion
|
||||
|
||||
// Merge annotations from different sources
|
||||
@@ -443,16 +457,16 @@ func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, cluste
|
||||
}
|
||||
|
||||
jobParams := shared.JobParams{
|
||||
Name: normalisedImageName,
|
||||
Namespace: clusterImageExport.Namespace,
|
||||
Image: shared.BACKUP_JOB_IMAGE,
|
||||
Commands: defaultCommands,
|
||||
Annotations: mergedAnnotations,
|
||||
ServiceAccount: "",
|
||||
ImagePullSecrets: clusterImageExport.Spec.ImagePullSecrets,
|
||||
BackoffLimit: &backoffLimit,
|
||||
Name: normalisedImageName,
|
||||
Namespace: clusterImageExport.Namespace,
|
||||
Image: shared.BACKUP_JOB_IMAGE,
|
||||
Commands: defaultCommands,
|
||||
Annotations: mergedAnnotations,
|
||||
ServiceAccount: "",
|
||||
ImagePullSecrets: clusterImageExport.Spec.ImagePullSecrets,
|
||||
BackoffLimit: &backoffLimit,
|
||||
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
|
||||
EnvVars: envVars,
|
||||
EnvVars: envVars,
|
||||
}
|
||||
|
||||
cleanupJob := shared.CreateJob(jobParams, func(raczylocomv1.ClusterImageExport) []string { return nil })
|
||||
@@ -466,3 +480,110 @@ func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, cluste
|
||||
l.Info("Created cleanup job with retry limit and TTL")
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldDeleteByTTL checks if the export should be deleted based on TTL (in days)
|
||||
func (r *ClusterImageExportReconciler) shouldDeleteByTTL(clusterImageExport *raczylocomv1.ClusterImageExport) bool {
|
||||
// Only apply TTL to completed exports
|
||||
if clusterImageExport.Status.Progress != shared.STATUS_SUCCESS &&
|
||||
clusterImageExport.Status.Progress != shared.STATUS_FAILED {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if TTL is configured
|
||||
if clusterImageExport.Spec.TTLDaysAfterFinished == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if CompletedAt is set
|
||||
if clusterImageExport.Status.CompletedAt == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Convert days to duration (24 hours per day)
|
||||
ttlDuration := time.Duration(*clusterImageExport.Spec.TTLDaysAfterFinished) * 24 * time.Hour
|
||||
expirationTime := clusterImageExport.Status.CompletedAt.Add(ttlDuration)
|
||||
|
||||
return time.Now().After(expirationTime)
|
||||
}
|
||||
|
||||
// cleanupByRetention enforces the retention policy for completed exports
|
||||
func (r *ClusterImageExportReconciler) cleanupByRetention(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) error {
|
||||
l := log.FromContext(ctx)
|
||||
|
||||
// Check if retention policy is configured
|
||||
if clusterImageExport.Spec.Retention == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// List all ClusterImageExports in the same namespace
|
||||
exportList := &raczylocomv1.ClusterImageExportList{}
|
||||
if err := r.List(ctx, exportList, client.InNamespace(clusterImageExport.Namespace)); err != nil {
|
||||
return fmt.Errorf("failed to list ClusterImageExports: %w", err)
|
||||
}
|
||||
|
||||
// Separate successful and failed exports, sorted by completion time
|
||||
var successfulExports, failedExports []*raczylocomv1.ClusterImageExport
|
||||
for i := range exportList.Items {
|
||||
export := &exportList.Items[i]
|
||||
// Skip exports that don't have the same base path (different backup sets)
|
||||
if export.Spec.BasePath != clusterImageExport.Spec.BasePath {
|
||||
continue
|
||||
}
|
||||
// Skip exports that are still running
|
||||
if export.Status.Progress != shared.STATUS_SUCCESS &&
|
||||
export.Status.Progress != shared.STATUS_FAILED {
|
||||
continue
|
||||
}
|
||||
if export.Status.Progress == shared.STATUS_SUCCESS {
|
||||
successfulExports = append(successfulExports, export)
|
||||
} else if export.Status.Progress == shared.STATUS_FAILED {
|
||||
failedExports = append(failedExports, export)
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by CompletedAt (newest first)
|
||||
sortByCompletionTime := func(exports []*raczylocomv1.ClusterImageExport) {
|
||||
for i := 0; i < len(exports); i++ {
|
||||
for j := i + 1; j < len(exports); j++ {
|
||||
iTime := exports[i].Status.CompletedAt
|
||||
jTime := exports[j].Status.CompletedAt
|
||||
if iTime == nil || (jTime != nil && jTime.After(iTime.Time)) {
|
||||
exports[i], exports[j] = exports[j], exports[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sortByCompletionTime(successfulExports)
|
||||
sortByCompletionTime(failedExports)
|
||||
|
||||
// Delete excess successful exports
|
||||
if clusterImageExport.Spec.Retention.MaxSuccessful != nil {
|
||||
maxSuccessful := int(*clusterImageExport.Spec.Retention.MaxSuccessful)
|
||||
if len(successfulExports) > maxSuccessful {
|
||||
for _, export := range successfulExports[maxSuccessful:] {
|
||||
l.Info("Deleting export due to retention policy (maxSuccessful exceeded)",
|
||||
"export", export.Name, "maxSuccessful", maxSuccessful)
|
||||
if err := r.Delete(ctx, export); err != nil && !errors.IsNotFound(err) {
|
||||
l.Error(err, "Failed to delete export for retention", "export", export.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete excess failed exports
|
||||
if clusterImageExport.Spec.Retention.MaxFailed != nil {
|
||||
maxFailed := int(*clusterImageExport.Spec.Retention.MaxFailed)
|
||||
if len(failedExports) > maxFailed {
|
||||
for _, export := range failedExports[maxFailed:] {
|
||||
l.Info("Deleting export due to retention policy (maxFailed exceeded)",
|
||||
"export", export.Name, "maxFailed", maxFailed)
|
||||
if err := r.Delete(ctx, export); err != nil && !errors.IsNotFound(err) {
|
||||
l.Error(err, "Failed to delete export for retention", "export", export.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -51,20 +51,26 @@ var _ = Describe("ClusterImageExport Controller", func() {
|
||||
Name: resourceName,
|
||||
Namespace: "default",
|
||||
},
|
||||
// TODO(user): Specify other spec details if needed.
|
||||
Spec: raczylocomv1.ClusterImageExportSpec{
|
||||
Name: resourceName,
|
||||
BasePath: "/backups/test",
|
||||
MaxConcurrentJobs: 1,
|
||||
Storage: raczylocomv1.ClusterImageStorageSpec{
|
||||
StorageTarget: "FILE",
|
||||
},
|
||||
},
|
||||
}
|
||||
Expect(k8sClient.Create(ctx, resource)).To(Succeed())
|
||||
}
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
// TODO(user): Cleanup logic after each test, like removing the resource instance.
|
||||
By("Cleanup the specific resource instance ClusterImageExport")
|
||||
resource := &raczylocomv1.ClusterImageExport{}
|
||||
err := k8sClient.Get(ctx, typeNamespacedName, resource)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By("Cleanup the specific resource instance ClusterImageExport")
|
||||
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
|
||||
if err == nil {
|
||||
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
|
||||
}
|
||||
})
|
||||
It("should successfully reconcile the resource", func() {
|
||||
By("Reconciling the created resource")
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -19,6 +19,7 @@ package raczylocom
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
@@ -28,6 +29,8 @@ import (
|
||||
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/envtest"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/log"
|
||||
@@ -58,17 +61,18 @@ var _ = BeforeSuite(func() {
|
||||
ctx, cancel = context.WithCancel(context.TODO())
|
||||
|
||||
By("bootstrapping test environment")
|
||||
|
||||
// Use KUBEBUILDER_ASSETS if set (CI), otherwise fall back to local path
|
||||
binaryAssetsDir := os.Getenv("KUBEBUILDER_ASSETS")
|
||||
if binaryAssetsDir == "" {
|
||||
binaryAssetsDir = filepath.Join("..", "..", "..", "bin", "k8s",
|
||||
fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH))
|
||||
}
|
||||
|
||||
testEnv = &envtest.Environment{
|
||||
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")},
|
||||
ErrorIfCRDPathMissing: true,
|
||||
|
||||
// The BinaryAssetsDirectory is only required if you want to run the tests directly
|
||||
// without call the makefile target test. If not informed it will look for the
|
||||
// default path defined in controller-runtime which is /usr/local/kubebuilder/.
|
||||
// Note that you must have the required binaries setup under the bin directory to perform
|
||||
// the tests directly. When we run make test it will be setup and used automatically.
|
||||
BinaryAssetsDirectory: filepath.Join("..", "..", "..", "bin", "k8s",
|
||||
fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH)),
|
||||
BinaryAssetsDirectory: binaryAssetsDir,
|
||||
}
|
||||
|
||||
var err error
|
||||
@@ -82,8 +86,38 @@ var _ = BeforeSuite(func() {
|
||||
|
||||
// +kubebuilder:scaffold:scheme
|
||||
|
||||
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
|
||||
// Create a manager to get a cache-backed client that supports field selectors
|
||||
// Explicitly configure cache to watch ClusterImage and ClusterImageExport resources
|
||||
// This is required for field selectors to work in tests (without registering controllers)
|
||||
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
|
||||
Scheme: scheme.Scheme,
|
||||
Cache: cache.Options{
|
||||
ByObject: map[client.Object]cache.ByObject{
|
||||
&raczylocomv1.ClusterImage{}: {},
|
||||
&raczylocomv1.ClusterImageExport{}: {},
|
||||
},
|
||||
},
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Add field index for spec.exportName on ClusterImage
|
||||
err = mgr.GetFieldIndexer().IndexField(ctx, &raczylocomv1.ClusterImage{}, "spec.exportName", func(obj client.Object) []string {
|
||||
clusterImage := obj.(*raczylocomv1.ClusterImage)
|
||||
return []string{clusterImage.Spec.ExportName}
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Start the manager cache in background
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
err := mgr.Start(ctx)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
// Wait for cache to sync
|
||||
Expect(mgr.GetCache().WaitForCacheSync(ctx)).To(BeTrue())
|
||||
|
||||
k8sClient = mgr.GetClient()
|
||||
Expect(k8sClient).NotTo(BeNil())
|
||||
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user