Browse Source

En cours de développement

master
losyme 2 months ago
parent
commit
97a72763a0
  1. 4
      internal/application/events.go
  2. 27
      internal/application/storage.go
  3. 8
      internal/storage/memory/clean.go
  4. 15
      internal/storage/memory/memory.go
  5. 2
      internal/storage/mongo/clean.go
  6. 18
      internal/storage/mongo/mongo.go
  7. 2
      internal/storage/storage.go

4
internal/application/events.go

@ -14,8 +14,8 @@ func (app *Application) onMessage(msg *scheduler.Message) {
cs.Logger().Info("Event", "name", msg.Event) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
switch msg.Event {
case "storage.purge":
if err := cs.Storage().Purge(); err != nil {
case "storage.clean":
if err := cs.Storage().Clean(); err != nil {
cs.Logger().Error("Cannot delete old jobs/workflows", nil, err) //::::::::::::::::::::::::::::::::::::::::::
}
}

27
internal/application/storage.go

@ -7,6 +7,9 @@
package application
import (
"time"
"forge.chapril.org/losyme/config"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/minikit/minikit"
@ -16,6 +19,8 @@ import (
"forge.chapril.org/dune/dune/internal/storage/mongo"
)
const _defaultCleanAfter = 30 * time.Minute
type cStorage struct {
minikit.Cpt
cs *components.Components
@ -31,9 +36,27 @@ func (c *cStorage) Name() string {
return "storage"
}
func (c *cStorage) cleanAfter() (time.Duration, error) {
ca, err := c.cs.Config().GetDuration("storage", "clean")
if err == nil {
return ca, nil
}
if errors.Is(err, config.ErrKeyNotFound) {
return _defaultCleanAfter, nil
}
return 0, err
}
func (c *cStorage) Build() error {
cs := c.cs
cleanAfter, err := c.cleanAfter()
if err != nil {
return err
}
impl, err := cs.Config().GetString("storage", "impl")
if err != nil {
return err
@ -43,9 +66,9 @@ func (c *cStorage) Build() error {
switch impl {
case "memory":
storage = memory.New()
storage = memory.New(cleanAfter)
case "mongo":
storage, err = mongo.New(cs)
storage, err = mongo.New(cs, cleanAfter)
if err != nil {
return err
}

8
internal/storage/memory/purge.go → internal/storage/memory/clean.go

@ -12,9 +12,7 @@ import (
"forge.chapril.org/dune/jw"
)
func (ms *memoryStorage) Purge() error {
duration := 7 * 24 * time.Hour
func (ms *memoryStorage) Clean() error {
now := time.Now()
deleted := make(map[string]bool)
@ -24,7 +22,7 @@ func (ms *memoryStorage) Purge() error {
workflows := make([]*jw.Workflow, 0, len(ms.workflows))
for _, wf := range ms.workflows {
if wf.FinishedAt != nil || now.After(wf.FinishedAt.Add(duration)) {
if wf.FinishedAt != nil || now.After(wf.FinishedAt.Add(ms.cleanAfter)) {
deleted[wf.ID] = true
continue
}
@ -36,7 +34,7 @@ func (ms *memoryStorage) Purge() error {
for _, job := range ms.jobs {
if job.Workflow == nil {
if job.FinishedAt != nil && now.After(job.FinishedAt.Add(duration)) {
if job.FinishedAt != nil && now.After(job.FinishedAt.Add(ms.cleanAfter)) {
continue
}
} else if deleted[*(job.Workflow)] {

15
internal/storage/memory/memory.go

@ -8,6 +8,7 @@ package memory
import (
"sync"
"time"
"forge.chapril.org/dune/jw"
@ -15,15 +16,17 @@ import (
)
type memoryStorage struct {
sm sync.Mutex
jobs []*jw.Job
workflows []*jw.Workflow
sm sync.Mutex
jobs []*jw.Job
workflows []*jw.Workflow
cleanAfter time.Duration
}
func New() storage.Storage {
func New(cleanAfter time.Duration) storage.Storage {
return &memoryStorage{
jobs: make([]*jw.Job, 0),
workflows: make([]*jw.Workflow, 0),
jobs: make([]*jw.Job, 0),
workflows: make([]*jw.Workflow, 0),
cleanAfter: cleanAfter,
}
}

2
internal/storage/mongo/purge.go → internal/storage/mongo/clean.go

@ -6,7 +6,7 @@
package mongo
func (ms *mongoStorage) Purge() error {
func (ms *mongoStorage) Clean() error {
return nil
}

18
internal/storage/mongo/mongo.go

@ -7,6 +7,8 @@
package mongo
import (
"time"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/logger"
_mgo "forge.chapril.org/losyme/mongo"
@ -18,9 +20,10 @@ import (
)
type mongoStorage struct {
client *mongo.Client
db *mongo.Database
logger *logger.Logger
client *mongo.Client
db *mongo.Database
cleanAfter time.Duration
logger *logger.Logger
}
func loadConfig(cs *components.Components) (*_mgo.Config, error) {
@ -42,7 +45,7 @@ func loadConfig(cs *components.Components) (*_mgo.Config, error) {
return cfg, nil
}
func New(cs *components.Components) (storage.Storage, error) {
func New(cs *components.Components, cleanAfter time.Duration) (storage.Storage, error) {
logger, err := cs.Logger().NewLogger(uuid.New(), "storage")
if err != nil {
return nil, err
@ -61,9 +64,10 @@ func New(cs *components.Components) (storage.Storage, error) {
database := client.Database(cs.Application().Name())
ms := &mongoStorage{
client: client,
db: database,
logger: logger,
client: client,
db: database,
cleanAfter: cleanAfter,
logger: logger,
}
return ms, nil

2
internal/storage/storage.go

@ -56,7 +56,7 @@ type Storage interface {
UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error
SetWorkflowPriority(id string, priority jw.Priority) error
Purge() error
Clean() error
State() (*State, error)
Close() error
}

Loading…
Cancel
Save