diff --git a/internal/storage/mongo/jobs.go b/internal/storage/mongo/jobs.go index 033ea47..434d015 100644 --- a/internal/storage/mongo/jobs.go +++ b/internal/storage/mongo/jobs.go @@ -11,6 +11,9 @@ import ( "forge.chapril.org/dune/jw" "forge.chapril.org/losyme/errors" + "forge.chapril.org/losyme/util" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" ) func (ms *mongoStorage) ValidateJob(_ *jw.Job) error { @@ -18,7 +21,58 @@ func (ms *mongoStorage) ValidateJob(_ *jw.Job) error { } func (ms *mongoStorage) InsertJob(job *jw.Job) (bool, error) { - return false, errors.NotImplemented() + collection := ms.Collection("jobs") + + ctx, cancel := util.CtxWithTimeout(5 * time.Second) + defer cancel() + + if job.Category == nil { + _, err := collection.InsertOne(ctx, job) + if err != nil { + return false, err + } + + return true, nil + } + + session, err := ms.client.StartSession() + if err != nil { + return false, err + } + + defer session.EndSession(ctx) + + result, err := session.WithTransaction( + ctx, + func(ctx mongo.SessionContext) (interface{}, error) { + err := collection.FindOne( + ctx, + bson.M{ + "category": *job.Category, + "namespace": job.Namespace, + "type": job.Type, + "workflow": nil, + "$or": []bson.M{ + {"status": jw.StatusTodo}, + {"status": jw.StatusPending}, + {"status": jw.StatusRunning}, + }, + }, + ).Err() + if err == nil || err != mongo.ErrNoDocuments { + return false, err + } + + _, err = collection.InsertOne(ctx, job) + if err != nil { + return false, err + } + + return true, nil + }, + ) + + return result.(bool), err } func (ms *mongoStorage) UpdateJob(job *jw.Job) error {