|
|
|
@ -7,13 +7,92 @@
|
|
|
|
|
package mongo |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"forge.chapril.org/dune/jw" |
|
|
|
|
"forge.chapril.org/losyme/util" |
|
|
|
|
"go.mongodb.org/mongo-driver/bson" |
|
|
|
|
"go.mongodb.org/mongo-driver/mongo" |
|
|
|
|
|
|
|
|
|
"forge.chapril.org/dune/dune/internal/storage" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type jobsToRun struct { |
|
|
|
|
ctx mongo.SessionContext |
|
|
|
|
cursor *mongo.Cursor |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (jtr *jobsToRun) Next() (*jw.Job, error) { |
|
|
|
|
cur := jtr.cursor |
|
|
|
|
|
|
|
|
|
if cur.Next(jtr.ctx) { |
|
|
|
|
job := new(jw.Job) |
|
|
|
|
|
|
|
|
|
err := cur.Decode(job) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return job, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil, cur.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) { |
|
|
|
|
return nil, nil |
|
|
|
|
collection := ms.Collection("jobs") |
|
|
|
|
|
|
|
|
|
ctx, cancel := util.CtxWithTimeout(10 * time.Second) |
|
|
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
session, err := ms.client.StartSession() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
defer session.EndSession(ctx) |
|
|
|
|
|
|
|
|
|
result, err := session.WithTransaction( |
|
|
|
|
ctx, |
|
|
|
|
func(ctx mongo.SessionContext) (interface{}, error) { |
|
|
|
|
var jr storage.JobsRunning |
|
|
|
|
|
|
|
|
|
cur, err := collection.Find( |
|
|
|
|
ctx, |
|
|
|
|
bson.M{ |
|
|
|
|
"namespace": namespace, |
|
|
|
|
"$or": []bson.M{ |
|
|
|
|
{"status": jw.StatusTodo}, |
|
|
|
|
{"status": jw.StatusPending}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
defer cur.Close(ctx) |
|
|
|
|
|
|
|
|
|
jtr := &jobsToRun{ |
|
|
|
|
ctx: ctx, |
|
|
|
|
cursor: cur, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
job, err := fn(jr, jtr) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_, err = collection.UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}}) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return job, nil |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return result.(*jw.Job), err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* |
|
|
|
|