mirror of
https://github.com/lukaszraczylo/jobs-manager-operator.git
synced 2026-06-09 22:53:59 +00:00
Merge pull request #1 from lukaszraczylo/improve-tree-generation
Completely rebuild the tree generation logic.
This commit is contained in:
@@ -100,6 +100,7 @@ jobs:
|
||||
name: "Building docker image"
|
||||
needs: [ prepare, test ]
|
||||
runs-on: ubuntu-20.04
|
||||
if: needs.prepare.outputs.CHECK_IF_MASTER_BRANCH == 'true'
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v3
|
||||
|
||||
@@ -28,7 +28,6 @@ metadata:
|
||||
labels:
|
||||
name: managedjob-sample
|
||||
spec:
|
||||
# Globally defined parameters and environment variables
|
||||
retries: 3
|
||||
params:
|
||||
env:
|
||||
@@ -40,9 +39,7 @@ spec:
|
||||
# Job groups definitions
|
||||
groups:
|
||||
- name: "first-group"
|
||||
# Group will run in parallel with other defined groups
|
||||
parallel: true
|
||||
# Group specific parameters
|
||||
params:
|
||||
env:
|
||||
- name: "FEE"
|
||||
@@ -63,6 +60,11 @@ spec:
|
||||
args:
|
||||
- "sleep"
|
||||
- "10"
|
||||
- name: "second-half-job"
|
||||
image: "busybox"
|
||||
args:
|
||||
- "sleep"
|
||||
- "10"
|
||||
|
||||
- name: "second-group"
|
||||
parallel: true
|
||||
@@ -79,9 +81,44 @@ spec:
|
||||
args:
|
||||
- "sleep"
|
||||
- "10"
|
||||
parallel: false
|
||||
|
||||
- name: "third-group"
|
||||
parallel: false
|
||||
jobs:
|
||||
- name: "fifth-job"
|
||||
image: "busybox"
|
||||
args:
|
||||
- "echo"
|
||||
- "Hello world!"
|
||||
parallel: true
|
||||
```
|
||||
|
||||
### How does it look in practice?
|
||||
|
||||
```yaml
|
||||
managedjob-sample
|
||||
├── first-group
|
||||
│ ├── first-job
|
||||
│ ├── second-job
|
||||
│ │ └── Depends on: managedjob-sample-first-group-first-job
|
||||
│ └── second-half-job
|
||||
│ ├── Depends on: managedjob-sample-first-group-first-job
|
||||
│ └── Depends on: managedjob-sample-first-group-second-job
|
||||
├── second-group
|
||||
│ ├── third-job
|
||||
│ └── fourth-job
|
||||
│ └── Depends on: managedjob-sample-second-group-third-job
|
||||
└── third-group
|
||||
├── fifth-job
|
||||
├── Depends on group: first-group
|
||||
└── Depends on group: second-group
|
||||
```
|
||||
|
||||
If dependency exists on the group level - the group will not be executed until all of remaining groups have finished successfuly.
|
||||
If dependency exists on the job level - the job will not be executed until all of remaining jobs have finished successfuly.
|
||||
Remember that **ORDER matters**.
|
||||
|
||||
### Things to remember
|
||||
|
||||
Parameters **params** are always merged downwards to DRY your definitions.
|
||||
|
||||
@@ -24,10 +24,8 @@ import (
|
||||
type ManagedJobDependencies struct {
|
||||
// +kubebuilder:validation:Optional
|
||||
// +kubebuilder:default=""
|
||||
Group string `json:"group"`
|
||||
// +kubebuilder:validation:Optional
|
||||
// +kubebuilder:default=""
|
||||
Job string `json:"job"`
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
type ManagedJobDefinition struct {
|
||||
@@ -48,8 +46,9 @@ type ManagedJobDefinition struct {
|
||||
// +kubebuilder:validation:Optional
|
||||
// +kubebuilder:default=pending
|
||||
Status string `json:"status"`
|
||||
// +kubebuilder:validation:Optional
|
||||
// +optional
|
||||
Dependencies ManagedJobDependencies `json:"dependencies"`
|
||||
Dependencies []*ManagedJobDependencies `json:"dependencies"`
|
||||
// +optional
|
||||
CompiledParams ManagedJobParameters `json:"compiledParams"`
|
||||
}
|
||||
@@ -68,6 +67,9 @@ type ManagedJobGroup struct {
|
||||
// +kubebuilder:validation:Optional
|
||||
Params ManagedJobParameters `json:"params"`
|
||||
// +kubebuilder:validation:Optional
|
||||
// +optional
|
||||
Dependencies []*ManagedJobDependencies `json:"dependencies"`
|
||||
// +kubebuilder:validation:Optional
|
||||
// +kubebuilder:default=pending
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
@@ -61,7 +61,17 @@ func (in *ManagedJobDefinition) DeepCopyInto(out *ManagedJobDefinition) {
|
||||
copy(*out, *in)
|
||||
}
|
||||
in.Params.DeepCopyInto(&out.Params)
|
||||
out.Dependencies = in.Dependencies
|
||||
if in.Dependencies != nil {
|
||||
in, out := &in.Dependencies, &out.Dependencies
|
||||
*out = make([]*ManagedJobDependencies, len(*in))
|
||||
for i := range *in {
|
||||
if (*in)[i] != nil {
|
||||
in, out := &(*in)[i], &(*out)[i]
|
||||
*out = new(ManagedJobDependencies)
|
||||
**out = **in
|
||||
}
|
||||
}
|
||||
}
|
||||
in.CompiledParams.DeepCopyInto(&out.CompiledParams)
|
||||
}
|
||||
|
||||
@@ -105,6 +115,17 @@ func (in *ManagedJobGroup) DeepCopyInto(out *ManagedJobGroup) {
|
||||
}
|
||||
}
|
||||
in.Params.DeepCopyInto(&out.Params)
|
||||
if in.Dependencies != nil {
|
||||
in, out := &in.Dependencies, &out.Dependencies
|
||||
*out = make([]*ManagedJobDependencies, len(*in))
|
||||
for i := range *in {
|
||||
if (*in)[i] != nil {
|
||||
in, out := &(*in)[i], &(*out)[i]
|
||||
*out = new(ManagedJobDependencies)
|
||||
**out = **in
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagedJobGroup.
|
||||
|
||||
@@ -38,6 +38,18 @@ spec:
|
||||
groups:
|
||||
items:
|
||||
properties:
|
||||
dependencies:
|
||||
items:
|
||||
properties:
|
||||
name:
|
||||
default: ""
|
||||
type: string
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
type: object
|
||||
type: array
|
||||
jobs:
|
||||
items:
|
||||
properties:
|
||||
@@ -2100,14 +2112,17 @@ spec:
|
||||
type: array
|
||||
type: object
|
||||
dependencies:
|
||||
properties:
|
||||
group:
|
||||
default: ""
|
||||
type: string
|
||||
job:
|
||||
default: ""
|
||||
type: string
|
||||
type: object
|
||||
items:
|
||||
properties:
|
||||
name:
|
||||
default: ""
|
||||
type: string
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
type: object
|
||||
type: array
|
||||
image:
|
||||
minLength: 5
|
||||
type: string
|
||||
|
||||
@@ -38,6 +38,18 @@ spec:
|
||||
groups:
|
||||
items:
|
||||
properties:
|
||||
dependencies:
|
||||
items:
|
||||
properties:
|
||||
name:
|
||||
default: ""
|
||||
type: string
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
type: object
|
||||
type: array
|
||||
jobs:
|
||||
items:
|
||||
properties:
|
||||
@@ -2170,14 +2182,17 @@ spec:
|
||||
type: array
|
||||
type: object
|
||||
dependencies:
|
||||
properties:
|
||||
group:
|
||||
default: ""
|
||||
type: string
|
||||
job:
|
||||
default: ""
|
||||
type: string
|
||||
type: object
|
||||
items:
|
||||
properties:
|
||||
name:
|
||||
default: ""
|
||||
type: string
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
type: object
|
||||
type: array
|
||||
image:
|
||||
minLength: 5
|
||||
type: string
|
||||
|
||||
@@ -41,6 +41,11 @@ spec:
|
||||
args:
|
||||
- "sleep"
|
||||
- "10"
|
||||
- name: "second-half-job"
|
||||
image: "busybox"
|
||||
args:
|
||||
- "sleep"
|
||||
- "10"
|
||||
|
||||
- name: "second-group"
|
||||
parallel: true
|
||||
@@ -57,4 +62,14 @@ spec:
|
||||
args:
|
||||
- "sleep"
|
||||
- "10"
|
||||
parallel: false
|
||||
|
||||
- name: "third-group"
|
||||
parallel: false
|
||||
jobs:
|
||||
- name: "fifth-job"
|
||||
image: "busybox"
|
||||
args:
|
||||
- "echo"
|
||||
- "Hello world!"
|
||||
parallel: true
|
||||
@@ -0,0 +1,160 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/lukaszraczylo/pandati"
|
||||
jobsmanagerv1beta1 "raczylo.com/jobs-manager-operator/api/v1beta1"
|
||||
)
|
||||
|
||||
const (
|
||||
newLine = "\n"
|
||||
emptySpace = " "
|
||||
middleItem = "├── "
|
||||
continueItem = "│ "
|
||||
lastItem = "└── "
|
||||
)
|
||||
|
||||
func New(text string) Tree {
|
||||
return &tree{
|
||||
text: text,
|
||||
items: []Tree{},
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a node to the tree
|
||||
func (t *tree) Add(text string) Tree {
|
||||
n := New(text)
|
||||
t.items = append(t.items, n)
|
||||
return n
|
||||
}
|
||||
|
||||
// AddTree adds a tree as an item
|
||||
func (t *tree) AddTree(tree Tree) {
|
||||
t.items = append(t.items, tree)
|
||||
}
|
||||
|
||||
// Text returns the node's value
|
||||
func (t *tree) Text() string {
|
||||
return t.text
|
||||
}
|
||||
|
||||
// Items returns all items in the tree
|
||||
func (t *tree) Items() []Tree {
|
||||
return t.items
|
||||
}
|
||||
|
||||
func (t *tree) Print() string {
|
||||
p := &printer{}
|
||||
return p.Print(t)
|
||||
}
|
||||
|
||||
func (p *printer) Print(t Tree) string {
|
||||
return t.Text() + newLine + p.printItems(t.Items(), []bool{})
|
||||
}
|
||||
|
||||
func (p *printer) printText(text string, spaces []bool, last bool) string {
|
||||
var result string
|
||||
for _, space := range spaces {
|
||||
if space {
|
||||
result += emptySpace
|
||||
} else {
|
||||
result += continueItem
|
||||
}
|
||||
}
|
||||
|
||||
indicator := middleItem
|
||||
if last {
|
||||
indicator = lastItem
|
||||
}
|
||||
|
||||
var out string
|
||||
lines := strings.Split(text, "\n")
|
||||
for i := range lines {
|
||||
text := lines[i]
|
||||
if i == 0 {
|
||||
out += result + indicator + text + newLine
|
||||
continue
|
||||
}
|
||||
if last {
|
||||
indicator = emptySpace
|
||||
} else {
|
||||
indicator = continueItem
|
||||
}
|
||||
out += result + indicator + text + newLine
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (p *printer) printItems(t []Tree, spaces []bool) string {
|
||||
var result string
|
||||
for i, f := range t {
|
||||
last := i == len(t)-1
|
||||
result += p.printText(f.Text(), spaces, last)
|
||||
if len(f.Items()) > 0 {
|
||||
spacesChild := append(spaces, last)
|
||||
result += p.printItems(f.Items(), spacesChild)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (cp *connPackage) checkIfPresentInDependencies(currentDependencies []*jobsmanagerv1beta1.ManagedJobDependencies, dependencyName string) bool {
|
||||
for _, dependency := range currentDependencies {
|
||||
if dependency.Name == dependencyName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (cp *connPackage) generateDependencyTree() {
|
||||
// First pass - initialize the tree and get all the gathered jobs
|
||||
originalMainJobDefinition := cp.mj.DeepCopy()
|
||||
|
||||
mainTree := New(cp.mj.Name)
|
||||
for _, group := range cp.mj.Spec.Groups {
|
||||
groupTree := mainTree.Add(group.Name)
|
||||
for _, job := range group.Jobs {
|
||||
jobTree := groupTree.Add(job.Name)
|
||||
if job.Parallel {
|
||||
continue
|
||||
} else {
|
||||
// get the groupTree items before this job and add them as dependencies
|
||||
for _, jobTreePrevious := range groupTree.Items() {
|
||||
if jobTreePrevious.Text() == job.Name {
|
||||
break
|
||||
}
|
||||
generatedJobName := jobNameGenerator(cp.mj.Name, group.Name, jobTreePrevious.Text())
|
||||
jobTree.Add("Depends on: " + generatedJobName)
|
||||
if !cp.checkIfPresentInDependencies(job.Dependencies, generatedJobName) {
|
||||
job.Dependencies = append(job.Dependencies, &jobsmanagerv1beta1.ManagedJobDependencies{Name: generatedJobName, Status: ExecutionStatusPending})
|
||||
job.CompiledParams = cp.compileParameters(cp.mj.Spec.Params, group.Params, job.Params)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if group.Parallel {
|
||||
continue
|
||||
} else {
|
||||
// get the mainTree items before this group and add them as dependencies
|
||||
for _, groupTreePrevious := range mainTree.Items() {
|
||||
if groupTreePrevious.Text() == group.Name {
|
||||
break
|
||||
}
|
||||
groupTree.Add("Depends on group: " + groupTreePrevious.Text())
|
||||
if !cp.checkIfPresentInDependencies(group.Dependencies, groupTreePrevious.Text()) {
|
||||
group.Dependencies = append(group.Dependencies, &jobsmanagerv1beta1.ManagedJobDependencies{Name: groupTreePrevious.Text(), Status: ExecutionStatusPending})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, theSame, _ := pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj)
|
||||
if !theSame {
|
||||
cp.updateCRDStatusDirectly()
|
||||
}
|
||||
// fmt.Print(mainTree.Print())
|
||||
// fmt.Printf("Dependency tree: %# v", pretty.Formatter(mainTree))
|
||||
}
|
||||
+132
-79
@@ -1,6 +1,9 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/lukaszraczylo/pandati"
|
||||
kbatch "k8s.io/api/batch/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -59,59 +62,40 @@ func (cp *connPackage) compileParameters(params ...jobsmanagerv1beta1.ManagedJob
|
||||
return cparams
|
||||
}
|
||||
|
||||
type previousJobsAndGroups struct {
|
||||
GroupID string
|
||||
JobID string
|
||||
}
|
||||
|
||||
func (cp *connPackage) buildJobsDependencyTree() error {
|
||||
changePending := false
|
||||
for i, group := range cp.mj.Spec.Groups {
|
||||
var group_previous string
|
||||
if group.Parallel {
|
||||
group_previous = group.Name
|
||||
} else {
|
||||
if i > 0 {
|
||||
group_previous = cp.mj.Spec.Groups[i-1].Name
|
||||
} else {
|
||||
group_previous = group.Name
|
||||
}
|
||||
}
|
||||
for j, job := range group.Jobs {
|
||||
if job.Dependencies.Group == "" || job.Dependencies.Job == "" {
|
||||
changePending = true
|
||||
}
|
||||
job.Dependencies.Group = group_previous
|
||||
if job.Parallel {
|
||||
job.Dependencies.Job = job.Name
|
||||
} else {
|
||||
if j > 0 {
|
||||
job.Dependencies.Job = group.Jobs[j-1].Name
|
||||
} else {
|
||||
job.Dependencies.Job = job.Name
|
||||
func (cp *connPackage) updateDependentJobs(completedJob string, jobStatus string) {
|
||||
for _, group := range cp.mj.Spec.Groups {
|
||||
for _, job := range group.Jobs {
|
||||
for _, dependency := range job.Dependencies {
|
||||
if dependency.Name == completedJob && dependency.Status != jobStatus {
|
||||
dependency.Status = jobStatus
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if changePending {
|
||||
cp.updateCRDStatusDirectly()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *connPackage) checkJobStatus() {
|
||||
func (cp *connPackage) updateDependentGroups(completedGroup string, jobStatus string) {
|
||||
for _, group := range cp.mj.Spec.Groups {
|
||||
for _, dependency := range group.Dependencies {
|
||||
if dependency.Name == completedGroup && dependency.Status != jobStatus {
|
||||
dependency.Status = jobStatus
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *connPackage) checkRunningJobsStatus() {
|
||||
var childJobs kbatch.JobList
|
||||
labelSelector := labels.SelectorFromSet(labels.Set{
|
||||
"jobmanager.raczylo.com/workflow-name": cp.mj.Name,
|
||||
})
|
||||
listOptions := &client.ListOptions{LabelSelector: labelSelector, Namespace: cp.mj.Namespace}
|
||||
|
||||
err := cp.r.Client.List(cp.ctx, &childJobs, listOptions)
|
||||
if err != nil {
|
||||
// log.Log.Error(err, "unable to list child jobs")
|
||||
log.Log.Info("Unable to list child jobs", "error", err.Error())
|
||||
return
|
||||
}
|
||||
changePresent := false
|
||||
|
||||
for _, childJob := range childJobs.Items {
|
||||
for _, group := range cp.mj.Spec.Groups {
|
||||
for _, job := range group.Jobs {
|
||||
@@ -119,75 +103,122 @@ func (cp *connPackage) checkJobStatus() {
|
||||
if childJob.Name == generatedJobName {
|
||||
if childJob.Status.Succeeded > 0 && job.Status != ExecutionStatusSucceeded {
|
||||
job.Status = ExecutionStatusSucceeded
|
||||
changePresent = true
|
||||
cp.updateDependentJobs(generatedJobName, ExecutionStatusSucceeded)
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Completed", "Job %s completed", childJob.Name)
|
||||
} else if childJob.Status.Failed > 0 && job.Status != ExecutionStatusFailed {
|
||||
job.Status = ExecutionStatusFailed
|
||||
changePresent = true
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Failed", "Job %s failed", childJob.Name)
|
||||
cp.updateDependentJobs(generatedJobName, ExecutionStatusFailed)
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s failed", childJob.Name)
|
||||
} else if childJob.Status.Active > 0 && job.Status != ExecutionStatusRunning {
|
||||
changePresent = true
|
||||
job.Status = ExecutionStatusRunning
|
||||
cp.updateDependentJobs(generatedJobName, ExecutionStatusRunning)
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s running", childJob.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if changePresent {
|
||||
cp.updateCRDStatusDirectly()
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *connPackage) runPendingJobs() {
|
||||
copyMJ := cp.mj.DeepCopy()
|
||||
changePresent := false
|
||||
// originalMainJobDefinition := cp.mj.DeepCopy()
|
||||
for _, group := range cp.mj.Spec.Groups {
|
||||
run_group := false
|
||||
|
||||
groupJobsCompleted := 0
|
||||
for _, job := range group.Jobs {
|
||||
if job.Status == ExecutionStatusPending {
|
||||
if job.Dependencies.Group == group.Name && job.Dependencies.Job == job.Name {
|
||||
job.CompiledParams = cp.compileParameters(job.Params, group.Params, cp.mj.Spec.Params)
|
||||
err := cp.executeJob(job, group)
|
||||
if err != nil {
|
||||
// log.Log.Info("Unable to execute job", "group", group.Name, "job", job.Name, "error", err)
|
||||
continue
|
||||
if job.Status == ExecutionStatusSucceeded {
|
||||
groupJobsCompleted++
|
||||
}
|
||||
}
|
||||
if groupJobsCompleted == len(group.Jobs) {
|
||||
group.Status = ExecutionStatusSucceeded
|
||||
cp.updateDependentGroups(group.Name, group.Status)
|
||||
continue
|
||||
}
|
||||
|
||||
if group.Status == ExecutionStatusSucceeded || group.Status == ExecutionStatusFailed || group.Status == ExecutionStatusAborted {
|
||||
cp.updateDependentGroups(group.Name, group.Status)
|
||||
}
|
||||
|
||||
if group.Status == ExecutionStatusPending || group.Status == ExecutionStatusRunning {
|
||||
if len(group.Dependencies) > 0 {
|
||||
groupsCompleted := 0
|
||||
for _, group_dependency := range group.Dependencies {
|
||||
if group_dependency.Status == ExecutionStatusSucceeded {
|
||||
groupsCompleted++
|
||||
}
|
||||
job.Status = ExecutionStatusRunning
|
||||
changePresent = true
|
||||
continue
|
||||
} else {
|
||||
for _, group2 := range copyMJ.Spec.Groups {
|
||||
for _, job2 := range group2.Jobs {
|
||||
if job2.Name == job.Dependencies.Job && group2.Name == job.Dependencies.Group {
|
||||
switch job2.Status {
|
||||
case ExecutionStatusSucceeded:
|
||||
job.CompiledParams = cp.compileParameters(job.Params, group.Params, cp.mj.Spec.Params)
|
||||
err := cp.executeJob(job, group)
|
||||
if err != nil {
|
||||
log.Log.Info("Unable to execute job", "group", group.Name, "job", job.Name, "error", err)
|
||||
continue
|
||||
}
|
||||
job.Status = ExecutionStatusRunning
|
||||
changePresent = true
|
||||
case ExecutionStatusFailed:
|
||||
if group_dependency.Status == ExecutionStatusFailed {
|
||||
group.Status = ExecutionStatusAborted
|
||||
cp.updateDependentGroups(group.Name, ExecutionStatusFailed)
|
||||
}
|
||||
}
|
||||
if groupsCompleted == len(group.Dependencies) {
|
||||
run_group = true
|
||||
}
|
||||
} else {
|
||||
run_group = true
|
||||
}
|
||||
|
||||
if !run_group {
|
||||
fmt.Println("Group "+group.Name+" is not running as dependencies were not met", group.Dependencies)
|
||||
continue // not running the group as dependencies were not met
|
||||
} else {
|
||||
group.Status = ExecutionStatusRunning
|
||||
cp.updateDependentGroups(group.Name, ExecutionStatusRunning)
|
||||
|
||||
for _, job := range group.Jobs {
|
||||
run_job := false
|
||||
if job.Status == ExecutionStatusPending {
|
||||
if len(job.Dependencies) > 0 {
|
||||
jobsCompleted := 0
|
||||
for _, job_dependency := range job.Dependencies {
|
||||
if job_dependency.Status == ExecutionStatusSucceeded {
|
||||
jobsCompleted++
|
||||
}
|
||||
if job_dependency.Status == ExecutionStatusFailed {
|
||||
job.Status = ExecutionStatusAborted
|
||||
changePresent = true
|
||||
cp.updateDependentJobs(job.Name, ExecutionStatusFailed)
|
||||
}
|
||||
}
|
||||
if jobsCompleted == len(job.Dependencies) {
|
||||
run_job = true
|
||||
}
|
||||
} else {
|
||||
run_job = true
|
||||
}
|
||||
}
|
||||
|
||||
if !run_job {
|
||||
continue // job is not ready as dependencies were not met
|
||||
} else {
|
||||
if job.Status != ExecutionStatusRunning && job.Status != ExecutionStatusFailed && job.Status != ExecutionStatusSucceeded {
|
||||
job.Status = ExecutionStatusRunning
|
||||
cp.updateDependentJobs(job.Name, ExecutionStatusRunning)
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Running", "Job %s from group %s running", job.Name, group.Name)
|
||||
err := cp.executeJob(job, group)
|
||||
if err != nil {
|
||||
log.Log.Info("Unable to execute job", "error", err.Error())
|
||||
if !strings.Contains(err.Error(), "already exists") {
|
||||
job.Status = ExecutionStatusFailed
|
||||
group.Status = ExecutionStatusFailed
|
||||
cp.updateDependentJobs(job.Name, ExecutionStatusFailed)
|
||||
cp.updateDependentGroups(group.Name, ExecutionStatusFailed)
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failed", "Job %s from group %s failed", job.Name, group.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("Running group: ", group.Name, " with status: ", group.Status, " accepted: ", run_group)
|
||||
}
|
||||
}
|
||||
if changePresent {
|
||||
cp.updateCRDStatusDirectly()
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *connPackage) executeJob(j *jobsmanagerv1beta1.ManagedJobDefinition, g *jobsmanagerv1beta1.ManagedJobGroup) (err error) {
|
||||
generatedJobName := jobNameGenerator(cp.mj.Name, g.Name, j.Name)
|
||||
|
||||
convertRetries := func(retries int) *int32 {
|
||||
if retries == 0 {
|
||||
return nil
|
||||
@@ -252,10 +283,32 @@ func (cp *connPackage) executeJob(j *jobsmanagerv1beta1.ManagedJobDefinition, g
|
||||
|
||||
err = cp.r.Client.Create(cp.ctx, &job_handler)
|
||||
if err != nil || pandati.IsZero(job_handler) {
|
||||
log.Log.Error(err, "Unable to create job", "job", job_handler.Name)
|
||||
log.Log.Info("Unable to create job", "job", job_handler.Name, "error", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Created", "Created job %s", job_handler.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *connPackage) checkOverallStatus() {
|
||||
groupsCompleted := 0
|
||||
for _, group := range cp.mj.Spec.Groups {
|
||||
if group.Status == ExecutionStatusSucceeded {
|
||||
groupsCompleted++
|
||||
} else if group.Status == ExecutionStatusFailed || group.Status == ExecutionStatusAborted {
|
||||
cp.mj.Status = ExecutionStatusFailed
|
||||
cp.mj.Spec.Status = cp.mj.Status
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeWarning, "Failure", "Run failed")
|
||||
}
|
||||
}
|
||||
|
||||
if groupsCompleted == len(cp.mj.Spec.Groups) {
|
||||
cp.mj.Status = ExecutionStatusSucceeded
|
||||
cp.r.Recorder.Eventf(cp.mj, corev1.EventTypeNormal, "Success", "Run completed successfuly")
|
||||
} else {
|
||||
cp.mj.Status = ExecutionStatusRunning
|
||||
}
|
||||
cp.mj.Spec.Status = cp.mj.Status
|
||||
cp.r.Status().Update(cp.ctx, cp.mj)
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package controllers
|
||||
|
||||
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
|
||||
type ExecutionStatus string
|
||||
|
||||
const (
|
||||
ExecutionStatusPending string = "pending"
|
||||
ExecutionStatusRunning string = "running"
|
||||
@@ -15,3 +13,29 @@ const (
|
||||
var (
|
||||
jobOwnerKey = ".metadata.controller"
|
||||
)
|
||||
|
||||
type (
|
||||
ExecutionStatus string
|
||||
|
||||
tree struct {
|
||||
text string
|
||||
items []Tree
|
||||
}
|
||||
|
||||
// Tree is tree interface
|
||||
Tree interface {
|
||||
Add(text string) Tree
|
||||
AddTree(tree Tree)
|
||||
Items() []Tree
|
||||
Text() string
|
||||
Print() string
|
||||
}
|
||||
|
||||
printer struct {
|
||||
}
|
||||
|
||||
// Printer is printer interface
|
||||
Printer interface {
|
||||
Print(Tree) string
|
||||
}
|
||||
)
|
||||
|
||||
@@ -30,11 +30,12 @@ type jobStatusUpdate struct {
|
||||
}
|
||||
|
||||
type connPackage struct {
|
||||
r *ManagedJobReconciler
|
||||
ctx context.Context
|
||||
req ctrl.Request
|
||||
mtx sync.Mutex
|
||||
mj *jobsmanagerv1beta1.ManagedJob
|
||||
r *ManagedJobReconciler
|
||||
ctx context.Context
|
||||
req ctrl.Request
|
||||
mtx sync.Mutex
|
||||
mj *jobsmanagerv1beta1.ManagedJob
|
||||
dependencyTree Tree
|
||||
}
|
||||
|
||||
func (cp *connPackage) getOwnerReference() (metav1.OwnerReference, error) {
|
||||
|
||||
@@ -19,6 +19,7 @@ package controllers
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/lukaszraczylo/pandati"
|
||||
kbatch "k8s.io/api/batch/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/record"
|
||||
@@ -44,9 +45,10 @@ func (r *ManagedJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
_ = log.FromContext(ctx)
|
||||
|
||||
cp := &connPackage{
|
||||
r: r,
|
||||
ctx: ctx,
|
||||
req: req,
|
||||
r: r,
|
||||
ctx: ctx,
|
||||
req: req,
|
||||
dependencyTree: nil,
|
||||
}
|
||||
|
||||
var managedJob jobsmanagerv1beta1.ManagedJob
|
||||
@@ -55,11 +57,19 @@ func (r *ManagedJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
}
|
||||
|
||||
cp.mj = &managedJob
|
||||
cp.buildJobsDependencyTree()
|
||||
cp.checkJobStatus()
|
||||
cp.runPendingJobs()
|
||||
cp.checkGroupsStatus()
|
||||
|
||||
originalMainJobDefinition := cp.mj.DeepCopy()
|
||||
cp.generateDependencyTree()
|
||||
|
||||
// TODO: Re-enable after testing
|
||||
cp.checkRunningJobsStatus()
|
||||
cp.runPendingJobs()
|
||||
cp.checkOverallStatus()
|
||||
|
||||
_, theSame, _ := pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj)
|
||||
if !theSame {
|
||||
cp.updateCRDStatusDirectly()
|
||||
}
|
||||
// fmt.Printf("Reconcile: %# v", pretty.Formatter(r.Updater))
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
|
||||
@@ -47,7 +47,7 @@ require (
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.14.0 // indirect
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/common v0.39.0 // indirect
|
||||
github.com/prometheus/common v0.40.0 // indirect
|
||||
github.com/prometheus/procfs v0.9.0 // indirect
|
||||
github.com/rogpeppe/go-internal v1.9.0 // indirect
|
||||
github.com/rs/zerolog v1.29.0 // indirect
|
||||
@@ -75,7 +75,7 @@ require (
|
||||
k8s.io/component-base v0.26.1 // indirect
|
||||
k8s.io/klog/v2 v2.90.0 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20230217203603-ff9a8e8fa21d // indirect
|
||||
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
|
||||
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
|
||||
@@ -292,6 +292,8 @@ github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8
|
||||
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
|
||||
github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI=
|
||||
github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
|
||||
github.com/prometheus/common v0.40.0 h1:Afz7EVRqGg2Mqqf4JuF9vdvp1pi220m55Pi9T2JnO4Q=
|
||||
github.com/prometheus/common v0.40.0/go.mod h1:L65ZJPSmfn/UBWLQIHV7dBrKFidB/wPlF1y5TlSt9OE=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
@@ -725,6 +727,8 @@ k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 h1:KTgPnR10d5zhztWptI952TNtt/4u5
|
||||
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=
|
||||
k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 h1:kmDqav+P+/5e1i9tFfHq1qcF3sOrDp+YEkVDAHu7Jwk=
|
||||
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||
|
||||
Reference in New Issue
Block a user