AFAIRE
sdk
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

196 lines
4.4 KiB

/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package workers
import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"
"forge.chapril.org/dune/jw"
"forge.chapril.org/dune/sdk"
"forge.chapril.org/losyme/uuid"
)
const (
_timeoutOnError = 9 * time.Second
_timeoutNoJob = 2 * time.Second
)
type worker struct {
id string
host string
model sdk.Model
runner sdk.Runner
logger Logger
state *state
}
func newWorker(host string, model sdk.Model, runner sdk.Runner, logger Logger, state *state) *worker {
return &worker{
id: uuid.New(),
host: host,
model: model,
runner: runner,
logger: logger,
state: state,
}
}
func (w *worker) updateJob(job *jw.Job) {
if job.Result == nil {
_ = job.Succeeded()
}
job.Result.Host = w.host
job.Result.Worker = w.id
jobID := job.ID
job, err := w.model.UpdateJob(job)
if logger := w.logger; logger != nil {
if err != nil {
logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"job", jobID,
"workflow", job.WorkflowValue(),
nil, err,
)
return
}
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"job", job.ID,
"workflow", job.WorkflowValue(),
"status", job.Status,
"run_after", job.RunAfter.String(),
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts),
)
return
}
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"job", job.ID,
"workflow", job.WorkflowValue(),
"status", job.Status,
)
}
}
func (w *worker) runJob(job *jw.Job) {
defer func() {
if data := recover(); data != nil {
if w.logger != nil {
w.logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"PANIC ERROR RECOVERED",
"job", job.ID,
"workflow", job.WorkflowValue(),
"data", data,
"stack", string(debug.Stack()),
)
}
_ = job.Failed().SetError(fmt.Sprintf("PANIC ERROR RECOVERED: %v", data))
}
}()
if err := w.runner.Run(job); err != nil {
result := new(jw.Result)
if errors.As(err, &result) {
job.Result = result
} else {
_ = job.Failed().SetError(err.Error())
}
if w.logger != nil {
w.logger.Error( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"RUN",
"job", job.ID,
"workflow", job.WorkflowValue(),
nil, err,
)
}
}
}
func (w *worker) maybeRunJob() time.Duration {
job, err := w.model.NextJob(w.runner.Namespace())
if err != nil {
return _timeoutOnError
}
if job == nil {
return _timeoutNoJob
}
count := w.state.addJob(w.id, job)
defer w.state.removeJob(w.id)
if logger := w.logger; logger != nil {
logger.Info("JOB", job.Fields()...) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
logger.Info( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Worker",
"id", w.id,
"job", job.ID,
"workflow", job.WorkflowValue(),
"count", count,
)
}
w.runJob(job)
w.updateJob(job)
return 0
}
func (w *worker) loop(ctx context.Context, stopCh <-chan bool) {
timer := time.NewTimer(0)
defer func() {
if !timer.Stop() {
<-timer.C
}
}()
for {
select {
case <-ctx.Done():
return
case <-stopCh:
return
case <-timer.C:
timer.Reset(w.maybeRunJob())
}
}
}
func (w *worker) run(ctx context.Context, stopCh <-chan bool, stoppedCh chan<- string) {
if w.logger != nil {
w.logger.Info("Worker started", "id", w.id) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
w.state.addWorker(w.id)
w.loop(ctx, stopCh)
w.state.removeWorker(w.id)
if w.logger != nil {
w.logger.Info("Worker stopped", "id", w.id) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
stoppedCh <- w.id
}
/*
######################################################################################################## @(°_°)@ #######
*/