diff --git a/__ b/__ new file mode 100644 index 0000000..73eb21e --- /dev/null +++ b/__ @@ -0,0 +1,49 @@ +/* +------------------------------------------------------------------------------------------------------------------------ +####### dune ####### Copyright (c) 2021-2022 losyme ################################################ MIT License ####### +------------------------------------------------------------------------------------------------------------------------ +*/ + +package mongo + +import ( + "time" + + "forge.chapril.org/losyme/util" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +func (ms *mongoStorage) Clean() error { + ctx, cancel := util.CtxWithTimeout(10 * time.Second) + defer cancel() + + session, err := ms.client.StartSession() + if err != nil { + return nil, err + } + + defer session.EndSession(ctx) + + _, err := session.WithTransaction( + ctx, + func(ctx mongo.SessionContext) (interface{}, error) { + _, err := ms.cJobs.DeleteMany( + ctx, + bson.M{ + "workflow": nil, + "finished_at": bson.M{"$lt": time.Now().Sub(ms.cleanAfter)}, + }, + ) + if err != nil { + return nil, err + } + }, + ) + + return err +} + +/* +######################################################################################################## @(°_°)@ ####### +*/ diff --git a/internal/storage/mongo/clean.go b/internal/storage/mongo/clean.go index 816ace7..bb9fded 100644 --- a/internal/storage/mongo/clean.go +++ b/internal/storage/mongo/clean.go @@ -6,8 +6,78 @@ package mongo +import ( + "time" + + "forge.chapril.org/losyme/util" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + func (ms *mongoStorage) Clean() error { - return nil + ctx, cancel := util.CtxWithTimeout(10 * time.Second) + defer cancel() + + session, err := ms.client.StartSession() + if err != nil { + return err + } + + defer session.EndSession(ctx) + + _, err = session.WithTransaction( + ctx, + func(ctx mongo.SessionContext) (interface{}, error) { + _, err := ms.cJobs().DeleteMany( + ctx, + bson.M{ + "workflow": nil, + "finished_at": bson.M{"$lt": time.Now().Add(-ms.cleanAfter)}, + }, + ) + if err != nil { + return nil, err + } + + cur, err := ms.cWorkflows().Find( + ctx, + bson.M{ + "finished_at": bson.M{"$lt": time.Now().Add(-ms.cleanAfter)}, + }, + options.Find().SetProjection(bson.M{"_id": 1}), + ) + if err != nil { + return nil, err + } + + defer cur.Close(ctx) + + var results []bson.D + + if err := cur.All(ctx, &results); err != nil { + return nil, err + } + + for _, rs := range results { + wfID := rs[0].Value + + _, err = ms.cJobs().DeleteMany(ctx, bson.M{"workflow": wfID}) + if err != nil { + return nil, err + } + + _, err = ms.cWorkflows().DeleteOne(ctx, bson.M{"_id": wfID}) + if err != nil { + return nil, err + } + } + + return nil, nil + }, + ) + + return err } /*