|
|
|
@ -14,14 +14,15 @@ import (
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type jobsToRun struct { // AFINIR: sync.Pool ?
|
|
|
|
|
jobs []*jw.Job |
|
|
|
|
now time.Time |
|
|
|
|
namespace string |
|
|
|
|
jobs []*jw.Job |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (jtr *jobsToRun) Next() (*jw.Job, error) { |
|
|
|
|
now := time.Now() |
|
|
|
|
|
|
|
|
|
for i, job := range jtr.jobs { |
|
|
|
|
if (job.Status == jw.StatusTodo || job.Status == jw.StatusPending) && job.RunAfter.Before(now) { |
|
|
|
|
if job.Namespace == jtr.namespace && |
|
|
|
|
(job.Status == jw.StatusTodo || job.Status == jw.StatusPending) && job.RunAfter.Before(jtr.now) { |
|
|
|
|
jtr.jobs = jtr.jobs[i+1:] |
|
|
|
|
return job, nil |
|
|
|
|
} |
|
|
|
@ -44,7 +45,7 @@ func (m *memory) jobsRunning() jw.JobsRunning {
|
|
|
|
|
return jr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m *memory) NextJob(fn jw.SelectNextJob) (*jw.Job, error) { |
|
|
|
|
func (m *memory) NextJob(namespace string, fn jw.SelectNextJob) (*jw.Job, error) { |
|
|
|
|
m.sm.Lock() |
|
|
|
|
defer m.sm.Unlock() |
|
|
|
|
|
|
|
|
@ -57,11 +58,17 @@ func (m *memory) NextJob(fn jw.SelectNextJob) (*jw.Job, error) {
|
|
|
|
|
ji := jobs[i] |
|
|
|
|
jj := jobs[j] |
|
|
|
|
|
|
|
|
|
return ji.Priority > jj.Priority || ji.Weight < jj.Weight || ji.TimeReference.Before(jj.TimeReference) |
|
|
|
|
return ji.Priority > jj.Priority || ji.Session < jj.Session || ji.TimeReference.Before(jj.TimeReference) |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
job, err := fn(m.jobsRunning(), &jobsToRun{jobs}) |
|
|
|
|
jtr := &jobsToRun{ |
|
|
|
|
now: time.Now(), |
|
|
|
|
namespace: namespace, |
|
|
|
|
jobs: jobs, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
job, err := fn(m.jobsRunning(), jtr) |
|
|
|
|
if job == nil || err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
@ -71,7 +78,6 @@ func (m *memory) NextJob(fn jw.SelectNextJob) (*jw.Job, error) {
|
|
|
|
|
*clone = *job |
|
|
|
|
|
|
|
|
|
job.Status = jw.StatusRunning |
|
|
|
|
job.Weight++ |
|
|
|
|
|
|
|
|
|
return clone, nil |
|
|
|
|
} |
|
|
|
|