Gestionnaire de jobs et workflows
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

162 lines
3.6 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021-2022 losyme ################################################ MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package model
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
)
// createJob validates job with validateJob and, when validation passes, inserts it with storageInsertJob.
//
// A validation failure is annotated with "this job is not valid" and wrapped with ErrValidation. An insertion
// failure is annotated with "unable to insert this job". On success the boolean reported by storageInsertJob is
// returned unchanged; on any error the boolean is false.
func (m *model) createJob(job *jw.Job) (bool, error) {
	if invalid := m.validateJob(job); invalid != nil {
		cause := errors.WithMessage(invalid, "this job is not valid")
		return false, errors.Wrap(ErrValidation, cause)
	}

	inserted, err := m.storageInsertJob(job)
	if err == nil {
		return inserted, nil
	}

	return false, errors.WithMessage(err, "unable to insert this job")
}
// CreateJob creates job through createJob and logs the failure, if any.
//
// It returns:
//   - (nil, err) when createJob fails (the error is logged together with the job's fields);
//   - (job, nil) when createJob reports true;
//   - (nil, nil) when createJob reports false without an error.
func (m *model) CreateJob(job *jw.Job) (*jw.Job, error) {
	created, err := m.createJob(job)

	switch {
	case err != nil:
		// The job's fields are followed by a nil/err pair, exactly as passed to the logger elsewhere in this file
		// (presumably the logger's convention for attaching an error — confirm against the logger API).
		fields := append(job.Fields(), nil, err)
		m.logger.Error("Unable to create this job", fields...)
		return nil, err
	case created:
		return job, nil
	default:
		return nil, nil
	}
}
// updateJob persists the current state of job.
//
// A job that has no workflow, or whose status is jw.StatusPending, is written directly with storageUpdateJob.
// Otherwise the job's workflow is fetched from storage and the update goes through updateWorkflow(wf, job).
// Every failure is logged here before being returned to the caller.
func (m *model) updateJob(job *jw.Job) error {
	direct := job.Workflow == nil || job.Status == jw.StatusPending

	if direct {
		err := m.storageUpdateJob(job)
		if err != nil {
			fields := append(job.Fields(), nil, err)
			m.logger.Error("Unable to update this job", fields...)
		}

		return err
	}

	// NOTE(review): the meaning of the boolean passed to Workflow is not visible from this file.
	storage := m.cs.Storage()

	wf, err := storage.Workflow(*job.Workflow, true)
	if err != nil {
		fields := append(job.Fields(), nil, err)
		m.logger.Error("Cannot retrieve the workflow of this job", fields...)

		return err
	}

	wfErr := m.updateWorkflow(wf, job)
	if wfErr == nil {
		return nil
	}

	// This failure is logged with the workflow's fields rather than the job's.
	fields := append(wf.Fields(), nil, wfErr)
	m.logger.Error("Unable to update this workflow", fields...)

	return wfErr
}
// UpdateJob applies the result carried by job to the job's state and persists it with updateJob.
//
// When job.Result is nil, job.Succeeded() is called first (presumably it fills job.Result — confirm in jw).
// The result is logged, then its status drives the transition:
//   - jw.StatusSucceeded: the job is finished as succeeded;
//   - jw.StatusPending with an empty ErrMsg: the job is rescheduled after the result's duration;
//   - jw.StatusPending with a non-empty ErrMsg: the job is rescheduled and its attempt counter incremented while
//     Attempt < MaxAttempts, otherwise it is finished as failed;
//   - any other status: the job is finished as failed.
//
// It returns job once updateJob succeeds, or (nil, err) when it fails.
func (m *model) UpdateJob(job *jw.Job) (*jw.Job, error) {
	if job.Result == nil {
		job.Succeeded()
	}

	res := job.Result

	m.logger.Info(
		"Job result",
		"id", job.ID,
		"status", res.Status,
		"next_step", res.NextStep,
		"value", res.Value,
		"duration", res.Duration.String(),
		"error", res.Error,
	)

	switch res.Status {
	case jw.StatusSucceeded:
		job.Finished(jw.StatusSucceeded)

	case jw.StatusPending:
		// A non-empty error message marks this as a retry, which is bounded by MaxAttempts.
		retry := res.ErrMsg != ""

		if retry && job.Attempt >= job.MaxAttempts {
			job.Finished(jw.StatusFailed)
			break
		}

		job.RunAfter = time.Now().Add(res.Duration)
		job.Status = jw.StatusPending

		if retry {
			job.Attempt++
		}

	default:
		job.Finished(jw.StatusFailed)
	}

	if err := m.updateJob(job); err != nil {
		return nil, err
	}

	return job, nil
}
// NotifyJob is currently a stub: it ignores id and data and always returns nil.
func (m *model) NotifyJob(id string, data interface{}) error {
// TODO: to be completed (not implemented yet).
return nil
}
// SetJobPriority forwards id and priority to the storage layer's SetJobPriority and returns its error unchanged.
func (m *model) SetJobPriority(id string, priority jw.Priority) error {
	storage := m.cs.Storage()

	return storage.SetJobPriority(id, priority)
}
// SetJobRunAfter forwards id and duration to the storage layer's SetJobRunAfter and returns its error unchanged.
func (m *model) SetJobRunAfter(id string, duration time.Duration) error {
	storage := m.cs.Storage()

	return storage.SetJobRunAfter(id, duration)
}
/*
######################################################################################################## @(°_°)@ #######
*/