Browse Source

En cours de développement

master
losyme 4 months ago
parent
commit
013a74d79f
  1. 60
      internal/storage/memory/jobs.go
  2. 20
      internal/storage/memory/memory.go
  3. 16
      internal/storage/memory/next.go
  4. 48
      internal/storage/memory/workflows.go

60
internal/storage/memory/jobs.go

@ -15,8 +15,8 @@ import (
"forge.chapril.org/dune/dune/internal/storage"
)
func (m *memory) findJob(id string) (int, *jw.Job) {
for i, job := range m.jobs {
func (ms *memoryStorage) findJob(id string) (int, *jw.Job) {
for i, job := range ms.jobs {
if job.ID == id {
return i, job
}
@ -25,12 +25,12 @@ func (m *memory) findJob(id string) (int, *jw.Job) {
return 0, nil
}
func (m *memory) ValidateJob(_ *jw.Job) error {
func (ms *memoryStorage) ValidateJob(_ *jw.Job) error {
return nil
}
func (m *memory) insertJob(job *jw.Job) (bool, error) {
if _, j := m.findJob(job.ID); j != nil {
func (ms *memoryStorage) insertJob(job *jw.Job) (bool, error) {
if _, j := ms.findJob(job.ID); j != nil {
return false, errors.Wrap( /////////////////////////////////////////////////////////////////////////////////////
storage.ErrStorage,
errors.New("this job identifier already exists", "id", job.ID),
@ -38,7 +38,7 @@ func (m *memory) insertJob(job *jw.Job) (bool, error) {
}
if job.Category != nil && job.Workflow == nil {
for _, j := range m.jobs {
for _, j := range ms.jobs {
if j.Category != nil && j.Workflow == nil && *j.Category == *job.Category &&
j.Namespace == job.Namespace &&
j.Type == job.Type &&
@ -48,21 +48,21 @@ func (m *memory) insertJob(job *jw.Job) (bool, error) {
}
}
m.jobs = append(m.jobs, job)
ms.jobs = append(ms.jobs, job)
return true, nil
}
func (m *memory) InsertJob(job *jw.Job) (bool, error) {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) InsertJob(job *jw.Job) (bool, error) {
ms.sm.Lock()
defer ms.sm.Unlock()
return m.insertJob(job)
return ms.insertJob(job)
}
func (m *memory) updateJob(job *jw.Job) error {
if i, j := m.findJob(job.ID); j != nil {
m.jobs[i] = job
func (ms *memoryStorage) updateJob(job *jw.Job) error {
if i, j := ms.findJob(job.ID); j != nil {
ms.jobs[i] = job
return nil
}
@ -72,18 +72,18 @@ func (m *memory) updateJob(job *jw.Job) error {
)
}
func (m *memory) UpdateJob(job *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) UpdateJob(job *jw.Job) error {
ms.sm.Lock()
defer ms.sm.Unlock()
return m.updateJob(job)
return ms.updateJob(job)
}
func (m *memory) NotifyJob(id string, data interface{}) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) NotifyJob(id string, data interface{}) error {
ms.sm.Lock()
defer ms.sm.Unlock()
_, job := m.findJob(id)
_, job := ms.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
@ -93,11 +93,11 @@ func (m *memory) NotifyJob(id string, data interface{}) error {
return errors.NotImplemented()
}
func (m *memory) SetJobPriority(id string, priority jw.Priority) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) SetJobPriority(id string, priority jw.Priority) error {
ms.sm.Lock()
defer ms.sm.Unlock()
_, job := m.findJob(id)
_, job := ms.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}
@ -111,11 +111,11 @@ func (m *memory) SetJobPriority(id string, priority jw.Priority) error {
return nil
}
func (m *memory) SetJobRunAfter(id string, duration time.Duration) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) SetJobRunAfter(id string, duration time.Duration) error {
ms.sm.Lock()
defer ms.sm.Unlock()
_, job := m.findJob(id)
_, job := ms.findJob(id)
if job == nil {
return errors.New("this job does not exist", "job", id) ////////////////////////////////////////////////////////
}

20
internal/storage/memory/memory.go

@ -14,26 +14,26 @@ import (
"forge.chapril.org/dune/dune/internal/storage"
)
type memory struct {
type memoryStorage struct {
sm sync.Mutex
jobs []*jw.Job
workflows []*jw.Workflow
}
func New() storage.Storage {
return &memory{
return &memoryStorage{
jobs: make([]*jw.Job, 0),
workflows: make([]*jw.Workflow, 0),
}
}
func (m *memory) State() (*storage.State, error) {
func (ms *memoryStorage) State() (*storage.State, error) {
state := new(storage.State)
m.sm.Lock()
defer m.sm.Unlock()
ms.sm.Lock()
defer ms.sm.Unlock()
for _, job := range m.jobs {
for _, job := range ms.jobs {
switch job.Status {
case jw.StatusTodo:
state.Jobs.Todo += 1
@ -48,9 +48,9 @@ func (m *memory) State() (*storage.State, error) {
}
}
state.Jobs.Total = len(m.jobs)
state.Jobs.Total = len(ms.jobs)
for _, wf := range m.workflows {
for _, wf := range ms.workflows {
switch wf.Status {
case jw.StatusRunning:
state.Workflows.Running += 1
@ -61,12 +61,12 @@ func (m *memory) State() (*storage.State, error) {
}
}
state.Workflows.Total = len(m.workflows)
state.Workflows.Total = len(ms.workflows)
return state, nil
}
func (m *memory) Close() error {
func (ms *memoryStorage) Close() error {
return nil
}

16
internal/storage/memory/next.go

@ -33,10 +33,10 @@ func (jtr *jobsToRun) Next() (*jw.Job, error) {
return nil, nil
}
func (m *memory) jobsRunning(namespace string) storage.JobsRunning {
func (ms *memoryStorage) jobsRunning(namespace string) storage.JobsRunning {
jr := make(storage.JobsRunning)
for _, job := range m.jobs {
for _, job := range ms.jobs {
if job.Namespace == namespace && job.Status == jw.StatusRunning {
jr[job.Type] += 1
}
@ -45,12 +45,12 @@ func (m *memory) jobsRunning(namespace string) storage.JobsRunning {
return jr
}
func (m *memory) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, error) {
ms.sm.Lock()
defer ms.sm.Unlock()
jobs := make([]*jw.Job, len(m.jobs))
copy(jobs, m.jobs)
jobs := make([]*jw.Job, len(ms.jobs))
copy(jobs, ms.jobs)
sort.Slice(
jobs,
@ -68,7 +68,7 @@ func (m *memory) NextJob(namespace string, fn storage.SelectNextJob) (*jw.Job, e
jobs: jobs,
}
job, err := fn(m.jobsRunning(namespace), jtr)
job, err := fn(ms.jobsRunning(namespace), jtr)
if job == nil || err != nil {
return nil, err
}

48
internal/storage/memory/workflows.go

@ -13,8 +13,8 @@ import (
"forge.chapril.org/dune/dune/internal/storage"
)
func (m *memory) findWorkflow(id string) (int, *jw.Workflow) {
for i, wf := range m.workflows {
func (ms *memoryStorage) findWorkflow(id string) (int, *jw.Workflow) {
for i, wf := range ms.workflows {
if wf.ID == id {
return i, wf
}
@ -23,35 +23,35 @@ func (m *memory) findWorkflow(id string) (int, *jw.Workflow) {
return 0, nil
}
func (m *memory) ValidateWorkflow(_ *jw.Workflow) error {
func (ms *memoryStorage) ValidateWorkflow(_ *jw.Workflow) error {
return nil
}
func (m *memory) InsertWorkflow(wf *jw.Workflow, job *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) InsertWorkflow(wf *jw.Workflow, job *jw.Job) error {
ms.sm.Lock()
defer ms.sm.Unlock()
if _, w := m.findWorkflow(wf.ID); w != nil {
if _, w := ms.findWorkflow(wf.ID); w != nil {
return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////////
storage.ErrStorage,
errors.New("this workflow identifier already exists", "id", wf.ID),
)
}
if _, err := m.insertJob(job); err != nil {
if _, err := ms.insertJob(job); err != nil {
return err
}
m.workflows = append(m.workflows, wf)
ms.workflows = append(ms.workflows, wf)
return nil
}
func (m *memory) Workflow(id string, mustExist bool) (*jw.Workflow, error) {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) Workflow(id string, mustExist bool) (*jw.Workflow, error) {
ms.sm.Lock()
defer ms.sm.Unlock()
if _, wf := m.findWorkflow(id); wf != nil {
if _, wf := ms.findWorkflow(id); wf != nil {
return wf, nil
}
@ -62,16 +62,16 @@ func (m *memory) Workflow(id string, mustExist bool) (*jw.Workflow, error) {
return nil, nil
}
func (m *memory) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
ms.sm.Lock()
defer ms.sm.Unlock()
if err := m.updateJob(job); err != nil {
if err := ms.updateJob(job); err != nil {
return err
}
if wf != nil {
i, w := m.findWorkflow(wf.ID)
i, w := ms.findWorkflow(wf.ID)
if w == nil {
return errors.Wrap( ////////////////////////////////////////////////////////////////////////////////////////
storage.ErrStorage,
@ -79,23 +79,23 @@ func (m *memory) UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error {
)
}
m.workflows[i] = wf
ms.workflows[i] = wf
}
if nextJob == nil {
return nil
}
_, err := m.insertJob(nextJob)
_, err := ms.insertJob(nextJob)
return err
}
func (m *memory) SetWorkflowPriority(id string, priority jw.Priority) error {
m.sm.Lock()
defer m.sm.Unlock()
func (ms *memoryStorage) SetWorkflowPriority(id string, priority jw.Priority) error {
ms.sm.Lock()
defer ms.sm.Unlock()
_, wf := m.findWorkflow(id)
_, wf := ms.findWorkflow(id)
if wf == nil {
return errors.New("this workflow does not exist", "workflow", id) //////////////////////////////////////////////
}

Loading…
Cancel
Save