Browse Source

En cours de développement

master
losyme 4 months ago
parent
commit
7c631194ce
  1. 1
      go.mod
  2. 2
      go.sum
  3. 4
      sdk.go
  4. 84
      workers/config.go
  5. 90
      workers/pool.go
  6. 90
      workers/state.go
  7. 100
      workers/supervisor.go
  8. 78
      workers/worker.go
  9. 48
      workers/workers.go

1
go.mod

@ -11,6 +11,7 @@ require (
forge.chapril.org/losyme/roundrobin v0.0.0-20220101202922-42e4b1e1d136
forge.chapril.org/losyme/util v0.0.0-20220101202923-23640bbd8739
forge.chapril.org/losyme/uuid v0.0.0-20220101202925-b6817a6cc0bb
forge.chapril.org/losyme/zombie v0.0.0-20220102163906-aee9cf804b35
)
require (

2
go.sum

@ -18,3 +18,5 @@ forge.chapril.org/losyme/util v0.0.0-20220101202923-23640bbd8739 h1:02ofB/g/A1UT
forge.chapril.org/losyme/util v0.0.0-20220101202923-23640bbd8739/go.mod h1:lKYvkuBOTU3E2HuQZDh4ZSA4u3VcFopQbdPUKaHGHEk=
forge.chapril.org/losyme/uuid v0.0.0-20220101202925-b6817a6cc0bb h1:Zn6GNf8b2uJ/w8ubBNCCfysKO5lgKGQCPMr56C9KqHk=
forge.chapril.org/losyme/uuid v0.0.0-20220101202925-b6817a6cc0bb/go.mod h1:AgvbvGzTkQkbiM+s9eVhvLUSHbA0V6N+q5oBFI2v0LA=
forge.chapril.org/losyme/zombie v0.0.0-20220102163906-aee9cf804b35 h1:g/TW4CZgASNxGw7CjfzuUq3cbzTI3IZnRjhfZ/+Z6d0=
forge.chapril.org/losyme/zombie v0.0.0-20220102163906-aee9cf804b35/go.mod h1:f2dCQZ+o+tk8hzWDapLPvqjiEF1QShkS19jOptvoYaM=

4
sdk.go

@ -15,6 +15,10 @@ type Model interface {
CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error)
}
type Runner interface {
Run(job *jw.Job)
}
/*
######################################################################################################## @(°_°)@ #######
*/

84
workers/config.go

@ -0,0 +1,84 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"forge.chapril.org/dune/sdk"
"forge.chapril.org/losyme/errors"
)
const (
_defaultPoolSize = 10
_maxPoolSize = 20
)
type Logger interface {
Trace(msg string, kv ...interface{})
Info(msg string, kv ...interface{})
Warning(msg string, kv ...interface{})
}
type Config struct {
PoolSize int
Runner sdk.Runner
Logger Logger
}
type ConfigOption func(*Config)
func WithPoolSize(poolSize int) ConfigOption {
return func(c *Config) {
c.PoolSize = poolSize
}
}
func WithRunner(runner sdk.Runner) ConfigOption {
return func(c *Config) {
c.Runner = runner
}
}
func WithLogger(logger Logger) ConfigOption {
return func(c *Config) {
c.Logger = logger
}
}
func (c *Config) validate() error {
if c.PoolSize < 0 {
c.PoolSize = 0
} else if c.PoolSize > _maxPoolSize {
c.PoolSize = _maxPoolSize
}
if c.Runner == nil {
return errors.New("field 'Runner' cannot be nil") //////////////////////////////////////////////////////////////
}
return nil
}
func (c *Config) BuildWorkers() (*Workers, error) {
if err := c.validate(); err != nil {
return nil, err
}
state := newState()
pool := newPool(c, "?", state)
supervisor := newSupervisor(c.PoolSize, c.Logger, pool)
ws := &Workers{
state: state,
supervisor: supervisor,
}
return ws, nil
}
/*
######################################################################################################## @(°_°)@ #######
*/

90
workers/pool.go

@ -0,0 +1,90 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"context"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/losyme/zombie"
)
type pool struct {
host string
runner sdk.Runner
logger Logger
state *state
workers map[string]*worker
stopCh chan bool
zombie *zombie.GroupWithContext
}
func newPool(cfg *Config, host string, state *state) *pool {
return &pool{
host: host,
runner: cfg.Runner,
logger: cfg.Logger,
state: state,
workers: make(map[string]*worker),
stopCh: make(chan bool, _maxPoolSize),
}
}
func (p *pool) size() int {
return len(p.workers)
}
func (p *pool) startOneWorker(stoppedCh chan<- string) {
worker := newWorker(p.host, p.runner, p.logger, p.state)
p.workers[worker.id] = worker
if p.zombie == nil {
p.zombie = zombie.GoWithContext(
context.Background(),
1,
func(ctx context.Context) { worker.run(ctx, p.stopCh, stoppedCh) },
zombie.WithName("worker"),
zombie.WithLogger(p.logger),
)
} else {
p.zombie.Go(1)
}
}
func (p *pool) stopOneWorker() {
p.stopCh <- true
}
func (p *pool) workerStopped(wID string, end bool) int {
worker, ok := p.workers[wID]
if ok {
delete(p.workers, worker.id)
if !end && p.logger != nil {
p.logger.Warning("A worker has stopped", "worker", wID) //::::::::::::::::::::::::::::::::::::::::::::::::::
}
}
return len(p.workers)
}
func (p *pool) stop() {
if z := p.zombie; z != nil {
z.Stop()
}
}
func (p *pool) wait() {
if z := p.zombie; z != nil {
z.Wait()
}
}
/*
######################################################################################################## @(°_°)@ #######
*/

90
workers/state.go

@ -0,0 +1,90 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"sync"
"time"
)
type Worker struct {
Jobs int
Busy bool
JobID string
JobNamespace string
JobType string
StartedAt time.Time
}
type state struct {
workers map[string]*Worker
mutex sync.Mutex
}
func newState() *state {
return &state{
workers: make(map[string]*Worker),
}
}
/*
func (s *state) addJob(wID string, job *jw.Job) int {
s.mutex.Lock()
defer s.mutex.Unlock()
w := s.workers[wID]
w.Jobs++
w.Busy = true
w.JobID = job.ID
w.JobNamespace = job.Namespace
w.JobType = job.Type
w.StartedAt = time.Now()
return w.Jobs
}
func (s *state) removeJob(wID string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.workers[wID].Busy = false
}
*/
func (s *state) addWorker(wID string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.workers[wID] = new(Worker)
}
func (s *state) removeWorker(wID string) {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.workers, wID)
}
func (s *state) data() map[string]*Worker {
s.mutex.Lock()
defer s.mutex.Unlock()
data := make(map[string]*Worker)
for id, w := range s.workers {
clone := new(Worker)
*clone = *w
data[id] = clone
}
return data
}
/*
######################################################################################################## @(°_°)@ #######
*/

100
workers/supervisor.go

@ -0,0 +1,100 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"context"
"forge.chapril.org/losyme/zombie"
)
type supervisor struct {
poolSize int
logger Logger
pool *pool
eventCh chan string
zombie *zombie.GroupWithContext
}
func newSupervisor(poolSize int, logger Logger, pool *pool) *supervisor {
return &supervisor{
poolSize: poolSize,
logger: logger,
pool: pool,
eventCh: make(chan string),
}
}
func (s *supervisor) loop(ctx context.Context) {
end := false
for {
if !end {
if s.pool.size() < s.poolSize {
s.pool.startOneWorker(s.eventCh)
continue
}
if s.pool.size() > s.poolSize {
s.pool.stopOneWorker()
continue
}
}
select {
case <-ctx.Done():
if !end {
end = true
s.pool.stop()
}
case e := <-s.eventCh:
switch e {
case "-1":
if !end && s.poolSize > 0 {
s.poolSize--
}
case "+1":
if !end && s.poolSize < _maxPoolSize {
s.poolSize++
}
default: // wID
size := s.pool.workerStopped(e, end)
if end && size == 0 {
return
}
}
}
}
}
func (s *supervisor) start() {
s.zombie = zombie.GoWithContext(
context.Background(),
1,
s.loop,
zombie.WithName("supervisor"),
zombie.WithLogger(s.logger),
)
}
func (s *supervisor) addOneWorker() {
s.eventCh <- "+1"
}
func (s *supervisor) stopOneWorker() {
s.eventCh <- "-1"
}
func (s *supervisor) stop() {
s.zombie.Stop()
s.pool.wait()
s.zombie.Wait()
}
/*
######################################################################################################## @(°_°)@ #######
*/

78
workers/worker.go

@ -0,0 +1,78 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"context"
"time"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/losyme/uuid"
)
type worker struct {
id string
host string
runner sdk.Runner
logger Logger
state *state
}
func newWorker(host string, runner sdk.Runner, logger Logger, state *state) *worker {
return &worker{
id: uuid.New(),
host: host,
runner: runner,
logger: logger,
state: state,
}
}
func (w *worker) maybeRunJob() time.Duration {
return 2 * time.Second
}
func (w *worker) loop(ctx context.Context, stopCh <-chan bool) {
timer := time.NewTimer(0)
defer func() {
if !timer.Stop() {
<-timer.C
}
}()
for {
select {
case <-ctx.Done():
return
case <-stopCh:
return
case <-timer.C:
timer.Reset(w.maybeRunJob())
}
}
}
func (w *worker) run(ctx context.Context, stopCh <-chan bool, stoppedCh chan<- string) {
if w.logger != nil {
w.logger.Info("Started", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
w.state.addWorker(w.id)
w.loop(ctx, stopCh)
w.state.removeWorker(w.id)
if w.logger != nil {
w.logger.Info("Stopped", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
stoppedCh <- w.id
}
/*
######################################################################################################## @(°_°)@ #######
*/

48
workers/workers.go

@ -0,0 +1,48 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
type Workers struct {
state *state
supervisor *supervisor
}
func New(options ...ConfigOption) (*Workers, error) {
cfg := &Config{
PoolSize: _defaultPoolSize,
}
for _, option := range options {
option(cfg)
}
return cfg.BuildWorkers()
}
func (ws *Workers) Start() {
ws.supervisor.start()
}
func (ws *Workers) AddOneWorker() {
ws.supervisor.addOneWorker()
}
func (ws *Workers) StopOneWorker() {
ws.supervisor.stopOneWorker()
}
func (ws *Workers) State() map[string]*Worker {
return ws.state.data()
}
func (ws *Workers) Stop() {
ws.supervisor.stop()
}
/*
######################################################################################################## @(°_°)@ #######
*/
Loading…
Cancel
Save