From 554019b1ccb7352102930187f2039ad0496ea970 Mon Sep 17 00:00:00 2001 From: Lukasz Raczylo Date: Tue, 21 Feb 2023 17:23:36 +0000 Subject: [PATCH] Remove duplicated attempts to re-run job and add minor improvements. --- api/v1beta1/managedjob_types.go | 3 - charts/jobs-manager-operator/Chart.yaml | 4 +- charts/jobs-manager-operator/values.yaml | 2 +- .../jobsmanager.raczylo.com_managedjobs.yaml | 3 - controllers/crd_calc.go | 71 ------------------- controllers/crd_scrapper.go | 48 +++++++------ controllers/helpers.go | 64 ----------------- controllers/managedjob_controller.go | 4 +- 8 files changed, 31 insertions(+), 168 deletions(-) delete mode 100644 controllers/crd_calc.go diff --git a/api/v1beta1/managedjob_types.go b/api/v1beta1/managedjob_types.go index ee0e532..e01fb68 100644 --- a/api/v1beta1/managedjob_types.go +++ b/api/v1beta1/managedjob_types.go @@ -107,9 +107,6 @@ type ManagedJobSpec struct { Groups []*ManagedJobGroup `json:"groups"` // +kubebuilder:validation:Optional Params ManagedJobParameters `json:"params"` - // +kubebuilder:validation:Optional - // +kubebuilder:default=pending - Status string `json:"status"` } // +kubebuilder:object:root=true diff --git a/charts/jobs-manager-operator/Chart.yaml b/charts/jobs-manager-operator/Chart.yaml index 3be23f2..0c078fb 100644 --- a/charts/jobs-manager-operator/Chart.yaml +++ b/charts/jobs-manager-operator/Chart.yaml @@ -13,12 +13,12 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.0.8 +version: 0.0.14 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.0.8" +appVersion: "0.0.14" keywords: - operator - jobs diff --git a/charts/jobs-manager-operator/values.yaml b/charts/jobs-manager-operator/values.yaml index 557a5c0..adfb05b 100644 --- a/charts/jobs-manager-operator/values.yaml +++ b/charts/jobs-manager-operator/values.yaml @@ -13,7 +13,7 @@ controllerManager: manager: image: repository: ghcr.io/lukaszraczylo/jobs-manager-operator - tag: 0.0.5 + tag: 0.0.14 resources: limits: cpu: 500m diff --git a/config/crd/bases/jobsmanager.raczylo.com_managedjobs.yaml b/config/crd/bases/jobsmanager.raczylo.com_managedjobs.yaml index b355a73..1f8b97f 100644 --- a/config/crd/bases/jobsmanager.raczylo.com_managedjobs.yaml +++ b/config/crd/bases/jobsmanager.raczylo.com_managedjobs.yaml @@ -8150,9 +8150,6 @@ spec: default: 1 minimum: 1 type: integer - status: - default: pending - type: string required: - groups - retries diff --git a/controllers/crd_calc.go b/controllers/crd_calc.go deleted file mode 100644 index 17f20e9..0000000 --- a/controllers/crd_calc.go +++ /dev/null @@ -1,71 +0,0 @@ -package controllers - -import ( - corev1 "k8s.io/api/core/v1" -) - -func (cp *connPackage) checkGroupsStatus() { - groupsTotal := len(cp.mj.Spec.Groups) - totalJobs := 0 - completedJobs := 0 - didAnyJobAbort := false - changePresent := false - groupsCompleted := 0 - - // Check if all groups have completed and set ManagedJob status to "succeeded" if so. - if cp.mj.Spec.Status != ExecutionStatusSucceeded && groupsTotal > 0 { - for _, group := range cp.mj.Spec.Groups { - groupJobsTotal := len(group.Jobs) - groupJobsCompleted := 0 - - for _, job := range group.Jobs { - if job.Status == ExecutionStatusSucceeded { - groupJobsCompleted++ - completedJobs++ - } - if job.Status == ExecutionStatusFailed || job.Status == ExecutionStatusAborted { - didAnyJobAbort = true - } - totalJobs++ - } - - if groupJobsTotal == groupJobsCompleted { - // All the jobs in the group are completed. - if group.Status != ExecutionStatusSucceeded { - group.Status = ExecutionStatusSucceeded - changePresent = true - } - groupsCompleted++ - } - } - } - - if groupsTotal == groupsCompleted && cp.mj.Spec.Status != ExecutionStatusSucceeded && cp.mj.Status != ExecutionStatusSucceeded { - cp.mj.Spec.Status = ExecutionStatusSucceeded - changePresent = true - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "All jobs completed") - } - - // Update status if any job aborted. - if didAnyJobAbort && cp.mj.Spec.Status != ExecutionStatusFailed { - cp.mj.Spec.Status = ExecutionStatusFailed - changePresent = true - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Aborted", "One of the jobs aborted") - } - - // Update status to "running" if not already set. - // if cp.mj.Spec.Status != ExecutionStatusRunning && cp.mj.Spec.Status != ExecutionStatusSucceeded { - // cp.mj.Spec.Status = ExecutionStatusRunning - // cp.mj.Status = ExecutionStatusRunning - // changePresent = true - // } - - // Check if the ManagedJob status has changed. - statusChanged := cp.mj.Spec.Status != cp.mj.Status - - // Update status and send event if it has changed. - if statusChanged || changePresent { - cp.updateCRDStatusDirectly() - // cp.r.Client.Status().Update(cp.ctx, cp.mj) - } -} diff --git a/controllers/crd_scrapper.go b/controllers/crd_scrapper.go index 04240d9..c4e7072 100644 --- a/controllers/crd_scrapper.go +++ b/controllers/crd_scrapper.go @@ -1,7 +1,6 @@ package controllers import ( - "fmt" "strings" "github.com/lukaszraczylo/pandati" @@ -102,18 +101,17 @@ func (cp *connPackage) checkRunningJobsStatus() { generatedJobName := jobNameGenerator(cp.mj.Name, group.Name, job.Name) if childJob.Name == generatedJobName { if childJob.Status.Succeeded > 0 && job.Status != ExecutionStatusSucceeded { + cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "Job %s completed [prev: %s]", childJob.Name, job.Status) job.Status = ExecutionStatusSucceeded - cp.updateDependentJobs(generatedJobName, ExecutionStatusSucceeded) - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "Job %s completed", childJob.Name) } else if childJob.Status.Failed > 0 && job.Status != ExecutionStatusFailed { + cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s failed [prev: %s]", childJob.Name, job.Status) job.Status = ExecutionStatusFailed - cp.updateDependentJobs(generatedJobName, ExecutionStatusFailed) - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s failed", childJob.Name) } else if childJob.Status.Active > 0 && job.Status != ExecutionStatusRunning { + cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s running [prev: %s]", childJob.Name, job.Status) job.Status = ExecutionStatusRunning - cp.updateDependentJobs(generatedJobName, ExecutionStatusRunning) - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s running", childJob.Name) } + cp.updateDependentJobs(generatedJobName, job.Status) + continue } } } @@ -137,11 +135,13 @@ func (cp *connPackage) runPendingJobs() { continue } - if group.Status == ExecutionStatusSucceeded || group.Status == ExecutionStatusFailed || group.Status == ExecutionStatusAborted { + approvedStatuses := []string{ExecutionStatusSucceeded, ExecutionStatusFailed, ExecutionStatusAborted} + if pandati.ExistsInSlice(approvedStatuses, group.Status) { cp.updateDependentGroups(group.Name, group.Status) } - if group.Status == ExecutionStatusPending || group.Status == ExecutionStatusRunning { + approvedStatuses = []string{ExecutionStatusPending, ExecutionStatusRunning} + if pandati.ExistsInSlice(approvedStatuses, group.Status) { if len(group.Dependencies) > 0 { groupsCompleted := 0 for _, group_dependency := range group.Dependencies { @@ -161,7 +161,7 @@ func (cp *connPackage) runPendingJobs() { } if !run_group { - fmt.Println("Group "+group.Name+" is not running as dependencies were not met", group.Dependencies) + // fmt.Println("Group "+group.Name+" is not running as dependencies were not met", group.Dependencies) continue // not running the group as dependencies were not met } else { group.Status = ExecutionStatusRunning @@ -192,27 +192,29 @@ func (cp *connPackage) runPendingJobs() { if !run_job { continue // job is not ready as dependencies were not met } else { - if job.Status != ExecutionStatusRunning && job.Status != ExecutionStatusFailed && job.Status != ExecutionStatusSucceeded { - job.Status = ExecutionStatusRunning - cp.updateDependentJobs(job.Name, ExecutionStatusRunning) - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s from group %s running", job.Name, group.Name) + approvedStatuses = []string{ExecutionStatusRunning, ExecutionStatusFailed, ExecutionStatusAborted} + if !pandati.ExistsInSlice(approvedStatuses, job.Status) { err := cp.executeJob(job, group) if err != nil { log.Log.Info("Unable to execute job", "error", err.Error()) - if !strings.Contains(err.Error(), "already exists") { + if !strings.Contains(err.Error(), "exists") { job.Status = ExecutionStatusFailed group.Status = ExecutionStatusFailed cp.updateDependentJobs(job.Name, ExecutionStatusFailed) cp.updateDependentGroups(group.Name, ExecutionStatusFailed) cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s from group %s failed", job.Name, group.Name) } + return } + job.Status = ExecutionStatusRunning + cp.updateDependentJobs(job.Name, ExecutionStatusRunning) + cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s from group %s running", job.Name, group.Name) } } } } - fmt.Println("Running group: ", group.Name, " with status: ", group.Status, " accepted: ", run_group) + // fmt.Println("Running group: ", group.Name, " with status: ", group.Status, " accepted: ", run_group) } } } @@ -283,7 +285,6 @@ func (cp *connPackage) executeJob(j *jobsmanagerv1beta1.ManagedJobDefinition, g err = cp.r.Client.Create(cp.ctx, &job_handler) if err != nil || pandati.IsZero(job_handler) { - log.Log.Info("Unable to create job", "job", job_handler.Name, "error", err.Error()) return err } @@ -293,22 +294,25 @@ func (cp *connPackage) executeJob(j *jobsmanagerv1beta1.ManagedJobDefinition, g func (cp *connPackage) checkOverallStatus() { groupsCompleted := 0 + negativeStatuses := []string{ExecutionStatusFailed, ExecutionStatusAborted} for _, group := range cp.mj.Spec.Groups { if group.Status == ExecutionStatusSucceeded { groupsCompleted++ - } else if group.Status == ExecutionStatusFailed || group.Status == ExecutionStatusAborted { + } else if pandati.ExistsInSlice(negativeStatuses, group.Status) { cp.mj.Status = ExecutionStatusFailed - cp.mj.Spec.Status = cp.mj.Status - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failure", "Run failed") + cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failure", "Run failed in group %s", group.Name) + } else { + continue } } if groupsCompleted == len(cp.mj.Spec.Groups) { + if cp.mj.Status != ExecutionStatusSucceeded { + cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Success", "Run completed successfuly") + } cp.mj.Status = ExecutionStatusSucceeded - cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Success", "Run completed successfuly") } else { cp.mj.Status = ExecutionStatusRunning } - cp.mj.Spec.Status = cp.mj.Status cp.r.Status().Update(cp.ctx, cp.mj) } diff --git a/controllers/helpers.go b/controllers/helpers.go index 0c0eb12..9fce4e5 100644 --- a/controllers/helpers.go +++ b/controllers/helpers.go @@ -2,19 +2,13 @@ package controllers import ( "context" - "encoding/json" - "fmt" "strings" "sync" - "github.com/kr/pretty" - "github.com/lukaszraczylo/pandati" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "raczylo.com/jobs-manager-operator/api/v1beta1" jobsmanagerv1beta1 "raczylo.com/jobs-manager-operator/api/v1beta1" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -54,65 +48,8 @@ func (cp *connPackage) getOwnerReference() (metav1.OwnerReference, error) { }, nil } -type patchStringValue struct { - Op string `json:"op"` - Path string `json:"path"` - Value string `json:"value"` -} - -func (cp *connPackage) getLatestMainJobAndPatch(proposedChanges *jobsmanagerv1beta1.ManagedJob) (bool, error) { - mj := jobsmanagerv1beta1.ManagedJob{} - err := cp.r.Client.Get(cp.ctx, cp.req.NamespacedName, &mj) - if err != nil { - return false, err - } - diff, diffIdentical, _ := pandati.CompareStructsReplaced(mj, proposedChanges) - if !diffIdentical { - if err != nil { - log.Log.Error(err, "Unable to marshal ManagedJob") - } - var payloadBytesArr []patchStringValue - for _, d := range diff { - if strings.HasPrefix(d.Key, "/spec") { - operation := "replace" - if d.OldValue == "" || d.OldValue == nil { - operation = "add" - } - - payload := patchStringValue{ - Op: operation, - Path: d.Key, - Value: d.Value.(string), - } - payloadBytesArr = append(payloadBytesArr, payload) - } - } - patchPayload, err := json.Marshal(payloadBytesArr) - if err != nil { - log.Log.Error(err, "Unable to marshal ManagedJob") - } - fmt.Printf("PatchPayload: %# v", pretty.Formatter(fmt.Sprintf("%s", patchPayload))) - kubepath := client.RawPatch(types.JSONPatchType, patchPayload) - err = cp.r.Client.Patch(cp.ctx, &mj, kubepath) - // if err != nil { - // log.Log.Error(err, "Unable to patch ManagedJob") - // } - return true, err - } - return false, nil -} - func (cp *connPackage) updateCRDStatusDirectly() error { cp.mtx.Lock() - // defer cp.mtx.Unlock() - - // val, err := cp.getLatestMainJobAndPatch(cp.mj) - // if err != nil { - // log.Log.Error(err, "Unable to get latest ManagedJob") - // return err - // } - - // if !val && err == nil { err := cp.r.Update(cp.ctx, cp.mj) if err != nil { // log.Log.Info("Error", err.Error(), "more", "Unable to update ManagedJob status directly") @@ -122,7 +59,6 @@ func (cp *connPackage) updateCRDStatusDirectly() error { if err != nil { log.Log.Error(err, "Unable to get updated ManagedJob") } - // } cp.mtx.Unlock() return err } diff --git a/controllers/managedjob_controller.go b/controllers/managedjob_controller.go index 5f359f4..621c948 100644 --- a/controllers/managedjob_controller.go +++ b/controllers/managedjob_controller.go @@ -64,14 +64,14 @@ func (r *ManagedJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) // TODO: Re-enable after testing cp.checkRunningJobsStatus() cp.runPendingJobs() - cp.checkOverallStatus() _, theSame, _ := pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj) if !theSame { cp.updateCRDStatusDirectly() } - // fmt.Printf("Reconcile: %# v", pretty.Formatter(r.Updater)) + cp.checkOverallStatus() + // fmt.Printf("Reconcile: %# v", pretty.Formatter(r.Updater)) return ctrl.Result{}, nil }