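// Job and workflow manager ("Gestionnaire de jobs et workflows").
//
// NOTE(review): the lines below look like web-page residue captured together with the file rather
// than source text; they are kept as comments so that nothing is lost — confirm and remove.
//
//	You cannot select more than 25 topics. Topics must start with a letter or number, can include
//	dashes ('-') and can be up to 35 characters long.
//
//	112 lines
//	2.6 KiB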

/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021-2022 losyme ################################################ MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package mongo
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/util"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// ValidateWorkflow performs no check for this storage: the workflow argument is ignored and the
// method always returns nil.
func (ms *mongoStorage) ValidateWorkflow(_ *jw.Workflow) error {
	return nil
}
// InsertWorkflow records the given job and workflow in a single MongoDB transaction.
//
// Inside the transaction, txInsertJob is called for job first, then wf is inserted into the
// collection returned by cWorkflows. The whole operation uses the context obtained from
// util.CtxWithTimeout(5 * time.Second).
//
// It returns the error from starting the session, or whatever error the transaction reports
// (including the first error returned by one of the two steps); nil on success.
func (ms *mongoStorage) InsertWorkflow(wf *jw.Workflow, job *jw.Job) error {
	ctx, cancel := util.CtxWithTimeout(5 * time.Second)
	defer cancel()

	session, err := ms.client.StartSession()
	if err != nil {
		return err
	}
	defer session.EndSession(ctx)

	// Transaction body: every call below must use the session context it receives.
	insert := func(sc mongo.SessionContext) (interface{}, error) {
		if errJob := ms.txInsertJob(sc, job); errJob != nil {
			return nil, errJob
		}

		if _, errWf := ms.cWorkflows().InsertOne(sc, wf); errWf != nil {
			return nil, errWf
		}

		return nil, nil
	}

	_, err = session.WithTransaction(ctx, insert)

	return err
}
// Workflow looks up the workflow whose "_id" equals id and decodes it into a jw.Workflow.
//
// Results:
//   - the document was found and decoded: (workflow, nil);
//   - the lookup failed with an error other than mongo.ErrNoDocuments: (nil, that error);
//   - no document matched and mustExist is true: (nil, "this workflow does not exist" error);
//   - no document matched and mustExist is false: (nil, nil).
//
// The query uses the context obtained from util.CtxWithTimeout(5 * time.Second).
func (ms *mongoStorage) Workflow(id string, mustExist bool) (*jw.Workflow, error) {
	ctx, cancel := util.CtxWithTimeout(5 * time.Second)
	defer cancel()

	var found jw.Workflow

	switch err := ms.cWorkflows().FindOne(ctx, bson.M{"_id": id}).Decode(&found); {
	case err == nil:
		return &found, nil
	case err != mongo.ErrNoDocuments:
		// Any failure other than "not found" is reported unchanged.
		return nil, err
	case mustExist:
		return nil, errors.New("this workflow does not exist", "id", id)
	default:
		// A missing workflow is not an error when the caller does not require it.
		return nil, nil
	}
}
// UpdateWorkflow applies, in a single MongoDB transaction, up to three steps in this order:
//
//  1. txUpdateJob is called for job;
//  2. when wf is not nil, the document whose "_id" equals wf.ID is replaced by wf;
//  3. when nextJob is not nil, txInsertJob is called for nextJob.
//
// The first step that fails stops the sequence and its error is returned by the transaction
// callback. The whole operation uses the context obtained from
// util.CtxWithTimeout(5 * time.Second).
//
// It returns the error from starting the session, or whatever error the transaction reports; nil
// on success.
func (ms *mongoStorage) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
	ctx, cancel := util.CtxWithTimeout(5 * time.Second)
	defer cancel()

	session, err := ms.client.StartSession()
	if err != nil {
		return err
	}
	defer session.EndSession(ctx)

	// Transaction body: every call below must use the session context it receives.
	update := func(sc mongo.SessionContext) (interface{}, error) {
		if errJob := ms.txUpdateJob(sc, job); errJob != nil {
			return nil, errJob
		}

		if wf != nil {
			replaced := ms.cWorkflows().FindOneAndReplace(sc, bson.M{"_id": wf.ID}, wf)
			if errWf := replaced.Err(); errWf != nil {
				return nil, errWf
			}
		}

		if nextJob != nil {
			return nil, ms.txInsertJob(sc, nextJob)
		}

		return nil, nil
	}

	_, err = session.WithTransaction(ctx, update)

	return err
}
// SetWorkflowPriority is not available with this storage: both arguments are ignored and the
// method always returns the value of errors.NotImplemented().
func (ms *mongoStorage) SetWorkflowPriority(id string, priority jw.Priority) error {
	return errors.NotImplemented()
}
/*
######################################################################################################## @(°_°)@ #######
*/