Browse Source

En cours de développement

master
losyme 2 months ago
parent
commit
af4678f339
  1. 49
      internal/model/jobs.go
  2. 63
      internal/model/storage.go
  3. 53
      internal/model/workflows.go
  4. 14
      internal/storage/memory/workflows.go

49
internal/model/jobs.go

@ -13,41 +13,34 @@ import (
"forge.chapril.org/losyme/errors"
)
func (m *Model) insertJob(job *jw.Job) (bool, error) {
done, err := m.Storage.InsertJob(job)
if err != nil {
m.Logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Unable to insert a new job",
append(
job.Fields(),
"reason", err,
)...,
func (m *Model) createJob(job *jw.Job) (bool, error) {
if err := m.validateJob(job); err != nil {
return false, errors.Wrap( /////////////////////////////////////////////////////////////////////////////////////
ErrValidation,
errors.WithMessage(err, "this job is not valid"),
)
return false, err
}
if !done {
m.Logger.Notice("A job with the same category already exists", job.Fields()...) //::::::::::::::::::::::::::::::
return false, nil
done, err := m.storageInsertJob(job)
if err != nil {
return false, errors.WithMessage(err, "unable to insert this job") /////////////////////////////////////////////
}
m.Logger.Info("New job", job.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
return true, nil
return done, nil
}
func (m *Model) CreateJob(job *jw.Job) (*jw.Job, error) {
if err := m.validateJob(job); err != nil {
return nil, errors.Wrap( ///////////////////////////////////////////////////////////////////////////////////////
ErrValidation,
errors.WithMessage(err, "unable to create this job"),
done, err := m.createJob(job)
if err != nil {
m.Logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Unable to create this job",
append(
job.Fields(),
"reason", err,
)...,
)
}
done, err := m.insertJob(job)
if err != nil {
return nil, errors.WithMessage(err, "unable to insert this job") ///////////////////////////////////////////////
return nil, err
}
if done {
@ -59,10 +52,10 @@ func (m *Model) CreateJob(job *jw.Job) (*jw.Job, error) {
func (m *Model) updateJob(job *jw.Job) error {
if job.Workflow == nil || job.Status == jw.StatusPending {
err := m.storeJob(job)
err := m.storageUpdateJob(job)
if err != nil {
m.Logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Impossible to update this job",
"Unable to update this job",
append(
job.Fields(),
"reason", err,
@ -76,7 +69,7 @@ func (m *Model) updateJob(job *jw.Job) error {
wf, err := m.Storage.Workflow(*job.Workflow, true)
if err != nil {
m.Logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Cannot retrieve the workflow associated with this job",
"Cannot retrieve the workflow of this job",
append(
job.Fields(),
"reason", err,

63
internal/model/storage.go

@ -13,14 +13,18 @@ import (
)
func (m *Model) logJob(job *jw.Job) {
if job.Status == jw.StatusPending {
switch job.Status {
case jw.StatusTodo:
m.Logger.Info("New job", job.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
case jw.StatusPending:
m.Logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Job to continue",
"id", job.ID,
"run_after", job.RunAfter.String(),
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts),
"session", job.Session,
)
} else {
default:
m.Logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Job finished",
"id", job.ID,
@ -30,7 +34,24 @@ func (m *Model) logJob(job *jw.Job) {
}
}
func (m *Model) storeJob(job *jw.Job) error {
func (m *Model) storageInsertJob(job *jw.Job) (bool, error) {
done, err := m.Storage.InsertJob(job)
if err != nil {
// AFINIR
return false, err
}
if !done {
m.Logger.Notice("A job with the same category already exists", job.Fields()...) //::::::::::::::::::::::::::::::
return false, nil
}
m.logJob(job)
return true, nil
}
func (m *Model) storageUpdateJob(job *jw.Job) error {
if err := m.Storage.UpdateJob(job); err != nil {
// AFINIR
return err
@ -41,29 +62,37 @@ func (m *Model) storeJob(job *jw.Job) error {
return nil
}
func (m *Model) logWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) {
m.logJob(job)
if nextJob != nil {
m.Logger.Info("New job", nextJob.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
return
func (m *Model) storageInsertWorkflow(wf *jw.Workflow, job *jw.Job) error {
if err := m.Storage.InsertWorkflow(wf, job); err != nil {
// AFINIR
return err
}
m.Logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Workflow finished",
"id", wf.ID,
"status", wf.Status,
"duration", (*wf.FinishedAt).Sub(wf.CreatedAt).String(),
)
m.Logger.Info("New workflow [begin]", wf.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
m.logJob(job)
return nil
}
func (m *Model) storeWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
func (m *Model) storageUpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
if err := m.Storage.UpdateWorkflow(wf, job, nextJob); err != nil {
// AFINIR
return err
}
m.logWorkflow(wf, job, nextJob)
m.logJob(job)
if nextJob == nil {
m.Logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Workflow finished",
"id", wf.ID,
"status", wf.Status,
"duration", (*wf.FinishedAt).Sub(wf.CreatedAt).String(),
)
} else {
m.logJob(nextJob)
}
return nil
}

53
internal/model/workflows.go

@ -43,39 +43,42 @@ func (m *Model) stepToJob(wf *jw.Workflow, name string) (*jw.Job, error) {
return job, nil
}
func (m *Model) insertWorkflow(wf *jw.Workflow) error {
job, _ := m.stepToJob(wf, wf.FirstStep)
job.Public = wf.Data
if err := m.Storage.InsertWorkflow(wf, job); err != nil {
m.Logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Unable to insert a new workflow",
append(
wf.Fields(),
"reason", err,
)...,
func (m *Model) createWorkflow(wf *jw.Workflow) error {
if err := m.validateWorkflow(wf); err != nil {
return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////////
ErrValidation,
errors.WithMessage(err, "this workflow is not valid"),
)
}
return err
job, err := m.stepToJob(wf, wf.FirstStep)
if err != nil {
return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////////
ErrValidation,
errors.WithMessage(err, "this workflow is not valid"),
)
}
m.Logger.Info("New workflow [begin]", wf.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
m.Logger.Info("New job", job.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
job.Public = wf.Data
if err := m.storageInsertWorkflow(wf, job); err != nil {
return errors.WithMessage(err, "unable to insert this workflow") ///////////////////////////////////////////////
}
return nil
}
func (m *Model) CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error) {
if err := m.validateWorkflow(wf); err != nil {
return nil, errors.Wrap( ///////////////////////////////////////////////////////////////////////////////////////
ErrValidation,
errors.WithMessage(err, "unable to create this workflow"),
if err := m.createWorkflow(wf); err != nil {
m.Logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Unable to create this workflow",
append(
wf.Fields(),
"reason", err,
)...,
)
}
if err := m.insertWorkflow(wf); err != nil {
return nil, errors.WithMessage(err, "unable to insert this workflow") //////////////////////////////////////////
return nil, err
}
return wf, nil
@ -189,10 +192,12 @@ func (m *Model) updateWorkflow(wf *jw.Workflow, job *jw.Job) error {
return err
}
// AFINIR: statut du workflow
if stepName == "" {
wf.Status = jw.StatusSucceeded
wf.Finished()
return m.storeWorkflow(wf, job, nil)
return m.storageUpdateWorkflow(wf, job, nil)
}
nextJob, err := m.nextJob(wf, job, stepName, data)
@ -200,7 +205,7 @@ func (m *Model) updateWorkflow(wf *jw.Workflow, job *jw.Job) error {
return err
}
return m.storeWorkflow(wf, job, nextJob)
return m.storageUpdateWorkflow(nil, job, nextJob)
}
func (m *Model) SetWorkflowPriority(id string, priority jw.Priority) error {

14
internal/storage/memory/workflows.go

@ -64,15 +64,17 @@ func (m *memory) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
i, w := m.findWorkflow(wf.ID)
if w == nil {
return errors.New("this workflow does not exist") //////////////////////////////////////////////////////////////
if err := m.updateJob(job); err != nil {
return err
}
m.workflows[i] = wf
if wf != nil {
i, w := m.findWorkflow(wf.ID)
if w == nil {
return errors.New("this workflow does not exist") //////////////////////////////////////////////////////////
}
if err := m.updateJob(job); err != nil {
return err
m.workflows[i] = wf
}
if nextJob == nil {

Loading…
Cancel
Save