Browse Source

En cours de développement

master
losyme 2 months ago
parent
commit
719405ebe3
  1. 102
      internal/storage/mongo/next.go

102
internal/storage/mongo/next.go

@ -19,11 +19,74 @@ import (
"forge.chapril.org/dune/dune/internal/storage"
)
func jobsRunning(ctx mongo.SessionContext, col *mongo.Collection, namespace string) (storage.JobsRunning, error) {
cur, err := col.Find(
ctx,
bson.M{
"namespace": namespace,
"status": jw.StatusRunning,
},
)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
jr := make(storage.JobsRunning)
job := new(jw.Job)
for cur.Next(ctx) {
if err := cur.Decode(job); err != nil {
return nil, err
}
jr[job.Type] += 1
}
if err := cur.Err(); err != nil {
return nil, err
}
return jr, nil
}
type jobsToRun struct {
ctx mongo.SessionContext
cursor *mongo.Cursor
}
func newJobsToRun(ctx mongo.SessionContext, col *mongo.Collection, namespace string) (*jobsToRun, error) {
cur, err := col.Find(
ctx,
bson.M{
"namespace": namespace,
"run_after": bson.M{"$lt": time.Now()},
"$or": []bson.M{
{"status": jw.StatusTodo},
{"status": jw.StatusPending},
},
},
options.Find().SetSort(
bson.D{
primitive.E{Key: "priority", Value: 1},
primitive.E{Key: "weight", Value: -1},
primitive.E{Key: "time_reference", Value: -1},
},
),
)
if err != nil {
return nil, err
}
jtr := &jobsToRun{
ctx: ctx,
cursor: cur,
}
return jtr, nil
}
func (jtr *jobsToRun) Next() (*jw.Job, error) {
cur := jtr.cursor
@ -41,8 +104,12 @@ func (jtr *jobsToRun) Next() (*jw.Job, error) {
return nil, cur.Err()
}
func (jtr *jobsToRun) close() {
jtr.cursor.Close(jtr.ctx)
}
func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) {
collection := ms.Collection("jobs")
col := ms.Collection("jobs")
ctx, cancel := util.CtxWithTimeout(10 * time.Second)
defer cancel()
@ -57,43 +124,24 @@ func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw
result, err := session.WithTransaction(
ctx,
func(ctx mongo.SessionContext) (interface{}, error) {
var jr storage.JobsRunning
cur, err := collection.Find(
ctx,
bson.M{
"namespace": namespace,
"run_after": bson.M{"$lt": time.Now()},
"$or": []bson.M{
{"status": jw.StatusTodo},
{"status": jw.StatusPending},
},
},
options.Find().SetSort(
bson.D{
primitive.E{Key: "priority", Value: 1},
primitive.E{Key: "weight", Value: -1},
primitive.E{Key: "time_reference", Value: -1},
},
),
)
jr, err := jobsRunning(ctx, col, namespace)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
jtr := &jobsToRun{
ctx: ctx,
cursor: cur,
jtr, err := newJobsToRun(ctx, col, namespace)
if err != nil {
return nil, err
}
defer jtr.close()
job, err := fn(jr, jtr)
if err != nil || job == nil {
return nil, err
}
_, err = collection.UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}})
_, err = col.UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}})
if err != nil {
return nil, err
}

Loading…
Cancel
Save