From cc5530d2c59c6e0be6749b52d74a0b07513505e9 Mon Sep 17 00:00:00 2001 From: losyme Date: Fri, 15 Apr 2022 14:54:18 +0200 Subject: [PATCH] =?UTF-8?q?En=20cours=20de=20d=C3=A9veloppement?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/storage/mongo/jobs.go | 4 ++++ internal/storage/mongo/workflows.go | 33 ++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/internal/storage/mongo/jobs.go b/internal/storage/mongo/jobs.go index 30d14c1..30a8736 100644 --- a/internal/storage/mongo/jobs.go +++ b/internal/storage/mongo/jobs.go @@ -82,6 +82,10 @@ func (ms *mongoStorage) InsertJob(job *jw.Job) (bool, error) { return ms.maybeInsertJob(ctx, job) } +func (ms *mongoStorage) txUpdateJob(ctx mongo.SessionContext, job *jw.Job) error { + return ms.cJobs().FindOneAndReplace(ctx, bson.M{"_id": job.ID}, job).Err() +} + func (ms *mongoStorage) UpdateJob(job *jw.Job) error { ctx, cancel := util.CtxWithTimeout(5 * time.Second) defer cancel() diff --git a/internal/storage/mongo/workflows.go b/internal/storage/mongo/workflows.go index a99d210..0ad8f4e 100644 --- a/internal/storage/mongo/workflows.go +++ b/internal/storage/mongo/workflows.go @@ -69,7 +69,38 @@ func (ms *mongoStorage) Workflow(id string, mustExist bool) (*jw.Workflow, error } func (ms *mongoStorage) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error { - return errors.NotImplemented() + ctx, cancel := util.CtxWithTimeout(5 * time.Second) + defer cancel() + + session, err := ms.client.StartSession() + if err != nil { + return err + } + + defer session.EndSession(ctx) + + _, err = session.WithTransaction( + ctx, + func(ctx mongo.SessionContext) (interface{}, error) { + if err := ms.txUpdateJob(ctx, job); err != nil { + return nil, err + } + + if wf != nil { + if err := ms.cWorkflows().FindOneAndReplace(ctx, bson.M{"_id": wf.ID}, wf).Err(); err != nil { + return nil, err + } + } + + if nextJob == nil { + return nil, nil + } + + return nil, ms.txInsertJob(ctx, nextJob) + }, + ) + + return err } func (ms *mongoStorage) SetWorkflowPriority(id string, priority jw.Priority) error {