Remove duplicated attempts to re-run job and add minor improvements.

This commit is contained in:
2023-02-21 17:23:36 +00:00
parent 41421ea889
commit 554019b1cc
8 changed files with 31 additions and 168 deletions
-3
View File
@@ -107,9 +107,6 @@ type ManagedJobSpec struct {
Groups []*ManagedJobGroup `json:"groups"`
// +kubebuilder:validation:Optional
Params ManagedJobParameters `json:"params"`
// +kubebuilder:validation:Optional
// +kubebuilder:default=pending
Status string `json:"status"`
}
// +kubebuilder:object:root=true
+2 -2
View File
@@ -13,12 +13,12 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.0.8
version: 0.0.14
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.0.8"
appVersion: "0.0.14"
keywords:
- operator
- jobs
+1 -1
View File
@@ -13,7 +13,7 @@ controllerManager:
manager:
image:
repository: ghcr.io/lukaszraczylo/jobs-manager-operator
tag: 0.0.5
tag: 0.0.14
resources:
limits:
cpu: 500m
@@ -8150,9 +8150,6 @@ spec:
default: 1
minimum: 1
type: integer
status:
default: pending
type: string
required:
- groups
- retries
-71
View File
@@ -1,71 +0,0 @@
package controllers
import (
corev1 "k8s.io/api/core/v1"
)
func (cp *connPackage) checkGroupsStatus() {
groupsTotal := len(cp.mj.Spec.Groups)
totalJobs := 0
completedJobs := 0
didAnyJobAbort := false
changePresent := false
groupsCompleted := 0
// Check if all groups have completed and set ManagedJob status to "succeeded" if so.
if cp.mj.Spec.Status != ExecutionStatusSucceeded && groupsTotal > 0 {
for _, group := range cp.mj.Spec.Groups {
groupJobsTotal := len(group.Jobs)
groupJobsCompleted := 0
for _, job := range group.Jobs {
if job.Status == ExecutionStatusSucceeded {
groupJobsCompleted++
completedJobs++
}
if job.Status == ExecutionStatusFailed || job.Status == ExecutionStatusAborted {
didAnyJobAbort = true
}
totalJobs++
}
if groupJobsTotal == groupJobsCompleted {
// All the jobs in the group are completed.
if group.Status != ExecutionStatusSucceeded {
group.Status = ExecutionStatusSucceeded
changePresent = true
}
groupsCompleted++
}
}
}
if groupsTotal == groupsCompleted && cp.mj.Spec.Status != ExecutionStatusSucceeded && cp.mj.Status != ExecutionStatusSucceeded {
cp.mj.Spec.Status = ExecutionStatusSucceeded
changePresent = true
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "All jobs completed")
}
// Update status if any job aborted.
if didAnyJobAbort && cp.mj.Spec.Status != ExecutionStatusFailed {
cp.mj.Spec.Status = ExecutionStatusFailed
changePresent = true
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Aborted", "One of the jobs aborted")
}
// Update status to "running" if not already set.
// if cp.mj.Spec.Status != ExecutionStatusRunning && cp.mj.Spec.Status != ExecutionStatusSucceeded {
// cp.mj.Spec.Status = ExecutionStatusRunning
// cp.mj.Status = ExecutionStatusRunning
// changePresent = true
// }
// Check if the ManagedJob status has changed.
statusChanged := cp.mj.Spec.Status != cp.mj.Status
// Update status and send event if it has changed.
if statusChanged || changePresent {
cp.updateCRDStatusDirectly()
// cp.r.Client.Status().Update(cp.ctx, cp.mj)
}
}
+26 -22
View File
@@ -1,7 +1,6 @@
package controllers
import (
"fmt"
"strings"
"github.com/lukaszraczylo/pandati"
@@ -102,18 +101,17 @@ func (cp *connPackage) checkRunningJobsStatus() {
generatedJobName := jobNameGenerator(cp.mj.Name, group.Name, job.Name)
if childJob.Name == generatedJobName {
if childJob.Status.Succeeded > 0 && job.Status != ExecutionStatusSucceeded {
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "Job %s completed [prev: %s]", childJob.Name, job.Status)
job.Status = ExecutionStatusSucceeded
cp.updateDependentJobs(generatedJobName, ExecutionStatusSucceeded)
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "Job %s completed", childJob.Name)
} else if childJob.Status.Failed > 0 && job.Status != ExecutionStatusFailed {
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s failed [prev: %s]", childJob.Name, job.Status)
job.Status = ExecutionStatusFailed
cp.updateDependentJobs(generatedJobName, ExecutionStatusFailed)
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s failed", childJob.Name)
} else if childJob.Status.Active > 0 && job.Status != ExecutionStatusRunning {
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s running [prev: %s]", childJob.Name, job.Status)
job.Status = ExecutionStatusRunning
cp.updateDependentJobs(generatedJobName, ExecutionStatusRunning)
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s running", childJob.Name)
}
cp.updateDependentJobs(generatedJobName, job.Status)
continue
}
}
}
@@ -137,11 +135,13 @@ func (cp *connPackage) runPendingJobs() {
continue
}
if group.Status == ExecutionStatusSucceeded || group.Status == ExecutionStatusFailed || group.Status == ExecutionStatusAborted {
approvedStatuses := []string{ExecutionStatusSucceeded, ExecutionStatusFailed, ExecutionStatusAborted}
if pandati.ExistsInSlice(approvedStatuses, group.Status) {
cp.updateDependentGroups(group.Name, group.Status)
}
if group.Status == ExecutionStatusPending || group.Status == ExecutionStatusRunning {
approvedStatuses = []string{ExecutionStatusPending, ExecutionStatusRunning}
if pandati.ExistsInSlice(approvedStatuses, group.Status) {
if len(group.Dependencies) > 0 {
groupsCompleted := 0
for _, group_dependency := range group.Dependencies {
@@ -161,7 +161,7 @@ func (cp *connPackage) runPendingJobs() {
}
if !run_group {
fmt.Println("Group "+group.Name+" is not running as dependencies were not met", group.Dependencies)
// fmt.Println("Group "+group.Name+" is not running as dependencies were not met", group.Dependencies)
continue // not running the group as dependencies were not met
} else {
group.Status = ExecutionStatusRunning
@@ -192,27 +192,29 @@ func (cp *connPackage) runPendingJobs() {
if !run_job {
continue // job is not ready as dependencies were not met
} else {
if job.Status != ExecutionStatusRunning && job.Status != ExecutionStatusFailed && job.Status != ExecutionStatusSucceeded {
job.Status = ExecutionStatusRunning
cp.updateDependentJobs(job.Name, ExecutionStatusRunning)
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s from group %s running", job.Name, group.Name)
approvedStatuses = []string{ExecutionStatusRunning, ExecutionStatusFailed, ExecutionStatusAborted}
if !pandati.ExistsInSlice(approvedStatuses, job.Status) {
err := cp.executeJob(job, group)
if err != nil {
log.Log.Info("Unable to execute job", "error", err.Error())
if !strings.Contains(err.Error(), "already exists") {
if !strings.Contains(err.Error(), "exists") {
job.Status = ExecutionStatusFailed
group.Status = ExecutionStatusFailed
cp.updateDependentJobs(job.Name, ExecutionStatusFailed)
cp.updateDependentGroups(group.Name, ExecutionStatusFailed)
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s from group %s failed", job.Name, group.Name)
}
return
}
job.Status = ExecutionStatusRunning
cp.updateDependentJobs(job.Name, ExecutionStatusRunning)
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s from group %s running", job.Name, group.Name)
}
}
}
}
fmt.Println("Running group: ", group.Name, " with status: ", group.Status, " accepted: ", run_group)
// fmt.Println("Running group: ", group.Name, " with status: ", group.Status, " accepted: ", run_group)
}
}
}
@@ -283,7 +285,6 @@ func (cp *connPackage) executeJob(j *jobsmanagerv1beta1.ManagedJobDefinition, g
err = cp.r.Client.Create(cp.ctx, &job_handler)
if err != nil || pandati.IsZero(job_handler) {
log.Log.Info("Unable to create job", "job", job_handler.Name, "error", err.Error())
return err
}
@@ -293,22 +294,25 @@ func (cp *connPackage) executeJob(j *jobsmanagerv1beta1.ManagedJobDefinition, g
func (cp *connPackage) checkOverallStatus() {
groupsCompleted := 0
negativeStatuses := []string{ExecutionStatusFailed, ExecutionStatusAborted}
for _, group := range cp.mj.Spec.Groups {
if group.Status == ExecutionStatusSucceeded {
groupsCompleted++
} else if group.Status == ExecutionStatusFailed || group.Status == ExecutionStatusAborted {
} else if pandati.ExistsInSlice(negativeStatuses, group.Status) {
cp.mj.Status = ExecutionStatusFailed
cp.mj.Spec.Status = cp.mj.Status
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failure", "Run failed")
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failure", "Run failed in group %s", group.Name)
} else {
continue
}
}
if groupsCompleted == len(cp.mj.Spec.Groups) {
if cp.mj.Status != ExecutionStatusSucceeded {
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Success", "Run completed successfuly")
}
cp.mj.Status = ExecutionStatusSucceeded
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Success", "Run completed successfuly")
} else {
cp.mj.Status = ExecutionStatusRunning
}
cp.mj.Spec.Status = cp.mj.Status
cp.r.Status().Update(cp.ctx, cp.mj)
}
-64
View File
@@ -2,19 +2,13 @@ package controllers
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/kr/pretty"
"github.com/lukaszraczylo/pandati"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"raczylo.com/jobs-manager-operator/api/v1beta1"
jobsmanagerv1beta1 "raczylo.com/jobs-manager-operator/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -54,65 +48,8 @@ func (cp *connPackage) getOwnerReference() (metav1.OwnerReference, error) {
}, nil
}
type patchStringValue struct {
Op string `json:"op"`
Path string `json:"path"`
Value string `json:"value"`
}
func (cp *connPackage) getLatestMainJobAndPatch(proposedChanges *jobsmanagerv1beta1.ManagedJob) (bool, error) {
mj := jobsmanagerv1beta1.ManagedJob{}
err := cp.r.Client.Get(cp.ctx, cp.req.NamespacedName, &mj)
if err != nil {
return false, err
}
diff, diffIdentical, _ := pandati.CompareStructsReplaced(mj, proposedChanges)
if !diffIdentical {
if err != nil {
log.Log.Error(err, "Unable to marshal ManagedJob")
}
var payloadBytesArr []patchStringValue
for _, d := range diff {
if strings.HasPrefix(d.Key, "/spec") {
operation := "replace"
if d.OldValue == "" || d.OldValue == nil {
operation = "add"
}
payload := patchStringValue{
Op: operation,
Path: d.Key,
Value: d.Value.(string),
}
payloadBytesArr = append(payloadBytesArr, payload)
}
}
patchPayload, err := json.Marshal(payloadBytesArr)
if err != nil {
log.Log.Error(err, "Unable to marshal ManagedJob")
}
fmt.Printf("PatchPayload: %# v", pretty.Formatter(fmt.Sprintf("%s", patchPayload)))
kubepath := client.RawPatch(types.JSONPatchType, patchPayload)
err = cp.r.Client.Patch(cp.ctx, &mj, kubepath)
// if err != nil {
// log.Log.Error(err, "Unable to patch ManagedJob")
// }
return true, err
}
return false, nil
}
func (cp *connPackage) updateCRDStatusDirectly() error {
cp.mtx.Lock()
// defer cp.mtx.Unlock()
// val, err := cp.getLatestMainJobAndPatch(cp.mj)
// if err != nil {
// log.Log.Error(err, "Unable to get latest ManagedJob")
// return err
// }
// if !val && err == nil {
err := cp.r.Update(cp.ctx, cp.mj)
if err != nil {
// log.Log.Info("Error", err.Error(), "more", "Unable to update ManagedJob status directly")
@@ -122,7 +59,6 @@ func (cp *connPackage) updateCRDStatusDirectly() error {
if err != nil {
log.Log.Error(err, "Unable to get updated ManagedJob")
}
// }
cp.mtx.Unlock()
return err
}
+2 -2
View File
@@ -64,14 +64,14 @@ func (r *ManagedJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// TODO: Re-enable after testing
cp.checkRunningJobsStatus()
cp.runPendingJobs()
cp.checkOverallStatus()
_, theSame, _ := pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj)
if !theSame {
cp.updateCRDStatusDirectly()
}
// fmt.Printf("Reconcile: %# v", pretty.Formatter(r.Updater))
cp.checkOverallStatus()
// fmt.Printf("Reconcile: %# v", pretty.Formatter(r.Updater))
return ctrl.Result{}, nil
}