AFAIRE
sdk
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

175 lines
3.9 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021 losyme ###################################################### MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"fmt"
"sync"
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/uuid"
)
type worker struct {
id string
hostname string
runner Runner
dashboard *dashboard
logger Logger
stopCh chan struct{}
closeOnce sync.Once
}
func newWorker(hostname string, runner Runner, dashboard *dashboard, logger Logger) (*worker, error) {
id := uuid.New()
logger, err := logger.Clone(id, "worker")
if err != nil {
return nil, err
}
w := &worker{
id: id,
hostname: hostname,
runner: runner,
dashboard: dashboard,
logger: logger,
stopCh: make(chan struct{}),
}
return w, nil
}
func (w *worker) cloneLogger(job *jw.Job) (Logger, error) {
id := job.ID
name := "job"
if job.Workflow != nil {
id = *job.Workflow
name = "workflow"
}
return w.logger.Clone(id, name)
}
func (w *worker) maybeRunJob() (duration time.Duration) {
defer func() {
if data := recover(); data != nil {
w.logger.Fatal("PANIC ERROR RECOVERED", "data", data) //::::::::::::::::::::::::::::::::::::::::::::::::::::
}
duration = 5 * time.Second
w.stop()
}()
job, err := w.runner.Model().NextJob(w.runner.Namespace())
if err != nil {
return 5 * time.Second
}
if job == nil {
return 500 * time.Millisecond
}
w.dashboard.addJob(w.id, job)
defer w.dashboard.removeJob(w.id)
logger, err := w.cloneLogger(job)
if err == nil {
defer logger.Remove()
} else {
w.logger.Warning("Unable to clone the logger", "reason", err) //::::::::::::::::::::::::::::::::::::::::::::::::
logger = w.logger
}
logger.Info("JOB", job.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Worker",
"id", w.id,
//"jobs", w.jobs, AFINIR
)
w.runner.RunJob(job, logger)
if job.Result == nil {
job.Succeeded()
}
job.Result.Host = w.hostname // AFINIR
job.Result.Worker = w.id
if job, err := w.runner.Model().UpdateJob(job); err == nil {
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"status", job.Status,
"run_after", job.RunAfter.Round(time.Second).String(),
"retries", fmt.Sprintf("%d/%d", job.Attempt, job.MaxRetries),
)
} else {
logger.Info("END", "status", job.Status) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
} else {
logger.Info("END", "model", err) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
return 0
}
func (w *worker) loop() {
timer := time.NewTimer(0)
defer func() {
if !timer.Stop() {
<-timer.C
}
w.stop() // close
}()
for {
select {
case <-timer.C:
timer.Reset(w.maybeRunJob())
case <-w.stopCh:
return
}
}
}
func (w *worker) start(stoppedCh chan<- string) {
go func() { //@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
defer func() {
stoppedCh <- w.id
}()
w.logger.Info("Started", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
w.dashboard.addWorker(w.id)
w.loop()
w.dashboard.removeWorker(w.id)
w.logger.Info("Stopped", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
w.logger.Remove()
}()
}
func (w *worker) stop() {
w.closeOnce.Do(
func() {
close(w.stopCh)
},
)
}
/*
######################################################################################################## @(°_°)@ #######
*/