Browse Source

En cours de développement

master
losyme 6 months ago
parent
commit
0350ad878a
  1. 5
      README.md
  2. 2
      Taskfile.yml
  3. 136
      client/client.go
  4. 90
      client/config.go
  5. 192
      client/endpoint.go
  6. 41
      factory/factory.go
  7. 123
      factory/job.go
  8. 86
      factory/workflow.go
  9. 16
      go.mod
  10. 22
      go.sum
  11. 21
      runner/handler.go
  12. 32
      runner/job.go
  13. 119
      runner/runner.go
  14. 69
      sdk.go
  15. 16
      todo.md
  16. 54
      workers/config.go
  17. 90
      workers/dashboard.go
  18. 93
      workers/pool.go
  19. 95
      workers/supervisor.go
  20. 184
      workers/worker.go
  21. 61
      workers/workers.go

5
README.md

@ -2,10 +2,13 @@
A FAIRE.
[![Go Report Card](https://goreportcard.com/badge/forge.chapril.org/dune/sdk)](https://goreportcard.com/report/forge.chapril.org/dune/sdk)
[![Go Reference](https://pkg.go.dev/badge/forge.chapril.org/dune/sdk.svg)](https://pkg.go.dev/forge.chapril.org/dune/sdk)
## Licence
MIT.
---
Copyright (c) 2021 `losyme`.
Copyright (c) 2021 `losyme`

2
Taskfile.yml

@ -6,7 +6,7 @@ version: '3'
tasks:
upgrade:
cmds:
- go get -u ./...
- go get -d -u ./...
- go mod tidy
lint:
cmds:

136
client/client.go

@ -1,136 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package client
import (
"crypto/tls"
"encoding/json"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/roundrobin"
"forge.chapril.org/losyme/uuid"
"forge.chapril.org/dune/sdk"
)
type Client struct {
endpoints *roundrobin.AllList
logger sdk.Logger
}
func buildTLSCertificate(cfg *Config) (*tls.Certificate, error) {
if cfg.Cert == "" || cfg.Key == "" {
return nil, nil
}
cert, err := tls.LoadX509KeyPair(cfg.Cert, cfg.Key)
if err != nil {
return nil, err
}
return &cert, nil
}
func New(cfg *Config) (*Client, error) {
if err := cfg.validate(); err != nil {
return nil, err
}
endpoints := make([]interface{}, 0, len(cfg.Endpoints))
tlsCert, err := buildTLSCertificate(cfg)
if err != nil {
return nil, err
}
for _, ce := range cfg.Endpoints {
ep, err := newEndpoint(cfg, ce, tlsCert)
if err != nil {
return nil, err
}
endpoints = append(endpoints, ep)
}
rr, _ := roundrobin.NewAllList(endpoints)
c := &Client{
endpoints: rr,
logger: cfg.Logger,
}
return c, nil
}
func noEndpoint(reqID string) error {
return errors.New("unable to execute this request", "request", reqID) //////////////////////////////////////////////
}
func (c *Client) CreateJob(job *jw.Job) (*jw.Job, error) {
if job.ID == "" {
job.ID = uuid.New()
}
content, err := json.Marshal(job)
if err != nil {
return nil, err
}
reqID := job.ID
for _, item := range c.endpoints.Slice() {
ep := item.(*endpoint)
if !ep.isReady() {
continue
}
c.logger.Debug( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Create job",
"request", reqID,
"endpoint", ep.URL,
)
job, err := ep.createJob(reqID, content)
if err == nil {
return job, nil
}
c.logger.Warning( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Create job",
"request", reqID,
"endpoint", ep.URL,
"error", err,
)
if !errors.Is(err, _errRetry) {
return nil, err
}
}
return nil, noEndpoint(reqID)
}
func (c *Client) NextJob(namespace string) (*jw.Job, error) {
return nil, nil
}
func (c *Client) UpdateJob(job *jw.Job) (*jw.Job, error) {
return nil, errors.NotImplemented()
}
func (c *Client) CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error) {
if wf.ID == "" {
wf.ID = uuid.New()
}
return nil, errors.NotImplemented()
}
/*
######################################################################################################## @(°_°)@ #######
*/

90
client/config.go

@ -1,90 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package client
import (
"net/url"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/util"
"forge.chapril.org/dune/sdk"
)
type Endpoint struct {
URL string
Username string
Password string
CA string
}
type Config struct {
Endpoints []*Endpoint
Cert string
Key string
Logger sdk.Logger
}
func (cfg *Config) validate() error {
if cfg.Logger == nil {
return errors.New("the logger cannot be nil") //////////////////////////////////////////////////////////////////
}
if len(cfg.Endpoints) == 0 {
return errors.New("the list of endpoints cannot be empty") /////////////////////////////////////////////////////
}
for _, ep := range cfg.Endpoints {
url, err := url.Parse(ep.URL)
if err != nil || (url.Scheme != "http" && url.Scheme != "https") {
return errors.WithMessage( /////////////////////////////////////////////////////////////////////////////////
err,
"this url is not valid",
"url", ep.URL,
)
}
if ep.Username == "" || ep.Password == "" {
return errors.New( /////////////////////////////////////////////////////////////////////////////////////////
"the username and/or password are not specified for this url",
"url", ep.URL,
)
}
if ep.CA == "" {
continue
}
if ca, err := util.FileExists(ep.CA); err != nil {
return err
} else if !ca {
return errors.New("this file doesn't exist", "file", ep.CA) ////////////////////////////////////////////////
}
}
if cfg.Cert == "" || cfg.Key == "" {
return nil
}
if cert, err := util.FileExists(cfg.Cert); err != nil {
return err
} else if !cert {
return errors.New("this file doesn't exist", "file", cfg.Cert) /////////////////////////////////////////////////
}
if key, err := util.FileExists(cfg.Key); err != nil {
return err
} else if !key {
return errors.New("this file doesn't exist", "file", cfg.Key) //////////////////////////////////////////////////
}
return nil
}
/*
######################################################################################################## @(°_°)@ #######
*/

192
client/endpoint.go

@ -1,192 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package client
import (
"context"
"crypto/tls"
"crypto/x509"
"net/http"
"os"
"time"
"forge.chapril.org/losyme/breaker"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/king"
"forge.chapril.org/losyme/king/request"
_kong "forge.chapril.org/losyme/kong/context"
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
const (
_defaultCtoO = 3
_defaultHOtoC = 1
_defaultOtoHO = 60 * time.Second
_clientTimeout = 10 * time.Second
_errRetry = errors.Sentinel("retry")
ErrJobCreated = errors.Sentinel("job created")
)
type endpoint struct {
*Endpoint
breaker *breaker.Breaker
client *http.Client
}
func buildTransport(ce *Endpoint, tlsCert *tls.Certificate) (http.RoundTripper, error) {
if ce.CA == "" {
return nil, nil
}
buf, err := os.ReadFile(ce.CA)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(buf)
tlsConfig := &tls.Config{
RootCAs: certPool,
}
if tlsCert != nil {
tlsConfig.Certificates = []tls.Certificate{*tlsCert}
}
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConfig,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ForceAttemptHTTP2: true,
}
return t, nil
}
func buildBreaker(logger sdk.Logger, url string) *breaker.Breaker {
b := breaker.New(_defaultCtoO, _defaultHOtoC, _defaultOtoHO)
b.OnStateChange(
func(from, to breaker.State) {
msg := "Endpoint breaker state"
kv := []interface{}{
"url", url,
"from", from,
"to", to,
}
switch to {
case breaker.StateClosed:
logger.Notice(msg, kv...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
case breaker.StateHalfOpen:
logger.Warning(msg, kv...) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
case breaker.StateOpen:
logger.Error(msg, kv...) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
},
)
return b
}
func newEndpoint(cfg *Config, ce *Endpoint, tlsCert *tls.Certificate) (*endpoint, error) {
t, err := buildTransport(ce, tlsCert)
if err != nil {
return nil, err
}
c := &http.Client{
Transport: t,
Timeout: _clientTimeout, // AFINIR
}
ep := &endpoint{
Endpoint: ce,
breaker: buildBreaker(cfg.Logger, ce.URL),
client: c,
}
return ep, nil
}
func (ep *endpoint) isReady() bool {
return ep.breaker.IsReady()
}
func (ep *endpoint) update(success bool) {
ep.breaker.Update(success)
}
func (ep *endpoint) afterDoRequest(err error) error {
if err == nil {
ep.update(true)
return nil
}
ep.update(false)
return errors.Wrap(_errRetry, err)
}
func onError(resp *http.Response) error {
to := new(_kong.Error)
if err := king.DecodeJSON(resp, to); err != nil {
return errors.WithMessage(err, "status", resp.StatusCode) //////////////////////////////////////////////////////
}
return errors.New(to.Message, "status", to.Status) /////////////////////////////////////////////////////////////////
}
func (ep *endpoint) createJob(reqID string, content []byte) (*jw.Job, error) {
var job *jw.Job
err := request.
New(ep.URL).
Path("api/jobs").
Post().
BasicAuth(ep.Username, ep.Password).
ContentTypeJSON().
ID(reqID).
BodyData(content).
Client(ep.client).
Execute(
context.TODO(), // AFINIR
ep.afterDoRequest,
func(resp *http.Response) error {
switch resp.StatusCode {
case http.StatusNoContent:
return nil
case http.StatusCreated:
to := new(jw.Job)
if err := king.DecodeJSON(resp, to); err != nil {
return errors.Wrap(ErrJobCreated, err) /////////////////////////////////////////////////////////
}
job = to
return nil
}
return onError(resp)
},
)
return job, err
}
/*
######################################################################################################## @(°_°)@ #######
*/

41
factory/factory.go

@ -1,41 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package factory
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
type Factory struct {
model sdk.Model
}
func New(model sdk.Model) *Factory {
return &Factory{
model: model,
}
}
func (f *Factory) NewJob(namespace, _type string) *Job {
return &Job{
job: jw.NewJob(namespace, _type),
model: f.model,
}
}
func (f *Factory) NewWorkflow(firstStep string, allSteps map[string]*jw.Step) *Workflow {
return &Workflow{
wf: jw.NewWorkflow(firstStep, allSteps),
model: f.model,
}
}
/*
######################################################################################################## @(°_°)@ #######
*/

123
factory/job.go

@ -1,123 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package factory
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
type Job struct {
job *jw.Job
model sdk.Model
}
func (job *Job) SetID(value string) *Job {
job.job.SetID(value)
return job
}
func (job *Job) SetName(value string) *Job {
job.job.SetName(value)
return job
}
func (job *Job) SetOrigin(value string) *Job {
job.job.SetOrigin(value)
return job
}
func (job *Job) SetPriority(value jw.Priority) *Job {
job.job.SetPriority(value)
return job
}
func (job *Job) SetPriorityNone() *Job {
job.job.SetPriorityNone()
return job
}
func (job *Job) SetPriorityLow() *Job {
job.job.SetPriorityLow()
return job
}
func (job *Job) SetPriorityMedium() *Job {
job.job.SetPriorityMedium()
return job
}
func (job *Job) SetPriorityHigh() *Job {
job.job.SetPriorityHigh()
return job
}
func (job *Job) SetPriorityCritical() *Job {
job.job.SetPriorityCritical()
return job
}
func (job *Job) SetPublic(key string, value interface{}) *Job {
job.job.SetPublic(key, value)
return job
}
func (job *Job) SetPrivate(key string, value interface{}) *Job {
job.job.SetPrivate(key, value)
return job
}
func (job *Job) SetRunAfter(value time.Time) *Job {
job.job.SetRunAfter(value)
return job
}
func (job *Job) SetExclusivity(value jw.Exclusivity) *Job {
job.job.SetExclusivity(value)
return job
}
func (job *Job) SetExclusivityNo() *Job {
job.job.SetExclusivityNo()
return job
}
func (job *Job) SetExclusivityItself() *Job {
job.job.SetExclusivityItself()
return job
}
func (job *Job) SetExclusivityNamespace() *Job {
job.job.SetExclusivityNamespace()
return job
}
func (job *Job) SetMaxOccurrences(value int) *Job {
job.job.SetMaxOccurrences(value)
return job
}
func (job *Job) SetMaxAttempts(value int) *Job {
job.job.SetMaxAttempts(value)
return job
}
func (job *Job) SetCategory(value string) *Job {
job.job.SetCategory(value)
return job
}
func (job *Job) CreateJob() (*jw.Job, error) {
return job.model.CreateJob(job.job)
}
/*
######################################################################################################## @(°_°)@ #######
*/

86
factory/workflow.go

@ -1,86 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package factory
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
)
type Workflow struct {
wf *jw.Workflow
model sdk.Model
}
func (wf *Workflow) SetID(value string) *Workflow {
wf.wf.SetID(value)
return wf
}
func (wf *Workflow) SetType(value string) *Workflow {
wf.wf.SetType(value)
return wf
}
func (wf *Workflow) SetDescription(value string) *Workflow {
wf.wf.SetDescription(value)
return wf
}
func (wf *Workflow) SetOrigin(value string) *Workflow {
wf.wf.SetOrigin(value)
return wf
}
func (wf *Workflow) SetPriority(value jw.Priority) *Workflow {
wf.wf.SetPriority(value)
return wf
}
func (wf *Workflow) SetPriorityNone() *Workflow {
wf.wf.SetPriorityNone()
return wf
}
func (wf *Workflow) SetPriorityLow() *Workflow {
wf.wf.SetPriorityLow()
return wf
}
func (wf *Workflow) SetPriorityMedium() *Workflow {
wf.wf.SetPriorityMedium()
return wf
}
func (wf *Workflow) SetPriorityHigh() *Workflow {
wf.wf.SetPriorityHigh()
return wf
}
func (wf *Workflow) SetPriorityCritical() *Workflow {
wf.wf.SetPriorityCritical()
return wf
}
func (wf *Workflow) SetData(key string, value interface{}) *Workflow {
wf.wf.SetData(key, value)
return wf
}
func (wf *Workflow) SetExternalID(value string) *Workflow {
wf.wf.SetExternalID(value)
return wf
}
func (wf *Workflow) CreateWorkflow() (*jw.Workflow, error) {
return wf.model.CreateWorkflow(wf.wf)
}
/*
######################################################################################################## @(°_°)@ #######
*/

16
go.mod

@ -2,18 +2,4 @@ module forge.chapril.org/dune/sdk
go 1.17
require (
forge.chapril.org/dune/jw v0.0.0-20211218193326-289352360d86
forge.chapril.org/losyme/breaker v0.0.0-20211222133549-f8c241e20992
forge.chapril.org/losyme/errors v0.0.0-20211223160917-556015e45544
forge.chapril.org/losyme/king v0.0.0-20211223161002-58b73b8c00a0
forge.chapril.org/losyme/kong v0.0.0-20211223161008-2b3447846cd1
forge.chapril.org/losyme/roundrobin v0.0.0-20211222132434-ed489e1aa541
forge.chapril.org/losyme/util v0.0.0-20211222131335-836bfcf9c937
forge.chapril.org/losyme/uuid v0.0.0-20211222221547-0394055b579c
)
require (
forge.chapril.org/losyme/buffer v0.0.0-20211222133100-13a4d4e4c828 // indirect
forge.chapril.org/losyme/kvfmt v0.0.0-20211223160822-602d61ebbf46 // indirect
)
require forge.chapril.org/dune/jw v0.0.0-20211231224838-48284564ddcb

22
go.sum

@ -1,20 +1,2 @@
forge.chapril.org/dune/jw v0.0.0-20211218193326-289352360d86 h1:lg+YXf4pN8ERwMdcJtum6/NjJVpQirXgsN7/ksgnIHk=
forge.chapril.org/dune/jw v0.0.0-20211218193326-289352360d86/go.mod h1:7CyK2ZvzHntSlOZ7kWcqVhkmUzlE0QI3uKW9kPbNbOY=
forge.chapril.org/losyme/breaker v0.0.0-20211222133549-f8c241e20992 h1:Dv+8GY/2olryyAWwYjFrEAD2zOyQ9iVx094QqeobAv0=
forge.chapril.org/losyme/breaker v0.0.0-20211222133549-f8c241e20992/go.mod h1:1vBH8Q+vML8EBZj1Pvcpzxx+eEBK+f24QzcXT5Bo+MY=
forge.chapril.org/losyme/buffer v0.0.0-20211222133100-13a4d4e4c828 h1:jc/9nhx80Y0bnt6GI4qgKXizGzbRdMTZSVOEvRh+A3A=
forge.chapril.org/losyme/buffer v0.0.0-20211222133100-13a4d4e4c828/go.mod h1:4Yl585hmF3bWZfuwGF2Ym8ErykJZiF9yawlmK7szIkc=
forge.chapril.org/losyme/errors v0.0.0-20211223160917-556015e45544 h1:XVurl1Uw4vvqvoDN3gXcpCBxJHyFUjk42sVrMdI7HLM=
forge.chapril.org/losyme/errors v0.0.0-20211223160917-556015e45544/go.mod h1:8jyT32YCDD8K+6IOCLnzn+iec+t49x9S5ldD+gXVwvU=
forge.chapril.org/losyme/king v0.0.0-20211223161002-58b73b8c00a0 h1:4iTSOpiClOy1Vum+5USkmJwWrrWIyFeSVyjLF2mbkr0=
forge.chapril.org/losyme/king v0.0.0-20211223161002-58b73b8c00a0/go.mod h1:RODeIdzyUqEMN4ec5hDg9v8Mk2jQyEsp3J3bmCupLl0=
forge.chapril.org/losyme/kong v0.0.0-20211223161008-2b3447846cd1 h1:lnScyX2gWOVwQQiW8GZtgHwpRBOyHCX31JeVzPahGZw=
forge.chapril.org/losyme/kong v0.0.0-20211223161008-2b3447846cd1/go.mod h1:qZbrxG1QwroIAojug7U3i56jla9PfjQXJ9cITdSbiLQ=
forge.chapril.org/losyme/kvfmt v0.0.0-20211223160822-602d61ebbf46 h1:iOjbgnSE4Xu85xyI7sb0oN8k+rtLwPLlKpwEgKXIxrs=
forge.chapril.org/losyme/kvfmt v0.0.0-20211223160822-602d61ebbf46/go.mod h1:2pBxv3Jq8+jNpksBOhXGazCJSDMTtAlSitBN5ZZgQDA=
forge.chapril.org/losyme/roundrobin v0.0.0-20211222132434-ed489e1aa541 h1:O8c27NGSIm1vSTP8CBxPX2fhq99188ULA/cbS8nxIyA=
forge.chapril.org/losyme/roundrobin v0.0.0-20211222132434-ed489e1aa541/go.mod h1:0YIujnqdL+SQPIRlSzGMBeWpWc4Y44K8aI0x+N+Wydg=
forge.chapril.org/losyme/util v0.0.0-20211222131335-836bfcf9c937 h1:3p4DCKrNo+GtUJsI84xjH+aQoC1ZSNlMX4UDWWXVSRw=
forge.chapril.org/losyme/util v0.0.0-20211222131335-836bfcf9c937/go.mod h1:lKYvkuBOTU3E2HuQZDh4ZSA4u3VcFopQbdPUKaHGHEk=
forge.chapril.org/losyme/uuid v0.0.0-20211222221547-0394055b579c h1:S+BO+MNk4PSVvhEX87PlFsE7m3cLPJosdHQ9i+iog9c=
forge.chapril.org/losyme/uuid v0.0.0-20211222221547-0394055b579c/go.mod h1:AgvbvGzTkQkbiM+s9eVhvLUSHbA0V6N+q5oBFI2v0LA=
forge.chapril.org/dune/jw v0.0.0-20211231224838-48284564ddcb h1:r9QKe90LlRysLhBhCB0+AxVLKKoHSUjdvWJ+RRwNkOw=
forge.chapril.org/dune/jw v0.0.0-20211231224838-48284564ddcb/go.mod h1:7CyK2ZvzHntSlOZ7kWcqVhkmUzlE0QI3uKW9kPbNbOY=

21
runner/handler.go

@ -1,21 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package runner
type JobHandler interface {
Run(job *Job)
}
type JHF func(job *Job)
func (jhf JHF) Run(job *Job) {
jhf(job)
}
/*
######################################################################################################## @(°_°)@ #######
*/

32
runner/job.go

@ -1,32 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package runner
import (
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/dune/sdk/factory"
)
type Job struct {
*jw.Job
Logger sdk.Logger
Factory *factory.Factory
}
func newJob(job *jw.Job, logger sdk.Logger, factory *factory.Factory) *Job {
return &Job{
Job: job,
Logger: logger,
Factory: factory,
}
}
/*
######################################################################################################## @(°_°)@ #######
*/

119
runner/runner.go

@ -1,119 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package runner
import (
"runtime/debug"
"sync"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/dune/sdk/factory"
)
type PanicHandler func(job *jw.Job, logger sdk.Logger, data interface{}, stack []byte)
type Runner struct {
namespace string
model sdk.Model
factory *factory.Factory
jobHandlers map[string]JobHandler
mutex sync.Mutex
panicHandler PanicHandler
}
func New(namespace string, model sdk.Model) *Runner {
return &Runner{
namespace: namespace,
model: model,
factory: factory.New(model),
jobHandlers: make(map[string]JobHandler),
}
}
func (r *Runner) Namespace() string {
return r.namespace
}
func (r *Runner) Model() sdk.Model {
return r.model
}
func (r *Runner) SetPanicHandler(panicHandler PanicHandler) {
r.panicHandler = panicHandler
}
func (r *Runner) jobHandler(job *jw.Job) JobHandler {
r.mutex.Lock()
defer r.mutex.Unlock()
if handler, ok := r.jobHandlers[job.Type]; ok {
return handler
}
return nil
}
func (r *Runner) runJob(job *jw.Job, logger sdk.Logger) {
defer func() {
if data := recover(); data != nil {
if r.panicHandler != nil {
r.panicHandler(job, logger, data, debug.Stack())
} else {
err := errors.New("PANIC ERROR RECOVERED", "data", data) ///////////////////////////////////////////////
job.Failed().SetError(err.Error())
logger.Error(err.Error()) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
}
}()
if handler := r.jobHandler(job); handler != nil {
handler.Run(newJob(job, logger, r.factory))
if job.Result == nil {
job.Succeeded()
}
} else {
job.Failed().SetError("the type of this job is unknown") ///////////////////////////////////////////////////////
}
}
func (r *Runner) RunJob(job *jw.Job, logger sdk.Logger) {
r.runJob(job, logger)
jr := job.Result
if jr.Status == jw.StatusFailed {
logger.Error("Result", jr.Fields()...) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
} else {
logger.Info("Result", jr.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
}
func (r *Runner) AddJobHandler(jobType string, handler JobHandler) error {
r.mutex.Lock()
defer r.mutex.Unlock()
_, ok := r.jobHandlers[jobType]
if ok {
return errors.New( /////////////////////////////////////////////////////////////////////////////////////////////
"a handler already exists for this job type",
"type", jobType,
)
}
r.jobHandlers[jobType] = handler
return nil
}
/*
######################################################################################################## @(°_°)@ #######
*/

69
sdk.go

@ -6,68 +6,7 @@
package sdk
import (
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/errors"
)
const ErrStorage = errors.Sentinel("storage error")
type JobsRunning map[string]int
type JobsToRun interface {
Next() (*jw.Job, error)
}
type SelectNextJob func(jr JobsRunning, jtr JobsToRun) (*jw.Job, error)
type Dashboard struct {
Jobs struct {
Todo int
Pending int
Running int
Succeeded int
Failed int
Total int
}
Workflows struct {
Running int
Succeeded int
Failed int
Total int
}
}
type Storage interface {
ValidateJob(job *jw.Job) error
InsertJob(job *jw.Job) (bool, error)
UpdateJob(job *jw.Job) error
NotifyJob(id string, data interface{}) error
SetJobPriority(id string, priority jw.Priority) error
SetJobRunAfter(id string, duration time.Duration) error
NextJob(namespace string, fn SelectNextJob) (*jw.Job, error)
ValidateWorkflow(wf *jw.Workflow) error
InsertWorkflow(wf *jw.Workflow, job *jw.Job) error
Workflow(id string, mustExist bool) (*jw.Workflow, error)
UpdateWorkflow(wf *jw.Workflow, job, nextJob *jw.Job) error
SetWorkflowPriority(id string, priority jw.Priority) error
Dashboard() (*Dashboard, error)
Close() error
}
type Logger interface {
Trace(msg string, kv ...interface{})
Debug(msg string, kv ...interface{})
Info(msg string, kv ...interface{})
Notice(msg string, kv ...interface{})
Warning(msg string, kv ...interface{})
Error(msg string, kv ...interface{})
}
import "forge.chapril.org/dune/jw"
type Model interface {
CreateJob(job *jw.Job) (*jw.Job, error)
@ -76,12 +15,6 @@ type Model interface {
CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error)
}
type Runner interface {
Namespace() string
Model() Model
RunJob(job *jw.Job, logger Logger)
}
/*
######################################################################################################## @(°_°)@ #######
*/

16
todo.md

@ -0,0 +1,16 @@
# sdk
## finaliser les tests
## finaliser la documentation
- utilisation d'un fichier doc.go ?
- ajouter des exemples au README.md
## créer la première version
v0.1.0
---
Copyright (c) 2021 `losyme`

54
workers/config.go

@ -1,54 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"forge.chapril.org/losyme/errors"
"forge.chapril.org/dune/sdk"
)
const _maxPoolSize = 20
type Logger interface {
sdk.Logger
Clone(id, name string) (Logger, error)
Remove()
}
type Config struct {
PoolSize int
Host string
Logger Logger
Runner sdk.Runner
}
func (cfg *Config) validate() error {
if cfg.PoolSize < 0 {
cfg.PoolSize = 0
} else if cfg.PoolSize > _maxPoolSize {
cfg.PoolSize = _maxPoolSize
}
if cfg.Host == "" {
cfg.Host = "?"
}
if cfg.Logger == nil {
return errors.New("the logger cannot be nil") //////////////////////////////////////////////////////////////////
}
if cfg.Runner == nil {
return errors.New("the runner cannot be nil") //////////////////////////////////////////////////////////////////
}
return nil
}
/*
######################################################################################################## @(°_°)@ #######
*/

90
workers/dashboard.go

@ -1,90 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"sync"
"time"
"forge.chapril.org/dune/jw"
)
type Worker struct {
Jobs int
Busy bool
JobID string
JobNamespace string
JobType string
StartedAt time.Time
}
type dashboard struct {
workers map[string]*Worker
mutex sync.Mutex
}
func newDashboard() *dashboard {
return &dashboard{
workers: make(map[string]*Worker),
}
}
func (d *dashboard) addJob(wID string, job *jw.Job) int {
d.mutex.Lock()
defer d.mutex.Unlock()
w := d.workers[wID]
w.Jobs++
w.Busy = true
w.JobID = job.ID
w.JobNamespace = job.Namespace
w.JobType = job.Type
w.StartedAt = time.Now()
return w.Jobs
}
func (d *dashboard) removeJob(wID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.workers[wID].Busy = false
}
func (d *dashboard) addWorker(wID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.workers[wID] = new(Worker)
}
func (d *dashboard) removeWorker(wID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
delete(d.workers, wID)
}
func (d *dashboard) data() map[string]*Worker {
d.mutex.Lock()
defer d.mutex.Unlock()
data := make(map[string]*Worker)
for id, w := range d.workers {
clone := new(Worker)
*clone = *w
data[id] = clone
}
return data
}
/*
######################################################################################################## @(°_°)@ #######
*/

93
workers/pool.go

@ -1,93 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"sync"
"forge.chapril.org/dune/sdk"
)
type pool struct {
host string
runner sdk.Runner
dashboard *dashboard
logger Logger
workers map[string]*worker
mutex sync.Mutex
}
func newPool(host string, runner sdk.Runner, dashboard *dashboard, logger Logger) *pool {
return &pool{
host: host,
runner: runner,
dashboard: dashboard,
logger: logger,
workers: make(map[string]*worker),
}
}
func (p *pool) size() int {
p.mutex.Lock()
defer p.mutex.Unlock()
return len(p.workers)
}
func (p *pool) startOneWorker(stoppedCh chan<- string) {
worker, err := newWorker(p.host, p.runner, p.dashboard, p.logger)
if err != nil {
p.logger.Error("Unable to create a worker", "reason", err) //:::::::::::::::::::::::::::::::::::::::::::::::::::
return
}
p.mutex.Lock()
p.workers[worker.id] = worker
p.mutex.Unlock()
worker.start(stoppedCh)
}
func (p *pool) stopOneWorker() { // AFINIR: choix du worker ?
p.mutex.Lock()
defer p.mutex.Unlock()
for _, worker := range p.workers {
worker.stop()
delete(p.workers, worker.id)
return
}
}
func (p *pool) workerStopped(wID string, end bool) int {
p.mutex.Lock()
defer p.mutex.Unlock()
worker, ok := p.workers[wID]
if ok {
delete(p.workers, worker.id)
if !end {
p.logger.Warning("A worker has stopped", "worker", wID) //::::::::::::::::::::::::::::::::::::::::::::::::::
}
}
return len(p.workers)
}
func (p *pool) stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
for _, worker := range p.workers {
worker.stop()
}
}
/*
######################################################################################################## @(°_°)@ #######
*/

95
workers/supervisor.go

@ -1,95 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import "sync"
type supervisor struct {
poolSize int
pool *pool
eventCh chan string
}
func newSupervisor(poolSize int, pool *pool) *supervisor {
return &supervisor{
poolSize: poolSize,
pool: pool,
eventCh: make(chan string),
}
}
func (s *supervisor) addOneWorker() {
s.eventCh <- "+1"
}
func (s *supervisor) stopOneWorker() {
s.eventCh <- "-1"
}
func (s *supervisor) stop() {
s.eventCh <- "stop"