Initial commit for the operator

This commit is contained in:
2024-09-04 20:46:36 +01:00
commit 180dfd1687
82 changed files with 5954 additions and 0 deletions
@@ -0,0 +1,413 @@
package raczylocom
import (
"context"
"fmt"
"strings"
"time"
"github.com/go-logr/logr"
v1batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
raczylocomv1 "raczylo.com/kubernetes-images-sync-operator/api/raczylo.com/v1"
"raczylo.com/kubernetes-images-sync-operator/shared"
)
// ClusterImageReconciler reconciles ClusterImage objects by creating and
// tracking one export Job per image.
type ClusterImageReconciler struct {
// Client is the embedded Kubernetes client used for every Get/List/Create/
// Update/Delete call made by this reconciler.
client.Client
// Scheme is not referenced by the methods visible in this file; presumably it
// is supplied when the reconciler is wired to the manager — TODO confirm.
Scheme *runtime.Scheme
// MaxParallelJobs is overwritten on every Reconcile call with the owning
// ClusterImageExport's Spec.MaxConcurrentJobs.
MaxParallelJobs int
// ActiveJobs is an in-memory counter: incremented after a backup job has been
// created for a PENDING image and decremented when a job succeeds, fails or is
// deleted for a retry. It is not persisted anywhere in this file.
ActiveJobs int
}
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimages,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimages/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimages/finalizers,verbs=update
// # additional RBAC rules - create and manage jobs
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimageexports,verbs=get;list;watch;update;patch
// Reconcile advances a single ClusterImage through its export lifecycle.
//
// It loads the requested ClusterImage and the ClusterImageExport named by
// Spec.ExportName in the same namespace, and copies the export's
// Spec.MaxConcurrentJobs into r.MaxParallelJobs on every call. An image with an
// empty progress is marked PENDING and requeued. A PENDING image is requeued
// after 30 seconds while r.ActiveJobs has reached r.MaxParallelJobs; otherwise
// it is passed to handlePendingClusterImage. RUNNING and RETRYING images are
// passed to handleRunningClusterImage. SUCCESS, FAILED and PRESENT need no
// further work, and any other value is only logged.
//
// A ClusterImage that no longer exists is not an error; every other API
// failure is logged and returned so the request is retried.
func (r *ClusterImageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	image := &raczylocomv1.ClusterImage{}
	err := r.Get(ctx, req.NamespacedName, image)
	switch {
	case errors.IsNotFound(err):
		// The object is gone, so there is nothing left to reconcile.
		return ctrl.Result{}, nil
	case err != nil:
		logger.Error(err, "unable to fetch ClusterImage")
		return ctrl.Result{}, err
	}

	export := &raczylocomv1.ClusterImageExport{}
	exportKey := types.NamespacedName{Name: image.Spec.ExportName, Namespace: image.Namespace}
	if err := r.Get(ctx, exportKey, export); err != nil {
		logger.Error(err, "unable to fetch ClusterImageExport")
		return ctrl.Result{}, err
	}
	// The concurrency limit always follows the export that owns this image.
	r.MaxParallelJobs = export.Spec.MaxConcurrentJobs

	// A brand-new image has no progress yet: mark it PENDING and come back.
	if image.Status.Progress == "" {
		image.Status.Progress = shared.STATUS_PENDING
		if err := r.Status().Update(ctx, image); err != nil {
			logger.Error(err, "unable to update ClusterImage status to PENDING")
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	}

	// Throttle: pending images wait while the job budget is exhausted.
	isPending := image.Status.Progress == shared.STATUS_PENDING
	if isPending && r.ActiveJobs >= r.MaxParallelJobs {
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	}

	switch image.Status.Progress {
	case shared.STATUS_RUNNING, shared.STATUS_RETRYING:
		return r.handleRunningClusterImage(ctx, image, logger)
	case shared.STATUS_PENDING:
		return r.handlePendingClusterImage(ctx, image, logger)
	case shared.STATUS_SUCCESS, shared.STATUS_FAILED, shared.STATUS_PRESENT:
		// Terminal states: no further action needed.
	default:
		logger.Info("Unexpected ClusterImage status", "Status", image.Status.Progress)
	}
	return ctrl.Result{}, nil
}
// handlePendingClusterImage starts the export of a PENDING ClusterImage.
//
// If checkImageExists reports the image as already covered, the ClusterImage is
// marked PRESENT and no job is created. Otherwise the owning ClusterImageExport
// is fetched, a backup job is created through createBackupJob, the status is
// set to RUNNING, r.ActiveJobs is incremented and the request is requeued.
// Every failure is logged on l and returned.
func (r *ClusterImageReconciler) handlePendingClusterImage(ctx context.Context, clusterImage *raczylocomv1.ClusterImage, l logr.Logger) (ctrl.Result, error) {
	alreadyPresent, err := r.checkImageExists(ctx, clusterImage)
	if err != nil {
		l.Error(err, "unable to check if image exists")
		return ctrl.Result{}, err
	}

	if alreadyPresent {
		clusterImage.Status.Progress = shared.STATUS_PRESENT
		updateErr := r.Status().Update(ctx, clusterImage)
		if updateErr != nil {
			l.Error(updateErr, "unable to update ClusterImage status")
		}
		return ctrl.Result{}, updateErr
	}

	// The export carries the storage settings the backup job needs.
	export := &raczylocomv1.ClusterImageExport{}
	exportKey := types.NamespacedName{Name: clusterImage.Spec.ExportName, Namespace: clusterImage.Namespace}
	if err := r.Get(ctx, exportKey, export); err != nil {
		l.Error(err, "unable to fetch ClusterImageExport")
		return ctrl.Result{}, err
	}

	if err := r.createBackupJob(ctx, clusterImage, export, l); err != nil {
		l.Error(err, "unable to create backup job")
		return ctrl.Result{}, err
	}

	clusterImage.Status.Progress = shared.STATUS_RUNNING
	if err := r.Status().Update(ctx, clusterImage); err != nil {
		l.Error(err, "unable to update ClusterImage status to RUNNING")
		return ctrl.Result{}, err
	}

	// Only count the job once it exists and the status has been recorded.
	r.ActiveJobs++
	return ctrl.Result{Requeue: true}, nil
}
// handleRunningClusterImage inspects the export job of a RUNNING or RETRYING
// ClusterImage and moves the image to its next state.
//
//   - Job missing: the image is reset to PENDING and requeued so a new job is
//     created.
//   - Job succeeded: the image becomes SUCCESS.
//   - Job failed with fewer than maxJobRetries retries recorded: the failed job
//     is deleted, the image becomes RETRYING with RetryCount incremented, and
//     that state is persisted before requeueing. Without the write the counter
//     would be lost and the image would be retried forever.
//   - Job failed with the retry budget spent: the image becomes FAILED.
//   - Job still in progress: container restarts are accounted for through
//     handleJobRestarts. This is deliberately skipped once the job has reached
//     a terminal outcome so that restart bookkeeping cannot overwrite SUCCESS
//     or FAILED.
//
// After a terminal outcome the job is deleted, and in all non-retry cases the
// owning ClusterImageExport status is refreshed. r.ActiveJobs is decremented
// whenever a job succeeds, fails or is deleted for a retry. Errors are logged
// on l and returned.
func (r *ClusterImageReconciler) handleRunningClusterImage(ctx context.Context, clusterImage *raczylocomv1.ClusterImage, l logr.Logger) (ctrl.Result, error) {
	// Upper bound on job-level retries before the image is marked FAILED.
	const maxJobRetries = 3

	existingJob := &v1batch.Job{}
	jobName := fmt.Sprintf("img-export-%s", clusterImage.Name)
	err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: clusterImage.Namespace}, existingJob)
	if errors.IsNotFound(err) {
		// Job doesn't exist, set status back to PENDING so it is recreated.
		clusterImage.Status.Progress = shared.STATUS_PENDING
		if err := r.Status().Update(ctx, clusterImage); err != nil {
			l.Error(err, "unable to update ClusterImage status back to PENDING")
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	}
	if err != nil {
		l.Error(err, "unable to check for existing job")
		return ctrl.Result{}, err
	}

	switch {
	case existingJob.Status.Succeeded > 0:
		clusterImage.Status.Progress = shared.STATUS_SUCCESS
		r.ActiveJobs--

	case existingJob.Status.Failed > 0 && clusterImage.Status.RetryCount < maxJobRetries:
		clusterImage.Status.Progress = shared.STATUS_RETRYING
		clusterImage.Status.RetryCount++
		if err := r.Delete(ctx, existingJob); err != nil && !errors.IsNotFound(err) {
			l.Error(err, "unable to delete failed job for retry")
			return ctrl.Result{}, err
		}
		// Persist the retry bookkeeping; the next reconcile re-reads the object
		// from the API server and would otherwise see the old RetryCount.
		if err := r.Status().Update(ctx, clusterImage); err != nil {
			l.Error(err, "unable to update ClusterImage status to RETRYING")
			return ctrl.Result{}, err
		}
		r.ActiveJobs--
		return ctrl.Result{Requeue: true}, nil

	case existingJob.Status.Failed > 0:
		clusterImage.Status.Progress = shared.STATUS_FAILED
		r.ActiveJobs--

	default:
		// Job is still in progress: only now do pod restarts matter.
		if err := r.handleJobRestarts(ctx, existingJob, clusterImage); err != nil {
			l.Error(err, "unable to handle job restarts")
			return ctrl.Result{}, err
		}
	}

	if err := r.Status().Update(ctx, clusterImage); err != nil {
		l.Error(err, "unable to update ClusterImage status")
		return ctrl.Result{}, err
	}

	// Delete the completed job.
	finished := clusterImage.Status.Progress == shared.STATUS_SUCCESS ||
		clusterImage.Status.Progress == shared.STATUS_FAILED
	if finished {
		if err := r.Delete(ctx, existingJob); err != nil && !errors.IsNotFound(err) {
			l.Error(err, "unable to delete completed job")
			return ctrl.Result{}, err
		}
	}

	l.Info("Reconciling ClusterImage completed", "Name", clusterImage.Name, "Status", clusterImage.Status.Progress)
	return r.updateClusterImageExportStatus(ctx, clusterImage)
}
// cleanupJobPods deletes every pod in the job's namespace whose labels match
// the job's Spec.Selector.MatchLabels. Pods that are already gone are ignored;
// the first other list or delete error is returned.
func (r *ClusterImageReconciler) cleanupJobPods(ctx context.Context, job *v1batch.Job) error {
	jobPods := &v1.PodList{}
	byJobLabels := client.MatchingLabels(job.Spec.Selector.MatchLabels)
	if err := r.List(ctx, jobPods, client.InNamespace(job.Namespace), byJobLabels); err != nil {
		return err
	}

	for i := range jobPods.Items {
		err := r.Delete(ctx, &jobPods.Items[i])
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
	}
	return nil
}
// createBackupJob creates the Job that exports clusterImage and marks the image
// RUNNING.
//
// The job runs, in order: "podman pull" of the image, "podman save" into
// /tmp/<normalised name>.tar, an "./export.py" upload whose arguments depend on
// clusterImage.Spec.Storage (S3 or file; any other value adds no upload step),
// and finally removal of the temporary tarball. The job is named
// "img-export-<ClusterImage name>", lives in the image's namespace and carries
// a controller owner reference to the ClusterImage.
//
// The error from creating the job or from the status update is returned as-is.
func (r *ClusterImageReconciler) createBackupJob(ctx context.Context, clusterImage *raczylocomv1.ClusterImage, clusterImageExport *raczylocomv1.ClusterImageExport, l logr.Logger) error {
	normalisedImageName := shared.NormalizeImageName(clusterImage.Spec.FullName)

	// Local scratch file and the storage-relative destination of the tarball.
	localTar := "/tmp/" + normalisedImageName + ".tar"
	destination := clusterImage.Spec.ExportPath + "/" + clusterImage.Spec.ExportName + "/" + normalisedImageName + ".tar"

	commands := []string{
		"podman pull " + clusterImage.Spec.FullName,
		"podman save --quiet -o " + localTar + " " + clusterImage.Spec.FullName,
	}

	switch clusterImage.Spec.Storage {
	case shared.STORAGE_S3:
		s3Params := shared.SetupS3Params(clusterImageExport.Spec.Storage.S3)
		s3Target := "s3://" + clusterImageExport.Spec.Storage.S3.Bucket + destination
		commands = append(commands,
			"./export.py "+strings.Join(s3Params, " ")+" '"+localTar+"' "+"'"+s3Target+"'")
	case shared.STORAGE_FILE:
		commands = append(commands, "./export.py "+localTar+" "+destination)
	}

	commands = append(commands, "rm -f "+localTar)

	// The ClusterImage controls the job, so deleting the image cleans it up.
	ownerRef := metav1.OwnerReference{
		APIVersion:         clusterImage.APIVersion,
		Kind:               clusterImage.Kind,
		Name:               clusterImage.Name,
		UID:                clusterImage.UID,
		BlockOwnerDeletion: pointer.Bool(true),
		Controller:         pointer.Bool(true),
	}
	params := shared.JobParams{
		Name:            fmt.Sprintf("img-export-%s", clusterImage.Name),
		Namespace:       clusterImage.Namespace,
		Image:           shared.BACKUP_JOB_IMAGE,
		Commands:        commands,
		OwnerReferences: []metav1.OwnerReference{ownerRef},
	}

	job := shared.CreateJob(params, func(raczylocomv1.ClusterImageExport) []string { return nil })
	if err := r.Create(ctx, job); err != nil {
		return err
	}

	clusterImage.Status.Progress = shared.STATUS_RUNNING
	return r.Status().Update(ctx, clusterImage)
}
// updateClusterImageExportStatus recomputes the progress of the
// ClusterImageExport that owns clusterImage from the progress of all its
// ClusterImages and writes it back when it changed.
//
// Aggregation rules, in priority order:
//   - every image is SUCCESS or PRESENT  -> SUCCESS
//   - at least one image is FAILED       -> FAILED
//   - at least one is RUNNING/RETRYING   -> RUNNING
//   - otherwise                          -> PENDING
//
// Images whose progress is PENDING, empty (not yet reconciled) or unrecognised
// count as "not completed", so the export is never reported as SUCCESS while
// work is outstanding.
//
// The result requests a requeue while any image is not completed. Errors from
// the API calls are logged and returned.
//
// NOTE(review): the List call filters on the field "spec.exportName", which
// requires a matching field index on the manager's cache; no registration is
// visible in this file — confirm it exists elsewhere.
func (r *ClusterImageReconciler) updateClusterImageExportStatus(ctx context.Context, clusterImage *raczylocomv1.ClusterImage) (ctrl.Result, error) {
	l := log.FromContext(ctx)

	clusterImageExport := &raczylocomv1.ClusterImageExport{}
	exportKey := types.NamespacedName{Name: clusterImage.Spec.ExportName, Namespace: clusterImage.Namespace}
	if err := r.Get(ctx, exportKey, clusterImageExport); err != nil {
		l.Error(err, "unable to fetch ClusterImageExport")
		return ctrl.Result{}, err
	}

	clusterImageList := &raczylocomv1.ClusterImageList{}
	if err := r.List(ctx, clusterImageList, client.InNamespace(clusterImage.Namespace), client.MatchingFields{"spec.exportName": clusterImage.Spec.ExportName}); err != nil {
		l.Error(err, "unable to list ClusterImages")
		return ctrl.Result{}, err
	}

	allCompleted := true
	anyFailed := false
	anyRunning := false
	for i := range clusterImageList.Items {
		switch clusterImageList.Items[i].Status.Progress {
		case shared.STATUS_SUCCESS, shared.STATUS_PRESENT:
			// These statuses are considered completed.
		case shared.STATUS_FAILED:
			anyFailed = true
			allCompleted = false
		case shared.STATUS_RUNNING, shared.STATUS_RETRYING:
			anyRunning = true
			allCompleted = false
		default:
			// PENDING, not yet initialised ("") or unknown: still outstanding.
			allCompleted = false
		}
	}

	var newStatus string
	switch {
	case allCompleted:
		newStatus = shared.STATUS_SUCCESS
	case anyFailed:
		newStatus = shared.STATUS_FAILED
	case anyRunning:
		newStatus = shared.STATUS_RUNNING
	default:
		newStatus = shared.STATUS_PENDING
	}

	// Only write when something changed, to avoid needless update events.
	if clusterImageExport.Status.Progress != newStatus {
		clusterImageExport.Status.Progress = newStatus
		if err := r.Status().Update(ctx, clusterImageExport); err != nil {
			l.Error(err, "unable to update ClusterImageExport status")
			return ctrl.Result{}, err
		}
		l.Info("Updated ClusterImageExport status", "ExportName", clusterImageExport.Name, "NewStatus", newStatus)
	}

	// If there are still pending or running images, requeue.
	if !allCompleted {
		return ctrl.Result{Requeue: true}, nil
	}
	return ctrl.Result{}, nil
}
// handleJobRestarts folds container restarts of the job's pods into the
// ClusterImage retry accounting.
//
// Pods are selected in the job's namespace by the job's
// Spec.Selector.MatchLabels. Only the first container status with a positive
// RestartCount is considered: its RestartCount is added to
// clusterImage.Status.RetryCount (on every call). When the total reaches 3 the
// image is marked FAILED, the status is persisted and every job labelled
// app=image-export in the image's namespace is removed; below 3 the image is
// marked RETRYING and the status is persisted. With no restarted container the
// function does nothing.
func (r *ClusterImageReconciler) handleJobRestarts(ctx context.Context, job *v1batch.Job, clusterImage *raczylocomv1.ClusterImage) error {
	jobPods := &v1.PodList{}
	byJobLabels := client.MatchingLabels(job.Spec.Selector.MatchLabels)
	if err := r.List(ctx, jobPods, client.InNamespace(job.Namespace), byJobLabels); err != nil {
		return err
	}

	for i := range jobPods.Items {
		for _, status := range jobPods.Items[i].Status.ContainerStatuses {
			if status.RestartCount <= 0 {
				continue
			}

			clusterImage.Status.RetryCount += int(status.RestartCount)
			if clusterImage.Status.RetryCount < 3 {
				clusterImage.Status.Progress = shared.STATUS_RETRYING
				return r.Status().Update(ctx, clusterImage)
			}

			// Retry budget exhausted: record the failure, then tear the jobs down.
			clusterImage.Status.Progress = shared.STATUS_FAILED
			if err := r.Status().Update(ctx, clusterImage); err != nil {
				return err
			}
			return r.removeAllJobsAndContainers(ctx, clusterImage.Namespace)
		}
	}
	return nil
}
// SetupWithManager registers the reconciler with mgr: it reconciles
// ClusterImage objects and is also triggered by changes to the Jobs they own.
func (r *ClusterImageReconciler) SetupWithManager(mgr ctrl.Manager) error {
	controller := ctrl.NewControllerManagedBy(mgr).For(&raczylocomv1.ClusterImage{})
	controller = controller.Owns(&v1batch.Job{})
	return controller.Complete(r)
}
// removeAllJobsAndContainers deletes every Job labelled app=image-export in
// namespace using foreground propagation. Jobs that are already gone are
// ignored; the first other list or delete error is returned.
func (r *ClusterImageReconciler) removeAllJobsAndContainers(ctx context.Context, namespace string) error {
	exportJobs := &v1batch.JobList{}
	listOpts := []client.ListOption{
		client.InNamespace(namespace),
		client.MatchingLabels{"app": "image-export"},
	}
	if err := r.List(ctx, exportJobs, listOpts...); err != nil {
		return err
	}

	foreground := client.PropagationPolicy(metav1.DeletePropagationForeground)
	for i := range exportJobs.Items {
		err := r.Delete(ctx, &exportJobs.Items[i], foreground)
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
	}
	return nil
}
// checkImageExists reports whether clusterImage can be treated as already
// exported.
//
// It lists ClusterImages without a namespace restriction and returns true when
// a different ClusterImage with the same Spec.FullName is SUCCESS, PRESENT or
// RUNNING. Failing that, it returns true only if clusterImage itself is already
// SUCCESS. A list error is returned together with false.
func (r *ClusterImageReconciler) checkImageExists(ctx context.Context, clusterImage *raczylocomv1.ClusterImage) (bool, error) {
	allImages := &raczylocomv1.ClusterImageList{}
	if err := r.List(ctx, allImages); err != nil {
		return false, err
	}

	for i := range allImages.Items {
		other := &allImages.Items[i]
		// Skip the image itself and images of a different name.
		if other.Name == clusterImage.Name || other.Spec.FullName != clusterImage.Spec.FullName {
			continue
		}
		switch other.Status.Progress {
		case shared.STATUS_SUCCESS, shared.STATUS_PRESENT, shared.STATUS_RUNNING:
			return true, nil
		}
	}

	// Fall back to the image's own recorded outcome.
	return clusterImage.Status.Progress == shared.STATUS_SUCCESS, nil
}
// isJobStarted reports whether at least one pod matching the job's
// Spec.Selector.MatchLabels in the job's namespace is in the Running phase.
// A list error is returned together with false.
func (r *ClusterImageReconciler) isJobStarted(ctx context.Context, job *v1batch.Job) (bool, error) {
	jobPods := &v1.PodList{}
	byJobLabels := client.MatchingLabels(job.Spec.Selector.MatchLabels)
	if err := r.List(ctx, jobPods, client.InNamespace(job.Namespace), byJobLabels); err != nil {
		return false, err
	}

	for i := range jobPods.Items {
		if jobPods.Items[i].Status.Phase == v1.PodRunning {
			return true, nil
		}
	}
	return false, nil
}
// hasJobTimedOut reports whether more than five minutes have passed since the
// job's creation timestamp. It looks only at the job's age, not at whether any
// pod has started.
func (r *ClusterImageReconciler) hasJobTimedOut(job *v1batch.Job) bool {
	deadline := job.CreationTimestamp.Time.Add(5 * time.Minute)
	return time.Now().After(deadline)
}
@@ -0,0 +1,84 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package raczylocom
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
raczylocomv1 "raczylo.com/kubernetes-images-sync-operator/api/raczylo.com/v1"
)
// Scaffolded spec for the ClusterImage controller: it makes sure a bare
// ClusterImage named "test-resource" exists in "default", runs one Reconcile
// against it and expects no error, then removes the object again.
var _ = Describe("ClusterImage Controller", func() {
	Context("When reconciling a resource", func() {
		const resourceName = "test-resource"

		ctx := context.Background()

		// TODO(user): Modify the namespace as needed.
		resourceKey := types.NamespacedName{Namespace: "default", Name: resourceName}
		existing := &raczylocomv1.ClusterImage{}

		BeforeEach(func() {
			By("creating the custom resource for the Kind ClusterImage")
			// Create the fixture only when a previous spec has not left it behind.
			if err := k8sClient.Get(ctx, resourceKey, existing); errors.IsNotFound(err) {
				// TODO(user): Specify other spec details if needed.
				fixture := &raczylocomv1.ClusterImage{
					ObjectMeta: metav1.ObjectMeta{Name: resourceName, Namespace: "default"},
				}
				Expect(k8sClient.Create(ctx, fixture)).To(Succeed())
			}
		})

		AfterEach(func() {
			// TODO(user): Cleanup logic after each test, like removing the resource instance.
			leftover := &raczylocomv1.ClusterImage{}
			Expect(k8sClient.Get(ctx, resourceKey, leftover)).NotTo(HaveOccurred())

			By("Cleanup the specific resource instance ClusterImage")
			Expect(k8sClient.Delete(ctx, leftover)).To(Succeed())
		})

		It("should successfully reconcile the resource", func() {
			By("Reconciling the created resource")
			reconciler := &ClusterImageReconciler{Client: k8sClient, Scheme: k8sClient.Scheme()}
			request := reconcile.Request{NamespacedName: resourceKey}

			_, err := reconciler.Reconcile(ctx, request)
			Expect(err).NotTo(HaveOccurred())
			// TODO(user): Add more specific assertions depending on your controller's reconciliation logic.
			// Example: If you expect a certain status condition after reconciliation, verify it here.
		})
	})
})
@@ -0,0 +1,320 @@
package raczylocom
import (
"context"
"crypto/md5"
"fmt"
"strings"
"time"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1batch "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
raczylocomv1 "raczylo.com/kubernetes-images-sync-operator/api/raczylo.com/v1"
shared "raczylo.com/kubernetes-images-sync-operator/shared"
)
// ClusterImageExportReconciler reconciles a ClusterImageExport object: it
// creates one ClusterImage per container image found in the cluster and, on
// deletion, launches a cleanup job for the exported data.
type ClusterImageExportReconciler struct {
// Client is the embedded Kubernetes client used for all API calls.
client.Client
// Scheme is not referenced by the methods visible in this file; presumably it
// is supplied when the reconciler is wired to the manager — TODO confirm.
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimageexports,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimageexports/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimageexports/finalizers,verbs=update
// additional RBAC rules
// +kubebuilder:rbac:groups=raczylo.com,resources=clusterimages,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=daemonsets,verbs=get;list;watch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch
// clusterImageExportFinalizer is the finalizer Reconcile adds to every
// ClusterImageExport; handleDeletion removes it once the cleanup job has been
// created.
const clusterImageExportFinalizer = "finalizer.clusterimageexport.raczylo.com"
// Reconcile converges one ClusterImageExport.
//
// Steps, in order:
//  1. Load the object; a missing object is not an error.
//  2. If it is being deleted, delegate to handleDeletion.
//  3. Ensure the finalizer is present.
//  4. Stop early when the export is already SUCCESS or FAILED.
//  5. Initialise an empty progress to PENDING.
//  6. Discover the container images in the cluster and mark the export RUNNING.
//  7. For every discovered image make sure a ClusterImage exists. Its name is
//     the first 14 hex characters of the MD5 of export name + image + tag + sha.
//     If an existing ClusterImage is FAILED, the export is marked FAILED and
//     reconciliation stops.
//  8. Mark the export SUCCESS when checkAllClusterImagesCompleted reports true;
//     otherwise request a requeue.
//
// Every API error is returned so the request is retried.
func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx)
l.Info("Reconciling ClusterImageExport")
// Fetch the ClusterImageExport instance
clusterImageExport := &raczylocomv1.ClusterImageExport{}
if err := r.Get(ctx, req.NamespacedName, clusterImageExport); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// A non-zero deletion timestamp means the object is being deleted.
if !clusterImageExport.ObjectMeta.DeletionTimestamp.IsZero() {
return r.handleDeletion(ctx, clusterImageExport)
}
// Add finalizer if it doesn't exist
if !controllerutil.ContainsFinalizer(clusterImageExport, clusterImageExportFinalizer) {
controllerutil.AddFinalizer(clusterImageExport, clusterImageExportFinalizer)
if err := r.Update(ctx, clusterImageExport); err != nil {
return ctrl.Result{}, err
}
}
// Early return if the ClusterImageExport is already in a completed state
if clusterImageExport.Status.Progress == shared.STATUS_SUCCESS || clusterImageExport.Status.Progress == shared.STATUS_FAILED {
l.Info("ClusterImageExport is already in a completed state", "Status", clusterImageExport.Status.Progress)
return ctrl.Result{}, nil
}
// If the status is empty, set it to PENDING
if clusterImageExport.Status.Progress == "" {
clusterImageExport.Status.Progress = shared.STATUS_PENDING
if err := r.Status().Update(ctx, clusterImageExport); err != nil {
l.Error(err, "unable to update ClusterImageExport status")
return ctrl.Result{}, err
}
}
// Proceed with the rest of the reconciliation logic
fullImagesList, err := r.listImagesInCluster(ctx, l, clusterImageExport)
if err != nil {
l.Error(err, "unable to list images in the cluster")
return ctrl.Result{}, err
}
// The export is marked RUNNING on every pass that gets this far.
clusterImageExport.Status.Progress = shared.STATUS_RUNNING
if err := r.Status().Update(ctx, clusterImageExport); err != nil {
l.Error(err, "unable to update ClusterImageExport status to RUNNING")
return ctrl.Result{}, err
}
for _, image := range fullImagesList.Containers {
// Deterministic object name: the same export/image/tag/sha always yields the
// same ClusterImage, so repeated reconciles do not create duplicates.
nameHash := fmt.Sprintf("%x", md5.Sum([]byte(clusterImageExport.Name+image.Image+image.Tag+image.Sha)))[:14]
// Check if the ClusterImage already exists
clusterImage := &raczylocomv1.ClusterImage{}
err := r.Get(ctx, client.ObjectKey{Namespace: clusterImageExport.Namespace, Name: nameHash}, clusterImage)
if err == nil {
// ClusterImage exists, check its status
if clusterImage.Status.Progress == shared.STATUS_FAILED {
// One failed image fails the whole export; remaining images are not processed.
clusterImageExport.Status.Progress = shared.STATUS_FAILED
if err := r.Status().Update(ctx, clusterImageExport); err != nil {
l.Error(err, "unable to update ClusterImageExport status to FAILED")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
continue
} else if !errors.IsNotFound(err) {
l.Error(err, "unable to get ClusterImage")
return ctrl.Result{}, err
}
// Create a new ClusterImage
newClusterImage := &raczylocomv1.ClusterImage{
ObjectMeta: metav1.ObjectMeta{
Name: nameHash,
Namespace: clusterImageExport.Namespace,
// The export is the controlling owner of each ClusterImage it creates.
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: clusterImageExport.APIVersion,
Kind: clusterImageExport.Kind,
Name: clusterImageExport.Name,
UID: clusterImageExport.UID,
Controller: pointer.Bool(true),
},
},
},
Spec: raczylocomv1.ClusterImageSpec{
Image: image.Image,
Tag: image.Tag,
Sha: image.Sha,
FullName: image.FullName,
Storage: clusterImageExport.Spec.Storage.StorageTarget,
ExportName: clusterImageExport.Name,
ExportPath: clusterImageExport.Spec.BasePath,
},
}
if err := r.Create(ctx, newClusterImage); err != nil {
l.Error(err, "unable to create ClusterImage", "image", image)
return ctrl.Result{}, err
}
}
// Check if all ClusterImages are completed
allCompleted, err := r.checkAllClusterImagesCompleted(ctx, clusterImageExport)
if err != nil {
l.Error(err, "unable to check ClusterImages status")
return ctrl.Result{}, err
}
if allCompleted {
clusterImageExport.Status.Progress = shared.STATUS_SUCCESS
if err := r.Status().Update(ctx, clusterImageExport); err != nil {
l.Error(err, "unable to update ClusterImageExport status to SUCCESS")
return ctrl.Result{}, err
}
}
// Keep reconciling until every ClusterImage has completed.
return ctrl.Result{Requeue: !allCompleted}, nil
}
// checkAllClusterImagesCompleted reports whether every ClusterImage in the
// export's namespace whose spec.exportName equals the export's name has
// progress SUCCESS or PRESENT. An empty result set counts as completed. A list
// error is returned together with false.
func (r *ClusterImageExportReconciler) checkAllClusterImagesCompleted(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) (bool, error) {
	images := &raczylocomv1.ClusterImageList{}
	listOpts := []client.ListOption{
		client.InNamespace(clusterImageExport.Namespace),
		client.MatchingFields{"spec.exportName": clusterImageExport.Name},
	}
	if err := r.List(ctx, images, listOpts...); err != nil {
		return false, err
	}

	for i := range images.Items {
		switch images.Items[i].Status.Progress {
		case shared.STATUS_SUCCESS, shared.STATUS_PRESENT:
			continue
		default:
			// Anything else is still outstanding (or failed).
			return false, nil
		}
	}
	return true, nil
}
// SetupWithManager registers the reconciler with mgr: it reconciles
// ClusterImageExport objects and is also triggered by changes to the
// ClusterImages they own.
func (r *ClusterImageExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
	controller := ctrl.NewControllerManagedBy(mgr).For(&raczylocomv1.ClusterImageExport{})
	controller = controller.Owns(&raczylocomv1.ClusterImage{})
	return controller.Complete(r)
}
// listImagesInCluster collects the container images referenced by Deployments,
// Jobs, DaemonSets and CronJobs (in that order) into one list.
//
// When the export's Spec.Includes is non-empty the list is narrowed with
// shared.IncludeOnlyImages; when Spec.Excludes is non-empty it is then filtered
// with shared.RemoveExcludedImages. Duplicates are removed last and the final
// list is logged on l. The first collection error aborts the scan and is
// returned with an empty list.
func (r *ClusterImageExportReconciler) listImagesInCluster(ctx context.Context, l logr.Logger, clusterImageExport *raczylocomv1.ClusterImageExport) (shared.ContainersList, error) {
	found := shared.ContainersList{}

	// One collector per workload kind; each appends into found.
	collectors := []func() error{
		func() error {
			return shared.ListAndProcessResources[*shared.DeploymentWrapper](ctx, r.Client, &appsv1.DeploymentList{}, &found)
		},
		func() error {
			return shared.ListAndProcessResources[*shared.JobWrapper](ctx, r.Client, &batchv1.JobList{}, &found)
		},
		func() error {
			return shared.ListAndProcessResources[*shared.DaemonSetWrapper](ctx, r.Client, &appsv1.DaemonSetList{}, &found)
		},
		func() error {
			return shared.ListAndProcessResources[*shared.CronJobWrapper](ctx, r.Client, &batchv1.CronJobList{}, &found)
		},
	}
	for _, collect := range collectors {
		if err := collect(); err != nil {
			return shared.ContainersList{}, err
		}
	}

	if includes := clusterImageExport.Spec.Includes; len(includes) > 0 {
		found = shared.IncludeOnlyImages(found, includes)
	}
	if excludes := clusterImageExport.Spec.Excludes; len(excludes) > 0 {
		found = shared.RemoveExcludedImages(found, excludes)
	}
	found = shared.RemoveDuplicates(found)

	l.Info("List of containers in the cluster", "containers", found)
	return found, nil
}
// handleDeletion finalizes a ClusterImageExport that is being deleted.
//
// While the finalizer is still present it starts the cleanup job and then
// removes the finalizer so the API server can complete the deletion. Without
// the finalizer there is nothing to do. A cleanup-job failure is logged and
// returned; an update failure is returned.
func (r *ClusterImageExportReconciler) handleDeletion(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) (ctrl.Result, error) {
	if !controllerutil.ContainsFinalizer(clusterImageExport, clusterImageExportFinalizer) {
		return ctrl.Result{}, nil
	}

	if err := r.runCleanupJob(ctx, clusterImageExport); err != nil {
		log.FromContext(ctx).Error(err, "Failed to run cleanup job")
		return ctrl.Result{}, err
	}

	// Cleanup has been started; release the object.
	controllerutil.RemoveFinalizer(clusterImageExport, clusterImageExportFinalizer)
	return ctrl.Result{}, r.Update(ctx, clusterImageExport)
}
// runCleanupJob creates the Job that removes the data exported for
// clusterImageExport and starts a background goroutine that waits for the job
// to finish and then deletes it.
//
// The job is named "cleanup-<normalised export name>" and runs "./cleanup.py"
// against "<BasePath>/<export name>/": prefixed with the S3 parameters and
// bucket for S3 storage, or as a plain quoted path for file storage. Any other
// storage target yields a job without commands.
//
// Only the error from creating the job is returned; failures while waiting for
// the job are logged by the goroutine.
//
// NOTE(review): the goroutine keeps using the caller's ctx after this function
// returns; if that context is cancelled when the reconcile call ends, the
// waiter stops early — confirm the intended lifetime.
func (r *ClusterImageExportReconciler) runCleanupJob(ctx context.Context, clusterImageExport *raczylocomv1.ClusterImageExport) error {
	l := log.FromContext(ctx)

	normalisedImageName := "cleanup-" + shared.NormalizeImageName(clusterImageExport.Name)

	// Directory (or S3 prefix) that holds everything exported for this object.
	exportDir := clusterImageExport.Spec.BasePath + "/" + clusterImageExport.ObjectMeta.Name + "/"

	defaultCommands := []string{}
	switch clusterImageExport.Spec.Storage.StorageTarget {
	case shared.STORAGE_S3:
		s3Params := shared.SetupS3Params(clusterImageExport.Spec.Storage.S3)
		defaultCommands = append(defaultCommands,
			"./cleanup.py "+strings.Join(s3Params, " ")+" 's3://"+clusterImageExport.Spec.Storage.S3.Bucket+exportDir+"'")
	case shared.STORAGE_FILE:
		// The space after the script name is required: without it the shell sees
		// "./cleanup.py'<path>'" as a single word and the script never runs.
		defaultCommands = append(defaultCommands, "./cleanup.py "+"'"+exportDir+"'")
	}

	jobParams := shared.JobParams{
		Name:      normalisedImageName,
		Namespace: clusterImageExport.Namespace,
		Image:     shared.BACKUP_JOB_IMAGE,
		Commands:  defaultCommands,
	}
	cleanupJob := shared.CreateJob(jobParams, func(raczylocomv1.ClusterImageExport) []string { return nil })
	if err := r.Create(ctx, cleanupJob); err != nil {
		l.Error(err, "Failed to create cleanup job")
		return err
	}
	l.Info("Created cleanup job")

	// Do not block the reconcile loop while the job runs.
	go func() {
		if err := r.waitForJobCompletionAndDelete(ctx, cleanupJob); err != nil {
			l.Error(err, "Failed to wait for job completion and delete")
		}
	}()
	return nil
}
// waitForJobCompletionAndDelete polls job every five seconds until it reports a
// succeeded or failed pod, then deletes it with background propagation.
//
// It returns nil when the job succeeded and was deleted, an error naming the
// job when it failed, ctx.Err() when ctx is cancelled (including while waiting
// between polls), and any error from fetching or deleting the job. A job that
// has already disappeared by the time it is deleted is not treated as an error.
func (r *ClusterImageExportReconciler) waitForJobCompletionAndDelete(ctx context.Context, job *v1batch.Job) error {
	const pollInterval = 5 * time.Second

	l := log.FromContext(ctx)
	key := client.ObjectKeyFromObject(job)
	background := client.PropagationPolicy(metav1.DeletePropagationBackground)

	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := r.Get(ctx, key, job); err != nil {
			return err
		}

		switch {
		case job.Status.Succeeded > 0:
			// Job completed successfully, delete it.
			l.Info("Cleanup job completed, deleting", "job", job.Name)
			return client.IgnoreNotFound(r.Delete(ctx, job, background))

		case job.Status.Failed > 0:
			// Job failed: report it, but still remove the job object.
			l.Error(nil, "Cleanup job failed", "job", job.Name)
			if err := client.IgnoreNotFound(r.Delete(ctx, job, background)); err != nil {
				return err
			}
			return fmt.Errorf("cleanup job failed: %s", job.Name)
		}

		// Job still running: wait for the next poll, but stop promptly when the
		// context is cancelled instead of sleeping through it.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}
@@ -0,0 +1,84 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package raczylocom
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
raczylocomv1 "raczylo.com/kubernetes-images-sync-operator/api/raczylo.com/v1"
)
// Scaffolded spec for the ClusterImageExport controller: it makes sure a bare
// ClusterImageExport named "test-resource" exists in "default", runs one
// Reconcile against it and expects no error, then removes the object again.
var _ = Describe("ClusterImageExport Controller", func() {
	Context("When reconciling a resource", func() {
		const resourceName = "test-resource"

		ctx := context.Background()

		// TODO(user): Modify the namespace as needed.
		resourceKey := types.NamespacedName{Namespace: "default", Name: resourceName}
		existing := &raczylocomv1.ClusterImageExport{}

		BeforeEach(func() {
			By("creating the custom resource for the Kind ClusterImageExport")
			// Create the fixture only when a previous spec has not left it behind.
			if err := k8sClient.Get(ctx, resourceKey, existing); errors.IsNotFound(err) {
				// TODO(user): Specify other spec details if needed.
				fixture := &raczylocomv1.ClusterImageExport{
					ObjectMeta: metav1.ObjectMeta{Name: resourceName, Namespace: "default"},
				}
				Expect(k8sClient.Create(ctx, fixture)).To(Succeed())
			}
		})

		AfterEach(func() {
			// TODO(user): Cleanup logic after each test, like removing the resource instance.
			leftover := &raczylocomv1.ClusterImageExport{}
			Expect(k8sClient.Get(ctx, resourceKey, leftover)).NotTo(HaveOccurred())

			By("Cleanup the specific resource instance ClusterImageExport")
			Expect(k8sClient.Delete(ctx, leftover)).To(Succeed())
		})

		It("should successfully reconcile the resource", func() {
			By("Reconciling the created resource")
			reconciler := &ClusterImageExportReconciler{Client: k8sClient, Scheme: k8sClient.Scheme()}
			request := reconcile.Request{NamespacedName: resourceKey}

			_, err := reconciler.Reconcile(ctx, request)
			Expect(err).NotTo(HaveOccurred())
			// TODO(user): Add more specific assertions depending on your controller's reconciliation logic.
			// Example: If you expect a certain status condition after reconciliation, verify it here.
		})
	})
})
@@ -0,0 +1,96 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package raczylocom
import (
"context"
"fmt"
"path/filepath"
"runtime"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
raczylocomv1 "raczylo.com/kubernetes-images-sync-operator/api/raczylo.com/v1"
// +kubebuilder:scaffold:imports
)
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
// Shared state for the controller test suite.
var (
	// cfg is the REST configuration returned by testEnv.Start.
	cfg *rest.Config
	// k8sClient is the client the specs use to talk to the test API server.
	k8sClient client.Client
	// testEnv is the envtest environment started before and stopped after the suite.
	testEnv *envtest.Environment
	// ctx and cancel are created in BeforeSuite; cancel is called in AfterSuite.
	ctx    context.Context
	cancel context.CancelFunc
)
// TestControllers is the go-test entry point for the suite: it registers
// Ginkgo's Fail as the Gomega failure handler and runs all specs under the
// suite name "Controller Suite".
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controller Suite")
}
// Suite setup: configure logging, start an envtest control plane loaded with
// the project's CRDs, register the project's API types with the client-go
// scheme and build the shared k8sClient.
var _ = BeforeSuite(func() {
	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

	ctx, cancel = context.WithCancel(context.TODO())

	By("bootstrapping test environment")
	crdDir := filepath.Join("..", "..", "..", "config", "crd", "bases")
	// The BinaryAssetsDirectory is only required when running the tests directly
	// rather than through the makefile "test" target. When it is not set,
	// controller-runtime falls back to its default path, /usr/local/kubebuilder/.
	// The required binaries must be present under the bin directory for a direct
	// run; "make test" sets them up and uses them automatically.
	assetsDir := filepath.Join("..", "..", "..", "bin", "k8s",
		fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH))
	testEnv = &envtest.Environment{
		CRDDirectoryPaths:     []string{crdDir},
		ErrorIfCRDPathMissing: true,
		BinaryAssetsDirectory: assetsDir,
	}

	// cfg is defined in this file globally.
	var err error
	cfg, err = testEnv.Start()
	Expect(err).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	err = raczylocomv1.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	// +kubebuilder:scaffold:scheme

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())
	Expect(k8sClient).NotTo(BeNil())
})
// Suite teardown: cancel the suite context and stop the envtest control plane.
var _ = AfterSuite(func() {
	By("tearing down the test environment")
	cancel()
	Expect(testEnv.Stop()).NotTo(HaveOccurred())
})