Gestionnaire de jobs et workflows
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

82 lines
1.7 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021 losyme ##################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package memory
import (
"sort"
"time"
"forge.chapril.org/dune/jw"
)
type jobsToRun struct { // AFINIR: sync.Pool ?
jobs []*jw.Job
}
func (jtr *jobsToRun) Next() (*jw.Job, error) {
now := time.Now()
for i, job := range jtr.jobs {
if (job.Status == jw.StatusTodo || job.Status == jw.StatusPending) && job.RunAfter.Before(now) {
jtr.jobs = jtr.jobs[i+1:]
return job, nil
}
}
return nil, nil
}
func (m *memory) jobsRunning() jw.JobsRunning {
jr := make(jw.JobsRunning)
for _, job := range m.jobs {
if job.Status != jw.StatusRunning {
continue
}
jr[job.Namespace+job.Type] += 1
}
return jr
}
func (m *memory) NextJob(fn jw.SelectNextJob) (*jw.Job, error) {
m.sm.Lock()
defer m.sm.Unlock()
jobs := make([]*jw.Job, len(m.jobs))
copy(jobs, m.jobs)
sort.Slice(
jobs,
func(i, j int) bool {
ji := jobs[i]
jj := jobs[j]
return ji.Priority > jj.Priority || ji.Weight < jj.Weight || ji.TimeReference.Before(jj.TimeReference)
},
)
job, err := fn(m.jobsRunning(), &jobsToRun{jobs})
if job == nil || err != nil {
return nil, err
}
// Sufffisant ?
clone := new(jw.Job)
*clone = *job
job.Status = jw.StatusRunning
job.Weight++
return clone, nil
}
/*
######################################################################################################## @(°_°)@ #######
*/