Add spec.additionalImages to ClusterImageExport and process listed resources
concurrently.

Review amendments relative to the submitted patch:
  * ListAndProcessResources: workers no longer append to the shared
    ContainersList without synchronisation, and the function waits for all
    workers before returning.
  * SetupIndexers: the last IndexField error is returned instead of being
    replaced by the generic backoff timeout.
  * ProcessContainerName: a colon inside a registry host:port is no longer
    mistaken for the tag separator.
  * Reconcile: dropped the redundant length guard around the range loop.

Note: whitespace was reconstructed from a copy with collapsed line breaks;
verify context indentation against the tree before applying. The blob ids on
the "index" lines are those of the submitted patch.

diff --git a/README.md b/README.md
index c11c581..9b9e284 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,9 @@ spec:
   # excludedNamespaces:
   #   - my-awesome-namespace
 
+  additionalImages:
+    - minio/minio:RELEASE.2024-09-09T16-59-28Z
+
   basePath: /images # base path in the target directory
   storage:
     target: S3 # file backup is not ready yet
diff --git a/api/raczylo.com/v1/clusterimageexport_types.go b/api/raczylo.com/v1/clusterimageexport_types.go
index b6380b0..47e8a54 100644
--- a/api/raczylo.com/v1/clusterimageexport_types.go
+++ b/api/raczylo.com/v1/clusterimageexport_types.go
@@ -69,7 +69,8 @@ type ClusterImageExportSpec struct {
 	ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
 	// +kubebuilder:validation.Minimum=1
 	// +kubebuilder:validation.Maximum=100
-	MaxConcurrentJobs int `json:"maxConcurrentJobs"`
+	MaxConcurrentJobs int      `json:"maxConcurrentJobs"`
+	AdditionalImages  []string `json:"additionalImages,omitempty"`
 }
 
 // ClusterImageExportStatus defines the observed state of ClusterImageExport
diff --git a/chart/Chart.yaml b/chart/Chart.yaml
index e96fb15..cc29d4a 100644
--- a/chart/Chart.yaml
+++ b/chart/Chart.yaml
@@ -10,9 +10,9 @@ description: |
 
 type: application
 
-version: 0.1.27
+version: 0.1.28
 
-appVersion: "0.1.27"
+appVersion: "0.1.28"
 
 home: https://github.com/lukaszraczylo/kubernetes-images-sync-operator
 
diff --git a/chart/values.yaml b/chart/values.yaml
index 0b8d34e..84b06a0 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -12,7 +12,7 @@ sa:
       - ALL
   image:
     repository: ghcr.io/lukaszraczylo/kubernetes-images-sync-operator
-    tag: 0.1.27
+    tag: 0.1.28
   resources:
     limits:
       cpu: 500m
diff --git a/config/crd/bases/raczylo.com_clusterimageexports.yaml b/config/crd/bases/raczylo.com_clusterimageexports.yaml
index 443f135..f15faa2 100644
--- a/config/crd/bases/raczylo.com_clusterimageexports.yaml
+++ b/config/crd/bases/raczylo.com_clusterimageexports.yaml
@@ -53,6 +53,10 @@ spec:
           spec:
             description: ClusterImageExportSpec defines the desired state of ClusterImageExport
             properties:
+              additionalImages:
+                items:
+                  type: string
+                type: array
               basePath:
                 description: Base path for the export - both file and S3
                 maxLength: 255
diff --git a/internal/controller/raczylo.com/clusterimageexport_controller.go b/internal/controller/raczylo.com/clusterimageexport_controller.go
index c84e3a8..9e3f5d0 100644
--- a/internal/controller/raczylo.com/clusterimageexport_controller.go
+++ b/internal/controller/raczylo.com/clusterimageexport_controller.go
@@ -87,6 +87,15 @@ func (r *ClusterImageExportReconciler) Reconcile(ctx context.Context, req ctrl.R
 		return ctrl.Result{}, err
 	}
 
+	for _, image := range clusterImageExport.Spec.AdditionalImages {
+		img, err := shared.ProcessContainerName(image)
+		if err != nil {
+			l.Error(err, "unable to process additional image", "image", image)
+			continue
+		}
+		fullImagesList.Containers = append(fullImagesList.Containers, img)
+	}
+
 	clusterImageExport.Status.Progress = shared.STATUS_RUNNING
 	if err := r.Status().Update(ctx, clusterImageExport); err != nil {
 		l.Error(err, "unable to update ClusterImageExport status to RUNNING")
diff --git a/internal/shared/k8s.go b/internal/shared/k8s.go
index f2aa21e..126005d 100644
--- a/internal/shared/k8s.go
+++ b/internal/shared/k8s.go
@@ -4,11 +4,14 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
+	"time"
 
 	raczylocomv1 "github.com/lukaszraczylo/kubernetes-images-sync-operator/api/raczylo.com/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )
@@ -18,10 +21,12 @@ type K8sResource interface {
 }
 
 // Wrapper types
-type DeploymentWrapper appsv1.Deployment
-type JobWrapper batchv1.Job
-type DaemonSetWrapper appsv1.DaemonSet
-type CronJobWrapper batchv1.CronJob
+type (
+	DeploymentWrapper appsv1.Deployment
+	JobWrapper        batchv1.Job
+	DaemonSetWrapper  appsv1.DaemonSet
+	CronJobWrapper    batchv1.CronJob
+)
 
 // Implement the K8sResource interface for wrapper types
 func (d *DeploymentWrapper) GetPodSpec() *corev1.PodSpec { return &d.Spec.Template.Spec }
@@ -31,17 +36,56 @@ func (cj *CronJobWrapper) GetPodSpec() *corev1.PodSpec {
 	return &cj.Spec.JobTemplate.Spec.Template.Spec
 }
 
-func processContainerName(containerName string) (Container, error) {
+// ContainerCache memoizes parsed image references, keyed by the raw reference.
+// The embedded RWMutex guards the map.
+type ContainerCache struct {
+	sync.RWMutex
+	cache map[string]Container
+}
+
+var containerCache = &ContainerCache{
+	cache: make(map[string]Container),
+}
+
+// Get returns the cached Container for key and whether it was present.
+func (cc *ContainerCache) Get(key string) (Container, bool) {
+	cc.RLock()
+	defer cc.RUnlock()
+	c, ok := cc.cache[key]
+	return c, ok
+}
+
+// Set stores value under key, replacing any previous entry.
+func (cc *ContainerCache) Set(key string, value Container) {
+	cc.Lock()
+	defer cc.Unlock()
+	cc.cache[key] = value
+}
+
+// splitImageTag splits "image[:tag]" into one or two elements. Only a colon
+// after the last slash separates the tag; an earlier colon belongs to a
+// registry host:port, as in "registry:5000/app:1.0".
+func splitImageTag(ref string) []string {
+	if i := strings.LastIndex(ref, ":"); i > strings.LastIndex(ref, "/") {
+		return []string{ref[:i], ref[i+1:]}
+	}
+	return []string{ref}
+}
+
+// ProcessContainerName parses an image reference of the form
+// image[:tag][@digest] into a Container. Successful results are cached.
+func ProcessContainerName(containerName string) (Container, error) {
+	if cnt, ok := containerCache.Get(containerName); ok {
+		return cnt, nil
+	}
+
 	cnt := Container{}
 	parts := strings.Split(containerName, "@")
 	if len(parts) > 2 {
 		return cnt, fmt.Errorf("invalid container name format: %s", containerName)
 	}
-	imageAndTag := strings.Split(parts[0], ":")
+	imageAndTag := splitImageTag(parts[0])
 	cnt.Image = imageAndTag[0]
-	if len(imageAndTag) > 2 {
-		return cnt, fmt.Errorf("invalid image:tag format: %s", parts[0])
-	}
 	if len(imageAndTag) == 2 {
 		cnt.Tag = imageAndTag[1]
 	}
@@ -53,18 +97,21 @@ func processContainerName(containerName string) (Container, error) {
 		cnt.Sha = parts[1]
 	}
 	cnt.FullName = containerName
 	// if tag is empty and sha is empty - use tag 'latest'
 	if cnt.Sha == "" && cnt.Tag == "" {
 		cnt.Tag = "latest"
 	}
-
 	if cnt.Image == "" {
 		return cnt, fmt.Errorf("image name is required")
 	}
+
+	containerCache.Set(containerName, cnt)
 	return cnt, nil
 }
 
-func processContainers[T K8sResource](resource T, namespace string, containersList *ContainersList) error {
+// processContainers parses the image of every container, init container and
+// ephemeral container in the resource's pod spec into containersList.
+func processContainers(ctx context.Context, resource K8sResource, namespace string, containersList *ContainersList) error {
 	podSpec := resource.GetPodSpec()
 	if podSpec == nil {
 		return fmt.Errorf("nil PodSpec")
@@ -72,13 +119,13 @@ func processContainers[T K8sResource](resource T, namespace string, containersLi
 
 	allContainers := append(podSpec.Containers, podSpec.InitContainers...)
 	for _, container := range allContainers {
-		if err := processContainer(container.Image, namespace, containersList); err != nil {
+		if err := processContainer(ctx, container.Image, namespace, containersList); err != nil {
 			return err
 		}
 	}
 
 	for _, container := range podSpec.EphemeralContainers {
-		if err := processContainer(container.EphemeralContainerCommon.Image, namespace, containersList); err != nil {
+		if err := processContainer(ctx, container.EphemeralContainerCommon.Image, namespace, containersList); err != nil {
 			return err
 		}
 	}
@@ -86,58 +133,112 @@ func processContainers[T K8sResource](resource T, namespace string, containersLi
 	return nil
 }
 
 // processContainer handles the processing of a single container image
-func processContainer(image string, containerNamespace string, containersList *ContainersList) error {
-	cnt, err := processContainerName(image)
+func processContainer(ctx context.Context, image string, containerNamespace string, containersList *ContainersList) error {
+	// Stop early once the caller has given up.
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	cnt, err := ProcessContainerName(image)
 	if err != nil {
 		return fmt.Errorf("failed to process container name: %s - %w", image, err)
 	}
 	cnt.ImageNamespace = containerNamespace
 	containersList.Containers = append(containersList.Containers, cnt)
 	return nil
 }
 
-// listAndProcessResources is a generic function to list and process K8s resources
+// ListAndProcessResources lists every resource of the kind held by list and
+// appends the parsed container images of each item to containersList. Items
+// are processed concurrently, at most 10 at a time; the call returns only
+// after every worker has finished, with the first error reported, if any.
 func ListAndProcessResources[T K8sResource, L client.ObjectList](ctx context.Context, r client.Client, list L, containersList *ContainersList) error {
 	if err := r.List(ctx, list, &client.ListOptions{}); err != nil {
 		return fmt.Errorf("failed to list resources: %w", err)
 	}
 
+	var (
+		wg sync.WaitGroup
+		mu sync.Mutex // guards containersList, shared by all workers
+	)
+	errChan := make(chan error, 1)
+	semaphore := make(chan struct{}, 10) // Limit concurrent goroutines
+
+	processItem := func(item K8sResource, namespace string) {
+		defer wg.Done()
+		semaphore <- struct{}{}
+		defer func() { <-semaphore }()
+		// Parse into a private list; the shared one is touched only under mu.
+		var found ContainersList
+		err := processContainers(ctx, item, namespace, &found)
+		mu.Lock()
+		containersList.Containers = append(containersList.Containers, found.Containers...)
+		mu.Unlock()
+		if err != nil {
+			select {
+			case errChan <- err:
+			default: // an earlier error is already queued
+			}
+		}
+	}
+
 	switch typedList := any(list).(type) {
 	case *appsv1.DeploymentList:
 		for i := range typedList.Items {
-			if err := processContainers((*DeploymentWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace, containersList); err != nil {
-				return err
-			}
+			wg.Add(1)
+			go processItem((*DeploymentWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace)
 		}
 	case *batchv1.JobList:
 		for i := range typedList.Items {
-			if err := processContainers((*JobWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace, containersList); err != nil {
-				return err
-			}
+			wg.Add(1)
+			go processItem((*JobWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace)
 		}
 	case *appsv1.DaemonSetList:
 		for i := range typedList.Items {
-			if err := processContainers((*DaemonSetWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace, containersList); err != nil {
-				return err
-			}
+			wg.Add(1)
+			go processItem((*DaemonSetWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace)
 		}
 	case *batchv1.CronJobList:
 		for i := range typedList.Items {
-			if err := processContainers((*CronJobWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace, containersList); err != nil {
-				return err
-			}
+			wg.Add(1)
+			go processItem((*CronJobWrapper)(&typedList.Items[i]), typedList.Items[i].Namespace)
 		}
 	default:
 		return fmt.Errorf("unsupported list type: %T", list)
 	}
 
+	// Wait for every worker before returning so none is still appending to
+	// containersList once the caller gets it back.
+	wg.Wait()
+	close(errChan)
+	// A closed, empty channel yields nil, i.e. no worker failed.
+	if err := <-errChan; err != nil {
+		return err
+	}
+
 	return nil
 }
 
+// SetupIndexers registers the spec.exportName field index for ClusterImage,
+// retrying with exponential backoff. When every attempt fails, the last
+// indexing error is returned rather than the generic backoff timeout.
 func SetupIndexers(mgr manager.Manager) error {
-	return mgr.GetFieldIndexer().IndexField(context.Background(), &raczylocomv1.ClusterImage{}, "spec.exportName", func(rawObj client.Object) []string {
-		clusterImage := rawObj.(*raczylocomv1.ClusterImage)
-		return []string{clusterImage.Spec.ExportName}
+	var lastErr error
+	err := wait.ExponentialBackoff(wait.Backoff{
+		Duration: 1 * time.Second,
+		Factor:   2,
+		Jitter:   0.1,
+		Steps:    5,
+	}, func() (bool, error) {
+		lastErr = mgr.GetFieldIndexer().IndexField(context.Background(), &raczylocomv1.ClusterImage{}, "spec.exportName", func(rawObj client.Object) []string {
+			clusterImage := rawObj.(*raczylocomv1.ClusterImage)
+			return []string{clusterImage.Spec.ExportName}
+		})
+		// Report "not done" instead of the error so the backoff keeps retrying.
+		return lastErr == nil, nil
 	})
+	if err != nil && lastErr != nil {
+		return fmt.Errorf("indexing spec.exportName: %w", lastErr)
+	}
+	return err
 }