Browse Source

En cours de développement

master
losyme 4 months ago
parent
commit
048df28c02
  1. 1
      sdk.go
  2. 1
      workers/config.go
  3. 4
      workers/pool.go
  4. 4
      workers/state.go
  5. 49
      workers/worker.go

1
sdk.go

@ -16,6 +16,7 @@ type Model interface {
}
type Runner interface {
Namespace() string
Run(job *jw.Job)
}

1
workers/config.go

@ -20,6 +20,7 @@ type Logger interface {
Trace(msg string, kv ...interface{})
Info(msg string, kv ...interface{})
Warning(msg string, kv ...interface{})
Error(msg string, kv ...interface{})
}
type Config struct {

4
workers/pool.go

@ -16,6 +16,7 @@ import (
type pool struct {
host string
model sdk.Model
runner sdk.Runner
logger Logger
state *state
@ -32,6 +33,7 @@ func newPool(cfg *Config, state *state) *pool {
return &pool{
host: fqdn,
model: cfg.Model,
runner: cfg.Runner,
logger: cfg.Logger,
state: state,
@ -45,7 +47,7 @@ func (p *pool) size() int {
}
func (p *pool) startOneWorker(stoppedCh chan<- string) {
worker := newWorker(p.host, p.runner, p.logger, p.state)
worker := newWorker(p.host, p.model, p.runner, p.logger, p.state)
fn := func(ctx context.Context) { worker.run(ctx, p.stopCh, stoppedCh) }
p.workers[worker.id] = worker

4
workers/state.go

@ -9,6 +9,8 @@ package workers
import (
"sync"
"time"
"forge.chapril.org/dune/jw"
)
type Worker struct {
@ -31,7 +33,6 @@ func newState() *state {
}
}
/*
func (s *state) addJob(wID string, job *jw.Job) int {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -53,7 +54,6 @@ func (s *state) removeJob(wID string) {
s.workers[wID].Busy = false
}
*/
func (s *state) addWorker(wID string) {
s.mutex.Lock()

49
workers/worker.go

@ -8,32 +8,77 @@ package workers
import (
"context"
"runtime/debug"
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/losyme/uuid"
)
const (
_timeoutOnError = 9 * time.Second
_timeoutNoJob = 2 * time.Second
)
type worker struct {
id string
host string
model sdk.Model
runner sdk.Runner
logger Logger
state *state
}
func newWorker(host string, runner sdk.Runner, logger Logger, state *state) *worker {
func newWorker(host string, model sdk.Model, runner sdk.Runner, logger Logger, state *state) *worker {
return &worker{
id: uuid.New(),
host: host,
model: model,
runner: runner,
logger: logger,
state: state,
}
}
func (w *worker) updateJob(job *jw.Job) {
}
func (w *worker) maybeRunJob() time.Duration {
return 2 * time.Second
defer func() {
if data := recover(); data != nil {
if w.logger != nil {
w.logger.Error("PANIC ERROR RECOVERED", "data", data, "stack", string(debug.Stack())) //::::::::::::::::
}
}
}()
job, err := w.model.NextJob(w.runner.Namespace())
if err != nil {
return _timeoutOnError
}
if job == nil {
return _timeoutNoJob
}
number := w.state.addJob(w.id, job)
defer w.state.removeJob(w.id)
if logger := w.logger; logger != nil {
logger.Info("JOB", job.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
logger.Info( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Worker",
"id", w.id,
"job", job.ID,
"number", number,
)
}
w.runner.Run(job)
w.updateJob(job)
return 0
}
func (w *worker) loop(ctx context.Context, stopCh <-chan bool) {

Loading…
Cancel
Save