mirror of
https://github.com/lukaszraczylo/jobs-manager-operator.git
synced 2026-06-05 22:33:44 +00:00
2b36071647
* Multiple fixes - add goreleaser to the build / release process - add kubectl plugin for job graphs visualization - add installation scripts - update dependencies * Update the release & CRD content. * Next set of improvements. Code Quality - Label constants: Added LabelWorkflowName, LabelGroupName, LabelJobName, LabelJobID in controllers/definitions.go - Removed commented debug code: Cleaned up dead code from multiple files - Removed unused dependencyTree field: Cleaned connPackage struct - Fixed snake_case variables: Changed to camelCase (runGroup, groupDep, runJob, jobDep, k8sJob) Kubernetes Best Practices - Finalizers: Implemented handleDeletion() and deleteChildJobs() for proper cleanup - Status enum validation: Added +kubebuilder:validation:Enum=pending;running;succeeded;failed;aborted - ImagePullPolicy default: Created getImagePullPolicy() helper that defaults to IfNotPresent - Resource limits support: Added Resources *corev1.ResourceRequirements to ManagedJobParameters Observability - Prometheus metrics: Created controllers/metrics.go with counters (jobs created/succeeded/failed), histogram (reconciliation duration), and gauge (active jobs) - Structured logging: Added logger field to connPackage, used context-based logging throughout Configuration - Leader election ID: Made configurable via --leader-election-id flag - Development mode: Made configurable via --dev-mode flag and LOG_LEVEL env var Performance - Dependency lookup optimization: Changed from O(n*m) to O(1) using lookup maps (jobDepMap, groupDepMap) - Reconciliation backoff: Added RequeueAfter: 30*time.Second when workflow is running Documentation & Testing - Godoc documentation: Added comprehensive comments to API types and controller - Unit tests: Added helpers_test.go with tests for all helper functions - Integration tests: Added managedjob_controller_test.go with Ginkgo/Gomega tests * Add the helm chart release. * Add reasonable test coverage.
161 lines
3.9 KiB
Go
161 lines
3.9 KiB
Go
package controllers
|
|
|
|
import (
|
|
"strings"
|
|
|
|
"github.com/lukaszraczylo/pandati"
|
|
jobsmanagerv1beta1 "raczylo.com/jobs-manager-operator/api/v1beta1"
|
|
)
|
|
|
|
const (
|
|
newLine = "\n"
|
|
emptySpace = " "
|
|
middleItem = "├── "
|
|
continueItem = "│ "
|
|
lastItem = "└── "
|
|
)
|
|
|
|
func New(text string) Tree {
|
|
return &tree{
|
|
text: text,
|
|
items: []Tree{},
|
|
}
|
|
}
|
|
|
|
// Add adds a node to the tree
|
|
func (t *tree) Add(text string) Tree {
|
|
n := New(text)
|
|
t.items = append(t.items, n)
|
|
return n
|
|
}
|
|
|
|
// AddTree adds a tree as an item
|
|
func (t *tree) AddTree(tree Tree) {
|
|
t.items = append(t.items, tree)
|
|
}
|
|
|
|
// Text returns the node's value
|
|
func (t *tree) Text() string {
|
|
return t.text
|
|
}
|
|
|
|
// Items returns all items in the tree
|
|
func (t *tree) Items() []Tree {
|
|
return t.items
|
|
}
|
|
|
|
func (t *tree) Print() string {
|
|
p := &printer{}
|
|
return p.Print(t)
|
|
}
|
|
|
|
func (p *printer) Print(t Tree) string {
|
|
return t.Text() + newLine + p.printItems(t.Items(), []bool{})
|
|
}
|
|
|
|
func (p *printer) printText(text string, spaces []bool, last bool) string {
|
|
var result string
|
|
for _, space := range spaces {
|
|
if space {
|
|
result += emptySpace
|
|
} else {
|
|
result += continueItem
|
|
}
|
|
}
|
|
|
|
indicator := middleItem
|
|
if last {
|
|
indicator = lastItem
|
|
}
|
|
|
|
var out string
|
|
lines := strings.Split(text, "\n")
|
|
for i := range lines {
|
|
text := lines[i]
|
|
if i == 0 {
|
|
out += result + indicator + text + newLine
|
|
continue
|
|
}
|
|
if last {
|
|
indicator = emptySpace
|
|
} else {
|
|
indicator = continueItem
|
|
}
|
|
out += result + indicator + text + newLine
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func (p *printer) printItems(t []Tree, spaces []bool) string {
|
|
var result string
|
|
for i, f := range t {
|
|
last := i == len(t)-1
|
|
result += p.printText(f.Text(), spaces, last)
|
|
if len(f.Items()) > 0 {
|
|
spacesChild := append(spaces, last)
|
|
result += p.printItems(f.Items(), spacesChild)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (cp *connPackage) checkIfPresentInDependencies(currentDependencies []*jobsmanagerv1beta1.ManagedJobDependencies, dependencyName string) bool {
|
|
for _, dependency := range currentDependencies {
|
|
if dependency.Name == dependencyName {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (cp *connPackage) generateDependencyTree() {
|
|
// First pass - initialize the tree and get all the gathered jobs
|
|
originalMainJobDefinition := cp.mj.DeepCopy()
|
|
|
|
mainTree := New(cp.mj.Name)
|
|
for _, group := range cp.mj.Spec.Groups {
|
|
groupTree := mainTree.Add(group.Name)
|
|
for _, job := range group.Jobs {
|
|
jobTree := groupTree.Add(job.Name)
|
|
job.CompiledParams = cp.compileParameters(cp.mj.Spec.Params, group.Params, job.Params)
|
|
if job.Parallel {
|
|
continue
|
|
} else {
|
|
// get the groupTree items before this job and add them as dependencies
|
|
for _, jobTreePrevious := range groupTree.Items() {
|
|
if jobTreePrevious.Text() == job.Name {
|
|
break
|
|
}
|
|
generatedJobName := jobNameGenerator(cp.mj.Name, group.Name, jobTreePrevious.Text())
|
|
jobTree.Add("Depends on: " + generatedJobName)
|
|
if !cp.checkIfPresentInDependencies(job.Dependencies, generatedJobName) {
|
|
job.Dependencies = append(job.Dependencies, &jobsmanagerv1beta1.ManagedJobDependencies{Name: generatedJobName, Status: ExecutionStatusPending})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if group.Parallel {
|
|
continue
|
|
} else {
|
|
// get the mainTree items before this group and add them as dependencies
|
|
for _, groupTreePrevious := range mainTree.Items() {
|
|
if groupTreePrevious.Text() == group.Name {
|
|
break
|
|
}
|
|
groupTree.Add("Depends on group: " + groupTreePrevious.Text())
|
|
if !cp.checkIfPresentInDependencies(group.Dependencies, groupTreePrevious.Text()) {
|
|
group.Dependencies = append(group.Dependencies, &jobsmanagerv1beta1.ManagedJobDependencies{Name: groupTreePrevious.Text(), Status: ExecutionStatusPending})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
_, theSame, _ := pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj)
|
|
if !theSame {
|
|
if err := cp.updateCRDStatusDirectly(); err != nil {
|
|
cp.logger.Error(err, "Failed to update CRD status in dependency tree")
|
|
}
|
|
}
|
|
}
|