Browse Source

En cours de développement

master
losyme 2 months ago
parent
commit
6090cbb58a
  1. 22
      internal/storage/mongo/jobs.go
  2. 8
      internal/storage/mongo/mongo.go
  3. 68
      internal/storage/mongo/next.go
  4. 28
      internal/storage/mongo/workflows.go

22
internal/storage/mongo/jobs.go

@ -21,7 +21,12 @@ func (ms *mongoStorage) ValidateJob(_ *jw.Job) error {
return nil
}
func (ms *mongoStorage) txInsertJob(ctx context.Context, job *jw.Job, col *mongo.Collection) (bool, error) {
func (ms *mongoStorage) txInsertJob(ctx mongo.SessionContext, job *jw.Job) error {
_, err := ms.cJobs().InsertOne(ctx, job)
return err
}
func (ms *mongoStorage) maybeInsertJob(ctx context.Context, job *jw.Job) (bool, error) {
session, err := ms.client.StartSession()
if err != nil {
return false, err
@ -32,7 +37,7 @@ func (ms *mongoStorage) txInsertJob(ctx context.Context, job *jw.Job, col *mongo
result, err := session.WithTransaction(
ctx,
func(ctx mongo.SessionContext) (interface{}, error) {
err := col.FindOne(
err := ms.cJobs().FindOne(
ctx,
bson.M{
"category": *job.Category,
@ -50,8 +55,7 @@ func (ms *mongoStorage) txInsertJob(ctx context.Context, job *jw.Job, col *mongo
return false, err
}
_, err = col.InsertOne(ctx, job)
if err != nil {
if err := ms.txInsertJob(ctx, job); err != nil {
return false, err
}
@ -63,13 +67,11 @@ func (ms *mongoStorage) txInsertJob(ctx context.Context, job *jw.Job, col *mongo
}
func (ms *mongoStorage) InsertJob(job *jw.Job) (bool, error) {
col := ms.Collection("jobs")
ctx, cancel := util.CtxWithTimeout(5 * time.Second)
defer cancel()
if job.Category == nil {
_, err := col.InsertOne(ctx, job)
_, err := ms.cJobs().InsertOne(ctx, job)
if err != nil {
return false, err
}
@ -77,16 +79,14 @@ func (ms *mongoStorage) InsertJob(job *jw.Job) (bool, error) {
return true, nil
}
return ms.txInsertJob(ctx, job, col)
return ms.maybeInsertJob(ctx, job)
}
func (ms *mongoStorage) UpdateJob(job *jw.Job) error {
col := ms.Collection("jobs")
ctx, cancel := util.CtxWithTimeout(5 * time.Second)
defer cancel()
return col.FindOneAndReplace(ctx, bson.M{"_id": job.ID}, job).Err()
return ms.cJobs().FindOneAndReplace(ctx, bson.M{"_id": job.ID}, job).Err()
}
func (ms *mongoStorage) NotifyJob(id string, data interface{}) error {

8
internal/storage/mongo/mongo.go

@ -73,6 +73,14 @@ func (ms *mongoStorage) Collection(name string) *mongo.Collection {
return ms.db.Collection(name)
}
func (ms *mongoStorage) cJobs() *mongo.Collection {
return ms.Collection("jobs")
}
func (ms *mongoStorage) cWorkflows() *mongo.Collection {
return ms.Collection("workflows")
}
func (ms *mongoStorage) State() (*storage.State, error) {
return nil, errors.NotImplemented()
}

68
internal/storage/mongo/next.go

@ -19,8 +19,34 @@ import (
"forge.chapril.org/dune/dune/internal/storage"
)
func jobsRunning(ctx mongo.SessionContext, col *mongo.Collection, namespace string) (storage.JobsRunning, error) {
cur, err := col.Find(
type jobsToRun struct {
ctx mongo.SessionContext
cursor *mongo.Cursor
}
func (jtr *jobsToRun) Next() (*jw.Job, error) {
cur := jtr.cursor
if cur.Next(jtr.ctx) {
job := new(jw.Job)
err := cur.Decode(job)
if err != nil {
return nil, err
}
return job, nil
}
return nil, cur.Err()
}
func (jtr *jobsToRun) close() {
jtr.cursor.Close(jtr.ctx)
}
func (ms *mongoStorage) jobsRunning(ctx mongo.SessionContext, namespace string) (storage.JobsRunning, error) {
cur, err := ms.cJobs().Find(
ctx,
bson.M{
"namespace": namespace,
@ -51,13 +77,8 @@ func jobsRunning(ctx mongo.SessionContext, col *mongo.Collection, namespace stri
return jr, nil
}
type jobsToRun struct {
ctx mongo.SessionContext
cursor *mongo.Cursor
}
func newJobsToRun(ctx mongo.SessionContext, col *mongo.Collection, namespace string) (*jobsToRun, error) {
cur, err := col.Find(
func (ms *mongoStorage) newJobsToRun(ctx mongo.SessionContext, namespace string) (*jobsToRun, error) {
cur, err := ms.cJobs().Find(
ctx,
bson.M{
"namespace": namespace,
@ -87,30 +108,7 @@ func newJobsToRun(ctx mongo.SessionContext, col *mongo.Collection, namespace str
return jtr, nil
}
func (jtr *jobsToRun) Next() (*jw.Job, error) {
cur := jtr.cursor
if cur.Next(jtr.ctx) {
job := new(jw.Job)
err := cur.Decode(job)
if err != nil {
return nil, err
}
return job, nil
}
return nil, cur.Err()
}
func (jtr *jobsToRun) close() {
jtr.cursor.Close(jtr.ctx)
}
func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) {
col := ms.Collection("jobs")
ctx, cancel := util.CtxWithTimeout(10 * time.Second)
defer cancel()
@ -124,12 +122,12 @@ func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw
result, err := session.WithTransaction(
ctx,
func(ctx mongo.SessionContext) (interface{}, error) {
jr, err := jobsRunning(ctx, col, namespace)
jr, err := ms.jobsRunning(ctx, namespace)
if err != nil {
return nil, err
}
jtr, err := newJobsToRun(ctx, col, namespace)
jtr, err := ms.newJobsToRun(ctx, namespace)
if err != nil {
return nil, err
}
@ -141,7 +139,7 @@ func (ms *mongoStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw
return nil, err
}
_, err = col.UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}})
_, err = ms.cJobs().UpdateByID(ctx, job.ID, bson.M{"$set": bson.M{"status": jw.StatusRunning}})
if err != nil {
return nil, err
}

28
internal/storage/mongo/workflows.go

@ -7,8 +7,12 @@
package mongo
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/util"
"go.mongodb.org/mongo-driver/mongo"
)
func (ms *mongoStorage) ValidateWorkflow(_ *jw.Workflow) error {
@ -16,7 +20,29 @@ func (ms *mongoStorage) ValidateWorkflow(_ *jw.Workflow) error {
}
func (ms *mongoStorage) InsertWorkflow(wf *jw.Workflow, job *jw.Job) error {
return errors.NotImplemented()
ctx, cancel := util.CtxWithTimeout(5 * time.Second)
defer cancel()
session, err := ms.client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
_, err = session.WithTransaction(
ctx,
func(ctx mongo.SessionContext) (interface{}, error) {
if err := ms.txInsertJob(ctx, job); err != nil {
return nil, err
}
_, err := ms.cWorkflows().InsertOne(ctx, wf)
return nil, err
},
)
return err
}
func (ms *mongoStorage) Workflow(id string, mustExist bool) (*jw.Workflow, error) {

Loading…
Cancel
Save