Gestionnaire de jobs et workflows
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
1.9 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021-2022 losyme ################################################ MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package memory
import (
"sort"
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/dune/internal/storage"
)
type jobsToRun struct { // @FIXME sync.Pool ?
now time.Time
namespace string
jobs []*jw.Job
}
func (jtr *jobsToRun) Next() (*jw.Job, error) {
for i, job := range jtr.jobs {
if job.Namespace == jtr.namespace &&
(job.Status == jw.StatusTodo || job.Status == jw.StatusPending) && job.RunAfter.Before(jtr.now) {
jtr.jobs = jtr.jobs[i+1:]
return job, nil
}
}
return nil, nil
}
func (ms *memoryStorage) jobsRunning(namespace string) storage.JobsRunning {
jr := make(storage.JobsRunning)
for _, job := range ms.jobs {
if job.Namespace == namespace && job.Status == jw.StatusRunning {
jr[job.Type] += 1
}
}
return jr
}
func (ms *memoryStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) {
ms.sm.Lock()
defer ms.sm.Unlock()
jobs := make([]*jw.Job, len(ms.jobs))
copy(jobs, ms.jobs)
sort.Slice(
jobs,
func(i, j int) bool {
ji := jobs[i]
jj := jobs[j]
return ji.Priority > jj.Priority || ji.Weight < jj.Weight || ji.TimeReference.Before(jj.TimeReference)
},
)
jtr := &jobsToRun{
now: time.Now(),
namespace: namespace,
jobs: jobs,
}
job, err := fn(ms.jobsRunning(namespace), jtr)
if err != nil || job == nil {
return nil, err
}
// Sufffisant ?
clone := new(jw.Job)
*clone = *job
job.Status = jw.StatusRunning
return clone, nil
}
/*
######################################################################################################## @(°_°)@ #######
*/