Gestionnaire de jobs et workflows
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

223 lines
5.6 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021-2022 losyme ################################################ MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package model
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/util"
"forge.chapril.org/losyme/uuid"
)
// stepToJob creates the job that will execute the step called name of the workflow wf.
//
// The step is looked up in wf.AllSteps; when it is not declared there, an error carrying the step name is returned.
// Otherwise the job is built from the namespace and the type of the step, receives a newly generated identifier, takes
// its origin and priority from the workflow and its exclusivity, maximum occurrences and maximum attempts from the
// step. The job references the workflow through a pointer to wf.ID, starts with the status jw.StatusTodo and uses the
// creation time of the workflow as its time reference.
func (m *model) stepToJob(wf *jw.Workflow, name string) (*jw.Job, error) {
	definition, found := wf.AllSteps[name]
	if !found {
		return nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////////
			"this step does not exist",
			"step", name,
		)
	}

	job := jw.NewJob(definition.Namespace, definition.Type).
		SetID(uuid.New()).
		SetName(name).
		SetOrigin(wf.Origin).
		SetPriority(wf.Priority).
		SetExclusive(definition.Exclusive).
		SetMaxOccurrences(definition.MaxOccurrences).
		SetMaxAttempts(definition.MaxAttempts)

	// Fields assigned directly on the job.
	job.Workflow = &wf.ID
	job.Status = jw.StatusTodo
	job.TimeReference = wf.CreatedAt
	job.CreatedAt = time.Now()

	// NOTE(review): CopyKV presumably merges the step configuration into the private data of the job — confirm
	// against the util package.
	job.Private = util.CopyKV(definition.Config, job.Private)

	return job, nil
}
// createWorkflow validates the workflow wf, builds the job of its first step and inserts both into the storage.
//
// A validation failure, or a failure to build the job of wf.FirstStep, is reported as an error wrapped with
// ErrValidation. A failure of the insertion is returned with an explanatory message. The public data of the first job
// is the data of the workflow.
func (m *model) createWorkflow(wf *jw.Workflow) error {
	// Both rejection paths below report the very same message.
	const notValid = "this workflow is not valid"

	if err := m.validateWorkflow(wf); err != nil {
		return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////////
			ErrValidation,
			errors.WithMessage(err, notValid),
		)
	}

	firstJob, err := m.stepToJob(wf, wf.FirstStep)
	if err != nil {
		return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////////
			ErrValidation,
			errors.WithMessage(err, notValid),
		)
	}

	// The first job starts with the data provided with the workflow.
	firstJob.Public = wf.Data

	if err := m.storageInsertWorkflow(wf, firstJob); err != nil {
		return errors.WithMessage(err, "unable to insert this workflow") ///////////////////////////////////////////////
	}

	return nil
}
// CreateWorkflow creates the workflow wf through createWorkflow and returns it on success.
//
// On failure, the error is logged along with the values returned by wf.Fields() and then returned with a nil workflow.
func (m *model) CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error) {
	err := m.createWorkflow(wf)
	if err == nil {
		return wf, nil
	}

	// The error is appended after a nil entry, following the fields of the workflow.
	fields := append(wf.Fields(), nil, err)
	m.logger.Error("Unable to create this workflow", fields...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::

	return nil, err
}
// nextStepAdvanced decodes value, the description of what follows the step called step, and returns the name of the
// following step together with the optional data attached to it.
//
// The accepted shapes of value are:
//   - nil: there is no following step, an empty name is returned;
//   - a string: the name of the following step, without data;
//   - a map[string]interface{}: it must contain a "step" key whose value is nil (no following step) or a string (the
//     name of the following step); it may contain a "data" key whose value is nil or a map[string]interface{} that is
//     returned as is.
//
// Any other shape yields an error that carries the name of the step being decoded.
func (m *model) nextStepAdvanced(step string, value interface{}) (string, map[string]interface{}, error) {
	if value == nil {
		return "", nil, nil
	}

	// Simple form: the value is directly the name of the following step.
	if name, isString := value.(string); isString {
		return name, nil, nil
	}

	// Advanced form: a table holding the name of the following step and, optionally, some data.
	config, isMap := value.(map[string]interface{})
	if !isMap {
		return "", nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////
			"configuration of this step is not valid",
			"step", step,
		)
	}

	rawStep, found := config["step"]
	if !found {
		return "", nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////
			"'step' key is missing in this step",
			"step", step,
		)
	}

	// An explicit nil means that there is no following step.
	if rawStep == nil {
		return "", nil, nil
	}

	name, isString := rawStep.(string)
	if !isString {
		return "", nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////
			"'step' key is not valid in this step",
			"step", step,
		)
	}

	// The data are optional: a missing key and a nil value are handled the same way.
	rawData, found := config["data"]
	if !found || rawData == nil {
		return name, nil, nil
	}

	data, isMap := rawData.(map[string]interface{})
	if !isMap {
		return "", nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////
			"'data' key is not valid in this step",
			"step", step,
		)
	}

	return name, data, nil
}
// nextStep determines which step of the workflow wf must follow the job that has just been processed.
//
// It returns the name of the following step (empty when there is none), the data to give to that step, if any, and an
// error. The resolution goes as follows:
//  1. when the result of the job names a next step, this name is returned without data;
//  2. the step of the job is looked up in wf.AllSteps by the name of the job, an unknown step being an error;
//  3. when this step has a nil Next table, there is no following step;
//  4. otherwise the table is searched with the value of the result first and with the status of the job second, empty
//     keys being ignored, and the first entry found is decoded by nextStepAdvanced;
//  5. when no entry is found, an error is returned.
func (m *model) nextStep(wf *jw.Workflow, job *jw.Job) (string, map[string]interface{}, error) {
	if forced := job.Result.NextStep; forced != "" {
		return forced, nil, nil
	}

	current, known := wf.AllSteps[job.Name]
	if !known {
		return "", nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////
			"unable to find this step",
			"step", job.Name,
		)
	}

	transitions := current.Next
	if transitions == nil {
		return "", nil, nil
	}

	// The value of the result takes precedence over the status of the job.
	for _, key := range []string{job.Result.Value, string(job.Status)} {
		if key != "" {
			if target, ok := transitions[key]; ok {
				return m.nextStepAdvanced(job.Name, target)
			}
		}
	}

	return "", nil, errors.New( ////////////////////////////////////////////////////////////////////////////////////////
		"unable to find the next step of this step",
		"step", job.Name,
	)
}
// nextJob creates the job of the step called stepName that follows job in the workflow wf.
//
// The new job is built by stepToJob, whose error is returned unchanged. It then receives the public data, the error
// counter and the last error of job, while data is combined with its private data through util.CopyKV.
func (m *model) nextJob(wf *jw.Workflow, job *jw.Job, stepName string, data map[string]interface{}) (*jw.Job, error) {
	successor, err := m.stepToJob(wf, stepName)
	if err != nil {
		return nil, err
	}

	// What is carried over from the job that has just been processed.
	successor.ErrorCounter, successor.LastError = job.ErrorCounter, job.LastError
	successor.Public = job.Public

	// NOTE(review): CopyKV presumably merges data into the private data already set by stepToJob — confirm against
	// the util package.
	successor.Private = util.CopyKV(data, successor.Private)

	return successor, nil
}
// updateWorkflow moves the workflow wf forward once job has been processed.
//
// When nextStep designates a following step, the corresponding job is created and handed to the storage together with
// job, without the workflow. When there is no following step, the workflow gets the status jw.StatusFailed if the error
// counter of job is positive and jw.StatusSucceeded otherwise, wf.Finished is called and the workflow is handed to the
// storage together with job, without any new job. Errors from nextStep, nextJob and the storage are returned unchanged.
func (m *model) updateWorkflow(wf *jw.Workflow, job *jw.Job) error {
	stepName, data, err := m.nextStep(wf, job)
	if err != nil {
		return err
	}

	// The workflow goes on with another step.
	if stepName != "" {
		following, jobErr := m.nextJob(wf, job, stepName, data)
		if jobErr != nil {
			return jobErr
		}

		return m.storageUpdateWorkflow(nil, job, following)
	}

	// This was the last step: the workflow is over.
	switch {
	case job.ErrorCounter > 0:
		wf.Status = jw.StatusFailed
	default:
		wf.Status = jw.StatusSucceeded
	}

	wf.Finished()

	return m.storageUpdateWorkflow(wf, job, nil)
}
// SetWorkflowPriority forwards the identifier id and the priority to the SetWorkflowPriority method of the storage
// returned by m.cs.Storage() and returns its error unchanged.
func (m *model) SetWorkflowPriority(id string, priority jw.Priority) error {
	storage := m.cs.Storage()

	return storage.SetWorkflowPriority(id, priority)
}
/*
######################################################################################################## @(°_°)@ #######
*/