From 719405ebe3fe10e5cacc8913e2ff4097ff5c621c Mon Sep 17 00:00:00 2001 From: losyme Date: Thu, 14 Apr 2022 15:54:17 +0200 Subject: [PATCH] =?UTF-8?q?En=20cours=20de=20d=C3=A9veloppement?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/storage/mongo/next.go | 102 ++++++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 27 deletions(-) diff --git a/internal/storage/mongo/next.go b/internal/storage/mongo/next.go index 0f17a11..b6766c6 100644 --- a/internal/storage/mongo/next.go +++ b/internal/storage/mongo/next.go @@ -19,11 +19,74 @@ import ( "forge.chapril.org/dune/dune/internal/storage" ) +func jobsRunning(ctx mongo.SessionContext, col *mongo.Collection, namespace string) (storage.JobsRunning, error) { + cur, err := col.Find( + ctx, + bson.M{ + "namespace": namespace, + "status": jw.StatusRunning, + }, + ) + if err != nil { + return nil, err + } + + defer cur.Close(ctx) + + jr := make(storage.JobsRunning) + job := new(jw.Job) + + for cur.Next(ctx) { + if err := cur.Decode(job); err != nil { + return nil, err + } + + jr[job.Type] += 1 + } + + if err := cur.Err(); err != nil { + return nil, err + } + + return jr, nil +} + type jobsToRun struct { ctx mongo.SessionContext cursor *mongo.Cursor } +func newJobsToRun(ctx mongo.SessionContext, col *mongo.Collection, namespace string) (*jobsToRun, error) { + cur, err := col.Find( + ctx, + bson.M{ + "namespace": namespace, + "run_after": bson.M{"$lt": time.Now()}, + "$or": []bson.M{ + {"status": jw.StatusTodo}, + {"status": jw.StatusPending}, + }, + }, + options.Find().SetSort( + bson.D{ + primitive.E{Key: "priority", Value: 1}, + primitive.E{Key: "weight", Value: -1}, + primitive.E{Key: "time_reference", Value: -1}, + }, + ), + ) + if err != nil { + return nil, err + } + + jtr := &jobsToRun{ + ctx: ctx, + cursor: cur, + } + + return jtr, nil +} + func (jtr *jobsToRun) Next() (*jw.Job, error) { cur := jtr.cursor @@ -41,8 +104,12 @@ func (jtr *jobsToRun) Next() (*jw.Job, error) { return nil, cur.Err() } +func (jtr *jobsToRun) close() { + jtr.cursor.Close(jtr.ctx) +} + func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) { - collection := ms.Collection("jobs") + col := ms.Collection("jobs") ctx, cancel := util.CtxWithTimeout(10 * time.Second) defer cancel() @@ -57,43 +124,24 @@ func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw result, err := session.WithTransaction( ctx, func(ctx mongo.SessionContext) (interface{}, error) { - var jr storage.JobsRunning - - cur, err := collection.Find( - ctx, - bson.M{ - "namespace": namespace, - "run_after": bson.M{"$lt": time.Now()}, - "$or": []bson.M{ - {"status": jw.StatusTodo}, - {"status": jw.StatusPending}, - }, - }, - options.Find().SetSort( - bson.D{ - primitive.E{Key: "priority", Value: 1}, - primitive.E{Key: "weight", Value: -1}, - primitive.E{Key: "time_reference", Value: -1}, - }, - ), - ) + jr, err := jobsRunning(ctx, col, namespace) if err != nil { return nil, err } - defer cur.Close(ctx) - - jtr := &jobsToRun{ - ctx: ctx, - cursor: cur, + jtr, err := newJobsToRun(ctx, col, namespace) + if err != nil { + return nil, err } + defer jtr.close() + job, err := fn(jr, jtr) if err != nil || job == nil { return nil, err } - _, err = collection.UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}}) + _, err = col.UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}}) if err != nil { return nil, err }