Browse Source

En cours de développement

master
losyme 7 months ago
parent
commit
d73185ea45
  1. 4
      client/client.go
  2. 11
      client/config.go
  3. 4
      client/endpoint.go
  4. 10
      factory/factory.go
  5. 4
      factory/job.go
  6. 18
      factory/model.go
  7. 8
      factory/workflow.go
  8. 2
      go.mod
  9. 4
      go.sum
  10. 5
      runner/job.go
  11. 23
      runner/model.go
  12. 22
      runner/runner.go
  13. 87
      sdk.go
  14. 13
      workers/config.go
  15. 10
      workers/pool.go
  16. 6
      workers/worker.go

4
client/client.go

@ -14,11 +14,13 @@ import (
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/roundrobin"
"forge.chapril.org/losyme/uuid"
"forge.chapril.org/dune/sdk"
)
type Client struct {
endpoints *roundrobin.RoundRobin
logger Logger
logger sdk.Logger
}
func buildTLSCertificate(cfg *Config) (*tls.Certificate, error) {

11
client/config.go

@ -11,14 +11,9 @@ import (
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/util"
)
type Logger interface {
Debug(msg string, kv ...interface{})
Notice(msg string, kv ...interface{})
Warning(msg string, kv ...interface{})
Error(msg string, kv ...interface{})
}
"forge.chapril.org/dune/sdk"
)
type Endpoint struct {
URL string
@ -31,7 +26,7 @@ type Config struct {
Endpoints []*Endpoint
Cert string
Key string
Logger Logger
Logger sdk.Logger
}
func (cfg *Config) validate() error {

4
client/endpoint.go

@ -14,6 +14,8 @@ import (
"time"
"forge.chapril.org/losyme/breaker"
"forge.chapril.org/dune/sdk"
)
const (
@ -55,7 +57,7 @@ func buildTransport(ce *Endpoint, tlsCert *tls.Certificate) (http.RoundTripper,
return t, nil
}
func buildBreaker(logger Logger, url string) *breaker.Breaker {
func buildBreaker(logger sdk.Logger, url string) *breaker.Breaker {
b := breaker.New(_defaultCtoO, _defaultHOtoC, _defaultOtoHO)
b.OnStateChange(
func(from, to breaker.State) {

10
factory/factory.go

@ -6,13 +6,17 @@
package factory
import "forge.chapril.org/dune/jw"
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
type Factory struct {
model Model
model sdk.Model
}
func New(model Model) *Factory {
func New(model sdk.Model) *Factory {
return &Factory{
model: model,
}

4
factory/job.go

@ -10,11 +10,13 @@ import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
type Job struct {
job *jw.Job
model Model
model sdk.Model
}
func (job *Job) SetID(value string) *Job {

18
factory/model.go

@ -1,18 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package factory
import "forge.chapril.org/dune/jw"
type Model interface {
CreateJob(job *jw.Job) (*jw.Job, error)
CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error)
}
/*
######################################################################################################## @(°_°)@ #######
*/

8
factory/workflow.go

@ -6,11 +6,15 @@
package factory
import "forge.chapril.org/dune/jw"
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
type Workflow struct {
wf *jw.Workflow
model Model
model sdk.Model
}
func (wf *Workflow) SetID(value string) *Workflow {

2
go.mod

@ -3,7 +3,7 @@ module forge.chapril.org/dune/sdk
go 1.17
require (
forge.chapril.org/dune/jw v0.0.0-20211205143034-b316149dd167
forge.chapril.org/dune/jw v0.0.0-20211212110653-0fbfac5be8fc
forge.chapril.org/losyme/breaker v0.0.0-20211211181953-36cc377d4b8d
forge.chapril.org/losyme/errors v0.0.0-20211003204336-ad5510c24b40
forge.chapril.org/losyme/roundrobin v0.0.0-20211211174613-d99b65a22ebf

4
go.sum

@ -1,5 +1,5 @@
forge.chapril.org/dune/jw v0.0.0-20211205143034-b316149dd167 h1:IslU++qhiIM+BxkfeduMfjEu6TgwaRrqSYlNcucLzKE=
forge.chapril.org/dune/jw v0.0.0-20211205143034-b316149dd167/go.mod h1:LEDlIDkGkdvYAzSdZeucZDkbhkf0K5WUV7rTK8ILsc4=
forge.chapril.org/dune/jw v0.0.0-20211212110653-0fbfac5be8fc h1:GkiPEqkns9+lGzpC5Li3IY9YifnvpqCSozOfW/nx4E4=
forge.chapril.org/dune/jw v0.0.0-20211212110653-0fbfac5be8fc/go.mod h1:7CyK2ZvzHntSlOZ7kWcqVhkmUzlE0QI3uKW9kPbNbOY=
forge.chapril.org/losyme/breaker v0.0.0-20211211181953-36cc377d4b8d h1:97DWanb3cSDZOGUN2Qa3EijpN8Sjoq9aJvXSqIIIOAA=
forge.chapril.org/losyme/breaker v0.0.0-20211211181953-36cc377d4b8d/go.mod h1:1vBH8Q+vML8EBZj1Pvcpzxx+eEBK+f24QzcXT5Bo+MY=
forge.chapril.org/losyme/buffer v0.0.0-20211003203540-771701f5a518 h1:B7wu0DWUwpt0Mw/VTuWQZikhV/VD6kc2iJWbIY+jgEM=

5
runner/job.go

@ -9,16 +9,17 @@ package runner
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/dune/sdk/factory"
)
type Job struct {
*jw.Job
Logger Logger
Logger sdk.Logger
Factory *factory.Factory
}
func newJob(job *jw.Job, logger Logger, factory *factory.Factory) *Job {
func newJob(job *jw.Job, logger sdk.Logger, factory *factory.Factory) *Job {
return &Job{
Job: job,
Logger: logger,

23
runner/model.go

@ -1,23 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package runner
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk/factory"
)
type Model interface {
factory.Model
NextJob(namespace string) (*jw.Job, error)
UpdateJob(job *jw.Job) (*jw.Job, error)
}
/*
######################################################################################################## @(°_°)@ #######
*/

22
runner/runner.go

@ -13,30 +13,22 @@ import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/dune/sdk/factory"
)
type Logger interface {
Trace(msg string, kv ...interface{})
Debug(msg string, kv ...interface{})
Info(msg string, kv ...interface{})
Notice(msg string, kv ...interface{})
Warning(msg string, kv ...interface{})
Error(msg string, kv ...interface{})
}
type PanicHandler func(job *jw.Job, logger Logger, data interface{}, stack []byte)
type PanicHandler func(job *jw.Job, logger sdk.Logger, data interface{}, stack []byte)
type Runner struct {
namespace string
model Model
model sdk.Model
factory *factory.Factory
jobHandlers map[string]JobHandler
mutex sync.Mutex
panicHandler PanicHandler
}
func New(namespace string, model Model) *Runner {
func New(namespace string, model sdk.Model) *Runner {
return &Runner{
namespace: namespace,
model: model,
@ -49,7 +41,7 @@ func (r *Runner) Namespace() string {
return r.namespace
}
func (r *Runner) Model() Model {
func (r *Runner) Model() sdk.Model {
return r.model
}
@ -68,7 +60,7 @@ func (r *Runner) jobHandler(job *jw.Job) JobHandler {
return nil
}
func (r *Runner) runJob(job *jw.Job, logger Logger) {
func (r *Runner) runJob(job *jw.Job, logger sdk.Logger) {
defer func() {
if data := recover(); data != nil {
if r.panicHandler != nil {
@ -93,7 +85,7 @@ func (r *Runner) runJob(job *jw.Job, logger Logger) {
}
}
func (r *Runner) RunJob(job *jw.Job, logger Logger) {
func (r *Runner) RunJob(job *jw.Job, logger sdk.Logger) {
r.runJob(job, logger)
jr := job.Result

87
sdk.go

@ -0,0 +1,87 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package sdk
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
)
const ErrStorage = errors.Sentinel("storage error")
type JobsRunning map[string]int
type JobsToRun interface {
Next() (*jw.Job, error)
}
type SelectNextJob func(jr JobsRunning, jtr JobsToRun) (*jw.Job, error)
type Dashboard struct {
Jobs struct {
Todo int
Pending int
Running int
Succeeded int
Failed int
Total int
}
Workflows struct {
Running int
Succeeded int
Failed int
Total int
}
}
type Storage interface {
ValidateJob(job *jw.Job) error
InsertJob(job *jw.Job) (bool, error)
UpdateJob(job *jw.Job) error
NotifyJob(id string, data interface{}) error
SetJobPriority(id string, priority jw.Priority) error
SetJobRunAfter(id string, duration time.Duration) error
NextJob(namespace string, fn SelectNextJob) (*jw.Job, error)
ValidateWorkflow(wf *jw.Workflow) error
InsertWorkflow(wf *jw.Workflow, job *jw.Job) error
Workflow(id string, mustExist bool) (*jw.Workflow, error)
UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error
SetWorkflowPriority(id string, priority jw.Priority) error
Dashboard() (*Dashboard, error)
Close() error
}
type Logger interface {
Trace(msg string, kv ...interface{})
Debug(msg string, kv ...interface{})
Info(msg string, kv ...interface{})
Notice(msg string, kv ...interface{})
Warning(msg string, kv ...interface{})
Error(msg string, kv ...interface{})
}
type Model interface {
CreateJob(job *jw.Job) (*jw.Job, error)
NextJob(namespace string) (*jw.Job, error)
UpdateJob(job *jw.Job) (*jw.Job, error)
CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error)
}
type Runner interface {
Namespace() string
Model() Model
RunJob(job *jw.Job, logger Logger)
}
/*
######################################################################################################## @(°_°)@ #######
*/

13
workers/config.go

@ -7,31 +7,24 @@
package workers
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/dune/sdk/runner"
"forge.chapril.org/dune/sdk"
)
const _maxPoolSize = 20
type Logger interface {
runner.Logger
sdk.Logger
Clone(id, name string) (Logger, error)
Remove()
}
type Runner interface {
Namespace() string
Model() runner.Model
RunJob(job *jw.Job, logger runner.Logger)
}
type Config struct {
PoolSize int
Host string
Logger Logger
Runner Runner
Runner sdk.Runner
}
func (cfg *Config) validate() error {

10
workers/pool.go

@ -6,18 +6,22 @@
package workers
import "sync"
import (
"sync"
"forge.chapril.org/dune/sdk"
)
type pool struct {
host string
runner Runner
runner sdk.Runner
dashboard *dashboard
logger Logger
workers map[string]*worker
mutex sync.Mutex
}
func newPool(host string, runner Runner, dashboard *dashboard, logger Logger) *pool {
func newPool(host string, runner sdk.Runner, dashboard *dashboard, logger Logger) *pool {
return &pool{
host: host,
runner: runner,

6
workers/worker.go

@ -14,6 +14,8 @@ import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/uuid"
"forge.chapril.org/dune/sdk"
)
const (
@ -24,14 +26,14 @@ const (
type worker struct {
id string
host string
runner Runner
runner sdk.Runner
dashboard *dashboard
logger Logger
stopCh chan struct{}
closeOnce sync.Once
}
func newWorker(host string, runner Runner, dashboard *dashboard, logger Logger) (*worker, error) {
func newWorker(host string, runner sdk.Runner, dashboard *dashboard, logger Logger) (*worker, error) {
id := uuid.New()
logger, err := logger.Clone(id, "worker")

Loading…
Cancel
Save