Gestionnaire de jobs et workflows
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

132 lines
3.3 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021 losyme ##################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package memory
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
)
func (m *memory) findJob(id string) (int, *jw.Job) {
for i, job := range m.jobs {
if job.ID == id {
return i, job
}
}
return 0, nil
}
func (m *memory) ValidateJob(_ *jw.Job) error {
return nil
}
func (m *memory) insertJob(job *jw.Job) (bool, error) {
if _, j := m.findJob(job.ID); j != nil {
return false, errors.Wrap( /////////////////////////////////////////////////////////////////////////////////////
jw.ErrStorage,
errors.New("this job identifier already exists", "id", job.ID),
)
}
if job.Category != nil && job.Workflow == nil {
for _, j := range m.jobs {
if j.Category != nil && j.Workflow == nil && *j.Category == *job.Category &&
j.Namespace == job.Namespace &&
j.Type == job.Type &&
(j.Status == jw.StatusTodo || j.Status == jw.StatusPending || j.Status == jw.StatusRunning) {
return false, nil
}
}
}
m.jobs = append(m.jobs, job)
return true, nil
}
func (m *memory) InsertJob(job *jw.Job) (bool, error) {
m.sm.Lock()
defer m.sm.Unlock()
return m.insertJob(job)
}
func (m *memory) updateJob(job *jw.Job) error {
if i, j := m.findJob(job.ID); j != nil {
m.jobs[i] = job
return nil
}
return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////////////
jw.ErrStorage,
errors.New("this job does not exist", "job", job.ID),
)
}
func (m *memory) UpdateJob(job *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
return m.updateJob(job)
}
func (m *memory) NotifyJob(id string, data interface{}) error {
m.sm.Lock()
defer m.sm.Unlock()
_, job := m.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
// AFINIR
return errors.NotImplemented()
}
func (m *memory) SetJobPriority(id string, priority jw.Priority) error {
m.sm.Lock()
defer m.sm.Unlock()
_, job := m.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
if job.Status == jw.StatusSucceeded || job.Status == jw.StatusFailed {
return errors.New("this job is no longer running", "job", id) //////////////////////////////////////////////////
}
job.Priority = priority
return nil
}
func (m *memory) SetJobRunAfter(id string, duration time.Duration) error {
m.sm.Lock()
defer m.sm.Unlock()
_, job := m.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
if job.Status != jw.StatusTodo && job.Status != jw.StatusPending {
return errors.New("this job is not waiting to be executed", "job", id) /////////////////////////////////////////
}
job.RunAfter = time.Now().Add(duration)
return nil
}
/*
######################################################################################################## @(°_°)@ #######
*/