Browse Source

En cours de développement

master
losyme 2 months ago
parent
commit
362ff16fbc
  1. 133
      internal/storage/memory/jobs.go
  2. 69
      internal/storage/memory/memory.go
  3. 107
      internal/storage/memory/workflows.go

133
internal/storage/memory/jobs.go

@ -0,0 +1,133 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021 losyme ##################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package memory
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
)
func (m *memory) findJob(id string) (int, *jw.Job) {
for i, job := range m.jobs {
if job.ID == id {
return i, job
}
}
return 0, nil
}
func (m *memory) ValidateJob(_ *jw.Job) error {
return nil
}
func (m *memory) insertJob(job *jw.Job) (bool, error) {
if _, j := m.findJob(job.ID); j != nil {
return false, errors.New( //////////////////////////////////////////////////////////////////////////////////////
"this job identifier already exists",
"id", job.ID,
)
}
if job.Category != nil && job.Workflow == nil {
for _, j := range m.jobs {
if j.Category != nil && j.Workflow == nil && *j.Category == *job.Category &&
j.Namespace == job.Namespace &&
j.Type == job.Type &&
(j.Status == jw.StatusTodo || j.Status == jw.StatusPending || j.Status == jw.StatusRunning) {
return false, nil
}
}
}
m.jobs = append(m.jobs, job)
return true, nil
}
func (m *memory) InsertJob(job *jw.Job) (bool, error) {
m.sm.Lock()
defer m.sm.Unlock()
return m.insertJob(job)
}
func (m *memory) NextJob(namespace string) (*jw.Job, error) {
return nil, nil
}
func (m *memory) updateJob(job *jw.Job) error {
if i, j := m.findJob(job.ID); j != nil {
m.jobs[i] = job
return nil
}
return errors.New("this job does not exist") ///////////////////////////////////////////////////////////////////////
}
func (m *memory) UpdateJob(job *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
return m.updateJob(job)
}
func (m *memory) NotifyJob(id string, data interface{}) error {
m.sm.Lock()
defer m.sm.Unlock()
_, job := m.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
// AFINIR
return errors.NotImplemented()
}
func (m *memory) SetJobPriority(id string, priority jw.Priority) error {
m.sm.Lock()
defer m.sm.Unlock()
_, job := m.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
if job.Status == jw.StatusSucceeded || job.Status == jw.StatusFailed {
return errors.New("this job is no longer running", "job", id) //////////////////////////////////////////////////
}
job.Priority = priority
return nil
}
func (m *memory) SetJobRunAfter(id string, duration time.Duration) error {
m.sm.Lock()
defer m.sm.Unlock()
_, job := m.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
if job.Status != jw.StatusTodo && job.Status != jw.StatusPending {
return errors.New("this job is not waiting to be executed", "job", id) /////////////////////////////////////////
}
job.RunAfter = time.Now().Add(duration)
return nil
}
/*
######################################################################################################## @(°_°)@ #######
*/

69
internal/storage/memory/memory.go

@ -0,0 +1,69 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021 losyme ##################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package memory
import (
"sync"
"forge.chapril.org/dune/jw"
)
type memory struct {
sm sync.Mutex
jobs []*jw.Job
workflows []*jw.Workflow
}
func New() jw.Storage {
return &memory{
jobs: make([]*jw.Job, 0),
workflows: make([]*jw.Workflow, 0),
}
}
func (m *memory) Stats() (*jw.Stats, error) {
stats := new(jw.Stats)
m.sm.Lock()
defer m.sm.Unlock()
for _, job := range m.jobs {
switch job.Status {
case jw.StatusTodo:
stats.Jobs.Todo += 1
case jw.StatusPending:
stats.Jobs.Pending += 1
case jw.StatusRunning:
stats.Jobs.Running += 1
case jw.StatusSucceeded:
stats.Jobs.Succeeded += 1
case jw.StatusFailed:
stats.Jobs.Failed += 1
}
stats.Jobs.Total += 1
}
for _, wf := range m.workflows {
switch wf.Status {
case jw.StatusRunning:
stats.Workflows.Running += 1
case jw.StatusSucceeded:
stats.Workflows.Succeeded += 1
case jw.StatusFailed:
stats.Workflows.Failed += 1
}
stats.Workflows.Total += 1
}
return stats, nil
}
/*
######################################################################################################## @(°_°)@ #######
*/

107
internal/storage/memory/workflows.go

@ -0,0 +1,107 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### dune ####### Copyright (c) 2021 losyme ##################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package memory
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
)
func (m *memory) findWorkflow(id string) (int, *jw.Workflow) {
for i, wf := range m.workflows {
if wf.ID == id {
return i, wf
}
}
return 0, nil
}
func (m *memory) ValidateWorkflow(_ *jw.Workflow) error {
return nil
}
func (m *memory) InsertWorkflow(wf *jw.Workflow, job *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
if _, w := m.findWorkflow(wf.ID); w != nil {
return errors.New( /////////////////////////////////////////////////////////////////////////////////////////////
"this workflow identifier already exists",
"id", wf.ID,
)
}
if _, err := m.insertJob(job); err != nil {
return err
}
m.workflows = append(m.workflows, wf)
return nil
}
func (m *memory) Workflow(id string, mustExist bool) (*jw.Workflow, error) {
m.sm.Lock()
defer m.sm.Unlock()
if _, wf := m.findWorkflow(id); wf != nil {
return wf, nil
}
if mustExist {
return nil, errors.New("this workflow does not exist") /////////////////////////////////////////////////////////
}
return nil, nil
}
func (m *memory) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
i, w := m.findWorkflow(wf.ID)
if w == nil {
return errors.New("this workflow does not exist") //////////////////////////////////////////////////////////////
}
m.workflows[i] = wf
if err := m.updateJob(job); err != nil {
return err
}
if nextJob == nil {
return nil
}
_, err := m.insertJob(nextJob)
return err
}
func (m *memory) SetWorkflowPriority(id string, priority jw.Priority) error {
m.sm.Lock()
defer m.sm.Unlock()
_, wf := m.findWorkflow(id)
if wf == nil {
return errors.New("this workflow does not exist", "workflow", id) //////////////////////////////////////////////
}
if wf.Status != jw.StatusRunning {
return errors.New("this workflow is no longer running", "workflow", id) ////////////////////////////////////////
}
wf.Priority = priority
return nil
}
/*
######################################################################################################## @(°_°)@ #######
*/
Loading…
Cancel
Save