Files
lukaszraczylo 2b36071647 Multiple fixes (#29)
* Multiple fixes

- add goreleaser to the build / release process
- add kubectl plugin for job graphs visualization
- add installation scripts
- update dependencies

* Update the release & CRD content.

* Next set of improvements.

  Code Quality

  - Label constants: Added LabelWorkflowName, LabelGroupName, LabelJobName, LabelJobID in controllers/definitions.go
  - Removed commented debug code: Cleaned up dead code from multiple files
  - Removed unused dependencyTree field: Cleaned connPackage struct
  - Fixed snake_case variables: Changed to camelCase (runGroup, groupDep, runJob, jobDep, k8sJob)

  Kubernetes Best Practices

  - Finalizers: Implemented handleDeletion() and deleteChildJobs() for proper cleanup
  - Status enum validation: Added +kubebuilder:validation:Enum=pending;running;succeeded;failed;aborted
  - ImagePullPolicy default: Created getImagePullPolicy() helper that defaults to IfNotPresent
  - Resource limits support: Added Resources *corev1.ResourceRequirements to ManagedJobParameters

  Observability

  - Prometheus metrics: Created controllers/metrics.go with counters (jobs created/succeeded/failed), histogram (reconciliation duration), and gauge (active jobs)
  - Structured logging: Added logger field to connPackage, used context-based logging throughout

  Configuration

  - Leader election ID: Made configurable via --leader-election-id flag
  - Development mode: Made configurable via --dev-mode flag and LOG_LEVEL env var

  Performance

  - Dependency lookup optimization: Changed from O(n*m) to O(1) using lookup maps (jobDepMap, groupDepMap)
  - Reconciliation backoff: Added RequeueAfter: 30*time.Second when workflow is running

  Documentation & Testing

  - Godoc documentation: Added comprehensive comments to API types and controller
  - Unit tests: Added helpers_test.go with tests for all helper functions
  - Integration tests: Added managedjob_controller_test.go with Ginkgo/Gomega tests

* Add the helm chart release.

* Add reasonable test coverage.
2025-12-17 22:33:23 +00:00

180 lines
5.7 KiB
Go

/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"time"
"github.com/lukaszraczylo/pandati"
kbatch "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
jobsmanagerv1beta1 "raczylo.com/jobs-manager-operator/api/v1beta1"
)
// RequeueDelay is how long the reconciler waits before looking at a workflow
// again when its status is still running.
const RequeueDelay time.Duration = 30 * time.Second
// ManagedJobReconciler reconciles a ManagedJob object.
// SetupWithManager registers it as the reconciler for ManagedJob resources.
type ManagedJobReconciler struct {
// Embedded API client. Its promoted methods are what this file calls as
// r.Get and r.Update; it is also addressed explicitly as r.Client when
// listing and deleting child jobs.
client.Client
// NOTE(review): Scheme is not read in this part of the file; presumably it
// is used elsewhere (e.g. when setting owner references) — confirm.
Scheme *runtime.Scheme
// NOTE(review): Recorder is not used in this part of the file; presumably
// it emits Kubernetes events for workflow progress — confirm.
Recorder record.EventRecorder
}
//+kubebuilder:rbac:groups=jobsmanager.raczylo.com,resources=managedjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=jobsmanager.raczylo.com,resources=managedjobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=jobsmanager.raczylo.com,resources=managedjobs/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch;delete;get;list;watch
// Reconcile ensures ManagedJob workflows progress toward completion.
// It orchestrates job execution respecting dependencies, manages retries,
// and tracks overall workflow status.
//
// Each pass performs, in order:
//  1. Fetch the ManagedJob; an object that no longer exists is not an error.
//  2. If the object carries a deletion timestamp, delegate to handleDeletion.
//  3. Register the finalizer if it is missing, persist it and requeue.
//  4. Regenerate the dependency tree and lookup maps; if that changed the
//     object, persist it and end the pass.
//  5. Check running jobs, start pending ones, persist any change and
//     evaluate the overall workflow status.
//
// A failed status write is returned as an error so that controller-runtime
// requeues the request. Without that, the pass in step 4 would end with no
// requeue at all and the workflow could stall until some unrelated event
// arrives.
//
// While the workflow status is ExecutionStatusRunning the request is requeued
// after RequeueDelay.
func (r *ManagedJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx).WithValues("managedJob", req.NamespacedName)

	var managedJob jobsmanagerv1beta1.ManagedJob
	if err := r.Get(ctx, req.NamespacedName, &managedJob); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	cp := &connPackage{
		r:      r,
		ctx:    ctx,
		req:    req,
		logger: logger,
		mj:     &managedJob,
	}

	// Handle deletion with finalizer
	if !managedJob.DeletionTimestamp.IsZero() {
		return r.handleDeletion(ctx, cp)
	}

	// Add finalizer if not present
	if !controllerutil.ContainsFinalizer(&managedJob, FinalizerName) {
		controllerutil.AddFinalizer(&managedJob, FinalizerName)
		if err := r.Update(ctx, &managedJob); err != nil {
			logger.Error(err, "Failed to add finalizer")
			return ctrl.Result{}, err
		}
		return ctrl.Result{RequeueAfter: time.Second}, nil
	}

	// Snapshot the object so changes made by the dependency generation can be
	// detected by comparison.
	originalMainJobDefinition := cp.mj.DeepCopy()
	cp.generateDependencyTree()
	cp.buildDependencyMaps() // Build lookup maps for O(1) dependency updates
	_, theSame, _ := pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj)
	if !theSame {
		if err := cp.updateCRDStatusDirectly(); err != nil {
			logger.Error(err, "Failed to update CRD status after dependency tree generation")
			// Surface the failure: this branch ends the pass without a
			// requeue, so a dropped error would leave nothing to retry it.
			return ctrl.Result{}, err
		}
		// NOTE(review): no explicit requeue here; presumably the successful
		// write generates a watch event that triggers the next pass — confirm.
		return ctrl.Result{}, nil
	}

	// Fresh snapshot for detecting changes made by job processing.
	originalMainJobDefinition = cp.mj.DeepCopy()
	cp.checkRunningJobsStatus()
	cp.runPendingJobs()
	_, theSame, _ = pandati.CompareStructsReplaced(originalMainJobDefinition, cp.mj)

	var statusErr error
	if !theSame {
		if statusErr = cp.updateCRDStatusDirectly(); statusErr != nil {
			logger.Error(statusErr, "Failed to update CRD status after job processing")
		}
	}

	// The overall status is evaluated even if the write above failed, keeping
	// the original call order; the failure is reported right after.
	cp.checkOverallStatus()

	if statusErr != nil {
		return ctrl.Result{}, statusErr
	}

	// If workflow is still running, requeue after a delay to check status
	if cp.mj.Status == ExecutionStatusRunning {
		return ctrl.Result{RequeueAfter: RequeueDelay}, nil
	}
	return ctrl.Result{}, nil
}
// handleDeletion runs the finalizer logic for a ManagedJob that is being
// deleted. When the finalizer is no longer present there is nothing to do.
// Otherwise it calls deleteChildJobs, strips the finalizer and persists the
// object. Any error from the cleanup or the update is logged and returned,
// and in the cleanup case the finalizer is left in place.
func (r *ManagedJobReconciler) handleDeletion(ctx context.Context, cp *connPackage) (ctrl.Result, error) {
	// Zero-value result: no explicit requeue is requested on any path.
	var result ctrl.Result

	if hasFinalizer := controllerutil.ContainsFinalizer(cp.mj, FinalizerName); !hasFinalizer {
		return result, nil
	}

	lg := cp.logger
	lg.Info("Cleaning up child jobs before deletion")

	// Child jobs go first; the finalizer is only dropped once that succeeded.
	err := r.deleteChildJobs(ctx, cp)
	if err != nil {
		lg.Error(err, "Failed to delete child jobs")
		return result, err
	}

	controllerutil.RemoveFinalizer(cp.mj, FinalizerName)
	err = r.Update(ctx, cp.mj)
	if err != nil {
		lg.Error(err, "Failed to remove finalizer")
		return result, err
	}

	lg.Info("Successfully cleaned up ManagedJob")
	return result, nil
}
// deleteChildJobs deletes every Job in the ManagedJob's namespace whose
// LabelWorkflowName label equals the ManagedJob's name.
//
// Deletion uses background propagation. A failure on one job does not stop
// the loop, so every listed job gets a deletion attempt. Jobs that are
// already gone (NotFound) count as deleted.
//
// It returns the List error if listing fails, otherwise the first deletion
// error encountered, or nil when every job was deleted or already absent.
// Returning the deletion error lets the caller retry instead of treating a
// partially failed cleanup as complete.
func (r *ManagedJobReconciler) deleteChildJobs(ctx context.Context, cp *connPackage) error {
	labelSelector := labels.SelectorFromSet(labels.Set{
		LabelWorkflowName: cp.mj.Name,
	})
	listOptions := &client.ListOptions{
		LabelSelector: labelSelector,
		Namespace:     cp.mj.Namespace,
	}

	var childJobs kbatch.JobList
	if err := r.Client.List(ctx, &childJobs, listOptions); err != nil {
		return err
	}

	var firstErr error
	for i := range childJobs.Items {
		job := &childJobs.Items[i]
		err := r.Client.Delete(ctx, job, client.PropagationPolicy("Background"))
		if client.IgnoreNotFound(err) != nil {
			cp.logger.Error(err, "Failed to delete child job", "job", job.Name)
			// Continue trying to delete other jobs, but remember the failure.
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		cp.logger.Info("Deleted child job", "job", job.Name)
	}
	return firstErr
}
// SetupWithManager sets up the controller with the Manager.
// The controller is built for ManagedJob objects (For), additionally declares
// batch Jobs as owned objects (Owns), and uses r as its reconciler.
func (r *ManagedJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bld := ctrl.NewControllerManagedBy(mgr)
	bld = bld.For(&jobsmanagerv1beta1.ManagedJob{})
	bld = bld.Owns(&kbatch.Job{})
	return bld.Complete(r)
}