Browse Source

En cours de développement

master
losyme 2 months ago
parent
commit
ee3ee862be
  1. 49
      <Plug>_<Plug>_
  2. 72
      internal/storage/mongo/clean.go

49
<Plug>_<Plug>_

@ -0,0 +1,49 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021-2022 losyme ################################################ MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package mongo
import (
"time"
"forge.chapril.org/losyme/util"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
func (ms *mongoStorage) Clean() error {
ctx, cancel := util.CtxWithTimeout(10 * time.Second)
defer cancel()
session, err := ms.client.StartSession()
if err != nil {
return nil, err
}
defer session.EndSession(ctx)
_, err := session.WithTransaction(
ctx,
func(ctx mongo.SessionContext) (interface{}, error) {
_, err := ms.cJobs.DeleteMany(
ctx,
bson.M{
"workflow": nil,
"finished_at": bson.M{"$lt": time.Now().Sub(ms.cleanAfter)},
},
)
if err != nil {
return nil, err
}
},
)
return err
}
/*
######################################################################################################## @(°_°)@ #######
*/

72
internal/storage/mongo/clean.go

@ -6,8 +6,78 @@
package mongo
import (
"time"
"forge.chapril.org/losyme/util"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func (ms *mongoStorage) Clean() error {
return nil
ctx, cancel := util.CtxWithTimeout(10 * time.Second)
defer cancel()
session, err := ms.client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
_, err = session.WithTransaction(
ctx,
func(ctx mongo.SessionContext) (interface{}, error) {
_, err := ms.cJobs().DeleteMany(
ctx,
bson.M{
"workflow": nil,
"finished_at": bson.M{"$lt": time.Now().Add(-ms.cleanAfter)},
},
)
if err != nil {
return nil, err
}
cur, err := ms.cWorkflows().Find(
ctx,
bson.M{
"finished_at": bson.M{"$lt": time.Now().Add(-ms.cleanAfter)},
},
options.Find().SetProjection(bson.M{"_id": 1}),
)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
var results []bson.D
if err := cur.All(ctx, &results); err != nil {
return nil, err
}
for _, rs := range results {
wfID := rs[0].Value
_, err = ms.cJobs().DeleteMany(ctx, bson.M{"workflow": wfID})
if err != nil {
return nil, err
}
_, err = ms.cWorkflows().DeleteOne(ctx, bson.M{"_id": wfID})
if err != nil {
return nil, err
}
}
return nil, nil
},
)
return err
}
/*

Loading…
Cancel
Save